在现代数据处理场景中,我们经常需要对包含数万甚至数十万条记录的大型列表执行耗时操作,例如网络请求、数据库查询、复杂计算或文件i/o。如果采用传统的顺序处理方式,即使单条记录的处理时间很短,累积起来也可能导致整个流程耗时数小时,严重影响系统吞吐量和用户体验。
Java 8引入的CompletableFuture为异步编程和并行处理提供了强大的支持,它能够帮助我们有效地将这些耗时任务分解并并行执行,从而显著缩短总处理时间。然而,不恰当的使用方式也可能导致并行能力无法充分发挥,甚至退化为串行执行。
许多开发者在尝试使用CompletableFuture进行并行处理时,可能会遇到一个常见问题:尽管代码看起来像是并行的,但实际执行却仍然是串行的。以下是一个典型的错误示例:
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
// 假设存在以下辅助类和方法
// class ListItem { /* ... */ }
// class ProcessResult { /* ... */ }
// class OutputBean { /* ... */ }
// class MyService { public Optional methodA(ListItem item) { /* ... */ } }
// class MyProcessor {
// private MyService service = new MyService();
// private OutputBean mapToBean(ProcessResult result, ListItem originalItem) { /* ... */ }
// public List executeListPart(List subList) {
// return subList.stream()
// .map(listItem -> service.methodA(listItem)
// .map(result -> mapToBean(result, listItem)))
// .flatMap(Optional::stream)
// .collect(Collectors.toList());
// }
// }
public class ParallelProcessingIncorrect {
// 假设这是您的列表和处理器实例
private static List largeList = /* 初始化一个包含50k ListItem的列表 */;
private static MyProcessor processor = new MyProcessor();
public static void main(String[] args) {
int noOfCores = Runtime.getRuntime().availableProcessors();
ExecutorService service = Executors.newFixedThreadPool(noOfCores - 1);
try {
long startTime = System.currentTimeMillis();
List results = Lists.partition(largeList, 500).stream()
.map(item -> CompletableFuture.supplyAsync(() -> processor.executeListPart(item), service))
// 核心问题:在这里调用 CompletableFuture::join
.map(CompletableFuture::join)
.flatMap(List::stream)
.collect(Collectors.toList());
long endTime = System.currentTimeMillis();
System.out.println("Incorrect approach total time: " + (endTime - startTime) + " ms");
System.out.println("Processed " + results.size() + " items.");
} finally {
service.shutdown();
}
}
} 上述代码的问题在于 .map(CompletableFuture::join) 这一行。CompletableFuture.join() 方法是一个阻塞操作,它会等待当前 CompletableFuture 完成并返回其结果。这意味着,当 Stream 处理第一个分区的 CompletableFuture 时,它会立即阻塞并等待该分区的所有任务完成,然后才能继续处理下一个分区的 CompletableFuture。结果是,尽管每个分区内部的任务可能在单独的线程中执行,但不同分区之间的处理却是严格串行的,从而失去了并行处理的优势。
要实现真正的并行执行,关键在于将异步任务的创建和结果的等待(join)分离。我们应该首先创建并提交所有异步任务,将它们的CompletableFuture实例收集到一个列表中,然后在一个单独的步骤中等待所有这些CompletableFuture完成。
以下是正确的并行处理代码示例:
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
// 辅助类定义(与上述示例相同,此处省略以保持简洁)
// class ListItem { /* ... */ }
// class ProcessResult { /* ... */ }
// class OutputBean { /* ... */ }
// class MyService { /* ... */ }
// class MyProcessor { /* ... */ }
public class ParallelProcessingCorrect {
private static List largeList; // 假设已初始化,例如:
static {
largeList = new ArrayList<>();
for (int i = 0; i < 50000; i++) {
largeList.add(new ListItem("item_" + i));
}
}
private static MyProcessor processor = new MyProcessor();
public static void main(String[] args) throws InterruptedException {
int noOfCores = Runtime.getRuntime().availableProcessors();
ExecutorService service = Executors.newFixedThreadPool(noOfCores - 1); // 推荐线程池大小为核心数-1或根据IO/CPU密集型任务调整
try {
long startTime = System.currentTimeMillis();
// 1. 创建并提交所有异步任务,收集CompletableFuture实例
List>> futures = Lists.partition(largeList, 500).stream()
.map(itemPart -> CompletableFuture.supplyAsync(() -> processor.executeListPart(itemPart), service))
.collect(Collectors.toList());
// 2. 等待所有CompletableFuture完成并获取结果
// 使用 CompletableFuture.allOf() 可以等待所有Future完成,但其本身不返回结果
// 更好的做法是遍历futures列表,逐个join或使用allof().join()后,再map获取结果
// 方法一:遍历futures列表,逐个join(更直接,但仍然是顺序join)
// List results = futures.stream()
// .map(CompletableFuture::join) // 此时join是等待所有任务提交后才开始
// .flatMap(List::stream)
// .collect(Collectors.toList());
// 方法二:使用 CompletableFuture.allOf() 结合 thenApply/thenCompose(更优雅,推荐)
CompletableFuture allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
List results = allOf.thenApply(v -> futures.stream()
.map(CompletableFuture::join) // 此时所有future都已完成,join是非阻塞的
.flatMap(List::stream)
.collect(Collectors.toList()))
.join(); // 等待所有结果收集完成
long endTime = System.currentTimeMillis();
System.out.println("Correct approach total time: " + (endTime - startTime) + " ms");
System.out.println("Processed " + results.size() + " items.");
} finally {
// 确保线程池关闭
service.shutdown();
if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("ExecutorService did not terminate in the specified time.");
service.shutdownNow();
}
}
}
} 工作原理:
tream().map(...) 这部分会遍历所有分区,并为每个分区创建一个 CompletableFuture 任务。CompletableFuture.supplyAsync() 会将任务提交给 ExecutorService 立即执行,而不会阻塞当前的流处理。通过这种方式,所有分区的处理任务几乎同时开始执行,只有在需要聚合所有结果时,主线程才会被阻塞,从而实现了真正的并行加速。
ExecutorService 是管理线程池的核心组件。在并行处理中,它的配置和生命周期管理至关重要。
在应用程序的整个生命周期中,如果会频繁地进行并行处理,通常推荐复用同一个 ExecutorService 实例,而不是每次都创建和关闭新的实例,以减少资源开销。
通过 CompletableFuture 进行大型列表的并行处理是提升Java应用性能的有效手段。核心在于避免在任务提交阶段就阻塞性地等待结果。正确的做法是先将所有任务异步提交并收集其 CompletableFuture 实例,待所有任务均已启动或完成时,再统一进行结果的聚合。合理配置 ExecutorService、选择合适的任务粒度以及完善异常处理机制,将确保您的并行处理方案既高效又健壮。