rdd緩存
Ⅰ Spark核心-RDD
RDD是Spark中的數據抽象,全稱 彈性分布式數據集(Resilient Distributed Datasets) 。RDD可以理解為將一個大的數據集合以分布式的形式保存在集群伺服器的內存中。RDD是一個容錯的、並行的數據結構,可以讓用戶顯式地將數據存儲到磁碟和內存中,並能控制數據的分區。
RDD是Spark的核心,也是整個Spark的架構基礎。
RDD的特點:
RDD的5個主要屬性:
可以通過兩種方式創建RDD:
轉換操作指的是在原RDD實例上進行計算,然後創建一個新的RDD實例。
RDD中的所有的轉換操作都是 惰性 的,在執行RDD的轉換操作的時候,並不會直接計算結果,而是記住這些應用到基礎數據集上的轉換動作,只有行動操作時,這些轉換才會真正的去執行。這樣設計的好處是更加有效率的運行。
行動操作指的是向驅動器程序返回結果或把結果寫入外部系統的操作。
Spark在調用RDD的行動操作的時候,會觸發Spark中的連鎖反應。當調用的行動操作的時候,Spark會嘗試創建作為調用者的RDD。如果這個RDD是從文件中創建的,那麼Spark會在worker節點上讀取文件至內存中。如果這個RDD是通過其他RDD的轉換得到的,Spark會嘗試創建其父RDD。這個過程會一直持續下去,直到Spark找到根RDD。然後Spark就會真正執行這些生成RDD所必須的轉換計算。最後完成行動操作,將結果返回給驅動程序或者寫入外部存儲。
Spark速度非常快的原因之一,就是在不同操作中在內存中持久化一個數據集。當持久化一個RDD後,每一個節點都將把計算的分片結果保存在內存中,並在對此數據集進行的其他動作中重用。這使得後續的動作變得更加迅速。緩存是Spark構建迭代演算法和快速互動式查詢的關鍵。所以我們在開發過程中,對經常使用的RDD要進行緩存操作,以提升程序運行效率。
RDD緩存的方法
RDD類提供了兩種緩存方法:
cache方法其實是將RDD存儲在集群中Worker的內存中。
persist是一個通用的cache方法。它可以將RDD存儲在內存中或硬碟上或者二者皆有。
緩存的容錯
緩存是有可能丟失(如機器宕機),或者存儲於內存的數據由於內存不足而被刪除。RDD的緩存的容錯機制保證了即使緩存丟失也能保證計算的正確執行。通過基於RDD的一系列的轉換,丟失的數據會被重新計算。因為RDD的各個Partition是相對獨立的,所以在重新計算的時候只需要計算丟失部分Partition即可,不需要重新計算全部的Partition。因此,在一個緩存RDD的節點出現故障的時候,Spark會在另外的節點上自動重新創建出現故障的節點中存儲的分區。
RDD的緩存能夠在第一次計算完成後,將計算結果保存到內存、本地文件系統或者Tachyon中。通過緩存,Spark避免了RDD上的重復計算,能夠極大地提升計算速度。但是,如果緩存丟失了,則需要重新計算。如果計算特別復雜或者計算特別耗時,那麼緩存丟失對於整個Job的影響是不容忽視的。為了避免緩存丟失重新計算帶來的開銷,所以Spark引入了檢查點(checkpoint)機制。
緩存是在計算結束後,直接將計算結果通過用戶定義的存儲級別寫入不同的介質。而檢查點不同,它是在計算完成後,重新建立一個Job來計算。所以為了避免重復計算,推薦先將RDD緩存,這樣在進行檢查點操作時就可以快速完成。
Spark會根據用戶提交的計算邏輯中的RDD的轉換和動作來生動RDD之間的依賴關系,同時這個計算鏈也就生成了邏輯上的DAG。
RDD之間的依賴關系包括:
Spark中的依賴關系主要體現為兩種形式:
Ⅱ Spark內存管理詳解(下)——內存管理
彈性分布式數據集(RDD)作為Spark最根本的數據抽象,是只讀的分區記錄(Partition)的集合,只能基於在穩定物理存儲中的數據集上創建,或者在其他已有的RDD上執行轉換(Transformation)操作產生一個新的RDD。轉換後的RDD與原始的RDD之間產生的依賴關系,構成了血統(Lineage)。憑借血統,Spark保證了每一個RDD都可以被重新恢復。但RDD的所有轉換都是惰性的,即只有當一個返回結果給Driver的行動(Action)發生時,Spark才會創建任務讀取RDD,然後真正觸發轉換的執行。
Task在啟動之初讀取一個分區時,會先判斷這個分區是否已經被持久化,如果沒有則需要檢查Checkpoint或按照血統重新計算。所以如果一個RDD上要執行多次行動,可以在第一次行動中使用persist或cache方法,在內存或磁碟中持久化或緩存這個RDD,從而在後面的行動時提升計算速度。事實上,cache方法是使用默認的MEMORY_ONLY的存儲級別將RDD持久化到內存,故緩存是一種特殊的持久化。 堆內和堆外存儲內存的設計,便可以對緩存RDD時使用的內存做統一的規劃和管理 (存儲內存的其他應用場景,如緩存broadcast數據,暫時不在本文的討論范圍之內)。
RDD的持久化由Spark的Storage模塊 [1] 負責,實現了RDD與物理存儲的解耦合。Storage模塊負責管理Spark在計算過程中產生的數據,將那些在內存或磁碟、在本地或遠程存取數據的功能封裝了起來。在具體實現時Driver端和Executor端的Storage模塊構成了主從式的架構,即Driver端的BlockManager為Master,Executor端的BlockManager為Slave。Storage模塊在邏輯上以Block為基本存儲單位,RDD的每個Partition經過處理後唯一對應一個Block(BlockId的格式為 rdd_RDD-ID_PARTITION-ID )。Master負責整個Spark應用程序的Block的元數據信息的管理和維護,而Slave需要將Block的更新等狀態上報到Master,同時接收Master的命令,例如新增或刪除一個RDD。
在對RDD持久化時,Spark規定了MEMORY_ONLY、MEMORY_AND_DISK等7種不同的 存儲級別 ,而存儲級別是以下5個變數的組合 [2] :
通過對數據結構的分析,可以看出存儲級別從三個維度定義了RDD的Partition(同時也就是Block)的存儲方式:
RDD在緩存到存儲內存之前,Partition中的數據一般以迭代器( Iterator )的數據結構來訪問,這是Scala語言中一種遍歷數據集合的方法。通過Iterator可以獲取分區中每一條序列化或者非序列化的數據項(Record),這些Record的對象實例在邏輯上佔用了JVM堆內內存的other部分的空間,同一Partition的不同Record的空間並不連續。
RDD在緩存到存儲內存之後,Partition被轉換成Block,Record在堆內或堆外存儲內存中佔用一塊連續的空間。 將Partition由不連續的存儲空間轉換為連續存儲空間的過程,Spark稱之為「展開」(Unroll) 。Block有序列化和非序列化兩種存儲格式,具體以哪種方式取決於該RDD的存儲級別。非序列化的Block以一種DeserializedMemoryEntry的數據結構定義,用一個數組存儲所有的Java對象,序列化的Block則以SerializedMemoryEntry的數據結構定義,用位元組緩沖區(ByteBuffer)來存儲二進制數據。每個Executor的Storage模塊用一個鏈式Map結構(LinkedHashMap)來管理堆內和堆外存儲內存中所有的Block對象的實例 [6] ,對這個LinkedHashMap新增和刪除間接記錄了內存的申請和釋放。
因為不能保證存儲空間可以一次容納Iterator中的所有數據,當前的計算任務在Unroll時要向MemoryManager申請足夠的Unroll空間來臨時佔位,空間不足則Unroll失敗,空間足夠時可以繼續進行。對於序列化的Partition,其所需的Unroll空間可以直接累加計算,一次申請。而非序列化的Partition則要在遍歷Record的過程中依次申請,即每讀取一條Record,采樣估算其所需的Unroll空間並進行申請,空間不足時可以中斷,釋放已佔用的Unroll空間。如果最終Unroll成功,當前Partition所佔用的Unroll空間被轉換為正常的緩存RDD的存儲空間,如下圖2所示。
在 《Spark內存管理詳解(上)——內存分配》 的圖3和圖5中可以看到,在靜態內存管理時,Spark在存儲內存中專門劃分了一塊Unroll空間,其大小是固定的,統一內存管理時則沒有對Unroll空間進行特別區分,當存儲空間不足是會根據動態佔用機制進行處理。
由於同一個Executor的所有的計算任務共享有限的存儲內存空間,當有新的Block需要緩存但是剩餘空間不足且無法動態佔用時,就要對LinkedHashMap中的舊Block進行淘汰(Eviction),而被淘汰的Block如果其存儲級別中同時包含存儲到磁碟的要求,則要對其進行落盤(Drop),否則直接刪除該Block。
存儲內存的淘汰規則為:
落盤的流程則比較簡單,如果其存儲級別符合 _useDisk 為true的條件,再根據其 _deserialized 判斷是否是非序列化的形式,若是則對其進行序列化,最後將數據存儲到磁碟,在Storage模塊中更新其信息。
Executor內運行的任務同樣共享執行內存,Spark用一個HashMap結構保存了任務到內存耗費的映射。每個任務可佔用的執行內存大小的范圍為 1/2N ~ 1/N ,其中N為當前Executor內正在運行的任務的個數。每個任務在啟動之時,要向MemoryManager請求申請最少為1/2N的執行內存,如果不能被滿足要求則該任務被阻塞,直到有其他任務釋放了足夠的執行內存,該任務才可以被喚醒。
執行內存主要用來存儲任務在執行Shuffle時佔用的內存,Shuffle是按照一定規則對RDD數據重新分區的過程,我們來看Shuffle的Write和Read兩階段對執行內存的使用:
在ExternalSorter和Aggregator中,Spark會使用一種叫AppendOnlyMap的哈希表在堆內執行內存中存儲數據,但在Shuffle過程中所有數據並不能都保存到該哈希表中,當這個哈希表佔用的內存會進行周期性地采樣估算,當其大到一定程度,無法再從MemoryManager申請到新的執行內存時,Spark就會將其全部內容存儲到磁碟文件中,這個過程被稱為溢存(Spill),溢存到磁碟的文件最後會被歸並(Merge)。
Shuffle Write階段中用到的Tungsten是Databricks公司提出的對Spark優化內存和CPU使用的計劃 [4] ,解決了一些JVM在性能上的限制和弊端。Spark會根據Shuffle的情況來自動選擇是否採用Tungsten排序。Tungsten採用的頁式內存管理機制建立在MemoryManager之上,即Tungsten對執行內存的使用進行了一步的抽象,這樣在Shuffle過程中無需關心數據具體存儲在堆內還是堆外。每個內存頁用一個MemoryBlock來定義,並用 Object obj 和 long offset 這兩個變數統一標識一個內存頁在系統內存中的地址。堆內的MemoryBlock是以long型數組的形式分配的內存,其 obj 的值為是這個數組的對象引用, offset 是long型數組的在JVM中的初始偏移地址,兩者配合使用可以定位這個數組在堆內的絕對地址;堆外的MemoryBlock是直接申請到的內存塊,其 obj 為null, offset 是這個內存塊在系統內存中的64位絕對地址。Spark用MemoryBlock巧妙地將堆內和堆外內存頁統一抽象封裝,並用頁表(pageTable)管理每個Task申請到的內存頁。
Tungsten頁式管理下的所有內存用64位的邏輯地址表示,由頁號和頁內偏移量組成:
有了統一的定址方式,Spark可以用64位邏輯地址的指針定位到堆內或堆外的內存,整個Shuffle Write排序的過程只需要對指針進行排序,並且無需反序列化,整個過程非常高效,對於內存訪問效率和CPU使用效率帶來了明顯的提升 [5] 。
Spark的存儲內存和執行內存有著截然不同的管理方式:對於存儲內存來說,Spark用一個LinkedHashMap來集中管理所有的Block,Block由需要緩存的RDD的Partition轉化而成;而對於執行內存,Spark用AppendOnlyMap來存儲Shuffle過程中的數據,在Tungsten排序中甚至抽象成為頁式內存管理,開辟了全新的JVM內存管理機制。
Spark的內存管理是一套復雜的機制,且Spark的版本更新比較快,筆者水平有限,難免有敘述不清、錯誤的地方,若讀者有好的建議和更深的理解,還望不吝賜教。