递归处理一个可以折分成多个子任务的任务。
示例代码:
public class ForkJoinSum extends RecursiveTask<Integer> { public ForkJoinSum(int[] group, int from, int to) { _group = group; _from = from; _to = to; } private int[] _group; private int _from; private int _to; protected Integer compute() { if (_to - _from <= 2) { int back = 0; for (int i = _from; i <= _to; i++) back += _group[i]; return back; } else { int mid = (_from + _to) / 2; ForkJoinSum c1 = new ForkJoinSum(_group, _from, mid); ForkJoinSum c2 = new ForkJoinSum(_group, mid, _to); invokeAll(c1, c2); return c1.join() + c2.join(); } } } int[] a={1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20}; ForkJoinPool p= new ForkJoinPool(); ForkJoinSum task = new ForkJoinSum(a,0,19); p.invoke(task); System.out.println(task.join());对多个任务进行分步,只有所有任务的某一步骤完成时,才能进入下一步骤,和.net中的Barrier一样。 示例代码:
public class CyclicBarrierTask implements Runnable { public CyclicBarrierTask(CyclicBarrier barrier) { _barrier = barrier; } private CyclicBarrier _barrier; public void run() { long tid = Thread.currentThread().getId(); try { System.out.println(tid + ": Step 1"); _barrier.await(); System.out.println(tid + ": Step 2"); _barrier.await(); System.out.println(tid + ": Step 3"); _barrier.await(); System.out.println(tid + ": Step 4"); _barrier.await(); System.out.println(tid + "End"); } catch (BrokenBarrierException | InterruptedException e) { System.out.println(tid + ": Exception take with " + e.getMessage()); } } } ExecutorService pool = Executors.newCachedThreadPool(); CyclicBarrier barrier=new CyclicBarrier(4); for(int i=0;i<4;i++) { pool.submit(new CyclicBarrierTask(barrier)); }计数为0时执行,只能执行一次。
兄弟线程可以交换数据。
限定同时运行的任务数。示意代码如下:
final Semaphore semp = new Semaphore(5); for(int i=0;i<n;i++) Runnable run = new Runnable() { public void run() { try { // 获取许可 semp.acquire(); //... // 访问完后,释放 semp.release(); } catch (InterruptedException e) { e.printStackTrace(); } }需要注意的是不要释放多次semp,需要配对使用。
线程间共享数据,没有缓冲区。一个线程put即会阻塞,直到有线程take;一个线程take也会阻塞,直到有线程put。 示例代码
public class SynchronousQueueTest { private static SynchronousQueue<Integer> _syncQueue = new SynchronousQueue<>(); public static Runnable getPutTask() { return new Runnable() { @Override public void run() { try { Thread.sleep(2000); _syncQueue.put(2); } catch (InterruptedException e) { System.out.println(e.getMessage()); } } }; } public static Runnable getTakeTask(){ return new Runnable() { @Override public void run() { try { Thread.sleep(1000); System.out.println("take num is :"+_syncQueue.take()); } catch (InterruptedException e) { System.out.println(e.getMessage()); } } }; } } ExecutorService exec = Executors.newCachedThreadPool(); exec.submit(SynchronousQueueTest.getPutTask()); exec.submit(SynchronousQueueTest.getTakeTask());