複製鏈接
請複製以下鏈接發送給好友

Dpark

鎖定
DPark 是一個類似MapReduce 的基於Mesos(Apache 下的一個集羣管理器,提供了有效的、跨分佈式應用或框架的資源隔離和共享等功能)的集羣並行計算框架(Cluster Computing Framework),DPark 是Spark 的Python克隆版本,是一個Python 實現的分佈式計算框架,可以非常方便地實現大規模數據處理和低延時的迭代計算。該計算框架類似於MapReduce,但是比其更靈活,可以用Python 非常方便地進行分佈式計算,並且提供了更多的功能,以便更好地進行迭代式計算。DPark 由國內的豆瓣公司開發實現和負責維護,據豆瓣公司的描述,豆瓣公司內部的絕大多數數據分析都使用DPark 完成,整個項目也正趨於完善。
外文名
Dpark
類    型
管理器
  1. 1. 基本概念
  2. 1.1.1 彈性分佈式數據集
RDD 是DPark 的核心概念,是DPark 使用的一種數據模型,RDD 的一個重要特徵就是在計算過程中,一個RDD 可以在不同的並行循環中被重複利用。
RDD 是一種支持容錯、可進行並行計算的元素集合。一個RDD 由多個分片(Split)組成,分片是計算過程中並行計算的基本單位。目前,DPark 支持從以下兩種數據源中獲取RDD。
(1)並行化數據集:可以將一個普通的Python 數據集合(如list)拆分為若干分片後組成一個RDD。
(2)分佈式文件系統中的單個或者多個文件:即將分佈式文件系統中的文件按照行拆分後,組成一個RDD,目前,DPark 支持兩種格式的文件,即文本格式文件和CSV 格式文件。
從上述數據源生成的RDD以及在並行計算過程中新生成的RDD都支持相同的操作和變換。在DPark 中通過上述兩種數據源生成RDD 的具體方法如下。
① 從並行化數據集生成RDD。
對於並行化的數據集,可以調用DparkContext 的parallelize 函數得到,方法是:rdd = ctx.parallelize(seq, numSlices)。其中,參數numSlices 表示將數據集seq 劃分的分片數,DPark 會將每個分片作為一個任務發送到集羣上進行並行計算,對於基於Mesos 的集羣節點,通常每個CPU 可以運行2~4 個任務。numSlices 針對不同的集羣設置,也都有相應的默認值。
② 從分佈式文件系統生成RDD。
DPark 可以讀取從分佈式文件系統中的一個或多個文件生成RDD,目前支持的文件格式有文本文件和CSV 格式文件,而且將來會增加對更多文件格式的支持,例如,支持從MySQL 中讀取數據。當然,用户也可以自己按照RDD 的要求實現對應的RDD。
● 讀取文本文件為RDD。
可以調用DparkContext 的textFile 方法從文本文件生成RDD。text_rdd = ctx.textFile('data.txt', splitSize=64<<20)通過這種方式創建的RDD 中,每個元素為源文件中的一行,包含行結尾的回車符,splitSize 指定RDD 中每個分片的大小,默認為64MB。
● 從CSV 格式文件生成RDD。
可以調用DparkContext 的csvFile 方法從文本文件生成RDD。csv_rdd = ctx.csvFile('data.csv', splitSize=32<<20, dialect='excel')通過這種方式生成的RDD 中,每個元素為源文件中每一行分割後生成的數組。splitSize 指定RDD 中每個分片的大小,默認為32MB。dialect 參數指定csv 文件中使用的分隔符,具體請參見csv.reader,默認使用逗號('excel')分割。
1.1.2 共享變量
在 DPark 中,具體的計算過程發生在集羣的每個計算節點上,所以DPark需要將RDD 的分片數據和計算函數(如map 和reduce 函數)進行序列化,通過網絡發送到計算節點上,然後在計算節點上執行反序列化,在這個過程中,計算操作所依賴的全局變量、模塊和閉包等也會被複制到該計算節點上,所以,在計算節點中對普通變量的修改不會影響到主程序中的變量。
對於在計算過程中數據共享的需要,DPark 通過提供共享變量來實現。共享變量在不同的計算任務之間可以進行共享式的讀/寫,DPark 目前支持兩種類型的共享變量:只讀廣播變量和只可以寫的累加器。
● 廣播變量。
DPark 支持將變量一次性發送到集羣的所有計算節點上,這種變量稱為廣播變量。使用廣播變量避免了變量在每次調用時需要通過網絡傳輸,執行序列化和反序列化等操作。這種情況一般用在計算函數需要依賴一個特別大的數據集的時候。需要注意的是,被廣播的對象大小不能超出計算節點的內存限制。DPark 使用較高效率的廣播算法執行廣播變量在集羣中的傳遞,目前支持分佈式文件系統和樹形結構兩種廣播算法。
●累加器。
累加器可以在執行數據量較小的任務的時候,用於收集任務產生的少量數據。累加器只支持add 操作,不支持刪除、更新等操作,DPark 默認使用的累加器可以支持數值類型、list 類型和dict 類型,用户也可以自定義累加器。
  1. DPark的計算模型
DPark 並行計算模型基於上節講到的DPark 的兩個基本概念,一是基於對RDD 的分佈式計算,二是基於計算過程中能夠通過不同的機器訪問的共享變量。共享變量必須很容易在現存的分佈式文件系統中實現。RDD 能夠在多次循環計算過程中反覆被利用,所以DPark 支持將RDD 緩存在計算節點的內存中以加快計算。
RDD 目前支持兩種類型的並行計算。
(1)變換
變換是將現有的RDD 通過運算變成另外一個RDD,例如,DPark 中的map、filter 操作就是該功能。
(2)操作
操作是指將現有的RDD 進行聚合運算,然後將計算結果立即返回給主程序,例如,DPark 中的求和操作count。需要説明的是,所有的RDD 的變換都是滯後的,當在一個RDD 上調用變換函數(如map)的時候,並沒有立即執行該計算,只是生成了一個攜帶計算信息的新的RDD,只有當RDD 需要將某個操作結果返回給主程序的時候,才執行真正的計算。這種設計有如下好處。
(1)提高效率:DPark 可以自動將多個變換操作進行合併,然後同時運行,這樣可以最大程度地減少數據傳輸量。
(2)容錯:在某個分片計算失敗的情況下,RDD 由於攜帶有計算信息,因此可以重新執行計算。
另外,RDD 支持一種很特別且很重要的變換,即緩存(cache),當在某個RDD 上調用cache()函數的時候,每個計算節點會將分配給自己的計算分片的結果緩存在內存中,當以後需要對該RDD 或者從該RDD 轉換來的RDD 進行操作的時候,計算節點就可以直接從內存中取得該RDD 的計算結果。很顯然,當整個計算過程需要對RDD 進行重複利用時,緩存技術將大大提高計算性能。這種設計方法對於迭代式計算非常有幫助。
  1. 3. 工作原理
在前面DPark 和MapReduce 的比較一節中已經提到,DPark 和MapReduce的關鍵性區別在於處理數據流上。MapReduce 基於非循環的數據流模型,而DPark 對於需要重複使用數據集的迭代式算法具有較高的效率。這其中的關鍵是DPark 使用了一種特殊的數據模型,即RDD。
RDD 的設計目標是既保留了MapReduce 等數據流處理模型優點,例如,自動容錯、數據本地化、可拓展性強等,也添加了自己獨特的特性,即將一部分數據緩存在內存中,以加快對這部分數據的查詢和計算的效率。
RDD 可以認為提供了一個高度限制的共享內存,RDD 只讀或者只能從別的RDD 轉化而來。這些限制可以降低自動容錯需要的開銷,RDD 使用一種稱為“血統”的容錯機制,也就是每個RDD 都攜帶了它是如何從別的RDD 轉變過來的信息以及如何重建某一分片的信息。
在 RDD 這種數據模型出現之前,也有很多數據處理模型被創造出來解決非循環的數據模型中的問題。例如,Google 的Pregel(迭代圖計算框架)、Twister和HaLoop(迭代MapReduce 框架)等,這些模型的一個共同特徵就是應用場景有限,這些模型是根據企業(如Google)自己的業務需求而開發出來的,所以無法通用。而基於RDD 的DPark 提供了一種更加通用的迭代式並行計算框架,用户可以顯示控制計算的中間結果,自由控制計算的流程。
RDD 的發明者已經在Spark 的基礎上實現了Pregel(100 行Scala 代碼)以及迭代MapReduce 框架(200 行Scala 代碼),從實現代碼行數上可以看到,Spark提供了一個很簡潔的編程模型。Spark 在其他框架不能很好適用的場景下也可以達到很好的效果。例如,在交互式的大數據查詢上。一些實踐和研究表明,Spark 在那些迭代式計算中比Hadoop 快20 倍,能夠在5~7s 的時間內交互式地查詢1TB 的數據(比Hadoop 快40 倍)。
  1. 4. 容錯機制
通常,實現分佈式數據集容錯有兩種常用的方法,即數據檢查點和記錄更新。
考慮到RDD 的應用場景(面向大規模數據的存儲和分析),給數據設置檢查點代價較高,原因在於設置檢查點需要在集羣節點之間進行大量數據的拷貝操作,而這種拷貝操作會受到集羣帶寬的限制,而且帶寬是集羣中的稀缺資源,拷貝操作會犧牲大量的帶寬,而且拷貝也增加了集羣節點的存儲負載。
考慮到上述情況,DPark 使用記錄更新的方法實現容錯,但是更新所有記錄的方法也需要較高的代價,所以RDD 僅支持粗顆粒度變換,即僅記錄在單個分片上的單個操作,然後創建某個RDD 的變換序列存儲下來,即保存“血統”信息,當出現數據丟失的情況時,可以根據保存的“血統”信息重新構建數據,以此來達到數據容錯的功能。
當時由於RDD 在計算過程中需要進行多次變換,導致變換的序列很長,因此根據這些很長的變換序列恢復丟失的數據需要很長的時間,所以DPark 設計者建議在變換序列很長的情況下,適當地建立一些數據檢測點以加快實現容錯。DPark 目前還沒有實現自動判斷是否需要建立檢查點的機制,用户可以通過saveAsTextFile 方法來手動設置數據檢查點。據DPark 網站描述,DPark 會在後來的升級版本中添加這一機制,實現更加自動化、更加友好的容錯機制。
  1. 5. 內部設計機制
在DPark 設計中,為了將來使RDD 支持更多類型的變換,而不用改變現存的任務調度機制,而且也為了保持RDD 的基於“血統”的容錯機制,RDD被設計了幾個通用接口,具體來講,每個RDD 必須包含如下4 方面的信息。
● 數據源分割後的數據分片信息,即源代碼中的split 變量。
● “血統”信息,即該RDD 所依賴的父RDD 信息以及二者之間的關係信息,即源代碼中的dependence 變量。
● 計算函數,即該RDD 如何從父RDD 經過計算轉變得來,即源代碼中的iterator(split)和compute 函數。
● 如何對數據進行分片和分片保持位置相關的元數據信息,即源代碼中的partitioner 和preferredLocations。
下面進行舉例説明。
例如,從分佈式文件系統中的文件轉換得到的RDD,這些RDD 中的數據分片通過對源數據文件進行分割後得到的,它們沒有父RDD,這些RDD 的計算函數只是讀取文件中的每一行,然後返回給子RDD,對於通過map 函數轉換得到的RDD,會具有和父RDD 相同的數據分片。
上面所列的RDD 的四個通用信息如何表達父RDD 和子RDD 之間的關係是DPark 和Spark 必須要考慮的事情,在Spark 實現中,將這種依賴關係劃分為兩種類型:窄依賴和寬依賴。窄依賴是指子RDD 的每個數據分片只對父RDD 中的有限個數據分片有依賴關係,而且依賴的數量在規模上要和父RDD 分片數量差別很大。寬依賴是指子RDD 中的每個數據分片都可以對父RDD中的每個數據分片有依賴關係。例如,如圖2-20 左側所示,對於map,filter 變換操作就屬於窄依賴,由map 變換產生的子RDD 中的某個數據分片只對父RDD相應的數據分片有依賴,這就是一種窄依賴關係。但是對於groupByKey 變換操作,由於每個子RDD 中的數據分片對父RDD 中的所有數據分片都有依賴,所以這是一種寬依賴關係。
DPark 和Spark 對父子RDD 依賴關係進行分類的特性,主要是為了針對不同的依賴類型使用不同的任務調度機制和數據容錯機制,從而更加高效地進行計算。對於窄依賴關係,可以在計算節點上根據父RDD 中的數據分片計算得到子RDD 中相應的數據分片。對於寬依賴,意味着子RDD 中的數據分片的計算需要在父RDD 中的所有數據分片計算完成的情況下才可以進行。而且對於窄依賴來説,數據丟失或者出錯所需要的恢復時間要比寬依賴少很多,因為對於窄依賴來説,只有丟失的那些數據分片需要重新計算,而對於寬依賴,則需要對丟失的數據分片的所有祖先RDD 重新計算一遍。所以,DPark 和Spark 建議,對於有長“血統”鏈特別是存在寬依賴的情況下,需要在適當的時間設置一個數據檢查點來避免過長的數據恢復時間。
  1. 6. 任務調度機制
在DPark 實現中,設計者試圖利用RDD 的特性為所有的RDD 操作找到一種最有效的執行策略,任務調度器提供一個runJob 接口給RDD 使用,該接口接收三個參數,分別是RDD 對象本身、感興趣的部分數據分片和數據分片上的操作函數。當RDD 需要執行一個操作的時候,例如,對RDD 執行count、collect 等操作,DPark 就會調用runJob 接口來執行並行計算。
從總體來講,DPark 的任務調度機制很像Dryad 的任務調度方法,其區別在於DPark 在進行任務調度的時候會考慮RDD 的哪些數據分片需要緩存在集羣的哪些計算節點上,選擇的過程是這樣實現的,首先,RDD 根據自身攜帶的“血統”序列信息創建出一些階段stage,每個階段會盡可能多地包含可以連續運行的變換,即基於窄依賴的變換,一個stage 的邊界是那些需要在節點之間移動數據的寬依賴變換,或者是那些已經被緩存的RDD。在如圖1 所示的整體計算的示例中表示了階段的分割,只有父階段完成了計算,子階段的計算才能開始,運行的時候每個數據分片分配一個任務,沒有父子關係的階段之間可以並行計算。圖1 中的Stage 1 和Stage 2 就可以並行計算,Stage 3 必須在Stage 1 和Stage2 計算完成後才能開始。
在進行具體任務分配的時候,調度器會根據數據本地化的原則來分配計算任務到計算節點,例如,某個計算任務需要訪問一個已經被緩存的數據分片,那麼調度器就將該任務分配給緩存有該數據分片的計算節點來執行,這種調度策略可以最大程度地減少集羣網絡帶寬的佔用,降低集羣節點通信代價。
圖1  DPark 中的任務調度機制 圖1 DPark 中的任務調度機制
如果一個計算任務執行失敗,但是它所在階段的父階段的數據沒有丟失,那麼調度器可以調度該任務到其他計算節點上重新運行。如果它所在的父階段已經不可用,那麼調度器需要重新提交父階段中的所有需要重新計算的任務。
  1. 7. 共享變量的實現機制
共享變量是DPark 中除了RDD 外的另外一個重要的概念,目前在DPark中支持兩種類型的共享變量:廣播變量和累加器。下面簡單介紹兩種共享變量的實現機制。
(1)廣播變量的實現
在 DPark 中,當在程序中調用RDD 上的map、filter 等變換操作時,會傳遞一個變換函數給這些變換操作,DPark 在實際運行該函數的時候需要將該函數所需要的閉包序列化後通過網絡傳送到計算節點上,在計算節點上計算函數就可以調用函數域的變量、函數等。但是如果閉包中的函數需要訪問一個較大的數據集,那麼序列化閉包並在集羣中傳送的方式所花費的代價就很高。考慮到這種情況,DPark 支持使用廣播變量,從而使用户可以一次性將信息發送到所有集羣的計算節點上。目前實現的算法有基於文件系統的廣播算法和樹形廣播算法。
在分佈式文件系統廣播算法中,當用户為變量v 創建一個廣播變量bv 的時候,DPark 會為其創建一個唯一的廣播ID,然後將變量v 序列化後存儲在文件系統中的一個文件裏。當傳遞閉包的時候,DPark 會將bv 而不是v 序列化後傳遞過去,而廣播變量bv 的序列化結果就是廣播ID。當用户使用bv.value 中v的值時候,DPark 會在緩存中檢查v 是否存在,如果存在,直接使用,如果不存在,就將對應的文件反序列化後提供給計算節點使用。
在樹形廣播算法中,當用户為變量v 創建一個廣播變量bv 時,DPark 也會給該廣播變量創建一個唯一的廣播ID,但是不會將v 序列化後存放到文件系統的某一個文件中,而是利用zeromq 在master 上綁定一個端口,當worker 需要讀取真實數據的時候,如果本地緩存中沒有該數據,就訪問master 節點請求數據,當一個worker 獲取了數據,就可以將該數據傳播給其他worker,其傳播過程是一個N 叉樹形。
兩種算法中,一般來説,樹形廣播算法會更快一些,因為樹形廣播算法只有內存和網絡I/O 操作,而分佈式文件系統算法不僅有內存和網絡I/O 操作,還有磁盤操作,所以相對來説,速度會慢一些。在Spark 實現中,除了上述兩種廣播算法,還支持P2P 的廣播算法,但是DPark 目前還沒有實現該算法。
(2)累加器的實現
在 DPark 中創建累加器的時候,需要提供accumulator.AccumulatorParam這樣一個對象。該對象中有兩個值,分別代表累加器的0 值和加法操作定義。例如,對於數值型累加器,0 值就是數值0,加法定義就是普通的數值加法。但是對於列表型累加器,0 值就是空的[ ],加法就是extend 函數。當累加器被創建的時候,DPark 會給累加器創建一個唯一的ID,當累加器隨着計算任務被分配到計算節點上,在節點上會創建值為0 的累加器的副本,在計算節點上對累加器進行的任何操作都是在該副本上的操作,不會傳遞到主程序中的累加器上。當任務完成返回後,DPark 調度器會將這個副本累加器和主程序中的累加器進行合併,在所有的計算任務完成後,才能得到最終累加器的正確值。