Java并发:线程安全-实现

线程安全性

当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些进程将如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为

无状态的对象永远是安全的。即指这个对象没有状态域,也没有引用其他对象的域,是一次特定计算的瞬时状态,会唯一存放在一个本地变量当中,即线程的栈当中。而两个线程并不共享状态。

三个方面

  • 原子性:提供了互斥访问,同一时刻只能有一个线程对它进行操作
  • 可见性:一个线程对主内存的修改可以及时被其他线程观察到
  • 有序性:一个线程观察其他线程中的指令执行顺序,由于指令重排序的存在,该观察结果一般无序

原子性

原子性:能作为一个单独的、不可分割的操作去执行

当向一个无状态的对象添加一个域,并进行long++操作(读+改+写),则不是线程安全。

将long换作一个atomic包下的AtomicLong变量,则由于该变量是一个原子变量类,该计数器是线程安全的,该对象的状态即该计数器的对象,即该对象线程安全。(利用已有的线程安全类进行管理,如果只有一个,则线程安全,如果多个,则未必线程安全)。当变量之间相互关联,则在一个原子操作当中,要将几个相互关联的变量同时更新

Atomic

JDK的Atomic包,通过CAS完成原子性

int类型变为AtomicInteger,自增的方法为incrementAndGet

CAS核心

1
2
3
4
5
    //计数方法
private static void add(){
count.incrementAndGet();//先执行增加操作,再获取值
// count.getAndIncrement(); //先获取当前的值,再执行增加操作
}

源码实现,基于一个unsafe类

1
2
3
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

以dowhile语句为核心实现,CompareAndSwap是C.A.S的核心.

因为使用循环,如果修改很频繁,会不断循环尝试修改,使得性能受到影响

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* var1:当前的对象
*/
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
//通过调用底层方法,获得var1对象底层当前的值
//如果没有其他线程更改,则var5就会等于var2
var5 = this.getIntVolatile(var1, var2);
//比较底层的值var5与传入的值var2
//如果底层的值与传入值相同,那么更新为var5+增量var4
//如果值不相同,则将var2的值更改为底层当前的值,然后重新do,获得var5的值,进行比较
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

return var5;
}

以native标识,即java底层的实现,不是java的实现

1
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

源码解析

AtomicLong

对于很精确的数值需要使用

LongAdder

原理:JVM对于普通的long与double,允许将64位的读操作与写操作拆分为两个32位的操作。

核心:将热点数据分离,将一个value分割为一个数组,每个线程针对一个数值,最后的value由数组的值合成,将单点的更新压力分散为多点的更新压力。在低并行的时候,对value直接更新。

优点:在高并发下,效率很高

缺点:如果有并行更新,可能导致统计数据有一些误差

AtomicReference

1
2
3
4
5
6
7
8
9
10
11
   private static AtomicReference<Integer> count = new AtomicReference<>(0);

//输出4
public static void main(String[] args) {
count.compareAndSet(0,2); // 2
count.compareAndSet(0,1); // no
count.compareAndSet(1,3); // no
count.compareAndSet(2,4); // 4
count.compareAndSet(3,5); // no
log.info("count:{}",count.get());
}

AtomicReferenceFieldUpdater

原子性更新某个类的一个实例的一个字段,字段必须volatile,并且非static

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private static AtomicIntegerFieldUpdater<AtomicExample5> updater = AtomicIntegerFieldUpdater.newUpdater(AtomicExample5.class,"count");

@Getter
public volatile int count = 100;

private static AtomicExample5 atomicExample5 = new AtomicExample5();

public static void main(String[] args) {
if (updater.compareAndSet(atomicExample5,100,120)){
log.info("update success,{}",atomicExample5.getCount());
}
if (updater.compareAndSet(atomicExample5,100,120)){
log.info("update success,{}",atomicExample5.getCount());
}else {
log.error("update error{}",atomicExample5.getCount());
}


}

AtomicLongArray

更新一个long的数组

AtomicBoolean

实现代码只执行一次

1
2
3
4
5
6
7
private static AtomicBoolean isHappend = new AtomicBoolean(false);
private static void test(){
if (isHappend.compareAndSet(false, true)) {

log.info("execute");
}
}

CAS的ABA问题

ABA问题:在CAS操作中,其他线程将数据A改为B,又改为A。

解决:在每次更新的时候,记录一个版本号,每次更新+1

通过AtomicStampReference实现,核心方法为CompareAndSet

原子性:锁

synchronized:依赖JVM,在该关键字作用对象的作用范围内,只有一个线程可以操作

Lock:依赖特殊的CPU指令,由代码实现。ReentrantLock

synchronized

同步锁,修饰的对象:

  • 修饰代码块:大括号括起来的代码,作用于调用的对象
  • 修饰方法:整个方法,作用于调用的对象
  • 修饰静态方法:整个静态方法,作用于所有对象
  • 修饰类:括号括起来部分,作用于所有对象

作用于所有的对象。则如果两个对象调用一个修饰的方法,他们会并行执行。而如果调用一个静态方法,则他们无法并行执行。

对于修饰代码块与修饰方法,不同的调用对象互相不影响

继承

如果子类调用继承于父类的synchronize方法(synchronize不属于一个类),是没有synchronize效果的,必须显式声明

特性

  • 重进入:内部锁是重进入的,当线程试图获得它自己所占有的锁时候,请求会成功,即重进入是基于每线程的,而不是调用。

    实现是通过为每个锁关联一个请求计数与一个占有它的线程,当同一线程访问,则计数++,线程退出该锁,则计数–,直到计数为0,释放该锁(父类与子类的使用)

用锁来保持状态

  • 如果每个可被多个线程访问的可变状态变量,如果所有访问它的线程在执行状态当中占有同一个锁,则称该变量是由这个锁保护的
  • 每个共享的可变变量都需要唯一一个确定的锁保护

设计

  • 决定synchronize块大小需要权衡安全性(不能妥协)、简单性、性能。通常简单性与性能相互牵制,实现一个同步策略时候,不要过早地为了性能而牺牲简单性(是对安全性潜在的妥协)
  • 有些耗时的计算或操作,如网络或者控制台IO,难以快速完成,执行它们的时候不要占有锁

Lock

原子性对比:

  • synchronize;不可中断锁,适合竞争不激烈,可读性好
  • Lock:可中断锁,多样化同步,竞争激烈能维持常态
  • Atomic:竞争激烈能维持常态,比lock性能好,但是只能同步一个值

活跃度与性能

​ 不能武断地将整个方法设置为synchronize的,通过缩小synchronize的范围来提高并发性

1550563393365

可见性

一个线程对主内存的修改可以及时被其他线程观察到。

导致共享变量在线程间不可见的原因

  • 线程交叉执行
  • 重排序结合线程交叉执行
  • 共享变量更新后的值没有在工作内存与主内存间及时更新

synchronized

JVM对synchronize的规定

  • 线程解锁前,必须把共享变量的最新值刷新到主内存
  • 线程加锁时,将清空工作内存中共享变量的值,从而使用共享内存是需要从主内存中重新读取最新的值

volatile

通过加入内存屏障与禁止重排序优化来实现。

  • 对volatile变量写操作时,会在写操作后加入一条store屏障指令,将本地内存中的共享变量值刷新到主内存。(每次写之后都刷新),CPU指令级别进行操作。

  • 对volatile变量读操作时,会在读操作前加入一条load屏障指令,从主内存中读取共享变量。(每次读从主内存读)

1551425259624

但是volatile无法保证线程安全。

1
2
3
4
count++;
// 1、count//获得的是最新值
// 2、+1 //两个进程同时++
// 3、count//同时写回,即丢掉了一次.

volatile使用

  • 对变量的写操作不依赖于当前值
  • 该变量没有包含在具有其他变量的不变式中。

可以被写入 volatile 变量的这些有效值独立于任何程序的状态,包括变量的当前状态。

使用场景

  • 很适合作为状态标示量
  • 检查两次

有序性

一个线程观察其他线程中的指令执行顺序,由于指令重排序的存在,该观察结果一般无序

方式:volatile、synchronized、lock、java内存模型的先天有序性(happens before原则)

happens before原则

  • 程序次序规则:一个线程内,按照代码顺序,书写在前面的操作先行发生于书写在后面的操作
  • 锁定操作:一个unlock操作先行发生于后面对同一个锁的lock操作
  • volatile变量规则:对一个变量的写操作先行发生于后面对于这个变量的读操作
  • 传递规则;如果操作A先行发生于操作B,而操作B又先行发生于操作C,则操作A先行发生于操作C
  • 线程启动规则:Thread对象的start()方法先行发生于此线程的每一个动作
  • 线程中断规则:对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生
  • 线程终结规则:线程中所有的操作都先行发生于线程的终止检测
  • 对象终结规则:一个对象的初始化完成先行发生于它的finalize()方法的开始

如果两个操作的执行次序无法从happens before推导出来,则JVM可以对它进行随意的重排序。

安全发布对象的四种方法

  • 通过静态初始化器初始化对象的引用(JVM内部的同步机制)
  • 将它的引用存储到volatile域或者atomicReference对象中
  • 将它的引用存储到正确创建的对象的final域
  • 或者将它的引用存储到由锁正确保护的域中

单例发布对象

懒汉式

线程不安全法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 懒汉模式
* 单例实例在第一次使用时候进行创建
*/
@NotRecommend
public class SingletonExample1 {

//私有的构造函数
//即其他途径无法创建这个类的对象
private SingletonExample1(){
//包含对资源的处理等等
}

//单例对象
private static SingletonExample1 instance = null;

//静态的工厂方法
private static SingletonExample1 getInstance(){
//多线程环境很容易出现问题
if (instance == null){
instance = new SingletonExample1();
}
return instance;
}
}

双重检测机制(线程不安全)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* 懒汉模式 --> 双重同步锁单例模式
* 单例实例在第一次使用时候进行创建
*/
@NotRecommend
public class SingletonExample4 {

//私有的构造函数
private SingletonExample4(){}

// 1.memory = allocate() 分配对象的内存空间
// 2.ctorInstance() 初始化对象
// 3. instance = memory 设置instance 指向刚分配的内存

//JVM和CPU优化,发生了指令重排

// 1.memory = allocate() 分配对象的内存空间
// 3. instance = memory 设置instance 指向刚分配的内存
// 2.ctorInstance() 初始化对象

//在第三步的时候,instance!=null
//而此时在指令重排下,另一个线程就会获得一个没有初始化对象的引用,并将其返回

//单例对象
private static SingletonExample4 instance = null;

//静态的工厂方法
private static SingletonExample4 getInstance(){
if (instance == null){ //双重检测机制 //B
synchronized (SingletonExample4.class){ //同步锁
if (instance == null){
instance = new SingletonExample4(); //A - 3
}
}
}
return instance;
}

线程安全法

不推荐法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class SingletonExample3 {

//私有的构造函数
private SingletonExample3(){}

//单例对象
private static SingletonExample3 instance = null;

//静态的工厂方法
//synchronized限制,而存在性能开销
private static synchronized SingletonExample3 getInstance(){
if (instance == null){
instance = new SingletonExample3();
}
return instance;
}
}

双重同步锁

基于volatile

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 懒汉模式 --> 双重同步锁单例模式
* 单例实例在第一次使用时候进行创建
*/
@ThreadSafe
public class SingletonExample5 {

//私有的构造函数
private SingletonExample5(){}

// 1.memory = allocate() 分配对象的内存空间
// 2.ctorInstance() 初始化对象
// 3. instance = memory 设置instance 指向刚分配的内存

//单例对象 volatitle+ 双重检测机制 -> 禁止指令重排序
private volatile static SingletonExample5 instance = null;

//静态的工厂方法
private static SingletonExample5 getInstance(){
if (instance == null){ //双重检测机制 //B
synchronized (SingletonExample5.class){ //同步锁
if (instance == null){
instance = new SingletonExample5(); //A - 3
}
}
}
return instance;
}
}

饿汉模式

通过静态域实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 饿汉模式
* 单例实例在装载使用时候进行创建
*/
@ThreadSafe
public class SingletonExample2 {

//私有的构造函数
private SingletonExample2(){
//如果构造方法中存在过多的功能,则在加载时会过慢,存在性能问题
//只进行资源加载而没有实际调用,则会导致资源浪费
}

//单例对象
private static SingletonExample2 instance = new SingletonExample2();

//静态的工厂方法
private static SingletonExample2 getInstance(){
return instance;
}
}

通过静态块实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@ThreadSafe
public class SingletonExample6 {

//私有的构造函数
private SingletonExample6(){}
//静态资源是顺序执行的
//单例对象
private static SingletonExample6 instance = null;
//必须写在后面,如果写在前面,则会被上一句赋值为null
static {
instance = new SingletonExample6();
}

//静态的工厂方法
private static SingletonExample6 getInstance(){
return instance;
}

public static void main(String[] args) {
System.out.println(getInstance().hashCode());
System.out.println(getInstance().hashCode());

}
}

枚举模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 枚举模式:最安全
*/
@ThreadSafe
@Recommend
public class SingletonExample7 {
//私有的构造函数
private SingletonExample7(){}

public static SingletonExample7 getInstance(){
//在实际使用的时候才会初始化
return Singleton.INSTANCE.getSingleton();
}

private enum Singleton{
INSTANCE;
private SingletonExample7 singleton;

//JVM保证这个方法绝对只调用一次
Singleton(){
singleton = new SingletonExample7();
}

public SingletonExample7 getSingleton(){
return singleton;
}
}
}

线程安全策略

进行共享和发布对象,使得多个线程可以安全地访问他们。

  • 线程限制
    • 一个被线程限制的对象,由线程独占,并且只能被占有它的线程修改
    • 线程封闭:把对象封装到一个线程当中,只有一个线程可以看到它
  • 共享只读
    • 一个共享只读的对象,在没有额外同步的情况下,可以被多个线程并发访问,但是任何线程都不能修改它
    • 不可变对象:一种对象只要发布了就是安全的,即不可变对象,是一种躲避并发的方法
  • 线程安全对象
    • 一个线程安全的对象或者容器,在内部通过同步机制来保证线程安全,所有其他线程无需额外同步就可以通过公共接口随意访问它
  • 被守护对象
    • 只能通过获取特定的锁来访问

线程不安全类与写法

线程不安全类:

  • 如果多个线程对同一个共享数据进行访问而不采取同步操作的话,那么操作的结果是不一致的。
  • 如果一个类的对象可以同时被多个线程访问,如果没有做并发处理,则会出现异常

StringBuilder是线程不安全的。

StringBuffer是线程安全的,它内部方法添加了synchronized,但也因此它的性能有损耗。

ArrayList、hashMap、hashSet等collection

线程不安全

示例

以下代码演示了 1000 个线程同时对 cnt 执行自增操作,操作结束之后它的值有可能小于 1000。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ThreadUnsafeExample {

private int cnt = 0;

public void add() {
cnt++;
}

public int get() {
return cnt;
}
}
public static void main(String[] args) throws InterruptedException {
final int threadSize = 1000;
ThreadUnsafeExample example = new ThreadUnsafeExample();
final CountDownLatch countDownLatch = new CountDownLatch(threadSize);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < threadSize; i++) {
executorService.execute(() -> {
example.add();
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
System.out.println(example.get());
}

安全发布

如果希望跨线程共享对象,则必须安全地共享它

对象的引用对其他线程可见,但它的状态可能是过期的,即对象的状态不一定对消费线程可见。

安全发布的模式

  • 通过静态初始化器初始化对象的引用(JVM内部的同步机制)
  • 将它的引用存储到volatile域或者atomicReference
  • 将它的引用存储到正确创建的对象的final域
  • 或者将它的引用存储到由锁正确保护的域中

线程安全容器

线程安全容器的内部同步,即将对象置入这些容器的操作符合最后一条要求

  • HashTable、synchronizedMap、concurrentMap
  • Vector、CopyOnWriteArrayList、synchronizedList
  • BlockingQueue、concurrentLinkedQueue

高效不可变对象

一个对象在技术上不是不可变得,但是它的状态在发布后不会再更改,即有效不可变对象。

任何线程都可以在没有额外同步的情况下安全使用一个安全发布的高效不可变对象

可变对象

安全发布仅仅保证发布当时的可见性,对于可变性,还需要线程安全或锁

  • 可变对象必须要安全发布,同时必须要线程安全或者是锁保护的

    • 线程限制:一个线程限制的对象,通过限制在线程中,而被线程独占,且只能被占有它的线程修改
    • 共享只读:在没有额外同步的情况下可以被多个对象并发访问,但是任何线程都不可以修改它,包括可变对象和高效不可变对象
    • 共享线程安全:一个线程安全的对象在内部同步,所以其他线程无须额外同步,就可以通过公共接口访问
    • 被守护的:一个被守护的对象只能通过特定的锁来访问。被守护的对象包括那些被线程安全对象封装的对象,和已知被特定的锁保护起来的已发布对象

线程安全实现方式(共享)

适合于从开开始构建一个类,或者将多个非线程安全的类组合成一个类

不可变

不可变(Immutable)的对象一定是线程安全的,不需要再采取任何的线程安全保障措施。只要一个不可变的对象被正确地构建出来,永远也不会看到它在多个线程之中处于不一致的状态。多线程环境下,应当尽量使对象成为不可变,来满足线程安全。

不可变的类型

  • final 关键字修饰的基本数据类型
  • String
  • 枚举类型
  • Number 部分子类,如 Long 和 Double 等数值包装类型,BigInteger 和 BigDecimal 等大数据类型。但同为 Number 的原子类 AtomicInteger 和 AtomicLong 则是可变的。

不可变对象需要满足的条件

  • 对象创建以后状态就不能修改
    • 将类声明为final
  • 对象所有域都是final类型
    • 所有域声明为私有
    • 不通过set方法
    • 将所有可变数据声明为final
  • 对象是正确创建的,this引用没有逸出
    • 通过构造器初始化所有成员
    • 在get方法不直接返回对象本身,而是返回一个clone

final关键字:类、方法、变量

  • 修饰类:
    • 不能被继承
    • 所有成员方法会隐式选择为final
  • 修饰方法
    • 锁定方法不能被继承修改
  • 修饰变量
    • 基本数据类型变量
    • 引用类型变量(初始化后,不能指向另一个对象)

其他创建不可变对象方法

  • 对于集合类型,Collections.unmodifiableXXX:Collection、List、Set、Map…
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class ImmutableExample2 {

private static Map<Integer,Integer> map = Maps.newHashMap();

static {
map.put(1,2);
map.put(3,4);
map.put(5,6);
//创建final的map
map = Collections.unmodifiableMap(map);
}

public static void main(String[] args) {
//会抛出异常, map无法被修改
map.put(1,3);
log.info("{}",map.get(1));
}
}

将返回一个新的map,将数据拷贝过去,然后将所有更改数据转换为了抛出异常

1
2
3
public static <K,V> Map<K,V> unmodifiableMap(Map<? extends K, ? extends V> m) {
return new UnmodifiableMap<>(m);
}
  • Guava:ImmutableXXX:Collection、List、Set、Map…
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class ImmutableExample3 {

private final static ImmutableList<Integer> list = ImmutableList.of(1,2,3);

private final static ImmutableSet set = ImmutableSet.copyOf(list);

private final static ImmutableMap<Integer,Integer> map = ImmutableMap.of(1,2,3,4);

private final static ImmutableMap<Integer,Integer> map2 = ImmutableMap.<Integer,Integer>builder()
.put(1,2).put(3,4).put(5,6).build();

public static void main(String[] args) {
// set.add(4);
// map2.put(1,4);
System.out.println(map2.get(3));
}
}

互斥同步

同步指多个线程并发访问共享数据时,保证共享数据在同一个时刻只被一个(或一些)线程使用。

互斥是实现同步的一种手段。

互斥同步最主要的问题是进行线程阻塞与唤醒带来的性能问题。悲观的并发策略。

  • 临界区、互斥量、信号量都是主要互斥实现方法。
  • synchronized关键字:需要系统帮助完成
  • J.U.C包下的重入锁,例如ReentrantLock

非阻塞同步

互斥同步最主要的问题就是线程阻塞和唤醒所带来的性能问题,因此这种同步也称为阻塞同步。

互斥同步属于一种悲观的并发策略,总是认为只要不去做正确的同步措施,那就肯定会出现问题。无论共享数据是否真的会出现竞争,它都要进行加锁(这里讨论的是概念模型,实际上虚拟机会优化掉很大一部分不必要的加锁)、用户态核心态转换、维护锁计数器和检查是否有被阻塞的线程需要唤醒等操作。

CAS

随着硬件指令集的发展,我们可以使用基于冲突检测的乐观并发策略:先进行操作,如果没有其它线程争用共享数据,那操作就成功了,否则采取补偿措施(不断地重试,直到成功为止)。这种乐观的并发策略的许多实现都不需要将线程阻塞,因此这种同步操作称为非阻塞同步。

乐观锁需要操作和冲突检测这两个步骤具备原子性,这里就不能再使用互斥同步来保证了,只能靠硬件来完成。硬件支持的原子性操作最典型的是:比较并交换(Compare-and-Swap,CAS)。CAS 指令需要有 3 个操作数,分别是内存地址 V、旧的预期值 A 和新值 B。当执行操作时,只有当 V 的值等于 A,才将 V 的值更新为 B。

AtomicInteger

J.U.C 包里面的整数原子类 AtomicInteger 的方法调用了 Unsafe 类的 CAS 操作。

以下代码使用了 AtomicInteger 执行了自增的操作。

1
2
3
4
5
private AtomicInteger cnt = new AtomicInteger();

public void add() {
cnt.incrementAndGet();
}

以下代码是 incrementAndGet() 的源码,它调用了 Unsafe 的 getAndAddInt() 。

1
2
3
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

以下代码是 getAndAddInt() 源码,var1 指示对象内存地址,var2 指示该字段相对对象内存地址的偏移,var4 指示操作需要加的数值,这里为 1。通过 getIntVolatile(var1, var2) 得到旧的预期值,通过调用 compareAndSwapInt() 来进行 CAS 比较,如果该字段内存地址中的值等于 var5,那么就更新内存地址为 var1+var2 的变量为 var5+var4。

可以看到 getAndAddInt() 在一个循环中进行,发生冲突的做法是不断的进行重试。

1
2
3
4
5
6
7
8
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

return var5;
}

ABA

如果一个变量初次读取的时候是 A 值,它的值被改成了 B,后来又被改回为 A,那 CAS 操作就会误认为它从来没有被改变过。

J.U.C 包提供了一个带有标记的原子引用类 AtomicStampedReference 来解决这个问题,它可以通过控制变量值的版本来保证 CAS 的正确性。大部分情况下 ABA 问题不会影响程序并发的正确性,如果需要解决 ABA 问题,改用传统的互斥同步可能会比原子类更高效。

无同步方案

要保证线程安全,并不是一定就要进行同步。如果一个方法本来就不涉及共享数据,那它自然就无须任何同步措施去保证正确性。

线程封闭

访问共享的、可变的数据要求使用同步。一个可以避免同步的方法就是不共享数据,如果数据仅仅在单线程当中访问,则不需要任何同步。当对象封装在一个线程当中,则自动成为线程安全的。

  • Swing将事件分发到线程当中
  • JDBC从池中分配一个对象给线程。

线程封闭方法:

  • Ad-hoc线程封闭:程序控制实现,最糟糕。

    • 指维护线程限制性的任务全部落在实现上的情况
    • 确保只通过单一线程写入共享的volatile变量,则操作便是共享
  • 堆栈封闭:局部变量,无并发问题。

    • 是线程限制的特例,只能通过本地变量才可以触及对象。本地变量使得对象更容易被限制在线程本地中,本地变量本身就被限制在执行线程中,它们存在于执行线程栈。其他线程无法访问这个栈

    • 示例

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      public class StackClosedExample {
      public void add100() {
      int cnt = 0;
      for (int i = 0; i < 100; i++) {
      cnt++;
      }
      System.out.println(cnt);
      }
      }
      public static void main(String[] args) {
      StackClosedExample example = new StackClosedExample();
      ExecutorService executorService = Executors.newCachedThreadPool();
      executorService.execute(() -> example.add100());
      executorService.execute(() -> example.add100());
      executorService.shutdown();
      }
      100
      100
    • 例如方法当中的numPairs。在该方法当中,实例化的animals只有一个引用指向它,因此它保存在线程的栈当中,倘若发布了animals或其内部对象的引用,则破坏了限制,并导致了对象逸出

ThreadLocal

如果一段代码中所需要的数据必须与其他代码共享,那就看看这些共享数据的代码是否能保证在同一个线程中执行。如果能保证,我们就可以把共享数据的可见范围限制在同一个线程之内,这样,无须同步也能保证线程之间不出现数据争用的问题。

符合这种特点的应用并不少见,大部分使用消费队列的架构模式(如“生产者-消费者”模式)都会将产品的消费过程尽量在一个线程中消费完。其中最重要的一个应用实例就是经典 Web 交互模型中的“一个请求对应一个服务器线程”(Thread-per-Request)的处理方式,这种处理方式的广泛应用使得很多 Web 服务端应用都可以使用线程本地存储来解决线程安全问题。

  • ThreadLocal线程封闭:特别好的封闭方法。
    • 内部维护了一个map,key是线程名称,值是对象
    • 更规范的方式,允许将每个线程与持有数值的对象关联在一起。ThradLocal提供了get和set,为每个使用它的线程维护一份单独的拷贝,所以get总是返回当前执行线程通过set设置的最新值。

ThreadLocal 提供了线程本地的实例。它与普通变量的区别在于,每个使用该变量的线程都会初始化一个完全独立的实例副本。ThreadLocal 变量通常被private static修饰。当一个线程结束时,它所使用的所有 ThreadLocal 相对的实例副本都可被回收。

总的来说,ThreadLocal 适用于每个线程需要自己独立的实例且该实例需要在多个方法中被使用,也即变量在线程间隔离而在方法或类间共享的场景

可以使用 java.lang.ThreadLocal 类来实现线程本地存储功能。

对于以下代码,thread1 中设置 threadLocal 为 1,而 thread2 设置 threadLocal 为 2。过了一段时间之后,thread1 读取 threadLocal 依然是 1,不受 thread2 的影响。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ThreadLocalExample {
public static void main(String[] args) {
ThreadLocal threadLocal = new ThreadLocal();
Thread thread1 = new Thread(() -> {
threadLocal.set(1);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(threadLocal.get());
threadLocal.remove();
});
Thread thread2 = new Thread(() -> {
threadLocal.set(2);
threadLocal.remove();
});
thread1.start();
thread2.start();
}
}
1

为了理解 ThreadLocal,先看以下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ThreadLocalExample1 {
public static void main(String[] args) {
ThreadLocal threadLocal1 = new ThreadLocal();
ThreadLocal threadLocal2 = new ThreadLocal();
Thread thread1 = new Thread(() -> {
threadLocal1.set(1);
threadLocal2.set(1);
});
Thread thread2 = new Thread(() -> {
threadLocal1.set(2);
threadLocal2.set(2);
});
thread1.start();
thread2.start();
}
}

底层原理

它所对应的底层结构图为:

img

每个 Thread 都有一个 ThreadLocal.ThreadLocalMap 对象。

1
2
3
/* ThreadLocal values pertaining to this thread. This map is maintained
* by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;

当调用一个 ThreadLocal 的 set(T value) 方法时,先得到当前线程的 ThreadLocalMap 对象,然后将 ThreadLocal->value 键值对插入到该 Map 中。

1
2
3
4
5
6
7
8
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}

get() 方法类似。

1
2
3
4
5
6
7
8
9
10
11
12
13
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}

ThreadLocal 从理论上讲并不是用来解决多线程并发问题的,因为根本不存在多线程竞争。

在一些场景 (尤其是使用线程池) 下,由于 ThreadLocal.ThreadLocalMap 的底层数据结构导致 ThreadLocal 有内存泄漏的情况,应该尽可能在每次使用 ThreadLocal 后手动调用 remove(),以避免出现 ThreadLocal 经典的内存泄漏甚至是造成自身业务混乱的风险。

可重入代码(Reentrant Code)

这种代码也叫做纯代码(Pure Code),可以在代码执行的任何时刻中断它,转而去执行另外一段代码(包括递归调用它本身),而在控制权返回后,原来的程序不会出现任何错误。

可重入代码有一些共同的特征,例如不依赖存储在堆上的数据和公用的系统资源、用到的状态量都由参数中传入、不调用非可重入的方法等。

基础构建模块

委托是创建线程安全的一个最有效的策略,只需让现有的线程安全类关联所有的状态即可。

同步容器

包括Vector和HashTable,这些类通过封装它们的状态,并对每一个公共方法进行同步实现了线程安全,这样一次只能有一个线程访问容器。通过对容器的所有状态串行访问实现的线程安全,削弱了并发性。

容器本身是线程安全的,无论有多少线程同时调用容器,也不会破坏容器。但是对于方法的调用者来说,当线程在并发地修改容器,最后得到的结果并不是所预期的结果。面对这种情况,需要对容器类自身进行加锁,synchronized(list),保证复合操作的原子性。因为它们的线程安全是相对线程安全

迭代

对容器进行迭代的时候,需要对容器进行加锁,防止容器数据更改,但是这么一来对并发的性能就大大下降,会在相当长时间加锁,甚至产生死锁。一个解决办法是复制容器,因为是存在线程当中,但是在复制过程中依然需要加锁,而且占用空间。

正如封装一个对象的状态,能够使它更容易地保持不变约束一样,封装它的同步则可以迫使它符合同步策略

隐藏迭代器:有些迭代器是隐藏的,比如toString方法,hashCode方法,equals方法都会对容器进行迭代

ConcurrentModificationException

当对Vector等容器进行迭代时,如果有并发的线程进行修改,则会表现出及时失败,即当它们发现容器在迭代过程中被修改,就会抛出一个ConcurrentModificationException

及时失败的迭代器只是善意地捕获并发错误,因此只能作为并发问题的预警指示器。其实现时将计数器与容器关联起来,如果在迭代期间计数器被修改,那么hasNextnext会抛出ConcurrentModificationException

并发容器

为多线程的并发访问而设计。

  • ConcurrentHashMap代替同步的HashMap
  • ConcurrentMap接口增加了常见复合操作的支持
  • ConcurrentSkipListMap代替同步的SortedMap
  • ConcurrentSkipListSet代替同步的SortedSet
  • 当多数操作为读取,CopyOnWriteArrayList是List的同步
  • ConcurrentLinkedQueue,传统的先进先出队列
  • BlockingQueue,增加了可阻塞的插入和获取等操作

用并发容器替换同步容器,这种做法以有很小的风险带来了可扩展性显著提高

阻塞队列和生产者-消费者模式

阻塞队列提供了可阻塞的put和take方法,以及支持定时的offer和poll方法。如果队列已满,则put方法将阻塞直到有空间可用;如果队列为空,则take将阻塞直到有元素可用。

put方法的阻塞特性,当队列充满,生产者将阻塞并不能继续生成工作。offer方法的阻塞特性,如果数据项不能添加到队列中,将返回一个失败状态,这样可以创建更多灵活的策略处理符合过载的情况。例如将多余的工作项序列化写入磁盘、减少生产者线程、通过某种方式抑制生产者线程。

构建高可靠应用程序时,有界队列是一种强大的资源管理工具,它们能抑制并防止产生过多的工作项,使应用程序在负荷过载的情况下变得更加健壮。

阻塞方法与中断方法

阻塞状态:blocked,waiting,Timed_waiting

中断:

  • thread.interrupt(),用于中断线程或者查询线程是否已经被中断。中断是一种协作机制,当A中断B只是要求B在执行到某个可以暂停的地方停止执行,并且前提是B愿意停下来。
  • 中断一般用于取消某个操作。

当代码中调用了一个将抛出InterruptedException异常的方法(即该方法是一个阻塞方法)时,自己的方法也就变成了阻塞方法,并且必须要处理对中断的响应。

  • 传递InterruptedException
  • 恢复中断,捕获异常,并通过调用当前线程的interrupt方法恢复中断,这样在调用栈中更高层的代码将看到引发了一个中断

构建高效且可伸缩的结果缓存

简单的缓存可能会将性能瓶颈转变为可伸缩性瓶颈

first

1565139616505

1565139616505

  • Compute是一个需要很长事件计算结果的方法。Memorizerl是包装器,并将缓存的结果保存。

为确保线程安全使用了synchronized,但是带来了明显的伸缩性问题。

second

使用ConcurrentHashMap改进HashMap,避免了compute方法同步带来的串行性。

但是在两个线程同时调用compute时,可能导致计算得到相同的值,因为从缓存获取值与计算并不是一个原子操作。

final

使用FutureTask,表示一个计算的过程,即将Map修改为ConcurrentHashMap<A,Future<V>>

  • 会先检查某个相应的结果是否以及开始,如果没有启动就创建一个FutureTask并注册到Map,然后计算。
  • 如果已经启动,则等待现有的计算结果。

使用putIfAbsent实现原子操作。

参考