Master-Worker模式是常用的并行计算模式,其核心思想是系统由两类进程协作工作:Master和Worker进程。Master负责接收和分配任务,Worker负责处理子任务。当Worker完成子任务后会将结果返回给Master,Master则对Worker们的返回结果做归纳和总结,其好处是将一个大任务分解成若干个小任务,并发执行从而提高系统的吞吐量。从某种角度来讲,其思想跟“分治”还是有点相似的。
下面给出一段小例子来进行模拟Master-worker模式:
Master:
master应当有三个容器,分别用于存放Task任务、Worker以及Worker执行Task后的返回结果集。master主要有四个方法,分别是submit(Task task)(用于提交任务,即将Task放到Task容器里边,这里我们使用了ConcurrentLinkedQueue队列来充当Task容器)、execute()(任务装载完毕,master启动worker们去完成task任务)、isComplete()(master用于判断worker是否已经完成其具体的任务)、getResult()(获取已经完成任务的worker的结果)。master有其对应的构造方法Master(Worker worker,int workerCount)用于初始化一个master。
public class Master { //容器一:存放来自main的任务,这里涉及到并发,所以使用并发类容器 private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>(); //容器二:存放Worker,由于有Master进行分配,使每一个Worker不存在并发,所以使用普通容器 private HashMap<String,Thread> workerMap = new HashMap<String,Thread>(); //容器三:存放Worker们的任务返回结果集,由于这里涉及到并发,所以使用并发类容器 private ConcurrentHashMap<String,Object> resultMap = new ConcurrentHashMap<String,Object>(); //构造一个Master @SuppressWarnings("unused") private Master() {} public Master(Worker worker,int workerCount) { worker.setWorkQueue(this.workQueue); worker.setResultMap(this.resultMap); for(int index = 0;index < workerCount;index++) { this.workerMap.put(Integer.toString(index), new Thread(worker)); } } //提交任务 public void submit(Task task) { this.workQueue.add(task); } //启动Worker们去执行任务 public void execute() { for(Map.Entry<String, Thread> me : workerMap.entrySet()) { me.getValue().start(); } } //判断一个Worker是否运行结束 public boolean isComplete() { for(Map.Entry<String, Thread> me : workerMap.entrySet()) { if(me.getValue().getState() != Thread.State.TERMINATED) { return false; } } return true; } //计算结果 public int getResult() { int priceResult = -1; for(Map.Entry<String, Object> me : resultMap.entrySet()) { priceResult += (Integer)me.getValue(); } return priceResult; } }Worker:
Worker们都是独立的一个线程,所以应当实现Runnable接口。Worker应当有一个任务队列workQueue用于装载这一个worker所需要执行的任务,有一个结果集resultMap用于装载这个worker执行每一个任务的结果。worker应当有一个任务处理方法 handle(Task input) 用于处理master分配给他的任务。
public class Worker implements Runnable { private ConcurrentLinkedQueue<Task> workQueue; private ConcurrentHashMap<String,Object> resultMap; public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) { this.workQueue = workQueue; } public void setResultMap(ConcurrentHashMap<String,Object> resultMap) { this.resultMap = resultMap; } @Override public void run() { while(true) { Task input = this.workQueue.poll(); if(input == null) break; Object output = handle(input); this.resultMap.put(Integer.toString(input.getId()), output); } } private Object handle(Task input) { Object output = null; try { //假设处理一个任务耗时5秒 Thread.sleep(5*1000); output = input.getPrice(); }catch(Exception e) { System.err.println(e.getMessage()); } return output; } }Task:
task表示任务,一个请求可能存在N个任务,master将这些任务分配给不同的worker进行处理。这里我们假设任务为计算,给出Task类:
public class Task { private int id; private int price; ... get/set方法 ... @Override public String toString() { return "Task [id=" + id + ", price=" + price + "]"; } }下面进行测试,给出测试类如下:
public static void main(String[] args) { Master master = new Master(new Worker(),1000); Random r = new Random(System.currentTimeMillis()); for(int index = 1;index < 1001;index++) { Task task = new Task(); task.setId(index); task.setPrice(r.nextInt(1000)); master.submit(task); } System.err.println("下面开始执行任务"); master.execute(); long start = System.currentTimeMillis(); while(true) { if(master.isComplete()) { long spend = System.currentTimeMillis() - start; int priceResult = master.getResult(); System.err.println("最终结果:"+priceResult+",执行时间:"+spend/1000+"秒"); break; } } }执行N次:
//20个worker:10-5;20-5;30-10;40-10;50-15;60-15;100-25;1000-250(task数量-执行时间) //200个worker:1000-25(task数量-执行时间) //1000个worker:1000-5(task数量-执行时间) //2000个worker:1000-4(task数量-执行时间)
