hadoop調度演算法
⑴ 基於Hadoop的作業調度演算法的發展現狀、研究意義
調度器是一個可插拔的模塊,用戶可以根據自己的實際應用要求設計調度器,然後在配置文件中指定相應的調度器
⑵ hadoop的maprece常見演算法案例有幾種
基本MapRece模式
計數與求和
問題陳述:
有許多文檔,每個文檔都有一些欄位組成。需要計算出每個欄位在所有文檔中的出現次數或者這些欄位的其他什麼統計值。例如,給定一個log文件,其中的每條記錄都包含一個響應時間,需要計算出平均響應時間。
解決方案:
讓我們先從簡單的例子入手。在下面的代碼片段里,Mapper每遇到指定詞就把頻次記1,Recer一個個遍歷這些詞的集合然後把他們的頻次加和。
1 class Mapper
2 method Map(docid id, doc d)
3 for all term t in doc d do
4 Emit(term t, count 1)
5
6 class Recer
7 method Rece(term t, counts [c1, c2,...])
8 sum = 0
9 for all count c in [c1, c2,...] do
10 sum = sum + c
11 Emit(term t, count sum)
這種方法的缺點顯而易見,Mapper提交了太多無意義的計數。它完全可以通過先對每個文檔中的詞進行計數從而減少傳遞給Recer的數據量:
1 class Mapper
2 method Map(docid id, doc d)
3 H = new AssociativeArray
4 for all term t in doc d do
5 H{t} = H{t} + 1
6 for all term t in H do
7 Emit(term t, count H{t})
如果要累計計數的的不只是單個文檔中的內容,還包括了一個Mapper節點處理的所有文檔,那就要用到Combiner了:
1 class Mapper
2 method Map(docid id, doc d)
3 for all term t in doc d do
4 Emit(term t, count 1)
5
6 class Combiner
7 method Combine(term t, [c1, c2,...])
8 sum = 0
9 for all count c in [c1, c2,...] do
10 sum = sum + c
11 Emit(term t, count sum)
12
13 class Recer
14 method Rece(term t, counts [c1, c2,...])
15 sum = 0
16 for all count c in [c1, c2,...] do
17 sum = sum + c
18 Emit(term t, count sum)
應用:Log 分析, 數據查詢
整理歸類
問題陳述:
有一系列條目,每個條目都有幾個屬性,要把具有同一屬性值的條目都保存在一個文件里,或者把條目按照屬性值分組。 最典型的應用是倒排索引。
解決方案:
解決方案很簡單。 在 Mapper 中以每個條目的所需屬性值作為 key,其本身作為值傳遞給 Recer。 Recer 取得按照屬性值分組的條目,然後可以處理或者保存。如果是在構建倒排索引,那麼 每個條目相當於一個詞而屬性值就是詞所在的文檔ID。
應用:倒排索引, ETL
過濾 (文本查找),解析和校驗
問題陳述:
假設有很多條記錄,需要從其中找出滿足某個條件的所有記錄,或者將每條記錄傳換成另外一種形式(轉換操作相對於各條記錄獨立,即對一條記錄的操作與其他記錄無關)。像文本解析、特定值抽取、格式轉換等都屬於後一種用例。
解決方案:
非常簡單,在Mapper 里逐條進行操作,輸出需要的值或轉換後的形式。
應用:日誌分析,數據查詢,ETL,數據校驗
分布式任務執行
問題陳述:
大型計算可以分解為多個部分分別進行然後合並各個計算的結果以獲得最終結果。
解決方案: 將數據切分成多份作為每個 Mapper 的輸入,每個Mapper處理一份數據,執行同樣的運算,產生結果,Recer把多個Mapper的結果組合成一個。
案例研究: 數字通信系統模擬
像 WiMAX 這樣的數字通信模擬軟體通過系統模型來傳輸大量的隨機數據,然後計算傳輸中的錯誤幾率。 每個 Mapper 處理樣本 1/N 的數據,計算出這部分數據的錯誤率,然後在 Recer 里計算平均錯誤率。
應用:工程模擬,數字分析,性能測試
排序
問題陳述:
有許多條記錄,需要按照某種規則將所有記錄排序或是按照順序來處理記錄。
解決方案: 簡單排序很好辦 – Mappers 將待排序的屬性值為鍵,整條記錄為值輸出。 不過實際應用中的排序要更加巧妙一點, 這就是它之所以被稱為MapRece 核心的原因(「核心」是說排序?因為證明Hadoop計算能力的實驗是大數據排序?還是說Hadoop的處理過程中對key排序的環節?)。在實踐中,常用組合鍵來實現二次排序和分組。
MapRece 最初只能夠對鍵排序, 但是也有技術利用可以利用Hadoop 的特性來實現按值排序。想了解的話可以看這篇博客。
按照BigTable的概念,使用 MapRece來對最初數據而非中間數據排序,也即保持數據的有序狀態更有好處,必須注意這一點。換句話說,在數據插入時排序一次要比在每次查詢數據的時候排序更高效。
應用:ETL,數據分析
非基本 MapRece 模式
迭代消息傳遞 (圖處理)
問題陳述:
假設一個實體網路,實體之間存在著關系。 需要按照與它比鄰的其他實體的屬性計算出一個狀態。這個狀態可以表現為它和其它節點之間的距離, 存在特定屬性的鄰接點的跡象, 鄰域密度特徵等等。
解決方案:
網路存儲為系列節點的結合,每個節點包含有其所有鄰接點ID的列表。按照這個概念,MapRece 迭代進行,每次迭代中每個節點都發消息給它的鄰接點。鄰接點根據接收到的信息更新自己的狀態。當滿足了某些條件的時候迭代停止,如達到了最大迭代次數(網路半徑)或兩次連續的迭代幾乎沒有狀態改變。從技術上來看,Mapper 以每個鄰接點的ID為鍵發出信息,所有的信息都會按照接受節點分組,recer 就能夠重算各節點的狀態然後更新那些狀態改變了的節點。下面展示了這個演算法:
1 class Mapper
2 method Map(id n, object N)
3 Emit(id n, object N)
4 for all id m in N.OutgoingRelations do
5 Emit(id m, message getMessage(N))
6
7 class Recer
8 method Rece(id m, [s1, s2,...])
9 M = null
10 messages = []
11 for all s in [s1, s2,...] do
12 if IsObject(s) then
13 M = s
14 else // s is a message
15 messages.add(s)
16 M.State = calculateState(messages)
17 Emit(id m, item M)
一個節點的狀態可以迅速的沿著網路傳全網,那些被感染了的節點又去感染它們的鄰居,整個過程就像下面的圖示一樣:
案例研究: 沿分類樹的有效性傳遞
問題陳述:
這個問題來自於真實的電子商務應用。將各種貨物分類,這些類別可以組成一個樹形結構,比較大的分類(像男人、女人、兒童)可以再分出小分類(像男褲或女裝),直到不能再分為止(像男式藍色牛仔褲)。這些不能再分的基層類別可以是有效(這個類別包含有貨品)或者已無效的(沒有屬於這個分類的貨品)。如果一個分類至少含有一個有效的子分類那麼認為這個分類也是有效的。我們需要在已知一些基層分類有效的情況下找出分類樹上所有有效的分類。
解決方案:
這個問題可以用上一節提到的框架來解決。我們咋下面定義了名為 getMessage和 calculateState 的方法:
1 class N
2 State in {True = 2, False = 1, null = 0},
3 initialized 1 or 2 for end-of-line categories, 0 otherwise
4 method getMessage(object N)
5 return N.State
6 method calculateState(state s, data [d1, d2,...])
7 return max( [d1, d2,...] )
案例研究:廣度優先搜索
問題陳述:需要計算出一個圖結構中某一個節點到其它所有節點的距離。
解決方案: Source源節點給所有鄰接點發出值為0的信號,鄰接點把收到的信號再轉發給自己的鄰接點,每轉發一次就對信號值加1:
1 class N
2 State is distance,
3 initialized 0 for source node, INFINITY for all other nodes
4 method getMessage(N)
5 return N.State + 1
6 method calculateState(state s, data [d1, d2,...])
7 min( [d1, d2,...] )
案例研究:網頁排名和 Mapper 端數據聚合
這個演算法由Google提出,使用權威的PageRank演算法,通過連接到一個網頁的其他網頁來計算網頁的相關性。真實演算法是相當復雜的,但是核心思想是權重可以傳播,也即通過一個節點的各聯接節點的權重的均值來計算節點自身的權重。
1 class N
2 State is PageRank
3 method getMessage(object N)
4 return N.State / N.OutgoingRelations.size()
5 method calculateState(state s, data [d1, d2,...])
6 return ( sum([d1, d2,...]) )
要指出的是上面用一個數值來作為評分實際上是一種簡化,在實際情況下,我們需要在Mapper端來進行聚合計算得出這個值。下面的代碼片段展示了這個改變後的邏輯 (針對於 PageRank 演算法):
1 class Mapper
2 method Initialize
3 H = new AssociativeArray
4 method Map(id n, object N)
5 p = N.PageRank / N.OutgoingRelations.size()
6 Emit(id n, object N)
7 for all id m in N.OutgoingRelations do
8 H{m} = H{m} + p
9 method Close
10 for all id n in H do
11 Emit(id n, value H{n})
12
13 class Recer
14 method Rece(id m, [s1, s2,...])
15 M = null
16 p = 0
17 for all s in [s1, s2,...] do
18 if IsObject(s) then
19 M = s
20 else
21 p = p + s
22 M.PageRank = p
23 Emit(id m, item M)
應用:圖分析,網頁索引
值去重 (對唯一項計數)
問題陳述: 記錄包含值域F和值域 G,要分別統計相同G值的記錄中不同的F值的數目 (相當於按照 G分組).
這個問題可以推而廣之應用於分面搜索(某些電子商務網站稱之為Narrow Search)
Record 1: F=1, G={a, b}
Record 2: F=2, G={a, d, e}
Record 3: F=1, G={b}
Record 4: F=3, G={a, b}
Result:
a -> 3 // F=1, F=2, F=3
b -> 2 // F=1, F=3
d -> 1 // F=2
e -> 1 // F=2
解決方案 I:
第一種方法是分兩個階段來解決這個問題。第一階段在Mapper中使用F和G組成一個復合值對,然後在Recer中輸出每個值對,目的是為了保證F值的唯一性。在第二階段,再將值對按照G值來分組計算每組中的條目數。
第一階段:
1 class Mapper
2 method Map(null, record [value f, categories [g1, g2,...]])
3 for all category g in [g1, g2,...]
4 Emit(record [g, f], count 1)
5
6 class Recer
7 method Rece(record [g, f], counts [n1, n2, ...])
8 Emit(record [g, f], null )
第二階段:
1 class Mapper
2 method Map(record [f, g], null)
3 Emit(value g, count 1)
4
5 class Recer
6 method Rece(value g, counts [n1, n2,...])
7 Emit(value g, sum( [n1, n2,...] ) )
解決方案 II:
第二種方法只需要一次MapRece 即可實現,但擴展性不強。演算法很簡單-Mapper 輸出值和分類,在Recer里為每個值對應的分類去重然後給每個所屬的分類計數加1,最後再在Recer結束後將所有計數加和。這種方法適用於只有有限個分類,而且擁有相同F值的記錄不是很多的情況。例如網路日誌處理和用戶分類,用戶的總數很多,但是每個用戶的事件是有限的,以此分類得到的類別也是有限的。值得一提的是在這種模式下可以在數據傳輸到Recer之前使用Combiner來去除分類的重復值。
1 class Mapper
2 method Map(null, record [value f, categories [g1, g2,...] )
3 for all category g in [g1, g2,...]
4 Emit(value f, category g)
5
6 class Recer
7 method Initialize
8 H = new AssociativeArray : category -> count
9 method Rece(value f, categories [g1, g2,...])
10 [g1', g2',..] = ExcludeDuplicates( [g1, g2,..] )
11 for all category g in [g1', g2',...]
12 H{g} = H{g} + 1
13 method Close
14 for all category g in H do
15 Emit(category g, count H{g})
應用:日誌分析,用戶計數
互相關
問題陳述:有多個各由若干項構成的組,計算項兩兩共同出現於一個組中的次數。假如項數是N,那麼應該計算N*N。
這種情況常見於文本分析(條目是單詞而元組是句子),市場分析(購買了此物的客戶還可能購買什麼)。如果N*N小到可以容納於一台機器的內存,實現起來就比較簡單了。
配對法
第一種方法是在Mapper中給所有條目配對,然後在Recer中將同一條目對的計數加和。但這種做法也有缺點:
使用 combiners 帶來的的好處有限,因為很可能所有項對都是唯一的
不能有效利用內存
1 class Mapper
2 method Map(null, items [i1, i2,...] )
3 for all item i in [i1, i2,...]
4 for all item j in [i1, i2,...]
5 Emit(pair [i j], count 1)
6
7 class Recer
8 method Rece(pair [i j], counts [c1, c2,...])
9 s = sum([c1, c2,...])
10 Emit(pair[i j], count s)
Stripes Approach(條方法?不知道這個名字怎麼理解)
第二種方法是將數據按照pair中的第一項來分組,並維護一個關聯數組,數組中存儲的是所有關聯項的計數。The second approach is to group data by the first item in pair and maintain an associative array (「stripe」) where counters for all adjacent items are accumulated. Recer receives all stripes for leading item i, merges them, and emits the same result as in the Pairs approach.
中間結果的鍵數量相對較少,因此減少了排序消耗。
可以有效利用 combiners。
可在內存中執行,不過如果沒有正確執行的話也會帶來問題。
實現起來比較復雜。
一般來說, 「stripes」 比 「pairs」 更快
1 class Mapper
2 method Map(null, items [i1, i2,...] )
3 for all item i in [i1, i2,...]
4 H = new AssociativeArray : item -> counter
5 for all item j in [i1, i2,...]
6 H{j} = H{j} + 1
7 Emit(item i, stripe H)
8
9 class Recer
10 method Rece(item i, stripes [H1, H2,...])
11 H = new AssociativeArray : item -> counter
12 H = merge-sum( [H1, H2,...] )
13 for all item j in H.keys()
14 Emit(pair [i j], H{j})
應用:文本分析,市場分析
參考資料:Lin J. Dyer C. Hirst G. Data Intensive Processing MapRece
用MapRece 表達關系模式
在這部分我們會討論一下怎麼使用MapRece來進行主要的關系操作。
篩選(Selection)
1 class Mapper
2 method Map(rowkey key, tuple t)
3 if t satisfies the predicate
4 Emit(tuple t, null)
投影(Projection)
投影只比篩選稍微復雜一點,在這種情況下我們可以用Recer來消除可能的重復值。
1 class Mapper
2 method Map(rowkey key, tuple t)
3 tuple g = project(t) // extract required fields to tuple g
4 Emit(tuple g, null)
5
6 class Recer
⑶ 如何編寫Hadoop調度器
1. 編寫目的
在Hadoop中,調度器是一個可插拔的模塊,用戶可以根據自己的實際應用要求設計調度器,然後在配置文件中指定相應的調度器,這樣,當Hadoop集群啟動時,便會載入該調度器。當前Hadoop自帶了幾種調度器,分別是FIFO(默認調度器),Capacity Scheler和FairScheler,通常境況下,這些調度器很難滿足公司復雜的應用需求,因而往往需要開發自己的調度器。本文介紹了Hadoop調度器的基本編寫方法。
2. Hadoop調度框架
Hadoop的調度器是在JobTracker中載入和調用的,用戶可以在配置文件mapred-site.xml中的mapred.jobtracker.taskScheler屬性中指定調度器。本節分析了Hadoop調度器的調度框架,實際上分析了兩個重要類:TaskScheler和JobTracker的關系。
(1) TaskScheler
如果用戶要編寫自己的調度器,需要繼承抽象類TaskScheler,該類的介面如下:
abstract class TaskScheler implements Configurable {
protected Configuration conf; //配置文件
protected TaskTrackerManager taskTrackerManager; //一般會設為JobTracker
public Configuration getConf() {
return conf;
}
public void setConf(Configuration conf) {
this.conf = conf;
}
public synchronized void setTaskTrackerManager(
TaskTrackerManager taskTrackerManager) {
this.taskTrackerManager = taskTrackerManager;
}
public void start() throws IOException { //初始化函數,如載入配置文件等
// do nothing
}
public void terminate() throws IOException { //結束函數
// do nothing
}
//最重要的函數,為該taskTracker分配合適的task
public abstract List<Task> assignTasks(TaskTrackerStatus taskTracker)
throws IOException;
//根據隊列名字獲job列表
public abstract Collection<JobInProgress> getJobs(String queueName);
}
(2) JobTracker
JobTracker是Hadoop最核心的組件,它監控整個集群中的作業運行情況並對資源進行管理和調度。
每個TaskTracker每個3s(默認值,可配置)通過heartbeat向JobTracker匯報自己管理的機器的一些基本信息,包括內存使用量,內存剩餘量,正在運行的task,空閑的slot數目等,一旦JobTracker發現該TaskTracker出現了空閑的slot,便會調用調度器中的AssignTasks方法為該TaskTracker分配task。
下面分析JobTracker調用TaskScheler的具體流程:
……
private final TaskScheler taskScheler; //聲明調度器對象
……
public static JobTracker startTracker(JobConf conf, String identifier) {
…….
result = new JobTracker(conf, identifier);
result.taskScheler.setTaskTrackerManager(result); //設置調度器的manager
……
}
//創建調度器
JobTracker(JobConf conf, String identifier) {
……
// Create the scheler
Class<? extends TaskScheler> schelerClass
= conf.getClass("mapred.jobtracker.taskScheler",
JobQueueTaskScheler.class, TaskScheler.class);
taskScheler = (TaskScheler) ReflectionUtils.newInstance(schelerClass, conf);
…..
}
//run forever
public void offerService() {
……
taskScheler.start(); //啟動調度器
……
}
。。。。。
HeartbeatResponse heartbeat(TaskTrackerStatus status,
boolean restarted,
boolean initialContact,
boolean acceptNewTasks,
short responseId) {
…….
// Check for new tasks to be executed on the tasktracker
if (recoveryManager.shouldSchele() && acceptNewTasks && !isBlacklisted) {
……
//使用調度器,為該taskTracker分配作業
tasks = taskScheler.assignTasks(taskTrackerStatus);
……
}
}
從上面的分析可以知道,Scheler和JobTracker之間會相互包含(實際上是組合模式),Scheler中要包含JobTracker(實際上就是TaskTrackerManager)對象,以便獲取整個Hadoop集群的一些信息,如slot總數,QueueManager對象,添加JobInProgressListener以便增加或刪除job時,通知Scheler;JobTracker中要包含Scheler對象,以便可以對每個TaskTracker分配task。
3. 編寫Hadoop調度器
假設我們要編寫一個新的調度器,為MyHadoopScheler,需要進行以下工作:
(1) 用戶需要自己實現的類
@ MyHadoopSchelerConf:配置文件管理類,讀取你自己的配置文件,並保存到合適的數據結構中,一般而言,這個類應該支持動態載入配置文件。
@ MyHadoopSchelerListener:編寫自己的JobInProgressListener,並調用JobTracker的addJobInProgressListener(),將之加到系統的Listener隊列中,以便系統中添加或刪除job後,JobTracker可立刻告訴調度器。
@ MyHadoopScheler:調度器的核心實現演算法
(2) 用戶要用到的系統類
@ JobTracker:JobTracker在startTracker函數中,會將MyHadoopScheler的taskTrackerManager賦值為JobTracker對象,這樣,在MyHadoopScheler中,可調用Jobracker中的所有public方法和成員變數,常用的有:
$ getClusterStatus():獲取集群的狀態,如tasktracker列表,map slot總數,rece slot總數,當前正在運行的map/rece task總數等
$ getQueueManager():如果MyHadoopScheler支持多隊列,那麼需要使用該方法獲取QueueManager對象,通過該對象,會用可以獲取系統的所有隊列名稱,每個隊列的ACL(Access Control List),具體參考:http://hadoop.apache.org/common/docs/current/service_level_auth.html
$ killJob:可以調用該函數殺死某個job
$ killTask:如果調度器支持資源搶占,可調用該函數 殺死某個task以便進行資源搶占。
@ JobInprogress:用戶向Hadoop中提交一個job後,Hadoop會為該job創建一個叫JobInProgress的對象,該對象中包含了job相關的基本信息,且它會伴隨某個job的一生(與job共存亡)。該對象中包含的job信息有:該job包含的所有task的信息(如:正在運行的task列表,已經完成的task列表,尚未運行的task列表等),作業的優先順序,作業的提交時間,開始運行時間,運行結束時間等信息。
在JobInprogress的task列表中,每個task以對象TaskInProgress的形式保存,該對象中包含了每個task的基本信息,包括:task要處理的數據split,task創建時間,task開始執行時間,task結束時間等信息。這些信息肯定會在調度器中使用。
@ JobConf
每個作業的運行參數和配置選項被保存到一個JobConf對象中,該對象包含了配置文件mapred-site.xml,core-site.xml和hdfs-site.xml設置的選項和該作業的特有屬性(用戶名,InputFormat,Mapper等),一般是以key/value的形式保存,比如:想獲取當前用戶名,可以這樣:
JobConf conf;
…….
String username = conf.get("user.name");
用戶也可以通過該對象傳遞一些自己定義的全局屬性,如用戶自己定義了一個屬性叫mapred.job.deadline(作業的deadline時間),用戶可以在提交作業時設定該值:
hadoop jar hadoop-examples.jar wordcount -files cachefile.txt \
-D mapred.job.deadline=100000 \
input output
然後在調度器中這樣獲取該屬性的值:
JobConf conf;
…….
int deadline=conf.getInt("mapred.job.deadline", -1); //獲取mapred.job.deadline屬性,如果沒有設置,則返回-1
4. 總結
調度器是Hadoop的中樞,其重要性可想而知。用戶如果要設計Hadoop調度器,需要對Hadoop的整個框架有比較深入的理解,同時需閱讀一些很重要的類(如JobTracker和JobInprogress等)的源碼,以便利用這些類完成你的調度演算法。
Hadoop目前自帶了三個比較常用的調度器,分別為JobQueueTaskScheler (FIFO,但隊列調度器),Capacity Scheler(多隊列多用戶調度器)和Fair Scheler(多隊列多用戶調度器),它們是你學習Hadoop調度器的最好資料。
5. 參考資料
(1) Hadoop-0.20.2源代碼
⑷ Hadoop讀寫文件時內部工作機制是怎樣的
客戶端通過調用FileSystem對象(對應於HDFS文件系統,調用DistributedFileSystem對象)的open()方法來打開文件(也即圖中的第一步),DistributedFileSystem通過RPC(Remote Procere Call)調用詢問NameNode來得到此文件最開始幾個block的文件位置(第二步)。對每一個block來說,namenode返回擁有此block備份的所有namenode的地址信息(按集群的拓撲網路中與客戶端距離的遠近排序,關於在Hadoop集群中如何進行網路拓撲請看下面介紹)。如果客戶端本身就是一個datanode(如客戶端是一個maprece任務)並且此datanode本身就有所需文件block的話,客戶端便從本地讀取文件。
以上步驟完成後,DistributedFileSystem會返回一個FSDataInputStream(支持文件seek),客戶端可以從FSDataInputStream中讀取數據。FSDataInputStream包裝了一個DFSInputSteam類,用來處理namenode和datanode的I/O操作。
客戶端然後執行read()方法(第三步),DFSInputStream(已經存儲了欲讀取文件的開始幾個block的位置信息)連接到第一個datanode(也即最近的datanode)來獲取數據。通過重復調用read()方法(第四、第五步),文件內的數據就被流式的送到了客戶端。當讀到該block的末尾時,DFSInputStream就會關閉指向該block的流,轉而找到下一個block的位置信息然後重復調用read()方法繼續對該block的流式讀取。這些過程對於用戶來說都是透明的,在用戶看來這就是不間斷的流式讀取整個文件。
當真個文件讀取完畢時,客戶端調用FSDataInputSteam中的close()方法關閉文件輸入流(第六步)。
如果在讀某個block是DFSInputStream檢測到錯誤,DFSInputSteam就會連接下一個datanode以獲取此block的其他備份,同時他會記錄下以前檢測到的壞掉的datanode以免以後再無用的重復讀取該datanode。DFSInputSteam也會檢查從datanode讀取來的數據的校驗和,如果發現有數據損壞,它會把壞掉的block報告給namenode同時重新讀取其他datanode上的其他block備份。
這種設計模式的一個好處是,文件讀取是遍布這個集群的datanode的,namenode只是提供文件block的位置信息,這些信息所需的帶寬是很少的,這樣便有效的避免了單點瓶頸問題從而可以更大的擴充集群的規模。
Hadoop中的網路拓撲
在Hadoop集群中如何衡量兩個節點的遠近呢?要知道,在高速處理數據時,數據處理速率的唯一限制因素就是數據在不同節點間的傳輸速度:這是由帶寬的可怕匱乏引起的。所以我們把帶寬作為衡量兩個節點距離大小的標准。
但是計算兩個節點之間的帶寬是比較復雜的,而且它需要在一個靜態的集群下才能衡量,但Hadoop集群一般是隨著數據處理的規模動態變化的(且兩兩節點直接相連的連接數是節點數的平方)。於是Hadoop使用了一個簡單的方法來衡量距離,它把集群內的網路表示成一個樹結構,兩個節點之間的距離就是他們離共同祖先節點的距離之和。樹一般按數據中心(datacenter),機架(rack),計算節點(datanode)的結構組織。計算節點上的本地運算速度最快,跨數據中心的計算速度最慢(現在跨數據中心的Hadoop集群用的還很少,一般都是在一個數據中心內做運算的)。
假如有個計算節點n1處在數據中心c1的機架r1上,它可以表示為/c1/r1/n1,下面是不同情況下兩個節點的距離:
• distance(/d1/r1/n1, /d1/r1/n1) = 0 (processes on the same node)
• distance(/d1/r1/n1, /d1/r1/n2) = 2 (different nodes on the same rack)
• distance(/d1/r1/n1, /d1/r2/n3) = 4 (nodes on different racks in the same data center)
• distance(/d1/r1/n1, /d2/r3/n4) = 6 (nodes in different data centers)
如下圖所示:
Hadoop
寫文件
現在我們來看一下Hadoop中的寫文件機制解析,通過寫文件機制我們可以更好的了解一下Hadoop中的一致性模型。
Hadoop
上圖為我們展示了一個創建一個新文件並向其中寫數據的例子。
首先客戶端通過DistributedFileSystem上的create()方法指明一個欲創建的文件的文件名(第一步),DistributedFileSystem再通過RPC調用向NameNode申請創建一個新文件(第二步,這時該文件還沒有分配相應的block)。namenode檢查是否有同名文件存在以及用戶是否有相應的創建許可權,如果檢查通過,namenode會為該文件創建一個新的記錄,否則的話文件創建失敗,客戶端得到一個IOException異常。DistributedFileSystem返回一個FSDataOutputStream以供客戶端寫入數據,與FSDataInputStream類似,FSDataOutputStream封裝了一個DFSOutputStream用於處理namenode與datanode之間的通信。
當客戶端開始寫數據時(第三步),DFSOutputStream把寫入的數據分成包(packet), 放入一個中間隊列——數據隊列(data queue)中去。DataStreamer從數據隊列中取數據,同時向namenode申請一個新的block來存放它已經取得的數據。namenode選擇一系列合適的datanode(個數由文件的replica數決定)構成一個管道線(pipeline),這里我們假設replica為3,所以管道線中就有三個datanode。DataSteamer把數據流式的寫入到管道線中的第一個datanode中(第四步),第一個datanode再把接收到的數據轉到第二個datanode中(第四步),以此類推。
DFSOutputStream同時也維護著另一個中間隊列——確認隊列(ack queue),確認隊列中的包只有在得到管道線中所有的datanode的確認以後才會被移出確認隊列(第五步)。
如果某個datanode在寫數據的時候當掉了,下面這些對用戶透明的步驟會被執行:
1)管道線關閉,所有確認隊列上的數據會被挪到數據隊列的首部重新發送,這樣可以確保管道線中當掉的datanode下流的datanode不會因為當掉的datanode而丟失數據包。
2)在還在正常運行的datanode上的當前block上做一個標志,這樣當當掉的datanode重新啟動以後namenode就會知道該datanode上哪個block是剛才當機時殘留下的局部損壞block,從而可以把它刪掉。
3)已經當掉的datanode從管道線中被移除,未寫完的block的其他數據繼續被寫入到其他兩個還在正常運行的datanode中去,namenode知道這個block還處在under-replicated狀態(也即備份數不足的狀態)下,然後他會安排一個新的replica從而達到要求的備份數,後續的block寫入方法同前面正常時候一樣。
有可能管道線中的多個datanode當掉(雖然不太經常發生),但只要dfs.replication.min(默認為1)個replica被創建,我們就認為該創建成功了。剩餘的replica會在以後非同步創建以達到指定的replica數。
當客戶端完成寫數據後,它會調用close()方法(第六步)。這個操作會沖洗(flush)所有剩下的package到pipeline中,等待這些package確認成功,然後通知namenode寫入文件成功(第七步)。這時候namenode就知道該文件由哪些block組成(因為DataStreamer向namenode請求分配新block,namenode當然會知道它分配過哪些blcok給給定文件),它會等待最少的replica數被創建,然後成功返回。
replica是如何分布的
Hadoop在創建新文件時是如何選擇block的位置的呢,綜合來說,要考慮以下因素:帶寬(包括寫帶寬和讀帶寬)和數據安全性。如果我們把三個備份全部放在一個datanode上,雖然可以避免了寫帶寬的消耗,但幾乎沒有提供數據冗餘帶來的安全性,因為如果這個datanode當機,那麼這個文件的所有數據就全部丟失了。另一個極端情況是,如果把三個冗餘備份全部放在不同的機架,甚至數據中心裏面,雖然這樣數據會安全,但寫數據會消耗很多的帶寬。Hadoop 0.17.0給我們提供了一個默認replica分配策略(Hadoop 1.X以後允許replica策略是可插拔的,也就是你可以自己制定自己需要的replica分配策略)。replica的默認分配策略是把第一個備份放在與客戶端相同的datanode上(如果客戶端在集群外運行,就隨機選取一個datanode來存放第一個replica),第二個replica放在與第一個replica不同機架的一個隨機datanode上,第三個replica放在與第二個replica相同機架的隨機datanode上。如果replica數大於三,則隨後的replica在集群中隨機存放,Hadoop會盡量避免過多的replica存放在同一個機架上。選取replica的放置位置後,管道線的網路拓撲結構如下所示:
Hadoop
總體來說,上述默認的replica分配策略給了我們很好的可用性(blocks放置在兩個rack上,較為安全),寫帶寬優化(寫數據只需要跨越一個rack),讀帶寬優化(你可以從兩個機架中選擇較近的一個讀取)。
一致性模型
HDFS某些地方為了性能可能會不符合POSIX(是的,你沒有看錯,POSIX不僅僅只適用於linux/unix, Hadoop 使用了POSIX的設計來實現對文件系統文件流的讀取 ),所以它看起來可能與你所期望的不同,要注意。
創建了一個文件以後,它是可以在命名空間(namespace)中可以看到的:
Path p = new Path("p");
fs.create(p);
assertThat(fs.exists(p), is(true));
但是任何向此文件中寫入的數據並不能保證是可見的,即使你flush了已經寫入的數據,此文件的長度可能仍然為零:
Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
assertThat(fs.getFileStatus(p).getLen(), is(0L));
這是因為,在Hadoop中,只有滿一個block數據量的數據被寫入文件後,此文件中的內容才是可見的(即這些數據會被寫入到硬碟中去),所以當前正在寫的block中的內容總是不可見的。
Hadoop提供了一種強制使buffer中的內容沖洗到datanode的方法,那就是FSDataOutputStream的sync()方法。調用了sync()方法後,Hadoop保證所有已經被寫入的數據都被沖洗到了管道線中的datanode中,並且對所有讀者都可見了:
Path p = new Path("p");
FSDataOutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
out.sync();
assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));
這個方法就像POSIX中的fsync系統調用(它沖洗給定文件描述符中的所有緩沖數據到磁碟中)。例如,使用java API寫一個本地文件,我們可以保證在調用flush()和同步化後可以看到已寫入的內容:
FileOutputStream out = new FileOutputStream(localFile);
out.write("content".getBytes("UTF-8"));
out.flush(); // flush to operating system
out.getFD().sync(); // sync to disk (getFD()返回與該流所對應的文件描述符)
assertThat(localFile.length(), is(((long) "content".length())));
在HDFS中關閉一個流隱式的調用了sync()方法:
Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.close();
assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));
由於Hadoop中的一致性模型限制,如果我們不調用sync()方法的話,我們很可能會丟失多大一個block的數據。這是難以接受的,所以我們應該使用sync()方法來確保數據已經寫入磁碟。但頻繁調用sync()方法也是不好的,因為會造成很多額外開銷。我們可以再寫入一定量數據後調用sync()方法一次,至於這個具體的數據量大小就要根據你的應用程序而定了,在不影響你的應用程序的性能的情況下,這個數據量應越大越好。
⑸ 在hadoop集群中,fifo調度演算法具有哪些特點
首先介紹了Hadoop平台下作業的分布式運行機制,然後對Hadoop平台自帶的4種任務調度器做分析和比較,最後在分析JobTracker類文件的基礎上指出了創建自定義任務調度器所需完成的工作。首先Hadoop集群式基於單伺服器的,只有一個伺服器節點負責調度整個集群的作業運行,主要的具體工作是切分大數據量的作業,指定哪些Worker節點做Map工作、哪些Worker節點做Rece工作、與Worker節點通信並接受其心跳信號、作為用戶的訪問入口等等。其次,集群中的每個Worker節點相當於一個器官,運行著主節點所指派的具體作業。這些節點會被分為兩種類型,一種是接收分塊之後的作業並做映射工作。另一種是負責把前面所做的映射工作按照約定的規則做一個統計。Task-Tracker通過運行一個簡單循環來定期地發送心跳信號(heartbeat)給JobTracker.這個心跳信號會把TaskTracker是否還在存活告知JobTracker,TaskTracker通過信號指明自己是否已經准備好運行新的任務.一旦TaskTracker已經准備好接受任務,JobTracker就會從作業優先順序表中選定一個作業並分配下去.至於到底是執行Map任務還是Rece任務,是由TaskTracker的任務槽所決定的.默認的任務調度器在處理Rece任務之前,會優先填滿空閑的Map任務槽.因此,如果TaskTracker滿足存在至少一個空閑任務槽時,JobTracker會為它分配Map任務,否則為它選擇一個Rece任務.TaskTracker在運行任務的時候,第一步是從共享文件系統中把作業的JAR文件復制過來,從而實現任務文件的本地化.第二步是TaskTracker為任務新建一個本地文件夾並把作業文件解壓在此目錄中.第三步是由Task-Tracker新建一個TaskRunner實例來運行該任務.Hadoop平台默認的調度方案就是JobQueueTaskScheler,這是一種按照任務到來的時間先後順序而執行的調度策略.這種方式比較簡單,JobTracker作為主控節點,僅僅是依照作業到來的先後順序而選擇將要執行的作業.當然,這有一定的缺陷,由於Hadoop平台是默認將作業運行在整個集群上的,那麼如果一個耗時非常大的作業進入執行期,將會導致其餘大量作業長時間得不到運行.這種長時間運行的優先順序別並不高的作業帶來了嚴重的作業阻塞,使得整個平台的運行效率處在較低的水平.Hadoop平台對這種FIFO(FirstINAndFirstOut)機制所給出的解決法是調用SetJobPriority()方法,通過設置作業的權重級別來做平衡調度.FairScheler是一種「公平」調度器,它的目標是讓每個用戶能夠公平地共享Hadoop集群計算能力.當只有一個作業運行的時候,它會得到整個集群的資源.隨著提交到作業表中作業的增多,Hadoop平台會把集群中空閑出來的時間槽公平分配給每個需要執行的作業.這樣即便其中某些作業需要較長時間運行,平台仍然有能力讓那些短作業在合理時間內完成[3].FairScheler支持資源搶占,當一個資源池在一定時段內沒有得到公平共享時,它會終止該資源池所獲得的過多的資源,同時把這些釋放的資源讓給那些資源不足的資源池.Hadoop平台中的CapacityScheler是由Yahoo貢獻的,在調度器上,設置了三種粒度的對象:queue,job,task.在該策略下,平台可以有多個作業隊列,每個作業隊列經提交後,都會獲得一定數量的TaskTracker資源.具體調度流程如下.(1)選擇queue,根據資源庫的使用情況從小到大排序,直到找到一個合適的job.(2)選擇job,在當前所選定的queue中,按照作業提交的時間先後以及作業的權重優先順序別進行排序,選擇合適的job.當然,在job選擇時還需要考慮所選作業是否超出目前現有的資源上限,以及資源池中的內存是否夠該job的task用等因素.(3)選擇task,根據本地節點的資源使用情況來選擇合適的task.雖然Hadoop平台自帶了幾種調度器,但是上述3種調度方案很難滿足公司復雜的應用需求.因此作為平台的個性化使用者,往往需要開發自己的調度器.Hadoop的調度器是在JobTracker中載入和調用的,因此開發一個自定義的調度器就必須搞清楚JobTracker類文件的內部機制.作為Hadoop平台的核心組件,JobTracker監控著整個集群的作業運行情況並對資源進行管理調度.每個Task-Tracker每隔3s通過heartbeat向JobTracker匯報自己管理的機器的一些基本信息,包括內存使用量、內存的剩餘量以及空閑的slot數目等等[5].一旦JobTracker發現了空閑slot,便會調用調度器中的AssignTask方法為該TaskTracker分配task。
⑹ Hadoop到底是什麼玩意
Hadoop到底是個啥?
答:Hadoop是基於廉價設備利用集群的威力對海量數據進行安全存儲和高效計算的分布式存儲和分析框架,Hadoop本身是一個龐大的項目家族,其核心 家族或者底層是HDFS和MapRece,HDFS和MapRece分別用來實現對海量數據的存儲和分析,其它的項目,例如Hive、HBase 等都是基於HDFS和MapRece,是為了解決特定類型的大數據處理問題而提出的子項目,使用Hive、HBase等子項目可以在更高的抽象的基礎上更簡單的編寫分布式大數據處理程序。Hadoop的其它子項目還包括Common, Avro, Pig, ZooKeeper, Sqoop, Oozie 等,隨著時間的推移一些新的子項目會被加入進來,一些關注度不高的項目會被移除Hadoop家族,所以Hadoop是一個充滿活力的系統。
Apache Hadoop: 是Apache開源組織的一個分布式計算開源框架,提供了一個分布式文件系統子項目(HDFS)和支持MapRece分布式計算的軟體架構。
Apache Hive: 是基於Hadoop的一個數據倉庫工具,可以將結構化的數據文件映射為一張資料庫表,通過類SQL語句快速實現簡單的MapRece統計,不必開發專門的MapRece應用,十分適合數據倉庫的統計分析。
ApachePig: 是一個基於Hadoop的大規模數據分析工具,它提供的SQL-LIKE語言叫Pig Latin,該語言的編譯器會把類SQL的數據分析請求轉換為一系列經過優化處理的MapRece運算。
ApacheHBase: 是一個高可靠性、高性能、面向列、可伸縮的分布式存儲系統,利用HBase技術可在廉價PC Server上搭建起大規模結構化存儲集群。
Apache Sqoop: 是一個用來將Hadoop和關系型資料庫中的數據相互轉移的工具,可以將一個關系型資料庫(MySQL ,Oracle ,Postgres等)中的數據導進到Hadoop的HDFS中,也可以將HDFS的數據導進到關系型資料庫中。
Apache Zookeeper: 是一個為分布式應用所設計的分布的、開源的協調服務,它主要是用來解決分布式應用中經常遇到的一些數據管理問題,簡化分布式應用協調及其管理的難度,提供高性能的分布式服務 ApacheMahout:是基於Hadoop的機器學習和數據挖掘的一個分布式框架。Mahout用MapRece實現了部分數據挖掘演算法,解決了並行挖掘的問題。
ApacheCassandra:是一套開源分布式NoSQL資料庫系統。它最初由Facebook開發,用於儲存簡單格式數據,集Google BigTable的數據模型與AmazonDynamo的完全分布式的架構於一身 Apache Avro: 是一個數據序列化系統,設計用於支持數據密集型,大批量數據交換的應用。Avro是新的數據序列化格式與傳輸工具,將逐步取代Hadoop原有的IPC機制 ApacheAmbari: 是一種基於Web的工具,支持Hadoop集群的供應、管理和監控。
ApacheChukwa: 是一個開源的用於監控大型分布式系統的數據收集系統,它可以將各種各樣類型的數據收集成適合 Hadoop 處理的文件保存在 HDFS 中供Hadoop 進行各種 MapRece 操作。
ApacheHama: 是一個基於HDFS的BSP(Bulk Synchronous Parallel)並行計算框架, Hama可用於包括圖、矩陣和網路演算法在內的大規模、大數據計算。
ApacheFlume: 是一個分布的、可靠的、高可用的海量日誌聚合的系統,可用於日誌數據收集,日誌數據處理,日誌數據傳輸。
ApacheGiraph: 是一個可伸縮的分布式迭代圖處理系統, 基於Hadoop平台,靈感來自 BSP (bulk synchronous parallel) 和Google 的 Pregel。
ApacheOozie: 是一個工作流引擎伺服器, 用於管理和協調運行在Hadoop平台上(HDFS、Pig和MapRece)的任務。
ApacheCrunch: 是基於Google的FlumeJava庫編寫的Java庫,用於創建MapRece程序。與Hive,Pig類似,Crunch提供了用於實現如連接數據、執行聚合和排序記錄等常見任務的模式庫 ApacheWhirr: 是一套運行於雲服務的類庫(包括Hadoop),可提供高度的互補性。Whirr學支持Amazon EC2和Rackspace的服務。
ApacheBigtop: 是一個對Hadoop及其周邊生態進行打包,分發和測試的工具。
ApacheHCatalog: 是基於Hadoop的數據表和存儲管理,實現中央的元數據和模式管理,跨越Hadoop和RDBMS,利用Pig和Hive提供關系視圖。
ClouderaHue: 是一個基於WEB的監控和管理系統,實現對HDFS,MapRece/YARN, HBase, Hive, Pig的web化操作和管理。
⑺ Hadoop:是什麼,如何工作,可以用來做什麼
Hadoop主要是分布式計算和存儲的框架,所以Hadoop工作過程主要依賴於HDFS(Hadoop Distributed File System)分布式存儲系統和Maprece分布式計算框架。
分布式存儲系統HDFS中工作主要是一個主節點namenode(master)(hadoop1.x只要一個namenode節點,2.x中可以有多個節點)和若干個從節點Datanode(數據節點)相互配合進行工作,HDFS主要是存儲Hadoop中的大量的數據,namenode節點主要負責的是:
1、接收client用戶的操作請求,這種用戶主要指的是開發工程師的Java代碼或者是命令客戶端操作。
2、維護文件系統的目錄結構,主要就是大量數據的關系以及位置信息等。
3、管理文件系統與block的關系,Hadoop中大量的數據為了方便存儲和管理主要是以block塊(64M)的形式儲存。一個文件被分成大量的block塊存儲之後,block塊之間都是有順序關系的,這個文件與block之間的關系以及block屬於哪個datanode都是有namenode來管理。
Datanode的主要職責是:
1、存儲文件。
2、將數據分成大量的block塊。
3、為保證數據的安全,對數據進行備份,一般備份3份。當其中的一份出現問題時,將由其他的備份來對數據進行恢復。
MapRece主要也是一個主節點JOPtracker和testtracker組成,主要是負責hadoop中的數據處理過程中的計算問題。
joptracker主要負責接收客戶端傳來的任務,並且把計算任務交給很多testtracker工作,同時joptracker會不斷的監控testtracker的執行情況。
testtracker主要是執行joptracker交給它的任務具體計算,例如給求大量數據的最大值,每個testtracker會計算出自己負責的數據中的最大值,然後交給joptracker。
Hadoop的主要兩個框架組合成了分布式的存儲和計算,使得hadoop可以很快的處理大量的數據。
⑻ 如何架構大數據系統 hadoop
大數據數量龐大,格式多樣化。大量數據由家庭、製造工廠和辦公場所的各種設備、互聯網事務交易、社交網路的活動、自動化感測器、移動設備以及科研儀器等生成。它的爆炸式增長已超出了傳統IT基礎架構的處理能力,給企業和社會帶來嚴峻的數據管理問題。因此必須開發新的數據架構,圍繞「數據收集、數據管理、數據分析、知識形成、智慧行動」的全過程,開發使用這些數據,釋放出更多數據的隱藏價值。
一、大數據建設思路
1)數據的獲得
四、總結
基於分布式技術構建的大數據平台能夠有效降低數據存儲成本,提升數據分析處理效率,並具備海量數據、高並發場景的支撐能力,可大幅縮短數據查詢響應時間,滿足企業各上層應用的數據需求。
⑼ 怎麼優化hadoop任務調度演算法
首先介紹了Hadoop平台下作業的分布式運行機制,然後對Hadoop平台自帶的4種任務調度器做分析和比較,最後在分析JobTracker類文件的基礎上指出了創建自定義任務調度器所需完成的工作。
首先Hadoop集群式基於單伺服器的,只有一個伺服器節點負責調度整個集群的作業運行,主要的具體工作是切分大數據量的作業,指定哪些Worker節點做Map工作、哪些Worker節點做Rece工作、與Worker節點通信並接受其心跳信號、作為用戶的訪問入口等等。其次,集群中的每個Worker節點相當於一個器官,運行著主節點所指派的具體作業。這些節點會被分為兩種類型,一種是接收分塊之後的作業並做映射工作。另一種是負責把前面所做的映射工作按照約定的規則做一個統計。
Task-Tracker通過運行一個簡單循環來定期地發送心跳信號(heartbeat)給JobTracker.這個心跳信號會把TaskTracker是否還在存活告知JobTracker,TaskTracker通過信號指明自己是否已經准備
好運行新的任務.一旦TaskTracker已經准備好接受任務,JobTracker就會從作業優先順序表中選定一個作業並分配下去.至於到底是執行Map任務還是Rece任務,是由TaskTracker的任務槽所決定的.默認的任務調度器在處理Rece任務之前,會優先填滿空閑的Map任務槽.因此,如果TaskTracker滿足存在至少一個空閑任務槽時,JobTracker會為它分配Map任務,否則為它選擇一個Rece任務.TaskTracker在運行任務的時候,第一步是從共享文件系統中把作業的JAR文件復制過來,從而實現任務文件的本地化.第二步是TaskTracker為任務新建一個本地文件夾並把作業文件解壓在此目錄中.第三步是由Task-Tracker新建一個TaskRunner實例來運行該任務.
Hadoop平台默認的調度方案就是JobQueueTaskScheler,這是一種按照任務到來的時間先後順序而執行的調度策略.這種方式比較簡單,JobTracker作為主控節點,僅僅是依照作業到來的先後順序而選擇將要執行的作業.當然,這有一定的缺陷,由於Hadoop平台是默認將作業運行在整個集群上的,那麼如果一個耗時非常大的作業進入執行期,將會導致其餘大量作業長時間得不到運行.這種長時間運行的優先順序別並不高的作業帶來了嚴重的作業阻塞,使得整個平台的運行效率處在較低的水平.Hadoop平台對這種FIFO(FirstINAndFirstOut)機制所給出的解決辦法是調用SetJobPriority()方法,通過設置作業的權重級別來做平衡調度.
FairScheler是一種「公平」調度器,它的目標是讓每個用戶能夠公平地共享Hadoop集群計算能力.當只有一個作業運行的時候,它會得到整個集群的資源.隨著提交到作業表中作業的增多,Hadoop平台會把集群中空閑出來的時間槽公平分配給每個需要執行的作業.這樣即便其中某些作業需要較長時間運行,平台仍然有能力讓那些短作業在合理時間內完成[3].FairScheler支持資源搶占,當一個資源池在一定時段內沒有得到公平共享時,它會終止該資源池所獲得的過多的資源,同時把這些釋放的資源讓給那些資源不足的資源池.
Hadoop平台中的CapacityScheler是由Yahoo貢獻的,在調度器上,設置了三種粒度的對象:queue,job,task.在該策略下,平台可以有多個作業隊列,每個作業隊列經提交後,都會獲得一定數量的TaskTracker資源.具體調度流程如下.
(1)選擇queue,根據資源庫的使用情況從小到大排序,直到找到一個合適的job.
(2)選擇job,在當前所選定的queue中,按照作業提交的時間先後以及作業的權重優先順序別進行排序,選擇合適的job.當然,在job選擇時還需要考慮所選作業是否超出目前現有的資源上限,以及資源池中的內存是否夠該job的task用等因素.
(3)選擇task,根據本地節點的資源使用情況來選擇合適的task.
雖然Hadoop平台自帶了幾種調度器,但是上述3種調度方案很難滿足公司復雜的應用需求.因此作為平台的個性化使用者,往往需要開發自己的調度器.Hadoop的調度器是在JobTracker中載入和調用的,因此開發一個自定義的調度器就必須搞清楚JobTracker類文件的內部機制.作為Hadoop平台的核心組件,JobTracker監控著整個集群的作業運行情況並對資源進行管理調度.每個Task-Tracker每隔3s通過heartbeat向JobTracker匯報自己管理的機器的一些基本信息,包括內存使用量、內存的剩餘量以及空閑的slot數目等等[5].一
旦JobTracker發現了空閑slot,便會調用調度器中的AssignTask方法為該TaskTracker分配task。
⑽ hadoop集群中,fifo調度演算法具有哪些特點
首先介紹了Hadoop平台下作業的分布式運行機制,然後對Hadoop平台自帶的4種任務調度器做分析和比較,最後在分析JobTracker類文件的基礎上指出了創建自定義任務調度器所需完成的工作。首先Hadoop集群式基於單伺服器的,只有一個伺服器節點負責調度整個集群的作業運行,主要的具體工作是切分大數據量的作業,指定哪些Worker節點做Map工作、哪些Worker節點做Rece工作、與Worker節點通信並接受其心跳信號、作為用戶的訪問入口等等。其次,集群中的每個Worker節點相當於一個器官,運行著主節點所指派的具體作業。這些節點會被分為兩種類型,一種是接收分塊之後的作業並做映射工作。另一種是負責把前面所做的映射工作按照約定的規則做一個統計。Task-Tracker通過運行一個簡單循環來定期地發送心跳信號(heartbeat)給JobTracker.這個心跳信號會把TaskTracker是否還在存活告知JobTracker,TaskTracker通過信號指明自己是否已經准備好運行新的任務.一旦TaskTracker已經准備好接受任務,JobTracker就會從作業優先順序表中選定一個作業並分配下去.至於到底是執行Map任務還是Rece任務,是由TaskTracker的任務槽所決定的.默認的任務調度器在處理Rece任務之前,會優先填滿空閑的Map任務槽.因此,如果TaskTracker滿足存在至少一個空閑任務槽時,JobTracker會為它分配Map任務,否則為它選擇一個Rece任務.TaskTracker在運行任務的時候,第一步是從共享文件系統中把作業的JAR文件復制過來,從而實現任務文件的本地化.第二步是TaskTracker為任務新建一個本地文件夾並把作業文件解壓在此目錄中.第三步是由Task-Tracker新建一個TaskRunner實例來運行該任務.Hadoop平台默認的調度方案就是JobQueueTaskScheler,這是一種按照任務到來的時間先後順序而執行的調度策略.這種方式比較簡單,JobTracker作為主控節點,僅僅是依照作業到來的先後順序而選擇將要執行的作業.當然,這有一定的缺陷,由於Hadoop平台是默認將作業運行在整個集群上的,那麼如果一個耗時非常大的作業進入執行期,將會導致其餘大量作業長時間得不到運行.這種長時間運行的優先順序別並不高的作業帶來了嚴重的作業阻塞,使得整個平台的運行效率處在較低的水平.Hadoop平台對這種FIFO(FirstINAndFirstOut)機制所給出的解決法是調用SetJobPriority()方法,通過設置作業的權重級別來做平衡調度.FairScheler是一種「公平」調度器,它的目標是讓每個用戶能夠公平地共享Hadoop集群計算能力.當只有一個作業運行的時候,它會得到整個集群的資源.隨著提交到作業表中作業的增多,Hadoop平台會把集群中空閑出來的時間槽公平分配給每個需要執行的作業.這樣即便其中某些作業需要較長時間運行,平台仍然有能力讓那些短作業在合理時間內完成[3].FairScheler支持資源搶占,當一個資源池在一定時段內沒有得到公平共享時,它會終止該資源池所獲得的過多的資源,同時把這些釋放的資源讓給那些資源不足的資源池.Hadoop平台中的CapacityScheler是由Yahoo貢獻的,在調度器上,設置了三種粒度的對象:queue,job,task.在該策略下,平台可以有多個作業隊列,每個作業隊列經提交後,都會獲得一定數量的TaskTracker資源.具體調度流程如下.(1)選擇queue,根據資源庫的使用情況從小到大排序,直到找到一個合適的job.(2)選擇job,在當前所選定的queue中,按照作業提交的時間先後以及作業的權重優先順序別進行排序,選擇合適的job.當然,在job選擇時還需要考慮所選作業是否超出目前現有的資源上限,以及資源池中的內存是否夠該job的task用等因素.(3)選擇task,根據本地節點的資源使用情況來選擇合適的task.雖然Hadoop平台自帶了幾種調度器,但是上述3種調度方案很難滿足公司復雜的應用需求.因此作為平台的個性化使用者,往往需要開發自己的調度器.Hadoop的調度器是在JobTracker中載入和調用的,因此開發一個自定義的調度器就必須搞清楚JobTracker類文件的內部機制.作為Hadoop平台的核心組件,JobTracker監控著整個集群的作業運行情況並對資源進行管理調度.每個Task-Tracker每隔3s通過heartbeat向JobTracker匯報自己管理的機器的一些基本信息,包括內存使用量、內存的剩餘量以及空閑的slot數目等等[5].一旦JobTracker發現了空閑slot,便會調用調度器中的AssignTask方法為該TaskTracker分配task。