悠悠楠杉
Java实现高效数据批量处理的全套解决方案
Java实现高效数据批量处理的全套解决方案
一、批量导入导出的核心实现
1.1 基于POI的Excel批处理
java
// 使用SXSSFWorkbook处理百万级数据
SXSSFWorkbook workbook = new SXSSFWorkbook(1000); // 保持1000行在内存中
Sheet sheet = workbook.createSheet("数据导入");
// 数据批写入优化
List
for(int i=0; i<dataList.size(); i++){
Row row = sheet.createRow(i);
row.createCell(0).setCellValue(dataList.get(i).getField1());
// 每1000行刷新到磁盘
if(i % 1000 == 0){
((SXSSFSheet)sheet).flushRows(1000);
}
}
1.2 数据库批量操作
java
// JDBC批处理示例
Connection conn = dataSource.getConnection();
conn.setAutoCommit(false);
PreparedStatement ps = conn.prepareStatement("INSERT INTO table VALUES(?,?)");
final int BATCH_SIZE = 2000;
for(int i=0; i<dataList.size(); i++){
ps.setString(1, dataList.get(i).getField1());
ps.addBatch();
if(i%BATCH_SIZE == 0){
ps.executeBatch();
conn.commit();
}
}
ps.executeBatch(); // 处理剩余记录
二、提升处理效率的6个关键技巧
2.1 内存分块处理
采用分页查询+批处理组合方案:
java
int pageSize = 5000;
int total = getTotalCount();
for(int page=1; page<=total/pageSize+1; page++){
List<Data> chunk = queryByPage(page,pageSize);
processBatch(chunk); // 处理当前分页数据
System.gc(); // 主动触发垃圾回收
}
2.2 多线程并行处理
java
ExecutorService executor = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()*2);
List<Future
for(DataBatch batch : splitToBatches(dataList)){
futures.add(executor.submit(() -> processSingleBatch(batch)));
}
// 合并处理结果
List
.map(f -> {
try { return f.get(); }
catch(Exception e) { return null; }
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
三、异常处理与事务控制
3.1 健壮的异常恢复机制
java
try {
// 批量处理逻辑
} catch (BatchUpdateException e) {
int[] successCount = e.getUpdateCounts();
// 构建失败记录重试队列
List<Data> retryList = new ArrayList<>();
for(int i=0; i<successCount.length; i++){
if(successCount[i] == Statement.EXECUTE_FAILED){
retryList.add(dataList.get(i));
}
}
// 记录失败日志并报警
logRetryRecords(retryList);
}
3.2 分布式事务方案
java
// 使用Spring的编程式事务
TransactionTemplate template = new TransactionTemplate(transactionManager);
template.setPropagationBehavior(TransactionDefinition.PROPAGATIONREQUIRESNEW);
template.execute(status -> {
try {
importService.processBatch(batchData);
fileService.markProcessed(fileId);
return true;
} catch(Exception e) {
status.setRollbackOnly();
throw new BatchException("批量导入失败",e);
}
});
四、性能优化实战建议
缓冲区调优:根据服务器内存调整JVM参数,特别是-Xmx和-XX:MaxDirectMemorySize
连接池配置:properties
Tomcat JDBC连接池优化
spring.datasource.tomcat.max-active=50
spring.datasource.tomcat.max-idle=20
spring.datasource.tomcat.min-idle=10
spring.datasource.tomcat.max-wait=30000
- JVM层优化:
- 添加-XX:+UseG1GC启用G1垃圾回收器
- 设置-XX:MaxGCPauseMillis=200控制GC停顿时间
- 异步日志方案:采用Log4j2的异步日志写入,避免I/O阻塞
xml <AsyncLogger name="importLogger" level="info" includeLocation="true"> <AppenderRef ref="ImportFileAppender"/> </AsyncLogger>
五、扩展应用场景
5.1 与消息队列结合
java
// 处理完成后发送MQ通知
batchImportResult.setProcessTime(System.currentTimeMillis());
rabbitTemplate.convertAndSend(
"import.finish.queue",
result,
message -> {
message.getMessageProperties()
.setHeader("retry-count", 0);
return message;
}
);
5.2 断点续传实现
java
// 记录处理进度
public class ImportProgress {
private Long jobId;
private Integer totalCount;
private Integer processed;
private String checkpointFile;
private LocalDateTime lastUpdate;
// 每处理1000条更新一次进度
public void updateProgress(int batchSize){
this.processed += batchSize;
saveCheckpoint();
}
}
通过合理组合这些技术方案,可以实现每日千万级数据的高效处理。实际项目中需要根据具体业务场景进行参数调优,建议在预发布环境进行充分的压力测试。