sparkhivesql
Ⅰ spark从hive数据仓库中读取的数据可以使用sparksql进行查询吗
1、为了让Spark能够连接到Hive的原有数据仓库,我们需要将Hive中的hive-site.xml文件拷贝到Spark的conf目录下,这样就可以通过这个配置文件找到Hive的元数据以及数据存放。
在这里由于我的Spark是自动安装和部署的,因此需要知道CDH将hive-site.xml放在哪里。经过摸索。该文件默认所在的路径是:/etc/hive/conf 下。
同理,spark的conf也是在/etc/spark/conf。
此时,如上所述,将对应的hive-site.xml拷贝到spark/conf目录下即可
如果Hive的元数据存放在Mysql中,我们还需要准备好Mysql相关驱动,比如:mysql-connector-java-5.1.22-bin.jar。
2、编写测试代码
val conf=new SparkConf().setAppName("Spark-Hive").setMaster("local")
val sc=new SparkContext(conf)
//create hivecontext
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' ") //这里需要注意数据的间隔符
sqlContext.sql("LOAD DATA INPATH '/user/liujiyu/spark/kv1.txt' INTO TABLE src ");
sqlContext.sql(" SELECT * FROM jn1").collect().foreach(println)
sc.stop()
3、下面列举一下出现的问题:
(1)如果没有将hive-site.xml拷贝到spark/conf目录下,会出现:
分析:从错误提示上面就知道,spark无法知道hive的元数据的位置,所以就无法实例化对应的client。
解决的办法就是必须将hive-site.xml拷贝到spark/conf目录下
(2)测试代码中没有加sc.stop会出现如下错误:
ERROR scheler.LiveListenerBus: Listener EventLoggingListener threw an exception
java.lang.reflect.InvocationTargetException
在代码最后一行添加sc.stop()解决了该问题。
Ⅱ 源码级解读如何解决Spark-sql读取hive分区表执行效率低问题
问题描述
在开发过程中使用spark去读取hive分区表的过程中(或者使用hive on spark、nodepad开发工具),部分开发人员未注意添加分区属性过滤导致在执行过程中加载了全量数据,引起任务执行效率低、磁盘IO大量损耗等问题。
解决办法
1、自定义规则CheckPartitionTable类,实现Rule,通过以下方式创建SparkSession。
2、自定义规则CheckPartitionTable类,实现Rule,将规则类追加至Optimizer.batches: Seq[Batch]中,如下。
规则内容实现
1、CheckPartitionTable规则执行类,需要通过引入sparkSession从而获取到引入conf;需要继承Rule[LogicalPlan];
2、通过splitPredicates方法,分离分区谓词,得到分区谓词表达式。在sql解析过程中将谓词解析为TreeNode,此处采用递归的方式获取分区谓词。
3、判断是否是分区表,且是否添加分区字段。
4、实现Rule的apply方法
大数据和云计算的关系
大数据JUC面试题
大数据之Kafka集群部署
大数据logstsh架构
大数据技术kafka的零拷贝
Ⅲ Hive、SparkSQL是如何决定写文件的数量
Hive自身和Spark都提供了对Hive的SQL支持,用SQL的交互方式操作Hive底层的HDFS文件,两种方式在写文件的时候有一些区别:
Hive在通过SQL写文件是通过MapRece任务完成的,如下面这个例子:
在表中插入数据后,可以hdfs对应路径下找到存储的文件
可以看到插入生成了1个文件,这是因为每一条插入语句都会单独启动一个MapRece任务,一个MapRece任务对应一个结果文件。
当插入过程有shuffle时:
由 Hive实现group by的过程 可知,group by的时候会以group by的字段为key进行shuffle,即上例中的 game_id 字段。从执行日志中可以看到整个任务启用了62个mapper和1个recer,由于最终写数据的过程是在recer中完成,所以最终写数据的文件数量也应该只有1个。
注:Hive控制recer数量的规则如下:
Spark SQL也可以在hive中操作文件,执行命令
Hdfs中文件的存储如下:
可以发现即使是同一条语句,spark也会启动两个任务区并行的写文件,最终产生了两个文件结果。
Spark中同样以类似的SQL为例:
与Hive不同的是,Spark在执行shuffle过程的时候,会为每一个shuffle的key启动一个任务来写数据,上例中的key game_id 在源数据source_table的分布情况是共有26个不同的key。
因此spark会启动26个任务来写数据,在最终的结果文件中也应该有26个文件:
由于spark的写文件方式,会导致产生很多小文件,会对NameNode造成压力,读写性能变差,为了解决这种小文件问题,spark新的版本(笔者使用2.4.0.cloudera2版本)中支持了动态规划shuffle过程,需要配置spark.sql.adaptive.enabled属性。
在将spark.sql.adaptive.enabled属性设置为true后,spark写文件的结果为
从结果可以看到只有一个文件,这是由于动态规划的作用,在写文件的时候只启动了一个任务。动态规划的细节请参考 Adaptive Execution 让 Spark SQL 更高效更智能 。
Ⅳ Spark SQL(十):Hive On Spark
Hive是目前大数据领域,事实上的SQL标准。其底层默认是基于MapRece实现的,但是由于MapRece速度实在比较慢,因此这几年,陆续出来了新的SQL查询引擎,包括Spark SQL,Hive On Tez,Hive On Spark等。
Spark SQL与Hive On Spark是不一样的。Spark SQL是Spark自己研发出来的针对各种数据源,包括Hive、JSON、Parquet、JDBC、RDD等都可以执行查询的,一套基于Spark计算引擎的查询引擎。因此它是Spark的一个项目,只不过提供了针对Hive执行查询的工功能而已,适合在一些使用Spark技术栈的大数据应用类系统中使用。
而Hive On Spark,是Hive的一个项目,它是将Spark作为底层的查询引擎(不通过MapRece作为唯一的查询引擎)。Hive On Spark,只适用于Hive,在可预见的未来,很有可能Hive默认的底层引擎就从MapRece切换为Spark了;适合于将原有的Hive数据仓库以及数据统计分析替换为Spark引擎,作为全公司通用的大数据统计分析引擎。
Hive On Spark做了一些优化:
1、Map Join
Spark SQL默认对join是支持使用broadcast机制将小表广播到各个节点上,以进行join的。但是问题是,这会给Driver和Worker带来很大的内存开销。因为广播的数据要一直保留在Driver内存中。所以目前采取的是,类似乎MapRece的Distributed Cache机制,即提高HDFS replica factor的复制因子,以让数据在每个计算节点上都有一个备份,从而可以在本地进行数据读取。
2、Cache Table
对于某些需要对一张表执行多次操作的场景,Hive On Spark内部做了优化,即将要多次操作的表cache到内存中,以便于提升性能。但是这里要注意,并不是对所有的情况都会自动进行cache。所以说,Hive On Spark还有很多不完善的地方。
Hive QL语句 =>
语法分析 => AST =>
生成逻辑执行计划 => Operator Tree =>
优化逻辑执行计划 => Optimized Operator Tree =>
生成物理执行计划 => Task Tree =>
优化物理执行计划 => Optimized Task Tree =>
执行优化后的Optimized Task Tree
Ⅳ spark1.6.3执行hivesql遇到left join 时报can't zip rdds
在spark 上跑hive sql 脚本,在spark 2.x以上跑没问题 ,hive上没问题,spark1.6上跑最后会报如图以上问题 ,墙内比较难找到对应解决办法 ,可以在脚本里配置一下配置,spark.sql.adaptive.enabled=true,默认值是false。设置为true的话,就是开启了Spark SQL自适应查询引擎,就是在运行时,通过一些统计指标来动态优化Spark sql的执行计划。
产生原因目前没有找到合适的解释,产生问题的地方很简单,就是两个表在left join 预估是因为数据倾斜导致 ,a left join b的时候 ,b表数据较小,大量数据关联不上,导致数据倾斜
Ⅵ spark SQL和hive到底什么关系
Hive是一种基于HDFS的数据仓库,并且提供了基于SQL模型的,针对存储了大数据的数据仓库,进行分布式交互查询的查询引擎。
SparkSQL并不能完全替代Hive,它替代的是Hive的查询引擎,SparkSQL由于其底层基于Spark自身的基于内存的特点,因此速度是Hive查询引擎的数倍以上,Spark本身是不提供存储的,所以不可能替代Hive作为数据仓库的这个功能。
SparkSQL相较于Hive的另外一个优点,是支持大量不同的数据源,包括hive、json、parquet、jdbc等等。SparkSQL由于身处Spark技术堆栈内,基于RDD来工作,因此可以与Spark的其他组件无缝整合使用,配合起来实现许多复杂的功能。比如SparkSQL支持可以直接针对hdfs文件执行sql语句。