華文網

訊息佇列NetMQ 原理分析4-Socket、Session、Option和Pipe

訊息佇列NetMQ 原理分析4-Socket、Session、Option和Pipe 前言 介紹目的Socket 介面實現內部結構SessionOptionPipe YPipeMsgYQueue總結

前言介紹[NetMQ](https://github.com/zeromq/netmq.git)是ZeroMQ的C#移植版本,它是對標準socket介面的擴展。它提供了一種非同步訊息佇列,多消息模式,消息過濾(訂閱),對多種傳輸協議的無縫訪問。

當前有2個版本正在維護,版本3最新版為3.3.4,版本4最新版本為4.0.1。本文檔是對4.0.1分支代碼進行分析。

目的

對NetMQ的源碼進行學習並分析理解,因此寫下該系列文章,本系列文章暫定編寫計畫如下:

訊息佇列NetMQ 原理分析1-Context和ZObject訊息佇列NetMQ 原理分析2-IO執行緒和完成埠訊息佇列NetMQ 原理分析3-命令產生/處理、創建Socket和回收執行緒訊息佇列NetMQ 原理分析4-Socket、Session、Option和Pipe訊息佇列NetMQ 原理分析5-Engine,Encord和Decord訊息佇列NetMQ 原理分析6-TCP和Inpoc實現訊息佇列NetMQ 原理分析7-Device訊息佇列NetMQ 原理分析8-不同類型的Socket訊息佇列NetMQ 原理分析9-實戰

友情提示: 看本系列文章時最好獲取源碼,更有助於理解。

Socket

上一章最後我們簡單介紹了SocketBase和SessionBase的創建和回收,這一張我們詳細介紹SocketBase和SessionBase。

首先SocketBase繼承自Own,即也是ZObject物件,同時由於SocketBase需要進行消息的傳輸,因此它實現了一些結構,包括IPollEvents、Pipe.IPipeEvents。

介面實現internal abstract class SocketBase : Own, IPollEvents, Pipe.IPipeEvents{ ... } IPollEvents事件上一章回收執行緒已經介紹過,這裡不再做過多說明了,

簡單講SocketBase實現該事件只有在回收執行緒回收Socket的時候會觸發。

Pipe.IPipeEvents:是管道事件,它的簽名如下

public interface IPipeEvents { void ReadActivated([NotNull] Pipe pipe); void WriteActivated([NotNull] Pipe pipe); void Hiccuped([NotNull] Pipe pipe); void Terminated([NotNull] Pipe pipe); }ReadActivated:表示管道可讀,管道實際調用SocketBase或SessionBase的ReadActivated方法,而SocketBase實際會調用XReadActivated方法。WriteActivated:表示管道可寫,管道實際調用SocketBase或SessionBase的WriteActivated方法,而SocketBase實際會調用XWriteActivated方法。Hiccuped:當連接突然中斷時會調用此方法。

WriteActivated:表示管道終止。

內部結構

SocketBase的內部維護著一個欄位,用於存放連接/綁定位址和它的管道(若當前SocketBase是TCPListener,則無需初始化管道,

管道為空)。

private readonly Dictionary m_endpoints = new Dictionary; private readonly Dictionary m_inprocs = new Dictionary;

Endpoint對象用於存放SessionBase和Pipe或Listener的引用

private class Endpoint { public Endpoint(Own own, Pipe pipe) { Own = own; Pipe = pipe; } public Own Own { get; } public Pipe Pipe { get; } }

當SocketBase連接或綁定最後會向將Endpoint保存到字典中

private void AddEndpoint([NotNull] string address, [NotNull] Own endpoint, Pipe pipe) { LaunchChild(endpoint); m_endpoints[address] = new Endpoint(endpoint, pipe); }

在SocketBase斷開連接時會移除它

public void TermEndpoint([NotNull] string addr) { ... if (protocol == Address.InProcProtocol) { ... m_inprocs.Remove(addr); } else { ... m_endpoints.Remove(addr); } }

m_inprocs也是一個字典用於存放inproc協定的連接。

在第一章創建SocketBase我們介紹了Context創建SocketBase所做的一些工作,初始化SocketBase時,會創建MailBox,用於傳輸Command。

protected SocketBase([NotNull] Ctx parent, int threadId, int socketId) : base(parent, threadId) { m_options.SocketId = socketId; m_mailbox = new Mailbox("socket-" + socketId); }

每個SocketBase的命令處理實際都是在工作執行緒中進行。因此理論上(忽略執行緒上下文切換時造成的性能損失)執行緒數越多,NetMQ的IO輸送量和工作執行緒數成正比關係。

在Context創建SocketBase會根據Create靜態方法根據不同類型創建不同的SocketBase

public static SocketBase Create(ZmqSocketType type, [NotNull] Ctx parent, int threadId, int socketId) { switch (type) { case ZmqSocketType.Pair: return new Pair(parent, threadId, socketId); case ZmqSocketType.Pub: return new Pub(parent, threadId, socketId); case ZmqSocketType.Sub: return new Sub(parent, threadId, socketId); case ZmqSocketType.Req: return new Req(parent, threadId, socketId); case ZmqSocketType.Rep: return new Rep(parent, threadId, socketId); case ZmqSocketType.Dealer: return new Dealer(parent, threadId, socketId); case ZmqSocketType.Router: return new Router(parent, threadId, socketId); case ZmqSocketType.Pull: return new Pull(parent, threadId, socketId); case ZmqSocketType.Push: return new Push(parent, threadId, socketId); case ZmqSocketType.Xpub: return new XPub(parent, threadId, socketId); case ZmqSocketType.Xsub: return new XSub(parent, threadId, socketId); case ZmqSocketType.Stream: return new Stream(parent, threadId, socketId); default: throw new InvalidException("SocketBase.Create called with invalid type of " + type); } }

具體創建SocketBase的工作在上一章已經做了詳細的介紹,這裡不再複述。

Session

首先和SocketBase一樣,SessionBase也繼承自Own,即也是ZObject物件,同時由於SessionBase和SocketBase存在消息傳輸,所以它也實現了IPipeEvents介面,同時它實現了IProactorEvents介面,在消息收發是會接收到通知。SessionBase一端和SocketBase進行消息的通訊,另一端和Engine存在消息通訊,它實現了IMsgSink和IMsgSource介面和Engine進行消息傳輸。

internal class SessionBase : Own, Pipe.IPipeEvents, IProactorEvents, IMsgSink, IMsgSource{ }internal interface IMsgSink { /// /// 傳輸消息.成功時返回true. /// /// 將msg消息寫入到管道中 bool PushMsg(ref Msg msg); }internal interface IMsgSource { /// /// 取一個消息。
成功時返回,從管道獲取消息寫入msg參數中;若失敗則返回false,將null寫入到msg參數中。 /// /// 從管道獲取消息寫入Msg中 /// true if successful - and writes the message to the msg argument bool PullMsg(ref Msg msg); }

當SocketBase將消息寫入到寫管道時,對應的SessionBase會從讀管道讀到SocketBase寫入的資料,然後將資料從管道取出生成一個Msg,Engine會和AsyncSocket交互傳輸資料,關於Engine下一章再做介紹。

Option

option參數如下

Affinity
表示哪個執行緒是可用的,默認為0,表示所有執行緒在負載均衡都可使用。Backlog
最大Socket待連接數DelayAttachOnConnect
在創建連接時,延遲在Socket和Session之間創建雙向的管道,默認創建連接時立即創建管道DelayOnClose
若為true,則在Socket關閉時Session先從管道接收所有消息發送出去。
否則直接關閉,默認為true。DelayOnDisconnect
若為true,則在Pipe通知我們中斷時Socket先將接收所有入隊管道消息。
否則直接中斷管道。默認為true.Endianness
位元組序,資料在記憶體中是高到低排還是低到高排。Identity
回應的Identity,每個Identity用於查找Socket。Identiy是一個重複的隨機32位元整形數位,轉換為位元組5位元位元組陣列。每個消息的第一部分是Identity,IdentitySize
1個位元組用於保存Identity的長度。IPv4OnlyLinger
當Socket關閉時,是否延遲一段時間等待資料發送完畢後再關閉管道MaxMessageSize
每個消息包最大消息大小RawSocket
若設置為true,RouterSocket可以接收非NetMQ發送來的tcp連接。
預設是false,Stream在構造函數時會設置為true,設置為true時會將RecvIdentity修改為false(用NetMQ接收其他系統發送來的Socket請求應該用StreamSocekt,否則由於應用層協議不一樣可能會導致一些問題。)RecvIdentity
若為true,Identity轉發給Socket。ReconnectIvl
設置最小重連時間間隔,單位ms。預設100msReconnectIvlMax
設置最大重連時間間隔,單位ms。默認0(無用)RecoveryIvl
PgmSocket用的SendBuffer
發送緩存大小,設置底層傳輸Socket的發送緩存大小,初始為0ReceiveBuffer
接收緩存大小,設置底層傳輸Socket的接收緩存大小,初始為0SendHighWatermark
Socket發送的管道的最大消息數,當發送水位達到最大時會阻塞發送。ReceiveHighWatermark
Socket接收管道的最大消息數SendLowWatermark
Socket發送低水位,消息的最小數量單位,每次達到多少消息數量才向Session管道才啟動寫事件。預設1000ReceiveLowWatermark
Socket接收低水位,消息的最小數量單位,每次達到多少消息數量Session管道才啟動讀事件。默認1000SendTimeout
Socket發送操作超時時間TcpKeepalive
TCP保持連接設置,預設-1不修改配置TcpKeepaliveIdle
TCP心跳包在空閒時的時間間隔,預設-1不修改配置TcpKeepaliveIntvl
TCP心跳包時間間隔,預設-1不修改配置DisableTimeWait
用戶端斷開連接時禁用TIME_WAITTCP狀態

Pipe

在上一章我們講到過在SocketBase和SessionBase是通過2條單向管道進行消息傳輸,傳輸的消息單位是Msg,消息管道是YPipe類型,那麼YPipe<>又是什麼呢?

YPipe

Ypipe內部實際維護這一個YQueue類型的先進先出佇列,YPipe向外暴露了一下方法:

TryRead 該方法用於判斷當前佇列是否可讀,可讀的話第一個對象出隊

public bool TryRead(out T value) { if (!CheckRead) { value = default(T); return false; } value = m_queue.Pop; return true; }

Unwrite 取消寫入消息

public bool Unwrite(ref T value) { if (m_flushToIndex == m_queue.BackPos) return false; value = m_queue.Unpush; return true; }

寫入消息 將消息寫入到佇列中,若寫入未完成則當前消息的指標索引指向當前佇列塊的後一位元。

public void Write(ref T value, bool incomplete) { m_queue.Push(ref value); // Move the "flush up to here" pointer. if (!incomplete) { m_flushToIndex = m_queue.BackPos; } }

完成寫入 當該部分消息寫完時,則會調用Flush完成寫入並通知另一個管道消息可讀

public void Flush { if (m_state == State.Terminating) return; if (m_outboundPipe != null && !m_outboundPipe.Flush) SendActivateRead(m_peer); }

Msg

寫入的消息單位是Msg,它實現了多條資料的存儲,當每次資料寫完還有資料帶寫入時通過將Flag標記為More表示消息還沒寫入完。

YQueue

YQueue是由一個個trunk組成的,每個trunk就是一個消息塊,每個消息塊可能包含多個Msg,主要由寫入消息時是否還有更多消息帶寫入(Flag)決定。trunk是一個雙向迴圈鏈表,內部維護著一個陣列用於存放資料,每個資料會有2個指標,分別指向前一個塊和後一個塊,每個塊還有一個索引,表示當前塊在佇列中的位置。

private sealed class Chunk { public Chunk(int size, int globalIndex) { Values = new T[size]; GlobalOffset = globalIndex; Debug.Assert(Values != null); } /// 數據 public T Values { get; } /// 當前塊在佇列中的位置 public int GlobalOffset { get; } /// 前一個塊 [CanBeNull] public Chunk Previous { get; set; } /// 下一個塊 [CanBeNull] public Chunk Next { get; set; } }

每個chunk默認最多可保存256個部分。

由於每次向SocketBase寫入的Msg可能有多個部分,因此消息會寫入到陣列中,所有消息寫完後指向trunk的指標才會後移一位。

YQueue有以下欄位

//用於記錄當前塊消息的個數,預設為256 private readonly int m_chunkSize; // 當佇列是空的時,下一個塊指向null,首尾塊都指向初始化的一個塊,開始位置的塊僅用於佇列的讀取(front/pop),最後位置的僅用於佇列的寫入(back/push)。 // 開始位置 private volatile Chunk m_beginChunk; //chunk的當前可讀位置索引 private int m_beginPositionInChunk; //指向後一個塊 private Chunk m_backChunk; //chunk的最後一個可讀位置索引 private int m_backPositionInChunk; //指向後一個塊 private Chunk m_endChunk; //chunk的下一個可寫位置索引 private int m_endPosition; //當達到最大Msg數量時,擴展一個chunk,最大為256個塊 private Chunk m_spareChunk; 當前trunk頭部在整個佇列中的的索引位置 private int m_nextGlobalIndex;

YPipe寫入Msg實際是向YQueue入隊

public void Push(ref T val) { m_backChunk.Values[m_backPositionInChunk] = val; //指向後一個塊 m_backChunk = m_endChunk; //索引更新到最後可讀位置 m_backPositionInChunk = m_endPosition; //下一個可寫位置向後移動一位 m_endPosition++; if (m_endPosition != m_chunkSize) return; //到達最後一個位置則需要擴充一個塊 Chunk sc = m_spareChunk; if (sc != m_beginChunk) { //已經擴充了塊則更新下一個塊的位置 m_spareChunk = m_spareChunk.Next; m_endChunk.Next = sc; sc.Previous = m_endChunk; } else { //新建一個塊,並更新索引位置 m_endChunk.Next = new Chunk(m_chunkSize, m_nextGlobalIndex); m_nextGlobalIndex += m_chunkSize; m_endChunk.Next.Previous = m_endChunk; } m_endChunk = m_endChunk.Next; 當前塊的局部位置從0開始 m_endPosition = 0; }

每次消息寫完消息時調用YPipe的Flush方法完成當前消息的寫入

public bool Flush { //只有一條Msg if (m_flushFromIndex == m_flushToIndex) { return true; } //將m_lastAllowedToReadIndex更新為flushToIndex if (Interlocked.CompareExchange(ref m_lastAllowedToReadIndex, m_flushToIndex, m_flushFromIndex) != m_flushFromIndex) { //沒有資料寫入時,lastAllowedToReadIndex為-1,表示沒有資料可讀,因此這裡不需要關係執行緒安全 Interlocked.Exchange(ref m_lastAllowedToReadIndex, m_flushToIndex); m_flushFromIndex = m_flushToIndex; return false; } 有資料寫入時更新指標 m_flushFromIndex = m_flushToIndex; return true; }

總結

該篇在上一片的基礎上對SocketBase和SessionBase進行了一些細節上的補充。同時,對NetMQ的配置參數進行了一些介紹,最後對消息管道進行了簡單講解。

Option

option參數如下

Affinity
表示哪個執行緒是可用的,默認為0,表示所有執行緒在負載均衡都可使用。Backlog
最大Socket待連接數DelayAttachOnConnect
在創建連接時,延遲在Socket和Session之間創建雙向的管道,默認創建連接時立即創建管道DelayOnClose
若為true,則在Socket關閉時Session先從管道接收所有消息發送出去。
否則直接關閉,默認為true。DelayOnDisconnect
若為true,則在Pipe通知我們中斷時Socket先將接收所有入隊管道消息。
否則直接中斷管道。默認為true.Endianness
位元組序,資料在記憶體中是高到低排還是低到高排。Identity
回應的Identity,每個Identity用於查找Socket。Identiy是一個重複的隨機32位元整形數位,轉換為位元組5位元位元組陣列。每個消息的第一部分是Identity,IdentitySize
1個位元組用於保存Identity的長度。IPv4OnlyLinger
當Socket關閉時,是否延遲一段時間等待資料發送完畢後再關閉管道MaxMessageSize
每個消息包最大消息大小RawSocket
若設置為true,RouterSocket可以接收非NetMQ發送來的tcp連接。
預設是false,Stream在構造函數時會設置為true,設置為true時會將RecvIdentity修改為false(用NetMQ接收其他系統發送來的Socket請求應該用StreamSocekt,否則由於應用層協議不一樣可能會導致一些問題。)RecvIdentity
若為true,Identity轉發給Socket。ReconnectIvl
設置最小重連時間間隔,單位ms。預設100msReconnectIvlMax
設置最大重連時間間隔,單位ms。默認0(無用)RecoveryIvl
PgmSocket用的SendBuffer
發送緩存大小,設置底層傳輸Socket的發送緩存大小,初始為0ReceiveBuffer
接收緩存大小,設置底層傳輸Socket的接收緩存大小,初始為0SendHighWatermark
Socket發送的管道的最大消息數,當發送水位達到最大時會阻塞發送。ReceiveHighWatermark
Socket接收管道的最大消息數SendLowWatermark
Socket發送低水位,消息的最小數量單位,每次達到多少消息數量才向Session管道才啟動寫事件。預設1000ReceiveLowWatermark
Socket接收低水位,消息的最小數量單位,每次達到多少消息數量Session管道才啟動讀事件。默認1000SendTimeout
Socket發送操作超時時間TcpKeepalive
TCP保持連接設置,預設-1不修改配置TcpKeepaliveIdle
TCP心跳包在空閒時的時間間隔,預設-1不修改配置TcpKeepaliveIntvl
TCP心跳包時間間隔,預設-1不修改配置DisableTimeWait
用戶端斷開連接時禁用TIME_WAITTCP狀態

Pipe

在上一章我們講到過在SocketBase和SessionBase是通過2條單向管道進行消息傳輸,傳輸的消息單位是Msg,消息管道是YPipe類型,那麼YPipe<>又是什麼呢?

YPipe

Ypipe內部實際維護這一個YQueue類型的先進先出佇列,YPipe向外暴露了一下方法:

TryRead 該方法用於判斷當前佇列是否可讀,可讀的話第一個對象出隊

public bool TryRead(out T value) { if (!CheckRead) { value = default(T); return false; } value = m_queue.Pop; return true; }

Unwrite 取消寫入消息

public bool Unwrite(ref T value) { if (m_flushToIndex == m_queue.BackPos) return false; value = m_queue.Unpush; return true; }

寫入消息 將消息寫入到佇列中,若寫入未完成則當前消息的指標索引指向當前佇列塊的後一位元。

public void Write(ref T value, bool incomplete) { m_queue.Push(ref value); // Move the "flush up to here" pointer. if (!incomplete) { m_flushToIndex = m_queue.BackPos; } }

完成寫入 當該部分消息寫完時,則會調用Flush完成寫入並通知另一個管道消息可讀

public void Flush { if (m_state == State.Terminating) return; if (m_outboundPipe != null && !m_outboundPipe.Flush) SendActivateRead(m_peer); }

Msg

寫入的消息單位是Msg,它實現了多條資料的存儲,當每次資料寫完還有資料帶寫入時通過將Flag標記為More表示消息還沒寫入完。

YQueue

YQueue是由一個個trunk組成的,每個trunk就是一個消息塊,每個消息塊可能包含多個Msg,主要由寫入消息時是否還有更多消息帶寫入(Flag)決定。trunk是一個雙向迴圈鏈表,內部維護著一個陣列用於存放資料,每個資料會有2個指標,分別指向前一個塊和後一個塊,每個塊還有一個索引,表示當前塊在佇列中的位置。

private sealed class Chunk { public Chunk(int size, int globalIndex) { Values = new T[size]; GlobalOffset = globalIndex; Debug.Assert(Values != null); } /// 數據 public T Values { get; } /// 當前塊在佇列中的位置 public int GlobalOffset { get; } /// 前一個塊 [CanBeNull] public Chunk Previous { get; set; } /// 下一個塊 [CanBeNull] public Chunk Next { get; set; } }

每個chunk默認最多可保存256個部分。

由於每次向SocketBase寫入的Msg可能有多個部分,因此消息會寫入到陣列中,所有消息寫完後指向trunk的指標才會後移一位。

YQueue有以下欄位

//用於記錄當前塊消息的個數,預設為256 private readonly int m_chunkSize; // 當佇列是空的時,下一個塊指向null,首尾塊都指向初始化的一個塊,開始位置的塊僅用於佇列的讀取(front/pop),最後位置的僅用於佇列的寫入(back/push)。 // 開始位置 private volatile Chunk m_beginChunk; //chunk的當前可讀位置索引 private int m_beginPositionInChunk; //指向後一個塊 private Chunk m_backChunk; //chunk的最後一個可讀位置索引 private int m_backPositionInChunk; //指向後一個塊 private Chunk m_endChunk; //chunk的下一個可寫位置索引 private int m_endPosition; //當達到最大Msg數量時,擴展一個chunk,最大為256個塊 private Chunk m_spareChunk; 當前trunk頭部在整個佇列中的的索引位置 private int m_nextGlobalIndex;

YPipe寫入Msg實際是向YQueue入隊

public void Push(ref T val) { m_backChunk.Values[m_backPositionInChunk] = val; //指向後一個塊 m_backChunk = m_endChunk; //索引更新到最後可讀位置 m_backPositionInChunk = m_endPosition; //下一個可寫位置向後移動一位 m_endPosition++; if (m_endPosition != m_chunkSize) return; //到達最後一個位置則需要擴充一個塊 Chunk sc = m_spareChunk; if (sc != m_beginChunk) { //已經擴充了塊則更新下一個塊的位置 m_spareChunk = m_spareChunk.Next; m_endChunk.Next = sc; sc.Previous = m_endChunk; } else { //新建一個塊,並更新索引位置 m_endChunk.Next = new Chunk(m_chunkSize, m_nextGlobalIndex); m_nextGlobalIndex += m_chunkSize; m_endChunk.Next.Previous = m_endChunk; } m_endChunk = m_endChunk.Next; 當前塊的局部位置從0開始 m_endPosition = 0; }

每次消息寫完消息時調用YPipe的Flush方法完成當前消息的寫入

public bool Flush { //只有一條Msg if (m_flushFromIndex == m_flushToIndex) { return true; } //將m_lastAllowedToReadIndex更新為flushToIndex if (Interlocked.CompareExchange(ref m_lastAllowedToReadIndex, m_flushToIndex, m_flushFromIndex) != m_flushFromIndex) { //沒有資料寫入時,lastAllowedToReadIndex為-1,表示沒有資料可讀,因此這裡不需要關係執行緒安全 Interlocked.Exchange(ref m_lastAllowedToReadIndex, m_flushToIndex); m_flushFromIndex = m_flushToIndex; return false; } 有資料寫入時更新指標 m_flushFromIndex = m_flushToIndex; return true; }

總結

該篇在上一片的基礎上對SocketBase和SessionBase進行了一些細節上的補充。同時,對NetMQ的配置參數進行了一些介紹,最後對消息管道進行了簡單講解。