Lock and Condition

隐藏在并发包中的管程

Java SDK并发包中内容丰富,但最核心的还是其对管程的实现,理论上利用管程,几乎可以实现并发包中的所有工具类。

我们知道并发编程领域里,两大核心问题:互斥和同步,这两大问题管程都是可以解决的,Java SDK并发包通过Lock和Condition两个接口实现管程,其中Lock用于解决互斥问题,Condition用于解决同步问题。

首先考虑一个问题,就是Java 语言本身提供的 synchronized 也是管程的一种实现,既然 Java 从语言层面已经实现了管程了,那为什么还要在 SDK 里提供另外一种实现呢?

很显然synchronized与Lock有区别,理解这个问题有助于进一步深入。

再造管程的理由

在 Java 的 1.5 版本中,synchronized 性能不如 SDK 里面的 Lock,但 1.6 版本之后,synchronized 做了很多优化,将性能追了上来,所以 1.6 之后的版本又有人推荐使用 synchronized 了。

那么性能问题是否能作为理由呢,显然不能,因为性能问题是可以通过优化解决的。

还记得我们前面介绍的死锁问题,有一个破坏不可抢占条件方案,这个方案synchronized没有办法解决,原因是synchronized申请资源的时候,如果申请不到,线程直接进入阻塞状态了,而线程进入阻塞状态,啥都干不了,也释放不了线程已经占有的资源。但我们希望的是:

对于“不可抢占”这个条件,占用部分资源的线程进一步申请其他资源时,如果申请不到,可以主动释放它占有的资源,这样不可抢占这个条件就破坏掉了。

如果我们重新设计一把互斥锁去解决这个问题,那该怎么设计呢?我觉得有三种方案。

  1. 能够响应中断。
    • synchronized 的问题是,持有锁 A 后,如果尝试获取锁 B 失败,那么线程就进入阻塞状态,一旦发生死锁,就没有任何机会来唤醒阻塞的线程。但如果阻塞状态的线程能够响应中断信号,也就是说当我们给阻塞的线程发送中断信号的时候,能够唤醒它,那它就有机会释放曾经持有的锁 A。这样就破坏了不可抢占条件了。
  2. 支持超时。
    • 如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是返回一个错误,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
  3. 非阻塞地获取锁。
    • 如果尝试获取锁失败,并不进入阻塞状态,而是直接返回,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。

这三种方案可以全面弥补 synchronized 的问题。到这里相信你应该也能理解了,这三个方案就是“重复造轮子”的主要原因,体现在 API 上,就是 Lock 接口的三个方法。详情如下:

1
2
3
4
5
6
7
8
// 支持中断的API
void lockInterruptibly()
throws InterruptedException;
// 支持超时的API
boolean tryLock(long time, TimeUnit unit)
throws InterruptedException;
// 支持非阻塞获取锁的API
boolean tryLock();

如何保证可见性

Java SDK 里面 Lock 的使用,有一个经典的范例,就是try{}finally{},在 finally 里面释放锁。其中可见性是怎么保证的呢? Java 里多线程的可见性是通过 Happens-Before 规则保证的。

synchronized 之所以能够保证可见性,也是因为有一条 synchronized 相关的规则:synchronized 的解锁 Happens-Before 于后续对这个锁的加锁。那 Java SDK 里面 Lock 靠什么保证可见性呢?

Q:线程 T1 对 value 进行了 +=1 操作,那后续的线程 T2 能够看到 value 的正确结果吗?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class X {
private final Lock rtl =
new ReentrantLock();
int value;
public void addOne() {
// 获取锁
rtl.lock();
try {
value+=1;
} finally {
// 保证锁能释放
rtl.unlock();
}
}
}

A:答案必须是肯定的。Java SDK 里面锁的实现非常复杂,它是利用了 volatile 相关的 Happens-Before 规则。Java SDK 里面的 ReentrantLock,内部持有一个 volatile 的成员变量 state,获取锁的时候,会读写 state 的值;解锁的时候,也会读写 state 的值(简化后的代码如下面所示)。也就是说,在执行 value+=1 之前,程序先读写了一次 volatile 变量 state,在执行 value+=1 之后,又读写了一次 volatile 变量 state。根据相关的 Happens-Before 规则:

  1. 顺序性规则:对于线程 T1,value+=1 Happens-Before 释放锁的操作 unlock();
  2. volatile 变量规则:由于 state = 1 会先读取 state,所以线程 T1 的 unlock() 操作 Happens-Before 线程 T2 的 lock() 操作;
  3. 传递性规则:线程 T1 的 value+=1 Happens-Before 线程 T2 的 lock() 操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
class SampleLock {
volatile int state;
// 加锁
lock() {
// 省略代码无数
state = 1;
}
// 解锁
unlock() {
// 省略代码无数
state = 0;
}
}

所以,后续线程T2能够看到value的正确结果。

什么是可重入锁

可重入锁,ReentrantLock,指的是线程可以重复获取同一把锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class X {
private final Lock rtl =
new ReentrantLock();
int value;
public int get() {
// 获取锁
rtl.lock(); ②
try {
return value;
} finally {
// 保证锁能释放
rtl.unlock();
}
}
public void addOne() {
// 获取锁
rtl.lock();
try {
value = 1 + get(); ①
} finally {
// 保证锁能释放
rtl.unlock();
}
}
}

当线程 T1 执行到 ① 处时,已经获取到了锁 rtl ,当在 ① 处调用 get() 方法时,会在 ② 再次对锁 rtl 执行加锁操作。此时,如果锁 rtl 是可重入的,那么线程 T1 可以再次加锁成功;如果锁 rtl 是不可重入的,那么线程 T1 此时会被阻塞。

除了可重入锁,可能你还听说过可重入函数,可重入函数怎么理解呢?指的是线程可以重复调用?

显然不是,所谓可重入函数,指的是多个线程可以同时调用该函数,每个线程都能得到正确结果;同时在一个线程内支持线程切换,无论被切换多少次,结果都是正确的。多线程可以同时执行,还支持线程切换,这意味着什么呢?线程安全啊。所以,可重入函数是线程安全的

公平锁与非公平锁

在使用 ReentrantLock 的时候,你会发现 ReentrantLock 这个类有两个构造函数,一个是无参构造函数,一个是传入 fair 参数的构造函数。

fair 参数代表的是锁的公平策略,如果传入 true 就表示需要构造一个公平锁,反之则表示要构造一个非公平锁。

1
2
3
4
5
6
7
8
9
//无参构造函数:默认非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
//根据公平策略参数创建锁
public ReentrantLock(boolean fair){
sync = fair ? new FairSync()
: new NonfairSync();
}

在介绍管程的时候我们知道了入口等待队列,锁都对应着一个等待队列,如果一个线程没有获得锁,就会进入等待队列,当有线程释放锁的时候,就需要从等待队列中唤醒一个等待的线程。

  • 如果是公平锁,唤醒的策略就是谁等待的时间长,就唤醒谁,很公平;

  • 如果是非公平锁,则不提供这个保证,有可能等待时间短的线程反而先被唤醒。

用锁的最佳实践

锁虽然能解决很多并发问题,但风险很高,可能会导致死锁,也可能影响性能。

并发大师 Doug Lea《Java 并发编程:设计原则与模式》一书中,推荐的三个用锁的最佳实践,它们分别是:

  1. 永远只在更新对象的成员变量时加锁
  2. 永远只在访问可变的成员变量时加锁
  3. 永远不在调用其他对象的方法时加锁

这三条规则,前两条估计你一定会认同,最后一条你可能会觉得过于严苛。

但是我还是倾向于你去遵守,因为调用其他对象的方法,实在是太不安全了,也许“其他”方法里面有线程 sleep() 的调用,也可能会有奇慢无比的 I/O 操作,这些都会严重影响性能。更可怕的是,“其他”类的方法可能也会加锁,然后双重加锁就可能导致死锁。

并发问题,本来就难以诊断,所以你一定要让你的代码尽量安全,尽量简单,哪怕有一点可能会出问题,都要努力避免。

Conclusion

Java SDK 并发包里的 Lock 接口里面的每个方法,你可以感受到,都是经过深思熟虑的。除了支持类似 synchronized 隐式加锁的 lock() 方法外,还支持超时、非阻塞、可中断的方式获取锁,这三种方式为我们编写更加安全、健壮的并发程序提供了很大的便利。

除了并发大师 Doug Lea 推荐的三个最佳实践外,你也可以参考一些诸如:减少锁的持有时间、减小锁的粒度等业界广为人知的规则,其实本质上它们都是相通的,不过是在该加锁的地方加锁而已。

思考

tryLock() 支持非阻塞方式获取锁,那么是否存在死锁问题呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class Account {
private int balance;
private final Lock lock
= new ReentrantLock();
// 转账
void transfer(Account tar, int amt){
while (true) {
if(this.lock.tryLock()) {
try {
if (tar.lock.tryLock()) {
try {
this.balance -= amt;
tar.balance += amt;
} finally {
tar.lock.unlock();
}
}//if
} finally {
this.lock.unlock();
}
}//if
}//while
}//transfer
}

A:

  • 存在活锁。这个例子可以稍微改下,成功转账后应该跳出循环。加个随机重试时间避免活锁
  • 有可能活锁,A,B两账户相互转账,各自持有自己lock的锁,都一直在尝试获取对方的锁,形成了活锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class Account {
private int balance;
private final Lock lock
= new ReentrantLock();
// 转账
void transfer(Account tar, int amt){
boolean flag = true;
while (flag) {
if(this.lock.tryLock(随机数,NANOSECONDS)) {
try {
if (tar.lock.tryLock(随机数,NANOSECONDS)) {
try {
this.balance -= amt;
tar.balance += amt;
flag = false;
} finally {
tar.lock.unlock();
}
}//if
} finally {
this.lock.unlock();
}
}//if
}//while
}//transfer
}

Dubbo如何用管程实现异步转同步?

在上一部分我们说到了 Java SDK 并发包里的 Lock 有别于 synchronized 隐式锁的三个特性:能够响应中断、支持超时和非阻塞地获取锁。

这一部分我们来了解Java SDK 并发包里的 Condition,Condition 实现了管程模型里面的条件变量。

Java语言内置的管程里只有一个条件变量,而Lock & Condition实现的管程是支持多个条件变量的。

在很多并发场景下,支持多个条件变量能够让我们的并发程序可读性更好,实现起来也更容易。例如,实现一个阻塞队列,就需要两个条件变量。

那如何利用两个条件变量快速实现阻塞队列呢?

一个阻塞队列,需要两个条件变量,一个是队列不空(空队列不允许出队),另一个是队列不满(队列已满不允许入队)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class BlockedQueue<T>{
final Lock lock =
new ReentrantLock();
// 条件变量:队列不满
final Condition notFull =
lock.newCondition();
// 条件变量:队列不空
final Condition notEmpty =
lock.newCondition();

// 入队
void enq(T x) {
lock.lock();
try {
while (队列已满){
// 等待队列不满
notFull.await();
}
// 省略入队操作...
//入队后,通知可出队
notEmpty.signal();
}finally {
lock.unlock();
}
}
// 出队
void deq(){
lock.lock();
try {
while (队列已空){
// 等待队列不空
notEmpty.await();
}
// 省略出队操作...
//出队后,通知可入队
notFull.signal();
}finally {
lock.unlock();
}
}
}

不过需要注意的是,Lock和Condition实现的管程,线程等待和通知需要调用 await()、signal()、signalAll(),它们的语义和 wait()、notify()、notifyAll() 是相同的。但是不一样的是,Lock&Condition 实现的管程里只能使用前面的 await()、signal()、signalAll(),而后面的 wait()、notify()、notifyAll() 只有在 synchronized 实现的管程里才能使用。如果一不小心在 Lock&Condition 实现的管程里调用了 wait()、notify()、notifyAll(),那程序可就彻底玩完了。

Java SDK 并发包里的 Lock 和 Condition 不过就是管程的一种实现而已,熟悉管程,Lock和Condition自然就熟练于心了。

下面我们看看Dubbo中,Lock和Condition是怎么用的。

首先需要了同步和异步:我们平时写的代码基本都是同步的。同步和异步的区别通俗点讲就是调用方法是否需要等到结果,如果需要等待结果就是同步,反之则是异步。

E.g.:有一个计算圆周率小数点后 100 万位的方法pai1M(),这个方法可能需要执行俩礼拜,如果调用pai1M()之后,线程一直等着计算结果,等俩礼拜之后结果返回,就可以执行 printf(“hello world”)了,这个属于同步;如果调用pai1M()之后,线程不用等待计算结果,立刻就可以执行 printf(“hello world”),这个就属于异步。

1
2
3
4
5
6
7
// 计算圆周率小说点后100万位 
String pai1M() {
//省略代码无数
}

pai1M()
printf("hello world")

同步是java代码默认的处理方式,如果你想让程序支持异步,可以通过以下方法实现:

  1. 调用方创建一个子线程,在子线程中执行方法调用,称为异步调用;
  2. 方法实现的时候,创建一个新的线程执行主要逻辑,主线程直接return,这种方法一般称为异步方法。

Dubbo源码分析

异步的场景还是很多的,例如TCP协议本身就是异步的,工作中经常用到RPC调用,在TCP协议层面,发送完RPC请求后,线程是不会等待RPC的响应结果。

可是平时工作中的RPC调用大多数是同步的,这是因为有人帮忙做了异步转同步的事情,就比如RPC框架Dubbo,下面我们具体分析。

对于一个简单的RPC调用,默认情况下sayHello()方法是个同步方法,也就是说,执行service.sayHello(“dubbo”)的时候,线程会停下来等结果。

1
2
3
4
DemoService service = 初始化部分省略
String message =
service.sayHello("dubbo");
System.out.println(message);

如果此时你将调用线程dump出来的话,会是这个样子:

img

调用栈信息

你会发现调用线程阻塞了,线程状态是TIMED_WAITING,本来发送请求是异步的,但是调用线程却阻塞了,说明Dubbo帮我们做了异步转同步的事情。通过调用栈,能看到线程是阻塞在DefaultFuture.get()方法是哪个,可以推断,Dubbo异步转同步的功能应该是通过DefaultFuture这个类实现的。

不过为了理清前后关系,有必要分析调用DefaultFuture.get()之前发生了什么,Fubbolnvoler的108行调用了DefaultFuture.get(),这一行先调用了request(inv, timeout)方法,其实就是发送RPC请求,之后调用get()方法等待RPC返回结果。

1
2
3
4
5
6
7
8
9
public class DubboInvoker{
Result doInvoke(Invocation inv){
// 下面这行就是源码中108行
// 为了便于展示,做了修改
return currentClient
.request(inv, timeout)
.get();
}
}

DefaultFuture 这个类是很关键,对其做一定程度的精简,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 创建锁与条件变量
private final Lock lock
= new ReentrantLock();
private final Condition done
= lock.newCondition();

// 调用方通过该方法等待结果
Object get(int timeout){
long start = System.nanoTime();
lock.lock();
try {
while (!isDone()) {
done.await(timeout);
long cur=System.nanoTime();
if (isDone() ||
cur-start > timeout){
break;
}
}
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException();
}
return returnFromResponse();
}
// RPC结果是否已经返回
boolean isDone() {
return response != null;
}
// RPC结果返回时调用该方法
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
}

还记得我们的需求,当RPC返回结果之前,阻塞调用线程,让调用线程等待;当RPC返回结果后,唤醒调用线程,让调用线程重新执行。

其实这就是经典的等待-通知机制,大概就已经想到了管程的解决方案了,我们看看Dubbo是怎么实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

// 创建锁与条件变量
private final Lock lock
= new ReentrantLock();
private final Condition done
= lock.newCondition();

// 调用方通过该方法等待结果
Object get(int timeout){
long start = System.nanoTime();
lock.lock();
try {
while (!isDone()) {
done.await(timeout);
long cur=System.nanoTime();
if (isDone() ||
cur-start > timeout){
break;
}
}
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException();
}
return returnFromResponse();
}
// RPC结果是否已经返回
boolean isDone() {
return response != null;
}
// RPC结果返回时调用该方法
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
}

调用线程通过get()方法等待RPC返回结果这个方法里面,看到最熟悉的就是:调用lock()获取锁,在finally里调用unlock()释放锁;获取锁后,通过经典的在循环中调用await()方法来实现等待。

当RPC结果返回时,会调用doReceived()方法,这个方法里面,调用lock()获取锁,在finally里面调用unlock()释放锁,获取锁后通过调用signal()来通知调用线程,结果已经返回,不用继续等待了。

工作中需要异步处理的越来越多,其中有一个主要原因就是有些 API 本身就是异步 API。例如 websocket 也是一个异步的通信协议,如果基于这个协议实现一个简单的 RPC,你也会遇到异步转同步的问题。

现在很多公有云的 API 本身也是异步的,例如创建云主机,就是一个异步的 API,调用虽然成功了,但是云主机并没有创建成功,你需要调用另外一个 API 去轮询云主机的状态。如果你需要在项目内部封装创建云主机的 API,你也会面临异步转同步的问题,因为同步的 API 更易用。

Conclusion

Lock and Condition是管程的一种实现,能否用好Lock and Condition取决于管程模型理解得怎样。

Lock&Condition 实现的管程相对于 synchronized 实现的管程来说更加灵活、功能也更丰富。

了解原理比了解实现更能让你快速学好并发编程,关于 Java SDK 并发包里锁和条件变量是如何实现的,可以参考《Java 并发编程的艺术》一书的第 5 章《Java 中的锁》。

可以去查看DefaultFuture 源码,Dubbo源码

思考

DefaultFuture 里面唤醒等待的线程,用的是 signal(),而不是 signalAll(),是否合理呢?

A:

  • 不合理,会导致很多请求超时,看了源码是调用signalAll()
  • 我理解异步的本质是利用多线程提升性能,异步一定是基于一个新开的线程,从调用线程来看是异步的,但是从新开的那个线程来看,正是同步(等待)的,只是对于调用方而言这种同步是透明的。正所谓生活哪有什么岁月静好,只是有人替你负重前行。
  • 提到异步转同步,让我想到这两天看的zookeeper客户端源码,感觉应该也是这个机制,客户端同步模式下发送请求后会执行packet.wait,收到服务端响应后执行packet.notifyAll
  • DefaultFuture的某个实例并非单线程访问的,可能会有多个线程访问同一个,因此需要用SignalAll通知全部,避免没有通知到正确的线程(不知道DUBBO中DefaultFuture同一个实例会不会共享)