CountedCompleter通过pending count机制实现自定义任务完成条件,适用于分治算法;其核心是手动管理等待计数,调用tryComplete()触发onCompletion回调合并结果,比RecursiveTask更灵活但复杂。
在Java中,CountedCompleter 是 ForkJoinTask 的一个抽象子类,适用于需要自定义任务完成条件的场景,特别适合实现分治并行算法。它比普通的 RecursiveAction 或 RecursiveTask 更灵活,因为你可以控制任务何时才算“完成”——不是执行完 compute 就结束,
而是通过手动调用 tryComplete() 或 complete(...) 来触发完成逻辑。
CountedCompleter 的关键在于“等待计数器”(pending count)。每个任务都有一个内部计数器,表示它等待多少个子任务完成。当一个任务被创建时,你可以设置它的 pending count。每当一个子任务完成,计数器减一。当计数器归零时,系统自动调用任务的 onCompletion(CountedCompleter) 方法,并向上游传播完成状态。
常用方法:
compute():主逻辑入口,通常用于拆分任务或执行计算。tryComplete():将当前任务的 pending count 减一;如果归零,则触发 onCompletion 并通知父任务。onCompletion(CountedCompleter):任务完成时的回调,常用于合并结果。addToPendingCount(int delta):增加等待的子任务数量。quietlyComplete():强制完成任务而不触发异常传播。下面以数组求和为例,展示如何用 CountedCompleter 实现并行分治:
import java.util.concurrent.*;public class SumTask extends CountedCompleter
{ private final long[] array; private final int lo, hi; private Long result; public SumTask(CountedCompleter> parent, long[] array, int lo, int hi) { super(parent); this.array = array; this.lo = lo; this.hi = hi; } @Override public void compute() { if (hi - lo zuojiankuohaophpcn= 1000) { // 小数据量直接计算 long sum = 0; for (int i = lo; i zuojiankuohaophpcn hi; i++) { sum += array[i]; } result = sum; // 告知自己已完成,减少父任务的等待计数 tryComplete(); } else { // 拆分为两个子任务 int mid = (lo + hi) / 2; // 增加两个待完成的子任务 addToPendingCount(1); // 右半部分 addToPendingCount(1); // 左半部分 // 提交右半部分(作为新任务) new SumTask(this, array, mid, hi).fork(); // 当前任务处理左半部分 new SumTask(this, array, lo, mid).fork(); // 也可复用当前任务 } } @Override public void onCompletion(CountedCompleter> caller) { // 合并子任务结果 SumTask left = (SumTask) getFirstChild(); SumTask right = (SumTask) left.getNextSibling(); long leftResult = left == null ? 0 : left.result; long rightResult = right == null ? 0 : right.result; result = leftResult + rightResult; } public Long getRawResult() { return result; } // 示例运行 public static void main(String[] args) { long[] data = new long[100_000]; Arrays.fill(data, 1); ForkJoinPool pool = new ForkJoinPool(); SumTask task = new SumTask(null, data, 0, data.length); pool.invoke(task); System.out.println("Sum: " + task.getResult()); pool.shutdown(); }}
关键设计要点
使用
CountedCompleter时需注意以下几点:
addToPendingCount(1),否则父任务不会等待它。tryComplete() 触发完成流程。onCompletion 中处理,确保所有子任务已完成。CountedCompleter 特别适合以下情况:
相比 RecursiveTask,它提供了更大的灵活性,但代价是代码复杂度上升。对于标准的分治问题(如归并排序、矩阵乘法),若子任务数固定为2,RecursiveTask 更简洁;但若子任务数可变或需要延迟完成判断,CountedCompleter 是更好选择。
基本上就这些。