利用Semaphore实现一个限流器

Semaphore普遍翻译为信号量,编程世界里,线程能不能执行,要看信号量是不是允许。

信号量是由大名鼎鼎的计算机科学家迪杰斯特拉(Dijkstra)于 1965 年提出,在这之后的 15 年,信号量一直都是并发编程领域的终结者,直到 1980 年管程被提出来,我们才有了第二选择。目前几乎所有支持并发编程的语言都支持信号量机制。

信号量模型

简单概括为:一个计数器、一个等待队列、三个方法

在信号量模型里,计数器和等待队列对外是透明的,所以只能通过信号量模型提供的三个方法来访问它们,这三个方法分别是:init()、down() 和 up()。

img

信号量模型图

这三个方法详细的语义具体如下:

  • init():设置计数器的初始值
  • down():计数器的值减1,;如果此时计数器的值小于0,则当前线程将被阻塞,否则当前线程可以继续执行;
  • up():计数器的值加 1;如果此时计数器的值小于或者等于 0,则唤醒等待队列中的一个线程,并将其从等待队列中移除。

这里提到的 init()、down() 和 up() 三个方法都是原子性的,并且这个原子性是由信号量模型的实现方保证的。

在 Java SDK 里面,信号量模型是由 java.util.concurrent.Semaphore 实现的,Semaphore 这个类能够保证这三个方法都是原子操作。

给出一个参考信号量模型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class Semaphore{
// 计数器
int count;
// 等待队列
Queue queue;
// 初始化操作
Semaphore(int c){
this.count=c;
}
//
void down(){
this.count--;
if(this.count<0){
//将当前线程插入等待队列
//阻塞当前线程
}
}
void up(){
this.count++;
if(this.count<=0) {
//移除等待队列中的某个线程T
//唤醒线程T
}
}
}

信号量模型里面,down()、up() 这两个操作历史上最早称为 P 操作和 V 操作,所以信号量模型也被称为 PV 原语。

还有些人喜欢用 semWait() 和 semSignal() 来称呼它们,虽然叫法不同,但是语义都是相同的。在 Java SDK 并发包里,down() 和 up() 对应的则是 acquire() 和 release()。

如何使用信号量

信号量的模型比较简单,关于信号量模型的使用,可以参考红绿灯,车辆在通过路口前必须检查是否是绿灯,只有绿灯才能通行。

信号量也是类似的,使用累加器的例子说明,在累加器的例子里面,count+=1操作是个临界区,只允许一个线程执行,也就是说要保证互斥,用信号量怎么控制呢?

就像用互斥锁一样,只要在进入临界区之前执行以下down()操作,退出临界区之前执行一下up()操作就可以。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static int count;
//初始化信号量
static final Semaphore s
= new Semaphore(1);
//用信号量保证互斥
static void addOne() {
//down()
s.acquire();
try {
count+=1;
} finally {
//up()
s.release();
}
}

信号量是如何保证互斥的呢?

假设两个线程 T1 和 T2 同时访问 addOne() 方法,当它们同时调用 acquire() 的时候,由于 acquire() 是一个原子操作,所以只能有一个线程(假设 T1)把信号量里的计数器减为 0,另外一个线程(T2)则是将计数器减为 -1。对于线程 T1,信号量里面的计数器的值是 0,大于等于 0,所以线程 T1 会继续执行;对于线程 T2,信号量里面的计数器的值是 -1,小于 0,按照信号量模型里对 down() 操作的描述,线程 T2 将被阻塞。所以此时只有线程 T1 会进入临界区执行count+=1;。

当线程 T1 执行 release() 操作,也就是 up() 操作的时候,信号量里计数器的值是 -1,加 1 之后的值是 0,小于等于 0,按照信号量模型里对 up() 操作的描述,此时等待队列中的 T2 将会被唤醒。于是 T2 在 T1 执行完临界区代码之后才获得了进入临界区执行的机会,从而保证了互斥性。

快速实现一个限流器

我们用信号量实现了一个最简单的互斥锁功能。

既然Java SDK里面提供了Lock,为啥还要提供一个 Semaphore ?

其实实现一个互斥锁,仅仅是 Semaphore 的部分功能,Semaphore 还有一个功能是 Lock 不容易实现的,那就是:Semaphore 可以允许多个线程访问一个临界区。

现实中也是有这样的需求的,常见的就是工作中遇到的各种池化资源,如连接池、对象池、线程池等,其中,可能最熟悉的就是数据库连接池,同一时刻,一定时允许多个线程同时使用连接池的,当然,每个连接在释放前,是不允许其他线程使用的。

所谓对象池,指的是一次性创建出 N 个对象,之后所有的线程重复利用这 N 个对象,当然对象在被释放前,也是不允许其他线程使用的。

对象池,可以用 List 保存实例对象,这个很简单。但关键是限流器的设计,这里的限流,指的是不允许多于 N 个线程同时进入临界区。

那如何快速实现一个这样的限流器呢?这种场景,我立刻就想到了信号量的解决方案。

信号量的计数器,在上面的例子中,我们设置成了 1,这个 1 表示只允许一个线程进入临界区,但如果我们把计数器的值设置成对象池里对象的个数 N,就能完美解决对象池的限流问题了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//对象池
class ObjPool<T, R> {
final List<T> pool;
// 用信号量实现限流器
final Semaphore sem;
// 构造函数
ObjPool(int size, T t){
pool = new Vector<T>(){};
for(int i=0; i<size; i++){
pool.add(t);
}
sem = new Semaphore(size);
}
// 利用对象池的对象,调用func
R exec(Function<T,R> func) {
T t = null;
sem.acquire();
try {
t = pool.remove(0);
return func.apply(t);
} finally {
pool.add(t);
sem.release();
}
}
}
// 创建对象池
ObjPool<Long, String> pool =
new ObjPool<Long, String>(10, 2);
// 通过对象池获取t,之后执行
pool.exec(t -> {
System.out.println(t);
return t.toString();
});

我们用一个 List来保存对象实例,用 Semaphore 实现限流器。

关键的代码是 ObjPool 里面的 exec() 方法,这个方法里面实现了限流的功能。

在这个方法里面,我们首先调用 acquire() 方法(与之匹配的是在 finally 里面调用 release() 方法),假设对象池的大小是 10,信号量的计数器初始化为 10,那么前 10 个线程调用 acquire() 方法,都能继续执行,相当于通过了信号灯,而其他线程则会阻塞在 acquire() 方法上。对于通过信号灯的线程,我们为每个线程分配了一个对象 t(这个分配工作是通过 pool.remove(0) 实现的),分配完之后会执行一个回调函数 func,而函数的参数正是前面分配的对象 t ;执行完回调函数之后,它们就会释放对象(这个释放工作是通过 pool.add(t) 实现的),同时调用 release() 方法来更新信号量的计数器。如果此时信号量里计数器的值小于等于 0,那么说明有线程在等待,此时会自动唤醒等待的线程。

使用信号量,我们可以轻松地实现一个限流器,使用起来还是非常简单的

Conclusion

信号量在其他语言里相对于Java里更加有名,java在并发编程领域重点支持管程模型。

管程模型理论上解决了信号量模型的一些不足,主要体现在易用性和工程化方面,例如用信号量解决我们曾经提到过的阻塞队列问题,就比管程模型麻烦很多。

思考

Q:在上面对象池的例子中,对象保存在了 Vector 中,Vector 是 Java 提供的线程安全的容器,如果我们把 Vector 换成 ArrayList,是否可以呢?

A:

  • 和管程相比,信号量可以实现的独特功能就是同时允许多个线程进入临界区,但是信号量不能做的就是同时唤醒多个线程去争抢锁,只能唤醒一个阻塞中的线程,而且信号量模型是没有Condition的概念的,即阻塞线程被醒了直接就运行了而不会去检查此时临界条件是否已经不满足了,基于此考虑信号量模型才会设计出只能让一个线程被唤醒,否则就会出现因为缺少Condition检查而带来的线程安全问题。正因为缺失了Condition,所以用信号量来实现阻塞队列就很麻烦,因为要自己实现类似Condition的逻辑。
  • 需要用线程安全的vector,因为信号量支持多个线程进入临界区,执行list的add和remove方法时可能是多线程并发执行
  • 有同学认为up()中的判断条件应该>=0,我觉得有可能理解为生产者-消费者模式中的生产者了。可以这么想,>0就意味着没有阻塞的线程了,所以只有<=0的情况才需要唤醒一个等待的线程。其实down()和up()是成对出现的,并且是先调用down()获得锁,处理完成再调用up()释放锁,如果信号量初始值为1,应该是不会出现>0的情况的,除非故意调先用up(),这也失去了信号量本身的意义了。
  • 很多人对up()方法的计数器count<=0不理解,可以看下这里:
    1、反证法验证一下,假如一个线程先执行down()操作,那么此时count的值是0,接着这个线程执行up()操作,此时count的值是1,如果count应该是大于等于0,那么应该唤醒其他线程,可是此时并没有线程在睡眠呀,count的值不应该是大于等于0。
    2、假如一个线程t1执行down()操作,此时count = 0,然后t1被中断,另外的线程t2执行down()操作,此时count=-1,t2阻塞睡眠,另外的线程t3执行down()操作,count=-2,t3也睡眠。count=-2 说明有两个线程在睡眠,接着t1执行up() 操作,此时count=-1,小于等于0,唤醒t2或者t3其中一个线程,假如计数器count是大于等于0才唤醒其他线程,这明显是不对的。
  • 换ArrayList是不行的,临界区内可能存在多个线程来执行remove操作,出现不可预知的后果