/** * Returns the current value of synchronization state. * This operation has memory semantics of a {@code volatile} read. * @return current state value */ //返回同步状态的当前值 protectedfinalintgetState() { return state; }
/** * Sets the value of synchronization state. * This operation has memory semantics of a {@code volatile} write. * @param newState the new state value */ // 设置同步状态的值 protectedfinalvoidsetState(int newState) { state = newState; }
/** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * This operation has memory semantics of a {@code volatile} read * and write. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that the actual * value was not equal to the expected value. */ //原子地(CAS操作)将同步状态值设置为给定值update, 如果当前同步状态的值等于expect(期望值) protectedfinalbooleancompareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
// 结点的数据结构 staticfinalclassNode { /** Marker to indicate a node is waiting in shared mode */ // 表示该节点等待模式为共享式,通常记录于nextWaiter, // 通过判断nextWaiter的值可以判断当前结点是否处于共享模式 staticfinalNodeSHARED=newNode(); /** Marker to indicate a node is waiting in exclusive mode */ // 表示节点处于独占式模式,与SHARED相对 staticfinalNodeEXCLUSIVE=null;
//waitStatus的不同状态,具体内容见下文的表格 /** waitStatus value to indicate thread has cancelled */ staticfinalintCANCELLED=1; /** waitStatus value to indicate successor's thread needs unparking */ staticfinalintSIGNAL= -1; /** waitStatus value to indicate thread is waiting on condition */ staticfinalintCONDITION= -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ staticfinalintPROPAGATE= -3; volatileint waitStatus;
/** * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred to the queue to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. */ // 用于记录共享模式(SHARED), 也可以用来记录CONDITION队列 Node nextWaiter; // 通过nextWaiter的记录值判断当前结点的模式是否为共享模式 finalbooleanisShared() { return nextWaiter == SHARED; }
// 获取当前结点的前置结点 final Node predecessor()throws NullPointerException { Nodep= prev; if (p == null) thrownewNullPointerException(); else return p; }
// 用于初始化时创建head结点或者创建SHARED结点 Node() { } // 在addWaiter方法中使用,用于创建一个新的结点 Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } // 在CONDITION队列中使用该构造函数新建结点 Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
/** * Head of the wait queue, lazily initialized. Except for * initialization, it is modified only via method setHead. Note: * If head exists, its waitStatus is guaranteed not to be * CANCELLED. */ // 记录头结点 privatetransientvolatile Node head;
/** * Tail of the wait queue, lazily initialized. Modified only via * method enq to add new wait node. */ // 记录尾结点 privatetransientvolatile Node tail;
等待条件状态,表示当前节点在等待 condition,即在 condition 队列中。当其他线程对 Condtion 调用了 signal 方法后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取中
PROPAGATE
-3
状态需要向后传播,表示 releaseShared 需要被传播给后续节点,仅在共享锁模式下使用
3 AQS 对资源的共享方式
线程同步的关键是对 state 进行操作,根据 state 是否属于一个线程,操作 state 的方式有两种模式。 a. 独占模式「Exclusive」:只有一个线程能执行。使用独占的方式获取的资源是与具体线程绑定的,如果一个线程获取到了资源,便标记这个线程已经获取到,其他线程再次尝试操作 state 获取资源时就会发现当前该资源不是自己持有的,就会在获取失败后阻塞。又可分为公平锁和非公平锁:
公平锁:按照线程在队列中的排队顺序获取锁;
非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的,所以非公平锁效率较高;
b 共享模式「 Share 」:多个线程可同时执行。对应共享方式的资源与具体线程是不相关的,当多个线程去请求资源时通过 CAS 方式竞争获取资源,当一个线程获取到了资源后,另外一个线程再次去获取时如果当前资源还能满足它的需要,则当前线程只需要使用 CAS 方式进行获取即可。
4 AQS 的设计模式
AQS 同步器的设计是基于模板方法模式。使用者继承 AbstractQueuedSynchronizer 并重写指定的方法。实现对于共享资源 state 的获取和释放。
/** Use serial ID even though all fields transient. */ // 版本序列号 privatestaticfinallongserialVersionUID=3737899427754241961L;
/** * Empty constructor for use by subclasses. */ protectedAbstractOwnableSynchronizer() { }
/** * The current owner of exclusive mode synchronization. */ // 独占模式下的线程 privatetransient Thread exclusiveOwnerThread;
/** * Sets the thread that currently owns exclusive access. * A {@code null} argument indicates that no thread owns access. * This method does not otherwise impose any synchronization or * {@code volatile} field accesses. * @param thread the owner thread */ // 设置独占线程 protectedfinalvoidsetExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; }
/** * Returns the thread last set by {@code setExclusiveOwnerThread}, * or {@code null} if never set. This method does not otherwise * impose any synchronization or {@code volatile} field accesses. * @return the owner thread */ // 获取独占线程 protectedfinal Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } }
6 核心方法介绍
此文档中,先介绍 AQS 中核心的方法,在使用中,进行串联。
acquire
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ publicfinalvoidacquire(int arg) { // 如果线程直接获取成功,或者再尝试获取成功后都是直接工作, // 如果是从阻塞状态中唤醒开始工作的线程,将当前的线程中断 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
/** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */ // 当获取(资源)失败后,检查并且更新结点状态 privatestaticbooleanshouldParkAfterFailedAcquire(Node pred, Node node) { intws= pred.waitStatus; // 获取前驱结点的状态 if (ws == Node.SIGNAL) // 如果前驱结点的状态为SIGNAL,那么当前的结点应该阻塞(SIGNAL定义) /* * This node has already set status asking a release * to signal it, so it can safely park. */ returntrue; // 可以进行park操作 if (ws > 0) { // 如果前驱结点已经为取消状态(CANCELLED 1 > 0)则向前遍历,直到找到一个有效的结点 /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); // 找到pred结点前面最近的一个状态不为CANCELLED的结点 pred.next = node; // 将当前结点与新找到的前驱结点连接 } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ // 到这里的结点状态可能为:CONDITION 和 PROPAGATE,比较并设置前驱结点的状态为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } returnfalse; // 不能进行park操作 }
parkAndCheckInterrupt
parkAndCheckInterrupt 方法里的逻辑是首先执行 park 操作,即禁用当前线程,然后返回该线程是否已经被中断。
1 2 3 4 5 6 7 8 9 10 11
/** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted */ // 进行park操作并且返回该线程是否被中断 privatefinalbooleanparkAndCheckInterrupt() { // 在许可可用之前禁用当前线程,并且设置了blocker LockSupport.park(this); return Thread.interrupted(); // 当前线程是否已被中断,并清除中断标记位 }
/** * Cancels an ongoing attempt to acquire. * * @param node the node */ // 取消线程继续获取资源 privatevoidcancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) // node为空,返回 return;
node.thread = null; // 设置node结点的thread为空
// Skip cancelled predecessors Nodepred= node.prev; // 保存node的前驱结点 while (pred.waitStatus > 0) //若前驱结点为CANCELLED转态,继续向前遍历,找到第一个状态小于0的结点 node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. NodepredNext= pred.next;// 获取pred结点的下一个结点
// Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. node.waitStatus = Node.CANCELLED;// 设置当前node结点的状态为CANCELLED
// If we are the tail, remove ourselves. if (node == tail && compareAndSetTail(node, pred)) { //如果node结点为尾结点,则设置尾结点为pred结点 compareAndSetNext(pred, predNext, null); // 比较并设置pred结点的next节点为null } else { // node结点不为尾结点,或者比较设置新的尾结点不成功 // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || // pred结点不为头结点,并且pred结点的状态为SIGNAL (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && // 或者pred结点状态小于等于0,并且比较并设置等待状态为SIGNAL成功 pred.thread != null) { // 并且pred结点所封装的线程不为空 Nodenext= node.next; // 保存node结点的后继结点 if (next != null && next.waitStatus <= 0) // 后继不为空并且后继的状态小于等于0 compareAndSetNext(pred, predNext, next); // 比较并设置pred.next = next; } else { unparkSuccessor(node); // 唤醒当前node结点的后继结点 }
/** * Wakes up node's successor, if one exists. * * @param node the node */ // 唤醒当前node结点的后继结点 privatevoidunparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ intws= node.waitStatus; // 获取node结点的等待状态 if (ws < 0) // 状态值小于0,为SIGNAL -1 或 CONDITION -2 或 PROPAGATE -3 compareAndSetWaitStatus(node, ws, 0); // 比较并且设置结点等待状态,设置为INITAL 0
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Nodes= node.next; // 获取node的后继结点 if (s == null || s.waitStatus > 0) { //如果node的后继结点为空,或者结点等待状态CANCELLED 1 s = null; // s赋值为空 for (Nodet= tail; t != null && t != node; t = t.prev) // 从尾结点开始从后往前开始遍历 if (t.waitStatus <= 0)// 找到等待状态小于等于0的结点,找到最前的状态小于等于0的结点 s = t; // 保存结点 } if (s != null) // 该结点不为为空,释放许可 LockSupport.unpark(s.thread); }
release
以独占模式释放对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ publicfinalbooleanrelease(int arg) { if (tryRelease(arg)) { // 释放成功(自定义实现) Nodeh= head; // 保存头结点 if (h != null && h.waitStatus != 0) // 头结点不为空并且头结点状态不为0 unparkSuccessor(h); //释放头结点的后继结点 returntrue; } returnfalse; }
/** * Attempts to set the state to reflect a release in exclusive * mode. * * <p>This method is always invoked by the thread performing release. * * <p>The default implementation throws * {@link UnsupportedOperationException}. * * @param arg the release argument. This value is always the one * passed to a release method, or the current state value upon * entry to a condition wait. The value is otherwise * uninterpreted and can represent anything you like. * @return {@code true} if this object is now in a fully released * state, so that any waiting threads may attempt to acquire; * and {@code false} otherwise. * @throws IllegalMonitorStateException if releasing would place this * synchronizer in an illegal state. This exception must be * thrown in a consistent fashion for synchronization to work * correctly. * @throws UnsupportedOperationException if exclusive mode is not supported */ protectedbooleantryRelease(int arg) { thrownewUnsupportedOperationException(); }
/** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal) */ finalbooleantransferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))//将结点从CONDITION状态改为初始化状态 returnfalse;
/* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Nodep= enq(node); // 结点状态改为0,并且加入资源队列,p为node的前置结点 intws= p.waitStatus; // 获取p的状态 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 如果前置结点为CANCELLED状态 或者 状态设置为SIGNAL失败 LockSupport.unpark(node.thread); // 将node结点解锁 returntrue; }
/** * Returns true if a node, always one that was initially placed on * a condition queue, is now waiting to reacquire on sync queue. * @param node the node * @return true if is reacquiring */ finalbooleanisOnSyncQueue(Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null) // 如果状态是CONDITION 或者 结点是队列第一个结点 (不在CLH队列) returnfalse; //如果当前节点有next指针(next指针只在CLH队列中的节点有,条件队列中的节点是nextWaiter)的话,就返回true (Condition队列中 下个结点是用 nextWaiter) if (node.next != null) // If has successor, it must be on queue returntrue; /* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */ return findNodeFromTail(node); //如果上面无法快速判断的话,就只能从CLH队列中进行遍历,一个一个地去进行判断了 }
findNodeFromTail
从 CLH 队列的最后结点向前遍历,依次判断
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/** * Returns true if node is on sync queue by searching backwards from tail. * Called only when needed by isOnSyncQueue. * @return true if present */ privatebooleanfindNodeFromTail(Node node) { Nodet= tail; for (;;) { if (t == node) returntrue; if (t == null) returnfalse; t = t.prev; } }
/** * Transfers node, if necessary, to sync queue after a cancelled wait. * Returns true if thread was cancelled before being signalled. * * @param node the node * @return true if cancelled before the node was signalled */ finalbooleantransferAfterCancelledWait(Node node) { // 尝试使用CAS操作将node(CONDITION状态) 的ws设置为0 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); // 设置成功后,进入CLH队列 returntrue; // 返回 } /* * If we lost out to a signal(), then we can't proceed * until it finishes its enq(). Cancelling during an * incomplete transfer is both rare and transient, so just * spin. */ while (!isOnSyncQueue(node)) // 判断节点是否在CLH队列中(不在队列中) Thread.yield(); returnfalse; // 返回 }
7 扩展
Condition 接口
Contition 是一种广义上的条件队列,它利用 await () 和 signal () 为线程提供了一种更为灵活的等待 / 通知模式。
publicclassConditionObjectimplementsCondition, java.io.Serializable { // 序列化版本号 privatestaticfinallongserialVersionUID=1173984872572414699L; /** First node of condition queue. */ // condition队列的头结点 privatetransient Node firstWaiter; /** Last node of condition queue. */ // condition队列的尾结点 privatetransient Node lastWaiter;
// 构造方法 publicConditionObject() { }
/* * For interruptible waits, we need to track whether to throw * InterruptedException, if interrupted while blocked on * condition, versus reinterrupt current thread, if * interrupted while blocked waiting to re-acquire. */
/** Mode meaning to reinterrupt on exit from wait */ privatestaticfinalintREINTERRUPT=1; /** Mode meaning to throw InterruptedException on exit from wait */ privatestaticfinalintTHROW_IE= -1;
/** * Checks for interrupt, returning THROW_IE if interrupted * before signalled, REINTERRUPT if after signalled, or * 0 if not interrupted. */ privateintcheckInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }
/** * Throws InterruptedException, reinterrupts current thread, or * does nothing, depending on mode. */ privatevoidreportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) thrownewInterruptedException(); elseif (interruptMode == REINTERRUPT) selfInterrupt(); }
/** * Implements absolute timed condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * <li> If timed out while blocked in step 4, return false, else true. * </ol> */ publicfinalbooleanawaitUntil(Date deadline) throws InterruptedException { longabstime= deadline.getTime(); if (Thread.interrupted()) thrownewInterruptedException(); Nodenode= addConditionWaiter(); intsavedState= fullyRelease(node); booleantimedout=false; intinterruptMode=0; while (!isOnSyncQueue(node)) { if (System.currentTimeMillis() > abstime) { timedout = transferAfterCancelledWait(node); break; } LockSupport.parkUntil(this, abstime); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; }
/** * Implements timed condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * <li> If timed out while blocked in step 4, return false, else true. * </ol> */ publicfinalbooleanawait(long time, TimeUnit unit) throws InterruptedException { longnanosTimeout= unit.toNanos(time); if (Thread.interrupted()) thrownewInterruptedException(); Nodenode= addConditionWaiter(); intsavedState= fullyRelease(node); finallongdeadline= System.nanoTime() + nanosTimeout; booleantimedout=false; intinterruptMode=0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { timedout = transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; }
// support for instrumentation
/** * Returns true if this condition was created by the given * synchronization object. * * @return {@code true} if owned */ finalbooleanisOwnedBy(AbstractQueuedSynchronizer sync) { return sync == AbstractQueuedSynchronizer.this; }
/** * Queries whether any threads are waiting on this condition. * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}. * * @return {@code true} if there are any waiting threads * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ protectedfinalbooleanhasWaiters() { if (!isHeldExclusively()) thrownewIllegalMonitorStateException(); for (Nodew= firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) returntrue; } returnfalse; }
/** * Returns an estimate of the number of threads waiting on * this condition. * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}. * * @return the estimated number of waiting threads * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ protectedfinalintgetWaitQueueLength() { if (!isHeldExclusively()) thrownewIllegalMonitorStateException(); intn=0; for (Nodew= firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) ++n; } return n; }
/** * Returns a collection containing those threads that may be * waiting on this Condition. * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}. * * @return the collection of threads * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ protectedfinal Collection<Thread> getWaitingThreads() { if (!isHeldExclusively()) thrownewIllegalMonitorStateException(); ArrayList<Thread> list = newArrayList<Thread>(); for (Nodew= firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) { Threadt= w.thread; if (t != null) list.add(t); } } return list; } }
/** * Adds a new waiter to wait queue. * @return its new wait node */ // 添加新的waiter到wait队列 private Node addConditionWaiter() { // 保存尾结点 Nodet= lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) {// 尾结点不为空,并且尾结点的状态不为CONDITION unlinkCancelledWaiters(); // 清除状态不为CONDITION(为CANCELLED)的结点 t = lastWaiter; } Nodenode=newNode(Thread.currentThread(), Node.CONDITION); // 新建一个结点 if (t == null) // 尾结点为空 firstWaiter = node; // 设置condition队列的头结点 else// 尾结点不为空,说明队列不为空 t.nextWaiter = node; // 设置为节点的nextWaiter域为node结点 lastWaiter = node; // 更新node结点为condition队列的尾结点 return node; }
doSignal
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ // 把Condition队列第一个不为空的结点,转移到Sync结点 privatevoiddoSignal(Node first) { do { // 循环 if ( (firstWaiter = first.nextWaiter) == null) // 该节点的nextWaiter为空 lastWaiter = null; // 设置尾结点为空 first.nextWaiter = null; // 设置first结点的nextWaiter域 } while (!transferForSignal(first) && // 将结点从condition队列转移到sync队列失败 (first = firstWaiter) != null); // 并且condition队列中的头结点不为空,一直循环 }
doSignalAll
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/** * Removes and transfers all nodes. * @param first (non-null) the first node on condition queue */ // 将CONDITION队列中的全部结点移除并加入到同步队列中竞争同步状态 privatevoiddoSignalAll(Node first) { // condition队列的头结点尾结点都设置为空 lastWaiter = firstWaiter = null; do {// 循环 Nodenext= first.nextWaiter; // 获取first结点的nextWaiter域结点 first.nextWaiter = null; // 设置first结点的nextWaiter域为空(切断第一个结点) transferForSignal(first); // 将first结点从condition队列转移到sync队列 first = next; // 重新设置first } while (first != null); }
/** * Unlinks cancelled waiter nodes from condition queue. * Called only while holding lock. This is called when * cancellation occurred during condition wait, and upon * insertion of a new waiter when lastWaiter is seen to have * been cancelled. This method is needed to avoid garbage * retention in the absence of signals. So even though it may * require a full traversal, it comes into play only when * timeouts or cancellations occur in the absence of * signals. It traverses all nodes rather than stopping at a * particular target to unlink all pointers to garbage nodes * without requiring many re-traversals during cancellation * storms. */ // 清除condition队列中,状态为为CANCELLED的结点 privatevoidunlinkCancelledWaiters() { Nodet= firstWaiter; // 保存condition队列头结点 Nodetrail=null; while (t != null) { // t不为空 Nodenext= t.nextWaiter; // 下一个结点 if (t.waitStatus != Node.CONDITION) { // t结点的状态不为CONDTION状态(为CANCELLED) t.nextWaiter = null; // 设置t节点的nextWaiter为空 if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else// t结点的状态为CONDTION状态 trail = t; // 设置trail结点 t = next; // 设置t结点为下一个结点 } }
公共方法 (public)
signal
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ // 将等待时间最长的线程(如果存在)从此条件的等待队列移动到拥有锁的等待队列。 // 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。 publicfinalvoidsignal() { if (!isHeldExclusively()) // 不被当前线程独占(非独占模式),抛出异常 thrownewIllegalMonitorStateException(); Nodefirst= firstWaiter; // 获取头结点 if (first != null) doSignal(first); // 把Condition队列第一个不为空的结点,转移到Sync结点 }
signalAll
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/** * Moves all threads from the wait queue for this condition to * the wait queue for the owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ // 将所有结点从等待队列移动到拥有锁的等待队列。 // 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。 publicfinalvoidsignalAll() { if (!isHeldExclusively()) // 不被当前线程独占(非独占模式),抛出异常 thrownewIllegalMonitorStateException(); Nodefirst= firstWaiter; if (first != null) doSignalAll(first); // 将CONDITION队列中的全部结点移除并加入到同步队列中竞争同步状态 }
/** * Implements uninterruptible condition wait. * <ol> * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * </ol> */ // 等待,当前线程在接到信号之前一直处于等待状态,不响应中断 publicfinalvoidawaitUninterruptibly() { Nodenode= addConditionWaiter(); // 将结点添加到Condition等待队列 intsavedState= fullyRelease(node); // 获取释放的状态(激活后置结点的状态) booleaninterrupted=false; while (!isOnSyncQueue(node)) { // 判断节点是否在CLH队列中(不在队列中) LockSupport.park(this); // 阻塞当前线程 if (Thread.interrupted()) interrupted = true; } if (acquireQueued(node, savedState) || interrupted) // 以独占不间断模式获取已在队列中的阻塞线程 selfInterrupt(); }