
SynchronousQueue Analysis, Part 1: TransferStack

SynchronousQueue is a synchronous queue whose implementation draws on some data-structure and algorithm knowledge. Today we will read through it with an open mind and take away what we can. Readers are welcome to offer different suggestions so we can learn from and discuss with each other.

package java.util.concurrent;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;
import java.util.*;

/**
 * A {@linkplain BlockingQueue blocking queue} in which each insert
 * operation must wait for a corresponding remove operation by another
 * thread, and vice versa.  A synchronous queue does not have any
 * internal capacity, not even a capacity of one.  You cannot
 * peek at a synchronous queue because an element is only
 * present when you try to remove it; you cannot insert an element
 * (using any method) unless another thread is trying to remove it;
 * you cannot iterate as there is nothing to iterate.  The
 * <em>head</em> of the queue is the element that the first queued
 * inserting thread is trying to add to the queue; if there is no such
 * queued thread then no element is available for removal and
 * poll will return null.  For purposes of other
 * Collection methods (for example contains), a
 * SynchronousQueue acts as an empty collection.  This queue
 * does not permit null elements.
 *
 * (In short: every insert must wait for a corresponding remove by
 * another thread, and vice versa. The queue has no capacity at all,
 * not even a single slot; an element only ever "exists" at the moment
 * a consumer tries to remove it, so remove and poll return null when
 * no producer is queued, and the queue otherwise behaves as an empty
 * collection.)
 *
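These no-capacity semantics are easy to see from the public API. A minimal sketch (the class and method names here are mine, invented for illustration): put blocks until a take arrives, while peek and a plain poll find nothing.

```java
import java.util.concurrent.SynchronousQueue;

public class HandoffDemo {
    // Hands one value from a producer thread to the caller via a rendezvous.
    static int handoff() {
        SynchronousQueue<Integer> q = new SynchronousQueue<>();
        Thread producer = new Thread(() -> {
            try {
                q.put(42);                    // blocks until a consumer takes
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            int v = q.take();                 // rendezvous with the producer
            producer.join();
            return v;
        } catch (InterruptedException e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println(handoff());               // 42
        SynchronousQueue<Integer> q = new SynchronousQueue<>();
        System.out.println(q.peek());                // null: nothing to peek at
        System.out.println(q.poll());                // null: no waiting producer
        System.out.println(q.isEmpty());             // true: acts as an empty collection
    }
}
```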

 * Synchronous queues are similar to rendezvous channels used in
 * CSP and Ada. They are well suited for handoff designs, in which an
 * object running in one thread must sync up with an object running
 * in another thread in order to hand it some information, event, or
 * task.
 *
 * (CSP here is Hoare's Communicating Sequential Processes, and Ada
 * refers to the Ada programming language, whose tasks communicate
 * through a rendezvous mechanism; in both, sender and receiver must
 * meet at the channel. Synchronous queues suit handoff designs: one
 * thread produces an item, message, or task, and another thread must
 * be present to consume it.)
 *

 * This class supports an optional fairness policy for ordering
 * waiting producer and consumer threads. By default, this ordering
 * is not guaranteed. However, a queue constructed with fairness set
 * to true grants threads access in FIFO order.
 *
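The fairness difference is observable: park two producers in a known order, then consume. A fair queue releases them FIFO; the default, stack-based queue releases them LIFO. This is a sketch under my own naming (FairnessDemo, awaitParked are invented helpers); the ordering claim holds once both producers are actually parked.

```java
import java.util.concurrent.SynchronousQueue;

public class FairnessDemo {
    // Spin until t has parked inside put(), i.e. it is enqueued as a waiter.
    static void awaitParked(Thread t) throws InterruptedException {
        while (t.getState() != Thread.State.WAITING
                && t.getState() != Thread.State.TIMED_WAITING)
            Thread.sleep(1);
    }

    // Parks producer A, then producer B, then takes twice and reports the order.
    static String order(boolean fair) {
        SynchronousQueue<String> q = new SynchronousQueue<>(fair);
        Thread a = new Thread(() -> { try { q.put("A"); } catch (InterruptedException ignored) {} });
        Thread b = new Thread(() -> { try { q.put("B"); } catch (InterruptedException ignored) {} });
        try {
            a.start(); awaitParked(a);    // A is queued first
            b.start(); awaitParked(b);    // then B
            String first = q.take();
            String second = q.take();
            a.join(); b.join();
            return first + second;
        } catch (InterruptedException e) {
            return "";
        }
    }

    public static void main(String[] args) {
        System.out.println(order(true));   // fair (FIFO queue): AB
        System.out.println(order(false));  // default (LIFO stack): BA
    }
}
```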

 * This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *

 * This class is a member of the Java Collections Framework.
 *
 * @since 1.5
 * @author Doug Lea and Bill Scherer and Michael Scott
 * @param <E> the type of elements held in this collection
 */
public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -3223113410248163686L;

    /*
     * This class implements extensions of the dual stack and dual
     * queue algorithms described in "Nonblocking Concurrent Objects
     * with Condition Synchronization", by W. N. Scherer III and
     * M. L. Scott. 18th Annual Conf. on Distributed Computing,
     * Oct. 2004 (see also
     * http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html).
     * The (Lifo) stack is used for non-fair mode, and the (Fifo)
     * queue for fair mode. The performance of the two is generally
     * similar. Fifo usually supports higher throughput under
     * contention but Lifo maintains higher thread locality in common
     * applications.
     *
     * A dual queue (and similarly stack) is one that at any given
     * time either holds "data" -- items provided by put operations,
     * or "requests" -- slots representing take operations, or is
     * empty. A call to "fulfill" (i.e., a call requesting an item
     * from a queue holding data or vice versa) dequeues a
     * complementary node. The most interesting feature of these
     * queues is that any operation can figure out which mode the
     * queue is in, and act accordingly without needing locks.
     *
     * (In other words, every take is paired with a put and every put
     * with a take; and the "mode" an operation detects lock-free is
     * whether the structure currently holds data nodes or request
     * nodes.)
     *

     * Both the queue and stack extend abstract class Transferer
     * defining the single method transfer that does a put or a
     * take. These are unified into a single method because in dual
     * data structures, the put and take operations are symmetrical,
     * so nearly all code can be combined. The resulting transfer
     * methods are on the long side, but are easier to follow than
     * they would be if broken up into nearly-duplicated parts.
     *
     * The queue and stack data structures share many conceptual
     * similarities but very few concrete details. For simplicity,
     * they are kept distinct so that they can later evolve
     * separately.
     *
     * The algorithms here differ from the versions in the above paper
     * in extending them for use in synchronous queues, as well as
     * dealing with cancellation. The main differences include:
     *
     *  1. The original algorithms used bit-marked pointers, but
     *     the ones here use mode bits in nodes, leading to a number
     *     of further adaptations.
     *  2. SynchronousQueues must block threads waiting to become
     *     fulfilled.
     *  3. Support for cancellation via timeout and interrupts,
     *     including cleaning out cancelled nodes/threads
     *     from lists to avoid garbage retention and memory depletion.
     *
     * Blocking is mainly accomplished using LockSupport park/unpark,
     * except that nodes that appear to be the next ones to become
     * fulfilled first spin a bit (on multiprocessors only). On very
     * busy synchronous queues, spinning can dramatically improve
     * throughput. And on less busy ones, the amount of spinning is
     * small enough not to be noticeable.
     *
     * Cleaning is done in different ways in queues vs stacks. For
     * queues, we can almost always remove a node immediately in O(1)
     * time (modulo retries for consistency checks) when it is
     * cancelled. But if it may be pinned as the current tail, it must
     * wait until some subsequent cancellation. For stacks, we need a
     * potentially O(n) traversal to be sure that we can remove the
     * node, but this can run concurrently with other threads
     * accessing the stack.
     * While garbage collection takes care of most node reclamation
     * issues that otherwise complicate nonblocking algorithms, care
     * is taken to "forget" references to data, other nodes, and
     * threads that might be held on to long-term by blocked
     * threads. In cases where setting to null would otherwise
     * conflict with main algorithms, this is done by changing a
     * node's link to now point to the node itself. This doesn't arise
     * much for Stack nodes (because blocked threads do not hang on to
     * old head pointers), but references in Queue nodes must be
     * aggressively forgotten to avoid reachability of everything any
     * node has ever referred to since arrival.
     *
     * (Note the self-link trick: where nulling a field would
     * conflict with the main algorithms, forgetting is done by
     * pointing a node's link at the node itself.)
     */

    /**
     * Shared internal API for dual stacks and queues
     * (the common parent of TransferStack and TransferQueue).
     */
    abstract static class Transferer {
        /**
         * Performs a put or take.
         *
         * @param e if non-null, the item to be handed to a consumer;
         *          if null, requests that transfer return an item
         *          offered by producer.
         * @param timed if this operation should timeout
         * @param nanos the timeout, in nanoseconds
         * @return if non-null, the item provided or received; if null,
         *         the operation failed due to timeout or interrupt --
         *         the caller can distinguish which of these occurred
         *         by checking Thread.interrupted.
         */
        abstract Object transfer(Object e, boolean timed, long nanos);
    }

    /** The number of CPUs, for spin control */
    static final int NCPUS = Runtime.getRuntime().availableProcessors();

    /**
     * The number of times to spin before blocking in timed waits.
     * The value is empirically derived -- it works well across a
     * variety of processors and OSes. Empirically, the best value
     * seems not to vary with number of CPUs (beyond 2) so is just
     * a constant: 0 on a uniprocessor, otherwise 32.
     */
    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;

    /**
     * The number of times to spin before blocking in untimed waits;
     * untimed waits spin faster since they need not check time on
     * each spin.
     */
    static final int maxUntimedSpins = maxTimedSpins * 16;

    /**
     * The number of nanoseconds for which it is faster to spin
     * rather than to use timed park.
     */
    static final long spinForTimeoutThreshold = 1000L;
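Before diving into the structures, difference 3 above (cancellation via timeout and interrupt) can already be observed from the public API. In this sketch (class and helper names are mine), interrupting a parked take cancels its wait, which internally marks the waiting node as cancelled so it can be unlinked:

```java
import java.util.concurrent.SynchronousQueue;

public class InterruptDemo {
    // Returns true if a take() blocked with no producer is cancelled by interrupt.
    static boolean interruptedTake() {
        SynchronousQueue<String> q = new SynchronousQueue<>();
        final boolean[] cancelled = {false};
        Thread t = new Thread(() -> {
            try {
                q.take();                 // parks: no producer is present
            } catch (InterruptedException e) {
                cancelled[0] = true;      // the wait was cancelled
            }
        });
        t.start();
        try {
            while (t.getState() != Thread.State.WAITING)
                Thread.sleep(1);          // wait until t has actually parked
            t.interrupt();                // cancels the waiting node
            t.join();
        } catch (InterruptedException e) {
            return false;
        }
        return cancelled[0];
    }

    public static void main(String[] args) {
        System.out.println(interruptedTake());   // true
    }
}
```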

Now let's look at the implementations of the dual queue and dual stack.

First, the stack:

/** Dual stack */
static final class TransferStack extends Transferer {
    /*
     * This extends Scherer-Scott dual stack algorithm, differing,
     * among other ways, by using "covering" nodes rather than
     * bit-marked pointers: Fulfilling operations push on marker
     * nodes (with FULFILLING bit set in mode) to reserve a spot
     * to match a waiting node.
     */

    /* Modes for SNodes, ORed together in node fields */
    /** Node represents an unfulfilled consumer */
    static final int REQUEST    = 0;
    /** Node represents an unfulfilled producer */
    static final int DATA       = 1;
    /** Node is fulfilling another unfulfilled DATA or REQUEST
        (a producer supplying a waiting consumer, or a consumer
        taking from a waiting producer) */
    static final int FULFILLING = 2;

    /** Return true if m has fulfilling bit set */
    static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }

    /** Node class for TransferStacks. */
    static final class SNode {
        volatile SNode next;        // next node in stack
        volatile SNode match;       // the node matched to this
        volatile Thread waiter;     // to control park/unpark
        Object item;                // data; or null for REQUESTs
        int mode;                   // node mode
        // Note: item and mode fields don't need to be volatile
        // since they are always written before, and read after,
        // other volatile/atomic operations.

        SNode(Object item) {
            this.item = item;
        }

        // CAS this node's successor from cmp to val
        boolean casNext(SNode cmp, SNode val) {
            return cmp == next &&
                UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        /**
         * Tries to match node s to this node, if so, waking up thread.
         * Fulfillers call tryMatch to identify their waiters.
         * Waiters block until they have been matched.
         *
         * @param s the node to match
         * @return true if successfully matched to s
         */
        boolean tryMatch(SNode s) {
            if (match == null &&
                UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
                Thread w = waiter;
                if (w != null) {    // waiters need at most one unpark
                    waiter = null;
                    LockSupport.unpark(w);
                }
                return true;
            }
            return match == s;
        }

        /**
         * Tries to cancel a wait by matching node to itself.
         */
        void tryCancel() {
            UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
        }

        // a node whose match points to itself has cancelled its wait
        boolean isCancelled() {
            return match == this;
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long matchOffset;
        private static final long nextOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class k = SNode.class;
                matchOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("match"));
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

    /** The head (top) of the stack */
    volatile SNode head;

    // CAS head from h to nh
    boolean casHead(SNode h, SNode nh) {
        return h == head &&
            UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
    }

    /**
     * Creates or resets fields of a node. Called only from transfer
     * where the node to push on stack is lazily created and
     * reused when possible to help reduce intervals between reads
     * and CASes of head and to avoid surges of garbage when CASes
     * to push nodes fail due to contention.
     */
    static SNode snode(SNode s, Object e, SNode next, int mode) {
        if (s == null) s = new SNode(e);
        s.mode = mode;
        s.next = next;
        return s;
    }

    /**
     * Puts or takes an item.
     */
    Object transfer(Object e, boolean timed, long nanos) {
        /*
         * Basic algorithm is to loop trying one of three actions:
         *
         * 1. If apparently empty or already containing nodes of same
         *    mode, try to push node on stack and wait for a match,
         *    returning it, or null if cancelled.
         *
         * 2. If apparently containing node of complementary mode
         *    (take(REQUEST) meets put(DATA), or put(DATA) meets
         *    take(REQUEST)), try to push a fulfilling node on to
         *    stack, match with corresponding waiting node, pop both
         *    from stack, and return matched item. The matching or
         *    unlinking might not actually be necessary because of
         *    other threads performing action 3:
         *
         * 3. If top of stack already holds another fulfilling node,
         *    help it out by doing its match and/or pop
         *    operations, and then continue. The code for helping
         *    is essentially the same as for fulfilling, except
         *    that it doesn't return the item.
         */

        SNode s = null; // constructed/reused as needed
        // a non-null element means DATA (put); null means REQUEST (take)
        int mode = (e == null) ? REQUEST : DATA;

        for (;;) {
            SNode h = head;
            if (h == null || h.mode == mode) {  // empty or same-mode
                if (timed && nanos <= 0) {      // can't wait
                    if (h != null && h.isCancelled())
                        casHead(h, h.next);     // pop cancelled node
                    else
                        return null;
                } else if (casHead(h, s = snode(s, e, h, mode))) {
                    SNode m = awaitFulfill(s, timed, nanos);
                    if (m == s) {               // wait was cancelled
                        clean(s);
                        return null;
                    }
                    if ((h = head) != null && h.next == s)
                        casHead(h, s.next);     // help s's fulfiller
                    return (mode == REQUEST) ? m.item : s.item;
                }
            } else if (!isFulfilling(h.mode)) { // try to fulfill
                if (h.isCancelled())            // already cancelled
                    casHead(h, h.next);         // pop and retry
                else if (casHead(h, s = snode(s, e, h, FULFILLING|mode))) {
                    for (;;) { // loop until matched or waiters disappear
                        SNode m = s.next;       // m is s's match
                        if (m == null) {        // all waiters are gone
                            casHead(s, null);   // pop fulfill node
                            s = null;           // use new node next time
                            break;              // restart main loop
                        }
                        SNode mn = m.next;
                        if (m.tryMatch(s)) {
                            casHead(s, mn);     // pop both s and m
                            return (mode == REQUEST) ? m.item : s.item;
                        } else                  // lost match
                            s.casNext(m, mn);   // help unlink
                    }
                }
            } else {                            // help a fulfiller
                SNode m = h.next;               // m is h's match
                if (m == null)                  // waiter is gone
                    casHead(h, null);           // pop fulfilling node
                else {
                    SNode mn = m.next;
                    if (m.tryMatch(h))          // help match
                        casHead(h, mn);         // pop both h and m
                    else                        // lost match
                        h.casNext(m, mn);       // help unlink
                }
            }
        }
    }

    /**
     * Spins/blocks until node s is matched by a fulfill operation,
     * returning the matched node, or s itself if cancelled.
     */
    SNode awaitFulfill(SNode s, boolean timed, long nanos) {
        long lastTime = timed ? System.nanoTime() : 0;
        Thread w = Thread.currentThread();
        int spins = (shouldSpin(s) ?
                     (timed ? maxTimedSpins : maxUntimedSpins) : 0);
        for (;;) {
            if (w.isInterrupted())
                s.tryCancel();
            SNode m = s.match;
            if (m != null)
                return m;
            if (timed) {
                long now = System.nanoTime();
                nanos -= now - lastTime;
                lastTime = now;
                if (nanos <= 0) {
                    s.tryCancel();              // timed out: cancel the wait
                    continue;
                }
            }
            if (spins > 0)
                // keep spinning while worthwhile, decrementing the budget
                spins = shouldSpin(s) ? (spins-1) : 0;
            else if (s.waiter == null)
                // record the waiting thread so the next iteration can park
                s.waiter = w; // establish waiter so can park next iter
            else if (!timed)
                LockSupport.park(this);
            else if (nanos > spinForTimeoutThreshold)
                // only park if the remaining time exceeds the spin threshold
                LockSupport.parkNanos(this, nanos);
        }
    }

    /**
     * Returns true if node s is at head or there is an active
     * fulfiller.
     */
    boolean shouldSpin(SNode s) {
        SNode h = head;
        return (h == s || h == null || isFulfilling(h.mode));
    }

    /**
     * Unlinks s from the stack.
     */
    void clean(SNode s) {
        s.item = null;   // forget item
        s.waiter = null; // forget thread

        /*
         * At worst we may need to traverse entire stack to unlink
         * s. If there are multiple concurrent calls to clean, we
         * might not see s if another thread has already removed
         * it. But we can stop when we see any node known to
         * follow s. We use s.next unless it too is cancelled, in
         * which case we try the node one past. We don't check any
         * further because we don't want to doubly traverse just to
         * find sentinel.
         */

        SNode past = s.next;
        if (past != null && past.isCancelled())
            past = past.next;

        // Absorb cancelled nodes at head: advance head to the first
        // node that has not cancelled its wait
        SNode p;
        while ((p = head) != null && p != past && p.isCancelled())
            casHead(p, p.next);

        // Unsplice embedded nodes: walk the stack, removing cancelled nodes
        while (p != null && p != past) {
            SNode n = p.next;
            if (n != null && n.isCancelled())
                p.casNext(n, n.next);
            else
                p = n;
        }
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long headOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class k = TransferStack.class;
            headOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("head"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
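The timed paths of transfer can also be seen from outside. In this sketch (class and method names are mine), a timed poll with no producer present pushes a REQUEST node, times out in awaitFulfill, cancels and cleans it, and returns null; an offer with no waiting consumer takes the "can't wait" branch and fails immediately.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class TimeoutDemo {
    // Timed poll with no waiting producer: the REQUEST node is cancelled
    // on timeout and cleaned from the stack; null signals the failure.
    static String timedPoll() {
        SynchronousQueue<String> q = new SynchronousQueue<>();  // default: TransferStack
        try {
            return q.poll(10, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            return "interrupted";
        }
    }

    // offer with no waiting consumer cannot wait at all, so it fails at once.
    static boolean immediateOffer() {
        SynchronousQueue<String> q = new SynchronousQueue<>();
        return q.offer("x");
    }

    public static void main(String[] args) {
        System.out.println(timedPoll());        // null
        System.out.println(immediateOffer());   // false
    }
}
```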

That completes our reading of TransferStack. Since SynchronousQueue is a fairly large class, this article stops here; the next installment will cover TransferQueue and the remaining parts. First, a brief summary:

SynchronousQueue is a blocking queue in which every insert must wait for a corresponding remove by another thread, and vice versa. It has no capacity at all, not even one slot. An element appears only while a consumer thread is trying to take it, so peek is impossible; no method can insert an element unless a consumer is waiting; and since the queue never holds elements, it cannot be iterated. The head is the element that the first queued producer thread is trying to add; if there is no such thread, nothing is available and remove/poll return null. In effect SynchronousQueue behaves as an empty collection, and null elements are not permitted.

The queue supports an optional fairness policy for waiting producers and consumers. By default no ordering is guaranteed; a queue constructed as fair grants threads access in FIFO order. The non-fair policy uses TransferStack and the fair policy uses TransferQueue; both store descriptors of the threads waiting to operate. As the SNode class of TransferStack shows, each node carries a waiting thread (waiter), a successor (next), a matched node (match), an element (item), and a mode. There are three modes: REQUEST marks a consumer waiting for an item, DATA marks a producer waiting to hand one over, and FULFILLING marks a node currently matching a complementary waiter.

When a thread performs a take/put, it examines the top of the stack. If the stack is empty or the head node has the same mode as the new node: for a timed operation whose remaining time is not positive, the attempt is cancelled and null is returned; otherwise the new node is pushed onto the stack head and spins, then parks, waiting for a match (bounded by the timeout if timed). If the match returned is the node itself, the wait was cancelled, so the node is removed and the stack is traversed to drop cancelled nodes; on a successful match both nodes pop together, with REQUEST mode returning the matched (DATA) node's element and DATA mode returning the current node's element. If the head has the complementary mode and is not FULFILLING, a FULFILLING node is pushed and matched against it; on success both nodes pop, with the same return rule. If the head is already FULFILLING, the thread helps it: it finds the head's match and pops both together.

From this analysis of the non-fair TransferStack one can see that every REQUEST operation must be accompanied by a DATA operation, and every DATA operation by a REQUEST, which is why the class name contains "Synchronous". As the saying goes:

SynchronousQueue is like a pipe: each operation must wait for its counterpart to occur.

