hadoop存储小文件
❶ Hadoop HDFS处理大量的小文件
小文件是指文件大小明显小于HDFS上块(block)大小(默认64MB)的文件。如果存储小文件,必定会有大量这样的小文件,否则你也不会使用Hadoop(If you’re storing small files, then you probably have lots of them
(otherwise you wouldn’t turn to Hadoop)),这样的文件给hadoop的扩展性和性能带来严重问题。当一个文件的大小小于HDFS的块大小(默认64MB),就将认定为小文件否则就是大文件。为了检测输入文件的大小,可以浏览Hadoop DFS 主页 http://machinename:50070/dfshealth.jsp ,并点击Browse filesystem(浏览文件系统)。
首先,在HDFS中,任何一个文件,目录或者block在NameNode节点的内存中均以一个对象表示(元数据)(Every file, directory and block in HDFS is represented as an object in the namenode’s memory),而这受到NameNode物理内存容量的限制。每个元数据对象约占150byte,所以如果有1千万个小文件,每个文件占用一个block,则NameNode大约需要15G空间。如果存储1亿个文件,则NameNode需要150G空间,这毫无疑问1亿个小文件是不可取的。
其次,处理小文件并非Hadoop的设计目标,HDFS的设计目标是流式访问大数据集(TB级别)。因而,在HDFS中存储大量小文件是很低效的。访问大量小文件经常会导致大量的寻找,以及不断的从一个DatanNde跳到另一个DataNode去检索小文件(Reading through small files normally causes lots of seeks and lots of hopping from datanode to datanode to retrieve each small file),这都不是一个很有效的访问模式,严重影响性能。
最后,处理大量小文件速度远远小于处理同等大小的大文件的速度。每一个小文件要占用一个slot,而task启动将耗费大量时间甚至大部分时间都耗费在启动task和释放task上。
Hadoop存档文件系统通常将HDFS中的多个文件打包成一个存档文件,减少namenode内存的使用
hadoop archive命令创建HAR文件
from:https://blog.csdn.net/sunnyyoona/article/details/53870077
❷ hadoop怎么保存配置文件
从安装Hadoop 单机版的配置说起
在这里配置了Hadoop的文件目录
1. 启动Hadoop 并上传文件
上传文件命令:hadoop fs -put hadoop-2.9.2.tar.gz hdfs://hdgroup01:9000/
可以看到已经上传了文件了
2. 删除本地文件
如果删除后你在浏览器刷新页面 然后还能下载 说明 肯定存在其则哪宽他地方
使用命令下载也可以:hadoop fs -get hdfs://hdgroup01:9000/hadoop-2.9.2.tar.gz
3. hdfs 文件存放的位置
通过查看缓液 安装时配置的hadoop文件目录 上传文件后的结果
这个dfs 应该见过的 就是在格式化namenode的时候 在我们配置的文件中创建了dfs 文件夹
4. 上传一个349.5M 的文件
5. 进到hadoop 本地存储文件中查看 存储格式
上传的文件为349.47M 被切分成了 三块 因为是单机版 所以都存放到了 这台主机的文件系统中
可以看到 hadoop 块大小为128M(默认) 超过128M的文件会被切成不同的块存放
总结
1. hdfs是是通过分布式集群来存储文件,为客户端提供了一个便捷的访问方式 虚拟的目录结构
2. 文件存储到hdfs集群中的时候是被切分成block的 默认为128M
3. 文件的block 存放在若干台datanode节点中
4. hdfs文件系统中的文件与真实的block之间有映射关系,由namenode管理
5. 每个block 在集孙亮群中会存储多个副本,好处是可以提高数据的可靠性,还可以提高访问的吞吐量。
❸ hadoop实验如果上传的是130M的文件,会放在几个块中
这个要看你用的hadoop版本,要是没记错2.7.2之前默认一个块是64MB,同时默岩备认副本是3个,所以130M会存放在9个块中,如果是2.7.2之后,默认一个块祥卜是128MB,默认副本数依然是3个,所以130M会放在6个块中。可以看到hdfs上最害怕的就是存放小文件,会很浪谨枣穗费空间,1k的文件也会占用一个块。
❹ 弱弱地问下:Hadoop为什么就不适合处理小文件
因为namenode在内存中存储hdfs中的文件信息。每个文件、目录或分区(block)需要大约150B,所以如果有很多小文件,那么缺唯namenode的内存将会承担橡扮判很大压力。比如有100万个文件,每个文件一个block,那么这就需要300M内存。若梁改文件数量达到十亿级,则没有足够大的内存来应付它了。
❺ 小文件会在hadoop里占用更多的磁盘空间吗
64M为一个block,采用HDFS,使用的空间根据你磁盘的大小,也可以在配置文件中设置的!
❻ 关于Hadoop文件块的大小
HDFS中的文件在物理上是分块存储的(Block),块的大小可以通过配置参数( dfs.blocksize )来设置,默认的块大小在Hadoop2.x版本中是 128M ,Hadoop1.x版本中是 64M 。
Tips:硬盘平均寻址时间的计算:
❼ Hadoop的优点和缺点是什么
Hadoop的优点:
1、Hadoop具有按位存储和处理数据能力的高可靠性。
2、Hadoop通过可用的计算机集群分配数据,完成存储和计算任务,这些集群可以方便地扩展到数以千计的节点中,具有高扩展性。
3、Hadoop能够在节点之间进行动态地移动数据,并保证各个节点的动态平衡,处理速度非常快,具有高效性。
4、Hadoop能够自动保存数据的多个副本,并且能够自动将失败的任务重新分配,具有高容错性。
Hadoop的缺点:
1、Hadoop不适用于低延迟数据访问。
2、Hadoop不能高效存储大量小文件。
3、Hadoop不支持多用户写入并任意修改文件。
关于大数据技术Hadoop学习哪些内容,青藤小编就和您分享到这里了。如果您对大数据工程有浓厚的兴趣,希望这篇文章可以为您提供帮助。如果您还想了解更多关于数据分析师、大数据工程师的技巧及素材等内容,可以点击本站的其他文章进行学习。
❽ 在hadoop项目结构中h dfs指的是什么
HDFS(Hadoop Distributed File System)是Hadoop项目的核心子项目,是分布式计算中数据存储管理的基础,是基于流数据模式访问和处理超大文件的需求而开发的,可以运行于廉价的商用服务器上。
HDFS 具有以下优点:
1、高容错性
数据自动保存多个副本。它通过增加副本的形式,提高容错性。某一个副本丢失以后,它可以自动恢复,这是由 HDFS 内部机制实现的,我们不必关心。
2、适合批处理
它是通过移动计算而不是移动数据。它会把数据位置暴露给计算框架。
3、适合大数据处理
处理数据达到 GB、TB、甚至PB级别的数据。能够处理百万规模以上的文件数量,数量相当之大。能够处理10K节点的规模。
4、流式文件访问
一次写入,多次读取。文件一旦写入不能修改,只能追加。它能保证数据的一致性。
5、可构建在廉价机器上
它通过多副本机制,提高可靠性。它提供了容错和恢复机制。比如某一个副本丢失,可以通过其它副本来恢复。
HDFS 也有它的劣势,并不适合所有的场合:
1、低延时数据访问
比如毫秒级的来存储数据,这是不行的,它做不到。它适合高吞吐率的场景,就是在某一时间内写入大量的数据。但是它在低延时的情况下是不行的,比如毫秒级以内读取数据,这样它是很难做到的。
2、小文件存储
存储大量小文件(这里的小文件是指小于HDFS系统的Block大小的文件(默认64M))的话,它会占用 NameNode大量的内存来存储文件、目录和块信息。这样是不可取的,因为NameNode的内存总是有限的。
3、并发写入、文件随机修改
一个文件只能有一个写,不允许多个线程同时写。仅支持数据 append(追加),不支持文件的随机修改。
❾ hadoop文件格式和压缩
Hadoop中的文件格式大致上分为面向行和面向列两类:
面向行:TextFile、SequenceFile、MapFile、Avro Datafile
二进制格式文件大小比文本文件大。
生产环境常用,作为原始表的存储格式,会占用更多磁盘资源,对它的 解析开销一般会比二进制格式高 几十倍以上。
Hadoop API 提供的一种二进制文件,它将数据以<key,value>的形式序列化到文件中。这种二进制文件内部使用Hadoop 的标准的Writable 接口实现序列化和反序列化。它与Hadoop API中的MapFile 是互相兼容的。
MapFile即为排序后的SequeneceFile,它会额外生成一个索引文件提供按键的查找。文件不支持复写操作,不能向已存在的SequenceFile(MapFile)追加存储记录,在执行文件写操作的时候,该文件是不可读取的。
Avro是一种用于支持数据密集型的二进制文件格式。它的文件格式更为紧凑,若要读取大量数据时,Avro能够提供更好的序列化和反序列化性能。并且Avro数据文件天生是带Schema定义的,所以它不需要开发者在API 级别实现自己的Writable对象。最近多个Hadoop 子项目都支持Avro 数据格式,如Pig 、Hive、Flume、Sqoop和Hcatalog。
面向列:Parquet 、RCFile、ORCFile
RCFile是Hive推出的一种专门面向列的数据格式。 它遵循“先按列划分,再垂直划分”的设计理念。当查询过程中,针对它并不关心的列时,它会在IO上跳过这些列。
ORCFile (Optimized Record Columnar File)提供了一种比RCFile更加高效的文件格式。其内部将数据划分为默认大小为250M的Stripe。每个Stripe包括索引、数据和Footer。索引存储每一列的最大最小值,以及列中每一行的位置。
Parquet 是一种支持嵌套结构的列式存储格式。Parquet 的存储模型主要由行组(Row Group)、列块(Column Chuck)、页(Page)组成。
1、行组,Row Group:Parquet 在水平方向上将数据划分为行组,默认行组大小与 HDFS Block 块大小对齐,Parquet 保证一个行组会被一个 Mapper 处理。
2、列块,Column Chunk:行组中每一列保存在一个列块中,一个列块具有相同的数据类型,不同的列块可以使用不同的压缩。
3、页,Page:Parquet 是页存储方式,每一个列块包含多个页,一个页是最小的编码的单位,同一列块的不同页可以使用不同的编码方式。
一般原始表数据使用文本格式存储,其他的都是列式存储。
目前在Hadoop中常用的几种压缩格式:lzo,gzip,snappy,bzip2,主要特性对比如下:
其性能对比如下:
2.1 lzo
hadoop中最流行的压缩格式,压缩/解压速度也比较快,合理的压缩率,支持split。适用于较大文本的处理。
对于lzo压缩,常用的有LzoCodec和lzopCodec,可以对sequenceFile和TextFile进行压缩。对TextFile压缩后,mapred对压缩后的文件默认是不能够进行split操作,需要对该lzo压缩文件进行index操作,生成lzo.index文件,map操作才可以进行split。如果设置LzoCodec,那么就生成.lzo后缀的文件,可以用LzoIndexer 进行支持split的index计算,如果设置LzopCodec,那么生成.lzo_deflate后缀的文件,不支持建立index。
❿ hadoop中存储文件系统hdfs的冗余机制是怎么进行的有什么特点
可以只用一行代码来运行MapRece作业:JobClient.runJon(conf),Job作业运行时参与的四个实体:
1.JobClient 写代码,配置作业,提交作业。
2.JobTracker:初始化作业,分配作业,协调作业运行。这是一个java程序,主类是JobTracker。
3.TaskTracker:运行作业划分后的任务,即分配数据分配上执行Map或Rece任务。
4.HDFS:保存作业数据、配置信息等,保存作业结果。
Map/Rece 作业总体执行流程:
代码编写 ----> 作业配置 ---->作业提交---->Map任务分配和执行---->处理中间结果----> Rece任务分配与执行----> 输出结果
而对于每个作业的执行,又包含:
输入准备---->任务执行---->输出结果
作业提交JobClient:
JobClient的runJob方法产生一个Jobclient实例并调用其submitJob方法,然后runJob开始循环吗,并在循环中调用getTaskCompetionEvents方法,获得TaskCompletionEvent实例,每秒轮询作业进度(后面有介绍进度和状态更新),把进度写到控制台,作业完成后显示作业计数器,若失败,则把错误记录到控制台。
submitJob方法作业提交的过程:
1.向JobTracker请求一个新的JobId。
2.检查作业相关路径,如果路径不正确就会返回错误。
3.计算作业输入分片及其划分信息。
4.将作业运行需要的资源(jar文件、配置文件等)复制到Shared HDFS,并
复制多个副本(参数控制,默认值为10)供tasktracker访问,也会将计算的分片复制到HDFS。
5.调用JobTracker对象的submitJob()方法来真正提交作业,告诉JobTracker作业准备执行。
作业的初始化JobTracker:
JobTracker收到submitJob方法调用后,会把调用放入到一个内部队列,由作业调度器(Job scheler)进行调度并对其初始化。Job初始化即创建一个作业对象。
当作业被调度后,JobTracker会创建一个代表这个作业的JobInProgress对象,并将任务和记录信息封装在这个对象中,以便跟踪任务状态和进程。
初始化过程就是JobInProgress对象的initTasks方法进行初始化的。
初始化步骤:
1.从HDFS中读取作业对应的job.split信息,为后面的初始化做好准备。
2.创建并初始化map和rece任务。根据数据分片信息中的个数确定map task的个数,然后为每个map task生成一个TaskInProgress对象来处理数据分片,先将其放入nonRunningMapCache,以便JobTracker分配任务的时候使用。接下来根据JobConf中的mapred.rece.tasks属性利用setNumReceTasks()方法设置rece task的数量,然后同map task创建方式。
3.最后就是创建两个初始化task,进行map和rece的初始化。
任务的分配JobTracker:
消息传递HeartBeat: tasktracker运行一个简单循环定期发送心跳(heartbeat)给JobTracker。由心跳告知JobTracker自己是否存活,同时作为消息通道传递其它信息(请求新task)。作为心跳的一部分,tasktracker会指明自己是否已准备好运行新的任务,如果是,jobtracker会分配它一个任务。
分配任务所属于的作业:在Jobtracker分配任务前需先确定任务所在的作业。后面会介绍到各种作业调度算法,默认是一个FIFO的作业调度。
分配Map和Rece任务:tasktracker有固定数量的任务槽,一个tasktracker可以同时运行多个Map和Rece任务,但其准确的数量由tasktracker的核的数量和内存大小决定。默认调度器会先填满Map任务槽,再填Rece任务槽。jobtracker会选择距离离分片文件最近的tasktracker,最理想情况下,任务是数据本地化(data-local)的,当然也可以是机架本地化(rack-local),如果不是本地化的,那么他们就需要从其他机架上检索数据。Rece任务分配很简单,jobtracker会简单的从待运行的rece任务列表中选取下一个来执行,不用考虑数据本地化。
任务的执行TaskTracker:
TaskTracker收到新任务后,就要在本地运行任务了,运行任务的第一步就是通过localizedJob将任务本地化所需要的注入配置、数据、程序等信息进行本地化。
1.本地化数据:从共享文件系统将job.split 、job.jar (在分布式缓存中)复制本地,将job配置信息写入job.xml。
2.新建本地工作目录:tasktracker会加压job.jar文件到本工作目录。
3.调用launchTaskForJob方法发布任务(其中会新建TaskRunner实例运行任务),如果是Map任务就启用MapTaskRunner,对于Rece就是ReceTaskRunner。
在这之后,TaskRunner会启用一个新的JVM来运行每个Map/Rece任务,防止程序原因而导致tasktracker崩溃,但不同任务间重用JVM还是可以的,后续会讲到任务JVM重用。
对于单个Map,任务执行的简单流程是:
1.分配任务执行参数
2.在Child临时文件中添加map任务信息(Child是运行Map和Rece任务的主进程)
3.配置log文件夹,配置map任务的通信和输出参数
4.读取input split,生成RecordReader读取数据
5.为Map生成MapRunnable,依次从RecordReader中接收数据,并调用Map函数进行处理。
6.最后将map函数的输出调用collect收集到MapOutputBuffer(参数控制其大小)中。
Streaming和Pipes:
Streaming和Pipes都运行特殊的Map和Rece任务,目的是运行用户提供的可执行程序并与之通信。
Streaming:使用标准输入输出Streaming与进程进行通信。
Pipes:用来监听套接字,会发送一个端口号给C++程序,两者便可建立链接。
进度和状态更新:
一个作业和它的任务都有状态(status),其中包括:运行成功失败状态、Map/Rece进度、作业计数器值、状态消息。
状态消息与客户端的通信:
1.对于Map任务Progress的追踪:progress是已经处理完的输入所占的比例。
2.对于Rece:稍复杂,rece任务分三个阶段(每个阶段占1/3),复制、排序和Rece处理,若rece已执行一半的输入的话,那么任务进度便是1/3+1/3+1/6=5/6。
3.任务计数器:任务有一组计数器,负责对任务运行各个事件进行计数。
4.任务进度报告:如果任务报告了进度,便会设置一个标记以表明状态将被发送到tasktracker。有一个独立线程每隔三秒检查一次此标记,如果已设置,则告知tasktracker当前状态。
5.tasktracker进度报告:tasktracker会每隔5秒(这个心跳是由集群大小决定,集群越大时间会越长)发送heartbeat到jobtracker,并且tasktracker运行的所有状态都会在调用中被发送到jobtracker。
6.jobtracker合并各任务报告:产生一个表明所有运行作业机器所含任务状态的全局视图。
前面提到的JobClient就是通过每秒查询JobTracker来接收最新状态,而且客户端JobClient的getJob方法可以得到一个RunningJob的实例,其包含了作业的所以状态信息。
作业的完成:
当jobtracker收到作业最后一个任务已完成的通知后,便把作业状态设置成成功。JobClient查询状态时,便知道任务已成功完成,于是JobClient打印一条消息告知用户,然后从runJob方法返回。
如果jobtracker有相应设置,也会发送一个Http作业通知给客户端,希望收到回调指令的客户端可以通过job.end.notification.url属性来进行设置。
jobtracker情况作业的工作状态,指示tasktracker也清空作业的工作状态,如删除中间输出。
失败
实际情况下,用户的代码存在软件错误进程会崩溃,机器也会产生故障,但Hadoop能很好的应对这些故障并完成作业。
1.任务失败
子任务异常:如Map/Rece任务中的用户代码抛出异常,子任务JVM进程会在退出前向父进程tasktracker发送错误报告,错误被记录用户日志。tasktracker会将此次task attempt标记为tailed,并释放这个任务槽运行另外一个任务。
子进程JVM突然退出:可能由于JVM bug导致用户代码造成的某些特殊原因导致JVM退出,这种情况下,tasktracker会注意到进程已经退出,并将此次尝试标记为failed。
任务挂起:一旦tasktracker注意一段时间没有收到进度更新,便会将任务标记为failed,JVM子进程将被自动杀死。任务失败间隔时间通常为10分钟,可以以作业或者集群为基础设置过期时间,参数为mapred.task.timeout。注意:如果参数值设置为0,则挂起的任务永远不会释放掉它的任务槽,随着时间的推移会降低整个集群的效率。
任务失败尝试次数:jobtracker得知一个tasktracker失败后,它会重新调度该任务执行,当然,jobtracker会尝试避免重新调度失败过的tasktracker任务。如果一个任务尝试次数超过4次,它将不再被重试。这个值是可以设置的,对于Map任务,参数是mapred.map.max.attempts,对于rece任务,则由mapred.rece.max.attempts属性控制。如果次数超过限制,整个作业都会失败。当然,有时我们不希望少数几个任务失败就终止运行的整个作业,因为即使有些任务失败,作业的一些结果可能还是有用的,这种情况下,可以为作业设置在不触发作业失败情况下的允许任务失败的最大百分比,Map任务和Rece任务可以独立控制,参数为mapred.max.map.failures.percent 和mapred.max.rece.failures.percent。
任务尝试中止(kill):任务终止和任务失败不同,task attempt可以中止是因为他是一个推测副本或因为它所处的tasktracker失败,导致jobtracker将它上面的所有task attempt标记为killed。被终止的task attempt不会被计入任务运行尝试次数,因为尝试中止并不是任务的错。
2.tasktracker失败
tasktracker由于崩溃或者运行过慢而失败,他将停止向jobtracker发送心跳(或很少发送心跳)。jobtracker注意已停止发送心跳的tasktracker(过期时间由参数mapred.tasktracker.expiry.interval设置,单位毫秒),并将它从等待调度的tasktracker池中移除。如果是未完成的作业,jobtracker会安排次tasktracker上已经运行成功的Map任务重新运行,因为此时rece任务已无法访问(中间输出存放在失败的tasktracker的本地文件系统上)。
即使tasktracker没有失败,也有可能被jobtracker列入黑名单。如果tasktracker上面的失败任务数量远远高于集群的平均失败任务次数,他就会被列入黑名单,被列入黑名单的tasktracker可以通过重启从jobtracker黑名单中移除。
3.jobtracker失败
老版本的JobTracker失败属于单点故障,这种情况下作业注定失败。
作业调度:
早期作业调度FIFO:按作业提交顺序先进先出。可以设置优先级,通过设置mapred.job.priority属性或者JobClient的setJobPriority()方法制定优先级(优先级别:VERY_HIGH,HIGH,NORMAL,LOW,VERY_LOW)。注意FIFO调度算法不支持抢占(preemption),所以高优先级作业仍然会被那些已经开始的长时间运行的低优先级作业所阻塞。
Fair Scheler:目标是让每个用户公平地共享集群能力。当集群存在很多作业时,空闲的任务槽会以”让每个用户共享集群“的方式进行分配。默认每个用户都有自己的作业池。FairScheler支持抢占,所以,如果一个池在特定的一段时间未得到公平地资源共享,它会终止池中得到过多的资源任务,以便把任务槽让给资源不足的池。FairScheler是一个后续模块,使用它需要将其jar文件放在Hadoop的类路径下。可以通过参数map.red.jobtracker.taskScheler属性配置(值为org.apache.hadoop.mapred.FairScheler)
Capacity Scheler:
集群由很多队列组成,每个队列都有一个分配能力,这一点与FairScheler类似,只不过在每个队列内部,作业根据FIFO方式进行调度。本质上说,Capacity Scheler允许用户或组织为每个用户模拟一个独立使用FIFO的集群。
shuffle和排序:
MapRece确保每个Recer的输入都是按键排序的。系统执行排序的过程-将map输出作为输入传给recer的过程称为shuffle。shuffle属于不断被优化和改进的代码库的一部分,从许多方面来看,shuffle是MapRece的心脏。
整个shuffle的流程应该是这样:
map结果划分partition 排序sort 分割spill 合并同一划分 合并同一划分 合并结果排序 rece处理 输出
Map端:
写入缓冲区:Map函数的输出,是由collector处理的,它并不是简单的将结果写到磁盘。它利用缓冲的方式写到内存,并处于效率的考虑进行预排序。每个map都有一个环形的内存缓冲区,用于任务输出,默认缓冲区大小为100MB(由参数io.sort.mb调整),一旦缓冲区内容达到阈值(默认0.8),后台进程边开始把内容写到磁盘(spill),在写磁盘过程中,map输出继续被写到缓冲区,但如果缓冲区被填满,map会阻塞知道写磁盘过程完成。写磁盘将按照轮询方式写到mapred.local.dir属性制定的作业特定子目录中。
写出缓冲区:collect将缓冲区的内容写出时,会调用sortAndSpill函数,这个函数作用主要是创建spill文件,按照key值对数据进行排序,按照划分将数据写入文件,如果配置了combiner类,会先调用combineAndSpill函数再写文件。sortAndSpill每被调用一次,就会写一个spill文件。
合并所有Map的spill文件:TaskTracker会在每个map任务结束后对所有map产生的spill文件进行merge,merge规则是根据分区将各个spill文件中数据同一分区中的数据合并在一起,并写入到一个已分区且排序的map输出文件中。待唯一的已分区且已排序的map输出文件写入最后一条记录后,map端的shuffle阶段就结束了。
在写磁盘前,线程首先根据数据最终要传递到的recer把数据划分成响应的分区(partition),在每个分区中,后台线程按键进行内排序,如果有一个combiner,它会在排序后的输出上运行。
内存达到溢出写的阈值时,就会新建一个溢出写文件,因为map任务完成其最后一个输出记录之后,会有几个溢出写文件。在任务完成前,溢出写文件会被合并成一个已分区且已排序的输出文件。配置属性io.sort.facor控制一次最多能合并多少流,默认值是10。
如果已经指定combiner,并且写次数至少为3(通过min.mum.spills.for.combine设置)时,则combiner就会在输出文件写到磁盘之前运行。运行combiner的意义在于使map输出更紧凑,舍得写到本地磁盘和传给recer的数据更少。
写磁盘时压缩:写磁盘时压缩会让写的速度更快,节约磁盘空间,并且减少传给recer的数据量。默认情况下,输出是不压缩的,但可以通过设置mapred.compress.map.output值为true,就可以启用压缩。使用的压缩库是由mapred.map.output.compression.codec制定。
recer获得文件分区的工作线程:recer通过http方式得到输出文件的分区,用于文件分区的工作线程数量由tracker.http.threads属性指定,此设置针对的是每个tasktracker,而不是每个map任务槽。默认值为40,在大型集群上此值可以根据需要而增加。
Rece端:
复制阶段:rece会定期向JobTracker获取map的输出位置,一旦拿到输出位置,rece就会从对应的TaskTracker上复制map输出到本地(如果map输出很小,则会被复制到TaskTracker节点的内存中,否则会被让如磁盘),而不会等到所有map任务结束(当然这个也有参数控制)。
合并阶段:从各个TaskTracker上复制的map输出文件(无论在磁盘还是内存)进行整合,并维持数据原来的顺序。
Rece阶段:从合并的文件中顺序拿出一条数据进行rece函数处理,然后将结果输出到本地HDFS。
Map的输出文件位于运行map任务的tasktracker的本地磁盘,现在,tasktracker要为分区文件运行rece任务。每个任务完成时间可能不同,但是只要有一个任务完成,rece任务就开始复制其输出,这就是rece任务的复制阶段( phase)。rece任务有少量复制线程,因此能够并行取得map输出。默认值是5个线程,可以通过mapred.rece.parallel.copies属性设置。
Recer如何得知从哪个tasktracker获得map输出:map任务完成后会通知其父tasktracker状态已更新,tasktracker进而通知(通过heart beat)jobtracker。因此,JobTracker就知道map输出和tasktracker之间的映射关系,recer中的一个线程定期询问jobtracker以便获知map输出位置。由于recer有可能失败,因此tasktracker并没有在第一个recer检索到map输出时就立即从磁盘上删除它们,相反他会等待jobtracker告示它可以删除map输出时才删除,这是作业完成后最后执行的。
如果map输出很小,则会被直接复制到rece tasktracker的内存缓冲区(大小由mapred.job.shuffle.input.buffer.percent控制,占堆空间的百分比),否则,map输出被复制到磁盘。一旦内存缓冲区达到阈值大小(由mapred.iob.shuffle.merge.percent)
或达到map输出阈值大小(mapred.inmem.threadhold),则合并后溢出写到磁盘中。
随着磁盘上副本增多,后台线程会将他们合并为更大的、排好序的文件。注意:为了合并,压缩的map输出必须在内存中被解压缩。
排序阶段:复制阶段完成后,rece任务会进入排序阶段,更确切的说是合并阶段,这个阶段将合并map输出,维持其顺序排列。合并是循环进行的,由合并因子决定每次合并的输出文件数量。但让有可能会产生中间文件。
rece阶段:在最后rece阶段,会直接把排序好的文件输入rece函数,不会对中间文件进行再合并,最后的合并即可来自内存,也可来自磁盘。此阶段的输出会直接写到文件系统,一般为hdfs。
细节:这里合并是并非平均合并,比如有40个文件,合并因子为10,我们并不是每趟合并10个,合并四趟。而是第一趟合并4个,后三趟合并10,在最后一趟中4个已合并的文件和余下6个未合并会直接并入rece。