1. ConcurrentHashMap
ConcurrentHashMap是线程安全并且高效的HashMap。
1.1 为什么使用ConcurrentHashMap?
(1) HashMap 是线程不安全的,在多线程环境下使用HashMap的put操作会使Entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next指针将永远不为null,就会产生死循环获取Entry。
(2) Hashtable虽然是线程安全的,但是效率低下。Hashtable使用synchronized关键字保证线程安全,因此当一个线程访问Hashtable的同步方法时,娶她线程若也要访问Hashtable的同步方法,就会进入阻塞状态。(所有的线程竞争的是同一把锁)
(3) ConcurrentHashMap使用锁分段技术可以保证线程安全并提高并发访问效率。
锁分段技术:Hashtable在线程竞争激烈的并发环境下效率低下的原因是所有访问Hashtable 的线程都竞争同一把锁,假如容器有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程并发访问容器里的不同数据段的数据时,线程之间就不会存在竞争,从而可以有效提高并发访问效率。ConcurrentHashMap 就是使用的锁分段技术,首先将数据分成一段一段地存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一段数据时,其他数据段地数据也可以被其他线程访问。
1.2 ConcurrentHashMap 的结构与操作
ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种重入锁(ReentrantLock),HashEntry用来存储键值对数据。一个Segment里面包含一个HashEntry数组,每个Segment守护着一个HashEntry里的元素,当对HashEntry里的数据元素进行修改时,必修先获取对应的Segment锁。
1.2.1 get操作
先通过一次再散列,然后使用这个散列值通过散列运算定位到Segment,再通过散列算法定位到元素。get操作简单且高效,器高效之处在于整个get过程是不需要加锁的,除非读到的值为空时才加锁重读。不加锁是因为它的get方法里将要使用的共享变量都被volatile修饰了,能够保证线程间的可见性,能够被多线程读,并不会读到过期数据。之所以不会读到过期数据是因为根据JAVA内存模型的happen before原则,对于volatile字段的写入操作先于读取操作,即使两个线程同时修改和获取volatile变量,get操作也能拿到最新的数据,这是用volatile替代锁的经典应用场景。
1.2.2 put 操作
put操作需要对共享变量进行写入操作,为了线程安全,必须加锁。Put方法首先定位到Segment,然后在Segment里进行插入操作。
1.2.3 size操作
要统计整个ConcurrentHashMap就要对所有的Segment 的count进行相加,虽然volatile保证相加获取的都是最新的count,但是在相加过程中,每个count都可能变化,那样统计结果就不准确了。最安全的统计方法就是在计算size时,将所有的Segment锁住,但是这样的做法是很低效的。
因为在累加count过程中,之前累加过的count发生变化的几率很小,所以ConcurrentHashMap的做法是先尝试2次通过锁住不同Segment的方式来统计各个Segment
的大小,如果统计过程中,count发生了变化,则再采取加锁的放肆统计所有的Segment大小。
2. ConcurrentLinkedQueue
2.1 线程安全的队列
线程安全的队列实现方式有两种:一种是使用阻塞算法,一种是使用非阻塞算法。使用阻塞算法的队列可以使用一把锁或两把锁实现,使用非阻塞算法的队列可以通过循环CAS的方式实现。ConcurrentLinkedQueue是使用非阻塞算法实现的。
ConcurrentLinkedQueue是一个基于链接节点的无界线程安全的队列 ,它采用CAS算法实现。
2.1.1 入队列
入队列就是将入队节点添加到队列的尾部。入队主要做两件事情:一是将入队节点设置成当前尾节点的下一个节点,二是更新tail节点,如果tail节点的next节点不为空,则将入队节点设置称tail节点,如果tail节点的next为空,则将入队节点设置成tail节点的next节点,所以tail节点并不总是尾节点。
为什么tail节点不总是尾节点?原因:让tail永远作为队列的尾节点,每次入队都需要使用循环CAS更新tail节点。减少tail节点的更新次数,就可以提高入队效率。Doug lea 使用hops 变量控制并减少tail节点的更新频率,并不是每次节点入队后都需要更新tail节点,而是当tail节点和尾节点的距离大于等于常量hops时,才更新tail节点,tail节点和尾节点的距离越长,使用CAS更新tail的次数就越少,但是器负面效果就是每次入队时定位尾节点的时间就越长,因为循环体需要多循环一次来定位尾节点,但是这样仍可以提高入队效率,因为本质上来看它通过增加对volatile变量的读操作来减少对其的写操作,而volatile变量的写操作的开销要远大于读操作。
2.1.2 出队列
出队列就是从队列中返回一个节点元素,并清空该节点对元素的引用。和tail的跟新一样,head节点也并不是每次出队都更新。也是通过hops变量来控制并减少CAS更新head节点的操作。
3. 阻塞队列介绍
阻塞队列(BlockingQueue):是一个支持两个附加操作的队列。这两个附加操作支持阻塞插入和移除方法。
(1) 支持阻塞的插入方法:当队列满时,会阻塞插入元素的线程,直到队列不满。
(2) 支持阻塞的移除方法:当队列为空时,会阻塞移除元素的线程,直到队列不为空。
阻塞队列常用于生产者和消费者模式,生产者就是向队列添加元素的线程,消费者就是从队列移除元素的线程。阻塞队列就是生产者用来存放元素,消费者用来获取元素的容器。
当队列不可用时,会有4种处理方式:
抛出异常
返回特殊值
一直阻塞
超时退出
插入方法
add(e)
offer(e)
put(e)
offer(e,time,unit)
移除方法
Remove()
Poll()
Take()
Poll(time,unit)
检查方法
Element()
Peek()
(3) java里的7个阻塞队列
ArrayBlockingueue:一个由数组组成的有界阻塞队列
LinkedBlockingQueue: 一个有链表组成的有界阻塞队列
PriorityBolckingQueue: 一个支持优先级排序的无界阻塞队列
DelayQueue: 一个使用优先级队列实现的无界阻塞队列
SynchronousQueue: 一个不存储元素的阻塞队列
LinkedTransferQueue: 一个由链表组成的无界阻塞队列
LinkedBlockingDeque:一个由链表组成的双向阻塞队列
4. Fork/Join框架
4.1 Fork/Join框架
Fork/Join框架是Java7提供的一个用于并行执行任务的框架,是一个把大任务分解成若干个小任务,最终汇总每个小任务的结果后得到大任务结果的框架。(使用的分治法思想)
工作窃取算法:是指某个线程从其他队列里窃取任务来执行。为什么要使用工作窃取算法?原因:假如我们需要做一个比较大的任务,可以把这个任务分隔成若干个互不依赖的子任务,为了减少线程间的竞争,就把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。但是,有的线程会先把自己的任务做完,而其他线程对应的队列里还有任务等待处理,于是他就去其他线程里窃取一个任务执行。而此时它们访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常使用双端队列,被窃取任务的线程永远从队列头部拿任务执行,而窃取任务的线程永远从队列的尾部取任务。
4.2 Fork/Join 设计
步骤1:分割任务 首先我们需要一个fork类来把大任务分割成子任务,直到子任务足够小。
步骤2:执行任务并合并结果 分割的子任务分别放在双端队列里,然后启动结果线程分别从双端队列里拿任务。子任务执行完的结果都放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。
Fork/Join 使用两个类来实现上述步骤:
① ForkJoinTask: 要使用Fork/Join框架,首先要创建一个ForkJoin任务。它提供在任务种执行fork()和join()的机制。通常不是直接继承ForkJoinTask类,而是继承其子类。
RecursiveAction: 用于没有返回结果的任务
RecursiveTask : 用于有返回结果的任务
② ForkJoinPool :ForkJoinTask需要通过ForkJoinPool来执行。