yarncontainer源码
1. Spark下Yarn-Cluster和Yarn-Client的区别
(1)SparkContext初始化不同,这也导致了Driver所在位置的不同,YarnCluster的Driver是在集群的某一台NM上,但是Yarn-Client就是在RM在机器上;
(2)而Driver会和Executors进行通信,这也导致了Yarn_cluster在提交App之后可以关闭Client,而Yarn-Client不可以;
(3)最后再来说应用场景,Yarn-Cluster适合生产环境,Yarn-Client适合交互和调试。
2. yarn container 是进程吗
container就是“一组资源”,现在是“内存+CPU”,未来还有可能把网络带宽之类的也包含进去。
当有一个Application(在MRV1里叫Job),第一个container就用来跑ApplicationMaster,然后ApplicationMaster再申请一些container来跑Mapper,之后再申请一些container来跑Recer。
container既可以跑Mapper也可以跑Recer,就不像MRV1里的map slot只能跑map、rece slot只能跑rece。
3. YARN到底是怎么一回事
YARN的编程模型
1:保证编程模型的向下兼容性,MRv2重用了MRv1的编程模型和数据处理引擎,但运行环境被重写。
2:编程模型与数据处理引擎
maprece应用程序编程接口有两套:新的API(mapred)和旧的API(maprece)
采用MRv1旧的API编写的程序可直接运行在MRv2上
采用MRv1新的API编写的程序需要使用MRv2编程库重新编译并修改不兼容的参数 和返回值
3:运行时环境
MRv1:Jobracker和Tasktracker
MRv2:YARN和ApplicationMaster
YARN的组成
yarn主要由ResourceManager,NodeManager,ApplicationMaster和Container等几个组件组成。
ResourceManager(RM)
RM是全局资源管理器,负责整个系统的资源管理和分配。
主要由两个组件组成:调度器和应用 程序管理器(ASM)
调度器
调度器根据容量,队列等限制条件,将系统中的资源分配给各个正在运行的应用程序
不负责具体应用程序的相关工作,比如监控或跟踪状态
不负责重新启动失败任务
资源分配单位用“资源容器”resource Container表示
Container是一个动态资源分配单位,它将内存,CPU,磁盘,网络等资源封装在一起,从而限定每个任务的资源量
调度器是一个可插拔的组件,用户可以自行设计
YARN提供了多种直接可用的调度器,比如fair Scheler和Capacity Scheler等。
应用程序管理器
负责管理整个系统中所有应用程序
ApplicationMaster(AM)
用户提交的每个应用程序均包含一个AM
AM的主要功能
与RM调度器协商以获取资源(用Container表示)
将得到的任务进一步分配给内部的任务
与NM通信以自动/停止任务
监控所有任务运行状态,并在任务运行失败时重新为任务申请资源以重启任务
当前YARN自带了两个AM实现
一个用于演示AM编写方法的实例程序distributedshell
一个用于Maprece程序---MRAppMaster
其他的计算框架对应的AM正在开发中,比如spark等。
Nodemanager(NM)和Container
NM是每个节点上的资源和任务管理器
定时向RM汇报本节点上的资源使用情况和各个Container的运行状态
接收并处理来自AM的Container启动/停止等各种要求
Container是YARN中的资源抽象,它封装了某个节点上的多维度资源
YARN会为每个任务分配一个Container,且改任务只能使用该Container中描述的资源
Container不同于MRv1的slot,它是一个动态资源划分单位,是根据应用程序的需求动态产生的
YARN主要由以下几个协议组成
ApplicationClientProtocol
Jobclient通过该RPC协议提交应用才程序,查询应用程序状态等
Admin通过该协议更新系统配置文件,比如节点黑名单,用户队列权限等。
ApplicationMasterProtocol
AM通过该RPC协议想RM注册和撤销自己,并为各个任务申请资源
ContainerManagementProtocol
AM通过要求NM启动或者停止Container,获取各个Container的使用状态等信息
ResourceTracker
NM通过该RPC协议向RM注册,并定时发送心跳信息汇报当前节点的资源使用情况和Container运行状况
YARN的工作流程
文字描述一下这个过程:
1:由客户端提交一个应用,由RM的ASM接受应用请求
提交过来的应用程序包括哪些内容:
a:ApplicationMaster
b:启动Applicationmaster的命令
c:本身应用程序的内容
2:提交了三部分内容给RM,然后RM找NodeManager,然后
Nodemanager就启用Applicationmaster,并分配Container
接下来我们就要执行这个任务了,
3:但是执行任务需要资源,所以我们得向RM的ASM申请执行任务的资源(它会在RM这儿注册一下,说我已经启动了,注册了以后就可以通过RM的来管理,我们用户也可以通过RM的web客户端来监控任务的状态)ASM只是负责APplicationMaster的启用
4::我们注册好了后,得申请资源,申请资源是通过第四步,向ResourceScheler申请的
5:申请并领取资源后,它会找Nodemanager,告诉他我应经申请到了,然后Nodemanager判断一下,
6:知道他申请到了以后就会启动任务,当前启动之前会准备好环境,
7:任务启动以后会跟APplicationmaster进行通信,不断的心跳进行任务的汇报。
8:完成以后会给RM进行汇报,让RSM撤销注册。然后RSM就会回收资源。当然了,我们是分布式的,所以我们不会只跟自己的Nodemanager通信。也会跟其他的节点通信。
4. yarn appmaster得到的container是虚拟的还是实际的
container就是“一组资源”,现在是“内存+CPU”,未来还有可能把网络带宽之类的也包含进去。 当有一个Application(在MRV1里叫Job),第一个container就用来跑ApplicationMaster,然后ApplicationMaster再申请一些container来跑Mapper
5. 为什么我要选择使用Yarn来做Docker的调度引擎
可部署性
先说明下,这里探讨的是Yarn或者Mesos集群的部署,不涉其上的应用。Yarn除了依赖JDK,对操作系统没有任何依赖,基本上放上去就能
跑。Mesos因为是C/C++开发的,安装部署可能会有库依赖。
这点我不知道大家是否看的重,反正我是看的相当重的。软件就应该是下下来就可以Run。所以12年的时候我就自己开发了一套Java服务框架,开发完之后
运行个main方法就行。让应用包含容器,而不是要把应用丢到Tomcat这些容器,太复杂,不符合直觉。
二次开发
Yarn 对Java/Scala工程师而言,只是个Jar包,类似索引开发包Lucene,你可以把它引入项目,做任何你想要的包装。 这是其一。
其二,Yarn提供了非常多的扩展接口,很多实现都是可插拔。可替换的,在XML配置下,可以很方便的用你的实现替换掉原来的实现,没有太大的侵入性,所以就算是未来Yarn升级,也不会有太大问题。
相比较而言,Mesos更像是一个已经做好的产品,部署了可以直接用,但是对二次开发并不友好。
生态优势
Yarn 诞生于Hadoop这个大数据的“始作俑者”项目,所以在大数据领域具有先天优势。
底层天然就是分布式存储系统HDFS,稳定高效。
其上支撑了Spark、MR等大数据领域的扛顶之座,久经考验。
社区强大,最近发布版本也明显加快,对于长任务的支持也越来越优秀。
长任务支持
谈及长任务(long running
services)的支持,有人认为早先Yarn是为了支持离线短时任务的,所以可能对长任务的支持有限。其实大可不必担心,譬如现在基于其上的
Spark Streaming就是7x24小时运行的,跑起来也没啥问题。一般而言,要支持长任务,需要考虑如下几个点:
Fault tolerance,主要是AM的容错。
Yarn Security,如果开启了安全机制,令牌等的失效时间也是需要注意的。
日志收集到集群。
还有就是资源隔离和优先级。如果资源隔离做的太差,会对长时任务产生影响。
大家感兴趣可以先参考Jira。我看这个Jira 13年就开始了,说明这事很早就被重视起来了。下面我们队提到的几个点做下解释。
Fault tolerance
Yarn 自身高可用。目前Yarn的Master已经实现了HA。
AM容错,我看从2.4版本(看的源码,也可能更早的版本就已经支持)就已经支持 keep containers across
attempt
的选项了。什么意思呢?就是如果AM挂掉了,在Yarn重新启动AM的过程中,所有由AM管理的容器都会被保持而不会被杀掉。除非Yarn多次尝试都没办
法把AM再启动起来(默认两次)。 这说明从底层调度上来看,已经做的很好了。
日志收集到集群
日志收集在2.6版本已经是边运行边收集了。
资源隔离
资源隔离的话,Yarn做的不好,目前有效的是内存,对其他方面一直想做支持,但一直有限。这估计也是很多人最后选择Mesos的缘由。但是现在这
点优势Mesos其实已经荡然无存,因为Docker容器在资源隔离上已经做的足够好。Yarn和Docker一整合,就互补了。
小结
Mesos 和 Yarn 都是非常优秀的调度框架,各有其优缺点,弹性调度,统一的资源管理是未来平台的一个趋势,类似的这种资源管理调度框架必定会大行其道。
一些常见的误解
脱胎于Hadoop,继承了他的光环和生态,然而这也会给其带来一定的困惑,首先就是光环一直被Hadoop给盖住了,而且由于固有的惯性,大家会理所当然的认为Yarn只是Hadoop里的一个组件,有人会想过把Yarn拿出来单独用么?
然而,就像我在之前的一篇课程里,反复强调,Hadoop是一个软件集合,包含分布式存储,资源管理调度,计算框架三个部分。他们之间没有必然的关
系,是可以独立开来的。而Yarn
就是一个资源管理调度引擎,其一开始的设计目标就是为了通用,不仅仅是跑MR。现在基于Yarn之上的服务已经非常多,典型的比如Spark。
这里还有另外一个误区,MR目前基本算是离线批量的代名词,这回让人误以为Yarn也只是适合批量离线任务的调度。其实不然,我在上面已经给出了分析,Yarn 是完全可以保证长任务的稳定可靠的运行的。
如何基于Yarn开发分布式程序
本文不会具体教你如何使用Yarn的API,不过如果你想知道Yarn的API,但是又觉得官方文档太过简略,我这里倒是可以给出两个建议:
代码使用范例可以参看Spark Yarn相关的代码。算的上是一个非常精简的Yarn的adaptor。
买本Yarn相关的书,了解其体系结构也有助于你了解其API的设计。
接下来的内容会探讨以下两个主题:
基于Yarn开发分布式程序需要做的一些准备工作
基于Yarn开发容器调度系统的一些基本思路
基于Yarn开发分布式程序需要做的一些准备工作
肯定不能撸起袖子就开始干。开始动手前,我们需要知道哪些事情呢?
Yarn原生的API太底层,太复杂了
如果你想愉快的开发Yarn的应用,那么对Yarn的API进行一次封装,是很有必要的。
Yarn为了灵活,或者为了能够满足开发者大部分的需求,底层交互的API就显得比较原始了。自然造成开发难度很大。这个也不是我一个人觉得,现在
Apache的Twill,以及Hulu他们开发的时候Adaptor那一层,其实都是为了解决这个问题。那为什么我没有用Twill呢,第一是文档实在
太少,第二是有点复杂,我不需要这么复杂的东西。我觉得,Twill与其开发这么多功能,真的不如好好写写文档。
最好是能开发一个解决一类问题的Framework
Yarn只是一个底层的资源管理和调度引擎。一般你需要基于之上开发一套解决特定问题的Framework。以Spark为例,他是解决分布式计算
相关的一些问题。而以我开发的容器调度程序,其实是为了解决动态部署Web应用的。在他们之上,才是你的应用。比如你要统计日志,你只要在Spark上开
发一个Application 。 比如你想要提供一个推荐系统,那么你只要用容器包装下,就能被容器调度程序调度部署。
所以通常而言,基于Yarn的分布式应用应该符合这么一个层次:
Yarn -> Adapter -> Framework -> Application
Adapter 就是我第一条说的,你自个封装了Yarn的API。 Framework就是解决一类问题的编程框架,Application才是你真正要解决业务的系统。通过这种解耦,各个层次只要关注自己的核心功能点即可。
保证你上层的Framework/Application可以移植
Spark是个典型,他可以跑在Mesos上,也可以跑在Yarn上,还可以跑在自己上面(Standalone),实时上,泡在Yarn上的,以及跑Standalone模式的,都挺多的。这得益于Spark本身不依赖于底层的资源管理调度引擎。
这其实也是我上面说的两条带来的好处,因为有了Adaptor,上层的Framework可以不用绑死在某个资源调度引擎上。而Framework则可以让Applicaiton 无需关注底层调度的事情,只要关注业务即可。
另外,你费尽心机开发的Framework上,你自然是希望它能跑在更多的平台上,已满足更多的人的需求,对吧。
基于Yarn开发容器调度系统的一些基本思路
首先我们需要了解两个概念:
哑应用。所谓哑应用指的是无法和分布式系统直接进行交互,分布式系统也仅仅透过容器能进行生命周期的控制,比如关闭或者开启的应用。典型的比如MySQL、Nginx等这些基础应用。他们一般有自己特有的交互方式,譬如命令行或者socket协议或者HTTP协议。
伴生组件。因为有了哑应用的存在,分布式系统为了能够和这些应用交互,需要有一个代理。而这个代理和被代理的哑应用,具有相同的生命周期。典型
的比如,某个服务被关停后,该事件会被分布式系统获知,分布式系统会将该事件发送给Nginx的伴生组件,伴生组件转化为Nginx能够识别的指令,将停
止的服务从Nginx的ProxyBackend列表中剔除。
在容器调度系统中,如果Yarn的NodeManager直接去管理Docker则需要Yarn本身去做支持,我觉得这是不妥的。Yarn的职责就是做好资源管理,分配,调度即可,并不需要和特定的某个技术耦合,毕竟Yarn是一个通用型的资源调度管理框架。
那基于上面的理论,我们基于Yarn,开发一套框架,这个框架会是典型的 master-slave结构(这是Yarn决定的)。这个框架的 slaves 其实都是Docker 的伴生对象。master 可以通过这些Slave 对容器实现间接的管理。
我们简单描述下他们的流程:
用户提交Application,申请资源;
Yarn启动Framework的master;
Yarn启动Framework的slave;
slave 连接上master,并且发送心跳,从而master知道slave的状况slave启动Docker,slave与被启动的这个docker container 一一对应;
slave定时监控Container;
slave发现container crash,slave自动退出,Yarn获得通知,收回资源;
master发现有节点失败,发出新的节点要求,重新在另外一台服务器上启动slave,重复从2开始的步骤。
这里还有一个问题,如果slave 被正常杀掉,可以通过JVM ShudownHook 顺带把Container也关掉。
但是如果slave被kill -9
或者异常crash掉了,那么就可能导致资源泄露了。目前是这个信息是由master上报给集群管理平台,该平台会定时清理。你也可以存储该信息,譬如放
到Redis或者MySQL中,然后启动后台清理任务即可。
了解了这个思路后,具体实施就变得简单了,就是开发一个基于Yarn的master-slave程序即可,然后slave去管理对应的Docker容器,包括接受新的指令。master提供管理界面展示容器信息,运行状态即可。
当然,你还可以再开发一套Framework B专门和Nginx交互,这样比如上面的系统做了节点变更,通知B的master,然后B的master 通过自己的伴生组件Slave 完成Nginx的更新,从而实现后端服务的自动变更和通知。
6. 怎样获得在yarn框架上运行jar包的执行结果
配置方法
(1) 首先需要确保spark在1.1.0以上的版本。
(2) 在HDFS上建立一个公共lib库,比如/system/spark-lib/,设置权限为755。把spark-assembly-*.jar上传到公共lib库中。
(3) 在spark-env.sh中配置:
view plain to clipboardprint?
<span style="font-size:14px;">spark.yarn.jar hdfs://yarncluster/system/spark_lib/spark-assembly-1.1.0-hadoop2.3.0-cdh5.1.0.jarspark.yarn.preserve.staging.files false</span>
**spark.yarn.jar配置成HDFS上的公共lib库中的jar包。这个配置项会使提交job时,不是从本地上传spark-assembly*.jar包,而是从HDFS的一个目录复制到另一个目录(不确定HDFS上的复制是怎么操作的),总的来说节省了一点时间。(网上有的文章里说,这里的配置,会节省掉上传jar包的步骤,其实是不对的,只是把从本地上传的步骤改成了在HDFS上的复制操作。)
**spark.yarn.preserve.staging.files: 这个配置项配置成false,表示在执行结束后,不保留staging files,也就是两个jar包。然后HDFS上的.sparkStaging下的两个jar包在作业执行完成后就会被删除。如果配置成true,执行完后HDFS上的.sparkStaging下两个jar包都会保存下来。
然后再运行,发现HDFS上.sparkStaging目录下不会再保留jar包。
问题定位
按道理来说,因为spark.yarn.preserve.staging.files默认是false,所以HDFS上的jar包是不会被保留的。但是在spark1.0.2中,却没有删除。我看了下1.0.2的代码,删除的机制是存在的:
//yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
view plain to clipboardprint?
<span style="font-size:14px;"><span style="font-family:Microsoft YaHei;font-size:12px;"> /** * Clean up the staging directory. */ private def cleanupStagingDir() { var stagingDirPath: Path = null try { val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean if (!preserveFiles) { stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR")) if (stagingDirPath == null) { logError("Staging directory is null") return } logInfo("Deleting staging directory " + stagingDirPath) fs.delete(stagingDirPath, true) } } catch { case ioe: IOException => logError("Failed to cleanup staging dir " + stagingDirPath, ioe) } }</span></span>
按照这个逻辑,默认在AM关闭的时候,是会删除HDFS上的jar包的。不过没有正常删除。推测这应该是一个1.0.2里面的bug,而在1.1.0里面已经修复。
nodemanager节点上的jar包缓存
升级到1.1.0版本后,HDFS上的jar包问题就解决了。但是nodemanager节点上的jar包还是会保留。这个问题的定位很纠结,不过结果却出乎意料的简单。不说了,上结果吧。
配置方法
(1) 配置yarn-site.xml:
<span style="font-family:Microsoft YaHei;font-size:12px;"> <property>
<name>yarn.nodemanager.local-dirs</name>
<value>local-dir1, local-dir2,local-dir3</value>
</property>
<property>
<name>yarn.nodemanager.localizer.cache.target-size-mb</name>
<value>1024</value>
</property>
<property>
<name>yarn.nodemanager.localizer.cache.cleanup.interval-ms</name>
<value>1800000</value>
</property></span>
**yarn.nodemanager.local-dirs: 这个目录是nodemanager上的作业中间数据存放路径。推荐配置多个盘上的多个路径,从而分散作业执行中的磁盘IO压力。
**yarn.nodemanager.localizer.cache.target-size-mb:配置nodemanager上的缓存目录的最大限度。nodemanager上有一个deletion server服务,会定期检测,如果yarn.nodemanager.local-dirs中配置的目录大小(如果配置了多个,则计算多个目录的总大小)是否超过了这里设置的最大限度值。如果超过了,就删除一些已经执行完的container的缓存数据。
因为spark提交作业后遗留在nodemanager上的jar包就在yarn.nodemanager.local-dirs下面,所以只要这里配置合适的大小值。那么nodemanager上的deletion server是会自动检测并保证目录总大小的。所以只要配置了这个量,我们就不需要再担心nodemanager上的jar包缓存问题了,交给yarn就好了!很简单啊有木有,可就这么个问题,居然花了我一个星期的时间去定位。
**yarn.nodemanager.localizer.cache.cleanup.interval-ms: deletion server多长时间做一次检测,并且清除缓存目录直到目录大小低于target-size-mb的配置。
通过上面这三个量的配置,nodemanager会确保本地的缓存数据总量在target-size-mb之下,也就是超过了的话,之前的spark的jar包就会被删除。所以我们就不需要再担心nodemanager节点上的spark jar包缓存问题了。不过target-size-mb的默认值是10G,这个值当然可以根据你的实际情况进行调整。
7. 如何理解 yarn 的 container
container就是“一组资源”,现在是“内存+CPU”,未来还有可能把网络带宽之类的也包含进去。
当有一个Application(在MRV1里叫Job),第一个container就用来跑ApplicationMaster,然后ApplicationMaster再申请一些container来跑Mapper,之后再申请一些container来跑Recer。
container既可以跑Mapper也可以跑Recer,就不像MRV1里的map slot只能跑map、rece slot只能跑rece。