Hive常用性能优化方法实践全面总结
Apache Hive作为处理大数据量的大数据领域数据建设核心工具,数据量往往不是影响Hive执行效率的核心因素,数据倾斜、job数分配的不合理、磁盘或网络I/O过高、MapReduce配置的不合理等等才是影响Hive性能的关键。
列裁剪和分区裁剪
SELECT age, name FROM people WHERE age > 30;
在实施此项查询中,people表有3列(age,name,address),Hive只读取查询逻辑中真正需要的两列age、name,而忽略列address;这样做节省了读取开销,中间表存储开销和数据整合开销。
同理,对于Hive分区表的查询,我们在写SQL时,通过指定实际需要的分区,可以减少不必要的分区数据扫描【当Hive表中列很多或者数据量很大时,如果直接使用select * 或者不指定分区,效率会很低下(全列扫描和全表扫描)】。
谓词下推
在关系型数据库如MySQL中,也有谓词下推(Predicate Pushdown,PPD)的概念。它就是将SQL语句中的where谓词逻辑都尽可能提前执行,减少下游处理的数据量。
如下Hive SQL语句:
select
a.*,
b.*
from
a join b on (a.id = b.id)
where a.id > 15 and b.num > 16;
使用谓词下推,那么where条件会在join之前被处理,参与join的数据量减少,提升效率。
在Hive中,可以通过将参数hive.optimize.ppd设置为true,启用谓词下推。与它对应的逻辑优化器是PredicatePushDown。该优化器就是将OperatorTree中的FilterOperator向上提,见下图:
Hive join优化
关于Hive join,参考文章:《Hive join优化》。
>> hive.fetch.task.conversion
虽然Hive底层可以将Hive SQL转化为MapReduce执行,但有些情况不使用MapReduce处理效率跟高。比如对于如下SQL:
SELECT name FROM people;
在hive-default.xml.template文件中hive.fetch.task.conversion默认是more,老版本hive默认是minimal,该属性修改为more以后,在全局查找、字段查找、limit查找等都不走mapreduce。
<property>
<name>hive.fetch.task.conversion</name>
<value>more</value>
<description>
Expects one of [none, minimal, more].
Some select queries can be converted to single FETCH task minimizing latency.
Currently the query should be single sourced not having any subquery and should not have
any aggregations or distincts (which incurs RS), lateral views and joins.
0. none : disable hive.fetch.task.conversion
1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only
2. more : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns)
</description>
</property>
将hive.fetch.task.conversion设置成none,在Hive shell中执行如下语句,都会执行MapReduce程序。
hive> set hive.fetch.task.conversion=none;
hive> select name from people;
hive> select * from people;
把hive.fetch.task.conversion设置成more,然后执行如下语句,如下查询方式都不会执行MapReduce程序。
hive> set hive.fetch.task.conversion=more;
hive> select name from people;
hive> select * from people;
>> group by
此外,通过hive.groupby.mapaggr.checkinterval参数可以设置map端预聚合的条数阈值,超过该值就会分拆job,默认值100000。
但是,相对于正常的任务执行,该参数配置为true时会多启动一个MR job,会增加开销,单纯依赖它解决数据倾斜并不能从根本上解决问题。因此,建议分析数据、Hive SQL语句等,了解产生数据倾斜的根本原因进行解决。
可以通过group by代替count(distinct)使用。示例如下:
原SQL:SELECT count(DISTINCT id) FROM people;
group by替换后:SELECT count(id) FROM (SELECT id FROM people GROUP BY id) tmp;
此外,如何用group by方式同时统计多个列?下面提供一种SQL方案:
select tmp.a, sum(tmp.b), count(tmp.c), count(tmp.d) from (
select a, b, null c, null d from some_table
union all
select a, 0 b, c, null d from some_table group by a,c
union all
select a, 0 b, null c, d from some_table group by a,d
) tmp;
>> 笛卡尔积
>> 本地模式
可以通过设置hive.exec.mode.local.auto的值为true,来让Hive在适当的时候自动启动这个优化。
set hive.exec.mode.local.auto=true;
设置本地MR的最大输入数据量,当输入数据量小于这个值时采用本地MR的方式,默认为134217728,即128M
set hive.exec.mode.local.auto.inputbytes.max=51234560;
设置本地MR的最大输入文件个数,当输入文件个数小于这个值时采用本地MR的方式,默认为4
set hive.exec.mode.local.auto.input.files.max=10;
>> left semi join替代in/exsits
left semi join是in、exists的高效实现。比如,对于如下SQL
select t1.id, t1.name from t1 where t1.id in (select t2.id from t2);
改为left semi join执行:
select t1.id, t1.name from t1 left semi join t2 on t1.id = t2.id;
>> MapReduce相关的优化
1. mapper和reducer个数
关于MapReduce中mapper和reducer个数的决定机制,建议阅读文章:《详解MapReduce》
1)输入阶段合并
如果有split大小小于这两个值,则会进行合并。
此外,hive.merge.size.per.task可以指定每个task输出后合并文件大小的期望值,hive.merge.size.smallfiles.avgsize可以指定所有输出文件大小的均值阈值。如果平均大小不足的话,就会另外启动一个任务来进行合并。
3. 启用压缩
要启用中间压缩,需要设定hive.exec.compress.intermediate为true,同时指定压缩方式hive.intermediate.compression.codec为org.apache.hadoop.io.compress.SnappyCodec。
另外,参数hive.intermediate.compression.type可以选择对块(BLOCK)还是记录(RECORD)压缩,BLOCK的压缩率比较高。
输出压缩的配置基本相同,打开hive.exec.compress.output即可。
4. JVM重用
在MR job中,默认是每执行一个task就启动一个JVM。可以通过配置参数mapred.job.reuse.jvm.num.tasks来进行JVM重用。
>> 采用合适的存储格式
在HiveQL的create table语句中,可以使用stored as ...指定表的存储格式。Hive目前支持的存储格式有TextFile、SequenceFile、RCFile、avro、orc、parquet等。
当然,我们也可以采用alter table … [PARTITION partition_spec] set fileformat,修改具体表的文件格式
https://parquet.apache.org/和https://orc.apache.org/。
在分布式集群环境下,由于负载不均衡或者资源分布不均等原因,会造成同一个作业的多个job之间运行速度不一致,有些job的运行速度可能明显慢于其他任务,则这些job会拖慢整个作业的执行进度。为了避免这种情况发生,Hadoop采用了推测执行(Speculative Execution)机制。
"推测执行"机制,根据一定的规则推测出"拖后腿"的任务,并为这样的任务启动一个备份任务,让该任务与原始任务同时处理同一份数据,并最终选用最先成功运行完成任务的计算结果作为最终结果。
Hive同样可以开启推测执行。设置开启推测执行参数(在配置文件mapred-site.xml中进行配置)
<property>
<name>mapreduce.map.speculative</name>
<value>true</value>
<description>If true, then multiple instances of some map tasks
may be executed in parallel.</description>
</property>
<property>
<name>mapreduce.reduce.speculative</name>
<value>true</value>
<description>If true, then multiple instances of some reduce tasks
may be executed in parallel.</description>
</property>
<property>
<name>hive.mapred.reduce.tasks.speculative.execution</name>
<value>true</value>
<description>Whether speculative execution for reducers should be turned on. </description>
</property>
关于调优这些推测执行变量,目前还很难给出一个具体建议。如果用户对于运行时的偏差非常敏感的话,那么可以将这些功能关闭掉。如果用户因为输入数据量很大而需要执行长时间的map或者reduce task的话,那么启动推测执行造成的浪费是非常巨大。
推荐文章:
一次Java内存泄漏的排查
经典的SparkSQL/Hive-SQL/MySQL面试-练习题