线程同步工具-CyclicBarrier

xiaoxiao2021-02-27  227

CyclicBarrier可以允许一个或者多个线程在一个确定的点同步数据,他的使用上和CountDownLatch类似,但是有趣的是你可以传递一个外加的 Runnable 对象作为初始参数,并且当全部线程都到达同一个点时,CyclicBarrier类 会把这个对象当做线程来执行,这在使用分区处理编程的时候能很好的发挥其并发性能 比如我们有一个数据量较大的二维整数数组,要找到某个整数在其中出现的次数。

生成一个二维矩阵 public class MatrixMock { private int data[][]; public MatrixMock(int rows , int widths) { Random random = new Random(47); data = new int[rows][widths]; for (int i = 0; i < rows; i++) { for (int j = 0; j < widths; j++) { data[i][j] = random.nextInt(10); } } } public int[] getRows(int row){ return data[row]; } } //此类用来装每个线程的查询结果 public class Results { private List<Integer> resultList; public Results() { this.resultList = new ArrayList<>(); } public List<Integer> getResultList() { return resultList; } public void setResult(int data){ this.resultList.add(data); } } //查询工具类 public class Searcher implements Runnable { private MatrixMock mock; private int startRows; private int endRows; private int seacherNumber; private Results results; private final CyclicBarrier cyclicBarrier; public Searcher(MatrixMock mock, int startRows, int endRows, int seacherNumber, Results results,CyclicBarrier cyclicBarrier) { this.mock = mock; this.startRows = startRows; this.endRows = endRows; this.seacherNumber = seacherNumber; this.results = results; this.cyclicBarrier = cyclicBarrier; } @Override public void run() { int count = 0; System.out.printf("%s : Processing lines from %d to %d .\n",Thread.currentThread().getName(),startRows,endRows); for(int i = startRows ; i < endRows ; i++){ int row[] = mock.getRows(i); for (int j = 0; j < row.length; j++) { if(row[j]==seacherNumber){ count++; } } } results.setResult(count); System.out.printf("%s: Lines processed.\n",Thread. currentThread().getName()); try { //计算完成以后等待 cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } //聚合类 将查询结果聚合 此类在CyclicBarrier唤醒线程后执行 public class Grouper implements Runnable { private Results results; private int count; public Grouper(Results results) { this.results = results; } @Override public void run() { for (int i = 0; i < results.getResultList().size(); i++) { count += results.getResultList().get(i); } } public int getCount(){ return count; } } //测试类 public class Main { public static void main(String[] args) { final int ROWS = 10000; final int NUMBERS = 1000; final int SEARCH = 5; final int PARTICIPANTS = 5; final int LINES_PARTICIPANT = 2000; MatrixMock matrixMock = new MatrixMock(ROWS,NUMBERS); Results results = new Results(); Grouper grouper = new Grouper(results); CyclicBarrier cyclicBarrier = new CyclicBarrier(PARTICIPANTS,grouper); Thread[] threads = new Thread[PARTICIPANTS]; for (int i = 0; i < PARTICIPANTS; i++) { threads[i] = new Thread(new Searcher(matrixMock, i * LINES_PARTICIPANT, (i + 1) * LINES_PARTICIPANT, SEARCH, results, cyclicBarrier)); threads[i].start(); } for (int i = 0; i < PARTICIPANTS; i++) { try { threads[i].join(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.printf("final result : "+grouper.getCount()); } }

例子中解决的问题比较简单。我们有一个很大的随机的整数矩阵,然后你想知道这矩阵里面某个数字出现的次数。为了更好的执行,我们使用了 divide 和 conquer 技术。我们 divide 矩阵成5个子集,然后在每个子集里使用一个线程来查找数字。这些线程是 Searcher 类的对象。 如我们之前提到的,CyclicBarrier 类有一个内部计数器控制到达同步点的线程数量。每次线程到达同步点,它调用 await() 方法告知 CyclicBarrier 对象到达同步点了。CyclicBarrier 把线程放入睡眠状态直到全部的线程都到达他们的同步点。

当全部的线程都到达他们的同步点,CyclicBarrier 对象叫醒全部正在 await() 方法中等待的线程们,然后,选择性的,为CyclicBarrier的构造函数 传递的 Runnable 对象(例子里,是 Grouper 对象)创建新的线程执行外加任务。

转载请注明原文地址: https://www.6miu.com/read-8776.html

最新回复(0)