JDK 源码解析 —— ArrayBlockingQueue

xiaoxiao2021-02-28  31

零. 简介 ArrayBlockingQueue 是一个由 数组作为基础数据结构的 有界阻塞队列。出队入队使用先进先出算法,即 FIFO (first in first out)。head 是停留在队列中最长的节点,tail 停留在队列中最短的节点。从 head 出队列,从 tail 入队列。 这是一个典型的「有界缓冲区」,一个固定大小的数组持有从生产者(producers)产生的和被消费者(consumers)消费的数据。 此队列一旦创建,队列大小就固定了,不能再改变。尝试将数据加入到满队列中,将会被阻塞;尝试从空队列中取数据也同样会被阻塞。 本类提供生产者和消费者线程访问顺序的一个可选的公平策略。默认情况下,为了性能,这个访问顺序不被保证。不过,在初始化构造本类的时候,将 fairness 设置为 true,则可以保证线程访问的公平性,这样设置的坏处是降低吞吐量,好处是减少了可变性和线程的饥饿现象。 一. 代码

(1)核心变量

[java]  view plain  copy /** Main lock guarding all access */   final ReentrantLock lock;   /** Condition for waiting takes */   private final Condition notEmpty;   /** Condition for waiting puts */   private final Condition notFull;   阻塞队列实现的关键参数,一个可重入锁,和两个条件,使用经典的双状态算法(two-condition algorithm)

[java]  view plain  copy /** The queued items */   final Object[] items;      /** items index for next take, poll, peek or remove */   int takeIndex;      /** items index for next put, offer, or add */   int putIndex;      /** Number of elements in the queue */   int count;   一些基本参数,基础的数组对象,队头的位置,队尾的位置,队列中已有的数据量

(2)构造器

[java]  view plain  copy public ArrayBlockingQueue(int capacity, boolean fair) {       if (capacity <= 0)           throw new IllegalArgumentException();       this.items = new Object[capacity];       lock = new ReentrantLock(fair);       notEmpty = lock.newCondition();       notFull =  lock.newCondition();   }  

队列构造方法,设置固定队列大小,是否需要公平访问队列,公平锁和非公平锁由 ReentrantLock (公平锁是正常走锁排队申请流程,非公平锁先尝试获取 AQS stat 状态锁,然后才走正常锁排队申请)提供

[java]  view plain  copy public ArrayBlockingQueue(int capacity) {       this(capacity, false);   }   默认构造方法是非公平的

(3) add 方法:新增数据

实际上是调用继承的抽象类 AbstractQueue 的 add 方法

[java]  view plain  copy public boolean add(E e) {       return super.add(e);   }   public boolean add(E e) {       if (offer(e))           return true;       else           throw new IllegalStateException("Queue full");   }   上述的 offer(e) 是接口 Queue 未实现的方法,具体实现在 ArrayBlockingQueue

[java]  view plain  copy public boolean offer(E e) {       checkNotNull(e);       final ReentrantLock lock = this.lock;       lock.lock();       try {           if (count == items.length)               return false;           else {               insert(e);               return true;           }       } finally {           lock.unlock();       }   }   如果队列还没满,则加入队尾并返回 true; 可以看出来 offer 方法如果插入不了不是进入阻塞状态,是直接返回一个 false 状态

将数据插入队尾,移动数组下标( inc(putIndex) 保证循环移动),队列总数 count 加 1,notEmpty.signal 唤醒等待拿数据的线程(在 AQS 的等待队列中的线程)

[java]  view plain  copy private void insert(E x) {       items[putIndex] = x;       putIndex = inc(putIndex);       ++count;       notEmpty.signal();   }  

(4)put 方法:新增数据 (如果满了就阻塞)

[java]  view plain  copy public void put(E e) throws InterruptedException {       checkNotNull(e);       final ReentrantLock lock = this.lock;       lock.lockInterruptibly();       try {           while (count == items.length)               notFull.await();           insert(e);       } finally {           lock.unlock();       }   }   可以看到如果队列数据量 count == items.length 数组大小,则线程阻塞 await()

(5)poll:取数据,不是阻塞方法

[java]  view plain  copy public E poll() {       final ReentrantLock lock = this.lock;       lock.lock();       try {           return (count == 0) ? null : extract();       } finally {           lock.unlock();       }   }      private E extract() {       final Object[] items = this.items;       E x = this.<E>cast(items[takeIndex]);       items[takeIndex] = null;       takeIndex = inc(takeIndex);       --count;       notFull.signal();       return x;   }   如果数据为空则返回 null,不为空则返回数据,并且唤醒 notFull 状态挂起的线程

(5)take:取数据,如果为空则阻塞

[java]  view plain  copy public E take() throws InterruptedException {       final ReentrantLock lock = this.lock;       lock.lockInterruptibly();       try {           while (count == 0)               notEmpty.await();           return extract();       } finally {           lock.unlock();       }   }   (6) drainTo 方法:字面意思就是排干,就是把数据批量导入到一个集合类中,比一个一个 poll 效率高,因为加锁次数少

[java]  view plain  copy public int drainTo(Collection<? super E> c) {       checkNotNull(c);       if (c == this)           throw new IllegalArgumentException();       final Object[] items = this.items;       final ReentrantLock lock = this.lock;       lock.lock();       try {           int i = takeIndex;           int n = 0;           int max = count;           while (n < max) {               c.add(this.<E>cast(items[i]));               items[i] = null;               i = inc(i);               ++n;           }           if (n > 0) {               count = 0;               putIndex = 0;               takeIndex = 0;               notFull.signalAll();           }           return n;       } finally {           lock.unlock();       }   }   三. 总结 看懂 ArrayBlockingQueue 需要先看懂 AbstractQueuedSynchronizer 和 ReentrantLock,阻塞就是靠 ReentrantLock 来实现的,而 ReentrantLock 是靠 AbstractQueuedSynchronizer 来实现加锁和释放锁。主要的算法就是上文提到的 two-condition algorithm,这个算法应该在学生时代《操作系统》课程上见过很多次了  转发自:https://blog.csdn.net/wenniuwuren/article/details/51283505
转载请注明原文地址: https://www.6miu.com/read-2629622.html

最新回复(0)