第一步分割任务。首先我们需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停的分割,直到分割出的子任务足够小。
第二步执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。
Fork/Join使用两个类来完成以上两件事情:
ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制,通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下两个子类: RecursiveAction:用于没有返回结果的任务。RecursiveTask :用于有返回结果的任务。 ForkJoinPool :ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。http://www.cnblogs.com/niejunlei/p/5985572.html
package threadPool; import java.util.concurrent.*; // 继承RecursiveAction来实现"可分解"的任务 class PrintTask extends RecursiveAction { // 每个“小任务”只最多只打印50个数 private static final int THRESHOLD = 50; private int start; private int end; // 打印从start到end的任务 public PrintTask(int start, int end) { this.start = start; this.end = end; } @Override protected void compute() { // 当end与start之间的差小于THRESHOLD时,开始打印 if(end - start < THRESHOLD) { for (int i = start ; i < end ; i++ ) { System.out.println(Thread.currentThread().getName() + "的i值:" + i); } } else { // 如果当end与start之间的差大于THRESHOLD时,即要打印的数超过50个 // 将大任务分解成两个小任务。 int middle = (start + end) / 2; PrintTask left = new PrintTask(start, middle); PrintTask right = new PrintTask(middle, end); // 并行执行两个“小任务” left.fork(); right.fork(); } } } public class ForkJoinPoolTest { public static void main(String[] args) throws Exception { ForkJoinPool pool = new ForkJoinPool(); // 提交可分解的PrintTask任务 pool.submit(new PrintTask(0 , 300)); pool.awaitTermination(2, TimeUnit.SECONDS); // 关闭线程池 pool.shutdown(); } }
package threadPool; import java.util.concurrent.*; import java.util.*; // 继承RecursiveTask来实现"可分解"的任务 class CalTask extends RecursiveTask<Integer> { // 每个“小任务”只最多只累加20个数 private static final int THRESHOLD = 20; private int arr[]; private int start; private int end; // 累加从start到end的数组元素 public CalTask(int[] arr , int start, int end) { this.arr = arr; this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; // 当end与start之间的差小于THRESHOLD时,开始进行实际累加 if(end - start < THRESHOLD) { for (int i = start ; i < end ; i++ ) { sum += arr[i]; } return sum; } else { // 如果当end与start之间的差大于THRESHOLD时,即要累加的数超过20个时 // 将大任务分解成两个小任务。 int middle = (start + end) / 2; CalTask left = new CalTask(arr , start, middle); CalTask right = new CalTask(arr , middle, end); // 并行执行两个“小任务” left.fork(); right.fork(); // 把两个“小任务”累加的结果合并起来 return left.join() + right.join(); // ① } } } public class Sum { public static void main(String[] args) throws Exception { int[] arr = new int[100]; Random rand = new Random(); int total = 0; // 初始化100个数字元素 for (int i = 0 , len = arr.length; i < len ; i++ ) { int tmp = rand.nextInt(20); // 对数组元素赋值,并将数组元素的值添加到sum总和中。 total += (arr[i] = tmp); } System.out.println(total); // 创建一个通用池 ForkJoinPool pool = ForkJoinPool.commonPool(); // 提交可分解的CalTask任务 Future<Integer> future = pool.submit(new CalTask(arr , 0 , arr.length)); System.out.println(future.get()); // 关闭线程池 pool.shutdown(); } }