您的位置:首頁>正文

深度揭秘Twitter的新一代流處理引擎Heron

流計算又稱即時計算, 是繼以Map-Reduce為代表的批次處理之後的又一重要計算模型。 隨著互聯網業務的發展以及資料規模的持續擴大,

傳統的批次處理計算難以有效地對資料進行快速低延遲處理並返回結果。 由於資料幾乎處於不斷增長的狀態中, 及時處理計算大批量資料成為了批次處理計算的一大難題。 在此背景之下, 流計算應運而生。 相比于傳統的批次處理計算, 流計算具有低延遲、高回應、持續處理的特點。 在資料產生的同時, 就可以進行計算並獲得結果。 更可以通過Lambda架構將即時的流計算處理結果與延後的批次處理計算結果結合, 從而較好地滿足低延遲、高正確性的業務需求。

Twitter由於本身的業務特性, 對即時性有著強烈的需求。 因此在流計算上投入了大量的資源進行開發。 第一代流處理系統Storm發佈以後得到了廣泛的關注和應用。

根據Storm在實踐中遇到的性能、規模、可用性等方面的問題, Twitter又開發了第二代流處理系統——Heron[1], 並在2016年將它開源。

重要概念定義

在開始瞭解Heron的具體架構和設計之前, 我們首先定義一些流計算以及在Heron設計中用到的基本概念:

Tuple:流計算任務中處理的最小單中繼資料的抽象。

Stream:由無限個Tuple組成的連續序列。

Spout:從外界資料來源獲得資料並生成Tuple的計算任務。

Bolt:處理上游Spout或者Bolt生成的Tuple的計算任務。

Topology:一個通過Stream將Spout和Bolt相連的處理Tuple的邏輯計算任務。

Grouping:流計算中的Tuple分發策略。 在Tuple通過Stream傳遞到下游Bolt的過程中, Grouping策略決定了如何將一個Tuple路由給一個具體的Bolt實例。 典型的Grouping策略有:隨機分配、基於Tuple內容的分配等。

Physical Plan:基於Topology定義的邏輯計算任務以及所擁有的計算資源, 生成的實際運行時資訊的集合。

在以上流處理基本概念的基礎上, 我們可以構建出流處理的三種不同處理語義:

至多一次(At-Most-Once): 盡可能處理資料, 但不保證資料一定會被處理。 輸送量大, 計算快但是計算結果存在一定的誤差。

至少一次(At-Least-Once):在外部資料來源允許Replay(重演)的情況下, 保證資料至少被處理一次。 在出現錯誤的情況下會重新處理該資料, 可能會出現重複處理多次同一資料的情況。 保證資料的處理但是延遲升高。

僅有一次(Exactly-Once):每一個資料確保被處理且僅被處理一次。 結果精確但是所需要的計算資源增多並且還會導致計算效率降低。

從上可知, 三種不同的處理模式有各自的優缺點, 因此在選擇處理模式的時候需要綜合考量一個Topology對於輸送量、延遲、結果誤差、計算資源的要求,

從而做出最優的選擇。 目前的Heron已經實現支持至多一次和至少一次語義, 並且正在開發對於僅有一次語義的支持。

Heron系統概覽

保持與Storm介面(API)相容是Heron的設計目標之一。 因此, Heron的資料模型與Storm的資料模型基本保持一致。 每個提交給Heron的Topology都是一個由Spout和Bolt這兩類結點(Vertex)組成的, 以Stream為邊(Edge)的有向無環圖(Directed acyclic graph)。 其中Spout結點是Topology的資料來源, 它從外部讀取Topology所需要處理的資料, 常見的如kafka-spout, 然後發送給後續的Bolt結點進行處理。 Bolt節點進行實際的資料計算, 常見的運算如Filter、Map以及FlatMap等。

我們可以把Heron的Topology類比為資料庫的邏輯查詢計畫。 這種邏輯上的計畫最後都要變成實質上的處理計畫才能執行。 用戶在編寫Topology時指定每個Spout和Bolt任務的並行度和Tuple在Topology中結點間的分發策略(Grouping)。

所有使用者提供的資訊經過打包演算法(Pakcing)的計算, 這些Spout和Bolt任務(task)被分配到一批抽象容器中。 最後再把這些抽象容器映射到真實的容器中, 就可以生成一個物理上可執行的計畫(Physical plan), 它是所有邏輯資訊(拓撲圖、並行度、計算任務)和運行時資訊(計算任務和容器的對應關係、實際運行位址)的集合。

整體結構

總體上, Heron的整體架構如圖1所示。 用戶通過命令列工具(Heron-CLI)將Topology提交給Heron Scheduler。 再由Scheduler對提交的Topology進行資源配置以及運行調度。 在同一時間, 同一個資源平臺上可以運行多個相互獨立Topology。

圖1 Heron架構

與Storm的Service架構不同, Heron是Library架構。 Storm在架構設計上是基於服務的, 因此需要設立專有的Storm集群來運行用戶提交的Topology。 在開發、運維以及成本上, 都有諸多的不足。而Heron則是基於庫的,可以運行在任意的共用資源調度平臺上。最大化地降低了運維負擔以及成本開銷。

目前的Heron支持Aurora、YARN、Mesos以及EC2,而Kubernetes和Docker等目前正在開發中。通過可擴展外掛程式Heron Scheduler,使用者可以根據不同的需求及實際情況選擇相應的運行平臺,從而達到多平臺資源管理器的支援[2]。

而被提交運行Topology的內部結構如圖2所示,不同的計算任務被封裝在多個容器中運行。這些由調度器調度的容器可以在同一個物理主機上,也可分佈在多個主機上。其中每一個Topology的第一個容器(容器0)負責整個Topology的管理工作,主要運行一個Topology Master進程;其餘各個容器負責使用者提交的計算邏輯的實現,每個容器中主要運行一個Stream Manager進程,一個Metrics Manager進程,以及多個Instance進程。每個Instance都負責運行一個Spout或者Bolt任務(task)。對於Topology Master、Stream Manager以及Instance進程的結構及重要功能,我們會在本文的後面章節進行詳細的分析。

圖2 Topology結構

狀態(State)存儲和監控

Heron的State Manager是一個抽象的模組,它在具體實現中可以是ZooKeeper或者是檔案系統。它的主要作用是保存各個Topology的各種元資訊:Topology的提交者、提交時間、運行時生成的Physical Plan以及Topology Master的地址等,從而為Topology的自我恢復提供幫助。

每個容器中的Metrics Manager負責收集所在容器的運行時狀態指標(Metrics),並上傳給監控系統。當前Heron版本中,簡化的監控系統集成在Topology Master中。將來這一監控模組將會成為容器0中的一個獨立進程。Heron還提供Heron-Tracker和Heron-UI 這兩個工具來查看和監測一個資料中心中運行的所有Topology。

啟動過程

在一個Topology中,Topology Master是整個Topology的元資訊管理者,它維護著完整的Topology元資訊。而Stream Manager是每個容器的閘道,它負責各個Instance之間的資料通信,以及和Topology Master之間的控制信令。

當用戶提交Topology之後,Scheduler便會開始分配資源並運行容器。每個容器中啟動一個Heron Executor的進程,它區分容器0和其他容器,分別啟動Topology Master或者Stream Manager等進程。在一個普通容器中,Instance進程啟動後會主動向本地容器的Stream Manager進行註冊。當Stream Manager收到所有Instance的註冊請求後,會向Topology Master發送包含了自己的所負責的Instance的註冊資訊。當Topology Master收到所有Stream Manager的註冊資訊以後,會生成一個各個Instance,Stream Manager的實際運行位址的Physical Plan並進行廣播分發。收到了Physical Plan的各個Stream Manager之間就可以根據這一Physical Plan互相建立連接形成一個完全圖,然後開始處理資料。

Instance進行具體的Tuple資料計算處理。Stream Manager則不執行具體的計算處理任務,只負責中繼轉發Tuple。從資料流程網路的角度,可以把Stream Manager理解為每個容器的路由器。所有Instance之間的Tuple傳遞都是通過Stream Manager中繼。因此容器內的Instance之間通信是一跳(hop)的星形網路。所有的Stream Manager都互相連接,形成Mesh網路。容器之間的通信也是通過Stream Manager中繼的,是通過兩跳的中繼完成的。

核心元件分析

TMaster

TMaster是Topology Master的簡寫。與很多Master-Slave模式分散式系統中的Master單點處理控制邏輯的作用相同,TMaster作為Master角色提供了一個全域的介面來瞭解Topology的運行狀態。同時,通過將重要的狀態資訊(Physical Plan)等記錄到ZooKeeper中,保證了TMaster在崩潰恢復之後能繼續運行。

實際產品中的TMaster在啟動的時候,會在ZooKeeper的某一約定目錄中創建一個Ephemeral Node來存儲自己的IP位址以及埠,讓Stream Manager能發現自己。Heron使用Ephemeral Node的原因包括:

避免了一個Topology出現多個TMaster的情況。這樣就使得這個Topology的所有進程都能認定同一個TMaster;

同一Topology內部的進程能夠通過ZooKeeper來發現TMaster所在的位置,從而與其建立連接。

TMaster主要有以下三個功能:

構建、分發並維護Topology的Physical Plan;

收集各個Stream Manager的心跳,確認Stream Manager的存活;

收集和分發Topology部分重要的運行時狀態指標(Metrics)。

由於Topology的Physical Plan只有在運行時才能確定,因此TMaster就成為了構建、分發以及維護Physical Plan的最佳選擇。在TMaster完成啟動和向ZooKeeper註冊之後,會等待所有的Stream Manager與自己建立連接。在Stream Manager與TMaster建立連接之後,Stream Manager會報告自己的實際IP位址、埠以及自己所負責的Instance地址與埠。TMaster在收到所有Stream Manager報告的位址資訊之後就能構建出Physical Plan並進行廣播分發。所有的Stream Manager都會收到由TMaster構建的Physical Plan,並且根據其中的資訊與其餘的Stream Manager建立兩兩連接。只有當所有的連接都建立完成之後,Topology才會真正開始進行資料的運算和處理。當某一個Stream Manager丟失並重連之後,TMaster會檢測其運行位址及埠是否發生了改變;若改變,則會及時地更新Physical Plan並廣播分發,使Stream Manager能夠建立正確的連接,從而保證整個Topology的正確運行。

TMaster會接受Stream Manager定時發送的心跳資訊並且維護各個Stream Manager的最近一次心跳時間戳記。心跳首先能夠幫助TMaster確認Stream Manager的存活,其次可以幫助其決定是否更新一個Stream Manager的連接並且更新Physical Plan。

TMaster還會接受由Metrics Manager發送的一部分重要Metrics並且向Heron-Tracker提供這些Metrics。Heron-Tracker可以通過這些Metrics來確定Topology的運行情況並使得Heron-UI能夠基於這些重要的Metrics來進行監控檢測。典型的Metrics有:分發Tuple的次數,計算Tuple的次數以及處於backpressure狀態的時間等。

非常值得注意的一點是,TMaster本身並不參與任何實際的資料處理。因此它也不會接受和分發任何的Tuple。這一設計使得TMaster本身邏輯清晰,也非常輕量,同時也為以後功能的拓展留下了巨大的空間。

Stream Manager 和反壓(Back pressure)機制

Stmgr是Stream Manager的簡寫。Stmgr管理著Tuple的路由,並負責中繼Tuple。當Stmgr拿到Physical Plan以後就能根據其中的資訊知道與其餘的Stmgr建立連接形成Mesh網路,從而進行資料中繼以及Backpressure控制。Tuple傳遞路徑可以通過圖3來說明,圖3中容器1的Instance D(1D)要發送一個Tuple給容器4中的Instance C(4C),這個Tuple經過的路徑為:容器1的1D,容器1的Stmgr,容器4的Stmgr,容器4的4C。又比如從3A到3B的Tuple經過的路徑為:3A,容器3的Stmgr,3B。與Internet的路由機制對比,Heron的路由非常簡單,這得益於Stmgr之間兩兩相連,使得所有的Instance之間的距離不超過2跳。

圖3 Tuple發送路徑示例

Acking

Stmgr除了路由中繼Tuple的功能以外,它還負責確認(Acking)Tuple已經被處理。Acking的概念在Heron的前身Storm中已經存在。Acking機制的目的是為了實現At-Least-Once的語義。原理上,當一個Bolt實例處理完一個Tuple以後,這個Bolt實例發送一個特殊的Acking Tuple給這個bolt的上游Bolt實例或者Spout實例,向上游結點確認Tuple已經處理完成。這個過程層層向上游結點推進,直到Spout結點。實現上,當Acking Tuple經過Stmgr時候由異或(xor)操作標記Tuple,由異或操作的特性得知是否處理完成。當一個Spout實例在一定時間內還沒有收集到Acking Tuple,那麼它將重發對應的資料Tuple。Heron的Acking機制的實現與它的前任Storm一致。

Back Pressure

Heron引入了反壓(Back Pressure)機制,來動態調整Tuple的處理速度以避免系統超載。一般來說,解決系統超載問題有三種策略:1. 放任不管;2. 丟棄超載數據;3. 請求減少負載。Heron採用了第三種策略,通過Backpressure機制來進行超載恢復,保證系統不會在超載的情況下崩潰。

Backpressure機制觸發過程如下:當某一個Bolt Instance處理速度跟不上Tuple的輸入速度時,會造成負責向該Instance轉發Tuple的Stmgr緩存不斷堆積。當緩存大小超過一個上限值(Hight Water Mark)時,該Stmgr會停止從本地的Spout中讀取Tuple並向Topology中的其他所有Stmgr發送一個“開始Backpressure”的資訊。而其餘的Stmgr在接收到這一消息時也會停止從他們所負責的Spout Instance處讀取並轉發Tuple。至此,整個Topology就不再從外界讀入Tuple而只處理堆積在內部的未處理Tuple。而處理的速度則由最慢的Instance來決定。在經過一定時間的處理以後,當緩存的大小減低到一個下限值(Low Water Mark)時,最開始發送“開始Backpressure”的Stmgr會再次發送“停止Backpressure”的資訊,從而使得所有的Stmgr重新開始從Spout Instance讀取分發資料。而由於Spout通常是從具有允許重演(Replay)的訊息佇列中讀取資料,因此即使凍結了也不會導致資料的丟失。

注意在Backpressure的過程中兩個重要的數值:上限值(High Water Mark)和下限值(Low Water Mark)。只有當緩存區的大小超過上限值時才會觸發Backpressure,然後一直持續到緩存區的大小減低到下限值時。這一設計有效地避免了一個Topology不停地在Backpressure狀態和正常狀態之間震盪變化的情況發展,一定程度上保證了Topology的穩定。

Instance

Instance是整個Heron處理引擎的核心部分之一。Topology中不論是Spout類型結點還是Bolt類型結點,都是由Instance來實現的。不同於Storm的Worker設計,在當前的Heron中每一個Instance都是一個獨立的JVM進程,通過Stmgr進行資料的分發接受,完成用戶定義的計算任務。獨立進程的設計帶來了一系列的優點:便於調試、調優、資源隔離以及容錯恢復等。同時,由於資料的分發傳送任務已經交由Stmgr來處理,Instance可以用任何程式設計語言來進行實現,從而支援各種語言平臺。

Instance採用雙執行緒的設計,如圖4所示。一個Instance的進程包含Gateway以及Task Execution這兩個執行緒。Gateway執行緒主要控制著Instance與本地Stmgr和Metrics Manager之間的資料交換。通過TCP連接,Gateway執行緒:1. 接受由Stmgr分發的待處理Tuple;2. 發送經Task Execution處理的Tuple給Stmgr;3. 轉發由Task Execution執行緒產生的Metrics給Metrics Manager。不論是Spout還是Bolt,Gateway執行緒完成的任務都相同。

Task Execution執行緒的職責是執行用戶定義的計算任務。對於Spout和Bolt,Task Execution執行緒會相應地去執行open和prepare方法來初始化其狀態。如果運行的Instance是一個Bolt實例,那麼Task Execution執行緒會執行execute方法來處理接收到的Tuple;如果是Spout,則會重複執行nextTuple方法來從外部資料來源不停地獲取資料,生成Tuple,併發送給下游的Instance進行處理。經過處理的Tuple會被發送至Gateway執行緒進行下一步的分發。同時在執行的過程中,Task Execution執行緒會生成各種Metrics(tuple處理數量,tuple處理延遲等)併發送給Metrics Manager進行狀態監控。

圖4 Instance結構

Gateway執行緒和Task Execution執行緒之間通過三個單向的佇列來進行通信,分別是資料進入佇列、資料發送佇列以及Metrics發送佇列。Gateway執行緒通過資料進入佇列向Task Execution執行緒傳入Tuple;Task Execution通過資料發送佇列將處理完的Tuple發送給Gateway執行緒;Task Execution執行緒通過Metrics發送佇列將收集的Metric發送給Gateway執行緒。

總結

在本文中,我們介紹了流計算的背景和重要概念,並且詳細分析了Twitter目前的流計算引擎—— Heron的結構及重要元件。希望能借此為大家提供一些在設計和構建流計算系統時的經驗,也歡迎大家向我們提供建議和幫助。如果大家對Heron的開發和改進感興趣,可以在Github上進行查看。

【1】Kulkarni, Sanjeev, Nikunj Bhagat, Maosong Fu, Vikas Kedigehalli, Christopher Kellogg, Sailesh Mittal, Jignesh M. Patel, Karthik Ramasamy, and Siddarth Taneja. "Twitter heron: Stream processing at scale." In Proceedings of the 2015 ACM SIGMOD International Conference on Management of Data, pp. 239-250. ACM, 2015.

【2】Maosong Fu, Ashvin Agrawal, Avrilia Floratou, Bill Graham, Andrew Jorgensen, Mark Li, Neng Lu, Karthik Ramasamy, Sriram Rao and Cong Wang. "Twitter Heron: Towards Extensible Streaming Engines." In 2017 International Conference on Data Engineering (ICDE). IEEE, 2017.

呂能,Twitter即時計算平臺團隊成員。專注于分散式系統,曾參與過Twitter的Manhattan鍵值存儲系統,Obs監控警報系統的開發,目前負責Heron的開發研究。曾在國際頂級期刊和會議發表多篇學術論文。

吳惠君,Twitter軟體工程師,致力於即時流處理引擎Heron的研究和開發。他畢業於Arizona State University,專攻大資料處理和移動雲計算,曾在國際頂級期刊和會議發表多篇學術論文,並有多項專利。

符茂松,Twitter即時計算平臺團隊主管,負責Heron, Presto等服務。Heron的原作者之一。專注於分散式系統,在SIGMOD、ICDE等會議期刊發表多篇論文。本科畢業于華中科技大學,研究生畢業于Carnegie Mellon University。

本文為《程式師》原創文章,未經允許禁止轉載

都有諸多的不足。而Heron則是基於庫的,可以運行在任意的共用資源調度平臺上。最大化地降低了運維負擔以及成本開銷。

目前的Heron支持Aurora、YARN、Mesos以及EC2,而Kubernetes和Docker等目前正在開發中。通過可擴展外掛程式Heron Scheduler,使用者可以根據不同的需求及實際情況選擇相應的運行平臺,從而達到多平臺資源管理器的支援[2]。

而被提交運行Topology的內部結構如圖2所示,不同的計算任務被封裝在多個容器中運行。這些由調度器調度的容器可以在同一個物理主機上,也可分佈在多個主機上。其中每一個Topology的第一個容器(容器0)負責整個Topology的管理工作,主要運行一個Topology Master進程;其餘各個容器負責使用者提交的計算邏輯的實現,每個容器中主要運行一個Stream Manager進程,一個Metrics Manager進程,以及多個Instance進程。每個Instance都負責運行一個Spout或者Bolt任務(task)。對於Topology Master、Stream Manager以及Instance進程的結構及重要功能,我們會在本文的後面章節進行詳細的分析。

圖2 Topology結構

狀態(State)存儲和監控

Heron的State Manager是一個抽象的模組,它在具體實現中可以是ZooKeeper或者是檔案系統。它的主要作用是保存各個Topology的各種元資訊:Topology的提交者、提交時間、運行時生成的Physical Plan以及Topology Master的地址等,從而為Topology的自我恢復提供幫助。

每個容器中的Metrics Manager負責收集所在容器的運行時狀態指標(Metrics),並上傳給監控系統。當前Heron版本中,簡化的監控系統集成在Topology Master中。將來這一監控模組將會成為容器0中的一個獨立進程。Heron還提供Heron-Tracker和Heron-UI 這兩個工具來查看和監測一個資料中心中運行的所有Topology。

啟動過程

在一個Topology中,Topology Master是整個Topology的元資訊管理者,它維護著完整的Topology元資訊。而Stream Manager是每個容器的閘道,它負責各個Instance之間的資料通信,以及和Topology Master之間的控制信令。

當用戶提交Topology之後,Scheduler便會開始分配資源並運行容器。每個容器中啟動一個Heron Executor的進程,它區分容器0和其他容器,分別啟動Topology Master或者Stream Manager等進程。在一個普通容器中,Instance進程啟動後會主動向本地容器的Stream Manager進行註冊。當Stream Manager收到所有Instance的註冊請求後,會向Topology Master發送包含了自己的所負責的Instance的註冊資訊。當Topology Master收到所有Stream Manager的註冊資訊以後,會生成一個各個Instance,Stream Manager的實際運行位址的Physical Plan並進行廣播分發。收到了Physical Plan的各個Stream Manager之間就可以根據這一Physical Plan互相建立連接形成一個完全圖,然後開始處理資料。

Instance進行具體的Tuple資料計算處理。Stream Manager則不執行具體的計算處理任務,只負責中繼轉發Tuple。從資料流程網路的角度,可以把Stream Manager理解為每個容器的路由器。所有Instance之間的Tuple傳遞都是通過Stream Manager中繼。因此容器內的Instance之間通信是一跳(hop)的星形網路。所有的Stream Manager都互相連接,形成Mesh網路。容器之間的通信也是通過Stream Manager中繼的,是通過兩跳的中繼完成的。

核心元件分析

TMaster

TMaster是Topology Master的簡寫。與很多Master-Slave模式分散式系統中的Master單點處理控制邏輯的作用相同,TMaster作為Master角色提供了一個全域的介面來瞭解Topology的運行狀態。同時,通過將重要的狀態資訊(Physical Plan)等記錄到ZooKeeper中,保證了TMaster在崩潰恢復之後能繼續運行。

實際產品中的TMaster在啟動的時候,會在ZooKeeper的某一約定目錄中創建一個Ephemeral Node來存儲自己的IP位址以及埠,讓Stream Manager能發現自己。Heron使用Ephemeral Node的原因包括:

避免了一個Topology出現多個TMaster的情況。這樣就使得這個Topology的所有進程都能認定同一個TMaster;

同一Topology內部的進程能夠通過ZooKeeper來發現TMaster所在的位置,從而與其建立連接。

TMaster主要有以下三個功能:

構建、分發並維護Topology的Physical Plan;

收集各個Stream Manager的心跳,確認Stream Manager的存活;

收集和分發Topology部分重要的運行時狀態指標(Metrics)。

由於Topology的Physical Plan只有在運行時才能確定,因此TMaster就成為了構建、分發以及維護Physical Plan的最佳選擇。在TMaster完成啟動和向ZooKeeper註冊之後,會等待所有的Stream Manager與自己建立連接。在Stream Manager與TMaster建立連接之後,Stream Manager會報告自己的實際IP位址、埠以及自己所負責的Instance地址與埠。TMaster在收到所有Stream Manager報告的位址資訊之後就能構建出Physical Plan並進行廣播分發。所有的Stream Manager都會收到由TMaster構建的Physical Plan,並且根據其中的資訊與其餘的Stream Manager建立兩兩連接。只有當所有的連接都建立完成之後,Topology才會真正開始進行資料的運算和處理。當某一個Stream Manager丟失並重連之後,TMaster會檢測其運行位址及埠是否發生了改變;若改變,則會及時地更新Physical Plan並廣播分發,使Stream Manager能夠建立正確的連接,從而保證整個Topology的正確運行。

TMaster會接受Stream Manager定時發送的心跳資訊並且維護各個Stream Manager的最近一次心跳時間戳記。心跳首先能夠幫助TMaster確認Stream Manager的存活,其次可以幫助其決定是否更新一個Stream Manager的連接並且更新Physical Plan。

TMaster還會接受由Metrics Manager發送的一部分重要Metrics並且向Heron-Tracker提供這些Metrics。Heron-Tracker可以通過這些Metrics來確定Topology的運行情況並使得Heron-UI能夠基於這些重要的Metrics來進行監控檢測。典型的Metrics有:分發Tuple的次數,計算Tuple的次數以及處於backpressure狀態的時間等。

非常值得注意的一點是,TMaster本身並不參與任何實際的資料處理。因此它也不會接受和分發任何的Tuple。這一設計使得TMaster本身邏輯清晰,也非常輕量,同時也為以後功能的拓展留下了巨大的空間。

Stream Manager 和反壓(Back pressure)機制

Stmgr是Stream Manager的簡寫。Stmgr管理著Tuple的路由,並負責中繼Tuple。當Stmgr拿到Physical Plan以後就能根據其中的資訊知道與其餘的Stmgr建立連接形成Mesh網路,從而進行資料中繼以及Backpressure控制。Tuple傳遞路徑可以通過圖3來說明,圖3中容器1的Instance D(1D)要發送一個Tuple給容器4中的Instance C(4C),這個Tuple經過的路徑為:容器1的1D,容器1的Stmgr,容器4的Stmgr,容器4的4C。又比如從3A到3B的Tuple經過的路徑為:3A,容器3的Stmgr,3B。與Internet的路由機制對比,Heron的路由非常簡單,這得益於Stmgr之間兩兩相連,使得所有的Instance之間的距離不超過2跳。

圖3 Tuple發送路徑示例

Acking

Stmgr除了路由中繼Tuple的功能以外,它還負責確認(Acking)Tuple已經被處理。Acking的概念在Heron的前身Storm中已經存在。Acking機制的目的是為了實現At-Least-Once的語義。原理上,當一個Bolt實例處理完一個Tuple以後,這個Bolt實例發送一個特殊的Acking Tuple給這個bolt的上游Bolt實例或者Spout實例,向上游結點確認Tuple已經處理完成。這個過程層層向上游結點推進,直到Spout結點。實現上,當Acking Tuple經過Stmgr時候由異或(xor)操作標記Tuple,由異或操作的特性得知是否處理完成。當一個Spout實例在一定時間內還沒有收集到Acking Tuple,那麼它將重發對應的資料Tuple。Heron的Acking機制的實現與它的前任Storm一致。

Back Pressure

Heron引入了反壓(Back Pressure)機制,來動態調整Tuple的處理速度以避免系統超載。一般來說,解決系統超載問題有三種策略:1. 放任不管;2. 丟棄超載數據;3. 請求減少負載。Heron採用了第三種策略,通過Backpressure機制來進行超載恢復,保證系統不會在超載的情況下崩潰。

Backpressure機制觸發過程如下:當某一個Bolt Instance處理速度跟不上Tuple的輸入速度時,會造成負責向該Instance轉發Tuple的Stmgr緩存不斷堆積。當緩存大小超過一個上限值(Hight Water Mark)時,該Stmgr會停止從本地的Spout中讀取Tuple並向Topology中的其他所有Stmgr發送一個“開始Backpressure”的資訊。而其餘的Stmgr在接收到這一消息時也會停止從他們所負責的Spout Instance處讀取並轉發Tuple。至此,整個Topology就不再從外界讀入Tuple而只處理堆積在內部的未處理Tuple。而處理的速度則由最慢的Instance來決定。在經過一定時間的處理以後,當緩存的大小減低到一個下限值(Low Water Mark)時,最開始發送“開始Backpressure”的Stmgr會再次發送“停止Backpressure”的資訊,從而使得所有的Stmgr重新開始從Spout Instance讀取分發資料。而由於Spout通常是從具有允許重演(Replay)的訊息佇列中讀取資料,因此即使凍結了也不會導致資料的丟失。

注意在Backpressure的過程中兩個重要的數值:上限值(High Water Mark)和下限值(Low Water Mark)。只有當緩存區的大小超過上限值時才會觸發Backpressure,然後一直持續到緩存區的大小減低到下限值時。這一設計有效地避免了一個Topology不停地在Backpressure狀態和正常狀態之間震盪變化的情況發展,一定程度上保證了Topology的穩定。

Instance

Instance是整個Heron處理引擎的核心部分之一。Topology中不論是Spout類型結點還是Bolt類型結點,都是由Instance來實現的。不同於Storm的Worker設計,在當前的Heron中每一個Instance都是一個獨立的JVM進程,通過Stmgr進行資料的分發接受,完成用戶定義的計算任務。獨立進程的設計帶來了一系列的優點:便於調試、調優、資源隔離以及容錯恢復等。同時,由於資料的分發傳送任務已經交由Stmgr來處理,Instance可以用任何程式設計語言來進行實現,從而支援各種語言平臺。

Instance採用雙執行緒的設計,如圖4所示。一個Instance的進程包含Gateway以及Task Execution這兩個執行緒。Gateway執行緒主要控制著Instance與本地Stmgr和Metrics Manager之間的資料交換。通過TCP連接,Gateway執行緒:1. 接受由Stmgr分發的待處理Tuple;2. 發送經Task Execution處理的Tuple給Stmgr;3. 轉發由Task Execution執行緒產生的Metrics給Metrics Manager。不論是Spout還是Bolt,Gateway執行緒完成的任務都相同。

Task Execution執行緒的職責是執行用戶定義的計算任務。對於Spout和Bolt,Task Execution執行緒會相應地去執行open和prepare方法來初始化其狀態。如果運行的Instance是一個Bolt實例,那麼Task Execution執行緒會執行execute方法來處理接收到的Tuple;如果是Spout,則會重複執行nextTuple方法來從外部資料來源不停地獲取資料,生成Tuple,併發送給下游的Instance進行處理。經過處理的Tuple會被發送至Gateway執行緒進行下一步的分發。同時在執行的過程中,Task Execution執行緒會生成各種Metrics(tuple處理數量,tuple處理延遲等)併發送給Metrics Manager進行狀態監控。

圖4 Instance結構

Gateway執行緒和Task Execution執行緒之間通過三個單向的佇列來進行通信,分別是資料進入佇列、資料發送佇列以及Metrics發送佇列。Gateway執行緒通過資料進入佇列向Task Execution執行緒傳入Tuple;Task Execution通過資料發送佇列將處理完的Tuple發送給Gateway執行緒;Task Execution執行緒通過Metrics發送佇列將收集的Metric發送給Gateway執行緒。

總結

在本文中,我們介紹了流計算的背景和重要概念,並且詳細分析了Twitter目前的流計算引擎—— Heron的結構及重要元件。希望能借此為大家提供一些在設計和構建流計算系統時的經驗,也歡迎大家向我們提供建議和幫助。如果大家對Heron的開發和改進感興趣,可以在Github上進行查看。

【1】Kulkarni, Sanjeev, Nikunj Bhagat, Maosong Fu, Vikas Kedigehalli, Christopher Kellogg, Sailesh Mittal, Jignesh M. Patel, Karthik Ramasamy, and Siddarth Taneja. "Twitter heron: Stream processing at scale." In Proceedings of the 2015 ACM SIGMOD International Conference on Management of Data, pp. 239-250. ACM, 2015.

【2】Maosong Fu, Ashvin Agrawal, Avrilia Floratou, Bill Graham, Andrew Jorgensen, Mark Li, Neng Lu, Karthik Ramasamy, Sriram Rao and Cong Wang. "Twitter Heron: Towards Extensible Streaming Engines." In 2017 International Conference on Data Engineering (ICDE). IEEE, 2017.

呂能,Twitter即時計算平臺團隊成員。專注于分散式系統,曾參與過Twitter的Manhattan鍵值存儲系統,Obs監控警報系統的開發,目前負責Heron的開發研究。曾在國際頂級期刊和會議發表多篇學術論文。

吳惠君,Twitter軟體工程師,致力於即時流處理引擎Heron的研究和開發。他畢業於Arizona State University,專攻大資料處理和移動雲計算,曾在國際頂級期刊和會議發表多篇學術論文,並有多項專利。

符茂松,Twitter即時計算平臺團隊主管,負責Heron, Presto等服務。Heron的原作者之一。專注於分散式系統,在SIGMOD、ICDE等會議期刊發表多篇論文。本科畢業于華中科技大學,研究生畢業于Carnegie Mellon University。

本文為《程式師》原創文章,未經允許禁止轉載

同類文章
Next Article
喜欢就按个赞吧!!!
点击关闭提示