2017年8月6号:java并发包
可重入、可中断、可限时、公平锁 死锁会产生无限时等待。
基本写法:
package testReentrantLock; import java.util.concurrent.locks.ReentrantLock; public class ReenTerLock implements Runnable {
public static ReentrantLock lock = new ReentrantLock(); public static int i = 0; @Override public void run() { for(int j=0;j<10000;j++){ lock.lock(); try { i++; } finally{ lock.unlock();//释放锁,要 放到finally里执行 } } } public static void main(String[] args) throws InterruptedException { ReenTerLock t1 = new ReenTerLock(); Thread thread1 = new Thread(t1); Thread thread2 = new Thread(t1); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(i); }}
单线程可以重复进入,但要重复退出。 重入,多加了一个锁,但没有多加一个释放锁, package testReentrantLock;
import java.util.concurrent.locks.ReentrantLock;
public class ReenTerLock implements Runnable {
public static ReentrantLock lock = new ReentrantLock(); public static int i = 0; @Override public void run() { for(int j=0;j<10000;j++){ lock.lock(); lock.lock(); try { i++; } finally{ lock.unlock();//释放锁,要 放到finally里执行 } } } public static void main(String[] args) throws InterruptedException { ReenTerLock t1 = new ReenTerLock(); Thread thread1 = new Thread(t1); Thread thread2 = new Thread(t1); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(i); }} 运行,控制台一直没有输出,检查线程
可以看出线程Thread-0在等待waiting 线程停在了ReenTerLock的11行处
没有释放,其它线程不能进入,线程在等待释放,所以重入要重复退出,加一个释放锁。 package testReentrantLock;
import java.util.concurrent.locks.ReentrantLock;
public class ReenTerLock implements Runnable {
public static ReentrantLock lock = new ReentrantLock(); public static int i = 0; @Override public void run() { for(int j=0;j<10000;j++){ lock.lock(); lock.lock(); try { i++; } finally{ lock.unlock();//释放锁,要 放到finally里执行 lock.unlock(); } } } public static void main(String[] args) throws InterruptedException { ReenTerLock t1 = new ReenTerLock(); Thread thread1 = new Thread(t1); Thread thread2 = new Thread(t1); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(i); }} 即运行后控制台可以输出。
package testReentrantLock;
import java.util.concurrent.locks.ReentrantLock; import deadlock.DeadlockChecker;
public class ReenterLockInt implements Runnable {
public static ReentrantLock lock1 = new ReentrantLock(); public static ReentrantLock lock2 = new ReentrantLock(); int lock;//实例变量 /** * 控制加锁顺序,方便构造死锁 * @param lock */ public ReenterLockInt(int lock) { this.lock = lock; } @Override public void run() { try { if(lock==1){ lock1.lockInterruptibly();//加锁(是可中断 的锁) try { Thread.sleep(500); } catch (InterruptedException e) {} lock2.lockInterruptibly(); }else{ lock2.lockInterruptibly();//加锁(是可中断 的锁) try { Thread.sleep(500); } catch (InterruptedException e) {} lock1.lockInterruptibly(); } } catch (InterruptedException e) { e.printStackTrace(); }finally{ if(lock1.isHeldByCurrentThread()){ lock1.unlock(); } if(lock2.isHeldByCurrentThread()){ lock2.unlock(); } System.out.println(Thread.currentThread().getId()+":线程退出"); } } 运行出现死锁现象,中断线程: public static void main(String[] args) throws InterruptedException { ReenterLockInt r1 = new ReenterLockInt(1); ReenterLockInt r2 = new ReenterLockInt(2); Thread t1 = new Thread(r1); Thread t2 = new Thread(r2); t1.start(); t2.start(); Thread.sleep(1000); //死锁检查,中断其中一个线程 DeadlockChecker.check(); }} 死锁检查,DeadlockChecker类: import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean;
public class DeadlockChecker {
private final static ThreadMXBean mbean = ManagementFactory.getThreadMXBean(); final static Runnable deadlockCheck = new Runnable(){ @Override public void run() { while(true){ long[] deadlockedThreadIds = mbean.findDeadlockedThreads(); if(deadlockedThreadIds!= null){ ThreadInfo[] threadInfos = mbean.getThreadInfo(deadlockedThreadIds); for(Thread t :Thread.getAllStackTraces().keySet()){ for(int i=0;i<threadInfos.length;i++){ //如果检查到了死锁 if(t.getId() == threadInfos[i].getThreadId()){ t.interrupt();//中断线程 } } } } try { Thread.sleep(5000); } catch (InterruptedException e) { } } } }; //死锁检查 public static void check(){ Thread t = new Thread(deadlockCheck); t.setDaemon(true);//设置为守护线程 t.start(); }} 1.1.3可限时tryLock()
package testReentrantLock;
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock;
public class TimeLock implements Runnable {
public static ReentrantLock lock = new ReentrantLock(); @Override public void run() { try { if(lock.tryLock(5,TimeUnit.SECONDS)){//5秒拿不到锁,就会返回释放锁,SECONDS是单位秒,可以改成其它的毫秒分钟等 Thread.sleep(6000);//睡了6秒 }else{//5秒到了没拿到锁,输出提醒 System.out.println("get lock failed"); } } catch (InterruptedException e) { e.printStackTrace(); }finally{//拿到锁后释放锁 if(lock.isHeldByCurrentThread()){ lock.unlock(); } } } public static void main(String[] args) { TimeLock t = new TimeLock(); Thread t1 = new Thread(t); Thread t2 = new Thread(t); t1.start(); t2.start(); }} 运行,现象:5秒后输出:get lock failed
类似于Object类的wait()和notify()方法与ReentrantLock()结合使用
Object.wait():导致当前线程等待另一个线程调用 notify()方法或 notifyAll()这个对象的方法。 Object.notify():唤醒一个线程正在等待这个对象的监视器。 Condition的api: await() :导致当前线程等待,直到信号或 interrupted await(long time, TimeUnit unit) :导致当前线程等待信号或中断,或指定的等待时间流逝。 awaitNanos(long nanosTimeout) :导致当前线程等待信号或中断,或指定的等待时间流逝。 awaitUninterruptibly() :导致当前线程等待信号。 awaitUntil(Date deadline):导致当前线程等待信号或中断,或指定的期限过后。 signal() :一个等待线程醒来。 signalAll() :所有等待线程醒来。 常用方法: await() :会使当前线程等待,同时释放当前锁,当其它线程使用signal()或signalAll()时,线程会重新获得锁或继续执行,或者当线程中断时,也能跳出等待,这和Object.wait方法很类似。 awaitUninterruptibly() 和 await() 方法基本相同,但是它并不会在等待过程中中断。 signal() 方法用于唤醒一个在等待中的线程,相对的signalAll() 会唤醒所有在等待中的线程。这和Object的notify()方法很类似 例: package testReentrantLock;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock;
public class ReenterLockCondition implements Runnable {
public static ReentrantLock lock = new ReentrantLock(); public static Condition con = lock.newCondition(); @Override public void run() { try { lock.lock(); con.await(); System.out.println("Thread is going on"); } catch (InterruptedException e) { e.printStackTrace(); }finally{ lock.unlock(); } } public static void main(String[] args) throws InterruptedException { ReenterLockCondition t = new ReenterLockCondition(); Thread t1 = new Thread(t); t1.start(); Thread.sleep(2000); //通知线程t1继续执行 lock.lock(); con.signal(); lock.unlock(); }} 运行,现象:两秒后输出Thread is going on
共享锁,运行多个线程同时进入临界区 常用方法: acquire():获得许可从这个信号量,阻塞,直到一个可用,或 interrupted线程。 acquireUninterruptibly() :获得许可从这个信号量,阻塞,直到一个是可用的。 tryAcquire() :获得许可从这个信号量,只有一个可用的时候调用。 tryAcquire(long timeout, TimeUnit unit) :获得许可从这个信号量,如果一个可用在给定的等待时间和当前线程没有 interrupted. release() :发布许可证,返回信号量。
例:package testReentrantLock;
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore;
public class SemapDemo implements Runnable {
final Semaphore semap = new Semaphore(5);//允许5个线程 @Override public void run() { try { semap.acquire(); //模拟耗时操作 Thread.sleep(2000); System.out.println(Thread.currentThread().getId()+"done!"); } catch (InterruptedException e) { e.printStackTrace(); }finally{ semap.release(5);//释放5个线程,现象输出5个等两秒又输出5个 } } public static void main(String[] args) { ExecutorService execu = Executors.newFixedThreadPool(20); final SemapDemo demo = new SemapDemo(); for(int i=0;i<20;i++){//20个线程 execu.submit(demo);//线程提交 } }}
读、读不互斥,读、读之间不阻塞 读、写互斥,读阻塞写,写也会阻塞读 写、写互斥,写、写之间阻塞
主要接口 private static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); private static Lock readLock = rwl .readLock(); private static Lock writeLock = rwl .writeLocke();
方法: readLock() :返回锁用于阅读。 writeLock() :返回锁用于写作。
等待所有检查线程完工后,再执行。 final static CountDownLatch end = new CountDownLatch(20); end.cuntDown(); end.write(); 方法: await() :导致当前线程等待锁数降至零,除非 interrupted线程。 await(long timeout, TimeUnit unit) :导致当前线程等待锁数降至零,除非 interrupted线程,或指定的等待时间流逝。 countDown() :计数的精神性的门闩,释放所有等待线程计数为0。 getCount() :返回当前计数。 toString() :返回一个字符串识别这个门闩,以及它的状态。 例: package testReentrantLock;
import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
public class CountDownLockDemo implements Runnable {
static final CountDownLatch end = new CountDownLatch(10); static final CountDownLockDemo demo = new CountDownLockDemo(); @Override public void run() { try { //模拟检查任务 Thread.sleep(new Random().nextInt(10)*1000); System.out.println("check complete"); end.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException { ExecutorService exec = Executors.newFixedThreadPool(10); for(int i=0;i<10;i++){ exec.submit(demo); } //等待检查 end.await(); //发射火箭 System.out.println("fire"); exec.shutdown(); }}
构造函数: CyclicBarrier(int parties) :创建一个新 CyclicBarrier旅行当给定数量的政党(线程)等,并且不执行一个预定义的动作障碍时绊倒。 CyclicBarrier(int parties, Runnable barrierAction) 创建一个新 CyclicBarrier旅行当给定数量的政党(线程)等,并将执行给定的屏障作用的障碍绊倒时,由最后一个线程进入障碍。 barrierAction是一次计数完成后,要执行的动作
方法: await() :等待,直到所有 parties await调用这一障碍。 await(long timeout, TimeUnit unit) :等待,直到所有 parties await调用这一障碍,或指定的等待时间流逝。 getNumberWaiting() :返回的数量目前等候的屏障。 getParties() :返回所需要的方数旅行这一障碍。 isBroken() :如果这个屏障破裂状态查询。 reset() :重置初始状态的障碍。 例: package testReentrantLock;
import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static class Soldier implements Runnable{ private String soldier; private final CyclicBarrier cyclic; Soldier(CyclicBarrier cyclic,String soldier) { this.soldier = soldier; this.cyclic = cyclic; } @Override public void run() { try { //等待所有士兵到齐 cyclic.await(); doWork(); //等待所有士兵完成任务 cyclic.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e){ e.printStackTrace(); } } //模拟完成任务 void doWork(){ try { Thread.sleep(new Random().nextInt()00); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(soldier+"任务完成"); } } public static class BarrierRun implements Runnable{ boolean flag; int N; public BarrierRun(boolean flag, int n) { this.flag = flag; N = n; } @Override public void run() { if(flag){ System.out.println("司令:[士兵+"+N+"个,完成任务!]"); }else{ System.out.println("司令:[士兵+"+N+"个,集合完毕!]"); flag = true; } } } public static void main(String[] args) { final int N = 10; Thread[] allSoldier = new Thread[N]; boolean flag = false; CyclicBarrier cyclic = new CyclicBarrier(N,new BarrierRun(flag,N)); //设置屏障点,主要是为了执行这个方法 for(int i=0;i<N;++i){ System.out.println("士兵"+i+"报道"); allSoldier[i] = new Thread(new Soldier(cyclic,"士兵" + i)); allSoldier[i].start(); //中断第五个,输出现象报异常,第5个报InterruptedException,其它的报BrokenBarrierException if(i==5){ allSoldier[0].interrupt(); } } }}
基本线程阻塞同步原语创建锁和其他类。 能响应中断,但不抛出异常。中断响应结果是,park函数的返回,可以从Thread.interrupted()得到中断标志。
方法: getBlocker(Thread t):返回拦截器对象提供给最近的一个公园方法的调用,尚未畅通,或null如果没有屏蔽。 park() :禁用当前线程的线程调度的目的,除非允许是可用的。 park(Object blocker) :禁用当前线程的线程调度的目的,除非允许是可用的。 parkNanos(long nanos) :禁用当前线程的线程调度的目的,指定的等待时间,除非许可证是可用的。 parkNanos(Object blocker, long nanos) :禁用当前线程的线程调度的目的,指定的等待时间,除非许可证是可用的。 parkUntil(long deadline) :禁用当前线程的线程调度的目的,直到指定的最后期限,除非许可证是可用的。 parkUntil(Object blocker, long deadline) :禁用当前线程的线程调度的目的,直到指定的最后期限,除非许可证是可用的。 unpark(Thread thread) :使给定线程可用的许可证,如果不是已经可用。 例: package testReentrantLock;
import java.util.concurrent.locks.LockSupport;
public class LockSupportDemo {
public static Object o = new Object(); static ChangeObjectThread t1 = new ChangeObjectThread("t1"); static ChangeObjectThread t2 = new ChangeObjectThread("t2"); public static class ChangeObjectThread extends Thread{ public ChangeObjectThread(String name){ this.setName(name); } @Override public void run() { synchronized (o) { System.out.println("in "+getName()); LockSupport.park(); } } } public static void main(String[] args) throws InterruptedException { t1.start(); Thread.sleep(100); t2.start(); LockSupport.unpark(t1); LockSupport.unpark(t2); t1.join(); t2.join(); }} 输出: