spark源码语言
Ⅰ 源码级解读如何解决Spark-sql读取hive分区表执行效率低问题
问题描述
在开发过程中使用spark去读取hive分区表的过程中(或者使用hive on spark、nodepad开发工具),部分开发人员未注意添加分区属性过滤导致在执行过程中加载了全量数据,引起任务执行效率低、磁盘IO大量损耗等问题。
解决办法
1、自定义规则CheckPartitionTable类,实现Rule,通过以下方式创建SparkSession。
2、自定义规则CheckPartitionTable类,实现Rule,将规则类追加至Optimizer.batches: Seq[Batch]中,如下。
规则内容实现
1、CheckPartitionTable规则执行类,需要通过引入sparkSession从而获取到引入conf;需要继承Rule[LogicalPlan];
2、通过splitPredicates方法,分离分区谓词,得到分区谓词表达式。在sql解析过程中将谓词解析为TreeNode,此处采用递归的方式获取分区谓词。
3、判断是否是分区表,且是否添加分区字段。
4、实现Rule的apply方法
大数据和云计算的关系
大数据JUC面试题
大数据之Kafka集群部署
大数据logstsh架构
大数据技术kafka的零拷贝
Ⅱ Spark源码分析之SparkSubmit的流程
本文主要对SparkSubmit的任务提交流程源码进行分析。 Spark源码版本为2.3.1。
首先阅读一下启动脚本,看看首先加载的是哪个类,我们看一下 spark-submit 启动脚本中的具体内容。
可以看到这里加载的类是org.apache.spark.deploy.SparkSubmit,并且把启动相关的参数也带过去了。下面我们跟一下源码看看整个流程是如何运作的...
SparkSubmit的main方法如下
这里我们由于我们是提交作业,所有会走上面的submit(appArgs, uninitLog)方法
可以看到submit方法首先会准备任务提交的环境,调用了prepareSubmitEnvironment,该方法会返回四元组,该方法中会调用doPrepareSubmitEnvironment,这里我们重点注意 childMainClass类具体是什么 ,因为这里涉及到后面启动我们主类的过程。
以下是doPrepareSubmitEnvironment方法的源码...
可以看到该方法首先是解析相关的参数,如jar包,mainClass的全限定名,系统配置,校验一些参数,等等,之后的关键点就是根据我们 deploy-mode 参数来判断是如何运行我们的mainClass,这里主要是通过childMainClass这个参数来决定下一步首先启动哪个类。
childMainClass根据部署模型有不同的值:
之后该方法会把准备好的四元组返回,我们接着看之前的submit方法
可以看到这里最终会调用doRunMain()方法去进行下一步。
doRunMain的实现如下...
doRunMain方法中会判断是否需要一个代理用户,然后无论需不需要都会执行runMain方法,我们接下来看看runMain方法是如何实现的。
这里我们只假设以集群模式启动,首先会加载类,将我们的childMainClass加载为字节码对象mainClass ,然后将mainClass 映射成SparkApplication对象,因为我们以集群模式启动,那么上一步返回四元组中的childMainClass的参数为ClientApp的全限定名,而这里会调用app实例的start方法因此,这里最终调用的是ClientApp的start方法。
ClientApp的start方法如下...
可以看到这里和之前我们的master启动流程有些相似。
可以参考我上一篇文章 Spark源码分析之Master的启动流程 对这一流程加深理解。
首先是准备rpcEnv环境,之后通过master的地址获取masterEndpoints端点相关信息,因为这里运行start方法时会将之前配置的相关参数都传进来,之后就会通过rpcEnv注册相关clientEndPoint端点信息,同时需要注意,这里会把masterEndpoints端点信息也作为构造ClientEndpoint端点的参数,也就是说这个ClientEndpoint会和masterEndpoints通信。
而在我上一篇文章中说过,只要是setupEndpoint方法被调用,一定会调用相关端点的的onStart方法,而这会调用clientEndPoint的onStart方法。
ClientEndPoint类中的onStart方法会匹配launch事件。源码如下
onStart中匹配我们的launch的过程,这个过程是启动driverWrapper的过程,可以看到上面源码中封装了mainClass ,该参数对应DriverWrapper类的全限定名,之后将mainClass封装到command中,然后封装到driverDescription中,向Master申请启动Driver。
这个过程会向Mster发送消息,是通过rpcEnv来实现发射消息的,而这里就涉及到outbox信箱,会调用postToOutbox方法,向outbox信箱中添加消息,然后通过TransportClient的send或sendRpc方法发送消息。发件箱以及发送过程是在同一个线程中进行。
而细心的同学会注意到这里调用的方法名为SendToMasterAndForwardReply,见名之意,发送消息到master并且期待回应。
下面是rpcEnv来实现向远端发送消息的一个调用流程,最终会通过netty中的TransportClient来写出。
之后,Master端会触发receiveAndReply函数,匹配RequestSubmitDriver样例类,完成模式匹配执行后续流程。
可以看到这里首先将Driver信息封装成DriverInfo,然后添加待调度列表waitingDrivers中,然后调用通用的schele函数。
由于waitingDrivers不为空,则会走LaunchDriver的流程,当前的application申请资源,这时会向worker发送消息,触发Worker的receive方法。
Worker的receive方法中,当Worker遇到LaunchDriver指令时,创建并启动一个DriverRunner,DriverRunner启动一个线程,异步的处理Driver启动工作。这里说启动的Driver就是刚才说的org.apache.spark.deploy.worker.DriverWrapper
可以看到上面在DriverRunner中是开辟线程异步的处理Driver启动工作,不会阻塞主进程的执行,而prepareAndRunDriver方法中最终调用 runDriver..
runDriver中主要先做了一些初始化工作,接着就开始启动driver了。
上述Driver启动工作主要分为以下几步:
下面我们直接看DriverWrapper的实现
DriverWrapper,会创建了一个RpcEndpoint与RpcEnv,RpcEndpoint为WorkerWatcher,主要目的为监控Worker节点是否正常,如果出现异常就直接退出,然后当前的ClassLoader加载userJar,同时执行userMainClass,在执行用户的main方法后关闭workerWatcher。
以上就是SparkSubmit的流程,下一篇我会对SparkContext的源码进行解析。
欢迎关注...
Ⅲ Spark使用的语言是什么
Spark的框架使用Scala语言编写的,简洁而优雅;
Spark的开发目前主要可以使用三种语言:Scala、Java、Python
Ⅳ 可能是全网最详细的 Spark Sql Aggregate 源码剖析
纵观 Spark Sql 源码,聚合的实现是其中较为复杂的部分,本文希望能以例子结合流程图的方式来说清楚整个过程。这里仅关注 Aggregate 在物理执行计划相关的内容,之前的 parse、analyze 及 optimize 阶段暂不做分析。在 Spark Sql 中,有一个专门的 Aggregation strategy 用来处理聚合,我们先来看看这个策略。
本文暂不讨论 distinct Aggregate 的实现(有兴趣的可以看看另一篇博文 https://www.jianshu.com/p/77e0a70db8cd ),我们来看看 AggUtils#planAggregateWithoutDistinct 是如何生成聚合的物理执行计划的
创建聚合分为两个阶段:
AggregateExpression 共有以下几种 mode:
Q:是否支持使用 hash based agg 是如何判断的?
摘自我另一篇文章: https://www.jianshu.com/p/77e0a70db8cd
为了说明最常用也是最复杂的的 hash based agg,本小节暂时将示例 sql 改为
这样就能进入 HashAggregateExec 的分支
构造函数主要工作就是对 groupingExpressions、aggregateExpressions、aggregateAttributes、resultExpressions 进行了初始化
在 enable code gen 的情况下,会调用 HashAggregateExec#inputRDDs 来生成 RDD,为了分析 HashAggregateExec 是如何生成 RDD 的,我们设置 spark.sql.codegen.wholeStage 为 false 来 disable code gen,这样就会调用 HashAggregateExec#doExecute 来生成 RDD,如下:
可以看到,关键的部分就是根据 child.execute() 生成的 RDD 的每一个 partition 的迭代器转化生成一个新的 TungstenAggregationIterator ,即 HashAggregateExec 生成的 RDD 的各个 partition。由于 TungstenAggregationIterator 涉及内容非常多,我们单开一大节来进行介绍。
此迭代器:
注:UnsafeKVExternalSorter 的实现可以参考:
UnsafeRow 是 InternalRow(表示一行记录) 的 unsafe 实现,由原始内存(byte array)而不是 Java 对象支持,由三个区域组成:
使用 UnsafeRow 的收益:
构造函数的主要流程已在上图中说明,需要注意的是:当内存不足时(毕竟每个 grouping 对应的 agg buffer 直接占用内存,如果 grouping 非常多,或者 agg buffer 较大,容易出现内存用尽)会从 hash based aggregate 切换为 sort based aggregate(会 spill 数据到磁盘),后文会进行详述。先来看看最关键的 processInputs 方法的实现
上图中,需要注意的是:hashMap 中 get 一个 groupingKey 对应的 agg buffer 时,若已经存在该 buffer 则直接返回;若不存在,尝试申请内存新建一个:
上图中,用于真正处理一条 row 的 AggregationIterator#processRow 还需进一步展开分析。在此之前,我们先来看看 AggregateFunction 的分类
AggregateFunction 可以分为 DeclarativeAggregate 和 ImperativeAggregate 两大类,具体的聚合函数均为这两类的子类。
DeclarativeAggregate 是一类直接由 Catalyst 中的 Expressions 构成的聚合函数,主要逻辑通过调用 4 个表达式完成,分别是:
我们再次以容易理解的 Count 来举例说明:
通常来讲,实现一个基于 Expressions 的 DeclarativeAggregate 函数包含以下几个重要的组成部分:
再来看看 AggregationIterator#processRow
AggregationIterator#processRow 会调用
生成用于处理一行数据(row)的函数
说白了 processRow 生成了函数才是直接用来接受一条 input row 来更新对应的 agg buffer,具体是根据 mode 及 aggExpression 中的 aggFunction 的类型调用其 updateExpressions 或 mergeExpressions 方法:
比如,对于 aggFunction 为 DeclarativeAggregate 类型的 Partial 下的 Count 来说就是调用其 updateExpressions 方法,即:
对于 Final 的 Count 来说就是调用其 mergeExpressions 方法,即:
对于 aggFunction 为 ImperativeAggregate 类型的 Partial 下的 Collect 来说就是调用其 update 方法,即:
对于 Final 的 Collect 来说就是调用其 merge 方法,即:
我们都知道,读取一个迭代器的数据,是要不断调用 hasNext 方法进行 check 是否还有数据,当该方法返回 true 的时候再调用 next 方法取得下一条数据。所以要知道如何读取 TungstenAggregationIterator 的数据,就得分析其这两个方法。
分为两种情况,分别是:
Agg 的实现确实复杂,本文虽然篇幅已经很长,但还有很多方面没有 cover 到,但基本最核心、最复杂的点都详细介绍了,如果对于未 cover 的部分有兴趣,请自行阅读源码进行分析~
Ⅳ Spark支持通过GO语言编写程序吗
最新官方api接口,没有go,所以不支持。