/** * Synchronization control For CountDownLatch. * Uses AQS state to represent count. */ private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L;
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }
countDown方法调用Sync中releaseShared()方法
1 2 3
public void countDown() { sync.releaseShared(1); }
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero //使用自旋的方式实现state-1 for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } //PROPAGATE的节点状态,表示处于共享模式,会对线程的唤醒进行传播 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } //如果前面唤醒的线程占领了head,那么再进行循环,通过头节点检查是否改变了,如果改变了就继续循环 if (h == head) // loop if head changed break; } }
在线程被唤醒的执行顺序中:
h == head 表示头节点还没有被使用;
unparkSuccessor(h) 表示唤醒的线程;
h != head 表示头节点被刚刚唤醒的线程占用。
1.2.2 await()
1 2 3
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) { super(permits); }
protected int tryAcquireShared(int acquires) { for (;;) { //判断是否有线程在排队,然后再进行CAS操作 if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
2.2.2 NonFairSync 非公平策略
公平与非公平策略只是多了个hasQueuedPredecessors()判断。
1 2 3 4 5 6 7 8 9 10 11
static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) { super(permits); }
protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } }
nonfairTryAcquireShared()方法源码:
1 2 3 4 5 6 7 8 9
final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
public final void await() throws InterruptedException { //允许线程中断 if (Thread.interrupted()) throw new InterruptedException(); //创建一个状态为condition的节点,采用链表的形式存放数据 Node node = addConditionWaiter(); //释放当前的锁,得到锁的状态,释放等待队列中的一个线程 int savedState = fullyRelease(node); int interruptMode = 0; //判断当前节点是或否在队列上 while (!isOnSyncQueue(node)) { //挂起当前线程 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //acquireQueued为false就拿到了锁 //interruptMode != THROW_IE表示这个线程没有成功将 node 入队,但 signal 执行了 enq 方法让其入队了 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //将这个变量设置成 REINTERRUPT interruptMode = REINTERRUPT; //如果node节点的下一个等待者不为空,则开始进行清理,清理condition节点 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); //如果线程中断了,需要抛出异常 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
addConditionWaiter()源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. //如果lastWaiter不等于空并且waitStatus不为condition,把这个节点从链表中移除 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } //创建一个状态为condition的单向列表 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
fullyRelease()方法源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
final int fullyRelease(Node node) { boolean failed = true; try { //获得重入的次数 int savedState = getState(); //释放并唤醒同步队列中的线程 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
isOnSyncQueue()方法源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
final boolean isOnSyncQueue(Node node) { //判断当前节点是否在队列中,false表示不在,true表示在 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; if (node.next != null) // If has successor, it must be on queue return true; /* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */ /从tail节点往前扫描AQS队列,如果发现AQS队列中的节点与当前节点相等,则说明节点一定存在与队列中 return findNodeFromTail(node); }
4.2.2 signal()
调用此方法,将会唤醒在AQS队列中的节点
1 2 3 4 5 6 7 8 9
public final void signal() { //判断当前线程是否获得了锁 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //AQS队列的第一个节点 Node first = firstWaiter; if (first != null) doSignal(first); }
final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ //更新节点状态为0 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false;
/* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ //调用 enq,把当前节点添加到AQS队列。并且返回返回按当前节点的上一个节点,也就是原tail 节点 Node p = enq(node); int ws = p.waitStatus; //如果上一个节点被取消了,尝试设置上一节点状态为SIGNAL if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) //唤醒节点上的线程 LockSupport.unpark(node.thread); return true; }