Fork/Join框架是ExecutorService接口的一个实现,通过它我们可以实现多进程。Fork/Join可以用来将一个大任务递归的拆分为多个小任务,目标是充分利用所有的资源尽可能增强应用的性能。
和任何ExecutorService接口的实现一样,Fork/Join也会使用线程池来分布式的管理工作线程。Fork/Join框架的独特之处在于它使用了work-stealing(工作窃取)算法。通过这个算法,工作线程在无事可做时可以窃取其它正在繁忙的线程的任务来执行。
Fork/Join框架的核心是ForkJoinPool类,一个AbstractExecutorService类的子类。ForkJoinPool实现了核心的work-stealing算法并可以执行ForkJoinTask处理。
基础用法
使用Fork/Join框架的第一步是编写执行碎片任务的代码。要编写的代码类似如下伪代码:
1 2 3 4 5 |
if 任务足够小: 直接执行任务 else: 将任务切成两个小任务 执行两个小任务并等待结果 |
使用ForkJoinTask子类来封装如上的代码,通常会使用一些JDK提供的类,使用的有RecursiveTask(这个类会返回一个结果)和RecursiveAction两个类。
在准备好ForkJoinTask子类后,创建一个代表所有任务的对象,并将之传递给一个ForkJoinPool实例的invoke()方法。
由模糊到清晰
为了辅助理解Fork/Join框架是如何工作的,我们使用一个案例来进行说明:比如对一张图片进行模糊处理。我们用一个整型数组表示图片,其中的每个数值代表一个像素的颜色。被模糊的图片也用一个同等长度的数组来表示。
执行模糊是通过对代表图片的每个像素进行处理实现的。计算每个像素与其周围像素的均值(红黄蓝三原色的均值),计算生成的结果数组就是模糊后的图片。由于代表图像的通常都是一个大数组,整个处理过程需要通常会需要很多时间。可以使用Fork/Join框架利用多处理器系统上的并发处理优势来进行提速。下面是一个可能的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
package com.zhyea.robin; import java.util.concurrent.RecursiveAction; public class ForkBlur extends RecursiveAction { private int[] mSource; private int mStart; private int mLength; private int[] mDestination; // 处理窗口大小; 需要是一个奇数. private int mBlurWidth = 15; public ForkBlur(int[] src, int start, int length, int[] dst) { mSource = src; mStart = start; mLength = length; mDestination = dst; } protected void computeDirectly() { int sidePixels = (mBlurWidth - 1) / 2; for (int index = mStart; index < mStart + mLength; index++) { // 计算平均值. float rt = 0, gt = 0, bt = 0; for (int mi = -sidePixels; mi <= sidePixels; mi++) { int mindex = Math.min(Math.max(mi + index, 0), mSource.length - 1); int pixel = mSource[mindex]; rt += (float) ((pixel & 0x00ff0000) >> 16) / mBlurWidth; gt += (float) ((pixel & 0x0000ff00) >> 8) / mBlurWidth; bt += (float) ((pixel & 0x000000ff) >> 0) / mBlurWidth; } // 重组目标像素. int dpixel = (0xff000000) | (((int) rt) << 16) | (((int) gt) << 8) | (((int) bt) << 0); mDestination[index] = dpixel; } } .... } |
现在实现抽象方法compute(),在这个方法中既实现了模糊操作,也实现了将一个任务拆分成两个小任务。这里仅是简单依据数组长度来决定是直接执行任务还是将之拆分成两个小任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
protected static int sThreshold = 100000; protected void compute() { if (mLength < sThreshold) { computeDirectly(); return; } int split = mLength / 2; invokeAll(new ForkBlur(mSource, mStart, split, mDestination), new ForkBlur(mSource, mStart + split, mLength - split, mDestination)); } |
因为上面这些方法的实现是定义在RecursiveAction的一个子类中,可以直接在一个ForkJoinPool中创建并运行任务。具体步骤如下:
1. 创建一个代表要执行的任务的对象:
1 2 3 |
// src 表示源图片像素的数组 // dst 表示生成的图片的像素 ForkBlur fb = new ForkBlur(src, 0, src.length, dst); |
2. 创建一个运行任务的ForkJoinPool实例:
1 |
ForkJoinPool pool = new ForkJoinPool(); |
3. 运行任务:
1 |
pool.invoke(fb); |
在源代码中还包含了一些创建目标图片的代码。具体参考ForkBlur示例。
标准实现
要使用Fork/Join框架按自定义的算法在多核系统上执行并发任务当然需要实现自定义的类了(比如之前我们实现的ForkBlur类)。除此之外,在JavaSE中已经在广泛使用Fork/Join框架的一些特性了。比如Java8中的java.util.Arrays类的parallelSort()方法就使用了Fork/Join框架。具体可以参考Java API文档。
Fork/Join框架的另一个实现在java.util.streams包下,这也是java8的Lambda特性的一部分。
发表评论