Master-Worker模式

xiaoxiao2021-02-28  47

Master-Worker模式

常用的并行计算模式,它的核心思想是系统由两类进程协作合作:Master进程和Worker进程.Master负责接收和分配任务,Worker负责处理子任务. 当各个Worker子进程处理完成之后,会将结果返回给Master返回给Master,有Master做归纳和总结,其好处是能将一个大任务分解成若干个小任务, ,并行执行,从而提高系统的吞吐量

看代码是如何实现的:

Worker:

//处理任务的线程 public class Worker implements Runnable{ private ConcurrentLinkedQueue workQueue; private ConcurrentHashMap resultMap; public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) { //指向Master的workeQueue(所有的线程任务都放在这个队列里面) this.workQueue = workQueue; } public void setResultMap(ConcurrentHashMap<String, Object> resultMap) { //执行Master的resultMap(需要把结果集返回给Master进行统一处理) this.resultMap = resultMap; } //开启线程任务处理的业务 @Override public void run() { // 处理一个一个任务 while(true) { //从队列中获取任务 Task input = (Task) workQueue.poll(); if(input == null) { break; } //真正地区进行业务处理 Object output = handle(input); resultMap.put(Integer.toString(input.getId()), output); } } //进行任务处理 public Object handle(Task input) { Object output = null; try { Thread.sleep(500); // 模拟任务执行的之后用的时间 output = input.getPrice(); //模拟执行任务之后获得的结果集 } catch (Exception e) { e.printStackTrace(); } return output; } }

Master:

public class Master { //存放任务的队列(必须是线程安全的的) 因为多个线程要从队列中拿到任务 private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>(); //存放所有的worker对象(这个没有必要是线程安全的,只是一个标记的作用) private HashMap<String, Thread> workers = new HashMap<String, Thread>(); //存放每一个worker并发执行任务的结果集合 (这个也得是线程安全的) private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>(); //构造方法,传入worker,Master去管控所有的worker public Master(Worker worker, int workerCount) { // 持有Master的任务队列的引用(worker需要从任务队列中拿任务) worker.setWorkQueue(workQueue); // 持有Maste的结果集 引用, 因为worker处理完结果之后,需要把结果返回个Master的resultMap 让Master去集中处理 worker.setResultMap(resultMap); // 创建(workCount)个线程 线程对象是交给Master来管理的,所有用HashMap来存放 for(int i = 0; i < workerCount; i++) { workers.put("子节点" + Integer.valueOf(i), new Thread(worker)); //同时要创建worker线程 } } //提交任务的方法 public void submit(Task task) { this.workQueue.add(task); } //Master的启动方法,遍历所有的worker,让每一个线程类worker去start(); public void execute() { //遍历存放worker的HashMap for(Entry<String, Thread> me : workers.entrySet()) { me.getValue().start(); } } //判断任务十分执行完毕 public boolean isComplete() { for(Entry<String, Thread> me : workers.entrySet()) { //只要有一个线程没有执行完,就说明这整个任务还在运行,返回false if(me.getValue().getState() != Thread.State.TERMINATED) { return false; } } return true; } //拿到worker返回的结果集统一处理 public int getResult() { int ret = 0; for(Entry<String, Object> me : resultMap.entrySet()) { ret += (Integer)me.getValue(); } return ret; } }

Task:

public class Task { private Integer id; private String name; private Integer price; public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Integer getPrice() { return price; } public void setPrice(Integer price) { this.price = price; } }

Main:(模拟客户端)

public class Main { public static void main(String[] args) { // Master master = new Master(new Worker(), Runtime.getRuntime().availableProcessors()); Master master = new Master(new Worker(), 100); Random random = new Random(); for(int i = 1; i <= 100; i++) { Task task = new Task(); task.setId(i); task.setName("任务:" + i); task.setPrice(random.nextInt(1000)); master.submit(task); } System.out.println("开始计算"); long start = System.currentTimeMillis(); master.execute(); while(true) { if(master.isComplete()) { long time = System.currentTimeMillis() - start; int ret = master.getResult(); System.out.println("处理完的结果是:" + ret + " | 用时: " + time); break; } } } }

这个Runtime.getRuntime().availbleProcessors():是向 Java 虚拟机返回可用处理器的数目。 该值在特定的虚拟机调用期间可能发生更改。因此,对可用处理器数目很敏感的应用程序应该不定期地轮询该属性,并相应地调整其资源用法。

分析:

Master其实是做任务调度的,客户端传来任务,Master接受到之后,创建相应的Worker(线程类)进行处理,Worker是线程类,因为worker才是真正去处理任务的, 但是所有的任务并不是直接给worker,而是给了Master,所以worker得有一个引用执行master的workQueue(存放线程任务的队列),从master的任务队列中获取 任务,进行处理.
并且客户端也是从master中拿到结果集,所以worker也得有引用指向master的resultMap,让每一个worker处理任务得到的结果集都返回给master,由master 统一进行处理.
这小案例中使用了三个集合有两个是并发类的容器:ConcurrentLinkedQueue,这个是用用来存放任务的,由于是多个线程去处理任务,需要从集合中拿到任务去进行 出处理,所以这个集合得是并发类的容器(保证线程安全) 还有一个并发类容器是ConcurrentHashMap,这个是worker用来想master返回结果集的,多个线程可能在同一时间类进行返回,所以这个容器也得是并发类的容器 还有一个集合就是一个普通的HashMap;这个HashMap的作用就是用来存放所有的所有的worker,让这个容器的的线程worker都去start,气孔,还有对里面的每一个 worker进行判断,看是否是执行完毕了的线程.这个集合就没有必要是线程安全的了.
转载请注明原文地址: https://www.6miu.com/read-2626891.html

最新回复(0)