看代码是如何实现的:
Worker:
//处理任务的线程 public class Worker implements Runnable{ private ConcurrentLinkedQueue workQueue; private ConcurrentHashMap resultMap; public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) { //指向Master的workeQueue(所有的线程任务都放在这个队列里面) this.workQueue = workQueue; } public void setResultMap(ConcurrentHashMap<String, Object> resultMap) { //执行Master的resultMap(需要把结果集返回给Master进行统一处理) this.resultMap = resultMap; } //开启线程任务处理的业务 @Override public void run() { // 处理一个一个任务 while(true) { //从队列中获取任务 Task input = (Task) workQueue.poll(); if(input == null) { break; } //真正地区进行业务处理 Object output = handle(input); resultMap.put(Integer.toString(input.getId()), output); } } //进行任务处理 public Object handle(Task input) { Object output = null; try { Thread.sleep(500); // 模拟任务执行的之后用的时间 output = input.getPrice(); //模拟执行任务之后获得的结果集 } catch (Exception e) { e.printStackTrace(); } return output; } }Master:
public class Master { //存放任务的队列(必须是线程安全的的) 因为多个线程要从队列中拿到任务 private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>(); //存放所有的worker对象(这个没有必要是线程安全的,只是一个标记的作用) private HashMap<String, Thread> workers = new HashMap<String, Thread>(); //存放每一个worker并发执行任务的结果集合 (这个也得是线程安全的) private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>(); //构造方法,传入worker,Master去管控所有的worker public Master(Worker worker, int workerCount) { // 持有Master的任务队列的引用(worker需要从任务队列中拿任务) worker.setWorkQueue(workQueue); // 持有Maste的结果集 引用, 因为worker处理完结果之后,需要把结果返回个Master的resultMap 让Master去集中处理 worker.setResultMap(resultMap); // 创建(workCount)个线程 线程对象是交给Master来管理的,所有用HashMap来存放 for(int i = 0; i < workerCount; i++) { workers.put("子节点" + Integer.valueOf(i), new Thread(worker)); //同时要创建worker线程 } } //提交任务的方法 public void submit(Task task) { this.workQueue.add(task); } //Master的启动方法,遍历所有的worker,让每一个线程类worker去start(); public void execute() { //遍历存放worker的HashMap for(Entry<String, Thread> me : workers.entrySet()) { me.getValue().start(); } } //判断任务十分执行完毕 public boolean isComplete() { for(Entry<String, Thread> me : workers.entrySet()) { //只要有一个线程没有执行完,就说明这整个任务还在运行,返回false if(me.getValue().getState() != Thread.State.TERMINATED) { return false; } } return true; } //拿到worker返回的结果集统一处理 public int getResult() { int ret = 0; for(Entry<String, Object> me : resultMap.entrySet()) { ret += (Integer)me.getValue(); } return ret; } }Task:
public class Task { private Integer id; private String name; private Integer price; public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Integer getPrice() { return price; } public void setPrice(Integer price) { this.price = price; } }Main:(模拟客户端)
public class Main { public static void main(String[] args) { // Master master = new Master(new Worker(), Runtime.getRuntime().availableProcessors()); Master master = new Master(new Worker(), 100); Random random = new Random(); for(int i = 1; i <= 100; i++) { Task task = new Task(); task.setId(i); task.setName("任务:" + i); task.setPrice(random.nextInt(1000)); master.submit(task); } System.out.println("开始计算"); long start = System.currentTimeMillis(); master.execute(); while(true) { if(master.isComplete()) { long time = System.currentTimeMillis() - start; int ret = master.getResult(); System.out.println("处理完的结果是:" + ret + " | 用时: " + time); break; } } } }这个Runtime.getRuntime().availbleProcessors():是向 Java 虚拟机返回可用处理器的数目。 该值在特定的虚拟机调用期间可能发生更改。因此,对可用处理器数目很敏感的应用程序应该不定期地轮询该属性,并相应地调整其资源用法。
分析:
Master其实是做任务调度的,客户端传来任务,Master接受到之后,创建相应的Worker(线程类)进行处理,Worker是线程类,因为worker才是真正去处理任务的, 但是所有的任务并不是直接给worker,而是给了Master,所以worker得有一个引用执行master的workQueue(存放线程任务的队列),从master的任务队列中获取 任务,进行处理.