華文網

Python版的Spark程式設計指導!神級程式師強推!你得花半小時閱讀!

根據下面原文和網上的譯文結合自己的實際需要整理一下,當做筆記和回查!在給大家分享之前呢,小編推薦一下一個挺不錯的交流寶地,裡面都是一群熱愛並在學習Python的小夥伴們,

大幾千了吧,各種各樣的人群都有,特別喜歡看到這種大家一起交流解決難題的氛圍,群資料也上傳了好多,各種大牛解決小白的問題,這個Python群:330637182 歡迎大家進來一起交流討論,一起進步,儘早掌握這門Python語言。

概述

從高層次上來看,每一個Spark應用都包含一個驅動程式,

用於執行使用者的main函數以及在集群上運行各種平行作業。 Spark提供的主要抽象是彈性分散式資料集( RDD ) ,這是一個包含諸多元素、被劃分到不同節點上進行並行處理的資料集合。 RDD通過打開HDFS(或其他hadoop支持的檔案系統)上的一個檔、在驅動程式中打開一個已有的Scala集合或由其他 RDD轉換操作得到。 使用者可以要求Spark將RDD持久化到記憶體中,這樣就可以有效地在平行作業中複用。
另外,在節點發生錯誤時RDD可以自動恢復。

Spark提供的 另一個抽象是可以在平行作業中使用的共用變數 。在預設情況下,當Spark將一個函數轉化成許多工在不同的節點上運行的時候,對於所有在函數中使用的變數,每一個任務都會得到一個副本。有時,某一個變數需要在任務之間或任務與驅動程式之間共用。Spark支援兩種共用變數: 廣播變數 ,用來將一個值緩存到所有節點的記憶體中; 累加器 ,

只能用於累加,比如計數器和求和。

連接Spark

Spark1.3.0只支援Python2.6或更高的版本(但不支持Python3), 但是Spark2.0.0支持 。它使用了標準的CPython解譯器,所以諸如NumPy一類的C庫也是可以使用的。

通過Spark目錄下的 bin/spark-submit 腳本你可以在Python中運行Spark應用。這個腳本會載入Spark的Java/Scala庫然後讓你將應用提交到集群中。 你可以執行 bin/pyspark 來打開Python的交互命令列。

如果你希望訪問HDFS上的資料,你需要為你使用的HDFS版本建立一個PySpark連接。常見的HDFS版本標籤都已經列在了這個協力廠商發行版本頁面。

最後,你需要將一些Spark的類import到你的程式中。加入如下這行:

from pyspark import SparkContext, SparkConf

想要瞭解命令列選項的完整資訊請執行 pyspark --help命令。在這些場景下,pyspark會觸發一個更通用的spark-submit腳本

並行集合的 一個重要參數是將資料集劃分成分片的數量 。對每一個分片,Spark會在集群中運行一個對應的任務。典型情況下,集群中的每一個CPU將對應運行2-4個分片。一般情況下,Spark會根據當前集群的情況自行設定分片數量。但是, 你也可以通過將第二個參數傳遞給parallelize方法(比如 sc.parallelize(data, 10) )來手動確定分片數量 。注意:有些代碼中會使用切片(slice,分片的同義詞)這個術語來保持向下相容性。

外部資料集

PySpark可以通過Hadoop支持的外部資料來源(包括本地檔案系統、HDFS、 Cassandra、HBase、亞馬遜S3等等)建立分佈資料集。Spark支持文字檔、 序列檔 以及其他任何 Hadoop輸入格式檔 。

通過文字檔創建RDD要使用SparkContext的textFile方法。這個方法會使用一個檔的URI(或本地檔路徑,hdfs://、s3n://這樣的URI等等)然後讀入這個檔建立一個文本行的集合。以下是一個例子:

distFile = sc.textFile("data.txt")

建立完成後distFile上就可以調用資料集操作了。比如,我們可以調用map和reduce操作來疊加所有文本行的長度,代碼如下:

distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)

在Spark中讀入文件時有幾點要注意:

如果使用了本地檔路徑時,要保證在worker節點上這個檔也能夠通過這個路徑訪問。這點可以通過將這個檔拷貝到所有worker上或者使用網路掛載的共用檔案系統來解決。

包括textFile在內的所有基於檔的Spark讀入方法,都支持將資料夾、壓縮檔、包含萬用字元的路徑作為參數。比如,以下代碼都是合法的:

可寫類型支持

PySpark序列檔支持利用Java作為仲介載入一個鍵值對RDD,將可寫類型轉化成Java的基本類型,然後使用Pyrolite將java結果物件序列化。當將一個鍵值對RDD儲存到一個序列檔中時PySpark將會運行上述過程的相反過程。首先將Python物件反序列化成Java物件,然後轉化成可寫類型。以下可寫類型會自動轉換:

可寫類型Python類型Textunicode strIntWritableintFloatWritablefloatDoubleWritablefloatBooleanWritableboolBytesWritablebytearrayNullWritableNoneMapWritabledict

陣列是不能自動轉換的。使用者需要在讀寫時指定ArrayWritable的子類型.在讀入的時候,默認的轉換器會把自訂的ArrayWritable子類型轉化成Java的Object[],之後序列化成Python的元組。為了獲得Python的array.array類型來使用主要類型的陣列,使用者需要自行指定轉換器。

保存和讀取序列檔

和文字檔類似,序列檔可以通過指定路徑來保存與讀取。鍵數值型別都可以自行指定,但是對於標準可寫類型可以不指定。

RDD操作

RDD支援兩類操作: 轉化操作 ,用於從已有的資料集轉化產生新的資料集; 啟動操作 ,用於在計算結束後向驅動程式返回結果。舉個例子,map是一個轉化操作,可以將資料集中每一個元素傳給一個函數,同時將計算結果作為一個新的RDD返回。另一方面,reduce操作是一個啟動操作,能夠使用某些函數來聚集計算RDD中所有的元素,並且向驅動程式返回最終結果(同時還有一個並行的reduceByKey操作可以返回一個分佈資料集)。

在Spark所有的轉化操作都是 惰性求值 的,就是說它們並不會立刻真的計算出結果。相反,它們僅僅是記錄下了轉換操作的操作物件(比如:一個檔)。只有當一個啟動操作被執行,要向驅動程式返回結果時,轉化操作才會真的開始計算。這樣的設計使得Spark運行更加高效——比如,我們會發覺由map操作產生的資料集將會在reduce操作中用到,之後僅僅是返回了reduce的最終的結果而不是map產生的龐大資料集。

在預設情況下,每一個由轉化操作得到的RDD都會在每次執行啟動操作時重新計算生成。但是,你也可以通過調用persist(或cache)方法來將RDD 持久化 到記憶體中,這樣Spark就可以在下次使用這個資料集時快速獲得。Spark同樣提供了對將RDD持久化到硬碟上或在多個節點間複製的支持。

比如,傳遞一個無法轉化為 lambda運算式長函數,可以像以下代碼這樣:

轉化操作

轉化操作作用map(func)返回一個新的分佈資料集,由原資料集元素經func處理後的結果組成filter(func)返回一個新的資料集,由傳給func返回True的原資料集元素組成flatMap(func)與map類似,但是每個傳入元素可能有0或多個返回值,func可以返回一個序列而不是一個值mapParitions(func)類似map,但是RDD的每個分片都會分開獨立運行,所以func的參數和返回值必須都是反覆運算器mapParitionsWithIndex(func)類似mapParitions,但是func有兩個參數,第一個是分片的序號,第二個是反覆運算器。返回值還是反覆運算器sample(withReplacement, fraction, seed)使用提供的亂數種子取樣,然後替換或不替換union(otherDataset)返回新的資料集,包括原資料集和參數資料集的所有元素intersection(otherDataset)返回新資料集,是兩個集的交集distinct([numTasks])返回新的集,包括原集中的不重複元素groupByKey([numTasks])當用於鍵值對RDD時返回(鍵,值反覆運算器)對的資料集aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])用於鍵值對RDD時返回(K,U)對集,對每一個Key的value進行聚集計算sortByKey([ascending], [numTasks])用於鍵值對RDD時會返回RDD按鍵的順序排序,升降冪由第一個參數決定join(otherDataset, [numTasks])用於鍵值對(K, V)和(K, W)RDD時返回(K, (V, W))對RDDcogroup(otherDataset, [numTasks])用於兩個鍵值對RDD時返回(K, (V反覆運算器, W反覆運算器))RDDcartesian(otherDataset)用於T和U類型RDD時返回(T, U)對類型鍵值對RDDpipe(command, [envVars])通過shell命令管道處理每個RDD分片coalesce(numPartitions)把RDD的分片數量降低到參數大小repartition(numPartitions)重新打亂RDD中元素順序並重新分片,數量由參數決定repartitionAndSortWithinPartitions(partitioner)按照參數給定的分片器重新分片,同時每個分片內部按照鍵排序

下面的表格列出了Spark支援的常用轉化操作。欲知細節,請查閱RDD API文檔(Scala, Java, Python)和鍵值對RDD函數文檔(Scala, Java)。

(譯者注:這部分翻譯比較簡略,僅供簡單參考,具體細節請看文檔)

轉化操作作用map(func)返回一個新的分佈資料集,由原資料集元素經func處理後的結果組成filter(func)返回一個新的資料集,由傳給func返回True的原資料集元素組成flatMap(func)與map類似,但是每個傳入元素可能有0或多個返回值,func可以返回一個序列而不是一個值mapParitions(func)類似map,但是RDD的每個分片都會分開獨立運行,所以func的參數和返回值必須都是反覆運算器mapParitionsWithIndex(func)類似mapParitions,但是func有兩個參數,第一個是分片的序號,第二個是反覆運算器。返回值還是反覆運算器sample(withReplacement, fraction, seed)使用提供的亂數種子取樣,然後替換或不替換union(otherDataset)返回新的資料集,包括原資料集和參數資料集的所有元素intersection(otherDataset)返回新資料集,是兩個集的交集distinct([numTasks])返回新的集,包括原集中的不重複元素groupByKey([numTasks])當用於鍵值對RDD時返回(鍵,值反覆運算器)對的資料集aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])用於鍵值對RDD時返回(K,U)對集,對每一個Key的value進行聚集計算sortByKey([ascending], [numTasks])用於鍵值對RDD時會返回RDD按鍵的順序排序,升降冪由第一個參數決定join(otherDataset, [numTasks])用於鍵值對(K, V)和(K, W)RDD時返回(K, (V, W))對RDDcogroup(otherDataset, [numTasks])用於兩個鍵值對RDD時返回(K, (V反覆運算器, W反覆運算器))RDDcartesian(otherDataset)用於T和U類型RDD時返回(T, U)對類型鍵值對RDDpipe(command, [envVars])通過shell命令管道處理每個RDD分片coalesce(numPartitions)把RDD的分片數量降低到參數大小repartition(numPartitions)重新打亂RDD中元素順序並重新分片,數量由參數決定repartitionAndSortWithinPartitions(partitioner)按照參數給定的分片器重新分片,同時每個分片內部按照鍵排序

啟動操作

下面的表格列出了Spark支援的部分常用啟動操作。欲知細節,請查閱RDD API文檔(Scala, Java, Python)和鍵值對RDD函數文檔(Scala, Java)。

(譯者注:這部分翻譯比較簡略,僅供簡單參考,具體細節請看文檔)

啟動操作作用reduce(func)使用func進行聚集計算,func的參數是兩個,返回值一個,兩次func運行應當是完全解耦的,這樣才能正確地並行運算collect()向驅動程式返回資料集的元素組成的陣列count()返回資料集元素的數量first()返回資料集的第一個元素take(n)返回前n個元素組成的陣列takeSample(withReplacement, num, [seed])返回一個由原資料集中任意num個元素的suzuki,並且替換之takeOrder(n, [ordering])返回排序後的前n個元素saveAsTextFile(path)將資料集的元素寫成文字檔saveAsSequenceFile(path)將資料集的元素寫成序列檔,這個API只能用於Java和Scala程式saveAsObjectFile(path)將資料集的元素使用Java的序列化特性寫到檔中,這個API只能用於Java和Scala程式countByCount()只能用於鍵值對RDD,返回一個(K, int) hashmap,返回每個key的出現次數foreach(func)對資料集的每個元素執行func, 通常用於完成一些帶有副作用的函數,比如更新累加器(見下文)或與外部存儲交互等

RDD持久化

Spark的一個重要功能就是在將資料集 持久化 (或 緩存 )到記憶體中以便在多個操作中重複使用。當我們持久化一個RDD是,每一個節點將這個RDD的每一個分片計算並保存到記憶體中以便在下次對這個資料集(或者這個資料集衍生的資料集)的計算中可以複用。這使得接下來的計算過程速度能夠加快(經常能加快超過十倍的速度)。緩存是加快反覆運算演算法和快速交互過程速度的關鍵工具。

刪除資料

Spark會自動監視每個節點的緩存使用同時使用LRU演算法丟棄舊資料分片。如果你想手動刪除某個RDD而不是等待它被自動刪除,調用 RDD.unpersist()方法。

共用變數

通常情況下,當一個函數傳遞給一個在遠端集群節點上運行的Spark操作(比如map和reduce)時,Spark會對涉及到的變數的所有副本執行這個函數。這些變數會被複製到每個機器上,而且這個過程不會被回饋給驅動程式。通常情況下,在任務之間讀寫共用變數是很低效的。但是,Spark仍然提供了有限的兩種共用變數類型用於常見的使用場景:廣播變數和累加器。

廣播變數

廣播變數允許程式師在每台機器上保持一個唯讀變數的緩存而不是將一個變數的拷貝傳遞給各個任務。它們可以被使用,比如,給每一個節點傳遞一份大輸入資料集的拷貝是很低效的。Spark試圖使用高效的廣播演算法來分佈廣播變數,以此來降低通信花銷。

其他

集群部署

這個 應用提交指南 描述了一個應用被提交到集群上的過程。簡而言之,只要你把你的應用打成了JAR包(Java/Scala應用)或.py檔的集合或.zip壓縮包(Python應用),bin/spark-submit腳本會將應用提交到任意支援的集群管理器上。

感謝AsuraDong大大的分享!

分片的同義詞)這個術語來保持向下相容性。

外部資料集

PySpark可以通過Hadoop支持的外部資料來源(包括本地檔案系統、HDFS、 Cassandra、HBase、亞馬遜S3等等)建立分佈資料集。Spark支持文字檔、 序列檔 以及其他任何 Hadoop輸入格式檔 。

通過文字檔創建RDD要使用SparkContext的textFile方法。這個方法會使用一個檔的URI(或本地檔路徑,hdfs://、s3n://這樣的URI等等)然後讀入這個檔建立一個文本行的集合。以下是一個例子:

distFile = sc.textFile("data.txt")

建立完成後distFile上就可以調用資料集操作了。比如,我們可以調用map和reduce操作來疊加所有文本行的長度,代碼如下:

distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)

在Spark中讀入文件時有幾點要注意:

如果使用了本地檔路徑時,要保證在worker節點上這個檔也能夠通過這個路徑訪問。這點可以通過將這個檔拷貝到所有worker上或者使用網路掛載的共用檔案系統來解決。

包括textFile在內的所有基於檔的Spark讀入方法,都支持將資料夾、壓縮檔、包含萬用字元的路徑作為參數。比如,以下代碼都是合法的:

可寫類型支持

PySpark序列檔支持利用Java作為仲介載入一個鍵值對RDD,將可寫類型轉化成Java的基本類型,然後使用Pyrolite將java結果物件序列化。當將一個鍵值對RDD儲存到一個序列檔中時PySpark將會運行上述過程的相反過程。首先將Python物件反序列化成Java物件,然後轉化成可寫類型。以下可寫類型會自動轉換:

可寫類型Python類型Textunicode strIntWritableintFloatWritablefloatDoubleWritablefloatBooleanWritableboolBytesWritablebytearrayNullWritableNoneMapWritabledict

陣列是不能自動轉換的。使用者需要在讀寫時指定ArrayWritable的子類型.在讀入的時候,默認的轉換器會把自訂的ArrayWritable子類型轉化成Java的Object[],之後序列化成Python的元組。為了獲得Python的array.array類型來使用主要類型的陣列,使用者需要自行指定轉換器。

保存和讀取序列檔

和文字檔類似,序列檔可以通過指定路徑來保存與讀取。鍵數值型別都可以自行指定,但是對於標準可寫類型可以不指定。

RDD操作

RDD支援兩類操作: 轉化操作 ,用於從已有的資料集轉化產生新的資料集; 啟動操作 ,用於在計算結束後向驅動程式返回結果。舉個例子,map是一個轉化操作,可以將資料集中每一個元素傳給一個函數,同時將計算結果作為一個新的RDD返回。另一方面,reduce操作是一個啟動操作,能夠使用某些函數來聚集計算RDD中所有的元素,並且向驅動程式返回最終結果(同時還有一個並行的reduceByKey操作可以返回一個分佈資料集)。

在Spark所有的轉化操作都是 惰性求值 的,就是說它們並不會立刻真的計算出結果。相反,它們僅僅是記錄下了轉換操作的操作物件(比如:一個檔)。只有當一個啟動操作被執行,要向驅動程式返回結果時,轉化操作才會真的開始計算。這樣的設計使得Spark運行更加高效——比如,我們會發覺由map操作產生的資料集將會在reduce操作中用到,之後僅僅是返回了reduce的最終的結果而不是map產生的龐大資料集。

在預設情況下,每一個由轉化操作得到的RDD都會在每次執行啟動操作時重新計算生成。但是,你也可以通過調用persist(或cache)方法來將RDD 持久化 到記憶體中,這樣Spark就可以在下次使用這個資料集時快速獲得。Spark同樣提供了對將RDD持久化到硬碟上或在多個節點間複製的支持。

比如,傳遞一個無法轉化為 lambda運算式長函數,可以像以下代碼這樣:

轉化操作

轉化操作作用map(func)返回一個新的分佈資料集,由原資料集元素經func處理後的結果組成filter(func)返回一個新的資料集,由傳給func返回True的原資料集元素組成flatMap(func)與map類似,但是每個傳入元素可能有0或多個返回值,func可以返回一個序列而不是一個值mapParitions(func)類似map,但是RDD的每個分片都會分開獨立運行,所以func的參數和返回值必須都是反覆運算器mapParitionsWithIndex(func)類似mapParitions,但是func有兩個參數,第一個是分片的序號,第二個是反覆運算器。返回值還是反覆運算器sample(withReplacement, fraction, seed)使用提供的亂數種子取樣,然後替換或不替換union(otherDataset)返回新的資料集,包括原資料集和參數資料集的所有元素intersection(otherDataset)返回新資料集,是兩個集的交集distinct([numTasks])返回新的集,包括原集中的不重複元素groupByKey([numTasks])當用於鍵值對RDD時返回(鍵,值反覆運算器)對的資料集aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])用於鍵值對RDD時返回(K,U)對集,對每一個Key的value進行聚集計算sortByKey([ascending], [numTasks])用於鍵值對RDD時會返回RDD按鍵的順序排序,升降冪由第一個參數決定join(otherDataset, [numTasks])用於鍵值對(K, V)和(K, W)RDD時返回(K, (V, W))對RDDcogroup(otherDataset, [numTasks])用於兩個鍵值對RDD時返回(K, (V反覆運算器, W反覆運算器))RDDcartesian(otherDataset)用於T和U類型RDD時返回(T, U)對類型鍵值對RDDpipe(command, [envVars])通過shell命令管道處理每個RDD分片coalesce(numPartitions)把RDD的分片數量降低到參數大小repartition(numPartitions)重新打亂RDD中元素順序並重新分片,數量由參數決定repartitionAndSortWithinPartitions(partitioner)按照參數給定的分片器重新分片,同時每個分片內部按照鍵排序

下面的表格列出了Spark支援的常用轉化操作。欲知細節,請查閱RDD API文檔(Scala, Java, Python)和鍵值對RDD函數文檔(Scala, Java)。

(譯者注:這部分翻譯比較簡略,僅供簡單參考,具體細節請看文檔)

轉化操作作用map(func)返回一個新的分佈資料集,由原資料集元素經func處理後的結果組成filter(func)返回一個新的資料集,由傳給func返回True的原資料集元素組成flatMap(func)與map類似,但是每個傳入元素可能有0或多個返回值,func可以返回一個序列而不是一個值mapParitions(func)類似map,但是RDD的每個分片都會分開獨立運行,所以func的參數和返回值必須都是反覆運算器mapParitionsWithIndex(func)類似mapParitions,但是func有兩個參數,第一個是分片的序號,第二個是反覆運算器。返回值還是反覆運算器sample(withReplacement, fraction, seed)使用提供的亂數種子取樣,然後替換或不替換union(otherDataset)返回新的資料集,包括原資料集和參數資料集的所有元素intersection(otherDataset)返回新資料集,是兩個集的交集distinct([numTasks])返回新的集,包括原集中的不重複元素groupByKey([numTasks])當用於鍵值對RDD時返回(鍵,值反覆運算器)對的資料集aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])用於鍵值對RDD時返回(K,U)對集,對每一個Key的value進行聚集計算sortByKey([ascending], [numTasks])用於鍵值對RDD時會返回RDD按鍵的順序排序,升降冪由第一個參數決定join(otherDataset, [numTasks])用於鍵值對(K, V)和(K, W)RDD時返回(K, (V, W))對RDDcogroup(otherDataset, [numTasks])用於兩個鍵值對RDD時返回(K, (V反覆運算器, W反覆運算器))RDDcartesian(otherDataset)用於T和U類型RDD時返回(T, U)對類型鍵值對RDDpipe(command, [envVars])通過shell命令管道處理每個RDD分片coalesce(numPartitions)把RDD的分片數量降低到參數大小repartition(numPartitions)重新打亂RDD中元素順序並重新分片,數量由參數決定repartitionAndSortWithinPartitions(partitioner)按照參數給定的分片器重新分片,同時每個分片內部按照鍵排序

啟動操作

下面的表格列出了Spark支援的部分常用啟動操作。欲知細節,請查閱RDD API文檔(Scala, Java, Python)和鍵值對RDD函數文檔(Scala, Java)。

(譯者注:這部分翻譯比較簡略,僅供簡單參考,具體細節請看文檔)

啟動操作作用reduce(func)使用func進行聚集計算,func的參數是兩個,返回值一個,兩次func運行應當是完全解耦的,這樣才能正確地並行運算collect()向驅動程式返回資料集的元素組成的陣列count()返回資料集元素的數量first()返回資料集的第一個元素take(n)返回前n個元素組成的陣列takeSample(withReplacement, num, [seed])返回一個由原資料集中任意num個元素的suzuki,並且替換之takeOrder(n, [ordering])返回排序後的前n個元素saveAsTextFile(path)將資料集的元素寫成文字檔saveAsSequenceFile(path)將資料集的元素寫成序列檔,這個API只能用於Java和Scala程式saveAsObjectFile(path)將資料集的元素使用Java的序列化特性寫到檔中,這個API只能用於Java和Scala程式countByCount()只能用於鍵值對RDD,返回一個(K, int) hashmap,返回每個key的出現次數foreach(func)對資料集的每個元素執行func, 通常用於完成一些帶有副作用的函數,比如更新累加器(見下文)或與外部存儲交互等

RDD持久化

Spark的一個重要功能就是在將資料集 持久化 (或 緩存 )到記憶體中以便在多個操作中重複使用。當我們持久化一個RDD是,每一個節點將這個RDD的每一個分片計算並保存到記憶體中以便在下次對這個資料集(或者這個資料集衍生的資料集)的計算中可以複用。這使得接下來的計算過程速度能夠加快(經常能加快超過十倍的速度)。緩存是加快反覆運算演算法和快速交互過程速度的關鍵工具。

刪除資料

Spark會自動監視每個節點的緩存使用同時使用LRU演算法丟棄舊資料分片。如果你想手動刪除某個RDD而不是等待它被自動刪除,調用 RDD.unpersist()方法。

共用變數

通常情況下,當一個函數傳遞給一個在遠端集群節點上運行的Spark操作(比如map和reduce)時,Spark會對涉及到的變數的所有副本執行這個函數。這些變數會被複製到每個機器上,而且這個過程不會被回饋給驅動程式。通常情況下,在任務之間讀寫共用變數是很低效的。但是,Spark仍然提供了有限的兩種共用變數類型用於常見的使用場景:廣播變數和累加器。

廣播變數

廣播變數允許程式師在每台機器上保持一個唯讀變數的緩存而不是將一個變數的拷貝傳遞給各個任務。它們可以被使用,比如,給每一個節點傳遞一份大輸入資料集的拷貝是很低效的。Spark試圖使用高效的廣播演算法來分佈廣播變數,以此來降低通信花銷。

其他

集群部署

這個 應用提交指南 描述了一個應用被提交到集群上的過程。簡而言之,只要你把你的應用打成了JAR包(Java/Scala應用)或.py檔的集合或.zip壓縮包(Python應用),bin/spark-submit腳本會將應用提交到任意支援的集群管理器上。

感謝AsuraDong大大的分享!