17370845950

在Java中如何实现并发任务分组执行
使用CompletableFuture结合ExecutorService按组并行执行任务,先将任务按groupId分组到Map中,再对每组提交异步任务并等待全部完成。

在Java中实现并发任务分组执行,关键在于将任务按组划分,并确保每组任务可以并行执行,同时组与组之间可控制执行顺序或并发级别。常用的方式是结合ExecutorServiceCompletableFuture和集合分组逻辑来完成。

使用CompletableFuture进行任务分组并行处理

通过CompletableFuture可以方便地对每组任务进行异步执行,并等待整组完成。

示例场景:假设有多个任务,需要按类别(如用户ID分组)分别执行,每组内部任务并行,组间也可并行。

  • 先将任务按某个键(如groupId)分组到Map中
  • 对每个组创建一个CompletableFuture,内部使用线程池并行处理该组所有任务
  • 收集所有组的future,调用join()等待全部完成

代码示例:

Map> groupedTasks = // 按组分类的任务

ExecutorService groupPool = Executors.newFixedThreadPool(4);
List> groupFutures = new ArrayList<>();

for (Map.Entry> entry : groupedTasks.entrySet()) {
    String groupId = entry.getKey();
    List tasks = entry.getValue();

    CompletableFuture groupFuture = CompletableFuture.runAsync(() -> {
        ExecutorService taskPool = Executors.newFixedThreadPool(2); // 每组内并发度
        List> taskFutures = tasks.stream()
            .map(task -> CompletableFuture.runAsync(task, taskPool))
            .collect(Collectors.toList());

        // 等待本组所有任务完成
        CompletableFuture.allOf(taskFutures.toArray(new CompletableFuture[0])).join();
        taskPool.shutdown();
    }, groupPool);

    groupFutures.add(groupFuture);
}

// 等待所有组执行完毕
CompletableFuture.allOf(groupFutures.toArray(new CompletableFuture[0])).join();

groupPool.shutdown();

控制组间执行顺序(可选串行)

如果要求组之间按顺序执行,可以不用并行流,而是逐个处理每组:

  • 遍历分组map(保持顺序可用LinkedHashMap
  • 每组提交到线程池后调用future.get()同步等待

这样就能实现“组内并行、组间串行”。

使用CountDownLatch协调任务组

若不使用CompletableFuture,也可以用CountDownLatch手动控制同步。

为每组创建一个latch,初始值为任务数,每个任务完成后countDown,主线程await。

适合更底层控制场景,但代码较繁琐,推荐优先使用CompletableFuture。

注意事项

线程池管理:避免频繁创建过多线程池,建议复用或使用共享池。

异常处理:CompletableFuture中任务抛异常不会中断主流程,需调用exceptionally()或检查get()结果。

资源释放:务必调用shutdown()关闭线程池,防止内存泄漏。

基本上就这些。根据实际需求选择组内并行、组间并行或串行策略,CompletableFuture配合分组map是最清晰实用的方式。