Module  java.base
软件包  java.util.concurrent

Class Phaser



  • public class Phaser
    extends Object
    一个可重复使用的同步屏障,功能类似于CyclicBarrierCountDownLatch但支持更灵活的使用。

    注册。 与其他障碍的情况不同, 注册在移相器上同步的各方的数量可能随时间而变化。 任务可以在任何时间(使用的方法来注册register()bulkRegister(int) ,或构造建立各方的初始数的形式),和(使用任何抵达时任选注销arriveAndDeregister() )。 与大多数基本同步结构一样,注册和注销仅影响内部计数; 他们没有建立任何进一步的内部簿记,所以任务无法查询是否注册。 (但是,您可以通过对此类进行子类化来介绍此类簿记。)

    同步。 CyclicBarrier一样, Phaser可能会重复等待。 方法arriveAndAwaitAdvance()具有类似于CyclicBarrier.await效果。 每一代移相器都具有相关的相位数。 相位数从零开始,当所有各方到达移相器时,前进,达到Integer.MAX_VALUEInteger.MAX_VALUE零。 通过使用阶段数字,可以通过两种可由任何注册方调用的方法,在到达移相器和等待其他人时独立控制动作:

    • 到达。 方法arrive()arriveAndDeregister()记录到达。 这些方法不会阻塞,而是返回相关的到达阶段数 ; 即到达应用的移相器的相位数。 当给定阶段的最后一方到达时,执行可选操作,并且阶段前进。 这些动作由触发相位提前的一方执行,并且通过重写方法onAdvance(int, int)进行排列,该方法也控制终止。 覆盖此方法与CyclicBarrier相似,但更灵活。
    • 等候。 方法awaitAdvance(int)需要一个指示到达阶段数的参数,并且当相位器前进到(或已经处于)不同阶段时返回。 与使用CyclicBarrier类似结构不同,即使等待的线程被中断,方法awaitAdvance继续等待。 中断和超时版本也是可用的,但任务等待中断或超时时遇到的异常不会改变移相器的状态。 如有必要,您可以在调用forceTermination之后,在这些异常的处理程序中执行任何关联的恢复。 也可以在ForkJoinPool执行的任务使用抖动 如果池的并行度级别可以容纳同时被阻止的方的最大数量,则确保进度。

    终止。 移相器可以进入终止状态,可以使用方法isTerminated()进行检查。 一旦终止,所有同步方法立即返回而不等待提前,如负值返回值所示。 同样,终止时注册的尝试也没有效果。 当调用onAdvance返回true时触发终止。 如果注销已经使注册方的数量变为零,默认实现将返回true 如下所示,当相位器以固定次数的迭代控制动作时,当当前相位数达到阈值时,通常方便的是重写该方法以导致终止。 方法forceTermination()也可用于突然释放等待线程并允许它们终止。

    分层。 移相器可以分层 (即,以树结构构造)以减少争用。 具有大量聚会的激动剂,否则将会遇到重度的同步竞争成本,可能会被设置为使得一些子相位器共享一个共同的父节点。 这可能会大大增加吞吐量,即使它产生更大的每操作开销。

    在一个分层相位的树中,自动管理儿童相机与其父母的注册和注销。 每当儿童移动设备的注册方数量变为非零(在Phaser(Phaser,int)构造函数register()bulkRegister(int)中确定 )时,子移相器已向其父级注册。 无论何时由于引用arriveAndDeregister() ,注册方的数量变为零,则子移动程序将从其父级注销。

    监测。 虽然同步方法只能由注册方调用,但是可以由任何呼叫者监视移相器的当前状态。 任何时候,共有getRegisteredParties()派对,其中getArrivedParties()已到达现阶段( getPhase() )。 剩余的( getUnarrivedParties() )派对到达时,阶段进行。 这些方法返回的值可以反映瞬态状态,因此对于同步控制通常不是有用的。 方法toString()以便于非正式监控的形式返回这些状态查询的快照。

    示例用法:

    可以使用A Phaser而不是CountDownLatch来控制为可变数量的方提供服务的一次性动作。 典型的成语是用于首先注册的方法,然后启动所有操作,然后注销,如:

       void runTasks(List<Runnable> tasks) { Phaser startingGate = new Phaser(1); // "1" to register self // create and start threads for (Runnable task : tasks) { startingGate.register(); new Thread(() -> { startingGate.arriveAndAwaitAdvance(); task.run(); }).start(); } // deregister self to allow threads to proceed startingGate.arriveAndDeregister(); } 

    导致一组线程重复执行给定次数迭代的一种方法是覆盖onAdvance

       void startTasks(List<Runnable> tasks, int iterations) { Phaser phaser = new Phaser() { protected boolean onAdvance(int phase, int registeredParties) { return phase >= iterations - 1 || registeredParties == 0; } }; phaser.register(); for (Runnable task : tasks) { phaser.register(); new Thread(() -> { do { task.run(); phaser.arriveAndAwaitAdvance(); } while (!phaser.isTerminated()); }).start(); } // allow threads to proceed; don't wait for them phaser.arriveAndDeregister(); } 
    如果主要任务必须等待终止,则可能会重新注册,然后执行类似的循环:
       // ... phaser.register(); while (!phaser.isTerminated()) phaser.arriveAndAwaitAdvance(); 

    可以使用相关的结构来等待上下文中的特定阶段数字,在这些上下文中,您确定该阶段永远不会包围Integer.MAX_VALUE 例如:

       void awaitPhase(Phaser phaser, int phase) { int p = phaser.register(); // assumes caller not already registered while (p < phase) { if (phaser.isTerminated()) // ... deal with unexpected termination else p = phaser.arriveAndAwaitAdvance(); } phaser.arriveAndDeregister(); } 

    要使用一个相n树创建一组n任务,您可以使用以下形式的代码,假设一个具有构造函数的Task类在构造时接受它注册的Phaser 在调用build(new Task[n], 0, n, new Phaser())之后,可以启动这些任务,例如通过提交到池:

       void build(Task[] tasks, int lo, int hi, Phaser ph) { if (hi - lo > TASKS_PER_PHASER) { for (int i = lo; i < hi; i += TASKS_PER_PHASER) { int j = Math.min(i + TASKS_PER_PHASER, hi); build(tasks, i, j, new Phaser(ph)); } } else { for (int i = lo; i < hi; ++i) tasks[i] = new Task(ph); // assumes new Task(ph) performs ph.register() } } 
    TASKS_PER_PHASER的最佳值主要取决于预期的同步速率。 低至4的值可能适用于非常小的每阶段任务机构(因此高速率),或者对于极大的任务机构可能达到数百。

    实施说明 :本实施方式将最大派对数限制为65535.尝试注册附加方将导致IllegalStateException 但是,您可以并且应该创建分层相位器来适应任意大量的参与者。

    从以下版本开始:
    1.7
    • 构造方法摘要

      构造方法  
      Constructor 描述
      Phaser​()
      创建一个新的移相器,没有最初的注册方,没有父级和初始阶段数0。
      Phaser​(int parties)
      创建一个新的移相器与给定数量的注册无障碍方,没有父母和初始阶段0。
      Phaser​(Phaser parent)
      相当于 Phaser(parent, 0)
      Phaser​(Phaser parent, int parties)
      与给定的父母和注册的无礼方的数量创建一个新的移相器。
    • 方法摘要

      所有方法  接口方法  具体的方法 
      Modifier and Type 方法 描述
      int arrive​()
      抵达这个移相器,而不用等待别人到达。
      int arriveAndAwaitAdvance​()
      到达这个移相器,等待其他人。
      int arriveAndDeregister​()
      到达这个移相器并从其中注销,而无需等待别人到达。
      int awaitAdvance​(int phase)
      等待该相位器的相位从给定相位值前进,如果当前相位不等于给定相位值,则立即返回,或者该相位器被终止。
      int awaitAdvanceInterruptibly​(int phase)
      等待相移器的相位从给定的相位值推进,抛出 InterruptedException如果在等待时中断,或者如果当前相位不等于给定的相位值或者该相位器被终止,则立即返回。
      int awaitAdvanceInterruptibly​(int phase, long timeout, TimeUnit unit)
      等待该移相器的相位从给定的相位值或给定的超时时间 InterruptedException到等待时抛出 InterruptedException ,如果当前相位不等于给定的相位值或者该相位器被终止,则立即返回。
      int bulkRegister​(int parties)
      增加给定数量的新的有争议的派对到这个移相器。
      void forceTermination​()
      强制此移相器进入终止状态。
      int getArrivedParties​()
      返回在此移相器的当前阶段到达的已注册方的数量。
      Phaser getParent​()
      返回此移相器的父级,如果没有,则返回 null
      int getPhase​()
      返回当前相位数。
      int getRegisteredParties​()
      返回在此移动设备上注册的各方数量。
      Phaser getRoot​()
      返回此移相器的根祖先,如果它没有父代,则与该移相器相同。
      int getUnarrivedParties​()
      返回尚未到达此移相器当前阶段的已注册方的数量。
      boolean isTerminated​()
      如果此移相器已被终止,则返回 true
      protected boolean onAdvance​(int phase, int registeredParties)
      在即将进行的相位提前执行动作的可覆盖方法,并控制终止。
      int register​()
      添加一个新的unririved派对这个移相器。
      String toString​()
      返回一个标识此移相器的字符串及其状态。
    • 构造方法详细信息

      • Phaser

        public Phaser​()
        创建一个没有初始注册方,没有父级和初始阶段数0的新移相器。使用此移相器的任何线程都需要先注册。
      • Phaser

        public Phaser​(int parties)
        创建一个新的移相器与给定数量的注册无障碍方,没有父母和初始阶段0。
        参数
        parties - 需要进入下一阶段的各方数量
        异常
        IllegalArgumentException - 如果当事人不到零或大于支持的最高人数
      • Phaser

        public Phaser​(Phaser parent,
                      int parties)
        与给定的父母和注册的无礼方的数量创建一个新的移相器。 当给定的父对象不为空且给定的参与方数量大于零时,该子级移动器在其父级注册。
        参数
        parent - 父移相器
        parties - 进入下一阶段所需的当事人数
        异常
        IllegalArgumentException - 如果当事方小于零或大于所支持的最大数目
    • 方法详细信息

      • register

        public int register​()
        添加一个新的unririved派对这个移相器。 如果正在进行onAdvance(int, int)调用,则此方法可能会在返回前等待完成。 如果此移动设备有父母,而此移动设备以前没有注册方,则该儿童移动设备也向其父母注册。 如果此移相器被终止,则尝试注册不起作用,并返回负值。
        结果
        此注册申请的到达阶段编号。 如果该值为负值,则该移相器已终止,在这种情况下注册无效。
        异常
        IllegalStateException - 如果尝试注册超过最多支持的方数
      • bulkRegister

        public int bulkRegister​(int parties)
        增加给定数量的新的有争议的派对到这个移相器。 如果持续调用onAdvance(int, int)正在进行中,则此方法可能会在返回前等待完成。 如果此移动设备有父母,而且指定的当事人数量大于零,并且此移动设备以前没有注册方,则该子版移动设备也向其父母注册。 如果此移相器被终止,则尝试注册不起作用,并返回负值。
        参数
        parties - 需要进入下一阶段的附加方的数量
        结果
        此注册申请的到达阶段编号。 如果该值为负值,则该移相器已终止,在这种情况下注册无效。
        异常
        IllegalStateException - 如果尝试注册超过最多支持的方数
        IllegalArgumentException - 如果是 parties < 0
      • arrive

        public int arrive​()
        抵达这个移相器,而不用等待别人到达。

        这是非注册方调用此方法的使用错误。 但是,如果有的话,这个错误可能会导致一个IllegalStateException

        结果
        到达阶段数,如果终止则为负值
        异常
        IllegalStateException - 如果没有终止,无礼的人数将变为负数
      • arriveAndDeregister

        public int arriveAndDeregister​()
        到达这个移相器并从其中注销,而无需等待别人到达。 撤销注销减少了在未来阶段推进的各方数量。 如果这个移动设备有父母,并且取消注册会导致此移动设备有零个派对,则此移动设备也将从父母中注销。

        这是非注册方调用此方法的使用错误。 但是,如果有的话,这个错误可能会导致一个IllegalStateException在这个移相器上的后续操作。

        结果
        到达阶段数,如果终止则为负值
        异常
        IllegalStateException - 如果没有终止,登记的或没有任何一方的人数将变为负数
      • arriveAndAwaitAdvance

        public int arriveAndAwaitAdvance​()
        到达这个移相器,等待其他人。 等效于awaitAdvance(arrive()) 如果您需要等待中断或超时,您可以使用awaitAdvance方法的其他形式之一使用类似的结构进行awaitAdvance 如果相反,您需要在抵达时注销,请使用awaitAdvance(arriveAndDeregister())

        这是非注册方调用此方法的使用错误。 但是,这个错误可能会导致一个IllegalStateException只有在这个移相器上的一些后续操作,如果有的话。

        结果
        到达阶段数,或(负) current phase如果终止
        异常
        IllegalStateException - 如果没有终止,而有争议的党的人数将变为负数
      • awaitAdvance

        public int awaitAdvance​(int phase)
        等待该相位器的相位从给定相位值前进,如果当前相位不等于给定相位值,则立即返回,或者该相位器被终止。
        参数
        phase - 到达阶段数,如果终止则为负值; 此参数通常是由先前调用arrivearriveAndDeregister返回的值。
        结果
        下一个到达阶段数字,如果是负数则为参数,否则为(负) current phase
      • awaitAdvanceInterruptibly

        public int awaitAdvanceInterruptibly​(int phase)
                                      throws InterruptedException
        等待相移器的相位从给定的相位值推进,抛出 InterruptedException如果在等待时中断,或者如果当前相位不等于给定相位值,则立即返回,或者该相位器被终止。
        参数
        phase - 到达阶段数,如果终止则为负值; 此参数通常是由先前调用arrivearriveAndDeregister返回的值。
        结果
        下一个到达阶段数,或参数如果为负,或(负) current phase如果终止
        异常
        InterruptedException - 如果线程在等待时中断
      • awaitAdvanceInterruptibly

        public int awaitAdvanceInterruptibly​(int phase,
                                             long timeout,
                                             TimeUnit unit)
                                      throws InterruptedException,
                                             TimeoutException
        等待该移相器的阶段从给定的相位值或给定的超时 InterruptedException到等待时抛出 InterruptedException ,如果当前相位不等于给定的相位值,则立即返回,或者该相位器被终止。
        参数
        phase - 到达阶段数,如果终止则为负值; 此参数通常是由先前调用arrivearriveAndDeregister返回的值。
        timeout - 放弃之前等待多长时间,以 unit为单位
        unit - a TimeUnit确定如何解释 timeout参数
        结果
        下一个到达阶段的数字,或者参数是否为负数,或者(负) current phase如果终止
        异常
        InterruptedException - 如果线程在等待时中断
        TimeoutException - 如果在等待时超时
      • forceTermination

        public void forceTermination​()
        强制此移相器进入终止状态。 注册方的数量不受影响。 如果这个移相器是分层的相位片的成员,则集合中的所有相移器都将被终止。 如果此移相器已被终止,则此方法无效。 在一个或多个任务遇到意外异常之后,此方法可能有助于协调恢复。
      • getPhase

        public final int getPhase​()
        返回当前相位数。 最大相位数为Integer.MAX_VALUE ,之后重新启动为零。 一旦终止,相位数为负,在这种情况下,终止前的主要阶段可以通过getPhase() + Integer.MIN_VALUE获得。
        结果
        相位数,如果终止则为负值
      • getRegisteredParties

        public int getRegisteredParties​()
        返回在此移动设备上注册的各方数量。
        结果
        当事人数
      • getArrivedParties

        public int getArrivedParties​()
        返回在此移相器的当前阶段到达的已注册方的数量。 如果这个移相器已经终止,返回的值是无意义和任意的。
        结果
        到达方的数量
      • getUnarrivedParties

        public int getUnarrivedParties​()
        返回尚未到达此移相器当前阶段的已注册方的数量。 如果这个移相器已经终止,返回的值是无意义和任意的。
        结果
        无数派的人数
      • getParent

        public Phaser getParent​()
        返回此移相器的父级,如果没有,则返回 null
        结果
        该移相器的父级,如果没有, null
      • getRoot

        public Phaser getRoot​()
        返回此移相器的根祖先,如果它没有父代,则与该移相器相同。
        结果
        这个移相器的根祖先
      • isTerminated

        public boolean isTerminated​()
        如果此移相器已被终止,则返回 true
        结果
        true如果此移相器已被终止
      • onAdvance

        protected boolean onAdvance​(int phase,
                                    int registeredParties)
        在即将进行的相位提前执行动作的可覆盖方法,并控制终止。 这种方法在派对推进这个移相器(当所有其他等待方休眠)的时候被调用。 如果此方法返回true ,则该移相器true将被设置为最终终止状态,并且对isTerminated()后续调用将返回true。 任何(未选中)通过调用此方法引发的异常或错误将传播给尝试推进此移相器的方,在这种情况下不会发生提前。

        该方法的参数提供了当前转换占优势的状态。 onAdvance内对这个移相器引用到达,注册和等待方法的onAdvance是未指定的,不应该依赖。

        如果这个移相器是分层的相位器的成员,那么onAdvance仅在每个onAdvance的根onAdvance器上被调用。

        支持最常见的用例,此方法的默认实现返回true时注册方的数量已变为零作为党调用的结果arriveAndDeregister 您可以禁用此行为,从而通过覆盖此方法始终返回false ,从而可以在将来注册时继续使用:

           Phaser phaser = new Phaser() { protected boolean onAdvance(int phase, int parties) { return false; } } 
        参数
        phase - 在此移相器提前之前输入此方法的当前阶段编号
        registeredParties - 当前注册方的数量
        结果
        true如果这个移相器应该终止
      • toString

        public String toString​()
        返回一个标识此移相器的字符串及其状态。 括号中的状态包括字符串"phase = "后跟阶段号码"parties = "后跟注册方数,而"arrived = "后跟随方数。
        重写:
        toStringObject
        结果
        一个标识这个移相器的字符串,以及它的状态