Module  java.base
软件包  java.util.concurrent

Class CountedCompleter<T>

  • All Implemented Interfaces:
    SerializableFuture<T>


    public abstract class CountedCompleter<T>
    extends ForkJoinTask<T>
    A ForkJoinTask ,其触发时执行完成操作,并且没有剩余的待处理操作。 CountedCompleters通常比其他形式的ForkJoinTasks在子任务停顿和阻塞的情况下更加强大,但是不太直观的编程。 CountedCompleter的用途类似于其他基于完成部件(如CompletionHandler ),除了多个挂起完井可能是必要的,以触发完成动作onCompletion(CountedCompleter) ,不只是一个。 除非另有初始化, pending count开始于零,但也可以是(原子),使用方法改变setPendingCount(int)addToPendingCount(int) ,和compareAndSetPendingCount(int, int) 在调用tryComplete() ,如果待处理的行动计数不为零,则递减; 否则,执行完成操作,如果完成者本身具有完整性,则该过程将继续完成。 PhaserSemaphore相关的同步组件一样,这些方法仅影响内部计数; 他们没有建立任何进一步的内部簿记。 特别地,未维护未决任务的身份。 如下所示,您可以创建在需要时记录一些或所有待处理任务或其结果的子类。 如下所示,还提供了支持定制完成遍历的实用程序方法。 然而,由于CountedCompleters仅提供基本的同步机制,因此创建进一步的抽象子类可能是有用的,这些子类保持适用于一组相关用法的链接,字段和其他支持方法。

    具体的CountedCompleter类必须定义方法compute() ,在大多数情况下(如下所示),在返回之前调用tryComplete()一次。 该类还可以可选地覆盖方法onCompletion(CountedCompleter)以在正常完成时执行动作,以及方法onExceptionalCompletion(Throwable, CountedCompleter)以对任何异常执行动作。

    CountedCompleter通常不承担结果,在这种情况下,它们通常被声明为CountedCompleter<Void> ,并将始终返回null作为结果值。 在其他情况下,你应该重写方法getRawResult()提供从结果join(), invoke() ,以及相关方法。 一般来说,该方法应该返回在完成后保存结果的CountedCompleter对象的一个字段(或一个或多个字段的函数)的值。 默认方法setRawResult(T)在CountedCompleters中不起作用。 可能但很少适用于覆盖此方法来维护其他对象或保存结果数据的字段。

    一个CountedCompleter本身不具有一个完整的(即getCompleter()返回null )可以用作这个添加功能的常规ForkJoinTask。 然而,任何完成者又具有另一个完成者只能作为其他计算的内部帮助器,因此其自己的任务状态(如方法如ForkJoinTask.isDone()所报告)是任意的; 这种状况只有在明确调用改变complete(T)ForkJoinTask.cancel(boolean)ForkJoinTask.completeExceptionally(Throwable)或方法的特殊结束后compute 在任何异常完成之后,如果有任何异常可能会被传递到任务的完成者(以及其完成者等),如果存在并且尚未完成。 同样地,取消一个内部的CountedCompleter只对该完成者有局部的影响,所以并不常用。

    示例用法

    并行递归分解。 CountedCompleters可以安排在类似于RecursiveAction经常使用的树中,尽管与设置相关的构造通常是不同的。 这里,每个任务的完成者是其计算树中的父项。 即使它们需要更多的簿记,CountedCompleters可能是更好的选择,当应用可能耗时的操作(不能进一步细分)到数组或集合的每个元素; 特别是当操作对于一些元素的时间要比其他元素完成时要多得多,这是因为内在的变化(例如I / O)或诸如垃圾收集的辅助效应。 由于CountedCompleters提供自己的延续,其他任务不需要阻止等待执行它们。

    例如,这里是一个实用方法的初始版本,它使用二分法递归分解将工作分成单个部分(叶子任务)。 即使将工作分解为单独的调用,基于树的技术通常比直接分支叶子任务更为可取,因为它们可以减少线程间通信并改善负载平衡。 在递归的情况下,要完成的每对子任务对的第二个触发器完成它们的父onCompletion (因为没有执行结果组合,所以方法onCompletion的默认的无操作实现不被覆盖)。 实用程序方法设置根任务并调用它(这里隐式使用ForkJoinPool.commonPool() )。 始终将挂起的计数设置为子任务的数量是直接可靠的(但不是最佳的),并在返回之前立即调用tryComplete()

       public static <E> void forEach(E[] array, Consumer<E> action) { class Task extends CountedCompleter<Void> { final int lo, hi; Task(Task parent, int lo, int hi) { super(parent); this.lo = lo; this.hi = hi; } public void compute() { if (hi - lo >= 2) { int mid = (lo + hi) >>> 1; // must set pending count before fork setPendingCount(2); new Task(this, mid, hi).fork(); // right child new Task(this, lo, mid).fork(); // left child } else if (hi > lo) action.accept(array[lo]); tryComplete(); } } new Task(null, 0, array.length).invoke(); } 
    通过注意到在递归的情况下,该任务在分配正确的任务后无关,因此可以在返回之前直接调用其左任务,从而可以改善此设计。 (这是一个尾递归删除的模拟方法。)而且,当任务中的最后一个动作是分支或调用子任务(“尾呼”)时,可以调用tryComplete()tryComplete()造成待定数量看“一个”。
       public void compute() { if (hi - lo >= 2) { int mid = (lo + hi) >>> 1; setPendingCount(1); // looks off by one, but correct! new Task(this, mid, hi).fork(); // right child new Task(this, lo, mid).compute(); // direct invoke } else { if (hi > lo) action.accept(array[lo]); tryComplete(); } } 
    作为进一步优化,请注意,左侧任务不需要甚至不存在。 而不是创建一个新的,我们可以继续使用原始任务,并为每个fork添加一个挂起的计数。 另外,因为这个树中的任务没有实现一个onCompletion(CountedCompleter)方法, tryComplete可以替换为propagateCompletion()
       public void compute() { int n = hi - lo; for (; n >= 2; n /= 2) { addToPendingCount(1); new Task(this, lo + n/2, lo + n).fork(); } if (n > 0) action.accept(array[lo]); propagateCompletion(); } 
    当待计数可以预先计算时,可以在构造函数中建立它们:
       public static <E> void forEach(E[] array, Consumer<E> action) { class Task extends CountedCompleter<Void> { final int lo, hi; Task(Task parent, int lo, int hi) { super(parent, 31 - Integer.numberOfLeadingZeros(hi - lo)); this.lo = lo; this.hi = hi; } public void compute() { for (int n = hi - lo; n >= 2; n /= 2) new Task(this, lo + n/2, lo + n).fork(); action.accept(array[lo]); propagateCompletion(); } } if (array.length > 0) new Task(null, 0, array.length).invoke(); } 
    这些类的其他优化可能需要专门为叶子步骤进行分类,除以四,而不是每次迭代两次,并使用自适应阈值,而不是总是细分为单个元素。

    搜索。 CountedCompleters的树可以在数据结构的不同部分搜索一个值或属性,一旦找到结果,就会在AtomicReference报告结果。 其他人可以轮询结果,以避免不必要的工作。 (您可以另外添加cancel其他任务,但通常只需让他们注意到结果被设置,并且如果是这样,则进一步处理)通常更简单和更有效。)再次使用完全分区(再次实际上,叶子任务)几乎总是处理多个元素):

       class Searcher<E> extends CountedCompleter<E> { final E[] array; final AtomicReference<E> result; final int lo, hi; Searcher(CountedCompleter<?> p, E[] array, AtomicReference<E> result, int lo, int hi) { super(p); this.array = array; this.result = result; this.lo = lo; this.hi = hi; } public E getRawResult() { return result.get(); } public void compute() { // similar to ForEach version 3 int l = lo, h = hi; while (result.get() == null && h >= l) { if (h - l >= 2) { int mid = (l + h) >>> 1; addToPendingCount(1); new Searcher(this, array, result, mid, h).fork(); h = mid; } else { E x = array[l]; if (matches(x) && result.compareAndSet(null, x)) quietlyCompleteRoot(); // root task is now joinable break; } } tryComplete(); // normally complete whether or not found } boolean matches(E e) { ... } // return true if found public static <E> E search(E[] array) { return new Searcher<E>(null, array, new AtomicReference<E>(), 0, array.length).invoke(); } } 
    在这个例子中,以及除了compareAndSet以外的其他任务的其他任务,通常的结果是tryComplete无条件调用可以作为条件( if (result.get() == null) tryComplete(); ),因为一旦根任务完成,就不需要进一步的簿记管理完成。

    记录子任务 CountedCompleter任务组合多个子任务的结果通常需要访问这些结果方法onCompletion(CountedCompleter) 如下面的类所示(执行map-reduce的简化形式,其中映射和缩减都是类型为E ),分割和征服设计的一种方法是使每个子任务记录成为兄弟,以便它可以可以在方法onCompletion访问。 这种技术适用于结合左和右结果的顺序无关紧要的减少; 有序减少需要明确的左/右指定。 上述示例中可以看到其他流程图的变体。

       class MyMapper<E> { E apply(E v) { ... } } class MyReducer<E> { E apply(E x, E y) { ... } } class MapReducer<E> extends CountedCompleter<E> { final E[] array; final MyMapper<E> mapper; final MyReducer<E> reducer; final int lo, hi; MapReducer<E> sibling; E result; MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper, MyReducer<E> reducer, int lo, int hi) { super(p); this.array = array; this.mapper = mapper; this.reducer = reducer; this.lo = lo; this.hi = hi; } public void compute() { if (hi - lo >= 2) { int mid = (lo + hi) >>> 1; MapReducer<E> left = new MapReducer(this, array, mapper, reducer, lo, mid); MapReducer<E> right = new MapReducer(this, array, mapper, reducer, mid, hi); left.sibling = right; right.sibling = left; setPendingCount(1); // only right is pending right.fork(); left.compute(); // directly execute left } else { if (hi > lo) result = mapper.apply(array[lo]); tryComplete(); } } public void onCompletion(CountedCompleter<?> caller) { if (caller != this) { MapReducer<E> child = (MapReducer<E>)caller; MapReducer<E> sib = child.sibling; if (sib == null || sib.result == null) result = child.result; else result = reducer.apply(child.result, sib.result); } } public E getRawResult() { return result; } public static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) { return new MapReducer<E>(null, array, mapper, reducer, 0, array.length).invoke(); } } 
    这里,方法onCompletion采用结合结果的许多完成设计共同的形式。 这种回调式方法在每个任务被触发一次,在挂起的计数的两个不同的上下文中,或者当任务本身变为零:(1)时,如果其挂起的计数在调用tryComplete为零,或(2)通过任何其子任务,当它们完成并将待处理的计数递减到零时。 caller论证区分案例。 通常,当来电者是this ,不需要采取任何行动。 否则,可以使用调用者参数(通常通过转换)来提供要组合的值(和/或链接到其他值)。 假设正确使用待处理的计数,则完成任务及其子任务后,会发生onCompletion内的操作(一次)。 在此方法中不需要额外的同步来确保对此任务或其他完成任务的字段的访问的线程安全性。

    完成遍历 如果使用onCompletion处理完成不适用或不方便,则可以使用方法firstComplete()nextComplete()创建自定义遍历。 例如,要定义一个仅以第三个ForEach示例的形式分割右侧任务的MapReducer,完成必须按照未用尽的子任务链接合作减少,可以如下完成:

       class MapReducer<E> extends CountedCompleter<E> { // version 2 final E[] array; final MyMapper<E> mapper; final MyReducer<E> reducer; final int lo, hi; MapReducer<E> forks, next; // record subtask forks in list E result; MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper, MyReducer<E> reducer, int lo, int hi, MapReducer<E> next) { super(p); this.array = array; this.mapper = mapper; this.reducer = reducer; this.lo = lo; this.hi = hi; this.next = next; } public void compute() { int l = lo, h = hi; while (h - l >= 2) { int mid = (l + h) >>> 1; addToPendingCount(1); (forks = new MapReducer(this, array, mapper, reducer, mid, h, forks)).fork(); h = mid; } if (h > l) result = mapper.apply(array[l]); // process completions by reducing along and advancing subtask links for (CountedCompleter<?> c = firstComplete(); c != null; c = c.nextComplete()) { for (MapReducer t = (MapReducer)c, s = t.forks; s != null; s = t.forks = s.next) t.result = reducer.apply(t.result, s.result); } } public E getRawResult() { return result; } public static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) { return new MapReducer<E>(null, array, mapper, reducer, 0, array.length, null).invoke(); } } 

    触发。 一些CountedCompleters本身从来没有分叉,而是作为其他设计中的一些管道; 包括完成一个或多个异步任务触发另一个异步任务。 例如:

       class HeaderBuilder extends CountedCompleter<...> { ... } class BodyBuilder extends CountedCompleter<...> { ... } class PacketSender extends CountedCompleter<...> { PacketSender(...) { super(null, 1); ... } // trigger on second completion public void compute() { } // never called public void onCompletion(CountedCompleter<?> caller) { sendPacket(); } } // sample use: PacketSender p = new PacketSender(); new HeaderBuilder(p, ...).fork(); new BodyBuilder(p, ...).fork(); 
    从以下版本开始:
    1.8
    另请参见:
    Serialized Form
    • 构造方法详细信息

      • CountedCompleter

        protected CountedCompleter​(CountedCompleter<?> completer,
                                   int initialPendingCount)
        创建一个新的CountedCompleter与给定的完成和初始挂起计数。
        参数
        completer - 这个任务的完成,或者 null如果没有
        initialPendingCount - 初始 initialPendingCount
      • CountedCompleter

        protected CountedCompleter​(CountedCompleter<?> completer)
        创建一个新的CountedCompleter与给定的完成者和初始挂起计数为零。
        参数
        completer - 这个任务的完成者,或 null如果没有
      • CountedCompleter

        protected CountedCompleter​()
        创建一个新的CountedCompleter,没有完成,初始挂起计数为零。
    • 方法详细信息

      • compute

        public abstract void compute​()
        这个任务执行的主要计算。
      • onCompletion

        public void onCompletion​(CountedCompleter<?> caller)
        当方法tryComplete()被调用并且挂起的计数为零时,或当无条件方法complete(T)被调用时执行动作。 默认情况下,此方法什么都不做。 您可以通过检查给定的呼叫者参数的身份来区分情况。 如果不等于this ,那么它通常是可以包含结合(和/或其他结果的链接)的子任务。
        参数
        caller - 调用此方法的任务(可能是此任务本身)
      • onExceptionalCompletion

        public boolean onExceptionalCompletion​(Throwable ex,
                                               CountedCompleter<?> caller)
        当方法ForkJoinTask.completeExceptionally(Throwable)被调用或方法compute()引发异常时,执行一个操作,并且此任务尚未正常完成。 进入这个方法,这个任务ForkJoinTask.isCompletedAbnormally() 该方法的返回值控制进一步的传播:如果true并且此任务具有尚未完成的完成者,则完成者也完成异常,与此完成相同。 该方法的默认实现不会返回true
        参数
        ex - 例外
        caller - 调用此方法的任务(可能是此任务本身)
        结果
        true如果这个异常应该被传播到这个任务的完成者(如果存在)
      • getCompleter

        public final CountedCompleter<?> getCompleter​()
        返回在此任务的构造函数中建立的完成者,如果没有,则返回 null
        结果
        完成者
      • getPendingCount

        public final int getPendingCount​()
        返回当前挂起的计数。
        结果
        当前挂单数
      • setPendingCount

        public final void setPendingCount​(int count)
        将待处理计数设置为给定值。
        参数
        count - 计数
      • addToPendingCount

        public final void addToPendingCount​(int delta)
        将给定值添加(原子地)给挂起的计数。
        参数
        delta - 要添加的值
      • compareAndSetPendingCount

        public final boolean compareAndSetPendingCount​(int expected,
                                                       int count)
        只有当当前持有给定的预期值时,将挂起的计数设置为(原子地)给定计数。
        参数
        expected - 预期值
        count - 新价值
        结果
        true如果成功
      • decrementPendingCountUnlessZero

        public final int decrementPendingCountUnlessZero​()
        如果挂起的计数非零,(原子地)减少它。
        结果
        初始(未确认)待处理的计数保持进入此方法
      • getRoot

        public final CountedCompleter<?> getRoot​()
        返回当前计算的根; 即这个任务,如果它没有完成,否则它的完成者的根。
        结果
        当前计算的根
      • tryComplete

        public final void tryComplete​()
        如果挂起的计数不为零,则减去计数; 否则调用onCompletion(CountedCompleter) ,然后同样尝试完成此任务的完成,如果存在,否则将此任务标记为完成。
      • propagateCompletion

        public final void propagateCompletion​()
        相当于tryComplete()但不会在完成路径中调用onCompletion(CountedCompleter) :如果挂起的计数非零,则递减计数; 否则,同样尝试完成此任务的完成,如果存在,否则将此任务标记为完成。 在方案onCompletion对于计算中的每个完成者不应该或不需要调用的情况下,该方法可能是有用的。
      • complete

        public void complete​(T rawResult)
        无论挂起的数量如何,调用onCompletion(CountedCompleter) ,将此任务标记为完成,并进一步触发tryComplete()对此任务的完成(如果存在)。 在调用onCompletion(CountedCompleter)或将此任务标记为完成之前,给定的rawResult用作参数setRawResult(T) ; 其值仅对setRawResult类有意义。 此方法不会修改挂起的计数。

        当获得几个子任务结果的任何一个(相对于所有)结果时,强制完成时,此方法可能是有用的。 但是,在setRawResult未被覆盖的普通(和推荐)情况下,可以使用quietlyCompleteRoot()更简单地获得该效果。

        重写:
        complete中的 ForkJoinTask<T>
        参数
        rawResult - 原始结果
      • firstComplete

        public final CountedCompleter<?> firstComplete​()
        如果此任务的挂起计数为零,则返回此任务; 否则递减其挂起的计数并返回null 该方法设计为与完成遍历循环中的nextComplete()一起使用。
        结果
        这个任务,如果挂起计数为零,否则为 null
      • nextComplete

        public final CountedCompleter<?> nextComplete​()
        如果此任务没有完成,请调用ForkJoinTask.quietlyComplete()并返回null 或者,如果完成者的待处理计数不为零,则减少待处理的计数并返回null 否则,返回完成者。 此方法可用作同构任务层次结构的完成遍历循环的一部分:
           for (CountedCompleter<?> c = firstComplete(); c != null; c = c.nextComplete()) { // ... process c ... } 
        结果
        完成者,或 null如果没有
      • quietlyCompleteRoot

        public final void quietlyCompleteRoot​()
        相当于 getRoot().quietlyComplete()
      • helpComplete

        public final void helpComplete​(int maxTasks)
        如果此任务尚未完成,则尝试至少处理此任务在完成路径上的给定数量的其他未处理任务(如果有)。
        参数
        maxTasks - 要处理的最大任务数。 如果小于或等于零,则不处理任务。
      • exec

        protected final boolean exec​()
        实现CountedCompleters的执行约定。
        Specified by:
        execForkJoinTask<T>
        结果
        true如果此任务已知已正常完成
      • getRawResult

        public T getRawResult​()
        返回计算结果。 默认情况下,返回null ,这适用于Void操作,但在其他情况下应该被覆盖,几乎总是返回在完成后保存结果的字段或函数。
        Specified by:
        getRawResultForkJoinTask<T>
        结果
        计算的结果
      • setRawResult

        protected void setRawResult​(T t)
        带有CountedCompleters结果的方法可以可选地用于帮助维护结果数据。 默认情况下,什么都不做。 不建议使用覆盖。 但是,如果覆盖此方法来更新现有对象或字段,则通常将其定义为线程安全。
        Specified by:
        setRawResultForkJoinTask<T>
        参数
        t - 值