线程间通信的方式主要有共享内存、信号、信号量、管道等。下面分别用示例代码对各种通信方式进行演示。
1.共享内存
class Test implements Runnable { static volatile boolean flag = true; @Override public void run() { // TODO Auto-generated method stub while (flag) ; System.out.println(Thread.currentThread().getName() + " is end"); } } public class SharadMem { public static void main(String[] args) { new Thread(new Test()).start(); try { Thread.sleep(100); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } Test.flag = false; } }对于这个测试用例,flag标志用于判断结束线程的运行,通过共享内存达到通信的作用,这是个人理解的共享内存,不知道有没有偏差,如果有的话望大家指正。
2.信号
public class Signal { //http://blog.csdn.net/sunset108/article/details/38819529 static Monitor monitor = new Monitor(); //生产者 static class Producer implements Runnable { static int num = 1; @Override public void run() { // TODO Auto-generated method stub while (true) { try { monitor.insert(num); System.out.println("生产物品" + num); num++; Thread.sleep(100); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } //消费者 static class Consumer implements Runnable { @Override public void run() { // TODO Auto-generated method stub while (true) { try { System.out.println("消费物品" + monitor.remove()); Thread.sleep(500); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } //管程,只能有一个线程占用 static class Monitor { private final int capacity = 10; private int insertIndex = 0; //仓库中当前可以放置物品的位置 private int removeIndex = 0; //仓库中当前可以拿走物品的位置 private final Object[] items = new Object[capacity]; //仓库中的所有物品 int count = 0; //仓库中的现有物品数 //向仓库中放置物品 public synchronized void insert(Object item) throws InterruptedException { //当仓库已满时,挂起生产线程 if (count == capacity) { wait(); } items[insertIndex++] = item; if (insertIndex == capacity) { insertIndex = 0; } count++; //当仓库由空变为不空时,唤起消费线程 if (count == 1) { notify(); } } //从仓库中拿走物品 public synchronized Object remove() throws InterruptedException { //当仓库没有物品时,挂起消费线程 if (count == 0) { wait(); } Object item = items[removeIndex++]; if (removeIndex == capacity) { removeIndex = 0; } count--; //当仓库由满变为不满时,唤起生产线程 if (count == capacity - 1) { notify(); } return item; } } public static void main(String[] args) { new Thread(new Producer()).start(); new Thread(new Consumer()).start(); } }通过Object类的wait、notify方法实现线程间的通信,wait和notify方法只有在同步代码中才可以调用。
3.信号量
public class TestSemaphore { static WareHouse wareHouse = new WareHouse(); //生产者 static class Producer implements Runnable { static int num = 1; @Override public void run() { // TODO Auto-generated method stub while (true) { try { wareHouse.insert(num); System.out.println("生产物品" + num); num++; Thread.sleep(100); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } //消费者 static class Consumer implements Runnable { @Override public void run() { // TODO Auto-generated method stub while (true) { try { System.out.println("消费物品" + wareHouse.remove()); Thread.sleep(500); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } //仓库,可以放置和拿走物品 static class WareHouse { private final int capacity = 10; private final Semaphore full = new Semaphore(0); //仓库中被占用的槽的信号量 private final Semaphore empty = new Semaphore(capacity); //仓库中空的槽的信号量 private final Semaphore mutex = new Semaphore(1); //互斥信号量 private int insertIndex = 0; //仓库中当前可以放置物品的位置 private int removeIndex = 0; //仓库中当前可以拿走物品的位置 private final Object[] items = new Object[capacity]; //仓库中的所有物品 int count = 0; //仓库中的现有物品数 //向仓库中放置物品 public void insert(Object item) throws InterruptedException { empty.acquire(); mutex.acquire(); items[insertIndex++] = item; if (insertIndex == capacity) { System.out.println("--------------满了"); insertIndex = 0; } count++; mutex.release(); full.release(); } //从仓库中拿走物品 public Object remove() throws InterruptedException { full.acquire(); mutex.acquire(); Object item = items[removeIndex++]; if (removeIndex == capacity) { System.out.println("--------------取完了"); removeIndex = 0; } count--; mutex.release(); empty.release(); return item; } } public static void main(String[] args) { new Thread(new Producer()).start(); new Thread(new Consumer()).start(); } }通过信号量来解决消费者和生产者的问题。
4.管道
class ThreadRead extends Thread{ private PipedReader input; public ThreadRead(PipedReader input){ //super(); this.input = input; } @Override public void run(){ try{ System.out.println("read:"); char[] byteArray = new char[100]; int readLength = input.read(byteArray); while (readLength!=-1){ String newData = new String(byteArray,0,readLength); System.out.println("读取:"+newData); readLength = input.read(byteArray); } System.out.println(); input.close(); }catch (IOException e){ e.printStackTrace(); } } } class ThreadWrite extends Thread{ private PipedWriter out; public ThreadWrite(PipedWriter out){ //super(); this.out = out; } @Override public void run(){ try{ System.out.println("write:"); for(int i=0;i<300;i++){ String outData = ""+(i+1); out.write(outData); System.out.println("写入:"+outData); } System.out.println(); out.close(); }catch (IOException e){ e.printStackTrace(); } } } public class Pipe { public static void main(String[] args){ try{ PipedReader pipedReader = new PipedReader(); PipedWriter pipedWriter = new PipedWriter(); //inputStream.connect(outputStream); pipedWriter.connect(pipedReader); //将pipedWriter和pipeRead通过connect相连 ThreadRead threadRead = new ThreadRead(pipedReader); //启动读线程 threadRead.start(); Thread.sleep(500); ThreadWrite threadWrite = new ThreadWrite(pipedWriter); //启动写线程 threadWrite.start(); }catch (IOException e){ e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }通过建立两个对象PipedReader和PipedWriter,一个线程在一端写入数据,另外一个线程在另外一端读取。
