華文網

Spark技術在京東智慧供應鏈預測的應用

作者|楊冬越 郭景瞻

編輯|小智

本文將介紹京東智慧供應鏈及預測技術在其中的作用、預測系統的業務和技術架構,

以及Spark在核心系統中的使用,最後結合作者所著書《圖解Spark:核心技術與案例實戰》部分章節介紹在預測系統中的應用與優化。

背景介紹

前段時間京東公開了面向第二個十二年的戰略規劃,表示京東將全面走向技術化,大力發展人工智慧和機器人自動化技術,將過去傳統方式構築的優勢全面升級。京東Y事業部順勢成立,

該事業部將以服務泛零售為核心,著重智慧供應能力的打造,核心使命是利用人工智慧技術來驅動零售革新。

京東的供應鏈

目前京東在全國範圍內的運營256個大型倉庫,按功能可劃分為RDC、FDC、大件中心倉、大件衛星倉、圖書倉和城市倉等等。RDC(Regional Distribution Center)即區域分發中心,可理解為一級倉庫,向供應商採購的商品會優先送往這裡,一般設置在中心城市,覆蓋範圍大。FDC(Forward Distribution Center)即區域運轉中心,

可理解為二級倉庫,覆蓋一些中、小型城市及邊遠地區,通常會根據需求將商品從RDC調配過來。

結合人工智慧、大資料等技術,京東首先從供應商那裡合理採購定量的商品到RDC,再根據實際需求調配到FDC,然後運往離客戶最近的配送站,最後快遞員將商品帶到客戶手中。

這只是京東供應鏈體系中一個普通的場景,但正因為有這樣的體系,使得京東對使用者的回應速度大大提高,使用者體驗大大提升。

京東供應鏈優化

用戶體驗提升的同時也伴隨著大量資金的投入和成本的提高,成本必須得到控制,整個體系才能發揮出最大的價值,於是對供應鏈的優化就顯得至關重要了。

優化其實是一門運籌學問題,需考慮在各種決策目標之間如何平衡以達到最大收益,

在這個過程中需要考慮很多問題,把這些考慮清楚,問題就容易解決了。舉幾個簡單的例子:

商品補貨:考慮在什麼時間,給哪個RDC採購什麼商品,採購量是多少?

商品調撥:考慮在什麼時間,給哪個FDC調配什麼商品,調配量是多少?

倉儲運營:在大促來臨之際,倉庫和配送站要增配多少人手、多少輛貨車?

雖然看上去這些問題都很容易回答,

但仔細想想卻又很難給出答案,原因就在於想要做到精確不是那麼容易的事情,就拿補貨來說,補的太多會增加庫存成本,補的太少會增加缺貨成本,只有合理的補貨量才能做到成本最低。

預測技術在京東供應鏈的作用

借助機器學習、大資料等相關技術,京東在很多供應鏈優化問題上都已經實現系統化,由系統自動給出優化建議,並與生產系統相連接,實現全流程自動化。在這裡有一項技術起著至關重要的低層支撐作用——預測技術。據粗略估算,1%的預測準確度的提升可以節約數倍的運營成本。

怎樣理解預測在供應鏈優化中的作用呢?拿商品補貨舉例,一家公司為了保證庫房不缺貨,可能會頻繁的從供應商那裡補充大量商品,這樣做雖然不會缺貨,但可能會造成更多賣不出去的商品積壓在倉庫中,從而使商品的周轉率降低,庫存成本增加。反之,這家公司有可能為了追求零庫存而補很少的商品,但這就可能出現嚴重的缺貨問題,從而使現貨率降低,嚴重影響用戶體驗,缺貨成本增加。於是問題就來了,要補多少商品才合適,什麼時間補貨,這就需要權衡考慮了,最終目的是要使庫存成本和缺貨成本達到一個平衡。

考慮一下極端情況,等庫存降到零時再去補貨,這時供應商接到補貨通知後將貨物運往倉庫。但是這麼做有個問題,因為運送過程需要時間,這段時間庫房就缺貨了。那怎麼辦呢?就是利用預測技術。利用預測我們可以計算出未來商品在途的這段時間裡銷量大概是多少,然後我們讓倉庫保證這個量,低於這個量就給供應商下達補貨通知,於是問題得以解決。總而言之,預測技術在這裡發揮了重要的作用,成為關鍵的一個環。

京東預測系統

預測系統介紹

預測系統在整個供應鏈體系中處在最底層並且起到一個支撐的作用,支持上層的多個決策優化系統,而這些決策優化系統利用精准的預測資料結合運籌學技術得出最優的決策,並將結果提供給更上層的業務執行系統或是業務方直接使用。

目前,預測系統主要支援三大業務:銷量預測、單量預測和GMV預測。其中銷量預測主要支持商品補貨、商品調撥;單量預測主要支援倉庫、網站的運營管理;GMV預測主要支持銷售部門計畫的定制。

銷量預測按照不同維度又可以分為RDC採購預測、FDC調撥預測、城市倉調撥預測、大建倉補貨預測、全球購銷量預測和圖書促銷預測等;單量預測又可分為庫房單量預測、配送中心單量預測和配送站單量預測等(在這裡“單量”並非指使用者所下訂單的量,而是將訂單拆單後流轉到倉庫中的單量。例如一個使用者的訂單中包括3件物品,其中兩個大件品和一個小件品,在京東的供應鏈環節中可能會將其中兩個大件品組成一個單投放到大件倉中,而將那個小件單獨一個單投放到小件倉中,單量指的是拆單後的量);GMV預測支援到商品細微性。

預測系統架構

整體架構從上至下依次是:資料來源輸入層、基礎資料加工層、核心業務層、資料輸出層和下游系統。首先從外部資料來源獲取我們所需的業務資料,然後對基礎資料進行加工清洗,再通過時間序列、機器學習等人工智慧技術對資料進行處理分析,最後計算出預測結果並通過多種途徑推送給下游系統使用。

資料來源輸入層:京東資料倉庫中存儲著我們需要的大部分業務資料,例如訂單資訊、商品資訊、庫存資訊等等。而對於促銷計畫資料則大部分來自于采銷人員通過Web系統錄入的資訊。除此之外還有一小部分資料通過文本形式直接上傳到HDFS中。

基礎資料加工層:在這一層主要通過Hive對基礎資料進行一些加工清洗,去掉不需要的欄位,過濾不需要的維度並清洗有問題的資料。

核心業務層:這層是系統的的核心部分,橫向看又可分為三層:特徵構建、預測演算法和預測結果加工。縱向看是由多條業務線組成,彼此之間不發生任何交集。

特徵構建:將之前清洗過的基礎資料通過近一步的處理轉化成標準格式的特徵資料,提供給後續演算法模型使用。

核心演算法:利用時間序列分析、機器學習等人工智慧技術進行銷量、單量的預測,是預測系統中最為核心的部分。

預測結果加工:預測結果可能在格式和一些特殊性要求上不能滿足下游系統,所以還需要根據實際情況對其進行加工處理,比如增加標準差、促銷標識等額外資訊。

預測結果輸出層:將最終預測結果同步回京東資料倉庫、MySql、HBase或製作成JSF接口供其他系統遠端調用。

下游系統:包括下游任務流程、下游Web系統和其他系統。

預測系統核心介紹

預測系統核心層技術選型

預測系統核心層技術主要分為四層:基礎層、框架層、工具層和演算法層。

基礎層:

HDFS用來做資料存儲,Yarn用來做資源調度,BDP(Big Data Platform)是京東自己研發的大資料平臺,我們主要用它來做任務調度。

框架層:

以Spark RDD、Spark SQL、Hive為主, MapReduce程式占一小部分,是原先遺留下來的,目前正逐步替換成Spark RDD。 選擇Spark除了對性能的考慮外,還考慮了Spark程式開發的高效率、多語言特性以及對機器學習演算法的支援。在Spark開發語言上我們選擇了Python,原因有以下三點:

Python有很多不錯的機器學習演算法包可以使用,比起Spark的MLlib,演算法的準確度更高。我們用GBDT做過對比,發現xgboost比MLlib裡面提供的提升樹模型預測準確度高出大概5%~10%。雖然直接使用Spark自帶的機器學習框架會節省我們的開發成本,但預測準確度對於我們來說至關重要,每提升1%的準確度,就可能會帶來成本的成倍降低。

我們的團隊中包括開發工程師和演算法工程師,對於演算法工程師而言他們更擅長使用Python進行資料分析,使用Java或Scala會有不小的學習成本。

對比其他語言,我們發現使用Python的開發效率是最高的,並且對於一個新人,學習Python比學習其他語言更加容易。

工具層:

一方面我們會結合自身業務有針對性的開發一些演算法,另一方面我們會直接使用業界比較成熟的演算法和模型,這些演算法都封裝在協力廠商Python包中。我們比較常用的包有xgboost、numpy、pandas、sklearn、scipy和hyperopt等。

Xgboost:它是Gradient Boosting Machine的一個C++實現,xgboost最大的特點在於,它能夠自動利用CPU的多執行緒進行並行,同時在演算法上加以改進提高了精度。

numpy:是Python的一種開源的數值計算擴展。這種工具可用來存儲和處理大型矩陣,比Python自身的嵌套清單結構要高效的多(該結構也可以用來表示矩陣)。

pandas:是基於NumPy 的一種工具,該工具是為了解決資料分析任務而創建的。Pandas 納入了大量庫和一些標準的資料模型,提供了高效地操作大型資料集所需的工具。

sklearn:是Python重要的機器學習庫,支持包括分類、回歸、降維和聚類四大機器學習演算法。還包含了特徵提取、資料處理和模型評估三大模組。

scipy:是在NumPy庫的基礎上增加了眾多的數學、科學以及工程計算中常用的庫函數。例如線性代數、常微分方程數值求解、信號處理、影像處理和疏鬆陣列等等。

演算法層:

我們用到的演算法模型非常多,原因是京東的商品品類齊全、業務複雜,需要根據不同的情況採用不同的演算法模型。我們有一個獨立的系統來為演算法模型與商品之間建立匹配關係,有些比較複雜的預測業務還需要使用多個模型。我們使用的演算法總體上可以分為三類:時間序列、機器學習和結合業務開發的一些獨有的演算法。

1. 機器學習演算法主要包括GBDT、LASSO和RNN :

GBDT:是一種反覆運算的決策樹演算法,該演算法由多棵決策樹組成,所有樹的結論累加起來做最終答案。我們用它來預測高銷量,但歷史規律不明顯的商品。

RNN:這種網路的內部狀態可以展示動態時序行為。不同於前饋神經網路的是,RNN可以利用它內部的記憶來處理任意時序的輸入序列,這讓它可以更容易處理如時序預測、語音辨識等。

LASSO:該方法是一種壓縮估計。它通過構造一個罰函數得到一個較為精煉的模型,使得它壓縮一些係數,同時設定一些係數為零。因此保留了子集收縮的優點,是一種處理具有複共線性資料的有偏估計。用來預測低銷量,歷史資料平穩的商品效果較好。

2. 時間序列主要包括ARIMA和Holt winters :

ARIMA:全稱為自回歸積分滑動平均模型,于70年代初提出的一個著名時間序列預測方法,我們用它來主要預測類似庫房單量這種平穩的序列。

Holt winters:又稱三次指數平滑演算法,也是一個經典的時間序列演算法,我們用它來預測季節性和趨勢都很明顯的商品。

3. 結合業務開發的獨有演算法包括WMAStockDT、SimilarityModel和NewProduct等:

WMAStockDT:庫存決策樹模型,用來預測受庫存狀態影響較大的商品。

SimilarityModel:相似品模型,使用指定的同類品資料來預測某商品未來銷量。

NewProduct:新品模型,顧名思義就是用來預測新品的銷量。

預測系統核心流程

預測核心流程主要包括兩類:以機器學習演算法為主的流程和以時間序列分析為主的流程。

1. 以機器學習演算法為主的流程如下:

特徵構建:通過資料分析、模型試驗確定主要特徵,通過一系列任務生成標準格式的特徵資料。

模型選擇:不同的商品有不同的特性,所以首先會根據商品的銷量高低、新品舊品、假節日敏感性等因素分配不同的演算法模型。

特徵選擇:對一批特徵進行篩選過濾不需要的特徵,不同類型的商品特徵不同。

樣本分區:對訓練資料進行分組,分成多組樣本,真正訓練時針對每組樣本生成一個模型檔。一般是同類型商品被分成一組,比如按品類維度分組,這樣做是考慮並行化以及模型的準確性。

模型參數:選擇最優的模型參數,合適的參數將提高模型的準確度,因為需要對不同的參數組合分別進行模型訓練和預測,所以這一步是非常耗費資源。

模型訓練:待特徵、模型、樣本都確定好後就可以進行模型訓練,訓練往往會耗費很長時間,訓練後會生成模型檔,存儲在HDFS中。

模型預測:讀取模型檔進行預測執行。

多模型擇優:為了提高預測準確度,我們可能會使用多個演算法模型,當每個模型的預測結果輸出後系統會通過一些規則來選擇一個最優的預測結果。

預測值異常攔截:我們發現越是複雜且不易解釋的演算法越容易出現極個別預測值異常偏高的情況,這種預測偏高無法結合歷史資料進行解釋,因此我們會通過一些規則將這些異常值攔截下來,並且用一個更加保守的數值代替。

模型評價:計算預測準確度,我們通常用使用mapd來作為評價指標。

誤差分析:通過分析預測準確度得出一個誤差在不同維度上的分佈,以便給演算法優化提供參考依據。

2. 以時間序列分析為主的預測流程如下:

生成歷史時序:將歷史銷量、價格、庫存等資料按照規定格式生成時序資料。

節假日因數:計算節假日與銷量之間的關係,用來平滑節假日對銷量影響。

周日因數:計算週一到周日這7天與銷量的關係,用來平滑周日對銷量的影響。

促銷因數:計算促銷與銷量之間的關係,用來平滑促銷對銷量的影響。

因數平滑:歷史銷量是不穩定的,會受到節假日、促銷等影響,在這種情況下進行預測有很大難度,所以需要利用之前計算的各類因數對歷史資料進行平滑處理。

時序預測:在一個相對平穩的銷量資料上通過演算法進行預測。

因數疊加:結合未來節假日、促銷計畫等因素對預測結果進行調整。

Spark在預測核心層的應用

我們使用Spark SQL和Spark RDD相結合的方式來編寫程式,對於一般的資料處理,我們使用Spark的方式與其他無異,但是對於模型訓練、預測這些需要調用演算法介面的邏輯就需要考慮一下並行化的問題了。我們平均一個訓練任務在一天處理的資料量大約在500G左右,雖然資料規模不是特別的龐大,但是Python演算法包提供的演算法都是單進程執行。我們計算過,如果使用一台機器訓練全部品類資料需要一個星期的時間,這是無法接收的,所以我們需要借助Spark這種分散式平行計算框架來將計算分攤到多個節點上實現並行化處理。

我們實現的方法很簡單,首先需要在集群的每個節點上安裝所需的全部Python包,然後在編寫Spark程式時考慮通過某種規則將資料分區,比如按品類維度,通過groupByKey操作將資料重新分區,每一個分區是一個樣本集合並進行獨立的訓練,以此達到並行化。流程如下圖所示:

虛擬碼如下:

repartitionBy方法即設置一個重分區的邏輯返回(K,V)結構RDD,train方法是訓練資料,在train方法裡面會調用Python演算法包介面。saveAsPickleFile是Spark Python獨有的一個Action操作,支持將RDD保存成序列化後的sequnceFile格式的檔,在序列化過程中會以10個一批的方式進行處理,保存模型檔非常適合。

雖然原理簡單,但存在著一個難點,即以什麼樣的規則進行分區,key應該如何設置。為了解決這個問題我們需要考慮幾個方面,第一就是哪些資料應該被聚合到一起進行訓練,第二就是如何避免資料傾斜。

針對第一個問題我們做了如下幾點考慮:

被分在一個分區的資料要有一定的相似性,這樣訓練的效果才會更好,比如按品類分區就是個典型例子。

分析商品的特性,根據特性的不同選擇不同的模型,例如高銷商品和低銷商品的預測模型是不一樣的,即使是同一模型使用的特徵也可能不同,比如對促銷敏感的商品就需要更多與促銷相關特徵,相同模型相同特徵的商品應傾向於分在一個分區中。

針對第二個問題我們採用了如下的方式解決:

對於資料量過大的分區進行隨機抽樣選取。

對於資料量過大的分區還可以做二次拆分,比如圖書小說這個品類資料量明顯大於其他品類,於是就可以分析小說品類下的子品類資料量分佈情況,並將子品類合併成新的幾個分區。

對於資料量過小這種情況則需要考慮進行幾個分區資料的合併處理。

總之對於後兩種處理方式可以單獨通過一個Spark任務定期運行,並將這種分區規則保存。

結合圖解Spark進行應用、優化

注:《圖解Spark:核心技術與案例實戰》為本文作者所著。

《圖解Spark:核心技術與案例實戰》一書以Spark2.0版本為基礎進行編寫,系統介紹了Spark核心及其生態圈元件技術。其內容包括Spark生態圈、實戰環境搭建和程式設計模型等,重點介紹了作業調度、容錯執行、監控管理、存儲管理以及運行架構,同時還介紹了Spark生態圈相關元件,包括了Spark SQL的即席查詢、Spark Streaming的即時流處理、MLlib的機器學習、GraphX的圖處理和Alluxio的分散式記憶體檔案系統等。下面介紹京東預測系統如何進行資源調度,並描述如何使用Spark存儲相關知識進行系統優化。

結合系統中的應用

在圖解Spark書的第六章描述了Spark運行架構,介紹了Spark集群資源調度一般分為粗細微性調度和細細微性調度兩種模式。粗細微性包括了獨立運行模式和Mesos粗細微性運行模式,在這種情況下以整個機器作為分配單元執行作業,該模式優點是由於資源長期持有減少了資源調度的時間開銷,缺點是該模式中無法感知資源使用的變化,易造成系統資源的閒置,從而造成了資源浪費。

而細細微性包括了Yarn運行模式和Mesos細細微性運行模式,該模式的優點是系統資源能夠得到充分利用,缺點是該模式中每個任務都需要從管理器獲取資源,調度延遲較大、開銷較大。

由於京東Spark集群屬於基礎平臺,在公司內部共用這些資源,所以集群採用的是Yarn運行模式,在這種模式下可以根據不同系統所需要的資源進行靈活的管理。在YARN-Cluster模式中,當使用者向YARN集群中提交一個應用程式後,YARN集群將分兩個階段運行該應用程式:

第一個階段是把Spark的SparkContext作為Application Master在YARN集群中先啟動;第二個階段是由Application Master創建應用程式,然後為它向Resource Manager申請資源,並啟動Executor來運行任務集,同時監控它的整個運行過程,直到運行完成。下圖為Yarn-Cluster運行模式執行過程:

結合系統的優化

我們都知道大資料處理的瓶頸在IO。我們借助Spark可以把反覆運算過程中的資料放在記憶體中,相比MapReduce寫到磁片速度提高近兩個數量級;另外對於資料處理過程盡可能避免Shuffle,如果不能避免則Shuffle前盡可能過濾資料,減少Shuffle資料量;最後,就是使用高效的序列化和壓縮演算法。在京東預測系統主要就是圍繞這些環節展開優化,相關Spark存儲原理知識可以參見圖解Spark書第五章的詳細描述。

由於資源限制,分配給預測系統的Spark集群規模並不是很大,在有限的資源下運行Spark應用程式確實是一個考驗,因為在這種情況下經常會出現諸如程式計算時間太長、找不到Executor等錯誤。我們通過調整參數、修改設計和修改程式邏輯三個方面進行優化:

參數調整

減少num-executors,調大executor-memory,這樣的目的是希望Executor有足夠的記憶體可以使用。

查看日誌發現沒有足夠的空間存儲廣播變數,分析是由於Cache到記憶體裡的資料太多耗盡了記憶體,於是我們將Cache的級別適當調成MEMORY_ONLY_SER和DISK_ONLY。

針對某些任務關閉了推測機制,因為有些任務會出現暫時無法解決的資料傾斜問題,並非節點出現問題。

調整記憶體分配,對於一個Shuffle很多的任務,我們就把Cache的記憶體分配比例調低,同時調高Shuffle的記憶體比例。

修改設計

參數的調整雖然容易做,但往往效果不好,這時候需要考慮從設計的角度去優化:

原先在訓練資料之前會先讀取歷史的幾個月甚至幾年的資料,對這些資料進行合併、轉換等一系列複雜的處理,最終生成特徵資料。由於資料量龐大,任務有時會報錯。經過調整後當天只處理當天資料,並將結果保存到當日分區下,訓練時按天數需要讀取多個分區的資料做union操作即可。

將“模型訓練”從每天執行調整到每週執行,將“模型參數選取”從每週執行調整到每月執行。因為這兩個任務都十分消耗資源,並且屬於不需要頻繁運行,這麼做雖然準確度會略微降低,但都在可接受範圍內。

通過拆分任務也可以很好的解決資源不夠用的問題。可以橫向拆分,比如原先是將100個品類資料放在一個任務中進行訓練,調整後改成每10個品類提交一次Spark作業進行訓練。這樣雖然整體執行時間變長,但是避免了程式異常退出,保證任務可以執行成功。除了橫向還可以縱向拆分,即將一個包含10個Stage的Spark任務拆分成兩個任務,每個任務包含5個Stage,中間資料保存到HDFS中。

修改程式邏輯

為了進一步提高程式的運行效率,通過修改程式的邏輯來提高性能,主要是在如下方面進行了改進:避免過多的Shuffle、減少Shuffle時需要傳輸的資料和處理資料傾斜問題等。

1. 避免過多的Shuffle

Spark提供了豐富的轉換操作,可以使我們完成各類複雜的資料處理工作,但是也正因為如此我們在寫Spark程式的時候可能會遇到一個陷阱,那就是為了使代碼變的簡潔過分依賴RDD的轉換操作,使本來僅需一次Shuffle的過程變為了執行多次。我們就曾經犯過這樣一個錯誤,本來可以通過一次groupByKey完成的操作卻使用了兩回。

業務邏輯是這樣的:我們有三張表分別是銷量(s)、價格(p)、庫存(v),每張表有3個欄位:商品id(sku_id)、品類id(category)和歷史時序資料(data),現在需要按sku_id將s、p、v資料合併,然後再按category再合併一次,最終的資料格式是:[category,[[sku_id, s , p, v], [sku_id, s , p, v], […],[…]]]。一開始我們先按照sku_id + category作為key進行一次groupByKey,將資料格式轉換成[sku_id, category , [s,p, v]],然後按category作為key再groupByKey一次。

後來我們修改為按照category作為key只進行一次groupByKey,因為一個sku_id只會屬於一個category,所以後續的map轉換裡面只需要寫一些代碼將相同sku_id的s、p、v資料group到一起就可以了。兩次groupByKey的情況:

修改後變為一次groupByKey的情況:

多表join時,如果key值相同,則可以使用union+groupByKey+flatMapValues形式進行。比如:需要將銷量、庫存、價格、促銷計畫和商品資訊通過商品編碼連接到一起,一開始使用的是join轉換操作,將幾個RDD彼此join在一起。後來發現這樣做運行速度非常慢,於是換成union+groypByKey+flatMapValue形式,這樣做只需進行一次Shuffle,這樣修改後運行速度比以前快多了。實例代碼如下:

如果兩個RDD需要在groupByKey後進行join操作,可以使用cogroup轉換操作代替。比如, 將歷史銷量資料按品類進行合併,然後再與模型檔進行join操作,流程如下:

使用cogroup後,經過一次Shuffle就可完成了兩步操作,性能大幅提升。

2. 減少Shuffle時傳輸的資料量

在Shuffle操作前儘量將不需要的資料過濾掉。

使用comebineyeByKey可以高效率的實現任何複雜的聚合邏輯。

comebineyeByKey屬於聚合類操作,由於它支持map端的聚合所以比groupByKey性能好,又由於它的map端與reduce端可以設置成不一樣的邏輯,所以它支援的場景比reduceByKey多,它的定義如下:

educeByKey和groupByKey內部實際是調用了comebineyeByKey,

我們之前有很多複雜的無法用reduceByKey來實現的聚合邏輯都通過groupByKey來完成的,後來全部替換為comebineyeByKey後性能提升了不少。

3.處理資料傾斜

有些時候經過一系列轉換操作之後資料變得十分傾斜,在這樣情況下後續的RDD計算效率會非常的糟糕,嚴重時程式報錯。遇到這種情況通常會使用repartition這個轉換操作對RDD進行重新分區,重新分區後資料會均勻分佈在不同的分區中,避免了資料傾斜。如果是減少分區使用coalesce也可以達到效果,但比起repartition不足的是分配不是那麼均勻。

寫在最後

雖然京東的預測系統已經穩定運行了很長一段時間,但是我們也看到系統本身還存在著很多待改進的地方,接下來我們會在預測準確度的提高、系統性能的優化、多業務支持的便捷性上進行改進。未來,隨著大資料、人工智慧技術在京東供應鏈管理中的使用越來越多,預測系統也將發揮出更大作用,對於京東預測系統的研發工作也將是充滿著挑戰與樂趣。

隱藏在文末的福利

當傳統行業遇到創新技術,會碰撞出怎樣的火花?當創新技術進入傳統行業,將產生出怎樣的價值?技術又如何為企業帶來一萬倍的業務增長?

G+全球技術價值峰會,4月19日,北京。聯合京東終身名譽技術顧問李大學、平安科技COO兼CTO 胡瑋、螞蟻金服全球技術合作與發展部總經理歸朴等行業領軍人物揭秘傳統企業互聯網轉型的痛點與經驗。

戳 「 閱讀原文 」,瞭解更多!

今日薦號

大資料雜談

我們專注大資料和機器學習, 每天發佈高品質文章, 技術案例等原創乾貨源源不斷。 更有社群微課堂, 也希望你能從這裡分享前沿技術,交流深度思考。

今日薦文

點擊下方圖片即可閱讀

程式師,這是你想要的技術leader嗎?

在這裡有一項技術起著至關重要的低層支撐作用——預測技術。據粗略估算,1%的預測準確度的提升可以節約數倍的運營成本。

怎樣理解預測在供應鏈優化中的作用呢?拿商品補貨舉例,一家公司為了保證庫房不缺貨,可能會頻繁的從供應商那裡補充大量商品,這樣做雖然不會缺貨,但可能會造成更多賣不出去的商品積壓在倉庫中,從而使商品的周轉率降低,庫存成本增加。反之,這家公司有可能為了追求零庫存而補很少的商品,但這就可能出現嚴重的缺貨問題,從而使現貨率降低,嚴重影響用戶體驗,缺貨成本增加。於是問題就來了,要補多少商品才合適,什麼時間補貨,這就需要權衡考慮了,最終目的是要使庫存成本和缺貨成本達到一個平衡。

考慮一下極端情況,等庫存降到零時再去補貨,這時供應商接到補貨通知後將貨物運往倉庫。但是這麼做有個問題,因為運送過程需要時間,這段時間庫房就缺貨了。那怎麼辦呢?就是利用預測技術。利用預測我們可以計算出未來商品在途的這段時間裡銷量大概是多少,然後我們讓倉庫保證這個量,低於這個量就給供應商下達補貨通知,於是問題得以解決。總而言之,預測技術在這裡發揮了重要的作用,成為關鍵的一個環。

京東預測系統

預測系統介紹

預測系統在整個供應鏈體系中處在最底層並且起到一個支撐的作用,支持上層的多個決策優化系統,而這些決策優化系統利用精准的預測資料結合運籌學技術得出最優的決策,並將結果提供給更上層的業務執行系統或是業務方直接使用。

目前,預測系統主要支援三大業務:銷量預測、單量預測和GMV預測。其中銷量預測主要支持商品補貨、商品調撥;單量預測主要支援倉庫、網站的運營管理;GMV預測主要支持銷售部門計畫的定制。

銷量預測按照不同維度又可以分為RDC採購預測、FDC調撥預測、城市倉調撥預測、大建倉補貨預測、全球購銷量預測和圖書促銷預測等;單量預測又可分為庫房單量預測、配送中心單量預測和配送站單量預測等(在這裡“單量”並非指使用者所下訂單的量,而是將訂單拆單後流轉到倉庫中的單量。例如一個使用者的訂單中包括3件物品,其中兩個大件品和一個小件品,在京東的供應鏈環節中可能會將其中兩個大件品組成一個單投放到大件倉中,而將那個小件單獨一個單投放到小件倉中,單量指的是拆單後的量);GMV預測支援到商品細微性。

預測系統架構

整體架構從上至下依次是:資料來源輸入層、基礎資料加工層、核心業務層、資料輸出層和下游系統。首先從外部資料來源獲取我們所需的業務資料,然後對基礎資料進行加工清洗,再通過時間序列、機器學習等人工智慧技術對資料進行處理分析,最後計算出預測結果並通過多種途徑推送給下游系統使用。

資料來源輸入層:京東資料倉庫中存儲著我們需要的大部分業務資料,例如訂單資訊、商品資訊、庫存資訊等等。而對於促銷計畫資料則大部分來自于采銷人員通過Web系統錄入的資訊。除此之外還有一小部分資料通過文本形式直接上傳到HDFS中。

基礎資料加工層:在這一層主要通過Hive對基礎資料進行一些加工清洗,去掉不需要的欄位,過濾不需要的維度並清洗有問題的資料。

核心業務層:這層是系統的的核心部分,橫向看又可分為三層:特徵構建、預測演算法和預測結果加工。縱向看是由多條業務線組成,彼此之間不發生任何交集。

特徵構建:將之前清洗過的基礎資料通過近一步的處理轉化成標準格式的特徵資料,提供給後續演算法模型使用。

核心演算法:利用時間序列分析、機器學習等人工智慧技術進行銷量、單量的預測,是預測系統中最為核心的部分。

預測結果加工:預測結果可能在格式和一些特殊性要求上不能滿足下游系統,所以還需要根據實際情況對其進行加工處理,比如增加標準差、促銷標識等額外資訊。

預測結果輸出層:將最終預測結果同步回京東資料倉庫、MySql、HBase或製作成JSF接口供其他系統遠端調用。

下游系統:包括下游任務流程、下游Web系統和其他系統。

預測系統核心介紹

預測系統核心層技術選型

預測系統核心層技術主要分為四層:基礎層、框架層、工具層和演算法層。

基礎層:

HDFS用來做資料存儲,Yarn用來做資源調度,BDP(Big Data Platform)是京東自己研發的大資料平臺,我們主要用它來做任務調度。

框架層:

以Spark RDD、Spark SQL、Hive為主, MapReduce程式占一小部分,是原先遺留下來的,目前正逐步替換成Spark RDD。 選擇Spark除了對性能的考慮外,還考慮了Spark程式開發的高效率、多語言特性以及對機器學習演算法的支援。在Spark開發語言上我們選擇了Python,原因有以下三點:

Python有很多不錯的機器學習演算法包可以使用,比起Spark的MLlib,演算法的準確度更高。我們用GBDT做過對比,發現xgboost比MLlib裡面提供的提升樹模型預測準確度高出大概5%~10%。雖然直接使用Spark自帶的機器學習框架會節省我們的開發成本,但預測準確度對於我們來說至關重要,每提升1%的準確度,就可能會帶來成本的成倍降低。

我們的團隊中包括開發工程師和演算法工程師,對於演算法工程師而言他們更擅長使用Python進行資料分析,使用Java或Scala會有不小的學習成本。

對比其他語言,我們發現使用Python的開發效率是最高的,並且對於一個新人,學習Python比學習其他語言更加容易。

工具層:

一方面我們會結合自身業務有針對性的開發一些演算法,另一方面我們會直接使用業界比較成熟的演算法和模型,這些演算法都封裝在協力廠商Python包中。我們比較常用的包有xgboost、numpy、pandas、sklearn、scipy和hyperopt等。

Xgboost:它是Gradient Boosting Machine的一個C++實現,xgboost最大的特點在於,它能夠自動利用CPU的多執行緒進行並行,同時在演算法上加以改進提高了精度。

numpy:是Python的一種開源的數值計算擴展。這種工具可用來存儲和處理大型矩陣,比Python自身的嵌套清單結構要高效的多(該結構也可以用來表示矩陣)。

pandas:是基於NumPy 的一種工具,該工具是為了解決資料分析任務而創建的。Pandas 納入了大量庫和一些標準的資料模型,提供了高效地操作大型資料集所需的工具。

sklearn:是Python重要的機器學習庫,支持包括分類、回歸、降維和聚類四大機器學習演算法。還包含了特徵提取、資料處理和模型評估三大模組。

scipy:是在NumPy庫的基礎上增加了眾多的數學、科學以及工程計算中常用的庫函數。例如線性代數、常微分方程數值求解、信號處理、影像處理和疏鬆陣列等等。

演算法層:

我們用到的演算法模型非常多,原因是京東的商品品類齊全、業務複雜,需要根據不同的情況採用不同的演算法模型。我們有一個獨立的系統來為演算法模型與商品之間建立匹配關係,有些比較複雜的預測業務還需要使用多個模型。我們使用的演算法總體上可以分為三類:時間序列、機器學習和結合業務開發的一些獨有的演算法。

1. 機器學習演算法主要包括GBDT、LASSO和RNN :

GBDT:是一種反覆運算的決策樹演算法,該演算法由多棵決策樹組成,所有樹的結論累加起來做最終答案。我們用它來預測高銷量,但歷史規律不明顯的商品。

RNN:這種網路的內部狀態可以展示動態時序行為。不同於前饋神經網路的是,RNN可以利用它內部的記憶來處理任意時序的輸入序列,這讓它可以更容易處理如時序預測、語音辨識等。

LASSO:該方法是一種壓縮估計。它通過構造一個罰函數得到一個較為精煉的模型,使得它壓縮一些係數,同時設定一些係數為零。因此保留了子集收縮的優點,是一種處理具有複共線性資料的有偏估計。用來預測低銷量,歷史資料平穩的商品效果較好。

2. 時間序列主要包括ARIMA和Holt winters :

ARIMA:全稱為自回歸積分滑動平均模型,于70年代初提出的一個著名時間序列預測方法,我們用它來主要預測類似庫房單量這種平穩的序列。

Holt winters:又稱三次指數平滑演算法,也是一個經典的時間序列演算法,我們用它來預測季節性和趨勢都很明顯的商品。

3. 結合業務開發的獨有演算法包括WMAStockDT、SimilarityModel和NewProduct等:

WMAStockDT:庫存決策樹模型,用來預測受庫存狀態影響較大的商品。

SimilarityModel:相似品模型,使用指定的同類品資料來預測某商品未來銷量。

NewProduct:新品模型,顧名思義就是用來預測新品的銷量。

預測系統核心流程

預測核心流程主要包括兩類:以機器學習演算法為主的流程和以時間序列分析為主的流程。

1. 以機器學習演算法為主的流程如下:

特徵構建:通過資料分析、模型試驗確定主要特徵,通過一系列任務生成標準格式的特徵資料。

模型選擇:不同的商品有不同的特性,所以首先會根據商品的銷量高低、新品舊品、假節日敏感性等因素分配不同的演算法模型。

特徵選擇:對一批特徵進行篩選過濾不需要的特徵,不同類型的商品特徵不同。

樣本分區:對訓練資料進行分組,分成多組樣本,真正訓練時針對每組樣本生成一個模型檔。一般是同類型商品被分成一組,比如按品類維度分組,這樣做是考慮並行化以及模型的準確性。

模型參數:選擇最優的模型參數,合適的參數將提高模型的準確度,因為需要對不同的參數組合分別進行模型訓練和預測,所以這一步是非常耗費資源。

模型訓練:待特徵、模型、樣本都確定好後就可以進行模型訓練,訓練往往會耗費很長時間,訓練後會生成模型檔,存儲在HDFS中。

模型預測:讀取模型檔進行預測執行。

多模型擇優:為了提高預測準確度,我們可能會使用多個演算法模型,當每個模型的預測結果輸出後系統會通過一些規則來選擇一個最優的預測結果。

預測值異常攔截:我們發現越是複雜且不易解釋的演算法越容易出現極個別預測值異常偏高的情況,這種預測偏高無法結合歷史資料進行解釋,因此我們會通過一些規則將這些異常值攔截下來,並且用一個更加保守的數值代替。

模型評價:計算預測準確度,我們通常用使用mapd來作為評價指標。

誤差分析:通過分析預測準確度得出一個誤差在不同維度上的分佈,以便給演算法優化提供參考依據。

2. 以時間序列分析為主的預測流程如下:

生成歷史時序:將歷史銷量、價格、庫存等資料按照規定格式生成時序資料。

節假日因數:計算節假日與銷量之間的關係,用來平滑節假日對銷量影響。

周日因數:計算週一到周日這7天與銷量的關係,用來平滑周日對銷量的影響。

促銷因數:計算促銷與銷量之間的關係,用來平滑促銷對銷量的影響。

因數平滑:歷史銷量是不穩定的,會受到節假日、促銷等影響,在這種情況下進行預測有很大難度,所以需要利用之前計算的各類因數對歷史資料進行平滑處理。

時序預測:在一個相對平穩的銷量資料上通過演算法進行預測。

因數疊加:結合未來節假日、促銷計畫等因素對預測結果進行調整。

Spark在預測核心層的應用

我們使用Spark SQL和Spark RDD相結合的方式來編寫程式,對於一般的資料處理,我們使用Spark的方式與其他無異,但是對於模型訓練、預測這些需要調用演算法介面的邏輯就需要考慮一下並行化的問題了。我們平均一個訓練任務在一天處理的資料量大約在500G左右,雖然資料規模不是特別的龐大,但是Python演算法包提供的演算法都是單進程執行。我們計算過,如果使用一台機器訓練全部品類資料需要一個星期的時間,這是無法接收的,所以我們需要借助Spark這種分散式平行計算框架來將計算分攤到多個節點上實現並行化處理。

我們實現的方法很簡單,首先需要在集群的每個節點上安裝所需的全部Python包,然後在編寫Spark程式時考慮通過某種規則將資料分區,比如按品類維度,通過groupByKey操作將資料重新分區,每一個分區是一個樣本集合並進行獨立的訓練,以此達到並行化。流程如下圖所示:

虛擬碼如下:

repartitionBy方法即設置一個重分區的邏輯返回(K,V)結構RDD,train方法是訓練資料,在train方法裡面會調用Python演算法包介面。saveAsPickleFile是Spark Python獨有的一個Action操作,支持將RDD保存成序列化後的sequnceFile格式的檔,在序列化過程中會以10個一批的方式進行處理,保存模型檔非常適合。

雖然原理簡單,但存在著一個難點,即以什麼樣的規則進行分區,key應該如何設置。為了解決這個問題我們需要考慮幾個方面,第一就是哪些資料應該被聚合到一起進行訓練,第二就是如何避免資料傾斜。

針對第一個問題我們做了如下幾點考慮:

被分在一個分區的資料要有一定的相似性,這樣訓練的效果才會更好,比如按品類分區就是個典型例子。

分析商品的特性,根據特性的不同選擇不同的模型,例如高銷商品和低銷商品的預測模型是不一樣的,即使是同一模型使用的特徵也可能不同,比如對促銷敏感的商品就需要更多與促銷相關特徵,相同模型相同特徵的商品應傾向於分在一個分區中。

針對第二個問題我們採用了如下的方式解決:

對於資料量過大的分區進行隨機抽樣選取。

對於資料量過大的分區還可以做二次拆分,比如圖書小說這個品類資料量明顯大於其他品類,於是就可以分析小說品類下的子品類資料量分佈情況,並將子品類合併成新的幾個分區。

對於資料量過小這種情況則需要考慮進行幾個分區資料的合併處理。

總之對於後兩種處理方式可以單獨通過一個Spark任務定期運行,並將這種分區規則保存。

結合圖解Spark進行應用、優化

注:《圖解Spark:核心技術與案例實戰》為本文作者所著。

《圖解Spark:核心技術與案例實戰》一書以Spark2.0版本為基礎進行編寫,系統介紹了Spark核心及其生態圈元件技術。其內容包括Spark生態圈、實戰環境搭建和程式設計模型等,重點介紹了作業調度、容錯執行、監控管理、存儲管理以及運行架構,同時還介紹了Spark生態圈相關元件,包括了Spark SQL的即席查詢、Spark Streaming的即時流處理、MLlib的機器學習、GraphX的圖處理和Alluxio的分散式記憶體檔案系統等。下面介紹京東預測系統如何進行資源調度,並描述如何使用Spark存儲相關知識進行系統優化。

結合系統中的應用

在圖解Spark書的第六章描述了Spark運行架構,介紹了Spark集群資源調度一般分為粗細微性調度和細細微性調度兩種模式。粗細微性包括了獨立運行模式和Mesos粗細微性運行模式,在這種情況下以整個機器作為分配單元執行作業,該模式優點是由於資源長期持有減少了資源調度的時間開銷,缺點是該模式中無法感知資源使用的變化,易造成系統資源的閒置,從而造成了資源浪費。

而細細微性包括了Yarn運行模式和Mesos細細微性運行模式,該模式的優點是系統資源能夠得到充分利用,缺點是該模式中每個任務都需要從管理器獲取資源,調度延遲較大、開銷較大。

由於京東Spark集群屬於基礎平臺,在公司內部共用這些資源,所以集群採用的是Yarn運行模式,在這種模式下可以根據不同系統所需要的資源進行靈活的管理。在YARN-Cluster模式中,當使用者向YARN集群中提交一個應用程式後,YARN集群將分兩個階段運行該應用程式:

第一個階段是把Spark的SparkContext作為Application Master在YARN集群中先啟動;第二個階段是由Application Master創建應用程式,然後為它向Resource Manager申請資源,並啟動Executor來運行任務集,同時監控它的整個運行過程,直到運行完成。下圖為Yarn-Cluster運行模式執行過程:

結合系統的優化

我們都知道大資料處理的瓶頸在IO。我們借助Spark可以把反覆運算過程中的資料放在記憶體中,相比MapReduce寫到磁片速度提高近兩個數量級;另外對於資料處理過程盡可能避免Shuffle,如果不能避免則Shuffle前盡可能過濾資料,減少Shuffle資料量;最後,就是使用高效的序列化和壓縮演算法。在京東預測系統主要就是圍繞這些環節展開優化,相關Spark存儲原理知識可以參見圖解Spark書第五章的詳細描述。

由於資源限制,分配給預測系統的Spark集群規模並不是很大,在有限的資源下運行Spark應用程式確實是一個考驗,因為在這種情況下經常會出現諸如程式計算時間太長、找不到Executor等錯誤。我們通過調整參數、修改設計和修改程式邏輯三個方面進行優化:

參數調整

減少num-executors,調大executor-memory,這樣的目的是希望Executor有足夠的記憶體可以使用。

查看日誌發現沒有足夠的空間存儲廣播變數,分析是由於Cache到記憶體裡的資料太多耗盡了記憶體,於是我們將Cache的級別適當調成MEMORY_ONLY_SER和DISK_ONLY。

針對某些任務關閉了推測機制,因為有些任務會出現暫時無法解決的資料傾斜問題,並非節點出現問題。

調整記憶體分配,對於一個Shuffle很多的任務,我們就把Cache的記憶體分配比例調低,同時調高Shuffle的記憶體比例。

修改設計

參數的調整雖然容易做,但往往效果不好,這時候需要考慮從設計的角度去優化:

原先在訓練資料之前會先讀取歷史的幾個月甚至幾年的資料,對這些資料進行合併、轉換等一系列複雜的處理,最終生成特徵資料。由於資料量龐大,任務有時會報錯。經過調整後當天只處理當天資料,並將結果保存到當日分區下,訓練時按天數需要讀取多個分區的資料做union操作即可。

將“模型訓練”從每天執行調整到每週執行,將“模型參數選取”從每週執行調整到每月執行。因為這兩個任務都十分消耗資源,並且屬於不需要頻繁運行,這麼做雖然準確度會略微降低,但都在可接受範圍內。

通過拆分任務也可以很好的解決資源不夠用的問題。可以橫向拆分,比如原先是將100個品類資料放在一個任務中進行訓練,調整後改成每10個品類提交一次Spark作業進行訓練。這樣雖然整體執行時間變長,但是避免了程式異常退出,保證任務可以執行成功。除了橫向還可以縱向拆分,即將一個包含10個Stage的Spark任務拆分成兩個任務,每個任務包含5個Stage,中間資料保存到HDFS中。

修改程式邏輯

為了進一步提高程式的運行效率,通過修改程式的邏輯來提高性能,主要是在如下方面進行了改進:避免過多的Shuffle、減少Shuffle時需要傳輸的資料和處理資料傾斜問題等。

1. 避免過多的Shuffle

Spark提供了豐富的轉換操作,可以使我們完成各類複雜的資料處理工作,但是也正因為如此我們在寫Spark程式的時候可能會遇到一個陷阱,那就是為了使代碼變的簡潔過分依賴RDD的轉換操作,使本來僅需一次Shuffle的過程變為了執行多次。我們就曾經犯過這樣一個錯誤,本來可以通過一次groupByKey完成的操作卻使用了兩回。

業務邏輯是這樣的:我們有三張表分別是銷量(s)、價格(p)、庫存(v),每張表有3個欄位:商品id(sku_id)、品類id(category)和歷史時序資料(data),現在需要按sku_id將s、p、v資料合併,然後再按category再合併一次,最終的資料格式是:[category,[[sku_id, s , p, v], [sku_id, s , p, v], […],[…]]]。一開始我們先按照sku_id + category作為key進行一次groupByKey,將資料格式轉換成[sku_id, category , [s,p, v]],然後按category作為key再groupByKey一次。

後來我們修改為按照category作為key只進行一次groupByKey,因為一個sku_id只會屬於一個category,所以後續的map轉換裡面只需要寫一些代碼將相同sku_id的s、p、v資料group到一起就可以了。兩次groupByKey的情況:

修改後變為一次groupByKey的情況:

多表join時,如果key值相同,則可以使用union+groupByKey+flatMapValues形式進行。比如:需要將銷量、庫存、價格、促銷計畫和商品資訊通過商品編碼連接到一起,一開始使用的是join轉換操作,將幾個RDD彼此join在一起。後來發現這樣做運行速度非常慢,於是換成union+groypByKey+flatMapValue形式,這樣做只需進行一次Shuffle,這樣修改後運行速度比以前快多了。實例代碼如下:

如果兩個RDD需要在groupByKey後進行join操作,可以使用cogroup轉換操作代替。比如, 將歷史銷量資料按品類進行合併,然後再與模型檔進行join操作,流程如下:

使用cogroup後,經過一次Shuffle就可完成了兩步操作,性能大幅提升。

2. 減少Shuffle時傳輸的資料量

在Shuffle操作前儘量將不需要的資料過濾掉。

使用comebineyeByKey可以高效率的實現任何複雜的聚合邏輯。

comebineyeByKey屬於聚合類操作,由於它支持map端的聚合所以比groupByKey性能好,又由於它的map端與reduce端可以設置成不一樣的邏輯,所以它支援的場景比reduceByKey多,它的定義如下:

educeByKey和groupByKey內部實際是調用了comebineyeByKey,

我們之前有很多複雜的無法用reduceByKey來實現的聚合邏輯都通過groupByKey來完成的,後來全部替換為comebineyeByKey後性能提升了不少。

3.處理資料傾斜

有些時候經過一系列轉換操作之後資料變得十分傾斜,在這樣情況下後續的RDD計算效率會非常的糟糕,嚴重時程式報錯。遇到這種情況通常會使用repartition這個轉換操作對RDD進行重新分區,重新分區後資料會均勻分佈在不同的分區中,避免了資料傾斜。如果是減少分區使用coalesce也可以達到效果,但比起repartition不足的是分配不是那麼均勻。

寫在最後

雖然京東的預測系統已經穩定運行了很長一段時間,但是我們也看到系統本身還存在著很多待改進的地方,接下來我們會在預測準確度的提高、系統性能的優化、多業務支持的便捷性上進行改進。未來,隨著大資料、人工智慧技術在京東供應鏈管理中的使用越來越多,預測系統也將發揮出更大作用,對於京東預測系統的研發工作也將是充滿著挑戰與樂趣。

隱藏在文末的福利

當傳統行業遇到創新技術,會碰撞出怎樣的火花?當創新技術進入傳統行業,將產生出怎樣的價值?技術又如何為企業帶來一萬倍的業務增長?

G+全球技術價值峰會,4月19日,北京。聯合京東終身名譽技術顧問李大學、平安科技COO兼CTO 胡瑋、螞蟻金服全球技術合作與發展部總經理歸朴等行業領軍人物揭秘傳統企業互聯網轉型的痛點與經驗。

戳 「 閱讀原文 」,瞭解更多!

今日薦號

大資料雜談

我們專注大資料和機器學習, 每天發佈高品質文章, 技術案例等原創乾貨源源不斷。 更有社群微課堂, 也希望你能從這裡分享前沿技術,交流深度思考。

今日薦文

點擊下方圖片即可閱讀

程式師,這是你想要的技術leader嗎?