CountDownLatch、Semaphore、CyclicBarrier、Condition源码分析

一、CountDownLatch

1.1 定义

它是一个同步辅助类,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。一个倒数计算的概念。
初始化给定一定的整数参数值,然后通过countDown()来实现倒数功能,在这个整数倒数到 0 之前,调用了 await() 方法的程序都必须要等待,当到达0后, 释放所有等待线程。

1.2 源码分析

对于 CountDownLatch,我们仅仅需要关心两个方法,一个是 countDown() 方法,另一个是 await() 方法。

1.2.1 countDown()

CountDownLatch有一个同步内部类Sync
它使用AQS状态表示计数,实现同步控制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) {
setState(count);
}

int getCount() {
return getState();
}

protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

countDown方法调用Sync中releaseShared()方法

1
2
3
public void countDown() {
sync.releaseShared(1);
}

所有的线程由于调用了await()方法阻塞了,只能等到countDown()使得state=0的时候才会被全部唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
//使用自旋的方式实现state-1
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

当state递减为0的时候,tryReleaseShared才返回true;否则只是返回state-1的值;
如果state=0,调用doReleaseShared()方法,唤醒等待的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void doReleaseShared() {

for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
//PROPAGATE的节点状态,表示处于共享模式,会对线程的唤醒进行传播
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//如果前面唤醒的线程占领了head,那么再进行循环,通过头节点检查是否改变了,如果改变了就继续循环
if (h == head) // loop if head changed
break;
}
}

在线程被唤醒的执行顺序中:

  • h == head 表示头节点还没有被使用;
  • unparkSuccessor(h) 表示唤醒的线程;
  • h != head 表示头节点被刚刚唤醒的线程占用。

1.2.2 await()

1
2
3
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

如果state<0,那么当前线程需要加入到共享锁队列中,执行doAcquireSharedInterruptibly()方法。

1
2
3
4
5
6
7
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//SHARED为共享模式,创建一个共享模式的节点到队列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//尝试获取锁
int r = tryAcquireShared(arg);
//获得了锁并且state!=0,下面的代码则不会执行
if (r >= 0) {
//把唤醒的节点,设置成head节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

把唤醒的节点,设置成head节点,当第一个线程被唤醒后,并设置为head节点,依次会唤醒第二个线程……

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}

二、Semaphore

2.1 定义

从单词的意思理解为信号灯,它可以控制同时访问程序的线程个数,比如停车场总共有100个车位,那么这时一下子来了150辆车需要停放在此停车场,必须要等到停车场有空余的位置才能让停满剩下的车进入此停车场。使用场景可用于限流。

两个重要的方法,acquire()获取一个许可,release()释放一个许可。

2.2 源码分析

2.2.1 FairSync 公平策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;

FairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
for (;;) {
//判断是否有线程在排队,然后再进行CAS操作
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}

2.2.2 NonFairSync 非公平策略

公平与非公平策略只是多了个hasQueuedPredecessors()判断。

1
2
3
4
5
6
7
8
9
10
11
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;

NonfairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}

nonfairTryAcquireShared()方法源码:

1
2
3
4
5
6
7
8
9
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

其它源码和 CountDownLatch 的是完全一样,都是基于共享锁的实现的。

三、CyclicBarrier

3.1 定义

从单词组成的意思理解为循环屏障。所谓屏障就是一个同步点,当一组线程到达这个同步点的时候被阻塞了,只有最后一个线程到达这个同步点的时候,屏障(也就是同步点)的大门才会打开,所有被拦截在大门之外的线程才会进入大门而继续工作。可以适用的场景于所有的子线程完成任务后,再执行主线程。

3.2 源码分析

1
2
3
4
5
6
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

构造方法参数parties,表示参与线程的个数;
每一个线程调用await()方法后,parties递减1,穿过屏障的大门(栅栏)后重置。
第二个参数barrierAction为Runnable实例,由最后一个到达的线程进行执行,如果没有需要执行的,设置为null。

四、Condition

4.1 定义

它是一个用来多线程的协调通信的工具类,当某个线程阻塞等待某个条件时,当满足条件才会被唤醒。

4.2 源码分析

两个重要方法,await()和signal()。

4.2.1 await()

调用此方法会使得线程进入等待队列并释放锁,线程的状态变成等待状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public final void await() throws InterruptedException {
//允许线程中断
if (Thread.interrupted())
throw new InterruptedException();
//创建一个状态为condition的节点,采用链表的形式存放数据
Node node = addConditionWaiter();
//释放当前的锁,得到锁的状态,释放等待队列中的一个线程
int savedState = fullyRelease(node);
int interruptMode = 0;
//判断当前节点是或否在队列上
while (!isOnSyncQueue(node)) {
//挂起当前线程
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//acquireQueued为false就拿到了锁
//interruptMode != THROW_IE表示这个线程没有成功将 node 入队,但 signal 执行了 enq 方法让其入队了
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
//将这个变量设置成 REINTERRUPT
interruptMode = REINTERRUPT;
//如果node节点的下一个等待者不为空,则开始进行清理,清理condition节点
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
//如果线程中断了,需要抛出异常
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
  • addConditionWaiter()源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
//如果lastWaiter不等于空并且waitStatus不为condition,把这个节点从链表中移除
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//创建一个状态为condition的单向列表
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
  • fullyRelease()方法源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
final int fullyRelease(Node node) {
boolean failed = true;
try {
//获得重入的次数
int savedState = getState();
//释放并唤醒同步队列中的线程
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
  • isOnSyncQueue()方法源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
final boolean isOnSyncQueue(Node node) {
//判断当前节点是否在队列中,false表示不在,true表示在
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
/从tail节点往前扫描AQS队列,如果发现AQS队列中的节点与当前节点相等,则说明节点一定存在与队列中
return findNodeFromTail(node);
}

4.2.2 signal()

调用此方法,将会唤醒在AQS队列中的节点

1
2
3
4
5
6
7
8
9
public final void signal() {
//判断当前线程是否获得了锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//AQS队列的第一个节点
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
  • doSignal()方法的源码:
1
2
3
4
5
6
7
8
9
private void doSignal(Node first) {
do {
//从condition队列中移除first节点
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
  • transferForSignal()方法的源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
//更新节点状态为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
//调用 enq,把当前节点添加到AQS队列。并且返回返回按当前节点的上一个节点,也就是原tail 节点
Node p = enq(node);
int ws = p.waitStatus;
//如果上一个节点被取消了,尝试设置上一节点状态为SIGNAL
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
//唤醒节点上的线程
LockSupport.unpark(node.thread);
return true;
}
  • 阻塞:await()方法中,在线程释放锁资源之后,如果节点不在AQS等待队列,则阻塞当前线程,如果在等待队列,则自旋等待尝试获取锁;
  • 释放:signal()后,节点会从condition队列移动到AQS等待队列,则进入正常锁的获取流程。