您的位置:首頁>財經>正文

去哪兒網基於Marathon管理Spark 2.0實現動態擴容實踐

作者丨李雪岩、徐磊、呂曉旭

責編丨魏偉

“去年10月, 去哪兒網實現了Spark 1.5.2版本運行在Mesos資源管理框架上。 目前, 線上已經註冊了44 個Spark任務, 在運行這些任務的過程中, 他們遇到的最大的問題就是動態擴容問題。 ”

背景

去年10月, 我們實現了Spark 1.5.2版本運行在Mesos這個資源管理框架上。 隨後Spark出了新版本我們又對Spark進行了小升級, 升級並沒有什麼太大的難度, 沿用之前的修改過的代碼重新編譯, 替換一下包, 把歷史任務全部發一遍就能很好的升級到1.6.1也就是現在集群的版本, 1.6.2並沒有升級因為感覺改動不是很大。 到現在正好一年的時間,

線上已經註冊了44 個Spark任務, 其中28個為Streaming任務, 在運行這些任務的過程中, 我們遇到了很多問題, 其中最大的問題是動態擴容問題, 即當業務線增加更複雜的代碼邏輯或者業務的增長導致處理量上升的時候會使Spark因計算資源不足, 這時候如果沒有做流量控制則Spark任務會因記憶體承受不了而失敗, 如果做了流量控制則Kafka的lag會有堆積, 這時候一般就需要增加更多的executor來處理, 但是增加多少合適一般不太好判斷, 於是要反復地修改配置重新發佈來找到一個合理的配置。

我們在Marathon上使用Logstash的時候也有類似的問題, 當由於接入一個比較大的日誌導致流量突然增加使得Logstash處理不了時, Kafka的Lag產生堆積, 這時我們只需直接上Marathon的介面上點Scale然後填入更大的實例數位就能啟動了一些Logstash實例自動平衡地去處理了。

當發現某個結點是慢結點不幹活的時候, 只需要在Marathon上將對應的任務Kill掉就會自動再發一個任務替補他的位置, 那麼Logstash既然都可以做到為什麼Spark不可以?因此我們決定在Spark 2.0版本的時候來實現這個功能, 同時我們也會改進其它的一些問題, 另外Spark2.0是一個比較大的版本升級, 配置與之前的1.6.1不同, 不能做到直接全部重發一遍任務來做到全部升級。

( 圖1)使用Logstash的管理架構

Mesos-dispacher架構與問題

在這裡我們首先介紹一些Mesos的一些相關概念, Mesos的Framework是資源配置與調度的發起者, Spark自帶了一個spark-mesos-dispacher的Framework用來管理Spark的資源調度。 而Marathon也是一個Framework他的本質和mesos-dispacher或spark schedular相同。

(圖2)Mesos-dispacher架構

在圖2在這個架構中, 你首先得向mesos註冊一個mesos-dispacher的Framework, 然後, 通過spark-sumbit腳本來向mesos-dispacher發佈任務, mesos-dispacher接到任務以後開始調度他負責發一個Spark Driver, 然後driver在mesos模式下, 他會再次向mesos註冊這個任務的Framework也就是我們看到的Spark UI, 也可以理解為他自己也是個調度器, 然後這個Framework根據配置來向Mesos申請資源來發一些Spark Executor。

(圖3)Mesos-dispacher功能截圖

從圖3可以看出, mesos-dispacher只提供了下功能:

他只提供了一個配置查看的介面, 可以看到資源配置的資訊, 點進去以後可以看到SparkConf的一些參數, 但是這個我們在業務線發佈的時候已經拿到了這些配置, 在這裡只能確認下Driver是否配置正確, 並且在SparkUI上也能看到。

他自帶一個Driver佇列, 他會按順序依次發佈, 當資源不足時會在佇列裡等待。

他自帶一個Driver的HA功能, 但是當你提交Driver代碼有問題, 他會不斷地反復重發, 比較難殺掉, 但也是能殺掉的, 並且沒有次數限制。 所以我們一般也不開放這個功能。

所以mesos-dispacher並不是一個完備的Framework, 在我們使用的過程中發現了存在以下的問題:

在我們發佈Spark的時候需要向mesos-dispacher提供一個SPARK_EXECUTOR_URI的配置來提供SPARK運行環境的位址, 一開始我們是使用http的方式來放環境的, 但是在一次需要發60個executor的時候流量打滿了, 原因是我們編譯出來的Spark的環境包大概250MB, 在發佈的時候60台機器同時拉取這個環境就把流量打爆了。 因此我們的解決方案就是在每一台機器上都部署Spark的環境, 把SPARK_EXECUTOR_URI設成本地目錄來解決這個問題。

介面上的配置並不會真正地同步到driver或executor。由於SPARK的配置很靈活,你的mesos-dispacher啟動的時候會讀取spark-defalut.conf來載入配置,每次發佈時他又會從spark-env.conf裡讀取配置,發driver的時候,driver又會從他的jar包裡的配置讀取配置,使用者自己也可以設置sparkConf的配置,executor的jar包裡同樣也有配置,最終你會發現有些配置設了生效了,有些配置的設置他沒有傳遞,從而造成配置混亂。

mesos-dispacher基本功能缺失。mesos-dispacher雖然是專門為mesos設計的,但是他對mesos的基本功能,如role和constrain支持都不好,如果不修改代碼是無法支持role和constrain,關於這個我提交了個一PR並且在Spark2.0已經沒有這個問題了。 mesos-dispacher並不能運行時修改配置,必須重啟。比如我們上了一些新機器,打了其它一些標籤或者是多標籤,如果想使其生效必須停止mesos-dispacher再啟動才能生效,無法在運行時修改。mesos-dispacher預設工作在非HA模式下,因此在啟動mesos-dispacher在的時候一定要加上Mesosr的zk這樣當停止了mesos-dispacher以後,在mesos-dispacher上的任務將不會受到影響,當重新啟動mesos-dispacher的時候會自動接管任務。

沒有動態擴容功能。我們希望做到的就是可以讓Spark可以在運行時增加實例或減少,但是受於架構限制mesos-dispacher只能管理driver,如果改mesos-dispacher的代碼的話只能實現動態擴driver沒有意義。

此外也有另一種方案就是幫助Spark改進他的Framework使他更強大,但是我們發現只需要Marathon這一個優秀的Framework就可以了,重複造輪子的成本比較大。同時也不希望對Spark代碼有過多的修改,這樣不利於升級。

Marathon+Docker統一架構

由於mesos發佈有很多種模式,我們在做這個時候主要考察了2種模式。

獨立集群模式

該模式需要啟動一個master作為發佈的入口,再對每個實例分別啟動slave。這時候每個slave在啟動的時候資源已經固定了。再增加資源的時候需要啟動新的slave然後停止之前的任務修改資源配置數重發,這種模式的好處是有一個單獨的介面,你可以直接給業務線這個獨立集群模式的介面來用,介面上他們可以根據自己固定的資源發多個任務,並且在SparkUI上可以直接看到日誌。另外它是預先占資源模式,在發佈時不會有資源爭搶導致資源不夠的情況,但是缺點就是做不到運行時的動態擴容。

仿mesos-dispacher模式

該模式下,我們使用Marathon這個framework來模仿mesos-dispacher所做的事,就是先發一個driver然後再發executor掛載到driver來執行任務。關於日誌,我們還是使用之前的方式調用Mesos的介面來獲得日誌。當需要增加資源的時候直接往結點繼續掛executor就可以,當需要刪除結點的時候直接停止executor即可。

實施過程

(圖4)仿mesos-dispacher模式

如何實現仿mesos-dispacher模式

我們要做的事實際上是把圖2的架構圖變成圖4的模式,其中Step 1和Step 2需要模仿,而Step 0則不需要,因為Step 0只是啟動Framework的。我們通過觀察meos-dispacher發現Step 1所做的實際上是調用Spark Submit向Mesos註冊一個Framework然後再由driver來負責調度,我們利用mesos的constraints的特性,設置一個不存在的不可調度的策略,例如:colo:none,這樣一來driver就無法管理資源,而我們使用Marathon自己來發佈Spark Executor來掛到driver上來實現Marathon控制Spark的資源調度策略。由於Mesos他是把Offer推送給Framework的這一特性,我們使用的這種方式也不會有性能問題。

(圖5)主要代碼

那麼圖2中的Step 2是如何做到的呢?我們通過分析Spark原始程式碼發現,Spark 2.0.2在Executor掛到drvier上是通過圖5的命令來做到的。所以通過Marathon發佈Spark Executor的基本原理就是模仿上面的圖5代碼。

從圖6可以看出Marathon發佈的時候先發Spark Driver拿到mesos分配的Spark Driver的IP和PORT填入腳本,這個參數是Driver與Executor之間通信的通道,在發Spark Executor的時候需要提供,這個Driver的IP我們通過Mesos介面可以拿到,因為Driver會向Mesos註冊一個Framework,我們拿到Framework的資訊就拿到了IP和PORT,同時我們還可以拿到FrameworkID那這個PORT是在製作Docker鏡像的時候隨機分配的一個PORT0的一個環境變數,然後通過spark.driver.port指定,這樣Executor這端就可以調用Marathon的REST API來拿到driver的Port。

而參數executor-id是Spark Driver調度時按順序分配的ID,從0開始每次遞增1,如何生成executor-id呢?這個由Spark Executor自己生成一個不超過int的範圍的不重複的亂數即可,這個的ID的不會影響其它行為。hostname可以直接通過命令獲取。cores是我們通過使用者提交的配置來計算出來的,這個Core需要填spark.executor.cores也就是每個Spark Executor的正常使用的Core與spark.mesos.extra.cores分配給每個Spark Executor之和。

(圖6)Executor發佈示意圖

最後一項目app-id通過研究發現在Mesos上實際上就是Framework ID直接通過Mesos介面就可以獲得。這樣我們就完成了Executor的發佈,通過拼上述的命令來把Spark Executor掛到了Driver上,但是實際生產應用中,我們發現了,他還存在Driver和Executor的同步問題。

Spark Receiver的平衡問題

這裡介紹一下在Kafka使用了高階API時,影響Spark性能的Receiver平衡問題,使用低階API則不會有這個問題。如果使用Spark提供的Kafka高階API,你會在代碼裡預先指定好Receiver的數量,然後再做一個Union,在Spark代碼中他實際上是這樣做的,他會先等待Executor連上Driver,默認是30s如果超過了調度的時間則開始進行Receiver的調度,而調度策略是ReceiverPolicy類裡寫死的,ReceiverPolicy的調度策略可以概括為,儘量保證均勻的分配給每個Host一定量的Recevier。

(圖7)啟動3個Spark Executor 示例

舉個例子來說,如圖7當你啟動了3個Spark Executor時,如果代碼裡指定了啟動1個Executor,如果每個Executor啟動在了不同的Host下,Spark在Receiver調度開始時隨機地指定一個Executor啟動Receiver並分配1個Core給這個Task。但是如果代碼裡指定為2個Receiver而2個Executor啟動在了同1個Host1上,另一個啟動在了Host2上,也就是Receiver的數量等於Host Unique數量,則他會在Host1中保證其中的一個Executor啟動1個Receiver,Host2中啟動一個Receiver。如果Receiver的數量,大於了Host Unique的數量如第三張圖,則他會在隨機地在Host1或者Host2中開Receiver,這就帶來了一個問題。分析Spark原始程式碼可知Spark Driver和Spark Executor之間通過運行一個DummyJob,也就是一個MapReduce任務來保證他們之間的同步的,但是他這種做法只能保證一個Spark Executor掛在了Spark Driver上,而不能夠保證所有的Executor比如當只有一個Spark Executor掛在Spark Driver上的時候,這時候開始Receiver開始調度。

如何保證Driver和Executor之間同步

如何保證容器的時間和編碼的準確性讓配置同步

之前在部署1.6.1的mesos-dispacher架構的時候,我們就已經發現, Spark打出的中文日誌會產生亂碼,然後我們做了各種實驗發現,無論如何設置JVM參數,或是使用代碼進行內部的轉換都解決不了亂碼問題,在新架構的Docker環境中也不例外,不過最終還是讓我們解決了,我們發現通過設置JAVA_TOOL_OPTIONS這個環境變數,JAVA虛擬機器的參數才真正的修改生效,於是我們在容器啟動的時候配置了file.encoding=UTF-8,亂碼問題得才以解決,除此之外在Docker鏡像中系統的時間也是不準確的,默認是UTC時間,而系統時間對代碼的影響也很大,有可能寫入到HDFS的檔是以時間戳記生成的,我們一開始解決這個問題的方法是通過以唯讀的方式在Docker中掛載宿主機上的/etc/localtime來修正時間,但是發現時間還是不正確,這時因為Spark內部還會根據時區自動修正時間為UTC,所以還需要給JVM加一個環境變數設置user.timezone=PRC 這樣時間才可以保證時間是對的,另外使用這種架構的時候spark.driver.extraJavaOptions和spark.executor.extraJavaOptions這兩個參數也不會生效,需要使用者通過發佈配置傳過來,然後在容器中追加到JAVA_TOOL_OPTIONS。另外值得注意的是SPARK_EXECUTOR_MEMORY也不會同步,需要手動來進行設置。

如何保證driver和executor失敗時同步

雖然我們之前解決了marathon發佈driver和executor之間的連接問題,但是由於mesos介面慢,在我們實際測試中,發30個executor就可以把mesos打掛,因此,我們想了另一個辦法來解決這個問題,我們首先修改了Spark代碼,讓他的Spark Driver在不依賴mesos-dispacher的情況下實現driver的HA,HA的實現原理大概就是每次在Spark Driver啟動註冊Framework的時候,把Framework ID存到zk裡,然後在程式掛掉了以後保持Framework與Mesos的連接,在下次啟動的時候重新註冊這個Framework,這樣的話,Framework ID可以基本保持不變,在發佈Spark Executor的時候就可以固定住這個Framework ID在Executor掛掉的時候marathon拉起來也能保證重連,而driver如果掛掉的話,他會重新註冊,獲得的Framework ID不變,又可以繼續運行,這樣做只需要在Spark Driver發佈完成以後調用一次Mesos介面拿到Framework ID分發給Spark Executor就可以了。順便說一下Spark Executor拿Spark Driver的ip和port是通過調Marathon介面實現了,而Marathon介面速度很快,不會有這個問題。

如何升級Spark版本

對於業務線的任務來說升級Spark是一件比較麻煩的事,主要原因是需要他們改代碼,不過從改代碼的角度來說,變化也不算大,也就是Spark版本和Scala版本變一下,另外就是有些API也需要做一點調整,另外就是升級麻煩的另一個原因也是因為之前沒有使用Marathon+Docker的模式,如果之前就使用了這種模式,那只需要把鏡像給修改了,願意升級的升級,不願意升級的可以使用原來的鏡像跑,在以後的升級中,我們只需要製作新鏡像就可以了,非常方便遷移,可以讓他跑在任何集群。那現在為了過渡到這種模式,再結合之前發佈的經驗,我們使用的模式是舊的有一套配置,新的也有一套配置,然後通過在git上打tag的方式,在舊的配置裡加入升級資訊,然後發佈邏輯改為優先讀取是否要升級,如果需要升級則發在新集群上,如果不需要則保持原來不變,我們會先讓業務線進行測試,同時保持舊的任務線上,當他們測試通過了以後,再停止舊的作務,把改好的新版本發到新集群上,當發現有問題的時候可以用原來的tag進行回滾,因為原來的tag裡的配置會先判斷是否需要升級,而之前的配置肯定沒有需要升級的選項。

如何監控Spark的運行狀態

Spark自身有一套metric監控,這個在新版本也不例外,在我們集群中唯一的變更就是把不靠譜的udp改成了tcp,另外我們因為使用的是Docker容器,這樣我們就還有另一套監控,這個監控是分析cgroup裡的資料,使用的是我們開源的pyadvisor來做的,我們可以通過監控來觀察CPU和記憶體的使用情況,很好的提出優化改進資源使用的建議,另外,對於業務線們,我們推薦他們使用的是Spark裡自帶的Accumulator,先在Spark Driver上做一個聚合1分鐘的指標,然後再往watcher上打他們的業務指標,這樣即不會有之前不同host之間的聚合指標的問題,同時也給watcher減輕了壓力。

總結

以上就是我們所做的新的Spark架構,綜合看來有以下的優點:

無需環境配置與部署,走Docker。對於以後也升級也會較方便,可以複用之前Dockerfile。

是以直接啟動的方式,配置絕對生效,不會出現複雜配置的問題。

自動平衡executor。沒有Receiver不平衡的問題問題,在某些場景下可以動態增減executor,不會有失敗過多而不再拉executor的現象,也不不會有多發或少發executor現象。

由於使用Marathon的原因,可以支援多標籤,複雜調度,例如業務線有時候需要固定指定的機運行Spark開百名單,同時也為我們以後做遷移有了更多的便利。

由CSDN主辦的中國雲計算技術大會(CCTC 2017)將於5月18-19日在北京召開,Spark、Container、區塊鏈、大資料四大主題峰會震撼襲來,包括Mesosphere CTO Tobi Knaup,Rancher labs 創始人梁勝、Databricks 工程師 Spark commiter 範文臣等近60位技術大牛齊聚京城,為雲計算、大資料以及人工智慧領域開發者帶來一場技術的盛大Party。現在報名,只需399元就可以聆聽近60場的頂級技術專家分享,還等什麼,登陸官網(http://cctc.csdn.net/),趕快報名吧!

介面上的配置並不會真正地同步到driver或executor。由於SPARK的配置很靈活,你的mesos-dispacher啟動的時候會讀取spark-defalut.conf來載入配置,每次發佈時他又會從spark-env.conf裡讀取配置,發driver的時候,driver又會從他的jar包裡的配置讀取配置,使用者自己也可以設置sparkConf的配置,executor的jar包裡同樣也有配置,最終你會發現有些配置設了生效了,有些配置的設置他沒有傳遞,從而造成配置混亂。

mesos-dispacher基本功能缺失。mesos-dispacher雖然是專門為mesos設計的,但是他對mesos的基本功能,如role和constrain支持都不好,如果不修改代碼是無法支持role和constrain,關於這個我提交了個一PR並且在Spark2.0已經沒有這個問題了。 mesos-dispacher並不能運行時修改配置,必須重啟。比如我們上了一些新機器,打了其它一些標籤或者是多標籤,如果想使其生效必須停止mesos-dispacher再啟動才能生效,無法在運行時修改。mesos-dispacher預設工作在非HA模式下,因此在啟動mesos-dispacher在的時候一定要加上Mesosr的zk這樣當停止了mesos-dispacher以後,在mesos-dispacher上的任務將不會受到影響,當重新啟動mesos-dispacher的時候會自動接管任務。

沒有動態擴容功能。我們希望做到的就是可以讓Spark可以在運行時增加實例或減少,但是受於架構限制mesos-dispacher只能管理driver,如果改mesos-dispacher的代碼的話只能實現動態擴driver沒有意義。

此外也有另一種方案就是幫助Spark改進他的Framework使他更強大,但是我們發現只需要Marathon這一個優秀的Framework就可以了,重複造輪子的成本比較大。同時也不希望對Spark代碼有過多的修改,這樣不利於升級。

Marathon+Docker統一架構

由於mesos發佈有很多種模式,我們在做這個時候主要考察了2種模式。

獨立集群模式

該模式需要啟動一個master作為發佈的入口,再對每個實例分別啟動slave。這時候每個slave在啟動的時候資源已經固定了。再增加資源的時候需要啟動新的slave然後停止之前的任務修改資源配置數重發,這種模式的好處是有一個單獨的介面,你可以直接給業務線這個獨立集群模式的介面來用,介面上他們可以根據自己固定的資源發多個任務,並且在SparkUI上可以直接看到日誌。另外它是預先占資源模式,在發佈時不會有資源爭搶導致資源不夠的情況,但是缺點就是做不到運行時的動態擴容。

仿mesos-dispacher模式

該模式下,我們使用Marathon這個framework來模仿mesos-dispacher所做的事,就是先發一個driver然後再發executor掛載到driver來執行任務。關於日誌,我們還是使用之前的方式調用Mesos的介面來獲得日誌。當需要增加資源的時候直接往結點繼續掛executor就可以,當需要刪除結點的時候直接停止executor即可。

實施過程

(圖4)仿mesos-dispacher模式

如何實現仿mesos-dispacher模式

我們要做的事實際上是把圖2的架構圖變成圖4的模式,其中Step 1和Step 2需要模仿,而Step 0則不需要,因為Step 0只是啟動Framework的。我們通過觀察meos-dispacher發現Step 1所做的實際上是調用Spark Submit向Mesos註冊一個Framework然後再由driver來負責調度,我們利用mesos的constraints的特性,設置一個不存在的不可調度的策略,例如:colo:none,這樣一來driver就無法管理資源,而我們使用Marathon自己來發佈Spark Executor來掛到driver上來實現Marathon控制Spark的資源調度策略。由於Mesos他是把Offer推送給Framework的這一特性,我們使用的這種方式也不會有性能問題。

(圖5)主要代碼

那麼圖2中的Step 2是如何做到的呢?我們通過分析Spark原始程式碼發現,Spark 2.0.2在Executor掛到drvier上是通過圖5的命令來做到的。所以通過Marathon發佈Spark Executor的基本原理就是模仿上面的圖5代碼。

從圖6可以看出Marathon發佈的時候先發Spark Driver拿到mesos分配的Spark Driver的IP和PORT填入腳本,這個參數是Driver與Executor之間通信的通道,在發Spark Executor的時候需要提供,這個Driver的IP我們通過Mesos介面可以拿到,因為Driver會向Mesos註冊一個Framework,我們拿到Framework的資訊就拿到了IP和PORT,同時我們還可以拿到FrameworkID那這個PORT是在製作Docker鏡像的時候隨機分配的一個PORT0的一個環境變數,然後通過spark.driver.port指定,這樣Executor這端就可以調用Marathon的REST API來拿到driver的Port。

而參數executor-id是Spark Driver調度時按順序分配的ID,從0開始每次遞增1,如何生成executor-id呢?這個由Spark Executor自己生成一個不超過int的範圍的不重複的亂數即可,這個的ID的不會影響其它行為。hostname可以直接通過命令獲取。cores是我們通過使用者提交的配置來計算出來的,這個Core需要填spark.executor.cores也就是每個Spark Executor的正常使用的Core與spark.mesos.extra.cores分配給每個Spark Executor之和。

(圖6)Executor發佈示意圖

最後一項目app-id通過研究發現在Mesos上實際上就是Framework ID直接通過Mesos介面就可以獲得。這樣我們就完成了Executor的發佈,通過拼上述的命令來把Spark Executor掛到了Driver上,但是實際生產應用中,我們發現了,他還存在Driver和Executor的同步問題。

Spark Receiver的平衡問題

這裡介紹一下在Kafka使用了高階API時,影響Spark性能的Receiver平衡問題,使用低階API則不會有這個問題。如果使用Spark提供的Kafka高階API,你會在代碼裡預先指定好Receiver的數量,然後再做一個Union,在Spark代碼中他實際上是這樣做的,他會先等待Executor連上Driver,默認是30s如果超過了調度的時間則開始進行Receiver的調度,而調度策略是ReceiverPolicy類裡寫死的,ReceiverPolicy的調度策略可以概括為,儘量保證均勻的分配給每個Host一定量的Recevier。

(圖7)啟動3個Spark Executor 示例

舉個例子來說,如圖7當你啟動了3個Spark Executor時,如果代碼裡指定了啟動1個Executor,如果每個Executor啟動在了不同的Host下,Spark在Receiver調度開始時隨機地指定一個Executor啟動Receiver並分配1個Core給這個Task。但是如果代碼裡指定為2個Receiver而2個Executor啟動在了同1個Host1上,另一個啟動在了Host2上,也就是Receiver的數量等於Host Unique數量,則他會在Host1中保證其中的一個Executor啟動1個Receiver,Host2中啟動一個Receiver。如果Receiver的數量,大於了Host Unique的數量如第三張圖,則他會在隨機地在Host1或者Host2中開Receiver,這就帶來了一個問題。分析Spark原始程式碼可知Spark Driver和Spark Executor之間通過運行一個DummyJob,也就是一個MapReduce任務來保證他們之間的同步的,但是他這種做法只能保證一個Spark Executor掛在了Spark Driver上,而不能夠保證所有的Executor比如當只有一個Spark Executor掛在Spark Driver上的時候,這時候開始Receiver開始調度。

如何保證Driver和Executor之間同步

如何保證容器的時間和編碼的準確性讓配置同步

之前在部署1.6.1的mesos-dispacher架構的時候,我們就已經發現, Spark打出的中文日誌會產生亂碼,然後我們做了各種實驗發現,無論如何設置JVM參數,或是使用代碼進行內部的轉換都解決不了亂碼問題,在新架構的Docker環境中也不例外,不過最終還是讓我們解決了,我們發現通過設置JAVA_TOOL_OPTIONS這個環境變數,JAVA虛擬機器的參數才真正的修改生效,於是我們在容器啟動的時候配置了file.encoding=UTF-8,亂碼問題得才以解決,除此之外在Docker鏡像中系統的時間也是不準確的,默認是UTC時間,而系統時間對代碼的影響也很大,有可能寫入到HDFS的檔是以時間戳記生成的,我們一開始解決這個問題的方法是通過以唯讀的方式在Docker中掛載宿主機上的/etc/localtime來修正時間,但是發現時間還是不正確,這時因為Spark內部還會根據時區自動修正時間為UTC,所以還需要給JVM加一個環境變數設置user.timezone=PRC 這樣時間才可以保證時間是對的,另外使用這種架構的時候spark.driver.extraJavaOptions和spark.executor.extraJavaOptions這兩個參數也不會生效,需要使用者通過發佈配置傳過來,然後在容器中追加到JAVA_TOOL_OPTIONS。另外值得注意的是SPARK_EXECUTOR_MEMORY也不會同步,需要手動來進行設置。

如何保證driver和executor失敗時同步

雖然我們之前解決了marathon發佈driver和executor之間的連接問題,但是由於mesos介面慢,在我們實際測試中,發30個executor就可以把mesos打掛,因此,我們想了另一個辦法來解決這個問題,我們首先修改了Spark代碼,讓他的Spark Driver在不依賴mesos-dispacher的情況下實現driver的HA,HA的實現原理大概就是每次在Spark Driver啟動註冊Framework的時候,把Framework ID存到zk裡,然後在程式掛掉了以後保持Framework與Mesos的連接,在下次啟動的時候重新註冊這個Framework,這樣的話,Framework ID可以基本保持不變,在發佈Spark Executor的時候就可以固定住這個Framework ID在Executor掛掉的時候marathon拉起來也能保證重連,而driver如果掛掉的話,他會重新註冊,獲得的Framework ID不變,又可以繼續運行,這樣做只需要在Spark Driver發佈完成以後調用一次Mesos介面拿到Framework ID分發給Spark Executor就可以了。順便說一下Spark Executor拿Spark Driver的ip和port是通過調Marathon介面實現了,而Marathon介面速度很快,不會有這個問題。

如何升級Spark版本

對於業務線的任務來說升級Spark是一件比較麻煩的事,主要原因是需要他們改代碼,不過從改代碼的角度來說,變化也不算大,也就是Spark版本和Scala版本變一下,另外就是有些API也需要做一點調整,另外就是升級麻煩的另一個原因也是因為之前沒有使用Marathon+Docker的模式,如果之前就使用了這種模式,那只需要把鏡像給修改了,願意升級的升級,不願意升級的可以使用原來的鏡像跑,在以後的升級中,我們只需要製作新鏡像就可以了,非常方便遷移,可以讓他跑在任何集群。那現在為了過渡到這種模式,再結合之前發佈的經驗,我們使用的模式是舊的有一套配置,新的也有一套配置,然後通過在git上打tag的方式,在舊的配置裡加入升級資訊,然後發佈邏輯改為優先讀取是否要升級,如果需要升級則發在新集群上,如果不需要則保持原來不變,我們會先讓業務線進行測試,同時保持舊的任務線上,當他們測試通過了以後,再停止舊的作務,把改好的新版本發到新集群上,當發現有問題的時候可以用原來的tag進行回滾,因為原來的tag裡的配置會先判斷是否需要升級,而之前的配置肯定沒有需要升級的選項。

如何監控Spark的運行狀態

Spark自身有一套metric監控,這個在新版本也不例外,在我們集群中唯一的變更就是把不靠譜的udp改成了tcp,另外我們因為使用的是Docker容器,這樣我們就還有另一套監控,這個監控是分析cgroup裡的資料,使用的是我們開源的pyadvisor來做的,我們可以通過監控來觀察CPU和記憶體的使用情況,很好的提出優化改進資源使用的建議,另外,對於業務線們,我們推薦他們使用的是Spark裡自帶的Accumulator,先在Spark Driver上做一個聚合1分鐘的指標,然後再往watcher上打他們的業務指標,這樣即不會有之前不同host之間的聚合指標的問題,同時也給watcher減輕了壓力。

總結

以上就是我們所做的新的Spark架構,綜合看來有以下的優點:

無需環境配置與部署,走Docker。對於以後也升級也會較方便,可以複用之前Dockerfile。

是以直接啟動的方式,配置絕對生效,不會出現複雜配置的問題。

自動平衡executor。沒有Receiver不平衡的問題問題,在某些場景下可以動態增減executor,不會有失敗過多而不再拉executor的現象,也不不會有多發或少發executor現象。

由於使用Marathon的原因,可以支援多標籤,複雜調度,例如業務線有時候需要固定指定的機運行Spark開百名單,同時也為我們以後做遷移有了更多的便利。

由CSDN主辦的中國雲計算技術大會(CCTC 2017)將於5月18-19日在北京召開,Spark、Container、區塊鏈、大資料四大主題峰會震撼襲來,包括Mesosphere CTO Tobi Knaup,Rancher labs 創始人梁勝、Databricks 工程師 Spark commiter 範文臣等近60位技術大牛齊聚京城,為雲計算、大資料以及人工智慧領域開發者帶來一場技術的盛大Party。現在報名,只需399元就可以聆聽近60場的頂級技術專家分享,還等什麼,登陸官網(http://cctc.csdn.net/),趕快報名吧!

Next Article
喜欢就按个赞吧!!!
点击关闭提示