生产者消费者模式之BlockingQueue

xiaoxiao2021-02-28  11

    前两篇关于生产者消费者模式的文章(《生产者消费者模式之synchronizedObject》,《生产者消费者模式之LockCondition》)介绍的都是比较传统的等待唤醒机制,即缓冲区有资源的时候就唤醒消费者消费,否则消费者就等待;缓冲区空的时候就唤醒生产者生产,否则生产者就等待。在这种机制下缓冲区最多只有一个资源供消费者消费,而且需要我们手动的控制等待和唤醒,比较麻烦,也容易出错。所以在JDK1.5 java.util.concurrent包下提供了BlockingQueue可以帮助我们更好的实现生产者消费者模式。

 

       BlockingQueue阻塞队列。顾名思义,带有阻塞的队列。其中造成队列阻塞的原因主要有如下两个:1、队列满时进行入队操作;2、队列空时进行出队操作。

    基于BlockingQueue的以上特点,当一个线程试图对一个已经满的队列进行入队操作时就会被阻塞;同理,当一个线程试图对一个空的队列进行出队操作时也会被阻塞。所以阻塞队列是线程安全的,我们编程的时候不用显式的加锁。

 

    下面简单介绍BlockingQueue在生产者消费者模式的应用,有关BlockingQueue及其家族成员的详细介绍请查看官方文档。

 

 BlockingQueue是一个接口,它有如下几个实现类:ArrayBlockingQueueDelayQueueLinkedBlockingDequeLinkedBlockingQueuePriorityBlockingQueueSynchronousQueue。在生产者消费者模式的应用中,使用ArrayBlockingQueueLinkedBlockingQueue就可以了,下面介绍这两者功能。

 

     ArrayBlockingQueue:基于数组的阻塞队列实现,在ArrayBlockingQueue内部维护了一个定长的数组以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量(putIndextakeIndex)分别标识着队列的头部和尾部在数组中的位置。ArrayBlockingQueue在生产者放入数据和消费者获取数据都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueueArrayBlockingQueueLinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。

 

    LinkedBlockingQueue:基于链表的阻塞队列,与ArrayBlockingQueue类似,其内部也维护着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue也可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。作为开发者,我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue就会指定容量Integer.MAX_VALUE,这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了

 

    作为BlockingQueue的使用者,我们不用也不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给我们封装好了。BlockingQueue具有阻塞作用的两个方法是put()take()。

 

    下面演示使用LinkedBlockingQueue来实现生产者消费者模式:

 

    创建资源对象类Ticket

package com.gk.thread.communication; public class Ticket { private String place; // 车票的起始地 private String date; // 车票的日期 private int number; // 车票的数量 public boolean empty = true; // 标记,是否有车票,true表示没有车票 // 省略一系列getXxx()和setXxx() }

    创建生产者类Producer

package com.gk.thread.communication.queue; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.BlockingQueue; import com.gk.thread.communication.Ticket; public class Producer implements Runnable { private BlockingQueue<Ticket> blockingQueue; public Producer(BlockingQueue<Ticket> blockingQueue) { this.blockingQueue = blockingQueue; } @Override public void run() { Ticket ticket = productionTicket(); try { blockingQueue.put(ticket); // 将ticket加入到队列中,如果队列满则阻塞 } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("生产了" + ticket.getNumber() + "张 " + ticket.getDate() + " " + ticket.getPlace() + "的票...\n"); } private Ticket productionTicket() { Ticket ticket = new Ticket(); String place = Thread.currentThread().getName(); String date = new SimpleDateFormat("yyyy-MM-dd HH点mm分").format(new Date()); int number = (int)(Math.random()*10 + 100); ticket.setPlace(place); ticket.setDate(date); ticket.setNumber(number); return ticket; } }

    创建消费者类Customer

package com.gk.thread.communication.queue; import java.util.concurrent.BlockingQueue; import com.gk.thread.communication.Ticket; public class Customer implements Runnable{ private BlockingQueue<Ticket> blockingQueue; public Customer(BlockingQueue<Ticket> blockingQueue) { this.blockingQueue = blockingQueue; } @Override public void run() { while(true) { Ticket ticket = null; try { ticket = blockingQueue.take(); // 从队列中获取ticket,如果队列为空则阻塞 } catch (InterruptedException e) { throw new RuntimeException(e); } String place = ticket.getPlace(); String date = ticket.getDate(); int number = ticket.getNumber(); System.out.println("消费了" + number + "张 " + date + " " + place + "的票...\n"); } } }

    测试代码

package com.gk.thread.communication.queue; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import com.gk.thread.communication.Ticket; import java.util.ArrayList; import java.util.Collections; public class Test { public static void main(String[] args) { /* * 多态方式创建BlockingQueue对象,存放的是Ticket类型, * 指定大小为5,不指定的话默认为Integer.MAX_VALUE */ BlockingQueue<Ticket> blockingQueue = new LinkedBlockingQueue<Ticket>(5); Runnable producer = new Producer(blockingQueue); Runnable customer = new Customer(blockingQueue); String from = "广州"; String[] to = {"北京", "上海", "长沙", "杭州", "重庆", "西安", "厦门", "拉萨", "西藏", "哈尔滨"}; List<String> place = Collections.synchronizedList(new ArrayList<String>()); for(int i=0; i<to.length; i++) { place.add(from + " ---> " + to[i]); } Thread t = new Thread(customer); // 一个消费者线程 t.setDaemon(true); t.start(); for(String p : place) { new Thread(producer, p).start(); // 多个生产者线程 // new Thread(customer).start(); } } }

转载请注明原文地址: https://www.6miu.com/read-2596323.html

最新回复(0)