Java的ParallelStream API提供了一种便捷的方式来并行处理集合数据。在底层,它默认使用ForkJoinPool.commonPool()来执行并行任务。这个通用线程池的大小通常根据系统可用的处理器核心数(Runtime.getRuntime().availableProcessors() - 1,至少为1)来确定,旨在优化CPU密集型任务的性能。
然而,当ParallelStream内部执行的是I/O密集型操作(例如数据库查询、网络请求、文件读写)时,默认的commonPool行为可能并非最优。I/O操作通常会导致线程阻塞等待外部资源响应,如果commonPool中的线程被大量阻塞,将无法有效利用CPU,甚至可能导致线程饥饿,降低整体吞吐量。此时,我们可能希望限制ParallelStream使用的线程数量,或者将I/O任务从commonPool中分离出来。
直接通过设置系统属性java.util.concurrent.ForkJoinPool.common.parallelism来改变commonPool的并行度,虽然在某些情况下有效,但它是一个全局设置,会影响所有使用commonPool的任务,且对于已经启动的应用程序可能无法动态生效。更重要的是,对于I/O密集型任务,这种方式并不能根本解决线程阻塞的问题。
为了更精细地控制ParallelStream的线程数,我们可以创建一个自定义的ForkJoinPool,然后将ParallelStream的执行包裹在一个Callable任务中,并提交给这个自定义线程池。这样,ParallelStream内部的并行操作就会使用我们指定的线程池,而不是commonPool。
示例代码:
import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.stream.Collectors; public class CustomParallelStreamPool { // 模拟一个执行数据库查询的服务 static class ObjectService { public String getParam(String field) { // 模拟数据库查询耗时 try { Thread.sleep(100); // 模拟I/O等待 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } System.out.println(Thread.currentThread().getName() + " - Fetched param for " + field); return "Param for " + field; } } static class MyObject { String field; public MyObject(String field) { this.field = field; } public String getField() { return field; } } private static ObjectService objectService = new ObjectService(); /** * 使用自定义ForkJoinPool处理ParallelStream * @param objects 待处理对象列表 * @param poolSize 自定义线程池大小 * @return 处理结果列表 * @throws InterruptedException * @throws ExecutionException */ public static List
processWithCustomPool(List objects, int poolSize) throws InterruptedException, ExecutionException { ForkJoinPool customThreadPool = null; try { // 创建一个指定并行度的ForkJoinPool customThreadPool = new ForkJoinPool(poolSize); // 将ParallelStream操作封装为Callable任务 Callable > task = () -> objects.parallelStream() .map(object -> objectService.getParam(object.getField())) .collect(Collectors.toList()); // 提交任务并获取结果 return customThreadPool.submit(task).get(); } finally { // 关闭自定义线程池 if (customThreadPool != null) { customThreadPool.shutdown(); } } } public static void main(String[] args) throws ExecutionException, InterruptedException { List
data = List.of( new MyObject("A"), new MyObject("B"), new MyObject("C"), new MyObject("D"), new MyObject("E"), new MyObject("F"), new MyObject("G"), new MyObject("H"), new MyObject("I"), new MyObject("J") ); System.out.println("--- Processing with custom pool size 4 ---"); long startTime = System.currentTimeMillis(); List results = processWithCustomPool(data, 4); long endTime = System.currentTimeMillis(); System.out.println("Results: " + results); System.out.println("Total time: " + (endTime - startTime) + "ms"); } }
注意事项:
对于包含I/O密集型操作的并行处理,更推荐的做法是利用CompletableFuture和专门为I/O任务设计的线程池。这种方法将CPU密集型的流处理与I/O密集型的具体操作解耦,从而更好地管理线程资源。
ParallelStream可以用于快速遍历元素并提交异步I/O任务,而实际的I/O操作则由一个独立的、为I/O优化的线程池来执行。这样,ParallelStream的线程(无论是commonPool还是自定义ForkJoinPool的线程)可以迅速完成任务提交,而不会被I/O阻塞。
示例代码:
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
public class ParallelStreamWithCompletableFuture {
static class ObjectService {
public String getParam(String field) {
try {
Thread.sleep(100); // 模拟I/O等待
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println(Thread.currentThread().getName() + " - Fetched param for " + field);
return "Param for " + field;
}
}
static class MyObject {
String field;
public MyObject(String field) { this.field = field; }
public String getField() { return field; }
}
private static ObjectService objectService = new ObjectService();
// 建议使用有限的线程池处理I/O,其大小应与数据库连接池大小匹配
private static ExecutorService ioExecutor = Executors.newFixedThreadPool(5); // 示例:假设数据库连接池最大为5
/**
* 使用ParallelStream结合CompletableFuture和专用I/O执行器处理异步I/O任务
* @param objects 待处理对象列表
* @return 处理结果列表
*/
public static List processParallelWithAsyncIO(List objects) {
// ParallelStream用于快速提交CompletableFuture任务
List> futures = objects.parallelStream()
.map(object -> CompletableFuture.supplyAsync(() -> objectService.getParam(object.getField()), ioExecutor)
.thenApply(param -> Optional.ofNullable(param).orElse("N/A")))
.collect(Collectors.toList());
// 阻塞等待所有CompletableFuture完成,并收集结果
return futures.stream()
.map(CompletableFuture::join) // join()会阻塞直到CompletableFuture完成
.collect(Collectors.toList());
}
public static void main(String[] args) {
List data = List.of(
new MyObject("A"), new MyObject("B"), new MyObject("C"), new MyObject("D"),
new MyObject("E"), new MyObject("F"), new MyObject("G"), new MyObject("H"),
new MyObject("I"), new MyObject("J")
);
System.out.println("--- Processing with ParallelStream and async I/O ---");
long startTime = System.currentTimeMillis();
List results = processParallelWithAsyncIO(data);
long endTime = System.currentTimeMillis();
System.out.println("Results: " + results);
System.out.println("Total time: " + (endTime - startTime) + "ms");
// 关闭I/O执行器
ioExecutor.shutdown();
}
} 优点:
注意事项:
在涉及数据库查询的场景中,线程池的配置必须与数据库连接池的容量紧密协调。每个执行数据库查询的线程都需要一个数据库连接。如果并发执行的线程数超过了数据库连接池的最大连接数,将会导致:
因此,无论采用哪种线程池管理方式,都应确保并发执行数据库操作的线程数量不超过数据库连接池所能提供的最大