本文旨在解决Java微服务在处理海量数据(如百万级记录迁移)时遇到的jvm堆内存耗尽问题。通过深入探讨内存溢出的根源,并提出一种高效的批处理策略,利用数据库的LIMIT和OFFSET机制分批次查询和处理数据,从而显著降低单次操作的内存消耗,避免服务崩溃,确保数据迁移的稳定性和效率。
1. 问题背景与内存溢出分析
在Java微服务架构中,当需要处理或迁移大量数据(例如百万级数据库记录)时,常见的做法是查询所有符合条件的数据并加载到内存中进行后续处理。然而,这种一次性加载海量数据的策略极易导致JVM堆内存耗尽(java.lang.OutOfMemoryError: Java heap space),表现为服务崩溃或响应缓慢。
错误日志通常会显示类似以下信息:
Resource exhaustion event: the JVM was unable to allocate memory from the heap. ResourceExhausted! (1/0)
这表明应用程序试图分配的内存超出了JVM堆的可用容量。在数据迁移场景中,如将一个表中的大量记录复制到另一个表,如果一次性查询并缓存所有源数据(例如使用 JdbcTemplate.queryForList()),即使后续使用了JDBC批处理进行写入,内存压力也可能在数据查询阶段就达到极限。
原始代码片段中的 List
立即学习“Java免费学习笔记(深入)”;
2. 解决方案:基于数据库分页的批处理策略
为了解决一次性加载海量数据导致的内存溢出问题,核心思想是将大批量数据处理分解为多个小批次处理。这可以通过结合数据库的分页查询能力和应用程序的迭代处理逻辑来实现。
2.1 数据库层面的批次查询 (LIMIT 和 OFFSET)
数据库提供了 LIMIT(或 TOP)和 OFFSET(或 SKIP)子句,允许我们指定查询结果的数量以及从哪个位置开始返回。这是实现数据分批查询的基础。
sql查询示例:
SELECT * FROM your_table WHERE your_condition ORDER BY unique_id_column -- 必须指定一个排序字段,确保每次查询的顺序稳定 LIMIT batch_size OFFSET current_offset;
- LIMIT batch_size: 定义每个批次要查询的记录数量。
- OFFSET current_offset: 定义从结果集的哪个位置开始返回记录。current_offset 会随着已处理记录的数量递增。
- ORDER BY unique_id_column: 至关重要! 必须根据一个稳定、唯一且通常是索引的列(如主键ID、创建时间戳等)进行排序。这确保了:
- 每次分页查询的结果顺序是确定的。
- 不会遗漏或重复获取数据。
- 对于大型数据集和高 OFFSET 值,数据库可能需要扫描大量跳过的行,因此 OFFSET 的性能会随之下降。在极端情况下,可以考虑使用基于游标(Keyset Pagination)的方法(即 WHERE id > last_id ORDER BY id LIMIT batch_size),但这超出了本教程的范围。
2.2 应用程序层面的迭代处理
在应用程序中,我们需要构建一个循环,在每次迭代中:
- 计算当前的 OFFSET 值。
- 使用 LIMIT 和计算出的 OFFSET 从源数据库中查询一个批次的数据。
- 处理这个批次的数据(例如,复制到目标表)。
- 更新 OFFSET 值,为下一次迭代做准备。
- 当查询结果为空时,表示所有数据已处理完毕,退出循环。
3. 示例代码实现
下面我们将基于原有的代码结构,展示如何修改 archiveTableRecords 和 buildSQLQueryToFetchSourceRecords 方法以实现批处理。
3.1 修改SQL查询构建方法
为了支持 LIMIT 和 OFFSET,我们需要修改 ArchiveSQLQueries.buildSQLQueryToFetchSourceRecords 方法,使其能够接收批次大小、偏移量以及用于排序的列名。
import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Map.Entry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.ParameterizedPreparedStatementSetter; // 假设 ArchiveConfigDTO 和 getCSTDateNew 方法已定义 // 假设 ArchiveSQLQueries 是一个包含静态方法的类 public class DataArchiverService { private static final Logger logger = LoggerFactory.getLogger(DataArchiverService.class); @Value("${batch-size}") private int batchSize; // 配置的批处理大小,例如1000或5000 // 辅助方法,假设存在 private String getCSTDateNew(String archiveMonths) { // 实现日期计算逻辑 return "2022-09-01"; // 示例值 } // 辅助方法,假设存在 private int getSumOfArray(int[][] array) { int sum = 0; for (int[] innerArray : array) { for (int value : innerArray) { sum += value; } } return sum; } // --- 修改后的 ArchiveSQLQueries 类中的方法 --- public static class ArchiveSQLQueries { public static String buildSQLQueryToFetchSourceRecords(String sourceTable, String orderByColumn, int limit, int offset) { StringBuilder sb = new StringBuilder("SELECT * FROM " + sourceTable + " where update_dts <= ?"); // 确保 orderByColumn 是安全的,避免SQL注入风险 if (orderByColumn != null && !orderByColumn.isEmpty()) { sb.append(" ORDER BY ").append(orderByColumn); } sb.append(" LIMIT ").append(limit); sb.append(" OFFSET ").append(offset); return sb.toString(); } public static String buildSQLTargetRecordInsertionQuery(String targetTable, Map<String, Object> record, String primaryKeyColumn) { // 假设此方法已正确实现,根据Map构建INSERT语句 // 示例:INSERT INTO target_table (col1, col2) VALUES (?, ?) StringBuilder sb = new StringBuilder("INSERT INTO ").append(targetTable).append(" ("); StringBuilder values = new StringBuilder(" VALUES ("); boolean firstColumn = true; for (String key : record.keySet()) { if (!key.equals(primaryKeyColumn)) { // 假设主键在插入时是自增的,不包含在VALUES中 if (!firstColumn) { sb.append(", "); values.append(", "); } sb.append(key); values.append("?"); firstColumn = false; } } sb.append(")").append(values).append(")"); return sb.toString(); } } // --- 原始的 copySourceRecords 方法,其内部已使用批处理写入 --- public int copySourceRecords(JdbcTemplate targetDbTemplate, String targetTable, String primaryKeyColumn, List<Map<String, Object>> sourceRecords, List<Object> primaryKeyValueList) { int result = 0; logger.info("Copying {} records to {}", sourceRecords.size(), targetTable); if (sourceRecords.isEmpty()) { return 0; } // 构建插入语句,基于第一个记录的结构 String insertSql = ArchiveSQLQueries.buildSQLTargetRecordInsertionQuery(targetTable, sourceRecords.get(0), primaryKeyColumn); int[][] insertResult = targetDbTemplate.batchUpdate( insertSql, sourceRecords, batchSize, // 这里使用了配置的batchSize进行JDBC批处理写入 new ParameterizedPreparedStatementSetter<Map<String, Object>>() { @Override public void setValues(PreparedStatement ps, Map<String, Object> argument) throws SQLException { int index = 1; for (Entry<String, Object> obj : argument.entrySet()) { // 假设主键列在目标表是自增的,或者不作为插入参数 if (obj.getKey().equals(primaryKeyColumn)) { primaryKeyValueList.add(obj.getValue()); // 收集主键值 } else { ps.setObject(index++, obj.getValue()); } } } }); result = getSumOfArray(insertResult); logger.info("Inserted {} record(s) in {}", result, targetTable); return result; } // --- 修改后的 archiveTableRecords 方法,实现批次循环 --- public void archiveTableRecords(JdbcTemplate sourceDbTemplate, JdbcTemplate targetDbTemplate, ArchiveConfigDTO archiveObj) { try { String sourceTable = archiveObj.getSourceTable(); String targetTable = archiveObj.getTargetTable(); String primaryKeyColumn = archiveObj.getPrimaryKeyColumn(); // 假设这是用于排序的列 String archive_months = archiveObj.getArchiveCriteriaMonths(); String compareDate1 = getCSTDateNew(archive_months); logger.info("Archive criteria date: {}", compareDate1); int processedRecords = 0; List<Object> allPrimaryKeyValueList = new ArrayList<>(); // 用于收集所有已处理记录的主键,以便后续删除 while (true) { // 1. 批次查询源数据 List<Map<String, Object>> sourceRecordsBatch = sourceDbTemplate.queryForList( ArchiveSQLQueries.buildSQLQueryToFetchSourceRecords(sourceTable, primaryKeyColumn, batchSize, processedRecords), compareDate1 ); if (sourceRecordsBatch.isEmpty()) { logger.info("No more records to fetch for table {}. Total processed: {}", sourceTable, processedRecords); break; // 没有更多记录,退出循环 } logger.info("Fetched batch of {} records from {} (offset: {})", sourceRecordsBatch.size(), sourceTable, processedRecords); // 2. 准备当前批次的主键列表 List<Object> currentBatchPrimaryKeys = new ArrayList<>(); // 3. 复制当前批次数据到目标表 int recordsInsertedInBatch = copySourceRecords(targetDbTemplate, targetTable, primaryKeyColumn, sourceRecordsBatch, currentBatchPrimaryKeys); if (recordsInsertedInBatch > 0) { // 将当前批次的主键添加到总列表中 allPrimaryKeyValueList.addAll(currentBatchPrimaryKeys); logger.info("Copied {} record(s) to {}. Total copied: {}", recordsInsertedInBatch, targetTable, allPrimaryKeyValueList.size()); } // 4. 更新已处理记录数,作为下一次查询的偏移量 processedRecords += sourceRecordsBatch.size(); } // 5. 所有批次处理完毕后,根据收集到的主键删除源数据 if (!allPrimaryKeyValueList.isEmpty()) { logger.info("Initiating deletion of {} records from source table {}.", allPrimaryKeyValueList.size(), sourceTable); // 注意:如果 allPrimaryKeyValueList 极其庞大,deleteSourceRecords 方法也可能需要内部批处理 deleteSourceRecords(sourceDbTemplate, sourceTable, primaryKeyColumn, allPrimaryKeyValueList); logger.info("Deletion completed for {} records from source table {}.", allPrimaryKeyValueList.size(), sourceTable); } } catch (Exception e) { logger.error("Exception in archiveTableRecords: {} {}", e.getMessage(), e); // 在实际应用中,这里需要更健壮的异常处理,例如记录失败的批次信息,以便后续重试 } } // 假设 deleteSourceRecords 方法已存在 public void deleteSourceRecords(JdbcTemplate sourceDbTemplate, String sourceTable, String primaryKeyColumn, List<Object> primaryKeyValueList) { // 示例:DELETE FROM source_table WHERE primaryKeyColumn IN (?, ?, ...) // 对于非常大的 primaryKeyValueList,需要考虑分批次执行DELETE语句,或使用其他高效的删除策略 String deleteSql = "DELETE FROM " + sourceTable + " WHERE " + primaryKeyColumn + " IN (" + String.join(",", java.util.Collections.nCopies(primaryKeyValueList.size(), "?")) + ")"; try { sourceDbTemplate.batchUpdate(deleteSql, new ArrayList<>(primaryKeyValueList)); // 假设可以一次性处理 } catch (Exception e) {