Java并发容器和框架

xiaoxiao2021-02-27  298

1.       ConcurrentHashMap

 

ConcurrentHashMap是线程安全并且高效的HashMap。

 

1.1   为什么使用ConcurrentHashMap?

(1)      HashMap 是线程不安全的,在多线程环境下使用HashMap的put操作会使Entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next指针将永远不为null,就会产生死循环获取Entry。

(2)      Hashtable虽然是线程安全的,但是效率低下。Hashtable使用synchronized关键字保证线程安全,因此当一个线程访问Hashtable的同步方法时,娶她线程若也要访问Hashtable的同步方法,就会进入阻塞状态。(所有的线程竞争的是同一把锁)

(3)      ConcurrentHashMap使用锁分段技术可以保证线程安全并提高并发访问效率。

锁分段技术:Hashtable在线程竞争激烈的并发环境下效率低下的原因是所有访问Hashtable 的线程都竞争同一把锁,假如容器有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程并发访问容器里的不同数据段的数据时,线程之间就不会存在竞争,从而可以有效提高并发访问效率。ConcurrentHashMap 就是使用的锁分段技术,首先将数据分成一段一段地存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一段数据时,其他数据段地数据也可以被其他线程访问。

 

1.2   ConcurrentHashMap 的结构与操作

 

ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种重入锁(ReentrantLock),HashEntry用来存储键值对数据。一个Segment里面包含一个HashEntry数组,每个Segment守护着一个HashEntry里的元素,当对HashEntry里的数据元素进行修改时,必修先获取对应的Segment锁。

 

1.2.1        get操作

先通过一次再散列,然后使用这个散列值通过散列运算定位到Segment,再通过散列算法定位到元素。get操作简单且高效,器高效之处在于整个get过程是不需要加锁的,除非读到的值为空时才加锁重读。不加锁是因为它的get方法里将要使用的共享变量都被volatile修饰了,能够保证线程间的可见性,能够被多线程读,并不会读到过期数据。之所以不会读到过期数据是因为根据JAVA内存模型的happen before原则,对于volatile字段的写入操作先于读取操作,即使两个线程同时修改和获取volatile变量,get操作也能拿到最新的数据,这是用volatile替代锁的经典应用场景。

1.2.2        put 操作

put操作需要对共享变量进行写入操作,为了线程安全,必须加锁。Put方法首先定位到Segment,然后在Segment里进行插入操作。

1.2.3        size操作

要统计整个ConcurrentHashMap就要对所有的Segment 的count进行相加,虽然volatile保证相加获取的都是最新的count,但是在相加过程中,每个count都可能变化,那样统计结果就不准确了。最安全的统计方法就是在计算size时,将所有的Segment锁住,但是这样的做法是很低效的。

因为在累加count过程中,之前累加过的count发生变化的几率很小,所以ConcurrentHashMap的做法是先尝试2次通过锁住不同Segment的方式来统计各个Segment

的大小,如果统计过程中,count发生了变化,则再采取加锁的放肆统计所有的Segment大小。

 

2.       ConcurrentLinkedQueue

2.1   线程安全的队列

线程安全的队列实现方式有两种:一种是使用阻塞算法,一种是使用非阻塞算法。使用阻塞算法的队列可以使用一把锁或两把锁实现,使用非阻塞算法的队列可以通过循环CAS的方式实现。ConcurrentLinkedQueue是使用非阻塞算法实现的。

 

ConcurrentLinkedQueue是一个基于链接节点的无界线程安全的队列 ,它采用CAS算法实现。

 

2.1.1        入队列

入队列就是将入队节点添加到队列的尾部。入队主要做两件事情:一是将入队节点设置成当前尾节点的下一个节点,二是更新tail节点,如果tail节点的next节点不为空,则将入队节点设置称tail节点,如果tail节点的next为空,则将入队节点设置成tail节点的next节点,所以tail节点并不总是尾节点。

为什么tail节点不总是尾节点?原因:让tail永远作为队列的尾节点,每次入队都需要使用循环CAS更新tail节点。减少tail节点的更新次数,就可以提高入队效率。Doug lea 使用hops 变量控制并减少tail节点的更新频率,并不是每次节点入队后都需要更新tail节点,而是当tail节点和尾节点的距离大于等于常量hops时,才更新tail节点,tail节点和尾节点的距离越长,使用CAS更新tail的次数就越少,但是器负面效果就是每次入队时定位尾节点的时间就越长,因为循环体需要多循环一次来定位尾节点,但是这样仍可以提高入队效率,因为本质上来看它通过增加对volatile变量的读操作来减少对其的写操作,而volatile变量的写操作的开销要远大于读操作。

2.1.2        出队列

出队列就是从队列中返回一个节点元素,并清空该节点对元素的引用。和tail的跟新一样,head节点也并不是每次出队都更新。也是通过hops变量来控制并减少CAS更新head节点的操作。

3.       阻塞队列介绍

 

阻塞队列(BlockingQueue):是一个支持两个附加操作的队列。这两个附加操作支持阻塞插入和移除方法。

(1)      支持阻塞的插入方法:当队列满时,会阻塞插入元素的线程,直到队列不满。

(2)      支持阻塞的移除方法:当队列为空时,会阻塞移除元素的线程,直到队列不为空。

 

阻塞队列常用于生产者和消费者模式,生产者就是向队列添加元素的线程,消费者就是从队列移除元素的线程。阻塞队列就是生产者用来存放元素,消费者用来获取元素的容器。

 

当队列不可用时,会有4种处理方式:

 

 

 

抛出异常

返回特殊值

一直阻塞

超时退出

插入方法

add(e)

offer(e)

put(e)

offer(e,time,unit)

移除方法

Remove()

Poll()

Take()

Poll(time,unit)

检查方法

Element()

Peek()

 

 

 

(3)  java里的7个阻塞队列

                ArrayBlockingueue:一个由数组组成的有界阻塞队列

                LinkedBlockingQueue: 一个有链表组成的有界阻塞队列

                PriorityBolckingQueue: 一个支持优先级排序的无界阻塞队列

                DelayQueue: 一个使用优先级队列实现的无界阻塞队列

                SynchronousQueue: 一个不存储元素的阻塞队列

                LinkedTransferQueue: 一个由链表组成的无界阻塞队列

                LinkedBlockingDeque:一个由链表组成的双向阻塞队列

4.      Fork/Join框架

 

4.1  Fork/Join框架

Fork/Join框架是Java7提供的一个用于并行执行任务的框架,是一个把大任务分解成若干个小任务,最终汇总每个小任务的结果后得到大任务结果的框架。(使用的分治法思想)

工作窃取算法:是指某个线程从其他队列里窃取任务来执行。为什么要使用工作窃取算法?原因:假如我们需要做一个比较大的任务,可以把这个任务分隔成若干个互不依赖的子任务,为了减少线程间的竞争,就把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。但是,有的线程会先把自己的任务做完,而其他线程对应的队列里还有任务等待处理,于是他就去其他线程里窃取一个任务执行。而此时它们访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常使用双端队列,被窃取任务的线程永远从队列头部拿任务执行,而窃取任务的线程永远从队列的尾部取任务。

4.2  Fork/Join 设计

步骤1:分割任务  首先我们需要一个fork类来把大任务分割成子任务,直到子任务足够小。

步骤2:执行任务并合并结果   分割的子任务分别放在双端队列里,然后启动结果线程分别从双端队列里拿任务。子任务执行完的结果都放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。

Fork/Join 使用两个类来实现上述步骤:

①  ForkJoinTask: 要使用Fork/Join框架,首先要创建一个ForkJoin任务。它提供在任务种执行fork()和join()的机制。通常不是直接继承ForkJoinTask类,而是继承其子类。

RecursiveAction: 用于没有返回结果的任务

RecursiveTask : 用于有返回结果的任务

②   ForkJoinPool ForkJoinTask需要通过ForkJoinPool来执行。

 

 

 

 

 

 

 

 

 

 

 

 

转载请注明原文地址: https://www.6miu.com/read-5110.html

最新回复(0)