模块  java.base
软件包  java.util.concurrent

Class Phaser


  • public class Phaserextends Object
    可重复使用的同步屏障,功能类似于CyclicBarrierCountDownLatch但支持更灵活的使用。

    注册。 与其他障碍的情况不同, 登记在移相器上同步的各方数量可能会随时间而变化。 任务可以在任何时间(使用的方法来注册register()bulkRegister(int) ,或构造建立各方的初始数的形式),和(使用任何抵达时任选注销arriveAndDeregister() )。 与大多数基本同步结构一样,注册和注销仅影响内部计数; 他们没有建立任何进一步的内部簿记,因此任务无法查询他们是否已注册。 (但是,你可以通过继承这个类来引入这样的簿记。)

    同步。 CyclicBarrier一样,可能会一再等待Phaser 方法arriveAndAwaitAdvance()具有类似于CyclicBarrier.await效果。 每一代移相器都有一个相关的相位数。 阶段编号从零开始,并在所有各方到达移相器时前进,在达到Integer.MAX_VALUEInteger.MAX_VALUE零。 使用阶段编号可以在到达移相器时以及在等待其他人时通过可由任何注册方调用的两种方法独立控制操作:

    • 到达。 方法arrive()arriveAndDeregister()记录到货。 这些方法不会阻止,但返回相关的到达阶段号 ; 也就是说,到达所应用的相位器的相数。 当给定阶段的最后一方到达时,执行可选动作并且阶段前进。 这些动作由触发相位超前的一方执行,并且通过重写方法onAdvance(int, int)来安排,该方法还控制终止。 覆盖此方法与为CyclicBarrier提供屏障操作类似,但更灵活。
    • 等候。 方法awaitAdvance(int)需要指示到达阶段号的参数,并且当相位器前进到(或已经在)不同阶段时返回。 与使用CyclicBarrier类似结构不同,方法awaitAdvance继续等待,即使等待线程被中断。 可以使用可中断和超时版本,但在任务中断或超时等待时遇到的异常不会更改移相器的状态。 如有必要,您可以在调用forceTermination之后,在这些异常的处理程序中执行任何相关的恢复。 ForkJoinPool执行的任务也可以使用相位器 如果池的parallelismLevel可以容纳最大数量的同时阻塞方,则可确保进度。

    终止。 移相器可以进入终止状态,可以使用方法isTerminated()进行检查。 在终止时,所有同步方法立即返回而不等待提前,如负返回值所示。 同样,终止时注册的尝试也没有效果。 调用onAdvance返回true时触发终止。 如果取消注册导致注册方的数量变为零,则默认实现返回true 如下所示,当相位器控制具有固定迭代次数的动作时,通常很方便地覆盖该方法以在当前相位数达到阈值时引起终止。 方法forceTermination()也可用于突然释放等待线程并允许它们终止。

    分层。 可以对移相器进行分层 (即,以树结构构造)以减少争用。 然而,可以设置具有大量方的否则会经历大量同步争用成本的相位器,使得子组相位器共享共同的父级。 即使它产生更大的每操作开销,这也可以大大增加吞吐量。

    在分层阶段的树中,自动管理子阶段与其父母的注册和注销。 只要子移相器的注册方数量变为非零(如Phaser(Phaser,int)构造函数register()bulkRegister(int)中所述 ),子移相器就会向其父移植器注册。 每当注册方的数量因调用arriveAndDeregister()而变为零时,子移相器将从其父级取消注册。

    监测。 虽然同步方法可以仅由注册方调用,但是相位器的当前状态可以由任何呼叫者监视。 在任何特定时刻共有getRegisteredParties()方,其中getArrivedParties()已到达当前阶段( getPhase() )。 当剩余的( getUnarrivedParties() )方到达时,阶段进展。 这些方法返回的值可能反映瞬态,因此通常对同步控制无用。 方法toString()以便于非正式监视的形式返回这些状态查询的快照。

    示例用法:

    可以使用Phaser代替CountDownLatch来控制为可变数量的方提供服务的一次性动作。 典型的习惯用法是将方法设置为首次注册,然后启动所有操作,然后取消注册,如下所示:

       void runTasks(List<Runnable> tasks) { Phaser startingGate = new Phaser(1); // "1" to register self // create and start threads for (Runnable task : tasks) { startingGate.register(); new Thread(() -> { startingGate.arriveAndAwaitAdvance(); task.run(); }).start(); } // deregister self to allow threads to proceed startingGate.arriveAndDeregister(); } 

    使一组线程重复执行给定迭代次数的操作的一种方法是覆盖onAdvance

       void startTasks(List<Runnable> tasks, int iterations) { Phaser phaser = new Phaser() { protected boolean onAdvance(int phase, int registeredParties) { return phase >= iterations - 1 || registeredParties == 0; } }; phaser.register(); for (Runnable task : tasks) { phaser.register(); new Thread(() -> { do { task.run(); phaser.arriveAndAwaitAdvance(); } while (!phaser.isTerminated()); }).start(); } // allow threads to proceed; don't wait for them phaser.arriveAndDeregister(); } 
    如果主要任务必须稍后等待终止,它可能会重新注册然后执行类似的循环:
       // ... phaser.register(); while (!phaser.isTerminated()) phaser.arriveAndAwaitAdvance(); 

    相关结构可用于等待上下文中的特定阶段编号,其中您确定阶段永远不会包围Integer.MAX_VALUE 例如:

       void awaitPhase(Phaser phaser, int phase) { int p = phaser.register(); // assumes caller not already registered while (p < phase) { if (phaser.isTerminated()) // ... deal with unexpected termination else p = phaser.arriveAndAwaitAdvance(); } phaser.arriveAndDeregister(); } 

    要使用阶段树创建一组n任务,您可以使用以下形式的代码,假设具有构造函数的Task类接受在构造时注册的Phaser 在调用build(new Task[n], 0, n, new Phaser())之后,可以启动这些任务,例如通过提交到池:

       void build(Task[] tasks, int lo, int hi, Phaser ph) { if (hi - lo > TASKS_PER_PHASER) { for (int i = lo; i < hi; i += TASKS_PER_PHASER) { int j = Math.min(i + TASKS_PER_PHASER, hi); build(tasks, i, j, new Phaser(ph)); } } else { for (int i = lo; i < hi; ++i) tasks[i] = new Task(ph); // assumes new Task(ph) performs ph.register() } } 
    TASKS_PER_PHASER的最佳值主要取决于预期的同步速率。 低至4的值可能适用于极小的每相任务机构(因此高速率),或者对于极大的任务机构而言可能高达数百。

    实施说明 :此实施将最大参与方数限制为65535.尝试注册其他参与方会产生IllegalStateException 但是,您可以而且应该创建分层相位器以适应任意大量的参与者。

    从以下版本开始:
    1.7
    • 构造方法摘要

      构造方法  
      构造器 描述
      Phaser()
      创建一个新的移相器,没有最初注册的参与方,没有父级,初始阶段号为0。
      Phaser​(int parties)
      创建一个新的移相器,其中包含指定数量的已注册未获得的参与方,没有父级,初始阶段编号为0。
      Phaser​(Phaser parent)
      相当于 Phaser(parent, 0)
      Phaser​(Phaser parent, int parties)
      使用给定的父级和已注册的未获得方的数量创建新的移相器。
    • 方法摘要

      所有方法  实例方法 具体的方法 
      变量和类型 方法 描述
      int arrive()
      到达这个移相器,而不是等待其他人到达。
      int arriveAndAwaitAdvance()
      到达这个移相器并等待其他人。
      int arriveAndDeregister()
      到达此移相器并从中取消注册而无需等待其他人到达。
      int awaitAdvance​(int phase)
      等待该相位器的相位从给定相位值前进,如果当前相位不等于给定相位值或该相位器终止则立即返回。
      int awaitAdvanceInterruptibly​(int phase)
      等待该相位器的相位从给定相位值前进,如果在等待时中断则抛出 InterruptedException ,或者如果当前相位不等于给定相位值或该相位器终止则立即返回。
      int awaitAdvanceInterruptibly​(int phase, long timeout, TimeUnit unit)
      等待此相位器的相位从给定相位值或给定超时前进到过去,如果在等待时中断则抛出 InterruptedException ,或者如果当前相位不等于给定相位值或该相位器终止则立即返回。
      int bulkRegister​(int parties)
      在此移相器中添加给定数量的新未分支方。
      void forceTermination()
      强制此移相器进入终止状态。
      int getArrivedParties()
      返回已到达此移相器当前阶段的已注册方的数量。
      Phaser getParent()
      返回此移相器的父级,如果没有,则返回 null
      int getPhase()
      返回当前阶段号。
      int getRegisteredParties()
      返回此移相器注册的参与方数量。
      Phaser getRoot()
      返回此移相器的根祖先,如果它没有父移相器,则与此移相器相同。
      int getUnarrivedParties()
      返回尚未到达此移相器当前阶段的已注册方的数量。
      boolean isTerminated()
      如果此移相器已终止,则返回 true
      protected boolean onAdvance​(int phase, int registeredParties)
      可重写的方法,用于在即将发生相位超前时执行操作,并控制终止。
      int register()
      为此移相器添加了一个新的未受影响的聚会。
      String toString()
      返回标识此移相器的字符串及其状态。
    • 构造方法详细信息

      • Phaser

        public Phaser()
        创建一个新的移相器,没有最初注册的参与方,没有父级,初始阶段号为0.使用此移相器的任何线程都需要首先注册它。
      • Phaser

        public Phaser​(int parties)
        创建一个新的移相器,其中包含指定数量的已注册未获得的参与方,没有父级,初始阶段编号为0。
        参数
        parties - 进入下一阶段所需的参与方数量
        异常
        IllegalArgumentException - 如果当事方小于零或大于支持的最大当事方数
      • Phaser

        public Phaser​(Phaser parent,              int parties)
        使用给定的父级和已注册的未获得方的数量创建新的移相器。 当给定父级非空且给定方数大于零时,此子移相器将向其父级注册。
        参数
        parent - 父移相器
        parties - 进入下一阶段所需的参与方数量
        异常
        IllegalArgumentException - 如果当事方小于零或大于最大支持方数
    • 方法详细信息

      • register

        public int register()
        为此移相器添加了一个新的未受影响的聚会。 如果正在进行onAdvance(int, int)调用,则此方法可能会在返回之前等待其完成。 如果此移相器具有父级,并且此移相器以前没有注册方,则此子移相器也在其父级中注册。 如果此移相器终止,则注册尝试无效,并返回负值。
        结果
        该注册所适用的到达阶段号。 如果此值为负,则此移相器已终止,在这种情况下,注册无效。
        异常
        IllegalStateException - 如果尝试注册超过支持的最大数量的方
      • bulkRegister

        public int bulkRegister​(int parties)
        在此移相器中添加给定数量的新未分支方。 如果正在进行onAdvance(int, int)调用,则此方法可能会在返回之前等待其完成。 如果此移相器具有父级,并且给定数量的方大于零,并且此移相器之前没有注册方,则此子移相器也向其父级注册。 如果此移相器终止,则注册尝试无效,并返回负值。
        参数
        parties - 进入下一阶段所需的额外参与方数量
        结果
        该注册所适用的到达阶段号。 如果此值为负,则此移相器已终止,在这种情况下,注册无效。
        异常
        IllegalStateException - 如果尝试注册超过最大支持的参与方数量
        IllegalArgumentException - 如果 parties < 0
      • arrive

        public int arrive()
        到达这个移相器,而不是等待其他人到达。

        未注册方调用此方法是一个使用错误。 但是,此错误可能仅在此移相器的某些后续操作时导致IllegalStateException (如果有的话)。

        结果
        到达阶段号,如果终止则为负值
        异常
        IllegalStateException - 如果没有终止,未得到支付的政党数量将变为负数
      • arriveAndDeregister

        public int arriveAndDeregister()
        到达此移相器并从中取消注册而无需等待其他人到达。 撤销注册减少了在未来阶段推进所需的各方数量。 如果此移相器具有父级,并且取消注册导致此移相器具有零方,则此移相器也从其父级取消注册。

        未注册方调用此方法是一个使用错误。 但是,此错误可能仅在此移相器的某些后续操作时导致IllegalStateException ,如果有的话。

        结果
        到达阶段号,如果终止则为负值
        异常
        IllegalStateException - 如果没有终止,登记或未获得当事人的人数将变为负数
      • arriveAndAwaitAdvance

        public int arriveAndAwaitAdvance()
        到达这个移相器并等待其他人。 相当于awaitAdvance(arrive()) 如果您需要等待中断或超时,您可以使用awaitAdvance方法的其他形式之一进行类似构造。 如果您需要在抵达时取消注册,请使用awaitAdvance(arriveAndDeregister())

        未注册方调用此方法是一个使用错误。 但是,此错误可能仅在此移相器上的某些后续操作时导致IllegalStateException (如果有的话)。

        结果
        到达阶段号,或者(负) current phase如果终止
        异常
        IllegalStateException - 如果没有终止,未得到支付的政党数量将变为负数
      • awaitAdvance

        public int awaitAdvance​(int phase)
        等待该相位器的相位从给定相位值前进,如果当前相位不等于给定相位值或该相位器终止则立即返回。
        参数
        phase - 到达阶段号,如果终止则为负值; 此参数通常是先前调用arrivearriveAndDeregister返回的值。
        结果
        下一个到达阶段号,或者如果是负数则为参数,或者如果终止 则为 (负) current phase
      • awaitAdvanceInterruptibly

        public int awaitAdvanceInterruptibly​(int phase)                              throws InterruptedException
        等待该相位器的相位从给定相位值前进,如果在等待时中断则抛出 InterruptedException ,或者如果当前相位不等于给定相位值或该相位器终止则立即返回。
        参数
        phase - 到达阶段号,如果终止则为负值; 此参数通常是先前调用arrivearriveAndDeregister返回的值。
        结果
        下一个到达阶段号,或者如果它是否定的参数,或者如果终止 则为 (负) current phase
        异常
        InterruptedException - 如果线程在等待时中断
      • awaitAdvanceInterruptibly

        public int awaitAdvanceInterruptibly​(int phase,                                     long timeout,                                     TimeUnit unit)                              throws InterruptedException,                                     TimeoutException
        等待该相位器的相位从给定相位值或给定超时前进到过去,如果在等待时中断则抛出 InterruptedException ,或者如果当前相位不等于给定相位值或该相位器终止则立即返回。
        参数
        phase - 到达阶段号,如果终止则为负值; 此参数通常是先前调用arrivearriveAndDeregister返回的值。
        timeout - 放弃前等待多长时间,单位为 unit
        unit -一个 TimeUnit确定如何解释 timeout参数
        结果
        下一个到达阶段号,或者如果是负数则为参数,或者如果终止 则为 (负) current phase
        异常
        InterruptedException - 如果线程在等待时中断
        TimeoutException - 如果在等待时超时
      • forceTermination

        public void forceTermination()
        强制此移相器进入终止状态。 注册方的数量不受影响。 如果此移相器是分层相位器的成员,则该组中的所有移相器都将终止。 如果此移相器已终止,则此方法无效。 在一个或多个任务遇到意外异常后,此方法可用于协调恢复。
      • getPhase

        public final int getPhase()
        返回当前阶段号。 最大相数为Integer.MAX_VALUE ,之后重新启动为零。 在终止时,阶段编号为负,在这种情况下,终止之前的主要阶段可以通过getPhase() + Integer.MIN_VALUE获得。
        结果
        阶段号,如果终止则为负值
      • getRegisteredParties

        public int getRegisteredParties()
        返回此移相器注册的参与方数量。
        结果
        派对的数量
      • getArrivedParties

        public int getArrivedParties()
        返回已到达此移相器当前阶段的已注册方的数量。 如果此移相器已终止,则返回的值无意义且任意。
        结果
        到达的派对数量
      • getUnarrivedParties

        public int getUnarrivedParties()
        返回尚未到达此移相器当前阶段的已注册方的数量。 如果此移相器已终止,则返回的值无意义且任意。
        结果
        没有派对的人数
      • getParent

        public Phaser getParent()
        返回此移相器的父级,如果没有,则返回 null
        结果
        此移相器的父级,如果没有, null
      • getRoot

        public Phaser getRoot()
        返回此移相器的根祖先,如果它没有父移相器,则与此移相器相同。
        结果
        这个移相器的根本祖先
      • isTerminated

        public boolean isTerminated()
        如果此移相器已终止,则返回 true
        结果
        true此移相器是否已终止
      • onAdvance

        protected boolean onAdvance​(int phase,                            int registeredParties)
        可重写的方法,用于在即将发生相位超前时执行操作,并控制终止。 在推进该移相器的一方到来时(当所有其他等待方都处于休眠状态时)调用该方法。 如果此方法返回true ,则此移相器将在提前设置为最终终止状态,随后对isTerminated()调用将返回true。 通过调用此方法抛出的任何(未经检查的)异常或错误将传播到尝试推进此移相器的一方,在这种情况下不会发生进展。

        该方法的参数提供了当前转换的主要相位器状态。 onAdvanceonAdvance此移相器的到达,注册和等待方法的效果未指定,不应依赖。

        如果此移相器是分层移相器的成员,则仅在每次前进时调用其根移相器onAdvance

        支持最常见的用例,此方法的默认实现返回true当注册方的数量已变为零作为党调用的结果arriveAndDeregister 您可以禁用此行为,从而在将来注册时启用,通过重写此方法始终返回false

           Phaser phaser = new Phaser() { protected boolean onAdvance(int phase, int parties) { return false; } } 
        参数
        phase - 在此移相器前进之前,此方法的当前阶段编号
        registeredParties - 当前的注册用户数
        结果
        true如果此移相器应该终止
      • toString

        public String toString()
        返回标识此移相器的字符串及其状态。 状态,在括号中,它包括字符串"phase = "其次是相数, "parties = "其次是注册政党的数量,并"arrived = "随后赶到方的数量。
        重写:
        toString在类 Object
        结果
        标识此移相器的字符串及其状态