其他
SparkSQL在有赞大数据的实践(二)
点击关注“有赞coder”
获取更多技术干货哦~
作者:胡加华
团队:大数据团队
一、前言
91%
以上,最后也分享一些在 Spark 踩过的坑和经验希望能帮助到大家。Thrift Server 稳定性建设 SparkSQL 第二次迁移 一些踩坑和经验
二、Thrift Server 稳定性建设
60%
以上,而随之而来的是各种问题(包括用户的任务失败和服务不可用等),服务的稳定性受到了比较大的挑战。Overhead
,SQL 任务倾斜导致的 Executor 空闲浪费等。考虑到目前我们大部分 SQL 任务的执行时间很短,基本在 3
分钟之内,单个 Task 执行时间很短,所以选择 Thrift Server 相对合适一些。AB 测试灰度发布功能 Spark Metric 指标的采集、分析以及处理 基于 SQL 引擎选择服务拦截不合适的 SQL 任务
2.1 AB 灰度测试
优先级
、 时间段
、 流量比例
等配置的AB测试功能。2.2.1
rebase 到 2.3.3
版本,又或者为节省内存资源给算法任务而将Executor 内存配置从 3G
调整到 2G
等比较重大的变更,都降低了一些坑产生的影响。2.2 Spark Metric 采集
REST API
和 EventLog
功能,我们搭建一个 spark-monitor 应用。这个应用主要职责是近实时的读取 EventLog 产生的 Spark 事件,通过事件回放并结合 REST API 最终形成我们需要的 Job 数据,并写到 Hbase 表保存。基于采集到的数据主要有以下几个方面的用途:实时预警和干预。通过实时收集的 Job 数据,我们可以分析出哪些正在运行的 Job 是异常的,比如耗时非常久,产生很多 Task 等等场景,然后针对异常场景的 Action 可以是 Alarm 或者 Kill Job 等。
离线分析。每天从 Hbase 离线的同步到hive表做一些离线分析,比如统计存在 Failed Task 的任务、Peak Execution Memory 使用比较高的任务,或者数据倾斜的任务等。找出已发生问题或者潜在问题的任务,去优化 SQL 任务或者分析原因并反哺去调校 Thrift Server 配置。
历史 Job 数据保存,用于排查历史任务的日志查找和分析( Spark UI 因为内存限制无法保留历史任务的记录)。
2.3 基于引擎选择的 SQL 拦截
Not in Subquery
, CrossJoin
等 SQL 场景。三、SparkSQL 第二次迁移
91%
以上。节约离线集群资源成本。通过 tpcds
的性能测试,同等资源情况下 SparkSQL 是 Hive 的2~10
倍(不同的 query 性能表现不一样)。在生产验证下来大部分确实有差不多2
倍的性能。计算资源更合理的分配。由于 Spark 自身实现任务调度和资源分配,可以通过它已有的功能针对不同优先级的任务配置不同的资源配额。比如原先使用 Hive 时每一个 SQL 任务的 map 或者 reduce 并发数默认都是一样的,而使用 SparkSQL 时可以让 资源的比例按优先级倾斜
(即 scheduler pool 的功能)。
四、踩坑和经验
4.1 spark.sql.autoBroadcastJoinThreshold
10
MB。目前阀值判断的比较逻辑会参考几个因素:文件的大小
和 字段选择的裁剪
。比如某张 Hive 表的数据大小为 13
MB , 表 schema 为 struct<id:long,name:string>
,而假设当前 SQL 只使用到 name 字段,那根据字段选择情况并对文件大小进行裁剪估算所需总字节的公式为:20/(8+20)*13约等于9.3MB
(各个字段类型有不同的估算字节,比如long 是 8
个字节 ,string 是 20
个字节等),从而满足 BroadcastJoin 的条件。但是这里有几种情况需要额外考虑:1、表存储格式带来的差异,比如 使用 ZLIB 压缩的 ORC 格式跟 TEXT 格式就在数据存储上的文件大小可能会差很多,即使两张表都是 ORC 格式,压缩率的差异也是存在; 2、字段字节估算本身就有一定的误差,比如 string 字段认为是 20 个字节,对于一些极端情况的 string 大字段,那估算误差就会比较大; 3、读取 Hive 表的 "raw" 数据到内存然后展开成 Java 对象,内存的消耗也有一定放大系数。所以 10M
的阀值,最终实际上需要的内存可能达到 1G
,这个我们也是在生产环境上碰到过。4.2 spark.blacklist.enabled
Data Locality
还是会大概率的选择那个 host 上的 Executor。如果失败是因为 机器坏盘
引起的,那重试还是会失败,重试次数达到最大后那最终整个 Job 失败。而开启 blacklist 功能可以解决此类问题,将发生失败的 Executor 实例或者 host 添加到黑名单,那么重试可以选择其他实例或者 host ,从而提高任务的 容错
能力。4.3 spark.scheduler.pool
FairScheduler
策略,并配置不同 资源权重
的 Pool 给不同优先级的任务。4.4 spark.sql.adaptive.enabled
小文件
的问题(Map Only 的 SQL 场景不起作用)。adaptive 功能在 Spark 1.6 版本就已经支持,但是我们目前 yz-spark 版本合入是Intel 实现的增强版本(该版本还实现了另两个功能:动态调整执行计划
和 动态处理数据倾斜
),目前官方版本还没有合入(https://github.com/Intel-bigdata/spark-adaptive)correctness
的 bug, 在 broadcast join 的情况下可能会发生数据结果不正确的情况。当用于 broadcast 的 LongToUnsafeRowMap
对象如果被多次的序列化反序列化就会触发,导致野指针的产生,从而可能产生不正确的结果。当时这个问题导致我们一张核心仓库表产生错误数据。由于这个 bug 几周才偶现一次,复现的困难导致花费了一个月时间才定位原因。这次教训也让我们意识到需要经常的去关注社区版本的迭代,及早发现特别是那些比较严重的 bug fix,避免生产上的故障。4.6 SPARK-26604
30%
以上。4.7 低效且危险的 Not in Subquery
broadcast
成一个 list
,然后 t1 每一条记录通过 loop
遍历 list 去匹配是否存在。由于它的低效可能会长时间占用 executor 资源,同时 subquery 结果数据量比较大的情况下,broadcast 可能带来 driver 的 OOM 风险。Vol.242
活动预告