生产者:
public class Provider implements Runnable { //共享缓存区 private BlockingQueue<Data> queue; //多线程间是否启动变量,有强制从主内存中刷新的功能,即返回线程的状态 private volatile boolean isRunning = true; //id生成器 private static AtomicInteger count = new AtomicInteger(); //随机对象 private Random r = new Random(); public Provider(BlockingQueue queue) { this.queue = queue; } @Override public void run() { while(isRunning) { //随机休眠0-1000毫秒,表示获取数据(产生数据时的消耗) try { Thread.sleep(r.nextInt(1000)); // 获取的数据进行累计 int id = count.incrementAndGet(); //比如通过一个getData()方法获取了 Data data = new Data(Integer.toString(id), "数据" + id); System.out.println("当前线程:" + Thread.currentThread().getName() +",获取了数据,id为:" + id +",进行装载到公共缓存区中..."); if(!this.queue.offer(data, 2, TimeUnit.SECONDS)) { /* * 在这个判断过程中,生产者就已经把数据交给了队列了 */ System.out.println("提交缓冲区数据失败"); //do something ... 比如说重新提交数据 } } catch (InterruptedException e) { e.printStackTrace(); } } } public void stop() { this.isRunning = false; } }分析:生产者生产的数据并不是直接给消费者的,而是交给queue(队列)做为一个缓存,消费者也是从这个队列里面拿数据.
消费者:
public class Consumer implements Runnable { private BlockingQueue<Data> queue; public Consumer(BlockingQueue queue) { this.queue = queue; } //随机对象 private static Random r = new Random(); @Override public void run() { while(true) { try { //获取数据 Data data = this.queue.take(); //进行数据处理,休眠0-1000毫秒耗时 Thread.sleep(r.nextInt(1000)); System.out.println("当前消费线程:" + Thread.currentThread().getName() + ",消费成功,消费数据id:" + data.getId()); } catch (InterruptedException e) { e.printStackTrace(); } } } }测试端:
public class Main { public static void main(String[] args) { // 内存缓冲区 BlockingQueue<Data> queue = new LinkedBlockingDeque<Data>(); // 生产者 Provider p1 = new Provider(queue); Provider p2 = new Provider(queue); Provider p3 = new Provider(queue); // 消费者 Consumer c1 = new Consumer(queue); Consumer c2 = new Consumer(queue); Consumer c3 = new Consumer(queue); //创建线程池运行,这是一个缓存的线程池,可以创建无穷大的线程, //没有任务的时候不创建线程。空闲线程存活时间为60s(默认值) ExecutorService cachedPool = Executors.newCachedThreadPool(); cachedPool.execute(p1); cachedPool.execute(p2); cachedPool.execute(p3); cachedPool.execute(c1); cachedPool.execute(c2); cachedPool.execute(c3); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } p1.stop(); p2.stop(); p3.stop(); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } //cachedPool.shutdown(); //cachedPool.shutdownNow(); } }图:
