目录
synchronized
Volatile
信号量Semaphore
实现所有线程在等待某个事件的发生才去执行
CAS缺陷以及如何解决
AQS
死锁、活锁
线程池
线程池的种类
配置线程池大小
工作机制及原理
ThreadPoolExecutor类
ThreadLoad原理
LockSupport工具
Condition原理
Fork/Join框架的理解
分段锁(JUC)的原理、锁力度的减小思考
八种阻塞队列以及各个阻塞队列的特性
class TestSynchronized{ Object lock = null; public void method1(){ lock = new Object(); synchronized (lock){ System.out.println("----method1----"); } } public void method2(){ synchronized (this){ System.out.println("----method2----"); } } public synchronized void method3(){ System.out.println("----method3----"); } public static synchronized void method4(){ System.out.println("----method4----"); } }
类别
Synchronized
Lock
存在形式
JAVA关键字,JVM
是一个类
锁的释放
获取锁的线程执行完成同步中的代码线程发生异常在finally中必须释放锁,不然会造成死锁
获取锁
如果A线程获取了锁,那么B线程等待。如果A线程阻塞,那么B线程会一直等待
Lock有多种锁的获取方式,都可以尝试获得锁,线程可以不用一直等待
锁状态
无法判断
可判断
锁类型
悲观锁(可重入 不可中断 非公平)
乐观锁(可重入 可判断 可公平(两者皆可))
性能
少量同步
大量同步
偏向锁-》轻量级锁-》重量级锁(1.6后)
互斥锁ReentrantLock
volatile:用来修饰被不同线程访问和修改的变量
volatile关键字与JVM内存模型相关
Java语言是支持多线程的,为了解决线程并发的问题,在语言内部引入了同步块和volatile关键字机制
volatile关键字机制:
synchronized修饰方法和代码块,以实现同步
用volatile修饰的变量,线程在每次使用变量的时候,都会读取变量修改后的值。volatile经常被误用进行原子性的操作。但是这些操作并不是真正的原子性。在读取加载之后,如果变量发生了变化,线程工作内存中的值由于已加载,不会产生对应的变法。对于volatile修改的变量,JVM只是保证从内存加载到线程工作内存的值是最新的。
交互图:
public class VolatileTest { //public static int count = 0; //实际运算每次结果都不一样 public static volatile int count = 0; // public static void inc(){ //这里延迟1毫秒,使得结果明显 try { Thread.sleep(1); } catch (Exception e) { // TODO: handle exception } count ++; } public static void main(String[] args) { //同时启动1000个线程,去进行i++运算,看看实际结果 for (int i = 0; i < 1000; i++) { new Thread(new Runnable() { @Override public void run() { VolatileTest.inc(); } }).start(); } System.out.println("运行结果:"+count); } }
在多线程中,线程间传递信号的一种方式。
与互斥量的区别
互斥量用于线程的互斥,信号量用于线程的同步互斥量值只能为0/1,信号量值可以为非负整数。信号量可以实现多个同类资源的多线程互斥和同步。互斥量的加锁和解锁必须由同一线程分别对应使用,信号量可以由一个线程释放,另一个线程得到。class TestSemaphore{ Semaphore semaphore = new Semaphore(10);//控制10个共享资源的使用 public void use(){ try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } } public void release(){ semaphore.release(); } }
1.闭锁CountDownLatch
闭锁是典型的等待事件发生的同步工具类,将闭锁的初始值设置1,所有线程调用await方法等待,当事件发生时调用countDown将闭锁值减为0,则所有await等待闭锁的线程得以继续执行。
class TestCountDownLatch{ public static void main(String[] args){ int num = 2; final CountDownLatch latch = new CountDownLatch(num); for(;num> 0;num--){ new Thread(){ public void run() { try { System.out.println("子线程"+Thread.currentThread().getName()+"正在执行"); Thread.sleep(3000); System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕"); latch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } }; }.start(); } try { System.out.println("等待2个子线程执行完毕..."); latch.await(); System.out.println("2个子线程已经执行完毕"); System.out.println("继续执行主线程"); } catch (InterruptedException e) { e.printStackTrace(); } } }
2.阻塞队列BlockingQueue
所有等待事件的线程尝试从空的阻塞队列获取元素,将阻塞,当事件发生时,向阻塞队列中同时放入N个元素(N的值与等待的线程数相同),则所有等待的线程从阻塞队列中取出元素后得以继续执行。
3.信号量Semaphore
设置信号量的初始值为等待的线程数N,一开始将信号量申请完,让剩余的信号量为0,待事件发生时,同时释放N个占用的信号量,则等待信号量的所有线程将获取信号量得以继续执行。
class TestSemaphore{ public static void main(String[] args){ // 8位工人 int n = 8; // 5台机器 Semaphore semaphore = new Semaphore(5); for(int i=0;i<n;i++){ new Worker(i,semaphore).start(); } } static class Worker extends Thread{ private int num; private Semaphore semaphore; public Worker(int num,Semaphore semaphore){ this.num = num; this.semaphore = semaphore; } @Override public void run() { try { semaphore.acquire(); System.out.println("工人"+this.num+"占用一个机器在生产..."); Thread.sleep(2000); System.out.println("工人"+this.num+"释放出机器"); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
4.栅栏CyclicBarrier
设置栅栏的初始值为1,当事件发生时,调用barrier.wait()冲破设置的栅栏,将调用指定的Runable线程执行,在该线程中启动N个新的子线程执行。这个方法并不是让执行中的线程全部等待在某个点,待某一事件发生后继续执行。
class TestCyclicBarrier{ public static void main(String[] args) { int N = 4; CyclicBarrier barrier = new CyclicBarrier(N); for(int i=0;i<N;i++){ new Writer(barrier).start(); } } static class Writer extends Thread{ private CyclicBarrier cyclicBarrier; public Writer(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据..."); try { //以睡眠来模拟写入数据操作 Thread.sleep(5000); System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕"); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); }catch(BrokenBarrierException e){ e.printStackTrace(); } System.out.println("所有线程写入完毕,继续处理其他任务..."); } } }
参考地址:https://blog.csdn.net/jeffleo/article/details/75269618
CAS:Compare And Swap,即比较并交换。(AtomicInteger)
实现了Java多线程的并发操作。整个AQS同步组件、Atomic原子类操作等都是以CAS实现的,甚至ConcurrentHashMap在1.8的版本中也调整为了CAS+Synchronized。可以说CAS是整个JUC的基石。
缺陷:
主要表现在三个方法:
循环时间太长
只能保证一个共享变量原子操作
ABA问题:解决方案则是版本号
参考资料:https://www.cnblogs.com/daydaynobug/p/6752837.html
AQS:AbstractQueuedSynchronized, 抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch
AQS里的state只有两种状态:0表示未锁定,1表示锁定
死锁:指两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象,若无外力作用,它们都将无法推进下去。4个产生条件:
互斥条件请求和保持条件不剥夺条件环路等待条件活锁:指的是任务或者执行者没有被阻塞,由于某些条件没有满足,导致一直重复尝试—失败—尝试—失败的过程。处于活锁的实体是在不断的改变状态,活锁有可能自行解开。
资源地址:https://blog.csdn.net/wxq544483342/article/details/53118674
1、创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE
ExecutorService pool = Executors.newCachedThreadPool();2、创建容量为1的缓冲池
ExecutorService pool = Executors.newSingleThreadExecutor();3、创建固定容量大小的缓冲池
ExecutorService pool = Executors.newFixedThreadPool(int);
4、创建一个定长线程池,支持定时及周期性任务执行。
ExecutorService pool = Executors.newScheduledThreadPool(1);5、创建一个拥有多个任务队列(以便减少连接数)的线程池。
ExecutorService pool = Executors.newWorkStealingPool(); ExecutorService类的方法: void shutdown(); 关闭线程池 不可以再 submit 新的 task,已经 submit 的将继续执行 List<Runnable> shutdownNow(); 关闭线程池 试图停止当前正在执行的 task,并返回尚未执行的 task 的 list boolean isShutdown(); 是否已经关闭了线程池,当调用shutdown()或shutdownNow()方法后返回为true。 boolean isTerminated(); 当调用shutdown()方法后,并且所有提交的任务完成后返回为true; 当调用shutdownNow()方法后,成功停止后返回为true; boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;线程池的两个核心队列:
线程等待池,即线程队列BlockingQueue任务处理池(PoolWorker),即正在工作的Thread列表(HashSet)线程池的核心参数:
核心池大小(corePoolSize),即固定大小,设定好之后,线程池的稳定峰值,达到这个值之后池的线程数大小不会释放。最大处理线程池数(maximumPoolSize),当线程池里面的线程数超过corePoolSize,小于maximumPoolSize时会动态创建与回收线池里面的线程池资源。构造方法:
/** * * @param corePoolSize 核心池大小 默认情况下,在创建好线程池之后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务, * 当线程池中的线程数量达到corePoolSize后,就会把这些任务放到缓存队列中 * @param maximumPoolSize 线程池最大线程数量,表示在线程池中最多能创建线程的数量;在corePoolSize和maximumPoolSize的线程数会被自动释放,而小于corePoolSize的则不会。 * @param keepAliveTime 表示线程没有执行任务时最多保持多久时间会终止。 * 默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会生效,直到线程池数量不大于corePoolSize, * 即只有当线程池数量大于corePoolSize数量,超出这个数量的线程一旦到达keepAliveTime就会终止。 * 但是如果调用了allowCoreThreadTimeout(boolean)方法,即使线程池的线程数量不大于corePoolSize,线程也会在keepAliveTime之后就终止,知道线程池的数量为0为止。 * @param unit 参数keepAliveTime的时间单位,一个时间单位枚举类。 Nanos/Micros/Millis/Seconds/Minutes/Hours/Days * @param workQueue 一个阻塞队列,用来存储等待执行任务的队列,这个参数选择也很重要,会对线程池的运行过程产生重大影响。 * 一般来说,这里的阻塞队列就是(ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue)。 * @param threadFactory 线程工厂,主要用来创建线程;可以是一个自定义的线程工厂,默认就是Executors.defaultThreadFactory()。用来在线程池中创建线程。 * @param handler 表示当拒绝处理任务时的策略,也是可以自定义的,默认是我们前面的4种取值: * ThreadPoolExecutor.AbortPolicy(默认的,一言不合即抛异常的) * ThreadPoolExecutor.DiscardPolicy(一言不合就丢弃任务) * ThreadPoolExecutor.DiscardOldestPolicy(一言不合就把最近的任务给抛弃,然后执行当前任务) * ThreadPoolExecutor.CallerRunsPolicy(由调用者所在线程来执行任务) */public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue, public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,RejectedExecutionHandler handler) {
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
class TestThreadLocal{ private static TestThreadLocal testThreadLocal = null; private static ThreadLocal<TestThreadLocal> map = new ThreadLocal<TestThreadLocal>(); private TestThreadLocal(){} public static synchronized TestThreadLocal getInstance1(){ if(testThreadLocal==null){ testThreadLocal = new TestThreadLocal(); } return testThreadLocal; } public static TestThreadLocal getInstance2(){ //效率大于getInstance1() testThreadLocal = map.get(); if(testThreadLocal==null){ testThreadLocal = new TestThreadLocal(); map.set(testThreadLocal); } return testThreadLocal; } }
LockSupport提供了park和unpark方法实现阻塞和解除阻塞,都是基于“许可(permit)”作为关联。permit相当于一个信号量(0,1),默认是0。
使用偏移量来获取对象。是因为线程已经被阻塞了,如果不通过内存的方式,直接调用线程内的方法,线程是不会响应的。
park()、unpark()和wait()、notify()的区别
面向的主体不同。LockSupport是针对Thread进行阻塞,可以指定阻塞的对象,每次可以唤醒指定具体的线程。Wait()是以对象,阻塞当前的线程和唤醒单个线程或者所有线程。实现的机制不同。LockSupport可以指定monitor的object对象,但和object.wait()两者的阻塞队列并不交叉。static void testLockSupport(){ // 调用native方法阻塞当前线程 LockSupport.park(); // 阻塞当前线程,最长不超过nanos纳秒,返回条件在park()的基础上增加了超时返回 LockSupport.parkNanos(1); // 阻塞当前线程,知道deadline时间(deadline - 毫秒数) LockSupport.parkUntil(1); Object blocker = new Object(); //1) 记录当前线程等待的对象(阻塞对象); //2) 阻塞当前线程; //3) 当前线程等待对象置为null。 LockSupport.park(blocker); // 阻塞当前线程,最长等待时间不超过nanos毫秒,同样,在阻塞当前线程的时候做了记录当前线程等待的对象操作 LockSupport.parkNanos(blocker,1); // 阻塞当前线程直到deadline时间,相同的,也做了阻塞前记录当前线程等待对象的操作 LockSupport.parkUntil(blocker,1); // 唤醒处于阻塞状态的线程Thread LockSupport.unpark(new Thread());}
在经典的生产者和消费者模式中,可以使用Object.wait()和Object. Notify()阻塞和唤醒线程,但这样处理只有一个等待队列。在可重入锁ReentrantLock中,使用AQS的condition可以实现设置多个等待队列,可以使用lock.newCondition生成一个等待队列。
static void testCondition() throws InterruptedException { // 需要结合lock使用 Lock lock = new ReentrantLock(); final Condition notEmpty = lock.newCondition(); // 将当前线程阻塞,调用await()之前必须先获取锁, // 调用await()时,将线程构造成节点加入等待队列,同时释放锁,并挂起当前线程 notEmpty.await(); notEmpty.await(1, TimeUnit.MINUTES); notEmpty.awaitNanos(1); notEmpty.awaitUninterruptibly(); notEmpty.awaitUntil(new Date()); // 另一个线程将已经阻塞的线程唤醒 // 其他线程调用signal()时也必须获取锁, // 当执行signal()方法时将等待队列的节点移入到同步队列, // 当线程退出临界区释放锁时,唤醒同步队列的首个节点 notEmpty.signal(); notEmpty.signalAll();}
并行流:把一个内容分成多个数据块,并用不同的线程分别处理每个数据块的流。(fork/join框架)
串行流:则相反
Fork/Join框架:
在必要的情况下,将一个大的任务进行拆分(fork)成若干个子任务(拆到临界值),再将一个个任务的结果进行join汇总。
Fork/Join与线程池的区别:
Fork/Join采用“工作窃取模式”,当执行新的任务时可以将其拆分成更小的任务执行,并将小任务加到线程队列中。然后再随机从一个线程中偷一个任务并把他加入到自己的队列中。
例如:CPU上有两个线程A/B,A已经执行完了,B还有任务未执行。这时A将B队尾的任务偷过来,加入到自己的队列中。相对于传统的线程池,Fork/Join更有效的使用了CPU资源。
// ForkJoin这个框架的实现需要继承RecursiveTask 或者 RecursiveAction, // RecursiveTask是有返回值的,相反RecursiveAction则没有@Datapublic class TestForkJoinWork extends RecursiveTask<Long> { private Long start; private Long end; public static final Long CRITICAL = 10000L; public TestForkJoinWork(Long start , Long end){ this.start = start; this.end = end; } @Override protected Long compute() { Long length = end - start; if (length <= CRITICAL){ Long sum = 0L; for (Long i = start;i<=end;i++){ sum += i; } return sum; }else{ Long middle = (start+end) / 2; TestForkJoinWork right = new TestForkJoinWork(start,middle); right.fork(); // 拆分 压入线程队列 TestForkJoinWork left = new TestForkJoinWork(middle+1,end); left.fork(); // 拆分 压入线程队列 return right.join() + left.join(); } } }class TestForkJoinWorkDemo{ void test1(){ //Fork/Join实现 long l = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool();//实现ForkJoin 就必须有ForkJoinPool的支持 TestForkJoinWork task = new TestForkJoinWork(0L,10000000000L);//参数为起始值与结束值 Long invoke = forkJoinPool.invoke(task); long l1 = System.currentTimeMillis(); System.out.println("invoke = " + invoke+" time: " + (l1-l)); } void test2(){ // 普通线程完成 Long x = 0L; Long y = 10000000000L; long l = System.currentTimeMillis(); for (Long i = 0L; i <= y; i++) { x += i; } long l1 = System.currentTimeMillis(); System.out.println("invoke = " + x+" time: " + (l1-l)); } void test3(){ // 并行流 long l = System.currentTimeMillis(); //parallel()与sequential()在并行流与串行流中随意切换 long reduce = LongStream.rangeClosed(0, 10000000000L).parallel().reduce(0, Long::sum); long l1 = System.currentTimeMillis(); System.out.println("invoke = " + reduce+" time: " + (l1-l)); } public static void main(String[] args){ TestForkJoinWorkDemo demo = new TestForkJoinWorkDemo(); demo.test1(); demo.test2(); demo.test3(); /** * invoke = -5340232216128654848 time: 40213 * invoke = -5340232216128654848 time: 52510 * invoke = -5340232216128654848 time: 963 */ } }
容器中有多把锁,每一把锁用于锁容器中的一部分数据。当多线程时访问容器中的不同数据段的数据时,线程间就不会发生锁竞争了。因而有效的提高了高并发的访问效率。例如ConcurrentHashMap使用的锁分段技术,首先将数据分成一段一段的存储,然后给每一段数据一把锁,当一个线程占用锁访问其中一个段数据时,其他段的数据也可被其他线程访问。ConcurrentHashMap中使用了一个包含16个锁的数组,每个锁保护所在的散列桶的1/16,其中第N个散列桶由第(N mod 16)个锁来保护。合理的使用散列算法来使关键字均匀分布,那么可使对锁的请求减少到原来的1/16。这样可使ConcurrentHashMap的并发达到16个线程。
独占锁:只有一把锁,每次只能一个线程能访问。例如HashTable。采用串行方式,会降低其伸缩性。一般有三种方式降低锁的竞争:
减少锁的持有时间降低锁的请求频率使用带有协调机制的独占锁,这些机制允许更高的并发性。public class TestQueue { public static void main(String[] args) throws InterruptedException { Object obj = new Object(); boolean flag; Object val; // 由数组结构组成的有界阻塞队列 BlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(10); // 队未满时,返回true,队满时则抛出异常IllegalStateException flag = arrayBlockingQueue.add(obj); // 队未满时,返回true,队满时返回false。非阻塞立即返回 arrayBlockingQueue.offer(obj); // 队未满时直接插入无返回值,队满时阻塞等待,一直等到队列未满时再插入 arrayBlockingQueue.put(obj); // 队列不为空时,返回队首值并移除。队为空时抛出异常NoSuchElementException val = arrayBlockingQueue.remove(); // 队列不为空时,返回队首值并移除。队为空时返回null。非阻塞立即返回 val = arrayBlockingQueue.poll(); // 设定等待的时间,如果在指定时间内队列为空则返回null,不为空则返回队首值 val = arrayBlockingQueue.poll(1,TimeUnit.MINUTES); // 队列不为空返回队首值并移除;当队列为空时会阻塞等待,一直等到队列不为空时再返回队首值。 val = arrayBlockingQueue.take(); flag = arrayBlockingQueue.contains(obj); // 由链表结构组成的有界阻塞队列 BlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(10); // 支持优先级别排序的无界阻塞队列 默认大小为11 BlockingQueue priorityBlockingQueue = new PriorityBlockingQueue(); // 使用有限级队列实现的无界阻塞队列 PriorityQueue BlockingQueue delayQueue = new DelayQueue(); // 不存储元素的阻塞队列 BlockingQueue synchronousQueue = new SynchronousQueue(); // 由链表结构组成的无界阻塞队列 BlockingQueue linkedTransferQueue = new LinkedTransferQueue(); // 由链表结构组成的双阻塞队列 BlockingQueue linkedBlockingDeque = new LinkedBlockingDeque(); }