(1)核心变量
[java] view plain copy /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; 阻塞队列实现的关键参数,一个可重入锁,和两个条件,使用经典的双状态算法(two-condition algorithm)
[java] view plain copy /** The queued items */ final Object[] items; /** items index for next take, poll, peek or remove */ int takeIndex; /** items index for next put, offer, or add */ int putIndex; /** Number of elements in the queue */ int count; 一些基本参数,基础的数组对象,队头的位置,队尾的位置,队列中已有的数据量
(2)构造器
[java] view plain copy public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
队列构造方法,设置固定队列大小,是否需要公平访问队列,公平锁和非公平锁由 ReentrantLock (公平锁是正常走锁排队申请流程,非公平锁先尝试获取 AQS stat 状态锁,然后才走正常锁排队申请)提供
[java] view plain copy public ArrayBlockingQueue(int capacity) { this(capacity, false); } 默认构造方法是非公平的
(3) add 方法:新增数据
实际上是调用继承的抽象类 AbstractQueue 的 add 方法
[java] view plain copy public boolean add(E e) { return super.add(e); } public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } 上述的 offer(e) 是接口 Queue 未实现的方法,具体实现在 ArrayBlockingQueue
[java] view plain copy public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { insert(e); return true; } } finally { lock.unlock(); } } 如果队列还没满,则加入队尾并返回 true; 可以看出来 offer 方法如果插入不了不是进入阻塞状态,是直接返回一个 false 状态
将数据插入队尾,移动数组下标( inc(putIndex) 保证循环移动),队列总数 count 加 1,notEmpty.signal 唤醒等待拿数据的线程(在 AQS 的等待队列中的线程)
[java] view plain copy private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); }
(4)put 方法:新增数据 (如果满了就阻塞)
[java] view plain copy public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); insert(e); } finally { lock.unlock(); } } 可以看到如果队列数据量 count == items.length 数组大小,则线程阻塞 await()
(5)poll:取数据,不是阻塞方法
[java] view plain copy public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : extract(); } finally { lock.unlock(); } } private E extract() { final Object[] items = this.items; E x = this.<E>cast(items[takeIndex]); items[takeIndex] = null; takeIndex = inc(takeIndex); --count; notFull.signal(); return x; } 如果数据为空则返回 null,不为空则返回数据,并且唤醒 notFull 状态挂起的线程
(5)take:取数据,如果为空则阻塞
[java] view plain copy public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return extract(); } finally { lock.unlock(); } } (6) drainTo 方法:字面意思就是排干,就是把数据批量导入到一个集合类中,比一个一个 poll 效率高,因为加锁次数少
[java] view plain copy public int drainTo(Collection<? super E> c) { checkNotNull(c); if (c == this) throw new IllegalArgumentException(); final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { int i = takeIndex; int n = 0; int max = count; while (n < max) { c.add(this.<E>cast(items[i])); items[i] = null; i = inc(i); ++n; } if (n > 0) { count = 0; putIndex = 0; takeIndex = 0; notFull.signalAll(); } return n; } finally { lock.unlock(); } } 三. 总结 看懂 ArrayBlockingQueue 需要先看懂 AbstractQueuedSynchronizer 和 ReentrantLock,阻塞就是靠 ReentrantLock 来实现的,而 ReentrantLock 是靠 AbstractQueuedSynchronizer 来实现加锁和释放锁。主要的算法就是上文提到的 two-condition algorithm,这个算法应该在学生时代《操作系统》课程上见过很多次了 转发自:https://blog.csdn.net/wenniuwuren/article/details/51283505
