您的位置:首頁>科技>正文

Spark資料當地語系化——>如何達到性能調優的目的

Spark資料當地語系化-->如何達到性能調優的目的

1.Spark數據的當地語系化:移動計算, 而不是移動資料2.Spark中的資料當地語系化級別:

TaskSetManager 的 Locality Levels 分為以下五個級別:

PROCESS_LOCAL

NODE_LOCAL

NO_PREF

RACK_LOCAL

ANY

PROCESS_LOCAL進程當地語系化:task要計算的資料在同一個Executor中

NODE_LOCAL 節點當地語系化:速度比 PROCESS_LOCAL 稍慢, 因為資料需要在不同進程之間傳遞或從檔中讀取

情況一:task要計算的資料是在同一個Worker的不同Executor進程中

情況二:task要計算的資料是在同一個Worker的磁片上, 或在 HDFS 上, 恰好有 block 在同一個節點上。

Spark計算資料來源於HDFS, 那麼最好的資料當地語系化級別就是NODE_LOCAL

如何達到性能調優的目的">

NODE_PREF 沒有最佳位置這一說, 資料從哪裡訪問都一樣快, 不需要位置優先。 比如說SparkSQL讀取MySql中的資料

RACK_LOCAL機架當地語系化, 資料在同一機架的不同節點上。 需要通過網路傳輸資料及檔 IO, 比 NODE_LOCAL 慢

情況一:task計算的資料在Worker2的Executor中

情況二:task計算的資料在Worker2的磁片上

如何達到性能調優的目的">

ANY跨機架, 資料在非同一機架的網路上, 速度最慢

3.Spark中的資料當地語系化由誰負責?

DAGScheduler, TaskScheduler

val rdd1 = rdd1.cache

rdd1.map.filter.count

Driver(TaskScheduler)在發送task之前, 首先應該拿到RDD1緩存在哪一些節點上(node1,node2)-->這一步就是由DAGScheduler通過cacheManager物件調用getPreferredLocations來拿到RDD1緩存在哪些節點上, TaskScheduler根據這些節點來發送task。

val rdd1 = sc.textFile("hdfs://...") //rdd1中封裝了是這個檔所對應的block的位置, getPreferredLocation-->TaskScheduler調用拿到partition所對應的資料的位置

rdd1.map.filter.count

Driver(TaskScheduler)在發送task之前, 首先應該拿到rdd1資料所在的位置(node1,node2)-->RDD1封裝了這個檔所對應的block的位置, TaskScheduler通過調用getPreferredLocations拿到partition所對應的資料的位置, TaskScheduler根據這些位置來發送相應的task

總的來說:

Spark中的資料當地語系化由DAGScheduler和TaskScheduler共同負責。

DAGScheduler切割Job, 劃分Stage, 通過調用submitStage來提交一個Stage對應的tasks, submitStage會調用submitMissingTasks,submitMissingTasks 確定每個需要計算的 task 的preferredLocations, 通過調用getPreferrdeLocations得到partition 的優先位置, 就是這個 partition 對應的 task 的優先位置, 對於要提交到TaskScheduler的TaskSet中的每一個task, 該task優先位置與其對應的partition對應的優先位置一致。

TaskScheduler接收到了TaskSet後, TaskSchedulerImpl 會為每個 TaskSet 創建一個 TaskSetManager 物件, 該物件包含taskSet 所有 tasks, 並管理這些 tasks 的執行, 其中就包括計算 TaskSetManager 中的 tasks 都有哪些locality levels, 以便在調度和延遲調度 tasks 時發揮作用。

4.Spark中的資料當地語系化流程圖

即某個 task 計算節點與其輸入資料的位置關係,

下面將要挖掘Spark 的調度系統如何產生這個結果, 這一過程涉及 RDD、DAGScheduler、TaskScheduler, 搞懂了這一過程也就基本搞懂了 Spark 的 PreferredLocations(位置優先策略)

如何達到性能調優的目的">

第一步:PROCESS_LOCAL-->TaskScheduler首先根據資料所在的節點發送task,

如果task在Worker1的Executor1中等待了3s(這個3s是spark的默認等待時間, 通過spark.locality.wait來設置,

可以在SparkConf()中修改), 重試了5次, 還是無法執行

TaskScheduler會降低資料當地語系化的級別, 從PROCESS_LOCAL降到NODE_LOCAL

第二步:NODE_LOCAL-->TaskScheduler重新發送task到Worker1中的Executor2中執行,

如果task在Worker1的Executor2中等待了3s, 重試了5次, 還是無法執行

TaskScheduler會降低資料當地語系化的級別, 從NODE_LOCAL降到RACK_LOCAL

第三步:RACK_LOCAL-->TaskScheduler重新發送task到Worker2中的Executor1中執行。

第四步:當task分配完成之後, task會通過所在Worker的Executor中的BlockManager來獲取資料, 如果BlockManager發現自己沒有資料, 那麼它會調用getRemote方法, 通過ConnectionManager與原task所在節點的BlockManager中的ConnectionManager先建立連接, 然後通過TransferService(網路傳輸元件)獲取資料, 通過網路傳輸回task所在節點(這時候性能大幅下降, 大量的網路IO佔用資源), 計算後的結果返回給Driver。

總結:

TaskScheduler在發送task的時候, 會根據資料所在的節點發送task,這時候的資料當地語系化的級別是最高的, 如果這個task在這個Executor中等待了三秒, 重試發射了5次還是依然無法執行,那麼TaskScheduler就會認為這個Executor的計算資源滿了,TaskScheduler會降低一級資料當地語系化的級別,重新發送task到其他的Executor中執行,如果還是依然無法執行,那麼繼續降低資料當地語系化的級別...

現在想讓每一個task都能拿到最好的資料當地語系化級別,那麼調優點就是等待時間加長。注意!如果過度調大等待時間,雖然為每一個task都拿到了最好的資料當地語系化級別,但是我們job執行的時間也會隨之延長

spark.locality.wait 3s//相當於是全域的,下麵默認以3s為准,手動設置了,以手動的為准spark.locality.wait.processspark.locality.wait.nodespark.locality.wait.racknewSparkConf.set("spark.locality.wait","100") 重試發射了5次還是依然無法執行,那麼TaskScheduler就會認為這個Executor的計算資源滿了,TaskScheduler會降低一級資料當地語系化的級別,重新發送task到其他的Executor中執行,如果還是依然無法執行,那麼繼續降低資料當地語系化的級別...

現在想讓每一個task都能拿到最好的資料當地語系化級別,那麼調優點就是等待時間加長。注意!如果過度調大等待時間,雖然為每一個task都拿到了最好的資料當地語系化級別,但是我們job執行的時間也會隨之延長

spark.locality.wait 3s//相當於是全域的,下麵默認以3s為准,手動設置了,以手動的為准spark.locality.wait.processspark.locality.wait.nodespark.locality.wait.racknewSparkConf.set("spark.locality.wait","100")
Next Article
喜欢就按个赞吧!!!
点击关闭提示