- java.lang.Object
-
- java.util.concurrent.Phaser
-
public class Phaser extends Object
一个可重复使用的同步屏障,功能类似于CyclicBarrier
和CountDownLatch
,但支持更灵活的使用。注册。 与其他障碍的情况不同, 注册在移相器上同步的各方的数量可能随时间而变化。 任务可以在任何时间(使用的方法来注册
register()
,bulkRegister(int)
,或构造建立各方的初始数的形式),和(使用任何抵达时任选注销arriveAndDeregister()
)。 与大多数基本同步结构一样,注册和注销仅影响内部计数; 他们没有建立任何进一步的内部簿记,所以任务无法查询是否注册。 (但是,您可以通过对此类进行子类化来介绍此类簿记。)同步。 像
CyclicBarrier
一样,Phaser
可能会重复等待。 方法arriveAndAwaitAdvance()
具有类似于CyclicBarrier.await
的效果。 每一代移相器都具有相关的相位数。 相位数从零开始,当所有各方到达移相器时,前进,达到Integer.MAX_VALUE
后Integer.MAX_VALUE
零。 通过使用阶段数字,可以通过两种可由任何注册方调用的方法,在到达移相器和等待其他人时独立控制动作:- 到达。 方法
arrive()
和arriveAndDeregister()
记录到达。 这些方法不会阻塞,而是返回相关的到达阶段数 ; 即到达应用的移相器的相位数。 当给定阶段的最后一方到达时,执行可选操作,并且阶段前进。 这些动作由触发相位提前的一方执行,并且通过重写方法onAdvance(int, int)
进行排列,该方法也控制终止。 覆盖此方法与CyclicBarrier
相似,但更灵活。 - 等候。 方法
awaitAdvance(int)
需要一个指示到达阶段数的参数,并且当相位器前进到(或已经处于)不同阶段时返回。 与使用CyclicBarrier
类似结构不同,即使等待的线程被中断,方法awaitAdvance
继续等待。 中断和超时版本也是可用的,但任务等待中断或超时时遇到的异常不会改变移相器的状态。 如有必要,您可以在调用forceTermination
之后,在这些异常的处理程序中执行任何关联的恢复。 也可以在ForkJoinPool
中执行的任务使用抖动 。 如果池的并行度级别可以容纳同时被阻止的方的最大数量,则确保进度。
终止。 移相器可以进入终止状态,可以使用方法
isTerminated()
进行检查。 一旦终止,所有同步方法立即返回而不等待提前,如负值返回值所示。 同样,终止时注册的尝试也没有效果。 当调用onAdvance
返回true
时触发终止。 如果注销已经使注册方的数量变为零,默认实现将返回true
。 如下所示,当相位器以固定次数的迭代控制动作时,当当前相位数达到阈值时,通常方便的是重写该方法以导致终止。 方法forceTermination()
也可用于突然释放等待线程并允许它们终止。分层。 移相器可以分层 (即,以树结构构造)以减少争用。 具有大量聚会的激动剂,否则将会遇到重度的同步竞争成本,可能会被设置为使得一些子相位器共享一个共同的父节点。 这可能会大大增加吞吐量,即使它产生更大的每操作开销。
在一个分层相位的树中,自动管理儿童相机与其父母的注册和注销。 每当儿童移动设备的注册方数量变为非零(在
Phaser(Phaser,int)
构造函数register()
或bulkRegister(int)
中确定 )时,子移相器已向其父级注册。 无论何时由于引用arriveAndDeregister()
,注册方的数量变为零,则子移动程序将从其父级注销。监测。 虽然同步方法只能由注册方调用,但是可以由任何呼叫者监视移相器的当前状态。 任何时候,共有
getRegisteredParties()
个派对,其中getArrivedParties()
已到达现阶段(getPhase()
)。 剩余的(getUnarrivedParties()
)派对到达时,阶段进行。 这些方法返回的值可以反映瞬态状态,因此对于同步控制通常不是有用的。 方法toString()
以便于非正式监控的形式返回这些状态查询的快照。示例用法:
可以使用A
Phaser
而不是CountDownLatch
来控制为可变数量的方提供服务的一次性动作。 典型的成语是用于首先注册的方法,然后启动所有操作,然后注销,如:void runTasks(List<Runnable> tasks) { Phaser startingGate = new Phaser(1); // "1" to register self // create and start threads for (Runnable task : tasks) { startingGate.register(); new Thread(() -> { startingGate.arriveAndAwaitAdvance(); task.run(); }).start(); } // deregister self to allow threads to proceed startingGate.arriveAndDeregister(); }
导致一组线程重复执行给定次数迭代的一种方法是覆盖
onAdvance
:void startTasks(List<Runnable> tasks, int iterations) { Phaser phaser = new Phaser() { protected boolean onAdvance(int phase, int registeredParties) { return phase >= iterations - 1 || registeredParties == 0; } }; phaser.register(); for (Runnable task : tasks) { phaser.register(); new Thread(() -> { do { task.run(); phaser.arriveAndAwaitAdvance(); } while (!phaser.isTerminated()); }).start(); } // allow threads to proceed; don't wait for them phaser.arriveAndDeregister(); }
// ... phaser.register(); while (!phaser.isTerminated()) phaser.arriveAndAwaitAdvance();
可以使用相关的结构来等待上下文中的特定阶段数字,在这些上下文中,您确定该阶段永远不会包围
Integer.MAX_VALUE
。 例如:void awaitPhase(Phaser phaser, int phase) { int p = phaser.register(); // assumes caller not already registered while (p < phase) { if (phaser.isTerminated()) // ... deal with unexpected termination else p = phaser.arriveAndAwaitAdvance(); } phaser.arriveAndDeregister(); }
要使用一个相
n
树创建一组n
任务,您可以使用以下形式的代码,假设一个具有构造函数的Task类在构造时接受它注册的Phaser
。 在调用build(new Task[n], 0, n, new Phaser())
之后,可以启动这些任务,例如通过提交到池:void build(Task[] tasks, int lo, int hi, Phaser ph) { if (hi - lo > TASKS_PER_PHASER) { for (int i = lo; i < hi; i += TASKS_PER_PHASER) { int j = Math.min(i + TASKS_PER_PHASER, hi); build(tasks, i, j, new Phaser(ph)); } } else { for (int i = lo; i < hi; ++i) tasks[i] = new Task(ph); // assumes new Task(ph) performs ph.register() } }
TASKS_PER_PHASER
的最佳值主要取决于预期的同步速率。 低至4的值可能适用于非常小的每阶段任务机构(因此高速率),或者对于极大的任务机构可能达到数百。实施说明 :本实施方式将最大派对数限制为65535.尝试注册附加方将导致
IllegalStateException
。 但是,您可以并且应该创建分层相位器来适应任意大量的参与者。- 从以下版本开始:
- 1.7
- 到达。 方法
-
-
方法摘要
所有方法 接口方法 具体的方法 Modifier and Type 方法 描述 int
arrive()
抵达这个移相器,而不用等待别人到达。int
arriveAndAwaitAdvance()
到达这个移相器,等待其他人。int
arriveAndDeregister()
到达这个移相器并从其中注销,而无需等待别人到达。int
awaitAdvance(int phase)
等待该相位器的相位从给定相位值前进,如果当前相位不等于给定相位值,则立即返回,或者该相位器被终止。int
awaitAdvanceInterruptibly(int phase)
等待相移器的相位从给定的相位值推进,抛出InterruptedException
如果在等待时中断,或者如果当前相位不等于给定的相位值或者该相位器被终止,则立即返回。int
awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
等待该移相器的相位从给定的相位值或给定的超时时间InterruptedException
到等待时抛出InterruptedException
,如果当前相位不等于给定的相位值或者该相位器被终止,则立即返回。int
bulkRegister(int parties)
增加给定数量的新的有争议的派对到这个移相器。void
forceTermination()
强制此移相器进入终止状态。int
getArrivedParties()
返回在此移相器的当前阶段到达的已注册方的数量。Phaser
getParent()
返回此移相器的父级,如果没有,则返回null
。int
getPhase()
返回当前相位数。int
getRegisteredParties()
返回在此移动设备上注册的各方数量。Phaser
getRoot()
返回此移相器的根祖先,如果它没有父代,则与该移相器相同。int
getUnarrivedParties()
返回尚未到达此移相器当前阶段的已注册方的数量。boolean
isTerminated()
如果此移相器已被终止,则返回true
。protected boolean
onAdvance(int phase, int registeredParties)
在即将进行的相位提前执行动作的可覆盖方法,并控制终止。int
register()
添加一个新的unririved派对这个移相器。String
toString()
返回一个标识此移相器的字符串及其状态。
-
-
-
构造方法详细信息
-
Phaser
public Phaser()
创建一个没有初始注册方,没有父级和初始阶段数0的新移相器。使用此移相器的任何线程都需要先注册。
-
Phaser
public Phaser(int parties)
创建一个新的移相器与给定数量的注册无障碍方,没有父母和初始阶段0。- 参数
-
parties
- 需要进入下一阶段的各方数量 - 异常
-
IllegalArgumentException
- 如果当事人不到零或大于支持的最高人数
-
Phaser
public Phaser(Phaser parent)
相当于Phaser(parent, 0)
。- 参数
-
parent
- 父移相器
-
Phaser
public Phaser(Phaser parent, int parties)
与给定的父母和注册的无礼方的数量创建一个新的移相器。 当给定的父对象不为空且给定的参与方数量大于零时,该子级移动器在其父级注册。- 参数
-
parent
- 父移相器 -
parties
- 进入下一阶段所需的当事人数 - 异常
-
IllegalArgumentException
- 如果当事方小于零或大于所支持的最大数目
-
-
方法详细信息
-
register
public int register()
添加一个新的unririved派对这个移相器。 如果正在进行onAdvance(int, int)
的调用,则此方法可能会在返回前等待完成。 如果此移动设备有父母,而此移动设备以前没有注册方,则该儿童移动设备也向其父母注册。 如果此移相器被终止,则尝试注册不起作用,并返回负值。- 结果
- 此注册申请的到达阶段编号。 如果该值为负值,则该移相器已终止,在这种情况下注册无效。
- 异常
-
IllegalStateException
- 如果尝试注册超过最多支持的方数
-
bulkRegister
public int bulkRegister(int parties)
增加给定数量的新的有争议的派对到这个移相器。 如果持续调用onAdvance(int, int)
正在进行中,则此方法可能会在返回前等待完成。 如果此移动设备有父母,而且指定的当事人数量大于零,并且此移动设备以前没有注册方,则该子版移动设备也向其父母注册。 如果此移相器被终止,则尝试注册不起作用,并返回负值。- 参数
-
parties
- 需要进入下一阶段的附加方的数量 - 结果
- 此注册申请的到达阶段编号。 如果该值为负值,则该移相器已终止,在这种情况下注册无效。
- 异常
-
IllegalStateException
- 如果尝试注册超过最多支持的方数 -
IllegalArgumentException
- 如果是parties < 0
-
arrive
public int arrive()
抵达这个移相器,而不用等待别人到达。这是非注册方调用此方法的使用错误。 但是,如果有的话,这个错误可能会导致一个
IllegalStateException
。- 结果
- 到达阶段数,如果终止则为负值
- 异常
-
IllegalStateException
- 如果没有终止,无礼的人数将变为负数
-
arriveAndDeregister
public int arriveAndDeregister()
到达这个移相器并从其中注销,而无需等待别人到达。 撤销注销减少了在未来阶段推进的各方数量。 如果这个移动设备有父母,并且取消注册会导致此移动设备有零个派对,则此移动设备也将从父母中注销。这是非注册方调用此方法的使用错误。 但是,如果有的话,这个错误可能会导致一个
IllegalStateException
在这个移相器上的后续操作。- 结果
- 到达阶段数,如果终止则为负值
- 异常
-
IllegalStateException
- 如果没有终止,登记的或没有任何一方的人数将变为负数
-
arriveAndAwaitAdvance
public int arriveAndAwaitAdvance()
到达这个移相器,等待其他人。 等效于awaitAdvance(arrive())
。 如果您需要等待中断或超时,您可以使用awaitAdvance
方法的其他形式之一使用类似的结构进行awaitAdvance
。 如果相反,您需要在抵达时注销,请使用awaitAdvance(arriveAndDeregister())
。这是非注册方调用此方法的使用错误。 但是,这个错误可能会导致一个
IllegalStateException
只有在这个移相器上的一些后续操作,如果有的话。- 结果
- 到达阶段数,或(负) current phase如果终止
- 异常
-
IllegalStateException
- 如果没有终止,而有争议的党的人数将变为负数
-
awaitAdvance
public int awaitAdvance(int phase)
等待该相位器的相位从给定相位值前进,如果当前相位不等于给定相位值,则立即返回,或者该相位器被终止。- 参数
-
phase
- 到达阶段数,如果终止则为负值; 此参数通常是由先前调用arrive
或arriveAndDeregister
返回的值。 - 结果
- 下一个到达阶段数字,如果是负数则为参数,否则为(负) current phase
-
awaitAdvanceInterruptibly
public int awaitAdvanceInterruptibly(int phase) throws InterruptedException
等待相移器的相位从给定的相位值推进,抛出InterruptedException
如果在等待时中断,或者如果当前相位不等于给定相位值,则立即返回,或者该相位器被终止。- 参数
-
phase
- 到达阶段数,如果终止则为负值; 此参数通常是由先前调用arrive
或arriveAndDeregister
返回的值。 - 结果
- 下一个到达阶段数,或参数如果为负,或(负) current phase如果终止
- 异常
-
InterruptedException
- 如果线程在等待时中断
-
awaitAdvanceInterruptibly
public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
等待该移相器的阶段从给定的相位值或给定的超时InterruptedException
到等待时抛出InterruptedException
,如果当前相位不等于给定的相位值,则立即返回,或者该相位器被终止。- 参数
-
phase
- 到达阶段数,如果终止则为负值; 此参数通常是由先前调用arrive
或arriveAndDeregister
返回的值。 -
timeout
- 放弃之前等待多长时间,以unit
为单位 -
unit
- aTimeUnit
确定如何解释timeout
参数 - 结果
- 下一个到达阶段的数字,或者参数是否为负数,或者(负) current phase如果终止
- 异常
-
InterruptedException
- 如果线程在等待时中断 -
TimeoutException
- 如果在等待时超时
-
forceTermination
public void forceTermination()
强制此移相器进入终止状态。 注册方的数量不受影响。 如果这个移相器是分层的相位片的成员,则集合中的所有相移器都将被终止。 如果此移相器已被终止,则此方法无效。 在一个或多个任务遇到意外异常之后,此方法可能有助于协调恢复。
-
getPhase
public final int getPhase()
返回当前相位数。 最大相位数为Integer.MAX_VALUE
,之后重新启动为零。 一旦终止,相位数为负,在这种情况下,终止前的主要阶段可以通过getPhase() + Integer.MIN_VALUE
获得。- 结果
- 相位数,如果终止则为负值
-
getRegisteredParties
public int getRegisteredParties()
返回在此移动设备上注册的各方数量。- 结果
- 当事人数
-
getArrivedParties
public int getArrivedParties()
返回在此移相器的当前阶段到达的已注册方的数量。 如果这个移相器已经终止,返回的值是无意义和任意的。- 结果
- 到达方的数量
-
getUnarrivedParties
public int getUnarrivedParties()
返回尚未到达此移相器当前阶段的已注册方的数量。 如果这个移相器已经终止,返回的值是无意义和任意的。- 结果
- 无数派的人数
-
getParent
public Phaser getParent()
返回此移相器的父级,如果没有,则返回null
。- 结果
-
该移相器的父级,如果没有,
null
-
getRoot
public Phaser getRoot()
返回此移相器的根祖先,如果它没有父代,则与该移相器相同。- 结果
- 这个移相器的根祖先
-
isTerminated
public boolean isTerminated()
如果此移相器已被终止,则返回true
。- 结果
-
true
如果此移相器已被终止
-
onAdvance
protected boolean onAdvance(int phase, int registeredParties)
在即将进行的相位提前执行动作的可覆盖方法,并控制终止。 这种方法在派对推进这个移相器(当所有其他等待方休眠)的时候被调用。 如果此方法返回true
,则该移相器true
将被设置为最终终止状态,并且对isTerminated()
的后续调用将返回true。 任何(未选中)通过调用此方法引发的异常或错误将传播给尝试推进此移相器的方,在这种情况下不会发生提前。该方法的参数提供了当前转换占优势的状态。 在
onAdvance
内对这个移相器引用到达,注册和等待方法的onAdvance
是未指定的,不应该依赖。如果这个移相器是分层的相位器的成员,那么
onAdvance
仅在每个onAdvance
的根onAdvance
器上被调用。支持最常见的用例,此方法的默认实现返回
true
时注册方的数量已变为零作为党调用的结果arriveAndDeregister
。 您可以禁用此行为,从而通过覆盖此方法始终返回false
,从而可以在将来注册时继续使用:Phaser phaser = new Phaser() { protected boolean onAdvance(int phase, int parties) { return false; } }
- 参数
-
phase
- 在此移相器提前之前输入此方法的当前阶段编号 -
registeredParties
- 当前注册方的数量 - 结果
-
true
如果这个移相器应该终止
-
-