生产者与消费者
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式。
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。在学习一些设计模式的过程中,如果先找到这个模式的第三者,能帮助我们快速熟悉一个设计模式。
举个例子:
在肯德基里,你点单之后点单员会把所点的食物完成封装之后拿来你面前,然后让你结账,有时候有些耗时操作没完成就会留下一个餐台号稍后送来。
而在麦当劳的点餐模型大致是,你点完快餐之后要求你立即付款,付完款之后下一位点餐,而取餐的是在旁边等待,另一个服务员专责负责配餐。
在并发模型中,肯德基比较倾向于一个线程把所有的服务都做完,而麦当劳倾向于服务解耦,让他们更专注于自己的业务。而肯德基的模型与BIO服务器的模型设计类似,麦当劳的模型则与生产者消费者模型十分相似。
具体实现:
生产者负责生产一个数字并存入缓冲区,消费者从缓冲区中取出数据并且求出它的平方并输出。
import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * 生产者 * @author ysm * 生产者消费者模型 */ public class Producer implements Runnable { private volatile boolean isRunning = true; private BlockingQueue<PCData> queue;// 内存缓冲区 private static AtomicInteger count = new AtomicInteger();// 总数 原子操作 private static final int SLEEPTIME = 1000; public Producer(BlockingQueue<PCData> queue) { this.queue = queue; } @Override public void run() { PCData data = null; Random r = new Random(); System.out.println("start producting id:" + Thread.currentThread().getId()); try { while (isRunning) { Thread.sleep(r.nextInt(SLEEPTIME)); data = new PCData(count.incrementAndGet()); System.out.println(data + " 加入队列"); if (!queue.offer(data, 2, TimeUnit.SECONDS)) { System.err.println(" 加入队列失败"); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } } public void stop() { isRunning = false; } } import java.text.MessageFormat; import java.util.Random; import java.util.concurrent.BlockingQueue; /** * 消费者 * @author ysm */ public class Consumer implements Runnable{ private BlockingQueue<PCData> queue; private static final int SLEEPTIME = 1000; public Consumer(BlockingQueue<PCData> queue){ this.queue = queue; } @Override public void run() { System.out.println("start Consumer id :"+Thread.currentThread().getId()); Random r = new Random(); try{ while(true){ PCData data = queue.take(); if(data != null) { int re = data.getData() * data.getData(); System.out.println(MessageFormat.format("{0}*{1}={2}", data.getData(),data.getData(),re)); Thread.sleep(r.nextInt(SLEEPTIME)); } } }catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } } } import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; /** * 主函数 * @author ysm * */ public class Main { public static void main(String[] args) throws InterruptedException { BlockingQueue<PCData> queue = new LinkedBlockingDeque<>(10); Producer p1 = new Producer(queue); Producer p2 = new Producer(queue); Producer p3 = new Producer(queue); Consumer c1 = new Consumer(queue); Consumer c2 = new Consumer(queue); Consumer c3 = new Consumer(queue); ExecutorService service = Executors.newCachedThreadPool(); service.execute(p1); service.execute(p2); service.execute(p3); service.execute(c1); service.execute(c2); service.execute(c3); Thread.sleep(10*1000); p1.stop(); p2.stop(); p3.stop(); Thread.sleep(3000); service.shutdown(); } } /** * 容器数据类型 * @author ysm * */ public class PCData { private final int intData; public PCData(int d){ intData = d; } public PCData(String d){ intData = Integer.valueOf(d); } public int getData(){ return intData; } @Override public String toString(){ return "data:"+intData; } }因为BlockingQueue是一个阻塞队列,它的存取可以保证只有一个线程在进行,所以根据逻辑,生产者在内存满的时候进行等待,并且唤醒消费者队列,反过来消费者在饥饿状态下等待并唤醒生产者进行生产。
下面的版本是使用notify/wait()方法进行设计的。在结构上是一致遵从模型图的。
import java.util.List; /** * 消费者 * * @author ysm * */ public class Consumer implements Runnable { private List<PCData> queue; public Consumer(List<PCData> queue) { this.queue = queue; } @Override public void run() { try { while (true) { if (Thread.currentThread().isInterrupted()) break; PCData data = null; synchronized (queue) { if (queue.size() == 0) { queue.wait(); queue.notifyAll(); } data = queue.remove(0); } System.out.println( Thread.currentThread().getId() + " 消费了:" + data.get() + " result:" + (data.get() * data.get())); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } } } import java.util.List; import java.util.Random; /** * 生产者 * * @author ysm * */ public class Producer implements Runnable { private List<PCData> queue; private int length; public Producer(List<PCData> queue, int length) { this.queue = queue; this.length = length; } @Override public void run() { try { while (true) { if (Thread.currentThread().isInterrupted()) break; Random r = new Random(); long temp = r.nextInt(100); System.out.println(Thread.currentThread().getId() + " 生产了:" + temp); PCData data = new PCData(); data.set(temp); synchronized (queue) { if (queue.size() >= length) { queue.notifyAll(); queue.wait(); } else queue.add(data); } Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } } } import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 测试主方法 * @author ysm * */ public class Main { public static void main(String[] args) { List<PCData> queue = new ArrayList<PCData>(); int length = 10; Producer p1 = new Producer(queue,length); Producer p2 = new Producer(queue,length); Producer p3 = new Producer(queue,length); Consumer c1 = new Consumer(queue); Consumer c2 = new Consumer(queue); Consumer c3 = new Consumer(queue); ExecutorService service = Executors.newCachedThreadPool(); service.execute(p1); service.execute(p2); service.execute(p3); service.execute(c1); service.execute(c2); service.execute(c3); } } /** * 基本数据类型 * @author ysm * */ public class PCData { private long value; public void set(long value){ this.value = value; } public long get(){ return value; } }下面的版本是使用await()/signal()方法进行设计的。在结构上是一致遵从模型图的。
import java.util.List; /** * 消费者 * @author ysm * */ public class Consumer implements Runnable{ private List<PCData> queue; public Consumer(List<PCData> queue){ this.queue = queue; } @Override public void run() { try { while (true) { if (Thread.currentThread().isInterrupted()) break; PCData data = null; Main.lock.lock(); if (queue.size() == 0){ Main.full.signalAll(); Main.empty.await(); } Thread.sleep(1000); data = queue.remove(0); Main.lock.unlock(); System.out.println("消费者ID:"+Thread.currentThread().getId()+" 消费了:"+data.getData()+" result:"+(data.getData()*data.getData())); } } catch (InterruptedException e) { e.printStackTrace(); } } } import java.util.List; import java.util.Random; /** * 生产者 * @author ysm * */ public class Producter implements Runnable{ private List<PCData> queue; private int len; public Producter(List<PCData> queue,int len){ this.queue = queue; this.len = len; } @Override public void run() { try{ while(true){ if(Thread.currentThread().isInterrupted()) break; Random r = new Random(); PCData data = new PCData(); data.setData(r.nextInt(500)); Main.lock.lock(); if(queue.size() >= len) { Main.empty.signalAll(); Main.full.await(); } Thread.sleep(1000); queue.add(data); Main.lock.unlock(); System.out.println("生产者ID:"+Thread.currentThread().getId()+" 生产了:"+data.getData()); } }catch (InterruptedException e) { e.printStackTrace(); } } } import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class Main { public static ReentrantLock lock = new ReentrantLock(); public static Condition empty = lock.newCondition(); public static Condition full = lock.newCondition(); public static void main(String[] args) { List<PCData> queue = new ArrayList<PCData>(); int length = 10; Producter p1 = new Producter(queue,length); Producter p2 = new Producter(queue,length); Producter p3 = new Producter(queue,length); Consumer c1 = new Consumer(queue); Consumer c2 = new Consumer(queue); Consumer c3 = new Consumer(queue); ExecutorService service = Executors.newCachedThreadPool(); service.execute(p1); service.execute(p2); service.execute(p3); service.execute(c1); service.execute(c2); service.execute(c3); } } public class PCData { private int data; public int getData() { return data; } public void setData(int data) { this.data = data; } }Java中的线程池类其实就是一种生产者和消费者模式的实现方式,但是我觉得其实现方式更加高明。生产者把任务丢给线程池,线程池创建线程并处理任务,如果将要运行的任务数大于线程池的基本线程数就把任务扔到阻塞队列里,这种做法比只使用一个阻塞队列来实现生产者和消费者模式显然要高明很多,因为消费者能够处理直接就处理掉了,这样速度更快,而生产者先存,消费者再取这种方式显然慢一些。
我们的系统也可以使用线程池来实现多生产者消费者模式。比如创建N个不同规模的Java线程池来处理不同性质的任务,比如线程池1将数据读到内存之后,交给线程池2里的线程继续处理压缩数据。线程池1主要处理IO密集型任务,线程池2主要处理CPU密集型任务。
平时的工作中思考下哪些场景可以使用生产者消费者模式,这种场景应该非常之多,特别是需要处理任务时间比较长的场景,比如上传附件并处理,用户把文件上传到系统后,系统把文件丢到队列里,然后立刻返回告诉用户上传成功,最后消费者再去队列里取出文件处理。比如调用一个远程接口查询数据,如果远程服务接口查询时需要几十秒的时间,那么它可以提供一个申请查询的接口,这个接口把要申请查询任务放数据库中,然后该接口立刻返回。然后服务器端用线程轮询并获取申请任务进行处理,处理完之后发消息给调用方,让调用方再来调用另外一个接口拿数据。