本文旨在解决在Java中使用CompletableFuture进行并行处理时常见的性能陷阱。许多开发者尝试通过在流式操作中直接调用CompletableFuture::join来并行化任务,但这往往导致任务实际串行执行。本教程将详细解释这一现象,并提供一种正确的、高效的并行处理策略,通过分离异步任务的创建与结果的聚合,结合CompletableFuture.allOf实现真正的并行计算,最终将分散的结果合并成一个单一的列表。
理解并行处理的常见误区
在处理大量数据时,将耗时操作并行化是提升性能的有效手段。Java 8引入的CompletableFuture为异步编程提供了强大的支持。然而,不恰当的使用方式可能导致预期的并行效果无法实现。
考虑以下场景:有一个包含大量数据(例如50,000条记录)的列表,需要对每个列表项执行一个耗时操作,并将结果映射到Java对象,最终写入csv文件。如果采用顺序处理,例如:
list.stream() .map(listItem -> service.methodA(listItem).map(result -> mapToBean(result, listItem))) .flatMap(Optional::stream) .collect(Collectors.toList());
当数据量较大时,这种方式可能非常慢,例如处理2,000条数据就需要4小时。为了加速,开发者可能会尝试使用CompletableFuture进行并行化,常见的错误尝试如下:
ExecutorService service = Executors.newFixedThreadPool(noOfCores - 1); Lists.partition(list, 500).stream() // 将大列表分成小块 .map(item -> CompletableFuture.supplyAsync(() -> executeListPart(item), service)) // 提交异步任务 .map(CompletableFuture::join) // 立即等待每个任务完成 .flatMap(List::stream) .collect(Collectors.toList());
尽管代码中使用了CompletableFuture.supplyAsync将任务提交到线程池,但紧随其后的.map(CompletableFuture::join)操作是导致性能问题的关键。CompletableFuture::join是一个阻塞操作,它会暂停当前流的执行,直到对应的CompletableFuture完成并返回结果。这意味着,尽管每个任务可能在不同的线程中执行,但流本身是按顺序处理每个CompletableFuture的,一个任务完成后,流才会处理下一个任务。这实际上将并行执行变成了顺序等待,从而失去了并行化的优势。
立即学习“Java免费学习笔记(深入)”;
正确的并行处理策略
要实现真正的并行,核心思想是:先创建并启动所有异步任务,然后统一等待它们完成并收集结果。 避免在创建任务的同一流式管道中立即阻塞等待。
以下是实现这一策略的步骤和示例代码:
- 创建并启动所有异步任务: 遍历数据分片,为每个分片创建一个CompletableFuture,并将其提交到ExecutorService中执行。将这些CompletableFuture实例收集到一个列表中。
- 统一等待所有任务完成: 使用CompletableFuture.allOf()方法创建一个新的CompletableFuture,它将在所有已提交的任务都完成时才完成。
- 聚合所有任务的结果: 当CompletableFuture.allOf()完成时,表明所有子任务都已完成,此时可以安全地对之前收集的CompletableFuture列表调用join()方法,并对结果进行扁平化和收集。
import com.google.common.collect.Lists; // 假设使用guava的Lists.partition import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Collectors; public class ParallelProcessingExample { // 假设这是您的业务逻辑方法,处理列表的一个分片并返回结果列表 // executeListPart(List<MyItem> partition) 应该返回 List<MyProcessedBean> private List<MyProcessedBean> executeListPart(List<MyItem> partition) { // 模拟耗时操作 try { Thread.sleep(100); // 假设每个分片处理100ms } catch (InterruptedException e) { Thread.currentThread().interrupt(); } // 实际业务逻辑:处理partition中的每个MyItem,并生成MyProcessedBean return partition.stream() .map(item -> new MyProcessedBean("Processed_" + item.getId())) // 示例转换 .collect(Collectors.toList()); } public List<MyProcessedBean> processLargeListInParallel(List<MyItem> largeList, int partitionSize, int threadPoolSize) { // 1. 创建并配置线程池 // 建议线程池大小根据CPU核心数和任务类型(IO密集型/CPU密集型)调整 ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize); try { // 2. 将大列表分成小块,并为每个小块创建异步任务 // CompletableFuture<List<MyProcessedBean>> 表示每个任务会返回一个MyProcessedBean列表 List<CompletableFuture<List<MyProcessedBean>>> futures = Lists.partition(largeList, partitionSize).stream() .map(partition -> CompletableFuture.supplyAsync(() -> executeListPart(partition), executorService)) .collect(Collectors.toList()); // 3. 创建一个CompletableFuture,等待所有子任务完成 CompletableFuture<void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); // 4. 当所有子任务完成后,聚合结果 List<MyProcessedBean> finalResults = allOf.thenApply(v -> futures.stream() .map(CompletableFuture::join) // 此时所有future都已完成,join是非阻塞的 .flatMap(List::stream) // 扁平化List<List<MyProcessedBean>>为List<MyProcessedBean> .collect(Collectors.toList()) ).join(); // 阻塞等待最终结果的聚合 return finalResults; } finally { // 5. 关闭线程池,释放资源 executorService.shutdown(); // 可选:等待线程池终止,确保所有任务都已完成 // try { // if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { // executorService.shutdownNow(); // } // } catch (InterruptedException ex) { // executorService.shutdownNow(); // Thread.currentThread().interrupt(); // } } } // 示例数据类 static class MyItem { private String id; public MyItem(String id) { this.id = id; } public String getId() { return id; } } static class MyProcessedBean { private String processedId; public MyProcessedBean(String processedId) { this.processedId = processedId; } public String getProcessedId() { return processedId; } @Override public String toString() { return "MyProcessedBean{" + "processedId='" + processedId + ''' + '}'; } } public static void main(String[] args) { ParallelProcessingExample app = new ParallelProcessingExample(); // 构造一个大型列表 List<MyItem> largeList = new java.util.ArrayList<>(); for (int i = 0; i < 5000; i++) { largeList.add(new MyItem("item_" + i)); } long startTime = System.currentTimeMillis(); List<MyProcessedBean> results = app.processLargeListInParallel(largeList, 500, Runtime.getRuntime().availableProcessors() - 1); long endTime = System.currentTimeMillis(); System.out.println("Processed " + results.size() + " items in " + (endTime - startTime) + " ms"); // System.out.println("First 10 results: " + results.subList(0, Math.min(10, results.size()))); } }
注意事项与最佳实践
-
线程池管理:
- ExecutorService是管理线程的关键。对于CPU密集型任务,线程池大小通常设置为Runtime.getRuntime().availableProcessors()或noOfCores – 1。对于IO密集型任务,可以适当增加线程池大小,因为线程在等待IO时不会占用CPU。
- 在任务完成后,务必调用executorService.shutdown()来优雅地关闭线程池,释放资源。如果线程池是应用程序生命周期内的单例,则可以在应用程序关闭时统一管理。
- awaitTermination()可以用于等待所有已提交的任务完成,但对于一次性任务聚合,CompletableFuture.allOf().join()通常就足够了。
-
列表分片:
- 将大列表分片(例如使用Guava的Lists.partition)是一个很好的策略。每个分片的大小需要根据任务的粒度和系统资源进行调整。过小的分片会增加任务调度开销,过大的分片可能导致部分线程长时间空闲。
- 确保executeListPart方法是线程安全的,并且不依赖于共享的可变状态,或者对共享状态进行适当的同步。
-
错误处理:
- CompletableFuture提供了丰富的错误处理机制,例如exceptionally()、handle()、whenComplete()等。在生产环境中,应为异步任务添加健壮的错误处理逻辑,以防止单个任务失败导致整个流程中断。
- 当使用CompletableFuture.allOf()时,如果任何一个子CompletableFuture异常完成,那么allOf也会异常完成。你可以通过.exceptionally()或.handle()来捕获和处理这些异常。
-
结果聚合:
- CompletableFuture.allOf()返回的是CompletableFuture
,因为它本身不关心子任务的结果,只关心它们是否完成。 - 要获取所有子任务的结果,需要像示例中那样,在allOf完成后,再次遍历原始的futures列表,并调用join()(此时是非阻塞的),然后进行结果的flatMap和collect。
- CompletableFuture.allOf()返回的是CompletableFuture
总结
通过将CompletableFuture的创建和结果的join操作分离,我们能够充分利用多核CPU的优势,实现真正意义上的并行处理。这种模式是处理大量数据或执行耗时操作时提升java应用程序性能的关键。理解CompletableFuture的非阻塞特性以及如何正确地聚合结果,是编写高效、并发代码的重要一步。