同步容器
早期版本的JDK提供的同步容器类为Vector和Hashtable和Stack。
JDK1.2 提供了Collections.synchronizedXxx等工程方法,将普通的容器继续包装。对每个共有方法都进行同步。
- Vector。
- Stack。
- HashTable。
- Collections.synchronized方法生成,例如:
- Collectinons.synchronizedList()。
- Collections.synchronizedSet()。
- Collections.synchronizedMap()。
- Collections.synchronizedSortedSet()。
- Collections.synchronizedSortedMap()。
Vector
同步
它的实现与 ArrayList 类似,但是使用了 synchronized 进行同步。
1 | public synchronized boolean add(E e) { |
与 ArrayList 的比较
- Vector 是同步的,因此开销就比 ArrayList 要大,访问速度更慢。最好使用 ArrayList 而不是 Vector,因为同步操作完全可以由程序员自己来控制;
- Vector 每次扩容请求其大小的 2 倍空间,而 ArrayList 是 1.5 倍。
替代方案
可以使用 Collections.synchronizedList();
得到一个线程安全的 ArrayList。
1 | List<String> list = new ArrayList<>(); |
也可以使用 concurrent 并发包下的 CopyOnWriteArrayList 类。
1 | List<String> list = new CopyOnWriteArrayList<>(); |
Map
由上面的分析我们知道,同步容器并不能保证多线程安全,而并发容器是针对多个线程并发访问而设计的,在jdk5.0引入了concurrent包,其中提供了很多并发容器,极大的提升同步容器类的性能。
并发容器J.U.C
JDK下的一个包Java.util.concurrent
- ArrayList->CopyOnWriteArrayList
- 写操作的时候复制,在新的数组上操作,写完后,将原有的数据指向新数组
- 拷贝数组消耗内存
- 不能用于实时读数组,可能读取到旧的,适合读多写少
- HashSet、TreeSet->CopyOnWriteArraySet、ConcurrentSkipListSet
- ConcurrentSkipListSet:支持自然排序(TreeSet),基于map。对于批量操作并不能保证原子操作,对于单个cotains操作可以
- HashMap、TreeMap->ConcurrentHashMap、ConcurrentSkipListMap
- ConcurrentHashMap:针对读操作做了大量的优化,能应付很大的并发,速度快
- ConcurrentSkipListMap:TreeMap的安全版,基于跳表实现,支持更高的并发,线程越多越优秀
- Collections.synchronizedXXX(List、Set、Map)
HashMap与ConcurrentHashMap
HashMap
数据结构:
- 数组
- 引用
参数
- 初始容量
- 加载因子
寻址方式
- 取模操作消耗操作较大,使用与操作与2^n-1进行与运算
ConcurrentHashMap
- 对应的非并发容器:HashMap
- 目标:代替Hashtable、synchronizedMap,支持复合操作
- 原理:JDK6中采用一种更加细粒度的加锁机制Segment“分段锁”,JDK8中采用CAS无锁算法,详细分析推荐阅读【JDK】:ConcurrentHashMap高并发机制——【转载】。
ConcurrentHashMap使用了一种粒度更细的加锁机制来实现更大程序的共享,称为分段锁。在这种机制中,任意数量的读取线程可以并发地访问Map,执行读操作的线程和执行写操作的线程可以并发访问Map,并且一定数量的写入线程可以并发修改Map。
ConcurrentHashMap的迭代器不会抛出ConcurrentModificationException,因此不需要在迭代过程中对容器加锁,它们返回的迭代器具有弱一致性,可以容忍并发的修改,当创建迭代器时会遍历已有元素,并可以(不保证)在迭代器被构造后将修改操作反映给容器。
存储结构
1 | static final class HashEntry<K,V> { |
ConcurrentHashMap 和 HashMap 实现上类似,最主要的差别是 ConcurrentHashMap 采用了分段锁(Segment),每个分段锁维护着几个桶(HashEntry),多个线程可以同时访问不同分段锁上的桶,从而使其并发度更高(并发度就是 Segment 的个数)。
Segment 继承自 ReentrantLock。
1 | static final class Segment<K,V> extends ReentrantLock implements Serializable { |
默认的并发级别为 16,也就是说默认创建 16 个 Segment。
1 | static final int DEFAULT_CONCURRENCY_LEVEL = 16; |
size 操作
每个 Segment 维护了一个 count 变量来统计该 Segment 中的键值对个数。
1 | /** |
在执行 size 操作时,需要遍历所有 Segment 然后把 count 累计起来。
ConcurrentHashMap 在执行size操作时先尝试不加锁,如果连续两次不加锁操作得到的结果一致,那么可以认为这个结果是正确的。
尝试次数使用RETRIES_BEFORE_LOCK定义,该值为2,retries初始值为-1,因此尝试次数为3。
如果尝试的次数超过 3 次,就需要对每个 Segment 加锁。
1 | /** |
JDK 1.8 的改动
JDK 1.7 使用分段锁机制来实现并发更新操作,核心类为 Segment,它继承自重入锁 ReentrantLock,并发度与 Segment 数量相等。
JDK 1.8 使用了 CAS 操作来支持更高的并发度,在 CAS 操作失败时使用内置锁 synchronized。
并且JDK1.8的实现也在链表过长时会转换为红黑树。
ConcurrentHashMap
ConcurrentHashMap 同样也分为 1.7 、1.8 版,两者在实现上略有不同。
Base 1.7
先来看看 1.7 的实现,下面是他的结构图:
如图所示,是由 Segment 数组、HashEntry 组成,和 HashMap 一样,仍然是数组加链表。
它的核心成员变量:
1 | /** |
Segment 是 ConcurrentHashMap 的一个内部类,主要的组成如下:
1 | static final class Segment<K,V> extends ReentrantLock implements Serializable { |
看看其中 HashEntry 的组成:
和 HashMap 非常类似,唯一的区别就是其中的核心数据如 value ,以及链表都是 volatile 修饰的,保证了获取时的可见性。
原理上来说:ConcurrentHashMap 采用了分段锁技术,其中 Segment 继承于 ReentrantLock。不会像 HashTable 那样不管是 put 还是 get 操作都需要做同步处理,理论上 ConcurrentHashMap 支持 CurrencyLevel (Segment 数组数量)的线程并发。每当一个线程占用锁访问一个 Segment 时,不会影响到其他的 Segment。
下面也来看看核心的 put get
方法。
put 方法
1 | public V put(K key, V value) { |
首先是通过 key 定位到 Segment,之后在对应的 Segment 中进行具体的 put。
1 | final V put(K key, int hash, V value, boolean onlyIfAbsent) { |
虽然 HashEntry 中的 value 是用 volatile 关键词修饰的,但是并不能保证并发的原子性,所以 put 操作时仍然需要加锁处理。
首先第一步的时候会尝试获取锁,如果获取失败肯定就有其他线程存在竞争,则利用 scanAndLockForPut()
自旋获取锁。
- 尝试自旋获取锁。
- 如果重试的次数达到了
MAX_SCAN_RETRIES
则改为阻塞锁获取,保证能获取成功。
再结合图看看 put 的流程。
- 将当前 Segment 中的 table 通过 key 的 hashcode 定位到 HashEntry。
- 遍历该 HashEntry,如果不为空则判断传入的 key 和当前遍历的 key 是否相等,相等则覆盖旧的 value。
- 不为空则需要新建一个 HashEntry 并加入到 Segment 中,同时会先判断是否需要扩容。
- 最后会解除在 1 中所获取当前 Segment 的锁。
get 方法
1 | public V get(Object key) { |
get 逻辑比较简单:
只需要将 Key 通过 Hash 之后定位到具体的 Segment ,再通过一次 Hash 定位到具体的元素上。
由于 HashEntry 中的 value 属性是用 volatile 关键词修饰的,保证了内存可见性,所以每次获取时都是最新值。
ConcurrentHashMap 的 get 方法是非常高效的,因为整个过程都不需要加锁。
Base 1.8
1.7 已经解决了并发问题,并且能支持 N 个 Segment 这么多次数的并发,但依然存在 HashMap 在 1.7 版本中的问题。
那就是查询遍历链表效率太低。
因此 1.8 做了一些数据结构上的调整。
首先来看下底层的组成结构:
看起来是不是和 1.8 HashMap 结构类似?
其中抛弃了原有的 Segment 分段锁,而采用了 CAS + synchronized
来保证并发安全性。
也将 1.7 中存放数据的 HashEntry 改为 Node,但作用都是相同的。
其中的 val next
都用了 volatile 修饰,保证了可见性。
put 方法
重点来看看 put 函数:
- 根据 key 计算出 hashcode 。
- 判断是否需要进行初始化。
f
即为当前 key 定位出的 Node,如果为空表示当前位置可以写入数据,利用 CAS 尝试写入,失败则自旋保证成功。- 如果当前位置的
hashcode == MOVED == -1
,则需要进行扩容。 - 如果都不满足,则利用 synchronized 锁写入数据。
- 如果数量大于
TREEIFY_THRESHOLD
则要转换为红黑树。
get 方法
- 根据计算出来的 hashcode 寻址,如果就在桶上那么直接返回值。
- 如果是红黑树那就按照树的方式获取值。
- 就不满足那就按照链表的方式遍历获取值。
1.8 在 1.7 的数据结构上做了大的改动,采用红黑树之后可以保证查询效率(
O(logn)
),甚至取消了 ReentrantLock 改为了 synchronized,这样可以看出在新版的 JDK 中对 synchronized 优化是很到位的。
ConcurrentSkipListMap
代替同步的SortedMap
Set
ConcurrentSkipListSet
代替同步的SortedSet
List
CopyOnWriteArrayList
- 对应的非并发容器:ArrayList。
- 目标:代替Vector、synchronizedList。
- 原理:利用高并发往往是读多写少的特性,对读操作不加锁,对写操作,先复制一份新的集合,在新的集合上面修改,然后将新集合赋值给旧的引用,并通过volatile 保证其可见性,当然写操作的锁是必不可少的了。
- 仅当迭代操作远远多于修改操作时才应该使用。
关于这一部分可参考【JDK】:CopyOnWriteArrayList、CopyOnWriteArraySet 源码解析。
1 | public class CopyOnWriteArrayList<E> |
读写分离
写操作在一个复制的数组上进行,读操作还是在原始数组中进行,读写分离,互不影响。
写操作需要加锁,防止并发写入时导致写入数据丢失。
写操作结束之后需要把原始数组指向新的复制数组。
1 | public boolean add(E e) { |
适用场景
CopyOnWriteArrayList 在写操作的同时允许读操作,大大提高了读操作的性能,因此很适合读多写少的应用场景。
但是 CopyOnWriteArrayList 有其缺陷:
- 内存占用:在写操作时需要复制一个新的数组,使得内存占用为原来的两倍左右。
- 数据不一致:读操作不能读取实时性的数据,因为部分写操作的数据还未同步到读数组中。
所以 CopyOnWriteArrayList 不适合内存敏感以及对实时性要求很高的场景。
CopyOnWriteArraySet
- 对应的非并发容器:HashSet。
- 目标:代替synchronizedSet。
- 原理:基于CopyOnWriteArrayList实现,其唯一的不同是在add时调用的是CopyOnWriteArrayList的addIfAbsent方法,其遍历当前Object数组,如Object数组中已有了当前元素,则直接返回,如果没有则放入Object数组的尾部,并返回。
关于这一部分可参考【JDK】:CopyOnWriteArrayList、CopyOnWriteArraySet 源码解析。
ConcurrentSkipListMap
- 对应的非并发容器:TreeMap。
- 目标:代替synchronizedSortedMap(TreeMap)。
- 原理:Skip list(跳表)是一种可以代替平衡树的数据结构,默认是按照Key值升序的。Skip list让已排序的数据分布在多层链表中,以0-1随机数决定一个数据的向上攀升与否,通过”空间来换取时间”的一个算法。ConcurrentSkipListMap提供了一种线程安全的并发访问的排序映射表。内部是SkipList(跳表)结构实现,在理论上能够在O(log(n))时间内完成查找、插入、删除操作。
ConcurrentSkipListSet
- 对应的非并发容器:TreeSet。
- 目标:代替synchronizedSortedSet。
- 原理:内部基于ConcurrentSkipListMap实现。
Queue
非阻塞队列
ConcurrentLinkedQueue
不会阻塞的队列:
- 对应的非并发容器:Queue。
- 原理:基于链表实现的FIFO队列(LinkedList的并发版本)。
阻塞队列
阻塞队列最核心的功能是,能够可阻塞式的插入和删除队列元素。当前队列为空时,会阻塞消费数据的线程,直至队列非空时,通知被阻塞的线程;当队列满时,会阻塞插入数据的线程,直至队列未满时,通知插入数据的线程(生产者线程)。
- 对应的接口:BlockingQueue。
- 特点:拓展了Queue,增加了可阻塞的插入和获取等操作。
- 原理:通过ReentrantLock实现线程安全,通过Condition实现阻塞和唤醒。
- 实现类:
- LinkedBlockingQueue:基于链表实现的可阻塞的FIFO队列。
- ArrayBlockingQueue:基于数组实现的可阻塞的FIFO队列。
- PriorityBlockingQueue:按优先级排序的队列。
SynchronousQueue
BlockingQueue
java.util.concurrent.BlockingQueue 接口有以下阻塞队列的实现:
- FIFO 队列:LinkedBlockingQueue、ArrayBlockingQueue(固定长度)。
- 优先级队列:PriorityBlockingQueue。
提供了阻塞的take()和put()方法:如果队列为空take()将阻塞,直到队列中有内容;如果队列为满put()将阻塞,直到队列有空闲位置。
使用BlockingQueue实现生产者消费者问题
1 | public class ProducerConsumer { |
ArrayBlockingQueue
该并发集合其底层是使用了java.util.ReentrantLock和java.util.Condition来完成并发控制的。
主要属性
ArrayBlockingQueue的主要属性如下:
1 | /** The queued items */ |
- 从源码中可以看出ArrayBlockingQueue内部是采用数组进行数据存储的(
属性items
)。 - 为了保证线程安全,采用的是
ReentrantLock lock
,为了保证可阻塞式的插入删除数据利用的是Condition。 - 当获取数据的消费者线程被阻塞时会将该线程放置到notEmpty等待队列中。
- 当插入数据的生产者线程被阻塞时,会将该线程放置到notFull等待队列中。
- 而notEmpty和notFull等中要属性在构造方法中进行创建:
1 | public ArrayBlockingQueue(int capacity, boolean fair) { |
接下来,主要看看可阻塞式的put和take方法是怎样实现的。
put方法详解
put(E e)
方法源码如下:
1 | public void put(E e) throws InterruptedException { |
该方法的逻辑很简单,当队列已满时(count == items.length
)将线程移入到notFull等待队列中,如果当前满足插入数据的条件,就可以直接调用enqueue(e)
插入数据元素。enqueue方法源码为:
1 | private void enqueue(E x) { |
enqueue方法的逻辑同样也很简单,先完成插入数据,即往数组中添加数据(items[putIndex] = x
),然后通知被阻塞的消费者线程,当前队列中有数据可供消费(notEmpty.signal()
)。
take方法详解
take方法源码如下:
1 | public E take() throws InterruptedException { |
take方法也主要做了两步:
如果当前队列为空的话,则将获取数据的消费者线程移入到等待队列中。
若队列不为空则获取数据,即完成出队操作
dequeue
。
dequeue方法源码为:
1 | private E dequeue() { |
dequeue方法也主要做了两件事情:
- 获取队列中的数据,即获取数组中的数据元素(
(E) items[takeIndex]
)。 - 通知notFull等待队列中的线程,使其由等待队列移入到同步队列中,使其能够有机会获得lock,并执行完成功退出。
从以上分析,可以看出put和take方法主要是通过condition的通知机制来完成可阻塞式的插入数据和获取数据。在理解ArrayBlockingQueue后再去理解LinkedBlockingQueue就很容易了。
LinkedBlockingQueue
LinkedBlockingQueue是用链表实现的有界阻塞队列,当构造对象时为指定队列大小时,队列默认大小为Integer.MAX_VALUE
。从它的构造方法可以看出:
1 | public LinkedBlockingQueue() { |
LinkedBlockingQueue的主要属性
LinkedBlockingQueue的主要属性有:
1 | /** Current number of elements */ |
可以看出与ArrayBlockingQueue主要的区别是,LinkedBlockingQueue在插入数据和删除数据时分别是由两个不同的lock(takeLock
和putLock
)来控制线程安全的,因此,也由这两个lock生成了两个对应的condition(notEmpty
和notFull
)来实现可阻塞的插入和删除数据。并且,采用了链表的数据结构来实现队列,Node结点的定义为:
1 | static class Node<E> { |
接下来,我们也同样来看看put方法和take方法的实现。
put方法详解
put方法源码为:
1 | public void put(E e) throws InterruptedException { |
put方法的逻辑也同样很容易理解,可见注释。基本上和ArrayBlockingQueue的put方法一样。take方法的源码如下:
1 | public E take() throws InterruptedException { |
take方法的主要逻辑请见于注释,也很容易理解。
比较
相同点:
ArrayBlockingQueue和LinkedBlockingQueue都是通过condition通知机制来实现可阻塞式插入和删除元素,并满足线程安全的特性。
不同点:
- ArrayBlockingQueue底层是采用的数组进行实现,而LinkedBlockingQueue则是采用链表数据结构。
- ArrayBlockingQueue插入和删除数据,只采用了一个lock,而LinkedBlockingQueue则是在插入和删除分别采用了
putLock
和takeLock
,这样可以降低线程由于线程无法获取到lock而进入WAITING状态的可能性,从而提高了线程并发执行的效率。
PriorityBlockingQueue
- 优先级队列。
- 里面的元素需要实现Compare接口,其排序的时候是在取走元素的时候,在取出元素时取出最小的元素。
BlockingDeque
DelayQueue
- 带有延迟时间的队列,可以用于超时失效的设置。
Condition
详解Condition的await和signal等待/通知机制