static final class Node { /** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3;
/** * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block. * CANCELLED: This node is cancelled due to timeout or interrupt. * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. * CONDITION: This node is currently on a condition queue. * It will not be used as a sync queue node * until transferred, at which time the status * will be set to 0. (Use of this value here has * nothing to do with the other uses of the * field, but simplifies mechanics.) * PROPAGATE: A releaseShared should be propagated to other * nodes. This is set (for head node only) in * doReleaseShared to ensure propagation * continues, even if other operations have * since intervened. * 0: None of the above * * The values are arranged numerically to simplify use. * Non-negative values mean that a node doesn't need to * signal. So, most code doesn't need to check for particular * values, just for sign. * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified using CAS * (or when possible, unconditional volatile writes). */ volatile int waitStatus;
/** * Link to predecessor node that current node/thread relies on * for checking waitStatus. Assigned during enqueuing, and nulled * out (for sake of GC) only upon dequeuing. Also, upon * cancellation of a predecessor, we short-circuit while * finding a non-cancelled one, which will always exist * because the head node is never cancelled: A node becomes * head only as a result of successful acquire. A * cancelled thread never succeeds in acquiring, and a thread only * cancels itself, not any other node. */ volatile Node prev;//前驱节点
/** * Link to the successor node that the current node/thread * unparks upon release. Assigned during enqueuing, adjusted * when bypassing cancelled predecessors, and nulled out (for * sake of GC) when dequeued. The enq operation does not * assign next field of a predecessor until after attachment, * so seeing a null next field does not necessarily mean that * node is at end of queue. However, if a next field appears * to be null, we can scan prev's from the tail to * double-check. The next field of cancelled nodes is set to * point to the node itself instead of null, to make life * easier for isOnSyncQueue. */ volatile Node next;//后继节点
/** * The thread that enqueued this node. Initialized on * construction and nulled out after use. */ volatile Thread thread;//当前线程
/** * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred to the queue to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. */ Node nextWaiter;//存储在condition队列中的后继节点
/** * Returns true if node is waiting in shared mode. */ //是否为共享锁 final boolean isShared() { return nextWaiter == SHARED; }
/** * Returns previous node, or throws NullPointerException if null. * Use when predecessor cannot be null. The null check could * be elided, but is present to help the VM. * * @return the predecessor of this node */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; }
Node() { // Used to establish initial head or SHARED marker } //将线程组装成一个Node,添加到队列中 Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } //在condition队列中进行使用 Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L;
/** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { //对于非公平锁,一开始就CAS抢占一下 //如果CAS成功了,就表示获得了锁 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else//如果CAS失败了,调用acquire()方法走竞争逻辑 acquire(1); }
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
4.3.1 CAS的实现原理
1 2 3 4
protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) {//自旋方式获得锁 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
4.4.1 NonfairSync.tryAcquire()
这个方法的作用是尝试获取锁,如果成功返回 true,不成功返回 false。
1 2 3
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
下面来看此方法的具体实现:
获得当前线程,判断当前锁的状态;
如果state=0表示无锁状态,通过CAS更新state状态的值;
如果当前线程属于重入,则增加重入次数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //前置节点 int ws = pred.waitStatus; //如果前置节点为 SIGNAL,意味着只需要等待其他前置节点的线程被释放 if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ //返回true,可以放心挂起了 return true; //ws 大于 0,意味着 prev 节点取消了排队,直接移除这个节点就行 if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { //相当于: pred=pred.prev;node.prev=pred; node.prev = pred = pred.prev; } while (pred.waitStatus > 0);//这里采用循环,从双向列表中移除 CANCELLED 的节点 pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ //利用 cas 设置 prev 节点的状态为 SIGNAL(-1) compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
Node的状态有5种,默认状态是0,以下是其它四种状态:
1 2 3 4 5 6 7 8
//在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该 Node 的结点, 其结点的 waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化 static final int CANCELLED = 1; //只要前置节点释放锁,就会通知标识为 SIGNAL 状态的后续节点的线程 static final int SIGNAL = -1; //表示该线程在condition队列中阻塞 static final int CONDITION = -2; //共享模式下,PROPAGATE 状态的线程处于可运行状态 static final int PROPAGATE = -3;
4.6.2 parkAndCheckInterrupt()
使用LockSupport.park(this)挂起当前线程为WAITING状态
1 2 3 4
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread)) UnsafeWrapper("Unsafe_Unpark"); Parker* p = NULL; if (jthread != NULL) { oop java_thread = JNIHandles::resolve_non_null(jthread); if (java_thread != NULL) { jlong lp = java_lang_Thread::park_event(java_thread); if (lp != 0) { // This cast is OK even though the jlong might have been read // non-atomically on 32bit systems, since there, one word will // always be zero anyway and the value set is always the same p = (Parker*)addr_from_java(lp); } else { // Grab lock if apparently null or using older version of library MutexLocker mu(Threads_lock); java_thread = JNIHandles::resolve_non_null(jthread); if (java_thread != NULL) { JavaThread* thr = java_lang_Thread::thread(java_thread); if (thr != NULL) { p = thr->parker(); if (p != NULL) { // Bind to Java thread for next time. java_lang_Thread::set_park_event(java_thread, addr_to_java(p)); } } } } } } if (p != NULL) { #ifndef USDT2 HS_DTRACE_PROBE1(hotspot, thread__unpark, p); #else /* USDT2 */ HOTSPOT_THREAD_UNPARK( (uintptr_t) p); #endif /* USDT2 */ p->unpark(); } UNSAFE_END
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ //获得head节点的状态 int ws = node.waitStatus; if (ws < 0) //设置head节点的状态为0 compareAndSetWaitStatus(node, ws, 0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ //得到head节点的下一个节点 Node s = node.next; //如果下一个节点为 null 或者 status>0 表示 cancelled 状态 if (s == null || s.waitStatus > 0) { s = null; //通过从尾部节点开始扫描,找到距离 head 最近的一个waitStatus<=0 的节点 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //next 节点不为空,直接唤醒这个线程即可 if (s != null) LockSupport.unpark(s.thread); }
4.7.3 为什么释放锁的时候是从tail节点开始扫描的?
我们在加锁的enq()方法中,在 cas 操作之后,t.next=node 操作之前。 存在其他线程调用 unlock 方法从 head开始往后遍历,由于 t.next=node 还没执行意味着链表的关系还没有建立完整。就会导致遍历到 t 节点的时候被中断。所以从后往前遍历,一定不会存在这个问题。