hadoop存儲小文件
❶ Hadoop HDFS處理大量的小文件
小文件是指文件大小明顯小於HDFS上塊(block)大小(默認64MB)的文件。如果存儲小文件,必定會有大量這樣的小文件,否則你也不會使用Hadoop(If you』re storing small files, then you probably have lots of them
(otherwise you wouldn』t turn to Hadoop)),這樣的文件給hadoop的擴展性和性能帶來嚴重問題。當一個文件的大小小於HDFS的塊大小(默認64MB),就將認定為小文件否則就是大文件。為了檢測輸入文件的大小,可以瀏覽Hadoop DFS 主頁 http://machinename:50070/dfshealth.jsp ,並點擊Browse filesystem(瀏覽文件系統)。
首先,在HDFS中,任何一個文件,目錄或者block在NameNode節點的內存中均以一個對象表示(元數據)(Every file, directory and block in HDFS is represented as an object in the namenode』s memory),而這受到NameNode物理內存容量的限制。每個元數據對象約佔150byte,所以如果有1千萬個小文件,每個文件佔用一個block,則NameNode大約需要15G空間。如果存儲1億個文件,則NameNode需要150G空間,這毫無疑問1億個小文件是不可取的。
其次,處理小文件並非Hadoop的設計目標,HDFS的設計目標是流式訪問大數據集(TB級別)。因而,在HDFS中存儲大量小文件是很低效的。訪問大量小文件經常會導致大量的尋找,以及不斷的從一個DatanNde跳到另一個DataNode去檢索小文件(Reading through small files normally causes lots of seeks and lots of hopping from datanode to datanode to retrieve each small file),這都不是一個很有效的訪問模式,嚴重影響性能。
最後,處理大量小文件速度遠遠小於處理同等大小的大文件的速度。每一個小文件要佔用一個slot,而task啟動將耗費大量時間甚至大部分時間都耗費在啟動task和釋放task上。
Hadoop存檔文件系統通常將HDFS中的多個文件打包成一個存檔文件,減少namenode內存的使用
hadoop archive命令創建HAR文件
from:https://blog.csdn.net/sunnyyoona/article/details/53870077
❷ hadoop怎麼保存配置文件
從安裝Hadoop 單機版的配置說起
在這里配置了Hadoop的文件目錄
1. 啟動Hadoop 並上傳文件
上傳文件命令:hadoop fs -put hadoop-2.9.2.tar.gz hdfs://hdgroup01:9000/
可以看到已經上傳了文件了
2. 刪除本地文件
如果刪除後你在瀏覽器刷新頁面 然後還能下載 說明 肯定存在其則哪寬他地方
使用命令下載也可以:hadoop fs -get hdfs://hdgroup01:9000/hadoop-2.9.2.tar.gz
3. hdfs 文件存放的位置
通過查看緩液 安裝時配置的hadoop文件目錄 上傳文件後的結果
這個dfs 應該見過的 就是在格式化namenode的時候 在我們配置的文件中創建了dfs 文件夾
4. 上傳一個349.5M 的文件
5. 進到hadoop 本地存儲文件中查看 存儲格式
上傳的文件為349.47M 被切分成了 三塊 因為是單機版 所以都存放到了 這台主機的文件系統中
可以看到 hadoop 塊大小為128M(默認) 超過128M的文件會被切成不同的塊存放
總結
1. hdfs是是通過分布式集群來存儲文件,為客戶端提供了一個便捷的訪問方式 虛擬的目錄結構
2. 文件存儲到hdfs集群中的時候是被切分成block的 默認為128M
3. 文件的block 存放在若乾颱datanode節點中
4. hdfs文件系統中的文件與真實的block之間有映射關系,由namenode管理
5. 每個block 在集孫亮群中會存儲多個副本,好處是可以提高數據的可靠性,還可以提高訪問的吞吐量。
❸ hadoop實驗如果上傳的是130M的文件,會放在幾個塊中
這個要看你用的hadoop版本,要是沒記錯2.7.2之前默認一個塊是64MB,同時默岩備認副本是3個,所以130M會存放在9個塊中,如果是2.7.2之後,默認一個塊祥卜是128MB,默認副本數依然是3個,所以130M會放在6個塊中。可以看到hdfs上最害怕的就是存放小文件,會很浪謹棗穗費空間,1k的文件也會佔用一個塊。
❹ 弱弱地問下:Hadoop為什麼就不適合處理小文件
因為namenode在內存中存儲hdfs中的文件信息。每個文件、目錄或分區(block)需要大約150B,所以如果有很多小文件,那麼缺唯namenode的內存將會承擔橡扮判很大壓力。比如有100萬個文件,每個文件一個block,那麼這就需要300M內存。若梁改文件數量達到十億級,則沒有足夠大的內存來應付它了。
❺ 小文件會在hadoop里佔用更多的磁碟空間嗎
64M為一個block,採用HDFS,使用的空間根據你磁碟的大小,也可以在配置文件中設置的!
❻ 關於Hadoop文件塊的大小
HDFS中的文件在物理上是分塊存儲的(Block),塊的大小可以通過配置參數( dfs.blocksize )來設置,默認的塊大小在Hadoop2.x版本中是 128M ,Hadoop1.x版本中是 64M 。
Tips:硬碟平均定址時間的計算:
❼ Hadoop的優點和缺點是什麼
Hadoop的優點:
1、Hadoop具有按位存儲和處理數據能力的高可靠性。
2、Hadoop通過可用的計算機集群分配數據,完成存儲和計算任務,這些集群可以方便地擴展到數以千計的節點中,具有高擴展性。
3、Hadoop能夠在節點之間進行動態地移動數據,並保證各個節點的動態平衡,處理速度非常快,具有高效性。
4、Hadoop能夠自動保存數據的多個副本,並且能夠自動將失敗的任務重新分配,具有高容錯性。
Hadoop的缺點:
1、Hadoop不適用於低延遲數據訪問。
2、Hadoop不能高效存儲大量小文件。
3、Hadoop不支持多用戶寫入並任意修改文件。
關於大數據技術Hadoop學習哪些內容,青藤小編就和您分享到這里了。如果您對大數據工程有濃厚的興趣,希望這篇文章可以為您提供幫助。如果您還想了解更多關於數據分析師、大數據工程師的技巧及素材等內容,可以點擊本站的其他文章進行學習。
❽ 在hadoop項目結構中h dfs指的是什麼
HDFS(Hadoop Distributed File System)是Hadoop項目的核心子項目,是分布式計算中數據存儲管理的基礎,是基於流數據模式訪問和處理超大文件的需求而開發的,可以運行於廉價的商用伺服器上。
HDFS 具有以下優點:
1、高容錯性
數據自動保存多個副本。它通過增加副本的形式,提高容錯性。某一個副本丟失以後,它可以自動恢復,這是由 HDFS 內部機制實現的,我們不必關心。
2、適合批處理
它是通過移動計算而不是移動數據。它會把數據位置暴露給計算框架。
3、適合大數據處理
處理數據達到 GB、TB、甚至PB級別的數據。能夠處理百萬規模以上的文件數量,數量相當之大。能夠處理10K節點的規模。
4、流式文件訪問
一次寫入,多次讀取。文件一旦寫入不能修改,只能追加。它能保證數據的一致性。
5、可構建在廉價機器上
它通過多副本機制,提高可靠性。它提供了容錯和恢復機制。比如某一個副本丟失,可以通過其它副本來恢復。
HDFS 也有它的劣勢,並不適合所有的場合:
1、低延時數據訪問
比如毫秒級的來存儲數據,這是不行的,它做不到。它適合高吞吐率的場景,就是在某一時間內寫入大量的數據。但是它在低延時的情況下是不行的,比如毫秒級以內讀取數據,這樣它是很難做到的。
2、小文件存儲
存儲大量小文件(這里的小文件是指小於HDFS系統的Block大小的文件(默認64M))的話,它會佔用 NameNode大量的內存來存儲文件、目錄和塊信息。這樣是不可取的,因為NameNode的內存總是有限的。
3、並發寫入、文件隨機修改
一個文件只能有一個寫,不允許多個線程同時寫。僅支持數據 append(追加),不支持文件的隨機修改。
❾ hadoop文件格式和壓縮
Hadoop中的文件格式大致上分為面向行和面向列兩類:
面向行:TextFile、SequenceFile、MapFile、Avro Datafile
二進制格式文件大小比文本文件大。
生產環境常用,作為原始表的存儲格式,會佔用更多磁碟資源,對它的 解析開銷一般會比二進制格式高 幾十倍以上。
Hadoop API 提供的一種二進制文件,它將數據以<key,value>的形式序列化到文件中。這種二進制文件內部使用Hadoop 的標準的Writable 介面實現序列化和反序列化。它與Hadoop API中的MapFile 是互相兼容的。
MapFile即為排序後的SequeneceFile,它會額外生成一個索引文件提供按鍵的查找。文件不支持復寫操作,不能向已存在的SequenceFile(MapFile)追加存儲記錄,在執行文件寫操作的時候,該文件是不可讀取的。
Avro是一種用於支持數據密集型的二進制文件格式。它的文件格式更為緊湊,若要讀取大量數據時,Avro能夠提供更好的序列化和反序列化性能。並且Avro數據文件天生是帶Schema定義的,所以它不需要開發者在API 級別實現自己的Writable對象。最近多個Hadoop 子項目都支持Avro 數據格式,如Pig 、Hive、Flume、Sqoop和Hcatalog。
面向列:Parquet 、RCFile、ORCFile
RCFile是Hive推出的一種專門面向列的數據格式。 它遵循「先按列劃分,再垂直劃分」的設計理念。當查詢過程中,針對它並不關心的列時,它會在IO上跳過這些列。
ORCFile (Optimized Record Columnar File)提供了一種比RCFile更加高效的文件格式。其內部將數據劃分為默認大小為250M的Stripe。每個Stripe包括索引、數據和Footer。索引存儲每一列的最大最小值,以及列中每一行的位置。
Parquet 是一種支持嵌套結構的列式存儲格式。Parquet 的存儲模型主要由行組(Row Group)、列塊(Column Chuck)、頁(Page)組成。
1、行組,Row Group:Parquet 在水平方向上將數據劃分為行組,默認行組大小與 HDFS Block 塊大小對齊,Parquet 保證一個行組會被一個 Mapper 處理。
2、列塊,Column Chunk:行組中每一列保存在一個列塊中,一個列塊具有相同的數據類型,不同的列塊可以使用不同的壓縮。
3、頁,Page:Parquet 是頁存儲方式,每一個列塊包含多個頁,一個頁是最小的編碼的單位,同一列塊的不同頁可以使用不同的編碼方式。
一般原始表數據使用文本格式存儲,其他的都是列式存儲。
目前在Hadoop中常用的幾種壓縮格式:lzo,gzip,snappy,bzip2,主要特性對比如下:
其性能對比如下:
2.1 lzo
hadoop中最流行的壓縮格式,壓縮/解壓速度也比較快,合理的壓縮率,支持split。適用於較大文本的處理。
對於lzo壓縮,常用的有LzoCodec和lzopCodec,可以對sequenceFile和TextFile進行壓縮。對TextFile壓縮後,mapred對壓縮後的文件默認是不能夠進行split操作,需要對該lzo壓縮文件進行index操作,生成lzo.index文件,map操作才可以進行split。如果設置LzoCodec,那麼就生成.lzo後綴的文件,可以用LzoIndexer 進行支持split的index計算,如果設置LzopCodec,那麼生成.lzo_deflate後綴的文件,不支持建立index。
❿ hadoop中存儲文件系統hdfs的冗餘機制是怎麼進行的有什麼特點
可以只用一行代碼來運行MapRece作業:JobClient.runJon(conf),Job作業運行時參與的四個實體:
1.JobClient 寫代碼,配置作業,提交作業。
2.JobTracker:初始化作業,分配作業,協調作業運行。這是一個java程序,主類是JobTracker。
3.TaskTracker:運行作業劃分後的任務,即分配數據分配上執行Map或Rece任務。
4.HDFS:保存作業數據、配置信息等,保存作業結果。
Map/Rece 作業總體執行流程:
代碼編寫 ----> 作業配置 ---->作業提交---->Map任務分配和執行---->處理中間結果----> Rece任務分配與執行----> 輸出結果
而對於每個作業的執行,又包含:
輸入准備---->任務執行---->輸出結果
作業提交JobClient:
JobClient的runJob方法產生一個Jobclient實例並調用其submitJob方法,然後runJob開始循環嗎,並在循環中調用getTaskCompetionEvents方法,獲得TaskCompletionEvent實例,每秒輪詢作業進度(後面有介紹進度和狀態更新),把進度寫到控制台,作業完成後顯示作業計數器,若失敗,則把錯誤記錄到控制台。
submitJob方法作業提交的過程:
1.向JobTracker請求一個新的JobId。
2.檢查作業相關路徑,如果路徑不正確就會返回錯誤。
3.計算作業輸入分片及其劃分信息。
4.將作業運行需要的資源(jar文件、配置文件等)復制到Shared HDFS,並
復制多個副本(參數控制,默認值為10)供tasktracker訪問,也會將計算的分片復制到HDFS。
5.調用JobTracker對象的submitJob()方法來真正提交作業,告訴JobTracker作業准備執行。
作業的初始化JobTracker:
JobTracker收到submitJob方法調用後,會把調用放入到一個內部隊列,由作業調度器(Job scheler)進行調度並對其初始化。Job初始化即創建一個作業對象。
當作業被調度後,JobTracker會創建一個代表這個作業的JobInProgress對象,並將任務和記錄信息封裝在這個對象中,以便跟蹤任務狀態和進程。
初始化過程就是JobInProgress對象的initTasks方法進行初始化的。
初始化步驟:
1.從HDFS中讀取作業對應的job.split信息,為後面的初始化做好准備。
2.創建並初始化map和rece任務。根據數據分片信息中的個數確定map task的個數,然後為每個map task生成一個TaskInProgress對象來處理數據分片,先將其放入nonRunningMapCache,以便JobTracker分配任務的時候使用。接下來根據JobConf中的mapred.rece.tasks屬性利用setNumReceTasks()方法設置rece task的數量,然後同map task創建方式。
3.最後就是創建兩個初始化task,進行map和rece的初始化。
任務的分配JobTracker:
消息傳遞HeartBeat: tasktracker運行一個簡單循環定期發送心跳(heartbeat)給JobTracker。由心跳告知JobTracker自己是否存活,同時作為消息通道傳遞其它信息(請求新task)。作為心跳的一部分,tasktracker會指明自己是否已准備好運行新的任務,如果是,jobtracker會分配它一個任務。
分配任務所屬於的作業:在Jobtracker分配任務前需先確定任務所在的作業。後面會介紹到各種作業調度演算法,默認是一個FIFO的作業調度。
分配Map和Rece任務:tasktracker有固定數量的任務槽,一個tasktracker可以同時運行多個Map和Rece任務,但其准確的數量由tasktracker的核的數量和內存大小決定。默認調度器會先填滿Map任務槽,再填Rece任務槽。jobtracker會選擇距離離分片文件最近的tasktracker,最理想情況下,任務是數據本地化(data-local)的,當然也可以是機架本地化(rack-local),如果不是本地化的,那麼他們就需要從其他機架上檢索數據。Rece任務分配很簡單,jobtracker會簡單的從待運行的rece任務列表中選取下一個來執行,不用考慮數據本地化。
任務的執行TaskTracker:
TaskTracker收到新任務後,就要在本地運行任務了,運行任務的第一步就是通過localizedJob將任務本地化所需要的注入配置、數據、程序等信息進行本地化。
1.本地化數據:從共享文件系統將job.split 、job.jar (在分布式緩存中)復制本地,將job配置信息寫入job.xml。
2.新建本地工作目錄:tasktracker會加壓job.jar文件到本工作目錄。
3.調用launchTaskForJob方法發布任務(其中會新建TaskRunner實例運行任務),如果是Map任務就啟用MapTaskRunner,對於Rece就是ReceTaskRunner。
在這之後,TaskRunner會啟用一個新的JVM來運行每個Map/Rece任務,防止程序原因而導致tasktracker崩潰,但不同任務間重用JVM還是可以的,後續會講到任務JVM重用。
對於單個Map,任務執行的簡單流程是:
1.分配任務執行參數
2.在Child臨時文件中添加map任務信息(Child是運行Map和Rece任務的主進程)
3.配置log文件夾,配置map任務的通信和輸出參數
4.讀取input split,生成RecordReader讀取數據
5.為Map生成MapRunnable,依次從RecordReader中接收數據,並調用Map函數進行處理。
6.最後將map函數的輸出調用collect收集到MapOutputBuffer(參數控制其大小)中。
Streaming和Pipes:
Streaming和Pipes都運行特殊的Map和Rece任務,目的是運行用戶提供的可執行程序並與之通信。
Streaming:使用標准輸入輸出Streaming與進程進行通信。
Pipes:用來監聽套接字,會發送一個埠號給C++程序,兩者便可建立鏈接。
進度和狀態更新:
一個作業和它的任務都有狀態(status),其中包括:運行成功失敗狀態、Map/Rece進度、作業計數器值、狀態消息。
狀態消息與客戶端的通信:
1.對於Map任務Progress的追蹤:progress是已經處理完的輸入所佔的比例。
2.對於Rece:稍復雜,rece任務分三個階段(每個階段佔1/3),復制、排序和Rece處理,若rece已執行一半的輸入的話,那麼任務進度便是1/3+1/3+1/6=5/6。
3.任務計數器:任務有一組計數器,負責對任務運行各個事件進行計數。
4.任務進度報告:如果任務報告了進度,便會設置一個標記以表明狀態將被發送到tasktracker。有一個獨立線程每隔三秒檢查一次此標記,如果已設置,則告知tasktracker當前狀態。
5.tasktracker進度報告:tasktracker會每隔5秒(這個心跳是由集群大小決定,集群越大時間會越長)發送heartbeat到jobtracker,並且tasktracker運行的所有狀態都會在調用中被發送到jobtracker。
6.jobtracker合並各任務報告:產生一個表明所有運行作業機器所含任務狀態的全局視圖。
前面提到的JobClient就是通過每秒查詢JobTracker來接收最新狀態,而且客戶端JobClient的getJob方法可以得到一個RunningJob的實例,其包含了作業的所以狀態信息。
作業的完成:
當jobtracker收到作業最後一個任務已完成的通知後,便把作業狀態設置成成功。JobClient查詢狀態時,便知道任務已成功完成,於是JobClient列印一條消息告知用戶,然後從runJob方法返回。
如果jobtracker有相應設置,也會發送一個Http作業通知給客戶端,希望收到回調指令的客戶端可以通過job.end.notification.url屬性來進行設置。
jobtracker情況作業的工作狀態,指示tasktracker也清空作業的工作狀態,如刪除中間輸出。
失敗
實際情況下,用戶的代碼存在軟體錯誤進程會崩潰,機器也會產生故障,但Hadoop能很好的應對這些故障並完成作業。
1.任務失敗
子任務異常:如Map/Rece任務中的用戶代碼拋出異常,子任務JVM進程會在退出前向父進程tasktracker發送錯誤報告,錯誤被記錄用戶日誌。tasktracker會將此次task attempt標記為tailed,並釋放這個任務槽運行另外一個任務。
子進程JVM突然退出:可能由於JVM bug導致用戶代碼造成的某些特殊原因導致JVM退出,這種情況下,tasktracker會注意到進程已經退出,並將此次嘗試標記為failed。
任務掛起:一旦tasktracker注意一段時間沒有收到進度更新,便會將任務標記為failed,JVM子進程將被自動殺死。任務失敗間隔時間通常為10分鍾,可以以作業或者集群為基礎設置過期時間,參數為mapred.task.timeout。注意:如果參數值設置為0,則掛起的任務永遠不會釋放掉它的任務槽,隨著時間的推移會降低整個集群的效率。
任務失敗嘗試次數:jobtracker得知一個tasktracker失敗後,它會重新調度該任務執行,當然,jobtracker會嘗試避免重新調度失敗過的tasktracker任務。如果一個任務嘗試次數超過4次,它將不再被重試。這個值是可以設置的,對於Map任務,參數是mapred.map.max.attempts,對於rece任務,則由mapred.rece.max.attempts屬性控制。如果次數超過限制,整個作業都會失敗。當然,有時我們不希望少數幾個任務失敗就終止運行的整個作業,因為即使有些任務失敗,作業的一些結果可能還是有用的,這種情況下,可以為作業設置在不觸發作業失敗情況下的允許任務失敗的最大百分比,Map任務和Rece任務可以獨立控制,參數為mapred.max.map.failures.percent 和mapred.max.rece.failures.percent。
任務嘗試中止(kill):任務終止和任務失敗不同,task attempt可以中止是因為他是一個推測副本或因為它所處的tasktracker失敗,導致jobtracker將它上面的所有task attempt標記為killed。被終止的task attempt不會被計入任務運行嘗試次數,因為嘗試中止並不是任務的錯。
2.tasktracker失敗
tasktracker由於崩潰或者運行過慢而失敗,他將停止向jobtracker發送心跳(或很少發送心跳)。jobtracker注意已停止發送心跳的tasktracker(過期時間由參數mapred.tasktracker.expiry.interval設置,單位毫秒),並將它從等待調度的tasktracker池中移除。如果是未完成的作業,jobtracker會安排次tasktracker上已經運行成功的Map任務重新運行,因為此時rece任務已無法訪問(中間輸出存放在失敗的tasktracker的本地文件系統上)。
即使tasktracker沒有失敗,也有可能被jobtracker列入黑名單。如果tasktracker上面的失敗任務數量遠遠高於集群的平均失敗任務次數,他就會被列入黑名單,被列入黑名單的tasktracker可以通過重啟從jobtracker黑名單中移除。
3.jobtracker失敗
老版本的JobTracker失敗屬於單點故障,這種情況下作業註定失敗。
作業調度:
早期作業調度FIFO:按作業提交順序先進先出。可以設置優先順序,通過設置mapred.job.priority屬性或者JobClient的setJobPriority()方法制定優先順序(優先順序別:VERY_HIGH,HIGH,NORMAL,LOW,VERY_LOW)。注意FIFO調度演算法不支持搶占(preemption),所以高優先順序作業仍然會被那些已經開始的長時間運行的低優先順序作業所阻塞。
Fair Scheler:目標是讓每個用戶公平地共享集群能力。當集群存在很多作業時,空閑的任務槽會以」讓每個用戶共享集群「的方式進行分配。默認每個用戶都有自己的作業池。FairScheler支持搶占,所以,如果一個池在特定的一段時間未得到公平地資源共享,它會終止池中得到過多的資源任務,以便把任務槽讓給資源不足的池。FairScheler是一個後續模塊,使用它需要將其jar文件放在Hadoop的類路徑下。可以通過參數map.red.jobtracker.taskScheler屬性配置(值為org.apache.hadoop.mapred.FairScheler)
Capacity Scheler:
集群由很多隊列組成,每個隊列都有一個分配能力,這一點與FairScheler類似,只不過在每個隊列內部,作業根據FIFO方式進行調度。本質上說,Capacity Scheler允許用戶或組織為每個用戶模擬一個獨立使用FIFO的集群。
shuffle和排序:
MapRece確保每個Recer的輸入都是按鍵排序的。系統執行排序的過程-將map輸出作為輸入傳給recer的過程稱為shuffle。shuffle屬於不斷被優化和改進的代碼庫的一部分,從許多方面來看,shuffle是MapRece的心臟。
整個shuffle的流程應該是這樣:
map結果劃分partition 排序sort 分割spill 合並同一劃分 合並同一劃分 合並結果排序 rece處理 輸出
Map端:
寫入緩沖區:Map函數的輸出,是由collector處理的,它並不是簡單的將結果寫到磁碟。它利用緩沖的方式寫到內存,並處於效率的考慮進行預排序。每個map都有一個環形的內存緩沖區,用於任務輸出,默認緩沖區大小為100MB(由參數io.sort.mb調整),一旦緩沖區內容達到閾值(默認0.8),後台進程邊開始把內容寫到磁碟(spill),在寫磁碟過程中,map輸出繼續被寫到緩沖區,但如果緩沖區被填滿,map會阻塞知道寫磁碟過程完成。寫磁碟將按照輪詢方式寫到mapred.local.dir屬性制定的作業特定子目錄中。
寫出緩沖區:collect將緩沖區的內容寫出時,會調用sortAndSpill函數,這個函數作用主要是創建spill文件,按照key值對數據進行排序,按照劃分將數據寫入文件,如果配置了combiner類,會先調用combineAndSpill函數再寫文件。sortAndSpill每被調用一次,就會寫一個spill文件。
合並所有Map的spill文件:TaskTracker會在每個map任務結束後對所有map產生的spill文件進行merge,merge規則是根據分區將各個spill文件中數據同一分區中的數據合並在一起,並寫入到一個已分區且排序的map輸出文件中。待唯一的已分區且已排序的map輸出文件寫入最後一條記錄後,map端的shuffle階段就結束了。
在寫磁碟前,線程首先根據數據最終要傳遞到的recer把數據劃分成響應的分區(partition),在每個分區中,後台線程按鍵進行內排序,如果有一個combiner,它會在排序後的輸出上運行。
內存達到溢出寫的閾值時,就會新建一個溢出寫文件,因為map任務完成其最後一個輸出記錄之後,會有幾個溢出寫文件。在任務完成前,溢出寫文件會被合並成一個已分區且已排序的輸出文件。配置屬性io.sort.facor控制一次最多能合並多少流,默認值是10。
如果已經指定combiner,並且寫次數至少為3(通過min.mum.spills.for.combine設置)時,則combiner就會在輸出文件寫到磁碟之前運行。運行combiner的意義在於使map輸出更緊湊,捨得寫到本地磁碟和傳給recer的數據更少。
寫磁碟時壓縮:寫磁碟時壓縮會讓寫的速度更快,節約磁碟空間,並且減少傳給recer的數據量。默認情況下,輸出是不壓縮的,但可以通過設置mapred.compress.map.output值為true,就可以啟用壓縮。使用的壓縮庫是由mapred.map.output.compression.codec制定。
recer獲得文件分區的工作線程:recer通過http方式得到輸出文件的分區,用於文件分區的工作線程數量由tracker.http.threads屬性指定,此設置針對的是每個tasktracker,而不是每個map任務槽。默認值為40,在大型集群上此值可以根據需要而增加。
Rece端:
復制階段:rece會定期向JobTracker獲取map的輸出位置,一旦拿到輸出位置,rece就會從對應的TaskTracker上復制map輸出到本地(如果map輸出很小,則會被復制到TaskTracker節點的內存中,否則會被讓如磁碟),而不會等到所有map任務結束(當然這個也有參數控制)。
合並階段:從各個TaskTracker上復制的map輸出文件(無論在磁碟還是內存)進行整合,並維持數據原來的順序。
Rece階段:從合並的文件中順序拿出一條數據進行rece函數處理,然後將結果輸出到本地HDFS。
Map的輸出文件位於運行map任務的tasktracker的本地磁碟,現在,tasktracker要為分區文件運行rece任務。每個任務完成時間可能不同,但是只要有一個任務完成,rece任務就開始復制其輸出,這就是rece任務的復制階段( phase)。rece任務有少量復制線程,因此能夠並行取得map輸出。默認值是5個線程,可以通過mapred.rece.parallel.copies屬性設置。
Recer如何得知從哪個tasktracker獲得map輸出:map任務完成後會通知其父tasktracker狀態已更新,tasktracker進而通知(通過heart beat)jobtracker。因此,JobTracker就知道map輸出和tasktracker之間的映射關系,recer中的一個線程定期詢問jobtracker以便獲知map輸出位置。由於recer有可能失敗,因此tasktracker並沒有在第一個recer檢索到map輸出時就立即從磁碟上刪除它們,相反他會等待jobtracker告示它可以刪除map輸出時才刪除,這是作業完成後最後執行的。
如果map輸出很小,則會被直接復制到rece tasktracker的內存緩沖區(大小由mapred.job.shuffle.input.buffer.percent控制,占堆空間的百分比),否則,map輸出被復制到磁碟。一旦內存緩沖區達到閾值大小(由mapred.iob.shuffle.merge.percent)
或達到map輸出閾值大小(mapred.inmem.threadhold),則合並後溢出寫到磁碟中。
隨著磁碟上副本增多,後台線程會將他們合並為更大的、排好序的文件。注意:為了合並,壓縮的map輸出必須在內存中被解壓縮。
排序階段:復制階段完成後,rece任務會進入排序階段,更確切的說是合並階段,這個階段將合並map輸出,維持其順序排列。合並是循環進行的,由合並因子決定每次合並的輸出文件數量。但讓有可能會產生中間文件。
rece階段:在最後rece階段,會直接把排序好的文件輸入rece函數,不會對中間文件進行再合並,最後的合並即可來自內存,也可來自磁碟。此階段的輸出會直接寫到文件系統,一般為hdfs。
細節:這里合並是並非平均合並,比如有40個文件,合並因子為10,我們並不是每趟合並10個,合並四趟。而是第一趟合並4個,後三趟合並10,在最後一趟中4個已合並的文件和餘下6個未合並會直接並入rece。