生产者消费者问题是多线程中一个典型的案例. 其主要逻辑是 生产者生产产品, 将将生产的产品放入到产品队列中供消费者队列. 消费者再从产品队列中取得产品进行消费.
需要注意的是:
1) 当产品队列中产品的数量达到饱和状态, 生产者停止生产进入等待状态, 等待消费者从队列中取走产品时唤醒生产者继续生产.
2) 当产品队列中产品的数量不足时, 消费者停止消费进入等待状态, 等待生产者再往队列中存入产品时唤醒消费者继续消费.
下面是具体代码
主类MainProcesser:
import java.io.DataInputStream;import java.io.IOException;import com.fji.consumer.Consumer;import com.fji.producer.Producer;import com.fji.util.ProductQueue;/** * 主类 * @author mOnsoOn * */public class MainProcesser { private Producer pro; private Consumer con; public MainProcesser(Producer pro, Consumer con) { this.pro = pro; this.con = con; } public static void main(String[] args) { ProductQueue q = new ProductQueue(); Producer p = new Producer(q); Consumer c = new Consumer(q); MainProcesser process = new MainProcesser(p,c); process.startThread(); process.receiveKeyboard(); } private void startThread() { pro.start(); con.start(); } private void addConsumerSpeed() { pro.slowProduceSpeed(); con.addConsumerSpeed(); } private void addProducerSpeed() { pro.addProduceSpeed(); con.slowConsumerSpeed(); } /** * 可以通过键盘事件对生产者消费者执行速度进行控制 * quit: 系统退出 * 1: 增加生产速度 * 2: 增加消费速度 */ private void receiveKeyboard() { for(;;) { System.out.println("接收键盘事件(quit:退出 1:增加生产速度 2:增加消费速度)"); DataInputStream dis = new DataInputStream(System.in); try { String event = dis.readLine(); System.out.println("接收到键盘事件: " + event); if(event.equalsIgnoreCase("quit")) { System.out.println("系统退出"); System.exit(0); } else if(event.equalsIgnoreCase("1")) { addProducerSpeed(); } else if(event.equalsIgnoreCase("2")) { addConsumerSpeed(); } } catch (IOException e) { e.printStackTrace(); } try { Thread.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } } }}
生产者Producer :
package com.fji.producer;import com.fji.util.Product;import com.fji.util.ProductQueue;/** * 生产者类 * @author mOnsoOn * */public class Producer extends Thread { private ProductQueue queue; public static final int CAPACITY = 20; private int produce_speed = 3000; public Producer(ProductQueue queue) { this.queue = queue; } @Override public void run() { System.out.println("<生产者> 生产者开始生产产品"); for(;;) { //生产者生成产品以后放入到队列中供消费者使用 Product p = this.createProduct(); queue.put(p); System.out.println("<生产者> 生产者新生产了一个产品: " + p); try { System.out.println("<生产者> 生产者休息" + produce_speed + " 毫秒"); Thread.sleep(produce_speed); } catch (InterruptedException e) { e.printStackTrace(); } } } public void addProduceSpeed() { System.out.println("<生产者> 加速"); this.produce_speed = this.produce_speed - 100; } public void slowProduceSpeed() { System.out.println("<生产者> 减速"); this.produce_speed = this.produce_speed + 100; } private Product createProduct() { Product p = new Product("产品"); return p; }}
消费者Consumer :
package com.fji.consumer;import com.fji.util.Product;import com.fji.util.ProductQueue;/** * 消费者 * @author mOnsoOn * */public class Consumer extends Thread { private ProductQueue queue; public static final int CAPACITY = 5; private int consume_speed = 3000; public Consumer(ProductQueue queue) { this.queue = queue; } @Override public void run() { System.out.println("<消费者>开始消费"); for(;;) { //消费者从队列中取产品 Product p = queue.get(); System.out.println("<消费者> 消费者消费了一个产品" + p.toString()); try { System.out.println("<消费者> 消费者休息" + consume_speed + " 毫秒"); Thread.sleep(consume_speed); } catch (InterruptedException e) { e.printStackTrace(); } } } public void addConsumerSpeed() { System.out.println("<消费者>加速"); this.consume_speed = this.consume_speed - 100; } public void slowConsumerSpeed() { System.out.println("<消费者>减速"); this.consume_speed = this.consume_speed + 100; }}
产品队列ProductQueue :
package com.fji.util;import java.util.concurrent.ConcurrentLinkedQueue;import com.fji.consumer.*;import com.fji.producer.*;/** * 产品队列 * @author mOnsoOn * */public class ProductQueue { private ConcurrentLinkedQueue<Product> queue; public ProductQueue() { queue = new ConcurrentLinkedQueue<Product>(); } /** * 生产者将产品放入到队列中, 当队列中产品已满时, 生产者进入等待状态 * @param product 产品 * @return */ public synchronized boolean put(Product product) { if(this.size() >= Producer.CAPACITY ) { System.out.println("<生产者> 产品队列已满, 生产者暂停生产"); try { //当队列中产品达到一定数量以后, 生产者暂停, 等待消费者唤醒 wait(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("<生产者> 生产者继续生产产品"); } //生产者继续生产同时, 唤醒消费者继续消费 notifyAll(); return queue.offer(product); } /** * 消费者从队列中取得产品, 当队列中产品不足时,消费者进入等待状态 * @return */ public synchronized Product get() { if(this.size() <= Consumer.CAPACITY) { System.out.println("<消费者>产品池中产品数量不足, 消费者暂停消费"); try { //队列中产品数量不足, 消费者暂停,等待生产者唤醒 wait(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("<消费者>消费者继续消费"); } //消费者继续消费同时, 唤醒生产者继续生产 notifyAll(); return queue.poll(); } /** * 返回队列中产品数量 * @return */ private synchronized int size() { return queue.size(); }}
产品Product :
package com.fji.util;public class Product { private String name; private int id; private static int sid=0; public Product(String name) { this.name = name; id = Product.sid++; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getId() { return id; } @Override public String toString() { return "Product:(name=" + name + ", id=" + id + ")"; }}