Multi-Programming-8 线程安全类实现生产者和消费者

xiaoxiao2021-02-28  73

1.线程安全类 ArrayBlockingQueue

java库中提供了线程安全的类BlockingQueue, 使得我们在获取一个空的队列元素时被阻塞,在向一个满队列添加元素时被阻塞,线程等待直到满足合适的条件为止。 BlockingQueue中的方法按照处理异常的方式共分为四大类: 1). 第一种是直接抛出异常; 2). 第二种是返回一种特殊值,比如null或者false; 3). 在条件合适之前无限期地阻塞当前线程; 4). 在一个给定的最大时间内阻塞线程,超过时间则放弃。 java.util.concurrent.BlockingQueue<Integer> A java.util.Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element. BlockingQueue methods come in four forms, with different ways of handling operations that cannot be satisfied immediately, but may be satisfied at some point in the future: one throws an exception, the second returns a special value (either null or false, depending on the operation), the third blocks the current thread indefinitely until the operation can succeed, and the fourth blocks for only a given maximum time limit before giving up. These methods are summarized in the following table: Summary of BlockingQueue methods 这四种方法如下表所示: MethodThrows exceptionSpecial valueBlocksTimes outInsertadd(e)offer(e)put(e)offer(e, time,unit)Removeremove()poll()take()poll(time, unit)Examineelement()peek()not applicablenot applicable

A BlockingQueue does not accept null elements. Implementations throw NullPointerException on attempts to add, put or offer a null. A null is used as a sentinel value to indicate failure of poll operations.

A BlockingQueue may be capacity bounded. At any given time it may have a remainingCapacity beyond which no additional elements can be put without blocking. A BlockingQueue without any intrinsic capacity constraints always reports a remaining capacity of Integer.MAX_VALUE. BlockingQueue implementations are designed to be used primarily for producer-consumer queues, but additionally support the java.util.Collection interface. So, for example, it is possible to remove an arbitrary element from a queue using remove(x). However, such operations are in general not performed very efficiently, and are intended for only occasional use, such as when a queued message is cancelled. BlockingQueue implementations are thread-safe. 注意: 1).BlockingQueue是不接受null值的,因为null值是该类中某些方法的实现中的返回值的标志; 2).BlockingQueue可以设定容量大小,超过一定容量则会阻塞一些操作; 3).阻塞队列设计的初衷是为了解决生产者和消费者问题的,但后来为了支持Collection接口,添加了add()/remove()等方法,一般不要使用它们,这样效率比较低。

2. 代码实现

package com.fqyuan.blog; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class ConcurrentQueue { public static void main(String[] args) { ConcurrentQueueUtil.demonstrate(); } } class SharedObjectCon { BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10); Random random = new Random(); public void produce() throws InterruptedException { while (true) { Thread.sleep(80); if (random.nextDouble() < 0.1) queue.put(random.nextInt(100)); } } public void consume() throws InterruptedException { while (true) { Thread.sleep(100); if (random.nextInt(10) < 1) { int val = queue.take(); System.out.println("Value taken is " + val + "; queue size is " + queue.size()); } } } } class ConcurrentQueueUtil { public static void demonstrate() { SharedObjectCon sharedObjectCon = new SharedObjectCon(); Thread t1 = new Thread(new Runnable() { @Override public void run() { try { sharedObjectCon.produce(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); Thread t2 = new Thread(new Runnable() { @Override public void run() { try { sharedObjectCon.consume(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); t1.start(); t2.start(); try { t1.join(); t2.join(); } catch (InterruptedException e) { e.printStackTrace(); } } }

运行结果:

Value taken is 67; queue size is 0 Value taken is 19; queue size is 0 Value taken is 79; queue size is 2 Value taken is 76; queue size is 8 Value taken is 16; queue size is 8 Value taken is 18; queue size is 9 Value taken is 89; queue size is 8 Value taken is 27; queue size is 10 Value taken is 41; queue size is 9 Value taken is 40; queue size is 9 Value taken is 23; queue size is 9 Value taken is 82; queue size is 9

3.Why not common containers like List?

List不是线程安全的。

相关问题参见疑问。 What does it mean array list is synchronized in java? It means it is thread-safe.

Vectors are synchronized. Any method that touches the Vector's contents is thread safe. ArrayList, on the other hand, is unsynchronized, making them, therefore, not thread safe.

相关代码实现,点击 这里。

转载请注明原文地址: https://www.6miu.com/read-26884.html

最新回复(0)