spark訪問高德
⑴ 大數據分析Apache Spark的應用實例
在考慮Hadoop生態系統中的各種引擎時,重要的是要了解每個引擎在某些用例下效果最佳,並且企業可能需要使用多種工具組合才能滿足每個所需的用例。話雖如此,這里是對Apache Spark的一些頂級用例的回顧。
一、流數據
Apache Spark的關鍵用例是其處理流數據的能力。由於每天要處理大量數據,因此對於公司而言,實時流傳輸和分析數據變得至關重要。Spark Streaming具有處理這種額外工作負載的能力。一些專家甚至認為,無論哪種類型,Spark都可以成為流計算應用程序的首選平台。提出此要求的原因是,Spark Streaming統一了不同的數據處理功能,從而使開發人員可以使用單個框架來滿足其所有處理需求。
當今企業使用Spark Streaming的一般方式包括:
1、流式ETL –在數據倉庫環境中用於批處理的傳統ETL(提取,轉換,載入)工具必須讀取數據,將其轉換為資料庫兼容格式,然後再將其寫入目標資料庫。使用Streaming ETL,在將數據推送到數據存儲之前,將對其進行連續的清理和聚合。
2、數據充實 –這種Spark Streaming功能通過將實時數據與靜態數據相結合來充實實時數據,從而使組織能夠進行更完整的實時數據分析。在線廣告商使用數據充實功能將歷史客戶數據與實時客戶行為數據結合起來,並根據客戶的行為實時提供更多個性化和針對性的廣告。
3、觸發事件檢測 – Spark Streaming使組織可以檢測到可能對系統內部潛在嚴重問題的罕見或異常行為(「觸發事件」)並做出快速響應。金融機構使用觸發器來檢測欺詐性交易並阻止其欺詐行為。醫院還使用觸發器來檢測潛在的危險健康變化,同時監視患者的生命體征-向正確的護理人員發送自動警報,然後他們可以立即採取適當的措施。
4、復雜的會話分析 –使用Spark Streaming,與實時會話有關的事件(例如登錄網站或應用程序後的用戶活動)可以組合在一起並進行快速分析。會話信息還可以用於不斷更新機器學習模型。諸如Netflix之類的公司使用此功能可立即了解用戶在其網站上的參與方式,並提供更多實時電影推薦。
二、機器學習
許多Apache Spark用例中的另一個是它的機器學習功能。
Spark帶有用於執行高級分析的集成框架,該框架可幫助用戶對數據集進行重復查詢,這從本質上講就是處理機器學習演算法。在此框架中找到的組件包括Spark的可擴展機器學習庫(MLlib)。MLlib可以在諸如聚類,分類和降維等領域中工作。所有這些使Spark可以用於一些非常常見的大數據功能,例如預測智能,用於營銷目的的客戶細分以及情感分析。使用推薦引擎的公司將發現Spark可以快速完成工作。
網路安全是Spark 機器學習功能的一個很好的商業案例。通過使用Spark堆棧的各種組件,安全提供程序可以對數據包進行實時檢查,以發現惡意活動的痕跡。在前端,Spark Streaming允許安全分析人員在將數據包傳遞到存儲平台之前檢查已知威脅。到達存儲區後,數據包將通過其他堆棧組件(例如MLlib)進行進一步分析。因此,安全提供商可以在不斷發展的過程中了解新的威脅-始終領先於黑客,同時實時保護其客戶。
三、互動分析
Spark最顯著的功能之一就是其互動式分析功能。MapRece是為處理批處理而構建的,而Hive或Pig等SQL-on-Hadoop引擎通常太慢,無法進行互動式分析。但是,Apache Spark足夠快,可以執行探索性查詢而無需采樣。Spark還與包括SQL,R和Python在內的多種開發語言介面。通過將Spark與可視化工具結合使用,可以交互地處理和可視化復雜的數據集。
下一版本的Apache Spark(Spark 2.0)將於今年的4月或5月首次亮相,它將具有一項新功能- 結構化流 -使用戶能夠對實時數據執行互動式查詢。通過將實時流與其他類型的數據分析相結合,預計結構化流將通過允許用戶針對Web訪問者當前會話運行互動式查詢來促進Web分析。它也可以用於將機器學習演算法應用於實時數據。在這種情況下,將對舊數據進行演算法訓練,然後將其重定向以合並新的數據,並在其進入內存時從中學習。
四、霧計算
盡管大數據分析可能會引起廣泛關注,但真正激發技術界想像力的概念是物聯網(IoT)。物聯網通過微型感測器將對象和設備嵌入在一起,這些微型感測器彼此之間以及與用戶進行通信,從而創建了一個完全互連的世界。這個世界收集了大量數據,對其進行處理,並提供革命性的新功能和應用程序供人們在日常生活中使用。但是,隨著物聯網的擴展,對大量,種類繁多的機器和感測器數據進行大規模並行處理的需求也隨之增加。但是,利用雲中的當前分析功能很難管理所有這些處理。
那就是霧計算和Apache Spark出現的地方。
霧計算將數據處理和存儲分散化,而不是在網路邊緣執行這些功能。但是,霧計算為處理分散數據帶來了新的復雜性,因為它越來越需要低延遲,機器學習的大規模並行處理以及極其復雜的圖形分析演算法。幸運的是,有了Spark Streaming等關鍵堆棧組件,互動式實時查詢工具(Shark),機器學習庫(MLib)和圖形分析引擎(GraphX),Spark不僅具有霧計算解決方案的資格。實際上,隨著物聯網行業逐漸不可避免地融合,許多行業專家預測,與其他開源平台相比,Spark有可能成為事實上的霧基礎設施。
現實世界中的火花
如前所述,在線廣告商和諸如Netflix之類的公司正在利用Spark獲得見識和競爭優勢。其他也從Spark受益的著名企業是:
Uber –這家跨國在線計程車調度公司每天都從其移動用戶那裡收集TB級的事件數據。通過使用Kafka,Spark Streaming和HDFS構建連續的ETL管道,Uber可以在收集原始非結構化事件數據時將其轉換為結構化數據,然後將其用於進一步和更復雜的分析。
Pinterest –通過類似的ETL管道,Pinterest可以利用Spark Streaming即時了解世界各地的用戶如何與Pins互動。因此,當人們瀏覽站點並查看相關的圖釘時,Pinterest可以提出更相關的建議,以幫助他們選擇食譜,確定要購買的產品或計劃前往各個目的地的行程。
Conviva –這家流媒體視頻公司每月平均約有400萬個視頻供稿,僅次於YouTube。Conviva使用Spark通過優化視頻流和管理實時視頻流量來減少客戶流失,從而保持一致的流暢,高質量的觀看體驗。
何時不使用Spark
盡管它具有通用性,但這並不一定意味著Apache Spark的內存中功能最適合所有用例。更具體地說,大數據分析Apache Spark的應用實例Spark並非設計為多用戶環境。Spark用戶需要知道他們有權訪問的內存對於數據集是否足夠。添加更多的用戶使此操作變得更加復雜,因為用戶必須協調內存使用量才能同時運行項目。由於無法處理這種類型的並發,用戶將需要為大型批處理項目考慮使用備用引擎,例如Apache Hive。
隨著時間的流逝,Apache Spark將繼續發展自己的生態系統,變得比以前更加通用。在大數據已成為規范的世界中,組織將需要找到最佳方式來利用它。從這些Apache Spark用例可以看出,未來幾年將有很多機會來了解Spark的真正功能。
隨著越來越多的組織認識到從批處理過渡到實時數據分析的好處,Apache Spark的定位是可以在眾多行業中獲得廣泛而快速的採用。
⑵ Spark內存管理詳解(下)——內存管理
彈性分布式數據集(RDD)作為Spark最根本的數據抽象,是只讀的分區記錄(Partition)的集合,只能基於在穩定物理存儲中的數據集上創建,或者在其他已有的RDD上執行轉換(Transformation)操作產生一個新的RDD。轉換後的RDD與原始的RDD之間產生的依賴關系,構成了血統(Lineage)。憑借血統,Spark保證了每一個RDD都可以被重新恢復。但RDD的所有轉換都是惰性的,即只有當一個返回結果給Driver的行動(Action)發生時,Spark才會創建任務讀取RDD,然後真正觸發轉換的執行。
Task在啟動之初讀取一個分區時,會先判斷這個分區是否已經被持久化,如果沒有則需要檢查Checkpoint或按照血統重新計算。所以如果一個RDD上要執行多次行動,可以在第一次行動中使用persist或cache方法,在內存或磁碟中持久化或緩存這個RDD,從而在後面的行動時提升計算速度。事實上,cache方法是使用默認的MEMORY_ONLY的存儲級別將RDD持久化到內存,故緩存是一種特殊的持久化。 堆內和堆外存儲內存的設計,便可以對緩存RDD時使用的內存做統一的規劃和管理 (存儲內存的其他應用場景,如緩存broadcast數據,暫時不在本文的討論范圍之內)。
RDD的持久化由Spark的Storage模塊 [1] 負責,實現了RDD與物理存儲的解耦合。Storage模塊負責管理Spark在計算過程中產生的數據,將那些在內存或磁碟、在本地或遠程存取數據的功能封裝了起來。在具體實現時Driver端和Executor端的Storage模塊構成了主從式的架構,即Driver端的BlockManager為Master,Executor端的BlockManager為Slave。Storage模塊在邏輯上以Block為基本存儲單位,RDD的每個Partition經過處理後唯一對應一個Block(BlockId的格式為 rdd_RDD-ID_PARTITION-ID )。Master負責整個Spark應用程序的Block的元數據信息的管理和維護,而Slave需要將Block的更新等狀態上報到Master,同時接收Master的命令,例如新增或刪除一個RDD。
在對RDD持久化時,Spark規定了MEMORY_ONLY、MEMORY_AND_DISK等7種不同的 存儲級別 ,而存儲級別是以下5個變數的組合 [2] :
通過對數據結構的分析,可以看出存儲級別從三個維度定義了RDD的Partition(同時也就是Block)的存儲方式:
RDD在緩存到存儲內存之前,Partition中的數據一般以迭代器( Iterator )的數據結構來訪問,這是Scala語言中一種遍歷數據集合的方法。通過Iterator可以獲取分區中每一條序列化或者非序列化的數據項(Record),這些Record的對象實例在邏輯上佔用了JVM堆內內存的other部分的空間,同一Partition的不同Record的空間並不連續。
RDD在緩存到存儲內存之後,Partition被轉換成Block,Record在堆內或堆外存儲內存中佔用一塊連續的空間。 將Partition由不連續的存儲空間轉換為連續存儲空間的過程,Spark稱之為「展開」(Unroll) 。Block有序列化和非序列化兩種存儲格式,具體以哪種方式取決於該RDD的存儲級別。非序列化的Block以一種DeserializedMemoryEntry的數據結構定義,用一個數組存儲所有的Java對象,序列化的Block則以SerializedMemoryEntry的數據結構定義,用位元組緩沖區(ByteBuffer)來存儲二進制數據。每個Executor的Storage模塊用一個鏈式Map結構(LinkedHashMap)來管理堆內和堆外存儲內存中所有的Block對象的實例 [6] ,對這個LinkedHashMap新增和刪除間接記錄了內存的申請和釋放。
因為不能保證存儲空間可以一次容納Iterator中的所有數據,當前的計算任務在Unroll時要向MemoryManager申請足夠的Unroll空間來臨時佔位,空間不足則Unroll失敗,空間足夠時可以繼續進行。對於序列化的Partition,其所需的Unroll空間可以直接累加計算,一次申請。而非序列化的Partition則要在遍歷Record的過程中依次申請,即每讀取一條Record,采樣估算其所需的Unroll空間並進行申請,空間不足時可以中斷,釋放已佔用的Unroll空間。如果最終Unroll成功,當前Partition所佔用的Unroll空間被轉換為正常的緩存RDD的存儲空間,如下圖2所示。
在 《Spark內存管理詳解(上)——內存分配》 的圖3和圖5中可以看到,在靜態內存管理時,Spark在存儲內存中專門劃分了一塊Unroll空間,其大小是固定的,統一內存管理時則沒有對Unroll空間進行特別區分,當存儲空間不足是會根據動態佔用機制進行處理。
由於同一個Executor的所有的計算任務共享有限的存儲內存空間,當有新的Block需要緩存但是剩餘空間不足且無法動態佔用時,就要對LinkedHashMap中的舊Block進行淘汰(Eviction),而被淘汰的Block如果其存儲級別中同時包含存儲到磁碟的要求,則要對其進行落盤(Drop),否則直接刪除該Block。
存儲內存的淘汰規則為:
落盤的流程則比較簡單,如果其存儲級別符合 _useDisk 為true的條件,再根據其 _deserialized 判斷是否是非序列化的形式,若是則對其進行序列化,最後將數據存儲到磁碟,在Storage模塊中更新其信息。
Executor內運行的任務同樣共享執行內存,Spark用一個HashMap結構保存了任務到內存耗費的映射。每個任務可佔用的執行內存大小的范圍為 1/2N ~ 1/N ,其中N為當前Executor內正在運行的任務的個數。每個任務在啟動之時,要向MemoryManager請求申請最少為1/2N的執行內存,如果不能被滿足要求則該任務被阻塞,直到有其他任務釋放了足夠的執行內存,該任務才可以被喚醒。
執行內存主要用來存儲任務在執行Shuffle時佔用的內存,Shuffle是按照一定規則對RDD數據重新分區的過程,我們來看Shuffle的Write和Read兩階段對執行內存的使用:
在ExternalSorter和Aggregator中,Spark會使用一種叫AppendOnlyMap的哈希表在堆內執行內存中存儲數據,但在Shuffle過程中所有數據並不能都保存到該哈希表中,當這個哈希表佔用的內存會進行周期性地采樣估算,當其大到一定程度,無法再從MemoryManager申請到新的執行內存時,Spark就會將其全部內容存儲到磁碟文件中,這個過程被稱為溢存(Spill),溢存到磁碟的文件最後會被歸並(Merge)。
Shuffle Write階段中用到的Tungsten是Databricks公司提出的對Spark優化內存和CPU使用的計劃 [4] ,解決了一些JVM在性能上的限制和弊端。Spark會根據Shuffle的情況來自動選擇是否採用Tungsten排序。Tungsten採用的頁式內存管理機制建立在MemoryManager之上,即Tungsten對執行內存的使用進行了一步的抽象,這樣在Shuffle過程中無需關心數據具體存儲在堆內還是堆外。每個內存頁用一個MemoryBlock來定義,並用 Object obj 和 long offset 這兩個變數統一標識一個內存頁在系統內存中的地址。堆內的MemoryBlock是以long型數組的形式分配的內存,其 obj 的值為是這個數組的對象引用, offset 是long型數組的在JVM中的初始偏移地址,兩者配合使用可以定位這個數組在堆內的絕對地址;堆外的MemoryBlock是直接申請到的內存塊,其 obj 為null, offset 是這個內存塊在系統內存中的64位絕對地址。Spark用MemoryBlock巧妙地將堆內和堆外內存頁統一抽象封裝,並用頁表(pageTable)管理每個Task申請到的內存頁。
Tungsten頁式管理下的所有內存用64位的邏輯地址表示,由頁號和頁內偏移量組成:
有了統一的定址方式,Spark可以用64位邏輯地址的指針定位到堆內或堆外的內存,整個Shuffle Write排序的過程只需要對指針進行排序,並且無需反序列化,整個過程非常高效,對於內存訪問效率和CPU使用效率帶來了明顯的提升 [5] 。
Spark的存儲內存和執行內存有著截然不同的管理方式:對於存儲內存來說,Spark用一個LinkedHashMap來集中管理所有的Block,Block由需要緩存的RDD的Partition轉化而成;而對於執行內存,Spark用AppendOnlyMap來存儲Shuffle過程中的數據,在Tungsten排序中甚至抽象成為頁式內存管理,開辟了全新的JVM內存管理機制。
Spark的內存管理是一套復雜的機制,且Spark的版本更新比較快,筆者水平有限,難免有敘述不清、錯誤的地方,若讀者有好的建議和更深的理解,還望不吝賜教。