17370845950

Java如何使用Phaser实现多阶段任务同步_Java并发协调控制机制分析
Phaser支持动态注册线程和多阶段同步,适用于分阶段执行的并发任务。通过arriveAndAwaitAdvance()等待阶段完成,register()增加参与者,arriveAndDeregister()退出后续阶段,onAdvance()可监听阶段切换并决定是否终止。示例中工人、质检员分阶段协作,三阶段后自动结束,适合流程化作业场景。

在Java并发编程中,Phaser 是一个灵活且强大的同步工具,特别适用于需要分阶段执行的多线程任务。与 CountDownLatch 和 CyclicBarrier 不同,Phaser 支持动态注册任务线程,并能重复使用于多个阶段,非常适合实现“多阶段任务同步”场景。

Phaser 的基本机制

Phaser 可以看作是 CyclicBarrier 和 CountDownLatch 的结合体,但更加灵活:

  • 支持动态增加或减少参与同步的线程数(通过 register()arriveAndDeregister()
  • 每个阶段执行完成后自动进入下一阶段,无需重新初始化
  • 线程可选择是否继续参与后续阶段

核心方法包括:

  • arriveAndAwaitAdvance():到达当前阶段并等待其他参与者完成
  • arriveAndDeregister():到达后退出同步,不再参与后续阶段
  • register():动态注册一个新参与者
  • getPhase():获取当前阶段编号(从0开始)

多阶段任务同步示例

假设我们有一组工人协作完成三个阶段的任务:准备、加工、收尾。每个阶段必须等所有当前参与者完成才能进入下一阶段。

import java.util.concurrent.Phaser;

public class MultiStageTask {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(3); // 初始3个参与者

        for (int i = 1; i <= 3; i++) {
            new Thread(new Worker(phaser, "Worker-" + i)).start();
        }
    }

    static class Worker implements Runnable {
        private final Phaser phaser;
        private final String name;

        Worker(Phaser phaser, String name) {
            this.phaser = phaser;
            this.name = name;
        }

        @Override
        public void run() {
            int phase;

            // 阶段1:准备
            System.out.println(name + " 开始准备工作");
            sleep(500);
            phase = phaser.arriveAndAwaitAdvance();
            System.out.println(name + " 完成准备,进入阶段 " + (phase + 1));

            // 阶段2:加工
            System.out.println(name + " 开始加工任务");
            sleep(700);
            phase = phaser.arriveAndAwaitAdvance();
            System.out.println(name + " 完成加工,进入阶段 " + (phase + 1));

            // 阶段3:收尾
            System.out.println(name + " 进行收尾工作");
            sleep(400);
            phaser.arriveAndAwaitAdvance(); // 最后一阶段无需再继续
            System.out.println(name + " 任务结束");
        }

        private void sleep(long ms) {
            try {
                Thread.sleep(ms);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

动态参与者管理

Phaser 的一大优势是支持运行时动态加入或退出。例如,在第二阶段后可以新增一个质检员线程:

// 在某个阶段后动态添加参与者
phaser.register();
new Thread(() -> {
    System.out.println("质检员加入");
    phaser.arriveAndAwaitAdvance(); // 等待当前阶段结束
    System.out.println("质检员检查完成");
    phaser.arriveAndDeregister();  // 检查完退出
}).start();

也可以让某个线程在完成特定阶段后退出:

// 工人完成前两个阶段后退出
phaser.arriveAndAwaitAdvance(); // 第一阶段
phaser.arriveAndAwaitAdvance(); // 第二阶段
phaser.arriveAndDeregister();   // 不再参与后续阶段
System.out.println(name + " 提前离场");

阶段终止与监听

可以通过重写 onAdvance() 方法在每个阶段切换时执行自定义逻辑,常用于日志记录或判断是否终止:

Phaser phaser = new Phaser(3) {
    @Override
    protected boolean onAdvance(int phase, int registeredParties) {
        System.out.println("▶ 阶段 " + phase + " 完成,当前参与者:" + registeredParties);
        return phase >= 2 || registeredParties == 0; // 执行完第3阶段后停止
    }
};

onAdvance() 返回 true 时,Phaser 进入终止状态,后续的同步调用会立即返回。

基本上就这些。Phaser 在处理阶段性并行任务时非常高效,尤其适合模拟流程化作业、分步计算或游戏回合制逻辑。关键在于合理管理参与者的生命周期和阶段切换逻辑,避免死锁或遗漏等待。