生产者消费者模式

xiaoxiao2021-02-28  31

生产者消费者模式

生产者消费者也是一个非常经典的多线程模式,是我们在实际开发中应用非常广泛的思想理念.在生产者-消费者模式中:通常由两类线程,即若干个生产者线程和若干个 消费者线程.生产者线程复制提交用户请求,消费者线程则负责具体处理生产者提交的任务,在生产者和消费者之间通过共享内存缓存区进行通信

生产者:

public class Provider implements Runnable { //共享缓存区 private BlockingQueue<Data> queue; //多线程间是否启动变量,有强制从主内存中刷新的功能,即返回线程的状态 private volatile boolean isRunning = true; //id生成器 private static AtomicInteger count = new AtomicInteger(); //随机对象 private Random r = new Random(); public Provider(BlockingQueue queue) { this.queue = queue; } @Override public void run() { while(isRunning) { //随机休眠0-1000毫秒,表示获取数据(产生数据时的消耗) try { Thread.sleep(r.nextInt(1000)); // 获取的数据进行累计 int id = count.incrementAndGet(); //比如通过一个getData()方法获取了 Data data = new Data(Integer.toString(id), "数据" + id); System.out.println("当前线程:" + Thread.currentThread().getName() +",获取了数据,id为:" + id +",进行装载到公共缓存区中..."); if(!this.queue.offer(data, 2, TimeUnit.SECONDS)) { /* * 在这个判断过程中,生产者就已经把数据交给了队列了 */ System.out.println("提交缓冲区数据失败"); //do something ... 比如说重新提交数据 } } catch (InterruptedException e) { e.printStackTrace(); } } } public void stop() { this.isRunning = false; } }

分析:生产者生产的数据并不是直接给消费者的,而是交给queue(队列)做为一个缓存,消费者也是从这个队列里面拿数据.


消费者:

public class Consumer implements Runnable { private BlockingQueue<Data> queue; public Consumer(BlockingQueue queue) { this.queue = queue; } //随机对象 private static Random r = new Random(); @Override public void run() { while(true) { try { //获取数据 Data data = this.queue.take(); //进行数据处理,休眠0-1000毫秒耗时 Thread.sleep(r.nextInt(1000)); System.out.println("当前消费线程:" + Thread.currentThread().getName() + ",消费成功,消费数据id:" + data.getId()); } catch (InterruptedException e) { e.printStackTrace(); } } } }

测试端:

public class Main { public static void main(String[] args) { // 内存缓冲区 BlockingQueue<Data> queue = new LinkedBlockingDeque<Data>(); // 生产者 Provider p1 = new Provider(queue); Provider p2 = new Provider(queue); Provider p3 = new Provider(queue); // 消费者 Consumer c1 = new Consumer(queue); Consumer c2 = new Consumer(queue); Consumer c3 = new Consumer(queue); //创建线程池运行,这是一个缓存的线程池,可以创建无穷大的线程, //没有任务的时候不创建线程。空闲线程存活时间为60s(默认值) ExecutorService cachedPool = Executors.newCachedThreadPool(); cachedPool.execute(p1); cachedPool.execute(p2); cachedPool.execute(p3); cachedPool.execute(c1); cachedPool.execute(c2); cachedPool.execute(c3); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } p1.stop(); p2.stop(); p3.stop(); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } //cachedPool.shutdown(); //cachedPool.shutdownNow(); } }

图:

转载请注明原文地址: https://www.6miu.com/read-2627286.html

最新回复(0)