Module  java.base
软件包  java.util.concurrent

Class ForkJoinPool

  • All Implemented Interfaces:
    ExecutorExecutorService


    public class ForkJoinPool
    extends AbstractExecutorService
    一个ExecutorService运行ForkJoinTask s。 A ForkJoinPool提供了非ForkJoinTask客户提交的入口以及管理和监控操作。

    A ForkJoinPool与其他类型的ForkJoinPool不同之主要是采用工作窃取 :池中的所有线程尝试查找和执行提交到池和/或由其他活动任务创建的任务(最终阻止等待工作,如果不存在) 。 当大多数任务产生其他子任务(大多数ForkJoinTask s)以及许多小任务从外部客户端提交到池时,这样可以实现高效的处理。 尤其是在构造函数设置asyncMode为真时, ForkJoinPool S还可能适合于与事件式的任务中使用那些从未加入。 所有工作线程将初始化为Thread.isDaemon()设置为true

    静态commonPool()可用,适用于大多数应用。 公共池被任何未显式提交到指定池的ForkJoinTask使用。 使用公共池通常会减少资源使用(其线程在不使用期间缓慢回收,并在后续使用时恢复)。

    对于需要单独或自定义池的应用程序,可以使用给定的目标并行级别构建一个ForkJoinPool ; 默认情况下,等于可用处理器的数量。 池尝试通过动态添加,挂起或恢复内部工作线程来维护足够的活动(或可用)线程,即使某些任务停止等待加入其他线程。 但是,面对阻塞的I / O或其他非托管同步,不能保证这样的调整。 嵌套的ForkJoinPool.ManagedBlocker接口可以扩展所容纳的同步类型。 可以使用具有与类ThreadPoolExecutor记录的参数对应的参数的构造函数来覆盖默认策略。

    除了执行和生命周期控制方法之外,该类还提供了用于帮助开发,调优和监视fork / join应用程序的状态检查方法(例如getStealCount() )。 而且,方法toString()以方便的形式返回池状态的指示用于非正式监视。

    与其他ExecutorServices的情况一样,下表总结了三个主要任务执行方法。 这些设计主要由尚未在当前池中进行fork / join计算的客户端使用。 这些方法的主要形式接受ForkJoinTask实例,但重载的表单也允许混合执行纯粹的Runnable或基于Callable的活动。 但是,通常情况下,在池中已经执行的任务会使用表中列出的计算内表单,除非使用不通常连接的异步事件式任务,否则在方法选择方面几乎没有区别。

    Summary of task execution methods Call from non-fork/join clients Call from within fork/join computations Arrange async execution execute(ForkJoinTask) ForkJoinTask.fork() Await and obtain result invoke(ForkJoinTask) ForkJoinTask.invoke() Arrange exec and obtain Future submit(ForkJoinTask) ForkJoinTask.fork() (ForkJoinTasks are Futures)

    用于构建公共池的参数可以通过设置以下system properties来控制:

    如果没有通过系统属性提供线程工厂,则公共池使用使用系统类加载器的工厂作为thread context class loader 另外,如果存在SecurityManager ,则公用池使用一个工厂提供的线程不启用Permissions 建立这些设置有任何错误,使用默认参数。 通过将并行属性设置为零,和/或使用可能返回null的工厂,可以禁用或限制公共池中的线程的使用。 但是这样做可能导致未连接的任务永远不会被执行。

    实现注意事项 :此实现将最大正在运行的线程数限制为32767.尝试创建大于最大数目的池导致IllegalArgumentException

    只有当池被关闭或内部资源耗尽时,此实现才会拒绝提交的任务(即通过抛出RejectedExecutionException )。

    从以下版本开始:
    1.7
    • 字段详细信息

      • defaultForkJoinWorkerThreadFactory

        public static final ForkJoinPool.ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory
        创建一个新的ForkJoinWorkerThread。 该工厂被使用,除非在ForkJoinPool构造函数中被覆盖。
    • 构造方法详细信息

      • ForkJoinPool

        public ForkJoinPool​(int parallelism,
                            ForkJoinPool.ForkJoinWorkerThreadFactory factory,
                            Thread.UncaughtExceptionHandler handler,
                            boolean asyncMode,
                            int corePoolSize,
                            int maximumPoolSize,
                            int minimumRunnable,
                            Predicate<? super ForkJoinPool> saturate,
                            long keepAliveTime,
                            TimeUnit unit)
        使用给定的参数创建一个 ForkJoinPool
        参数
        parallelism - 并行级别。 默认值为Runtime.availableProcessors()
        factory - 创建新线程的工厂。 默认值为defaultForkJoinWorkerThreadFactory
        handler - 由于执行任务时遇到不可恢复的错误而终止的内部工作线程的处理程序。 默认值为null
        asyncMode - 如果为true,请为从未连接的分叉任务建立本地先进先出调度模式。 在工作线程仅处理事件式异步任务的应用程序中,此模式可能比默认的基于本地堆栈的模式更合适。 默认值为false
        corePoolSize - 要保留在池中的线程数(除非在保持活动之后超时)。 通常(和默认情况下)这是与并行级别相同的值,但如果任务定期阻塞,可以将其设置为更大的值以减少动态开销。 使用较小的值(例如0 )具有与默认值相同的效果。
        maximumPoolSize - 允许的最大线程数。 达到最大值时,尝试替换阻塞的线程失败。 (但是,由于不同线程的创建和终止可能会重叠,并且可能由给定的线程工厂进行管理,所以该值可能会被暂时超过。)为了安排与公共池默认使用的值相同的值,请使用256加上parallelism级。 (默认情况下,公共池允许最多256个备用线程。)使用大于实现的总线程限制的值(例如Integer.MAX_VALUE )与使用此限制(默认值)具有相同的效果。
        minimumRunnable - 未被连接阻止的核心线程允许的最小数量或ForkJoinPool.ManagedBlocker 为了确保进度,当存在未被阻塞的线程太少并且可能存在未执行的任务时,将构建新的线程,直到给定的maximumPoolSize。 对于默认值,请使用1 ,以确保活跃。 更大的值可能会在存在受阻的活动的情况下提高吞吐量,但由于增加的开销可能不会。 当提交的任务不具有需要额外线程的依赖项时,值为零可能是可以接受的。
        saturate - 如果非空,则在尝试创建超过最大总允许线程时调用谓词。 默认情况下,当一个线程即将阻止连接或ForkJoinPool.ManagedBlocker ,但由于将超过maximumPoolSize不能被替换,因此抛出一个RejectedExecutionException 但是,如果此谓词返回true ,则不会抛出异常,所以池继续以少于可运行线程的目标数量运行,这可能无法确保进度。
        keepAliveTime - 在线程终止之前自上次使用以来经过的时间(然后如果需要,则稍后更换)。 对于默认值,请使用60, TimeUnit.SECONDS
        unit - keepAliveTime参数的时间单位
        异常
        IllegalArgumentException - 如果并行性小于或等于零,或者大于实现限制,或者如果maximumPoolSize小于并行性,则为keepAliveTime小于或等于零。
        NullPointerException - 如果工厂为空
        SecurityException - 如果安全管理器存在,并且主叫方不允许修改线程,因为它不保留RuntimePermission ("modifyThread")
        从以下版本开始:
        9
    • 方法详细信息

      • commonPool

        public static ForkJoinPool commonPool​()
        返回公共池实例。 这个游泳池是静态的; 其运行状态不受尝试shutdown()shutdownNow()的影响 然而,该池和任何正在进行的处理在程序System.exit(int)自动终止。 在程序终止前依赖于异步任务处理完成的任何程序应在退出之前调用commonPool(). awaitQuiescence
        结果
        公共池实例
        从以下版本开始:
        1.8
      • invoke

        public <T> T invoke​(ForkJoinTask<T> task)
        执行给定的任务,在完成后返回其结果。 如果计算遇到未检查的异常或错误,则将其重新定义为此调用的结果。 Rethrown异常的行为与常规异常的方式相同,但是尽可能包含当前线程以及实际遇到异常的线程的堆栈跟踪(例如使用ex.printStackTrace()显示); 最低限度只有后者。
        参数类型
        T - 任务结果的类型
        参数
        task - 任务
        结果
        任务的结果
        异常
        NullPointerException - 如果任务为空
        RejectedExecutionException - 如果该任务无法安排执行
      • execute

        public void execute​(Runnable task)
        说明从界面: Executor复制
        在将来的某个时间执行给定的命令。 该命令可以在一个新线程,一个合并的线程中或在调用线程中执行,由Executor实现决定。
        参数
        task - 可运行的任务
        异常
        NullPointerException - 如果任务为空
        RejectedExecutionException - 如果任务无法安排执行
      • invokeAll

        public <T> List<Future<T>> invokeAll​(Collection<? extends Callable<T>> tasks)
        描述从接口ExecutorService复制
        执行给定的任务,返回持有他们的状态和结果的所有完成的期货列表。 Future.isDone()对于返回列表的每个元素是true 请注意, 完成的任务可能会正常终止或抛出异常。 如果在此操作进行过程中修改了给定的集合,则此方法的结果是未定义的。
        Specified by:
        invokeAll在接口 ExecutorService
        重写:
        invokeAllAbstractExecutorService
        参数类型
        T - 从任务返回的值的类型
        参数
        tasks - 任务的收集
        结果
        表示任务的期货列表,按照给定任务列表的迭代器产生的顺序顺序,每个都已完成
        异常
        NullPointerException - 如果任务或其任何元素是 null
        RejectedExecutionException - 如果任何任务无法安排执行
      • getUncaughtExceptionHandler

        public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler​()
        返回由于在执行任务时遇到不可恢复的错误而终止的内部工作线程的处理程序。
        结果
        处理程序,或 null如果没有
      • getParallelism

        public int getParallelism​()
        返回此池的目标并行度级别。
        结果
        这个池的目标平行度水平
      • getCommonPoolParallelism

        public static int getCommonPoolParallelism​()
        返回公共池的目标并行度级别。
        结果
        共同池的目标平行度水平
        从以下版本开始:
        1.8
      • getPoolSize

        public int getPoolSize​()
        返回已启动但尚未终止的工作线程数。 该方法返回的结果可能与getParallelism()不同,当创建线程以在其他协作被阻止时保持并行性。
        结果
        工作线程数
      • getAsyncMode

        public boolean getAsyncMode​()
        返回 true如果此池使用从未连接的分叉任务的本地先进先出调度模式。
        结果
        true如果此池使用异步模式
      • getRunningThreadCount

        public int getRunningThreadCount​()
        返回等待加入任务或其他受管同步的未阻止的工作线程数的估计。 此方法可能会高估正在运行的线程数。
        结果
        工作线程数
      • getActiveThreadCount

        public int getActiveThreadCount​()
        返回当前正在窃取或执行任务的线程数的估计。 此方法可能会高估活动线程的数量。
        结果
        活动线程的数量
      • isQuiescent

        public boolean isQuiescent​()
        如果所有工作线程当前处于空闲状态,则返回true 空闲的工作者是无法获取执行任务的工作,因为没有可用于从其他线程窃取,并且没有等待提交给池。 这种方法是保守的 它可能不会立即返回true所有线程空闲,但如果线程保持不活动,最终将成为true。
        结果
        如果所有线程当前处于空闲状态, true
      • getStealCount

        public long getStealCount​()
        返回从另一个线程的工作队列中偷取的任务总数的估计值。 报告的价值低估了池不是静止时的实际偷窃总数。 该值可能对于监视和调优fork / join程序很有用:通常来说,窃取计数应该足够高以保持线程忙,但足够低以避免线程间的开销和争用。
        结果
        偷窃的次数
      • getQueuedTaskCount

        public long getQueuedTaskCount​()
        返回由工作线程(但不包括提交到池中尚未开始执行的任务)当前在队列中保留的任务总数的估计值。 该值只是通过遍历池中的所有线程获得的近似值。 此方法对于调整任务粒度可能很有用。
        结果
        排队任务的数量
      • getQueuedSubmissionCount

        public int getQueuedSubmissionCount​()
        返回提交给此池尚未开始执行的任务数量的估计。 该方法可能需要与提交的数量成比例的时间。
        结果
        排队提交的数量
      • hasQueuedSubmissions

        public boolean hasQueuedSubmissions​()
        如果有任何任务提交到此池尚未开始执行,则返回 true
        结果
        true如果有任何排队的提交
      • pollSubmission

        protected ForkJoinTask<?> pollSubmission​()
        删除并返回下一个未执行的提交(如果有)。 此方法在对具有多个池的系统中重新分配工作的此类的扩展中可能很有用。
        结果
        下一次提交,或 null如果没有
      • drainTasksTo

        protected int drainTasksTo​(Collection<? super ForkJoinTask<?>> c)
        从调度队列中删除所有可用的未执行的提交和分派任务,并将其添加到给定集合中,而不会更改其执行状态。 这些可能包括人为生成或包装的任务。 该方法仅在池已知静止时被调用。 其他时间的调用可能不会删除所有任务。 尝试向集合c添加元素时遇到的失败可能导致在抛出关联的异常时,元素既不在两个集合中,也可能不是两个集合。 如果在操作进行中修改了指定的集合,则此操作的行为是未定义的。
        参数
        c - 将元素传输到的集合
        结果
        转移的元素数量
      • toString

        public String toString​()
        返回一个标识此池的字符串,以及它的状态,包括运行状态,并行级和工作和任务计数的指示。
        重写:
        toStringObject
        结果
        一个标识这个池的字符串,以及它的状态
      • shutdown

        public void shutdown​()
        可能启动有序关闭,其中先前提交的任务被执行,但不会接受新的任务。 如果这是commonPool() ,调用对执行状态没有影响,如果已经关闭,则不起作用。 在此方法过程中同时提交的任务可能会被拒绝,也可能不会被拒绝。
        异常
        SecurityException - 如果安全管理器存在,并且主叫方不允许修改线程,因为它不保留RuntimePermission ("modifyThread")
      • shutdownNow

        public List<Runnable> shutdownNow​()
        可能尝试取消和/或停止所有任务,并拒绝所有后续提交的任务。 如果这是commonPool() ,调用对执行状态没有影响,如果已经关闭,则不起作用。 否则,在此方法过程中同时提交或执行的任务可能会被拒绝也可能不会被拒绝。 该方法取消现有和未执行的任务,以便在存在任务依赖性的情况下允许终止。 所以方法总是返回一个空列表(与其他执行程序不同)。
        结果
        一个空的列表
        异常
        SecurityException - 如果安全管理器存在并且调用者不允许修改线程,因为它不保留RuntimePermission ("modifyThread")
      • isTerminated

        public boolean isTerminated​()
        如果所有任务在关闭后完成,则返回 true
        结果
        true如果所有任务在关闭后已经完成
      • isTerminating

        public boolean isTerminating​()
        如果终止程序已经开始但尚未完成,则返回true 此方法可能对调试有用。 返回true报告了关闭后的足够的时间可能表明提交的任务已经忽略或抑制中断,或者正在等待I / O,导致执行器不能正常终止。 (见ForkJoinTask的咨询说明,说明任务通常不应该阻塞操作,但如果这样做,则必须中断它们)。
        结果
        true如果终止但尚未终止
      • isShutdown

        public boolean isShutdown​()
        如果此池已关闭,则返回 true
        结果
        true如果这个池已被关闭
      • awaitTermination

        public boolean awaitTermination​(long timeout,
                                        TimeUnit unit)
                                 throws InterruptedException
        阻止所有任务在关闭请求完成后执行,或发生超时或当前线程中断,以先到者为准。 因为commonPool()从不终止,直到程序关闭,当应用于公共池时,此方法相当于awaitQuiescence(long, TimeUnit)但始终返回false
        参数
        timeout - 等待的最长时间
        unit - 超时参数的时间单位
        结果
        true如果这个执行者终止并且 false如果终止之前的超时时间
        异常
        InterruptedException - 如果在等待时中断
      • awaitQuiescence

        public boolean awaitQuiescence​(long timeout,
                                       TimeUnit unit)
        如果被这个池中的一个ForkJoinTask调用,相当于ForkJoinTask.helpQuiesce() 否则,等待和/或尝试协助执行任务,直到该池isQuiescent()或指示的超时过去。
        参数
        timeout - 等待的最长时间
        unit - 超时参数的时间单位
        结果
        true如果静止; false如果超时已过。
      • managedBlock

        public static void managedBlock​(ForkJoinPool.ManagedBlocker blocker)
                                 throws InterruptedException
        运行给定的可能的阻止任务。 running in a ForkJoinPool时 ,如果需要 ,该方法可能会排列备用线程,以确保当前线程在blocker.block()被阻塞时的足够的并行

        该方法重复调用blocker.isReleasable()blocker.block()直到任一方法返回true 每次拨打blocker.block()之前都会打电话给blocker.isReleasable() ,返回false

        如果没有在ForkJoinPool中运行,则此方法在行为上相当于

           while (!blocker.isReleasable()) if (blocker.block()) break; 
        如果在ForkJoinPool中运行,则可以首先扩展池,以确保在呼叫期间提供足够的并行性为blocker.block()
        参数
        blocker - 阻止任务
        异常
        InterruptedException - 如果 blocker.block()这样做
      • newTaskFor

        protected <T> RunnableFuture<T> newTaskFor​(Runnable runnable,
                                                   T value)
        描述从类别复制: AbstractExecutorService
        为给定的可运行和默认值返回一个 RunnableFuture
        重写:
        newTaskForAbstractExecutorService
        参数类型
        T - 给定值的类型
        参数
        runnable - 正在包装的可运行任务
        value - 返回的未来的默认值
        结果
        一个 RunnableFuture ,当运行时,将运行底层可运行程序,并作为一个 Future ,将产生给定的值作为其结果,并提供取消基础任务
      • newTaskFor

        protected <T> RunnableFuture<T> newTaskFor​(Callable<T> callable)
        描述从类复制: AbstractExecutorService
        为给定的可调用任务返回一个 RunnableFuture
        重写:
        newTaskForAbstractExecutorService
        参数类型
        T - 可调用结果的类型
        参数
        callable - 被打包的可执行任务
        结果
        一个 RunnableFuture ,当运行时,将调用底层可调用,作为一个 Future ,将作为其结果产生可调用的结果,并提供取消基础任务