- 介绍Java Fork/Join的基本概念和使用Demo。
Ref
- 《Java 8实战》书籍第7章节 - 分支/合并框架
- 聊聊并发(八)——Fork/Join框架介绍
- Java并发 - Fork/Join框架介绍
- 双端队列和工作窃取
什么是Fork/Join框架
Fork/Join
框架,即分支/合并框架,是 Java 7 中提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。Fork/Join
框架,和 MapReduce
的原理类似,都是通过将大任务拆分为小任务来实现并行计算,主要是利用分治法的思想来实现多任务并行计算。
Fork
就是把一个大任务切分为若干子任务并行的执行,Join
就是合并这些子任务的执行结果,最后得到这个大任务的结果。比如计算 1+2+。。+10000
,可以分割成 10 个子任务,每个子任务分别对 1000 个数进行求和,最终汇总这 10 个子任务的结果。Fork/Join
的运行流程图如下。
Fork/Join
框架创建的任务需要通过 ForkJoinPool
来启动,ForkJoinPool
是一个线程池,比较特殊的是其线程数量是根据 CPU 的核心数来设置的。ForkJoinPool
是通过工作窃取(work-stealing
)算法来提高 CPU 的利用率的。
工作窃取算法
工作窃取(work-stealing
)算法是指某个线程从其他队列里窃取任务来执行。工作窃取的运行流程图如下。
每个线程中维护了一个双端队列来存储所需要执行的任务,而工作窃取算法允许从其他线程的双端队列中窃取一个最晚(Oldest
,队列的尾部) 的任务来执行,这样可以避免和当前任务所属的线程发生竞争。
为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
如上图所示,Thread2 从 Thread1 队列中拿出最晚的 Task1 来执行,Thread1 则拿出 Task2 来执行,这样就会避免发生竞争。
工作窃取算法优点
- 充分利用线程进行并行计算
- 减少了线程间的竞争
工作窃取算法缺点
- 在某些情况下会存在竞争(双端队列中只有一个任务)
- 消耗了更多的系统资源
在实际应用中,工作窃取算意味着这些任务差不多被平均分配到 ForkJoinPool
中的所有线程上,用于在池中的工作线程之间重新分配和平衡任务。下图展示了这个过程,当工作线程队列中有一个任务被分成两个子任务时,一个子任务就被闲置的工作线程“偷走”了。如前所述,这个过程可以不断递归,直到规定子任务应顺序执行的条件为真。
Fork/Join框架基础类
下面考虑如何设计一个 Fork/Join
框架,需要考虑如下 2 点
第 1 步分割任务。首先需要有一个
fork
类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停的分割,直到分割出的子任务足够小。第 2 步执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。
Fork/Join
使用 2 个类来完成以上两件事情
ForkJoinTask
:我们要使用Fork/Join
框架,必须首先创建一个ForkJoinTask
任务。它提供在任务中执行fork()
和join()
操作的机制。通常情况下我们不需要直接继承ForkJoinTask
类,而只需要继承它的子类,Fork/Join
框架提供了以下两个子类RecursiveAction
:用于没有返回结果的任务。RecursiveTask
:用于有返回结果的任务。
ForkJoinPool
:ForkJoinTask
需要通过ForkJoinPool
来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
此外,框架还提供了下面 2 个类
ForkJoinWorkerThread
:是ForkJoinPool
内的worker thread
,执行ForkJoinTask
,内部有ForkJoinPool.WorkQueue
来保存要执行的ForkJoinTask
。ForkJoinPool.WorkQueue
:保存要执行的ForkJoinTask
。
更形象的总结如下
ForkJoinPool
: “管理者”ForkJoinTask
: “任务类型”,如RecursiveAction
和RecursiveTask
ForkJoinWorkerThread
: “工人”
Fork/Join 框架执行流程
1 | // fork/join计算斐波那契 |
ForkJoinPool
的每个工作线程都维护着一个双端工作队列(WorkQueue
),队列中存放着是任务(ForkJoinTask
)。- 每个工作线程在运行中产生新的任务(调用
fork()
)时,放入工作队列的队首(队首的任务的等待时间最短),并且工作线程在处理自己的工作队列时,使用的是FIFO
方式,也就是说每次从队首取出任务来执行。 - 每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到
pool
的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队尾,也就是说工作线程在窃取其他工作线程的任务时,使用的是LIFO
方式。 - 在遇到
join()
时,如果需要join
的任务尚未完成,则会先处理其他任务,并等待其完成。 - 在既没有自己的任务,也没有可以窃取的任务时,进入休眠。
FIFO : First in, First out,先进先出。 LIFO : Last in, First out,后进先出。
Fork/Join 使用Demo
1 | public class CountTest { |
程序运行结果如下
1 | 任务过大,切割的任务: 1加到 12的和 执行此任务的线程:ForkJoinPool-1-worker-1 |
从结果可以看出,提交的计算任务是由线程1执行,线程1进行了第一次切割,切割成两个子任务 “7加到12” 和 “1加到6”,并提交这两个子任务。然后这两个任务被线程2、线程3给窃取了。线程1 的内部队列中已经没有任务了,这时候,线程2、线程3 也分别进行了一次任务切割并各自提交了两个子任务,于是线程 1 也去窃取任务(这里窃取的都是线程2的子任务)。
Fork/Join 框架的异常处理
ForkJoinTask
在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常,所以 ForkJoinTask
提供了 isCompletedAbnormally()
方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过 ForkJoinTask
的 getException
方法获取异常。使用如下代码
1 | if(task.isCompletedAbnormally()) { |
getException
方法返回 Throwable
对象,如果任务被取消了则返回 CancellationException
。如果任务没有完成或者没有抛出异常则返回 null
。
FAQ
ForkJoinPool 使用 submit 与 invoke 提交的区别
invoke
是同步执行,调用之后需要等待任务完成,才能执行后面的代码。submit
是异步执行,只有在Future
调用get
的时候会阻塞。
继承 RecursiveTask 与 RecursiveAction的区别?
- 继承
RecursiveTask
:适用于有返回值的场景。 - 继承
RecursiveAction
:适合于没有返回值的场景。
子任务调用 fork 与 invokeAll 的区别?
fork
:让子线程自己去完成任务,父线程监督子线程执行,浪费父线程。invokeAll
:子父线程共同完成任务,可以更好的利用线程池。