hadoop调度算法
⑴ 基于Hadoop的作业调度算法的发展现状、研究意义
调度器是一个可插拔的模块,用户可以根据自己的实际应用要求设计调度器,然后在配置文件中指定相应的调度器
⑵ hadoop的maprece常见算法案例有几种
基本MapRece模式
计数与求和
问题陈述:
有许多文档,每个文档都有一些字段组成。需要计算出每个字段在所有文档中的出现次数或者这些字段的其他什么统计值。例如,给定一个log文件,其中的每条记录都包含一个响应时间,需要计算出平均响应时间。
解决方案:
让我们先从简单的例子入手。在下面的代码片段里,Mapper每遇到指定词就把频次记1,Recer一个个遍历这些词的集合然后把他们的频次加和。
1 class Mapper
2 method Map(docid id, doc d)
3 for all term t in doc d do
4 Emit(term t, count 1)
5
6 class Recer
7 method Rece(term t, counts [c1, c2,...])
8 sum = 0
9 for all count c in [c1, c2,...] do
10 sum = sum + c
11 Emit(term t, count sum)
这种方法的缺点显而易见,Mapper提交了太多无意义的计数。它完全可以通过先对每个文档中的词进行计数从而减少传递给Recer的数据量:
1 class Mapper
2 method Map(docid id, doc d)
3 H = new AssociativeArray
4 for all term t in doc d do
5 H{t} = H{t} + 1
6 for all term t in H do
7 Emit(term t, count H{t})
如果要累计计数的的不只是单个文档中的内容,还包括了一个Mapper节点处理的所有文档,那就要用到Combiner了:
1 class Mapper
2 method Map(docid id, doc d)
3 for all term t in doc d do
4 Emit(term t, count 1)
5
6 class Combiner
7 method Combine(term t, [c1, c2,...])
8 sum = 0
9 for all count c in [c1, c2,...] do
10 sum = sum + c
11 Emit(term t, count sum)
12
13 class Recer
14 method Rece(term t, counts [c1, c2,...])
15 sum = 0
16 for all count c in [c1, c2,...] do
17 sum = sum + c
18 Emit(term t, count sum)
应用:Log 分析, 数据查询
整理归类
问题陈述:
有一系列条目,每个条目都有几个属性,要把具有同一属性值的条目都保存在一个文件里,或者把条目按照属性值分组。 最典型的应用是倒排索引。
解决方案:
解决方案很简单。 在 Mapper 中以每个条目的所需属性值作为 key,其本身作为值传递给 Recer。 Recer 取得按照属性值分组的条目,然后可以处理或者保存。如果是在构建倒排索引,那么 每个条目相当于一个词而属性值就是词所在的文档ID。
应用:倒排索引, ETL
过滤 (文本查找),解析和校验
问题陈述:
假设有很多条记录,需要从其中找出满足某个条件的所有记录,或者将每条记录传换成另外一种形式(转换操作相对于各条记录独立,即对一条记录的操作与其他记录无关)。像文本解析、特定值抽取、格式转换等都属于后一种用例。
解决方案:
非常简单,在Mapper 里逐条进行操作,输出需要的值或转换后的形式。
应用:日志分析,数据查询,ETL,数据校验
分布式任务执行
问题陈述:
大型计算可以分解为多个部分分别进行然后合并各个计算的结果以获得最终结果。
解决方案: 将数据切分成多份作为每个 Mapper 的输入,每个Mapper处理一份数据,执行同样的运算,产生结果,Recer把多个Mapper的结果组合成一个。
案例研究: 数字通信系统模拟
像 WiMAX 这样的数字通信模拟软件通过系统模型来传输大量的随机数据,然后计算传输中的错误几率。 每个 Mapper 处理样本 1/N 的数据,计算出这部分数据的错误率,然后在 Recer 里计算平均错误率。
应用:工程模拟,数字分析,性能测试
排序
问题陈述:
有许多条记录,需要按照某种规则将所有记录排序或是按照顺序来处理记录。
解决方案: 简单排序很好办 – Mappers 将待排序的属性值为键,整条记录为值输出。 不过实际应用中的排序要更加巧妙一点, 这就是它之所以被称为MapRece 核心的原因(“核心”是说排序?因为证明Hadoop计算能力的实验是大数据排序?还是说Hadoop的处理过程中对key排序的环节?)。在实践中,常用组合键来实现二次排序和分组。
MapRece 最初只能够对键排序, 但是也有技术利用可以利用Hadoop 的特性来实现按值排序。想了解的话可以看这篇博客。
按照BigTable的概念,使用 MapRece来对最初数据而非中间数据排序,也即保持数据的有序状态更有好处,必须注意这一点。换句话说,在数据插入时排序一次要比在每次查询数据的时候排序更高效。
应用:ETL,数据分析
非基本 MapRece 模式
迭代消息传递 (图处理)
问题陈述:
假设一个实体网络,实体之间存在着关系。 需要按照与它比邻的其他实体的属性计算出一个状态。这个状态可以表现为它和其它节点之间的距离, 存在特定属性的邻接点的迹象, 邻域密度特征等等。
解决方案:
网络存储为系列节点的结合,每个节点包含有其所有邻接点ID的列表。按照这个概念,MapRece 迭代进行,每次迭代中每个节点都发消息给它的邻接点。邻接点根据接收到的信息更新自己的状态。当满足了某些条件的时候迭代停止,如达到了最大迭代次数(网络半径)或两次连续的迭代几乎没有状态改变。从技术上来看,Mapper 以每个邻接点的ID为键发出信息,所有的信息都会按照接受节点分组,recer 就能够重算各节点的状态然后更新那些状态改变了的节点。下面展示了这个算法:
1 class Mapper
2 method Map(id n, object N)
3 Emit(id n, object N)
4 for all id m in N.OutgoingRelations do
5 Emit(id m, message getMessage(N))
6
7 class Recer
8 method Rece(id m, [s1, s2,...])
9 M = null
10 messages = []
11 for all s in [s1, s2,...] do
12 if IsObject(s) then
13 M = s
14 else // s is a message
15 messages.add(s)
16 M.State = calculateState(messages)
17 Emit(id m, item M)
一个节点的状态可以迅速的沿着网络传全网,那些被感染了的节点又去感染它们的邻居,整个过程就像下面的图示一样:
案例研究: 沿分类树的有效性传递
问题陈述:
这个问题来自于真实的电子商务应用。将各种货物分类,这些类别可以组成一个树形结构,比较大的分类(像男人、女人、儿童)可以再分出小分类(像男裤或女装),直到不能再分为止(像男式蓝色牛仔裤)。这些不能再分的基层类别可以是有效(这个类别包含有货品)或者已无效的(没有属于这个分类的货品)。如果一个分类至少含有一个有效的子分类那么认为这个分类也是有效的。我们需要在已知一些基层分类有效的情况下找出分类树上所有有效的分类。
解决方案:
这个问题可以用上一节提到的框架来解决。我们咋下面定义了名为 getMessage和 calculateState 的方法:
1 class N
2 State in {True = 2, False = 1, null = 0},
3 initialized 1 or 2 for end-of-line categories, 0 otherwise
4 method getMessage(object N)
5 return N.State
6 method calculateState(state s, data [d1, d2,...])
7 return max( [d1, d2,...] )
案例研究:广度优先搜索
问题陈述:需要计算出一个图结构中某一个节点到其它所有节点的距离。
解决方案: Source源节点给所有邻接点发出值为0的信号,邻接点把收到的信号再转发给自己的邻接点,每转发一次就对信号值加1:
1 class N
2 State is distance,
3 initialized 0 for source node, INFINITY for all other nodes
4 method getMessage(N)
5 return N.State + 1
6 method calculateState(state s, data [d1, d2,...])
7 min( [d1, d2,...] )
案例研究:网页排名和 Mapper 端数据聚合
这个算法由Google提出,使用权威的PageRank算法,通过连接到一个网页的其他网页来计算网页的相关性。真实算法是相当复杂的,但是核心思想是权重可以传播,也即通过一个节点的各联接节点的权重的均值来计算节点自身的权重。
1 class N
2 State is PageRank
3 method getMessage(object N)
4 return N.State / N.OutgoingRelations.size()
5 method calculateState(state s, data [d1, d2,...])
6 return ( sum([d1, d2,...]) )
要指出的是上面用一个数值来作为评分实际上是一种简化,在实际情况下,我们需要在Mapper端来进行聚合计算得出这个值。下面的代码片段展示了这个改变后的逻辑 (针对于 PageRank 算法):
1 class Mapper
2 method Initialize
3 H = new AssociativeArray
4 method Map(id n, object N)
5 p = N.PageRank / N.OutgoingRelations.size()
6 Emit(id n, object N)
7 for all id m in N.OutgoingRelations do
8 H{m} = H{m} + p
9 method Close
10 for all id n in H do
11 Emit(id n, value H{n})
12
13 class Recer
14 method Rece(id m, [s1, s2,...])
15 M = null
16 p = 0
17 for all s in [s1, s2,...] do
18 if IsObject(s) then
19 M = s
20 else
21 p = p + s
22 M.PageRank = p
23 Emit(id m, item M)
应用:图分析,网页索引
值去重 (对唯一项计数)
问题陈述: 记录包含值域F和值域 G,要分别统计相同G值的记录中不同的F值的数目 (相当于按照 G分组).
这个问题可以推而广之应用于分面搜索(某些电子商务网站称之为Narrow Search)
Record 1: F=1, G={a, b}
Record 2: F=2, G={a, d, e}
Record 3: F=1, G={b}
Record 4: F=3, G={a, b}
Result:
a -> 3 // F=1, F=2, F=3
b -> 2 // F=1, F=3
d -> 1 // F=2
e -> 1 // F=2
解决方案 I:
第一种方法是分两个阶段来解决这个问题。第一阶段在Mapper中使用F和G组成一个复合值对,然后在Recer中输出每个值对,目的是为了保证F值的唯一性。在第二阶段,再将值对按照G值来分组计算每组中的条目数。
第一阶段:
1 class Mapper
2 method Map(null, record [value f, categories [g1, g2,...]])
3 for all category g in [g1, g2,...]
4 Emit(record [g, f], count 1)
5
6 class Recer
7 method Rece(record [g, f], counts [n1, n2, ...])
8 Emit(record [g, f], null )
第二阶段:
1 class Mapper
2 method Map(record [f, g], null)
3 Emit(value g, count 1)
4
5 class Recer
6 method Rece(value g, counts [n1, n2,...])
7 Emit(value g, sum( [n1, n2,...] ) )
解决方案 II:
第二种方法只需要一次MapRece 即可实现,但扩展性不强。算法很简单-Mapper 输出值和分类,在Recer里为每个值对应的分类去重然后给每个所属的分类计数加1,最后再在Recer结束后将所有计数加和。这种方法适用于只有有限个分类,而且拥有相同F值的记录不是很多的情况。例如网络日志处理和用户分类,用户的总数很多,但是每个用户的事件是有限的,以此分类得到的类别也是有限的。值得一提的是在这种模式下可以在数据传输到Recer之前使用Combiner来去除分类的重复值。
1 class Mapper
2 method Map(null, record [value f, categories [g1, g2,...] )
3 for all category g in [g1, g2,...]
4 Emit(value f, category g)
5
6 class Recer
7 method Initialize
8 H = new AssociativeArray : category -> count
9 method Rece(value f, categories [g1, g2,...])
10 [g1', g2',..] = ExcludeDuplicates( [g1, g2,..] )
11 for all category g in [g1', g2',...]
12 H{g} = H{g} + 1
13 method Close
14 for all category g in H do
15 Emit(category g, count H{g})
应用:日志分析,用户计数
互相关
问题陈述:有多个各由若干项构成的组,计算项两两共同出现于一个组中的次数。假如项数是N,那么应该计算N*N。
这种情况常见于文本分析(条目是单词而元组是句子),市场分析(购买了此物的客户还可能购买什么)。如果N*N小到可以容纳于一台机器的内存,实现起来就比较简单了。
配对法
第一种方法是在Mapper中给所有条目配对,然后在Recer中将同一条目对的计数加和。但这种做法也有缺点:
使用 combiners 带来的的好处有限,因为很可能所有项对都是唯一的
不能有效利用内存
1 class Mapper
2 method Map(null, items [i1, i2,...] )
3 for all item i in [i1, i2,...]
4 for all item j in [i1, i2,...]
5 Emit(pair [i j], count 1)
6
7 class Recer
8 method Rece(pair [i j], counts [c1, c2,...])
9 s = sum([c1, c2,...])
10 Emit(pair[i j], count s)
Stripes Approach(条方法?不知道这个名字怎么理解)
第二种方法是将数据按照pair中的第一项来分组,并维护一个关联数组,数组中存储的是所有关联项的计数。The second approach is to group data by the first item in pair and maintain an associative array (“stripe”) where counters for all adjacent items are accumulated. Recer receives all stripes for leading item i, merges them, and emits the same result as in the Pairs approach.
中间结果的键数量相对较少,因此减少了排序消耗。
可以有效利用 combiners。
可在内存中执行,不过如果没有正确执行的话也会带来问题。
实现起来比较复杂。
一般来说, “stripes” 比 “pairs” 更快
1 class Mapper
2 method Map(null, items [i1, i2,...] )
3 for all item i in [i1, i2,...]
4 H = new AssociativeArray : item -> counter
5 for all item j in [i1, i2,...]
6 H{j} = H{j} + 1
7 Emit(item i, stripe H)
8
9 class Recer
10 method Rece(item i, stripes [H1, H2,...])
11 H = new AssociativeArray : item -> counter
12 H = merge-sum( [H1, H2,...] )
13 for all item j in H.keys()
14 Emit(pair [i j], H{j})
应用:文本分析,市场分析
参考资料:Lin J. Dyer C. Hirst G. Data Intensive Processing MapRece
用MapRece 表达关系模式
在这部分我们会讨论一下怎么使用MapRece来进行主要的关系操作。
筛选(Selection)
1 class Mapper
2 method Map(rowkey key, tuple t)
3 if t satisfies the predicate
4 Emit(tuple t, null)
投影(Projection)
投影只比筛选稍微复杂一点,在这种情况下我们可以用Recer来消除可能的重复值。
1 class Mapper
2 method Map(rowkey key, tuple t)
3 tuple g = project(t) // extract required fields to tuple g
4 Emit(tuple g, null)
5
6 class Recer
⑶ 如何编写Hadoop调度器
1. 编写目的
在Hadoop中,调度器是一个可插拔的模块,用户可以根据自己的实际应用要求设计调度器,然后在配置文件中指定相应的调度器,这样,当Hadoop集群启动时,便会加载该调度器。当前Hadoop自带了几种调度器,分别是FIFO(默认调度器),Capacity Scheler和FairScheler,通常境况下,这些调度器很难满足公司复杂的应用需求,因而往往需要开发自己的调度器。本文介绍了Hadoop调度器的基本编写方法。
2. Hadoop调度框架
Hadoop的调度器是在JobTracker中加载和调用的,用户可以在配置文件mapred-site.xml中的mapred.jobtracker.taskScheler属性中指定调度器。本节分析了Hadoop调度器的调度框架,实际上分析了两个重要类:TaskScheler和JobTracker的关系。
(1) TaskScheler
如果用户要编写自己的调度器,需要继承抽象类TaskScheler,该类的接口如下:
abstract class TaskScheler implements Configurable {
protected Configuration conf; //配置文件
protected TaskTrackerManager taskTrackerManager; //一般会设为JobTracker
public Configuration getConf() {
return conf;
}
public void setConf(Configuration conf) {
this.conf = conf;
}
public synchronized void setTaskTrackerManager(
TaskTrackerManager taskTrackerManager) {
this.taskTrackerManager = taskTrackerManager;
}
public void start() throws IOException { //初始化函数,如加载配置文件等
// do nothing
}
public void terminate() throws IOException { //结束函数
// do nothing
}
//最重要的函数,为该taskTracker分配合适的task
public abstract List<Task> assignTasks(TaskTrackerStatus taskTracker)
throws IOException;
//根据队列名字获job列表
public abstract Collection<JobInProgress> getJobs(String queueName);
}
(2) JobTracker
JobTracker是Hadoop最核心的组件,它监控整个集群中的作业运行情况并对资源进行管理和调度。
每个TaskTracker每个3s(默认值,可配置)通过heartbeat向JobTracker汇报自己管理的机器的一些基本信息,包括内存使用量,内存剩余量,正在运行的task,空闲的slot数目等,一旦JobTracker发现该TaskTracker出现了空闲的slot,便会调用调度器中的AssignTasks方法为该TaskTracker分配task。
下面分析JobTracker调用TaskScheler的具体流程:
……
private final TaskScheler taskScheler; //声明调度器对象
……
public static JobTracker startTracker(JobConf conf, String identifier) {
…….
result = new JobTracker(conf, identifier);
result.taskScheler.setTaskTrackerManager(result); //设置调度器的manager
……
}
//创建调度器
JobTracker(JobConf conf, String identifier) {
……
// Create the scheler
Class<? extends TaskScheler> schelerClass
= conf.getClass("mapred.jobtracker.taskScheler",
JobQueueTaskScheler.class, TaskScheler.class);
taskScheler = (TaskScheler) ReflectionUtils.newInstance(schelerClass, conf);
…..
}
//run forever
public void offerService() {
……
taskScheler.start(); //启动调度器
……
}
。。。。。
HeartbeatResponse heartbeat(TaskTrackerStatus status,
boolean restarted,
boolean initialContact,
boolean acceptNewTasks,
short responseId) {
…….
// Check for new tasks to be executed on the tasktracker
if (recoveryManager.shouldSchele() && acceptNewTasks && !isBlacklisted) {
……
//使用调度器,为该taskTracker分配作业
tasks = taskScheler.assignTasks(taskTrackerStatus);
……
}
}
从上面的分析可以知道,Scheler和JobTracker之间会相互包含(实际上是组合模式),Scheler中要包含JobTracker(实际上就是TaskTrackerManager)对象,以便获取整个Hadoop集群的一些信息,如slot总数,QueueManager对象,添加JobInProgressListener以便增加或删除job时,通知Scheler;JobTracker中要包含Scheler对象,以便可以对每个TaskTracker分配task。
3. 编写Hadoop调度器
假设我们要编写一个新的调度器,为MyHadoopScheler,需要进行以下工作:
(1) 用户需要自己实现的类
@ MyHadoopSchelerConf:配置文件管理类,读取你自己的配置文件,并保存到合适的数据结构中,一般而言,这个类应该支持动态加载配置文件。
@ MyHadoopSchelerListener:编写自己的JobInProgressListener,并调用JobTracker的addJobInProgressListener(),将之加到系统的Listener队列中,以便系统中添加或删除job后,JobTracker可立刻告诉调度器。
@ MyHadoopScheler:调度器的核心实现算法
(2) 用户要用到的系统类
@ JobTracker:JobTracker在startTracker函数中,会将MyHadoopScheler的taskTrackerManager赋值为JobTracker对象,这样,在MyHadoopScheler中,可调用Jobracker中的所有public方法和成员变量,常用的有:
$ getClusterStatus():获取集群的状态,如tasktracker列表,map slot总数,rece slot总数,当前正在运行的map/rece task总数等
$ getQueueManager():如果MyHadoopScheler支持多队列,那么需要使用该方法获取QueueManager对象,通过该对象,会用可以获取系统的所有队列名称,每个队列的ACL(Access Control List),具体参考:http://hadoop.apache.org/common/docs/current/service_level_auth.html
$ killJob:可以调用该函数杀死某个job
$ killTask:如果调度器支持资源抢占,可调用该函数 杀死某个task以便进行资源抢占。
@ JobInprogress:用户向Hadoop中提交一个job后,Hadoop会为该job创建一个叫JobInProgress的对象,该对象中包含了job相关的基本信息,且它会伴随某个job的一生(与job共存亡)。该对象中包含的job信息有:该job包含的所有task的信息(如:正在运行的task列表,已经完成的task列表,尚未运行的task列表等),作业的优先级,作业的提交时间,开始运行时间,运行结束时间等信息。
在JobInprogress的task列表中,每个task以对象TaskInProgress的形式保存,该对象中包含了每个task的基本信息,包括:task要处理的数据split,task创建时间,task开始执行时间,task结束时间等信息。这些信息肯定会在调度器中使用。
@ JobConf
每个作业的运行参数和配置选项被保存到一个JobConf对象中,该对象包含了配置文件mapred-site.xml,core-site.xml和hdfs-site.xml设置的选项和该作业的特有属性(用户名,InputFormat,Mapper等),一般是以key/value的形式保存,比如:想获取当前用户名,可以这样:
JobConf conf;
…….
String username = conf.get("user.name");
用户也可以通过该对象传递一些自己定义的全局属性,如用户自己定义了一个属性叫mapred.job.deadline(作业的deadline时间),用户可以在提交作业时设定该值:
hadoop jar hadoop-examples.jar wordcount -files cachefile.txt \
-D mapred.job.deadline=100000 \
input output
然后在调度器中这样获取该属性的值:
JobConf conf;
…….
int deadline=conf.getInt("mapred.job.deadline", -1); //获取mapred.job.deadline属性,如果没有设置,则返回-1
4. 总结
调度器是Hadoop的中枢,其重要性可想而知。用户如果要设计Hadoop调度器,需要对Hadoop的整个框架有比较深入的理解,同时需阅读一些很重要的类(如JobTracker和JobInprogress等)的源码,以便利用这些类完成你的调度算法。
Hadoop目前自带了三个比较常用的调度器,分别为JobQueueTaskScheler (FIFO,但队列调度器),Capacity Scheler(多队列多用户调度器)和Fair Scheler(多队列多用户调度器),它们是你学习Hadoop调度器的最好资料。
5. 参考资料
(1) Hadoop-0.20.2源代码
⑷ Hadoop读写文件时内部工作机制是怎样的
客户端通过调用FileSystem对象(对应于HDFS文件系统,调用DistributedFileSystem对象)的open()方法来打开文件(也即图中的第一步),DistributedFileSystem通过RPC(Remote Procere Call)调用询问NameNode来得到此文件最开始几个block的文件位置(第二步)。对每一个block来说,namenode返回拥有此block备份的所有namenode的地址信息(按集群的拓扑网络中与客户端距离的远近排序,关于在Hadoop集群中如何进行网络拓扑请看下面介绍)。如果客户端本身就是一个datanode(如客户端是一个maprece任务)并且此datanode本身就有所需文件block的话,客户端便从本地读取文件。
以上步骤完成后,DistributedFileSystem会返回一个FSDataInputStream(支持文件seek),客户端可以从FSDataInputStream中读取数据。FSDataInputStream包装了一个DFSInputSteam类,用来处理namenode和datanode的I/O操作。
客户端然后执行read()方法(第三步),DFSInputStream(已经存储了欲读取文件的开始几个block的位置信息)连接到第一个datanode(也即最近的datanode)来获取数据。通过重复调用read()方法(第四、第五步),文件内的数据就被流式的送到了客户端。当读到该block的末尾时,DFSInputStream就会关闭指向该block的流,转而找到下一个block的位置信息然后重复调用read()方法继续对该block的流式读取。这些过程对于用户来说都是透明的,在用户看来这就是不间断的流式读取整个文件。
当真个文件读取完毕时,客户端调用FSDataInputSteam中的close()方法关闭文件输入流(第六步)。
如果在读某个block是DFSInputStream检测到错误,DFSInputSteam就会连接下一个datanode以获取此block的其他备份,同时他会记录下以前检测到的坏掉的datanode以免以后再无用的重复读取该datanode。DFSInputSteam也会检查从datanode读取来的数据的校验和,如果发现有数据损坏,它会把坏掉的block报告给namenode同时重新读取其他datanode上的其他block备份。
这种设计模式的一个好处是,文件读取是遍布这个集群的datanode的,namenode只是提供文件block的位置信息,这些信息所需的带宽是很少的,这样便有效的避免了单点瓶颈问题从而可以更大的扩充集群的规模。
Hadoop中的网络拓扑
在Hadoop集群中如何衡量两个节点的远近呢?要知道,在高速处理数据时,数据处理速率的唯一限制因素就是数据在不同节点间的传输速度:这是由带宽的可怕匮乏引起的。所以我们把带宽作为衡量两个节点距离大小的标准。
但是计算两个节点之间的带宽是比较复杂的,而且它需要在一个静态的集群下才能衡量,但Hadoop集群一般是随着数据处理的规模动态变化的(且两两节点直接相连的连接数是节点数的平方)。于是Hadoop使用了一个简单的方法来衡量距离,它把集群内的网络表示成一个树结构,两个节点之间的距离就是他们离共同祖先节点的距离之和。树一般按数据中心(datacenter),机架(rack),计算节点(datanode)的结构组织。计算节点上的本地运算速度最快,跨数据中心的计算速度最慢(现在跨数据中心的Hadoop集群用的还很少,一般都是在一个数据中心内做运算的)。
假如有个计算节点n1处在数据中心c1的机架r1上,它可以表示为/c1/r1/n1,下面是不同情况下两个节点的距离:
• distance(/d1/r1/n1, /d1/r1/n1) = 0 (processes on the same node)
• distance(/d1/r1/n1, /d1/r1/n2) = 2 (different nodes on the same rack)
• distance(/d1/r1/n1, /d1/r2/n3) = 4 (nodes on different racks in the same data center)
• distance(/d1/r1/n1, /d2/r3/n4) = 6 (nodes in different data centers)
如下图所示:
Hadoop
写文件
现在我们来看一下Hadoop中的写文件机制解析,通过写文件机制我们可以更好的了解一下Hadoop中的一致性模型。
Hadoop
上图为我们展示了一个创建一个新文件并向其中写数据的例子。
首先客户端通过DistributedFileSystem上的create()方法指明一个欲创建的文件的文件名(第一步),DistributedFileSystem再通过RPC调用向NameNode申请创建一个新文件(第二步,这时该文件还没有分配相应的block)。namenode检查是否有同名文件存在以及用户是否有相应的创建权限,如果检查通过,namenode会为该文件创建一个新的记录,否则的话文件创建失败,客户端得到一个IOException异常。DistributedFileSystem返回一个FSDataOutputStream以供客户端写入数据,与FSDataInputStream类似,FSDataOutputStream封装了一个DFSOutputStream用于处理namenode与datanode之间的通信。
当客户端开始写数据时(第三步),DFSOutputStream把写入的数据分成包(packet), 放入一个中间队列——数据队列(data queue)中去。DataStreamer从数据队列中取数据,同时向namenode申请一个新的block来存放它已经取得的数据。namenode选择一系列合适的datanode(个数由文件的replica数决定)构成一个管道线(pipeline),这里我们假设replica为3,所以管道线中就有三个datanode。DataSteamer把数据流式的写入到管道线中的第一个datanode中(第四步),第一个datanode再把接收到的数据转到第二个datanode中(第四步),以此类推。
DFSOutputStream同时也维护着另一个中间队列——确认队列(ack queue),确认队列中的包只有在得到管道线中所有的datanode的确认以后才会被移出确认队列(第五步)。
如果某个datanode在写数据的时候当掉了,下面这些对用户透明的步骤会被执行:
1)管道线关闭,所有确认队列上的数据会被挪到数据队列的首部重新发送,这样可以确保管道线中当掉的datanode下流的datanode不会因为当掉的datanode而丢失数据包。
2)在还在正常运行的datanode上的当前block上做一个标志,这样当当掉的datanode重新启动以后namenode就会知道该datanode上哪个block是刚才当机时残留下的局部损坏block,从而可以把它删掉。
3)已经当掉的datanode从管道线中被移除,未写完的block的其他数据继续被写入到其他两个还在正常运行的datanode中去,namenode知道这个block还处在under-replicated状态(也即备份数不足的状态)下,然后他会安排一个新的replica从而达到要求的备份数,后续的block写入方法同前面正常时候一样。
有可能管道线中的多个datanode当掉(虽然不太经常发生),但只要dfs.replication.min(默认为1)个replica被创建,我们就认为该创建成功了。剩余的replica会在以后异步创建以达到指定的replica数。
当客户端完成写数据后,它会调用close()方法(第六步)。这个操作会冲洗(flush)所有剩下的package到pipeline中,等待这些package确认成功,然后通知namenode写入文件成功(第七步)。这时候namenode就知道该文件由哪些block组成(因为DataStreamer向namenode请求分配新block,namenode当然会知道它分配过哪些blcok给给定文件),它会等待最少的replica数被创建,然后成功返回。
replica是如何分布的
Hadoop在创建新文件时是如何选择block的位置的呢,综合来说,要考虑以下因素:带宽(包括写带宽和读带宽)和数据安全性。如果我们把三个备份全部放在一个datanode上,虽然可以避免了写带宽的消耗,但几乎没有提供数据冗余带来的安全性,因为如果这个datanode当机,那么这个文件的所有数据就全部丢失了。另一个极端情况是,如果把三个冗余备份全部放在不同的机架,甚至数据中心里面,虽然这样数据会安全,但写数据会消耗很多的带宽。Hadoop 0.17.0给我们提供了一个默认replica分配策略(Hadoop 1.X以后允许replica策略是可插拔的,也就是你可以自己制定自己需要的replica分配策略)。replica的默认分配策略是把第一个备份放在与客户端相同的datanode上(如果客户端在集群外运行,就随机选取一个datanode来存放第一个replica),第二个replica放在与第一个replica不同机架的一个随机datanode上,第三个replica放在与第二个replica相同机架的随机datanode上。如果replica数大于三,则随后的replica在集群中随机存放,Hadoop会尽量避免过多的replica存放在同一个机架上。选取replica的放置位置后,管道线的网络拓扑结构如下所示:
Hadoop
总体来说,上述默认的replica分配策略给了我们很好的可用性(blocks放置在两个rack上,较为安全),写带宽优化(写数据只需要跨越一个rack),读带宽优化(你可以从两个机架中选择较近的一个读取)。
一致性模型
HDFS某些地方为了性能可能会不符合POSIX(是的,你没有看错,POSIX不仅仅只适用于linux/unix, Hadoop 使用了POSIX的设计来实现对文件系统文件流的读取 ),所以它看起来可能与你所期望的不同,要注意。
创建了一个文件以后,它是可以在命名空间(namespace)中可以看到的:
Path p = new Path("p");
fs.create(p);
assertThat(fs.exists(p), is(true));
但是任何向此文件中写入的数据并不能保证是可见的,即使你flush了已经写入的数据,此文件的长度可能仍然为零:
Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
assertThat(fs.getFileStatus(p).getLen(), is(0L));
这是因为,在Hadoop中,只有满一个block数据量的数据被写入文件后,此文件中的内容才是可见的(即这些数据会被写入到硬盘中去),所以当前正在写的block中的内容总是不可见的。
Hadoop提供了一种强制使buffer中的内容冲洗到datanode的方法,那就是FSDataOutputStream的sync()方法。调用了sync()方法后,Hadoop保证所有已经被写入的数据都被冲洗到了管道线中的datanode中,并且对所有读者都可见了:
Path p = new Path("p");
FSDataOutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
out.sync();
assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));
这个方法就像POSIX中的fsync系统调用(它冲洗给定文件描述符中的所有缓冲数据到磁盘中)。例如,使用java API写一个本地文件,我们可以保证在调用flush()和同步化后可以看到已写入的内容:
FileOutputStream out = new FileOutputStream(localFile);
out.write("content".getBytes("UTF-8"));
out.flush(); // flush to operating system
out.getFD().sync(); // sync to disk (getFD()返回与该流所对应的文件描述符)
assertThat(localFile.length(), is(((long) "content".length())));
在HDFS中关闭一个流隐式的调用了sync()方法:
Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.close();
assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));
由于Hadoop中的一致性模型限制,如果我们不调用sync()方法的话,我们很可能会丢失多大一个block的数据。这是难以接受的,所以我们应该使用sync()方法来确保数据已经写入磁盘。但频繁调用sync()方法也是不好的,因为会造成很多额外开销。我们可以再写入一定量数据后调用sync()方法一次,至于这个具体的数据量大小就要根据你的应用程序而定了,在不影响你的应用程序的性能的情况下,这个数据量应越大越好。
⑸ 在hadoop集群中,fifo调度算法具有哪些特点
首先介绍了Hadoop平台下作业的分布式运行机制,然后对Hadoop平台自带的4种任务调度器做分析和比较,最后在分析JobTracker类文件的基础上指出了创建自定义任务调度器所需完成的工作。首先Hadoop集群式基于单服务器的,只有一个服务器节点负责调度整个集群的作业运行,主要的具体工作是切分大数据量的作业,指定哪些Worker节点做Map工作、哪些Worker节点做Rece工作、与Worker节点通信并接受其心跳信号、作为用户的访问入口等等。其次,集群中的每个Worker节点相当于一个器官,运行着主节点所指派的具体作业。这些节点会被分为两种类型,一种是接收分块之后的作业并做映射工作。另一种是负责把前面所做的映射工作按照约定的规则做一个统计。Task-Tracker通过运行一个简单循环来定期地发送心跳信号(heartbeat)给JobTracker.这个心跳信号会把TaskTracker是否还在存活告知JobTracker,TaskTracker通过信号指明自己是否已经准备好运行新的任务.一旦TaskTracker已经准备好接受任务,JobTracker就会从作业优先级表中选定一个作业并分配下去.至于到底是执行Map任务还是Rece任务,是由TaskTracker的任务槽所决定的.默认的任务调度器在处理Rece任务之前,会优先填满空闲的Map任务槽.因此,如果TaskTracker满足存在至少一个空闲任务槽时,JobTracker会为它分配Map任务,否则为它选择一个Rece任务.TaskTracker在运行任务的时候,第一步是从共享文件系统中把作业的JAR文件复制过来,从而实现任务文件的本地化.第二步是TaskTracker为任务新建一个本地文件夹并把作业文件解压在此目录中.第三步是由Task-Tracker新建一个TaskRunner实例来运行该任务.Hadoop平台默认的调度方案就是JobQueueTaskScheler,这是一种按照任务到来的时间先后顺序而执行的调度策略.这种方式比较简单,JobTracker作为主控节点,仅仅是依照作业到来的先后顺序而选择将要执行的作业.当然,这有一定的缺陷,由于Hadoop平台是默认将作业运行在整个集群上的,那么如果一个耗时非常大的作业进入执行期,将会导致其余大量作业长时间得不到运行.这种长时间运行的优先级别并不高的作业带来了严重的作业阻塞,使得整个平台的运行效率处在较低的水平.Hadoop平台对这种FIFO(FirstINAndFirstOut)机制所给出的解决法是调用SetJobPriority()方法,通过设置作业的权重级别来做平衡调度.FairScheler是一种“公平”调度器,它的目标是让每个用户能够公平地共享Hadoop集群计算能力.当只有一个作业运行的时候,它会得到整个集群的资源.随着提交到作业表中作业的增多,Hadoop平台会把集群中空闲出来的时间槽公平分配给每个需要执行的作业.这样即便其中某些作业需要较长时间运行,平台仍然有能力让那些短作业在合理时间内完成[3].FairScheler支持资源抢占,当一个资源池在一定时段内没有得到公平共享时,它会终止该资源池所获得的过多的资源,同时把这些释放的资源让给那些资源不足的资源池.Hadoop平台中的CapacityScheler是由Yahoo贡献的,在调度器上,设置了三种粒度的对象:queue,job,task.在该策略下,平台可以有多个作业队列,每个作业队列经提交后,都会获得一定数量的TaskTracker资源.具体调度流程如下.(1)选择queue,根据资源库的使用情况从小到大排序,直到找到一个合适的job.(2)选择job,在当前所选定的queue中,按照作业提交的时间先后以及作业的权重优先级别进行排序,选择合适的job.当然,在job选择时还需要考虑所选作业是否超出目前现有的资源上限,以及资源池中的内存是否够该job的task用等因素.(3)选择task,根据本地节点的资源使用情况来选择合适的task.虽然Hadoop平台自带了几种调度器,但是上述3种调度方案很难满足公司复杂的应用需求.因此作为平台的个性化使用者,往往需要开发自己的调度器.Hadoop的调度器是在JobTracker中加载和调用的,因此开发一个自定义的调度器就必须搞清楚JobTracker类文件的内部机制.作为Hadoop平台的核心组件,JobTracker监控着整个集群的作业运行情况并对资源进行管理调度.每个Task-Tracker每隔3s通过heartbeat向JobTracker汇报自己管理的机器的一些基本信息,包括内存使用量、内存的剩余量以及空闲的slot数目等等[5].一旦JobTracker发现了空闲slot,便会调用调度器中的AssignTask方法为该TaskTracker分配task。
⑹ Hadoop到底是什么玩意
Hadoop到底是个啥?
答:Hadoop是基于廉价设备利用集群的威力对海量数据进行安全存储和高效计算的分布式存储和分析框架,Hadoop本身是一个庞大的项目家族,其核心 家族或者底层是HDFS和MapRece,HDFS和MapRece分别用来实现对海量数据的存储和分析,其它的项目,例如Hive、HBase 等都是基于HDFS和MapRece,是为了解决特定类型的大数据处理问题而提出的子项目,使用Hive、HBase等子项目可以在更高的抽象的基础上更简单的编写分布式大数据处理程序。Hadoop的其它子项目还包括Common, Avro, Pig, ZooKeeper, Sqoop, Oozie 等,随着时间的推移一些新的子项目会被加入进来,一些关注度不高的项目会被移除Hadoop家族,所以Hadoop是一个充满活力的系统。
Apache Hadoop: 是Apache开源组织的一个分布式计算开源框架,提供了一个分布式文件系统子项目(HDFS)和支持MapRece分布式计算的软件架构。
Apache Hive: 是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,通过类SQL语句快速实现简单的MapRece统计,不必开发专门的MapRece应用,十分适合数据仓库的统计分析。
ApachePig: 是一个基于Hadoop的大规模数据分析工具,它提供的SQL-LIKE语言叫Pig Latin,该语言的编译器会把类SQL的数据分析请求转换为一系列经过优化处理的MapRece运算。
ApacheHBase: 是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用HBase技术可在廉价PC Server上搭建起大规模结构化存储集群。
Apache Sqoop: 是一个用来将Hadoop和关系型数据库中的数据相互转移的工具,可以将一个关系型数据库(MySQL ,Oracle ,Postgres等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。
Apache Zookeeper: 是一个为分布式应用所设计的分布的、开源的协调服务,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,简化分布式应用协调及其管理的难度,提供高性能的分布式服务 ApacheMahout:是基于Hadoop的机器学习和数据挖掘的一个分布式框架。Mahout用MapRece实现了部分数据挖掘算法,解决了并行挖掘的问题。
ApacheCassandra:是一套开源分布式NoSQL数据库系统。它最初由Facebook开发,用于储存简单格式数据,集Google BigTable的数据模型与AmazonDynamo的完全分布式的架构于一身 Apache Avro: 是一个数据序列化系统,设计用于支持数据密集型,大批量数据交换的应用。Avro是新的数据序列化格式与传输工具,将逐步取代Hadoop原有的IPC机制 ApacheAmbari: 是一种基于Web的工具,支持Hadoop集群的供应、管理和监控。
ApacheChukwa: 是一个开源的用于监控大型分布式系统的数据收集系统,它可以将各种各样类型的数据收集成适合 Hadoop 处理的文件保存在 HDFS 中供Hadoop 进行各种 MapRece 操作。
ApacheHama: 是一个基于HDFS的BSP(Bulk Synchronous Parallel)并行计算框架, Hama可用于包括图、矩阵和网络算法在内的大规模、大数据计算。
ApacheFlume: 是一个分布的、可靠的、高可用的海量日志聚合的系统,可用于日志数据收集,日志数据处理,日志数据传输。
ApacheGiraph: 是一个可伸缩的分布式迭代图处理系统, 基于Hadoop平台,灵感来自 BSP (bulk synchronous parallel) 和Google 的 Pregel。
ApacheOozie: 是一个工作流引擎服务器, 用于管理和协调运行在Hadoop平台上(HDFS、Pig和MapRece)的任务。
ApacheCrunch: 是基于Google的FlumeJava库编写的Java库,用于创建MapRece程序。与Hive,Pig类似,Crunch提供了用于实现如连接数据、执行聚合和排序记录等常见任务的模式库 ApacheWhirr: 是一套运行于云服务的类库(包括Hadoop),可提供高度的互补性。Whirr学支持Amazon EC2和Rackspace的服务。
ApacheBigtop: 是一个对Hadoop及其周边生态进行打包,分发和测试的工具。
ApacheHCatalog: 是基于Hadoop的数据表和存储管理,实现中央的元数据和模式管理,跨越Hadoop和RDBMS,利用Pig和Hive提供关系视图。
ClouderaHue: 是一个基于WEB的监控和管理系统,实现对HDFS,MapRece/YARN, HBase, Hive, Pig的web化操作和管理。
⑺ Hadoop:是什么,如何工作,可以用来做什么
Hadoop主要是分布式计算和存储的框架,所以Hadoop工作过程主要依赖于HDFS(Hadoop Distributed File System)分布式存储系统和Maprece分布式计算框架。
分布式存储系统HDFS中工作主要是一个主节点namenode(master)(hadoop1.x只要一个namenode节点,2.x中可以有多个节点)和若干个从节点Datanode(数据节点)相互配合进行工作,HDFS主要是存储Hadoop中的大量的数据,namenode节点主要负责的是:
1、接收client用户的操作请求,这种用户主要指的是开发工程师的Java代码或者是命令客户端操作。
2、维护文件系统的目录结构,主要就是大量数据的关系以及位置信息等。
3、管理文件系统与block的关系,Hadoop中大量的数据为了方便存储和管理主要是以block块(64M)的形式储存。一个文件被分成大量的block块存储之后,block块之间都是有顺序关系的,这个文件与block之间的关系以及block属于哪个datanode都是有namenode来管理。
Datanode的主要职责是:
1、存储文件。
2、将数据分成大量的block块。
3、为保证数据的安全,对数据进行备份,一般备份3份。当其中的一份出现问题时,将由其他的备份来对数据进行恢复。
MapRece主要也是一个主节点JOPtracker和testtracker组成,主要是负责hadoop中的数据处理过程中的计算问题。
joptracker主要负责接收客户端传来的任务,并且把计算任务交给很多testtracker工作,同时joptracker会不断的监控testtracker的执行情况。
testtracker主要是执行joptracker交给它的任务具体计算,例如给求大量数据的最大值,每个testtracker会计算出自己负责的数据中的最大值,然后交给joptracker。
Hadoop的主要两个框架组合成了分布式的存储和计算,使得hadoop可以很快的处理大量的数据。
⑻ 如何架构大数据系统 hadoop
大数据数量庞大,格式多样化。大量数据由家庭、制造工厂和办公场所的各种设备、互联网事务交易、社交网络的活动、自动化传感器、移动设备以及科研仪器等生成。它的爆炸式增长已超出了传统IT基础架构的处理能力,给企业和社会带来严峻的数据管理问题。因此必须开发新的数据架构,围绕“数据收集、数据管理、数据分析、知识形成、智慧行动”的全过程,开发使用这些数据,释放出更多数据的隐藏价值。
一、大数据建设思路
1)数据的获得
四、总结
基于分布式技术构建的大数据平台能够有效降低数据存储成本,提升数据分析处理效率,并具备海量数据、高并发场景的支撑能力,可大幅缩短数据查询响应时间,满足企业各上层应用的数据需求。
⑼ 怎么优化hadoop任务调度算法
首先介绍了Hadoop平台下作业的分布式运行机制,然后对Hadoop平台自带的4种任务调度器做分析和比较,最后在分析JobTracker类文件的基础上指出了创建自定义任务调度器所需完成的工作。
首先Hadoop集群式基于单服务器的,只有一个服务器节点负责调度整个集群的作业运行,主要的具体工作是切分大数据量的作业,指定哪些Worker节点做Map工作、哪些Worker节点做Rece工作、与Worker节点通信并接受其心跳信号、作为用户的访问入口等等。其次,集群中的每个Worker节点相当于一个器官,运行着主节点所指派的具体作业。这些节点会被分为两种类型,一种是接收分块之后的作业并做映射工作。另一种是负责把前面所做的映射工作按照约定的规则做一个统计。
Task-Tracker通过运行一个简单循环来定期地发送心跳信号(heartbeat)给JobTracker.这个心跳信号会把TaskTracker是否还在存活告知JobTracker,TaskTracker通过信号指明自己是否已经准备
好运行新的任务.一旦TaskTracker已经准备好接受任务,JobTracker就会从作业优先级表中选定一个作业并分配下去.至于到底是执行Map任务还是Rece任务,是由TaskTracker的任务槽所决定的.默认的任务调度器在处理Rece任务之前,会优先填满空闲的Map任务槽.因此,如果TaskTracker满足存在至少一个空闲任务槽时,JobTracker会为它分配Map任务,否则为它选择一个Rece任务.TaskTracker在运行任务的时候,第一步是从共享文件系统中把作业的JAR文件复制过来,从而实现任务文件的本地化.第二步是TaskTracker为任务新建一个本地文件夹并把作业文件解压在此目录中.第三步是由Task-Tracker新建一个TaskRunner实例来运行该任务.
Hadoop平台默认的调度方案就是JobQueueTaskScheler,这是一种按照任务到来的时间先后顺序而执行的调度策略.这种方式比较简单,JobTracker作为主控节点,仅仅是依照作业到来的先后顺序而选择将要执行的作业.当然,这有一定的缺陷,由于Hadoop平台是默认将作业运行在整个集群上的,那么如果一个耗时非常大的作业进入执行期,将会导致其余大量作业长时间得不到运行.这种长时间运行的优先级别并不高的作业带来了严重的作业阻塞,使得整个平台的运行效率处在较低的水平.Hadoop平台对这种FIFO(FirstINAndFirstOut)机制所给出的解决办法是调用SetJobPriority()方法,通过设置作业的权重级别来做平衡调度.
FairScheler是一种“公平”调度器,它的目标是让每个用户能够公平地共享Hadoop集群计算能力.当只有一个作业运行的时候,它会得到整个集群的资源.随着提交到作业表中作业的增多,Hadoop平台会把集群中空闲出来的时间槽公平分配给每个需要执行的作业.这样即便其中某些作业需要较长时间运行,平台仍然有能力让那些短作业在合理时间内完成[3].FairScheler支持资源抢占,当一个资源池在一定时段内没有得到公平共享时,它会终止该资源池所获得的过多的资源,同时把这些释放的资源让给那些资源不足的资源池.
Hadoop平台中的CapacityScheler是由Yahoo贡献的,在调度器上,设置了三种粒度的对象:queue,job,task.在该策略下,平台可以有多个作业队列,每个作业队列经提交后,都会获得一定数量的TaskTracker资源.具体调度流程如下.
(1)选择queue,根据资源库的使用情况从小到大排序,直到找到一个合适的job.
(2)选择job,在当前所选定的queue中,按照作业提交的时间先后以及作业的权重优先级别进行排序,选择合适的job.当然,在job选择时还需要考虑所选作业是否超出目前现有的资源上限,以及资源池中的内存是否够该job的task用等因素.
(3)选择task,根据本地节点的资源使用情况来选择合适的task.
虽然Hadoop平台自带了几种调度器,但是上述3种调度方案很难满足公司复杂的应用需求.因此作为平台的个性化使用者,往往需要开发自己的调度器.Hadoop的调度器是在JobTracker中加载和调用的,因此开发一个自定义的调度器就必须搞清楚JobTracker类文件的内部机制.作为Hadoop平台的核心组件,JobTracker监控着整个集群的作业运行情况并对资源进行管理调度.每个Task-Tracker每隔3s通过heartbeat向JobTracker汇报自己管理的机器的一些基本信息,包括内存使用量、内存的剩余量以及空闲的slot数目等等[5].一
旦JobTracker发现了空闲slot,便会调用调度器中的AssignTask方法为该TaskTracker分配task。
⑽ hadoop集群中,fifo调度算法具有哪些特点
首先介绍了Hadoop平台下作业的分布式运行机制,然后对Hadoop平台自带的4种任务调度器做分析和比较,最后在分析JobTracker类文件的基础上指出了创建自定义任务调度器所需完成的工作。首先Hadoop集群式基于单服务器的,只有一个服务器节点负责调度整个集群的作业运行,主要的具体工作是切分大数据量的作业,指定哪些Worker节点做Map工作、哪些Worker节点做Rece工作、与Worker节点通信并接受其心跳信号、作为用户的访问入口等等。其次,集群中的每个Worker节点相当于一个器官,运行着主节点所指派的具体作业。这些节点会被分为两种类型,一种是接收分块之后的作业并做映射工作。另一种是负责把前面所做的映射工作按照约定的规则做一个统计。Task-Tracker通过运行一个简单循环来定期地发送心跳信号(heartbeat)给JobTracker.这个心跳信号会把TaskTracker是否还在存活告知JobTracker,TaskTracker通过信号指明自己是否已经准备好运行新的任务.一旦TaskTracker已经准备好接受任务,JobTracker就会从作业优先级表中选定一个作业并分配下去.至于到底是执行Map任务还是Rece任务,是由TaskTracker的任务槽所决定的.默认的任务调度器在处理Rece任务之前,会优先填满空闲的Map任务槽.因此,如果TaskTracker满足存在至少一个空闲任务槽时,JobTracker会为它分配Map任务,否则为它选择一个Rece任务.TaskTracker在运行任务的时候,第一步是从共享文件系统中把作业的JAR文件复制过来,从而实现任务文件的本地化.第二步是TaskTracker为任务新建一个本地文件夹并把作业文件解压在此目录中.第三步是由Task-Tracker新建一个TaskRunner实例来运行该任务.Hadoop平台默认的调度方案就是JobQueueTaskScheler,这是一种按照任务到来的时间先后顺序而执行的调度策略.这种方式比较简单,JobTracker作为主控节点,仅仅是依照作业到来的先后顺序而选择将要执行的作业.当然,这有一定的缺陷,由于Hadoop平台是默认将作业运行在整个集群上的,那么如果一个耗时非常大的作业进入执行期,将会导致其余大量作业长时间得不到运行.这种长时间运行的优先级别并不高的作业带来了严重的作业阻塞,使得整个平台的运行效率处在较低的水平.Hadoop平台对这种FIFO(FirstINAndFirstOut)机制所给出的解决法是调用SetJobPriority()方法,通过设置作业的权重级别来做平衡调度.FairScheler是一种“公平”调度器,它的目标是让每个用户能够公平地共享Hadoop集群计算能力.当只有一个作业运行的时候,它会得到整个集群的资源.随着提交到作业表中作业的增多,Hadoop平台会把集群中空闲出来的时间槽公平分配给每个需要执行的作业.这样即便其中某些作业需要较长时间运行,平台仍然有能力让那些短作业在合理时间内完成[3].FairScheler支持资源抢占,当一个资源池在一定时段内没有得到公平共享时,它会终止该资源池所获得的过多的资源,同时把这些释放的资源让给那些资源不足的资源池.Hadoop平台中的CapacityScheler是由Yahoo贡献的,在调度器上,设置了三种粒度的对象:queue,job,task.在该策略下,平台可以有多个作业队列,每个作业队列经提交后,都会获得一定数量的TaskTracker资源.具体调度流程如下.(1)选择queue,根据资源库的使用情况从小到大排序,直到找到一个合适的job.(2)选择job,在当前所选定的queue中,按照作业提交的时间先后以及作业的权重优先级别进行排序,选择合适的job.当然,在job选择时还需要考虑所选作业是否超出目前现有的资源上限,以及资源池中的内存是否够该job的task用等因素.(3)选择task,根据本地节点的资源使用情况来选择合适的task.虽然Hadoop平台自带了几种调度器,但是上述3种调度方案很难满足公司复杂的应用需求.因此作为平台的个性化使用者,往往需要开发自己的调度器.Hadoop的调度器是在JobTracker中加载和调用的,因此开发一个自定义的调度器就必须搞清楚JobTracker类文件的内部机制.作为Hadoop平台的核心组件,JobTracker监控着整个集群的作业运行情况并对资源进行管理调度.每个Task-Tracker每隔3s通过heartbeat向JobTracker汇报自己管理的机器的一些基本信息,包括内存使用量、内存的剩余量以及空闲的slot数目等等[5].一旦JobTracker发现了空闲slot,便会调用调度器中的AssignTask方法为该TaskTracker分配task。