【技术帖】Kylin v2.0 Spark Cubing优化改进
编者注:本文原载自编程小梦,感谢作者康凯森撰文并授权转载。康凯森,美团点评大数据工程师,Apache Kylin commiter,目前主要负责Apache Kylin在美团点评的平台化建设。
Kylin v2.0引入了Spark Cubing Beta版本,本文主要介绍我是如何让 Spark Cubing支持启用Kerberos的HBase集群,再介绍下Spark Cubing的性能测试结果和适用场景。
Spark Cubing简介
在简介Spark Cubing之前,我简介下MapReduce Batch Cubing。所谓的MapReduce Batch Cubing就是利用MapReduce计算引擎批量计算Cube,其输入是Hive表,输出是HBase的KeyValue,整个构建过程主要包含以下6步:
建立Hive的大宽表;(MapReduce计算)
对需要字典编码的列计算列基数;(MapReduce计算)
构建字典;(JobServer计算 or MapReduce计算)
分层构建Cuboid;(MapReduce计算)
将Cuboid转为HBase的KeyValue结构(HFile);(MapReduce计算)
元数据更新和垃圾回收。
详细的Cube生成过程可以参考 :Apache Kylin Cube 构建原理(http://blog.bcmeng.com/post/kylin-cube.html)。
而Kylin v2.0的Spark Cubing就是在Cube构建的第4步替换掉MapReduce。
如下图,就是将5个MR job转换为1个Spark job:
(注:以下两个图片引自Apache Kylin官网的blog
:By-layer Spark Cubing(http://kylin.apache.org/blog/2017/02/23/by-layer-spark-cubing/),更详细的介绍也可以参考这篇blog。)
MapReduce计算5层的Cuboid会用5个MR Application计算:
Spark计算Cuboid只会用1个Application计算:
Spark Cubing的核心实现类是SparkCubingByLayer
。
Spark Cubing访问Kerberos认证的HBase解1
第一种简单的做法是将访问HBase的token从Kylin的JobServer传递到executor中,这种做法的限制是只能运行在Yarn-client模式中,即必须让driver运行在Kylin的JobServer中。关于yarn-cluster mode和yarn-client mode两种模式的区别可以参考:Apache Spark Resource Management and YARN App Models(http://blog.cloudera.com/blog/2014/05/apache-spark-resource-management-and-yarn-app-models/)。
这种做法的实现方式很简单,只需在SparkCubingByLayer的new SparkConf()之前加入以下3行代码:
Configuration configuration = HBaseConnection.getCurrentHBaseConfiguration(); HConnection connection = HConnectionManager.createConnection(configuration); TokenUtil.obtainAndCacheToken(connection, UserProvider.instantiate(configuration).create(UserGroupInformation.getCurrentUser()));
但是如果只能在yarn-client模式下运行,必然无法运行在生产环境,因为Kylin JobServer机器的内存肯定不够用。
Spark Cubing访问Kerberos认证的HBase解2
既然Spark Cubing在启用Kerberos认证的HBase集群下无法运行的根本原因是Spark Cubing需要从HBase直接访问Job相关的Kylin元数据,那我们把元数据换个地方存不就可以了,所以我们将每个Spark Job相关的Kylin元数据上传到HDFS,并用Kylin的HDFSResourceStore来管理元数据。
在介绍实现思路前,我先简介下Kylin元数据的存储结构和Kylin的ResourceStore。
首先,Kylin每个具体的元数据都是一个JSON文件,整个元数据的组织结构是个树状的文件目录。如图是Kylin元数据的根目录:
我们知道Kylin元数据的组织结构后,再简介下Kylin元数据的存储方式。 元数据存储的抽象类是ResourceStore,具体的实现类共有3个:
FileResourceStore本地文件系统
HBaseResourceStore HBase
HDFSResourceStore HDFS
其中只有HBase可以用于生产环境,本地文件系统主要用来测试,HDFS不能用于生产的原因是并发处理方面还有些问题。具体用哪个ResourceStore是通过配置文件的kylin.metadata.url来决定的。
所以下面的问题就是我们如何将HBase中的元数据转移到HDFS和如何将HBaseResourceStore转为HDFSResourceStore?
确定Spark Job需要读取哪些Kylin元数据
将需要的Kylin元数据dump到本地
改写kylin.metadata.url并将所有配置写到本地的元数据目录
利用ResourceTool将本地的元数据上传到指定的HDFS目录
在Spark executor中根据指定HDFS元数据目录的Kylin配置文件构造出HDFSResourceStore。
当然,在最后我们需要清理掉指定HDFS目录的元数据。 整个思路比较简单清晰,但是实际实现中还是有很多细节需要去考虑。
Spark Cubing参数配置
以下是我使用的Spark配置,目的是尽可能让用户不需要关心Spark的配置
//运行在yarn-cluster模式kylin.engine.spark-conf.spark.master=yarn kylin.engine.spark-conf.spark.submit.deployMode=cluster //启动动态资源分配,个人认为在Kylin生产场景中是必须的,因为我们不可能让每个用户自己去指定executor的个数kylin.engine.spark-conf.spark.dynamicAllocation.enabled=truekylin.engine.spark-conf.spark.dynamicAllocation.minExecutors=10kylin.engine.spark-conf.spark.dynamicAllocation.maxExecutors=1024kylin.engine.spark-conf.spark.dynamicAllocation.executorIdleTimeout=300kylin.engine.spark-conf.spark.shuffle.service.enabled=truekylin.engine.spark-conf.spark.shuffle.service.port=7337//内存设置kylin.engine.spark-conf.spark.driver.memory=4G//数据规模较大或者字典较大时可以调大executor内存kylin.engine.spark-conf.spark.executor.memory=4G kylin.engine.spark-conf.spark.executor.cores=1//心跳超时kylin.engine.spark-conf.spark.network.timeout=600//队列设置kylin.engine.spark-conf.spark.yarn.queue=root.rz.hadoop-hdp.test//分区大小kylin.engine.spark.rdd-partition-cut-mb=100
Spark Cubing的构建性能
对于百万级,千万级,亿级的源数据,且无很大字典的情况下,我的测试结果和官方:By-layer Spark Cubing (http://kylin.apache.org/blog/2017/02/23/by-layer-spark-cubing/)的结果基本一致,构建速度提升比较明显,而且Cuboid的层次数越多,提速越明显。
此外,我专门测试了数十亿级源数据或者有超大字典的情况,构建提速也十分明显:
测试Cube1
原始数据量: 27亿行,9个维度,包含1个精确去重指标,字典基数7千多万
MR Cuboid构建耗时:75分钟
Spark Cuboid第一次构建耗时:40分钟(spark.executor.memory = 8G,没有加spark.memory.fraction参数)
Spark Cuboid第二次构建耗时:24分钟(spark.executor.memory = 8G,spark.memory.fraction = 0.5)
为什么减小spark.memory.fraction可以加速构建?
因为减小spark.memory.fraction,可以增大executor中User Memory的大小,给Kylin字典更多的内存,这样就可以避免全局字典换入换出,减少GC。
测试Cube2
原始数据量:24亿行,13个维度,38个指标(其中9个精确去重指标),不过这个cube的精确去重指标基数比较小,只有几百万。
MR Cuboid构建耗时:31分钟
Spark Cuboid构建耗时:8分钟
总结来说,Spark Cubing的构建性能相比MR有1倍到3倍的提升
。
Spark Cubing的资源消耗
除了构建性能,我们肯定还会关注资源消耗。在这次测试中我没有对所以测试结果进行资源消耗分析,只分析了几个Cube。
我的结论是,在我采用的Spark配置情况下,对于中小规模数据集Spark的资源消耗是小于MR的
(executor的内存是4G);对于有大字典的情况(executor的内存是8G),CPU资源Spark是小于MR的,但是内存资源Spark会比MR略多,在这种情况下,我们相当于用内存资源来换取了执行效率
。
Spark Cubing的优缺点
优点:
利用RDD的Cache特性,尽可能利用内存来避免重复IO
大部分场景下Cuboid构建速度有明显提升
在集群资源充足的情况下,我们可以用更多的资源换取更好的构建性能
缺点:
目前版本还未历经生产环境考验,稳定性不确定
不适合有超大字典的场景
引入Spark Cubing将带来额外的运维成本和沟通成本
Spark Cubing的适用场景
个人的结论是,除了有好几亿基数超大字典的这种情况,其他情况应该都适用Spark Cubing
,其中:
Cuboid层次越多越适用。
数据规模越小越适用。
字典越小越适用。
Spark Cubing字典加载优化
Spark和MR有一点重要的区别就是Spark的Task是在线程中执行的,MR的Task是在进程中执行的。这点区别会对Kylin的Cube构建造成重要影响,在MR Cubing中,每个Mapper task只需要load一次字典,但是在Spark Cubing中,一个executor的多个task会多次load字典,如果字典较大,就会造成频繁GC,导致执行变慢。
针对这个问题,我做了两点优化:
让每个executor里的字典只load一次,让该executor的所有Task共享字典。
给全局字典的AppendTrieDictionary中使用的LoadingCache增加maximumSize。 我用了一个有6亿基数的全局字典测试了这个优化,优化后GC时间明显缩短。
总结
用HDFSResourceStore替换HBaseResourceStore后,Spark Cubing已经具备了在启用Kerberos的HBase集群环境下大规模使用的基础。后续我将开放Spark Cubing功能让感兴趣的用户使用。最后,十分感谢我们团队Spark 小伙伴的给力支持。
更多信息请点击阅读原文。
您可能还会想看
【技术帖】使用KyBot寻找Apache Kylin离线构建瓶颈
【技术帖】Apache Kylin支持Query Pushdown
How to register KAP as a system service by systemd?
【Strata Data预告】Apache Kylin 2.0:从Hadoop上的OLAP 引擎到实时数据仓库