第3章 线程间通信
标签: Java多线程编程
《Java多线程编程核心技术》 个人笔记
第3章 线程间通信
等待通知机制
不使用等待通知机制实现线程间通信什么是等待通知机制等待通知机制的实现方法wait锁释放与notify锁不释放当interrupt方法遇到wait方法只通知一个线程唤醒所有线程方法waitlong的使用通知过早等待wait的条件发生变化生产者消费者模式实现多生产与多消费操作值一生产与多消费操作栈解决wait条件改变与假死多生产与一消费操作栈多生产与多消费操作栈通过管道进行线程间通信字节流通过管道进行线程间通信字符流实战等待通知之交叉备份 方法join的使用
学习方法join前的铺垫用join方法类解决方法join与异常方法joinlong的使用方法joinlong和sleeplong的区别方法join后面的代码提前运行解释意外 类ThreadLocal的使用
方法get与null验证线程变量的隔离性解决get返回Null问题再次验证线程变量的隔离性类InhreitableThreadLocal的使用值继承再修改
本章需要着重掌握的重点: 1. 使用wait/notify实现线程间通信 2. 生产者/消费者模式的实现 3. 方法join的使用 4. ThreadLocal类的使用
等待/通知机制
不使用等待/通知机制实现线程间通信
什么是等待/通知机制
等待/通知机制的实现
方法wait()的作用是使当前执行代码的线程进行等待,wait()方法是Object类的方法,该方法用来将当前线程置入“预执行队列”中,并且在wait()所在的代码行处停止执行,直到接到通知或被中断为止。在调用wait()之前,线程必须获得该对象的对象级别锁,即只能在同步方法或同步块中调用wait()方法。在执行wait()方法后,当前线程释放锁 。在从wait()返回前,线程与其他线程竞争重新获得锁。
如果调用wait()时没有持有适当的锁,则抛出IllegalMonitorStateException,它是RunTimeException的一个子类,因此,不需要try catch语句进行捕捉异常
方法notify()也要在同步方法或同步块中调用,即在调用前,线程也必须获得该对象的对象级别锁。如果调用notify()时没有适当的锁,也会抛出IllegalMonitorStateException。
该方法用来通知那些可能在等待该对象的对象锁的其他线程,如果有多个线程等待,则由线程规划器随机挑选其中一个wait状态的线程,对其发出通知notify,并使它等待获取该对象的对象锁。需要说明的是,在执行notify()方法后,当前线程不会马上释放该对象锁,呈wait状态的线程也不能马上获取该对象锁,要等到执行notify()方法的线程将程序执行完,也就是退出synchronized代码块后,当前线程才会释放,而呈wait状态所在的线程才可以获得该对象锁。用一句话总结:wait使线程停止运行,而notify使停止的线程继续运行notifyAll()方法可以使所有正在等待队列中等待同一共享资源的“全部”线程从等待状态退出,进入可运行状态。
方法wait()锁释放与notify()锁不释放
当interrupt()方法遇到wait()方法
当线程呈wait状态时,调用线程对象 的interrupt()方法会出现InterruptedException异常
public class Service {
public void testMethod(Object lock) {
try {
synchronized (lock) {
System.out.println(
"begin wait()");
lock.wait();
System.out.println(
" end wait()");
}
}
catch (InterruptedException e) {
e.printStackTrace();
System.out.println(
"出现异常了,因为呈wait状态的线程被interrupt了!");
}
}
}
public class ThreadA extends Thread {
private Object lock;
public ThreadA(Object lock) {
super();
this.lock = lock;
}
@Override
public void run() {
Service service =
new Service();
service.testMethod(lock);
}
}
public class Test {
public static void main(String[] args) {
try {
Object lock =
new Object();
ThreadA a =
new ThreadA(lock);
a.start();
Thread.sleep(
5000);
a.interrupt();
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
begin wait()
java.lang.InterruptedException
出现异常了,因为呈wait状态的线程被interrupt了!
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Unknown Source)
at service.Service.testMethod(Service.java:
9)
at extthread.ThreadA.run(ThreadA.java:
17)
结论:
执行完同步代码块就会释放对象的锁在执行同步代码块的过程中,遇到异常而导致线程终止,锁也会释放在执行同步代码块的过程中,执行了锁所属对象的wait()方法,这个线程会释放对象锁,而此线程对象会进入线程等待池中,等待被唤醒
只通知一个线程
调用notify()一次只随机通知一个线程
唤醒所有线程
notifyAll()唤醒全部线程
方法wait(long)的使用
带参数的wait(long)方法的功能是等待某一时间内是否有线程对锁进行唤醒,如果超过这个时间则自动唤醒
通知过早
如果通知过早,则会打乱 程序正常的运行逻辑如果先通知了,则wait()方法也没有必要执行了,可以更改代码:
public class MyRun {
private String
lock =
new String(
"");
private boolean isFirstRunB =
false;
private Runnable runnableA =
new Runnable() {
@Override
public void run() {
try {
synchronized (
lock) {
while (isFirstRunB ==
false) {
System.
out.println(
"begin wait");
lock.wait();
System.
out.println(
"end wait");
}
}
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
};
private Runnable runnableB =
new Runnable() {
@Override
public void run() {
synchronized (
lock) {
System.
out.println(
"begin notify");
lock.notify();
System.
out.println(
"end notify");
isFirstRunB =
true;
}
}
};
public static void main(String[] args) throws InterruptedException {
MyRun run =
new MyRun();
Thread a =
new Thread(run.runnableA);
a.start();
Thread.sleep(
100);
Thread b =
new Thread(run.runnableB);
b.start();
}
}
begin wait
begin notify
end notify
end wait
public static void main(String[] args) throws InterruptedException {
MyRun run =
new MyRun();
Thread b =
new Thread(run.runnableB);
b.start();
Thread.sleep(
100);
Thread a =
new Thread(run.runnableA);
a.start();
}
begin notify
end notify
等待wait的条件发生变化
在使用wait/notify模式时,还需要注意另外一种情况,也就是wait等待的条件发生了变化,也容易造成程序逻辑的混乱
public void subtract() {
try {
synchronized (lock) {
while (ValueObject
.list.size() ==
0) { //用while循环,而不是if判断
System
.out.println(
"wait begin ThreadName="
+ Thread
.currentThread()
.getName())
lock
.wait()
System
.out.println(
"wait end ThreadName="
+ Thread
.currentThread()
.getName())
}
ValueObject
.list.remove(
0)
System
.out.println(
"list size=" + ValueObject
.list.size())
}
} catch (InterruptedException e) {
e
.printStackTrace()
}
}
用while循环,而不是if判断,当多个线程执行这段代码时,当一个被唤醒,会继续循环判断当前list.size()是否为0,若为0则继续等待,防止list被其他线程清空了导致list.remove(0)报异常
生产者/消费者模式实现
一生产者与一消费者多生产与多消费:操作值假死
‘假死’的现象其实就是线程进入WAITING等待状态。如果全部线程都进入WAITING状态,则程序就不再执行任务业务功能了,整个项目呈停止状态。这在生产者与消费者模式时经常遇到。假死的原因:notify()方法是随机唤醒的,当连续唤醒了同类时,譬如:有两个生产者,两个消费者,当第一个生产者唤醒另一个生产者,则这时仍然有生产未被消费,所以两个生产者都进入wait状态,接着,第一个消费者消费了生产的东西,唤醒另一个消费者,进入wait状态,此时被唤醒的消费者又没有生产可以消费,进入wait状态,全部线程都处于wait状态
public class ValueObject {
public static String value =
"";
}
public class P {
private String lock;
public P(String lock) {
super();
this.lock = lock;
}
public void setValue() {
try {
synchronized (lock) {
while (!ValueObject.value.equals(
"")) {
System.out.println(
"生产者 "
+ Thread.currentThread().getName() +
" WAITING了★");
lock.wait();
}
System.out.println(
"生产者 " + Thread.currentThread().getName()
+
" RUNNABLE了");
String value = System.currentTimeMillis() +
"_"
+ System.nanoTime();
ValueObject.value = value;
lock.notify();
}
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class C {
private String lock;
public C(String lock) {
super();
this.lock = lock;
}
public void getValue() {
try {
synchronized (lock) {
while (ValueObject.value.equals(
"")) {
System.out.println(
"消费者 "
+ Thread.currentThread().getName() +
" WAITING了☆");
lock.wait();
}
System.out.println(
"消费者 " + Thread.currentThread().getName()
+
" RUNNABLE了");
ValueObject.value =
"";
lock.notify();
}
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ThreadP extends Thread {
private P p;
public ThreadP(P p) {
super();
this.p = p;
}
@Override
public void run() {
while (
true) {
p.setValue();
}
}
}
public class ThreadC extends Thread {
private C r;
public ThreadC(C r) {
super();
this.r = r;
}
@Override
public void run() {
while (
true) {
r.getValue();
}
}
}
public class Run {
public static void main(String[] args)
throws InterruptedException {
String lock =
new String(
"");
P p =
new P(lock);
C r =
new C(lock);
ThreadP[] pThread =
new ThreadP[
2];
ThreadC[] rThread =
new ThreadC[
2];
for (
int i =
0; i <
2; i++) {
pThread[i] =
new ThreadP(p);
pThread[i].setName(
"生产者" + (i +
1));
rThread[i] =
new ThreadC(r);
rThread[i].setName(
"消费者" + (i +
1));
pThread[i].start();
rThread[i].start();
}
Thread.sleep(
5000);
Thread[] threadArray =
new Thread[Thread.currentThread()
.getThreadGroup().activeCount()];
Thread.currentThread().getThreadGroup().enumerate(threadArray);
for (
int i =
0; i < threadArray.length; i++) {
System.out.println(threadArray[i].getName() +
" "
+ threadArray[i].getState());
}
}
}
多生产与多消费:操作值
解决“假死”很简单,将生产者和消费者的notify()改成notifyAll(),即不光通知同类线程,也通知异类线程。
一生产与多消费————操作栈:解决wait条件改变与假死
多生产与一消费————操作栈
多生产与多消费————操作栈
通过管道进行线程间通信:字节流
管道流是一种特殊的流,用于在不同线程间直接传送数据。一个线程发送数据到输出管道,另一个线程从输入管道中读取数据。通过使用管道,实现不同线程间的通信,而无需借助类似临时文件安之类的东西。JDK中提供了4各类来使线程间可以进行通信:
PipedInputStream和PipedOutputStreamPipedReader和PipedWriter
public class ReadData {
public void readMethod(PipedInputStream input) {
try {
System.out.println(
"read :");
byte[] byteArray =
new byte[
20];
int readLength = input.read(byteArray);
while (readLength != -
1) {
String newData =
new String(byteArray,
0, readLength);
System.out.print(newData);
readLength = input.read(byteArray);
}
System.out.println();
input.close();
}
catch (IOException e) {
e.printStackTrace();
}
}
}
public class WriteData {
public void writeMethod(PipedOutputStream out) {
try {
System.out.println(
"write :");
for (
int i =
0; i <
30; i++) {
String outData =
" " + (i +
1);
out.write(outData.getBytes());
System.out.print(outData);
}
System.out.println();
out.close();
}
catch (IOException e) {
e.printStackTrace();
}
}
}
public class ThreadRead extends Thread {
private ReadData read;
private PipedInputStream input;
public ThreadRead(ReadData read, PipedInputStream input) {
super();
this.read = read;
this.input = input;
}
@Override
public void run() {
read.readMethod(input);
}
}
public class ThreadWrite extends Thread {
private WriteData write;
private PipedOutputStream out;
public ThreadWrite(WriteData write, PipedOutputStream out) {
super();
this.write = write;
this.out = out;
}
@Override
public void run() {
write.writeMethod(out);
}
}
public class Run {
public static void main(String[] args) {
try {
WriteData writeData =
new WriteData();
ReadData readData =
new ReadData();
PipedInputStream inputStream =
new PipedInputStream();
PipedOutputStream outputStream =
new PipedOutputStream();
outputStream.connect(inputStream);
ThreadRead threadRead =
new ThreadRead(readData, inputStream);
threadRead.start();
Thread.sleep(
2000);
ThreadWrite threadWrite =
new ThreadWrite(writeData, outputStream);
threadWrite.start();
}
catch (IOException e) {
e.printStackTrace();
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
read :
write :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
通过管道进行线程间通信:字符流
与字节流一样,只是把字节流管道类换成字符流管道类
public class ReadData {
public void readMethod(PipedReader input) {
try {
System.
out.println(
"read :");
char[] byteArray =
new char[
20];
int readLength = input.read(byteArray);
while (readLength != -
1) {
String newData =
new String(byteArray,
0, readLength);
System.
out.print(newData);
readLength = input.read(byteArray);
}
System.
out.println();
input.close();
}
catch (IOException e) {
e.printStackTrace();
}
}
}
public class WriteData {
public void writeMethod(PipedWriter
out) {
try {
System.
out.println(
"write :");
for (
int i =
0; i <
30; i++) {
String outData =
" " + (i +
1);
out.write(outData);
System.
out.print(outData);
}
System.
out.println();
out.close();
}
catch (IOException e) {
e.printStackTrace();
}
}
}
实战:等待/通知之交叉备份
本节的目的是要继续学习等待/通知相关知识点,创建20个线程,其中10个线程是将数据备份到A数据库中,另外10个线程将数据备份到B数据库中,并且备份A数据库和B数据库是交叉进行的
public class DBTools {
volatile private boolean prevIsA =
false;
synchronized
public void backupA() {
try {
while (prevIsA ==
true) {
wait();
}
for (
int i =
0; i <
5; i++) {
System.
out.println(
"AAAAA");
}
prevIsA =
true;
notifyAll();
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
synchronized
public void backupB() {
try {
while (prevIsA ==
false) {
wait();
}
for (
int i =
0; i <
5; i++) {
System.
out.println(
"BBBBB");
}
prevIsA =
false;
notifyAll();
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
DBTools dbtools =
new DBTools();
for (
int i =
0; i <
5; i++) {
BackupB output =
new BackupB(dbtools);
output.start();
BackupA input =
new BackupA(dbtools);
input.start();
}
}
方法join的使用
学习方法join前的铺垫
用join()方法类解决
方法join()的作用是使所属的线程对象x正常执行run()方法中的任务,而使当前线程z进行无限期的阻塞,等待线程x销毁后再继续执行线程z后面的代码方法join()具有使线程排队运行的作用,有些类似同步的运行效果。join与synchronized的区别是:join在内部使用wait()方法进行等待,而synchronized使用的是“对象监视器”原理作为同步
public class MyThread extends Thread {
@Override
public void run() {
try {
int secondValue = (
int) (Math.random() *
10000);
System.out.println(secondValue);
Thread.sleep(secondValue);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Test {
public static void main(String[] args) {
try {
MyThread threadTest =
new MyThread();
threadTest.start();
threadTest.join();
System.out.println(
"我想当threadTest对象执行完后再执行!");
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2524
我想当threadTest对象执行完后再执行!
方法join与异常
在join过程中,如果当前线程对象被中断,则当前线程出现异常,但子线程仍然正常运行
方法join(long)的使用
方法join(long)中的参数是设定等待的时间
方法join(long)和sleep(long)的区别
方法join(long)的功能在内部是使用wait(long)方法来实现的,所以join(long)方法具有释放锁的特点。当执行完wait(long)方法后,当前线程的锁被释放,那么其他线程就可以调用此线程中的同步方法了。Thread.sleep(long)方法却不释放锁。
方法join()后面的代码提前运行:解释意外
类ThreadLocal的使用
类ThreadLocal主要解决的就是每个线程绑定自己的值,可以将ThreadLocal类比喻呈全局存放数据的盒子,盒子中可以存储每个线程的私有数据
方法get()与null
验证线程变量的隔离性
public class Tools {
public static ThreadLocal tl =
new ThreadLocal();
}
public class ThreadA extends Thread {
@Override
public void run() {
try {
for (
int i =
0; i <
100; i++) {
if (Tools.tl.get() ==
null) {
Tools.tl.set(
"ThreadA" + (i +
1));
}
else {
System.out.println(
"ThreadA get Value=" + Tools.tl.get());
}
Thread.sleep(
200);
}
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ThreadB extends Thread {
@Override
public void run() {
try {
for (
int i =
0; i <
100; i++) {
if (Tools.tl.get() ==
null) {
Tools.tl.set(
"ThreadB" + (i +
1));
}
else {
System.out.println(
"ThreadB get Value=" + Tools.tl.get());
}
Thread.sleep(
200);
}
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Run {
public static void main(String[] args) {
try {
ThreadA a =
new ThreadA();
ThreadB b =
new ThreadB();
a.start();
b.start();
for (
int i =
0; i <
100; i++) {
if (Tools.tl.get() ==
null) {
Tools.tl.set(
"Main" + (i +
1));
}
else {
System.out.println(
"Main get Value=" + Tools.tl.get());
}
Thread.sleep(
200);
}
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
虽然3个线程都向tl对象中set()数据值,但每个线程还是能取出自己的数据
解决get()返回Null问题
默认值
public class ThreadLocalExt extends ThreadLocal {
@Override
protected Object
initialValue() {
return "我是默认是,第一次 get 不再为 null";
}
}
再次验证线程变量的隔离性
public class ThreadLocalExt extends ThreadLocal {
@Override
protected Object
initialValue() {
return new Date().getTime();
}
}
子线程和父线程各自有各自的值
类InhreitableThreadLocal的使用
使用类InhreitableThreadLocal可以子线程中取得父线程继承下来的值
值继承再修改
在使用InhreitableThreadLocal类需要注意一点的是,如果子线程在取得值的同时,主线程将InhreitableThreadLocal中的值进行修改,那么子线程取到的值还是旧值