知乎基于Celeborn优化Spark Shuffle的实践
1 背景
Spark ESS Shuffle在大作业稳定性上更有优势,在Executor意外退出或者GC严重时,已经完成的Map端的Shuffle数据,可以继续被下游读取,不受影响,所以知乎使用的是ESS(External Shuffle Service)作为Spark的Shuffle服务。
但是ESS也有自己的局限性,ESS Shuffle过程中,每个Reducer Task需要去每个上游Mapper Task的输出文件中读取属于自己的Block,从而产生大量的网络连接以及随机IO,大量的随机IO会导致容易达到磁盘的IOPS瓶颈,作业性能和稳定性都会明显下降 [1][2]。在知乎,经常遇到IO负载高的节点导致个别Spark作业Shuffle Read耗时不稳定甚至超时导致的作业执行耗时不稳定、失败等问题。下图可以清楚的描述ESS Shuffle过程中的磁盘读取及网络连接情况:
为了更好的表述ESS Shuffle存在的问题,借用LinkedIn论文中的一张Shuffle数据统计图,图中很清楚的描述了使用ESS Shuffle时,5000个样本作业不同Shuffle Stage的平均Shuffle Read Block大小分布,以及作业平均Shuffle Read Block大小与作业Task Shuffle Read耗时关联关系。从图中不难看出,存在大量的KB级别的Shuffle Read Block(每个Block至少会一次磁盘IO),而且Shuffle Read Block越小,整体Shuffle Read耗时更长。
2 Spark RSS实现调研
RSS有不少开源实现,我们主要关注了两个国内大厂的开源实现,腾讯开源的Apache Uniffle以及阿里开源的Apache Celeborn。Uniffle在知乎之前的在离线混部项目中有使用,当时的Celeborn版本还不支持MR作业,而我们需要同时把符合条件的MR、Spark作业调度到混部Hadoop集群,该项目就选择了Uniffle,很好地支持了知乎的在离线混部项目,参考 知乎k8s在离线混部-离线篇。
在本次大数据作业迁移RSS项目中,我们跟Celeborn社区进行了一次交流,Celeborn社区给介绍了下Celeborn的特性,发现Celeborn具有一些吸引我们的特性,比如平滑升级、磁盘容错、基于磁盘负载的调度等,同时Celeborn也已经支持了MR作业,于是我们重新对ESS、Uniffle、Celeborn做了一次对比测试。
我们在测试环境中使用TPC-DS 3000sf数据集进行了ESS、Celeborn、Uniffle对比测试,在我们的测试场景Celeborn、Uniffle查询性能相比ESS都有明显优势,Celeborn与Uniffle差距不大,同时Celeborn的内存消耗更低,下图是在测试环境ESS与Celeborn的耗时比值,数值是ESS耗时相对Celeborn耗时的倍数,大于1代表ESS更慢:
线上环境相对测试环境磁盘负载要高很多,我们线上环境目前确实存在严重的随机Shuffle问题,理论上线上环境RSS相对ESS的优势会更大一些,所以选择RSS势在必行;另外,考虑到我们在测试中Celeborn相对Uniffle的内存消耗优势,以及Celeborn社区介绍的平滑升级、磁盘容错、基于负载的调度的特性,最终我们选择了在线上环境部署Celeborn。
3 Celeborn上线
3.1 Celeborn部署
在Celeborn之前,知乎使用ESS为Spark、MR作业提供Shuffle服务,ESS嵌入在NodeManager服务中,为了更好的ESS性能,缓存更多的Shuffle 分区元数据信息到内存,给NodeManager配置了比较高的内存。
我们上线RSS的目标除了解决Spark作业Shuffle稳定性和性能问题,同时希望不增加额外的机器成本,最终RSS可以完全平替ESS,所有作业迁移到RSS后,下线ESS,降低NodeManager的资源配置,整体不因为部署RSS增加资源开销。所以我们最终确定Celeborn跟Hadoop集群混合部署,每一台Hadoop节点部署一个Celeborn Worker服务。
为了保障Spark作业迁移到RSS过程的稳定性,作业分批进行灰度迁移,前期只在部分机器上部署Celeborn Worker服务,同时为了验证Spark作业迁移到RSS后是否确实有收益,在灰度上线阶段,避免ESS和RSS共用磁盘互相影响,我们对RSS使用的磁盘及ESS使用的磁盘进行了隔离,每台Hadoop机器有12块磁盘,DataNode依然同时使用所有磁盘,但是配置ESS只使用其中的10块磁盘,RSS使用另外2块磁盘,整体部署架构如下:
3.2 Spark作业迁移Celeborn
3.3 Celeborn升级
这样要面临一次整体集群的升级,Celeborn 支持滚动升级,无论是升级还是下线,都能够在不影响当前正在运行的作业的情况下进行。这种优雅的升级和下线方式,保证了业务的连续性和稳定性,避免了长尾作业的等待时间,提高了运维效率。
而且在部署层面,运维的感知就是部署了一个程序,然后检测到程序启动即可,很多的状态等等恢复工作 Celeborn 都会自动完成,不需要运维干预,相较于常规部署与升级 Hadoop 要简单很多。
同时这次的升级我们全程都在正常的工作时间进行,搭配自身的 Ansible 运维平台,我们只需极低的运维开发就可以实现数千个 Celeborn Worker 节点的运维。
3.4 Celeborn兼容Spark1.6
但是知乎还有不少历史作业是使用Spark1.6实现的,每天大概600个作业,当时评估推动业务方升级作业Spark版本到Spark2、Spark3很困难,为了把所有Spark作业都迁移到RSS,我们自己实现了Celeborn Spark1.6的客户端,支持Spark1.6作业。
Spark1客户端相对于Spark2、Spark3版本最主要的不同在于处理Stage重试相关逻辑。Celeborn Spark高版本客户端在发生Shuffle Fetch Failed异常触发Stage重试时,通过Spark的MapOutputTracker接口清理异常Stage上游所有Map的输出,依赖TaskContext中的stage重试次数信息,决定是否生成新的Celeborn内部ShuffleID。Spark1中MapOutputTracker接口、TaskContext字段信息跟高版本不同,针对Spark1的Stage重试,我们在Celeborn侧ShuffleWriter尝试获取ShuffleID时,是否生成新的ShuffleID不依赖Stage重试信息,而是依赖当前Stage在用的ShuffleID是否发生了ShuffleFetch异常,发生过异常,则在后面同一个Shuffle Stage的Write任务申请ShuffleID时,分配新的ShuffleID;Spark侧在开启Celeborn Shuffle的情况下,发生Stage重试时,内部直接清理Shuffle Stage Output信息,保障重试时上游Stage所有Task的重新提交。
目前已经在所有Spark1.x作业上线,运行稳定,并且遇到异常Celeborn Worker节点导致FetchFailed时,可以触发Stage重试并成功。
4 遇到的问题
4.1 Kyuubi Spark SQL上线Celeborn后无法创建线程
在一次Celeborn服务从500节点扩容到700节点后,每当用户查询高峰时间段,所有Spark Driver日志中都会有
OutOfMemoryError: unable to create new native thread
线程创建异常的报错,导致大量用户查询失败,此时机器及Jvm内存都远没有达到上限。通过跟Celeborn社区的交流,以及统计分析Spark Driver的线程栈,发现Celeborn针对每个Shuffle Stage都会启动新的线程池用于向分配给当前 Stage Shuffle资源的节点创建连接、预留Slot、提交Commit数据,而在我们的Kyuubi Spark Adhoc场景,每个Spark作业中可能同时运行大量SQL,大量的Shuffle Stage,单个Driver进程线程数高峰可能达到接近1w+个,其中Celeborn InitWorker、ReserveSlot、CommitFiles相关的线程累计8k+,当Worker节点是500时,因为需要连接的Worker相对少,扩容到700后,需要创建更多线程连接更多的Worker节点,怀疑线程数达到了上限。
在该问题发生时,我们检查了机器操作系统配置的线程数、进程数上限,分别是400w+、30w+,机器上实际启动的线程数并没有达到这个上限。在知乎,Kyuubi服务是通过Systemd启动的,Spark Driver进程都是Kyuubi的子进程,我们最终发现达到的上限是Systemd的DefaultTasksMax限制的,该配置会限制Systemd启动的服务以及该服务所有子进程可以使用的线程数上限(所有子进程累加),因为问题发生时,机器内存、CPU等资源还很富裕,所以我们增加了Systemd DefaultTasksMax的配置暂时解决了这个问题。
后续Celeborn社区对线程池创建相关逻辑进行了优化,不同Shuffle复用同一个InitWorker、ReserveSlot、CommitFiles线程池,目前该优化也已在知乎上线,大幅优化了创建的线程数。
4.2 Spark作业设置Celeborn参数后不生效
instanceof
检查时),会使用AppClassLoader加载CelebornShuffleHandle类。4.3 Spark作业中包含GlobalLimit算子时使用RSS问题
在切换到RSS之前,GlobalLimit算子在进行Shuffle的时候,每个上游Task数据都是写本地,下游Task在Shuffle Read到Limit限制的数据后就结束了;切换到RSS之后,上游Task数据是写到RSS节点,并且属于同一个Reduce Task的数据会写到同一个RSS节点,也就是TB级别的数据会写同一个RSS节点,直接导致该RSS节点因数据写入太快,内存使用过高被Exclude,SQL执行失败。
GlobalLimit算子问题在我们这边是普遍存在的,在知乎Spark是Adhoc的底层查询引擎之一,每天会有大量的Spark SQL Adhoc查询,大量的Spark SQL查询复用一批Spark执行环境,节约启动耗时。为了减少复用的Spark环境的Driver内存压力,我们会自动修改用户的SQL,添加查询结果Insert Hdfs Directory的逻辑,查询平台再从Hdfs上读取结果给用户展示,同时在用户没有主动添加Limit的情况下,我们会自动给用户添加Limit限制。
这个问题不能通过修改单个SQL进行解决,我们调研发现Spark中有一个LocalLimit算子,该算子可以限制每个Spark RDD Partition的结果数据行数,于是我们的解决思路,就是给Spark添加一个执行计划优化规则,当执行计划中包含GlobalLimit算子,并且GlobalLimit算子的上游是一个Project算子(即Select)的情况下,自动在GlobalLimit算子前添加一个LocalLimit算子。
4.4 节点负载高导致shuffle不稳定
知乎Celeborn是跟Hadoop部署到同一批机器上的,在作业灰度迁移的过程中,偶尔会发生个别机器负载很高,导致Commit超时或者Spark跟Celeborn通信超时,触发Shuffle异常,影响了作业执行稳定性,针对该问题,我们做了几个调整
4.5 Celeborn Worker连接数过高
celeborn.master.slot.assign.maxWorkers 500
5 收益
5.1 作业性能优化
从作业整体分位数耗时、Shuffle耗时曲线看优化明显,大作业提速明显,所有作业99分位数耗时优化30%以上。
从用户反馈看,已经接入RSS Shuffle的作业,在性能和稳定性上都有了明显优化,业务方主动反馈作业变快了,近期我们也已经基本没有再收到用户反馈的shuffle耗时不稳定、shuffle连接异常等问题。
因为1/3以上的Shuffle流量切到了RSS,但是给RSS的磁盘只有整个集群的1/6,所以对于没有接入RSS的作业,也有明显的优化,公司核心例行作业结束时间提前了一个小时以上。
5.2 资源使用优化
6 总结与展望
7 参考
Magnet: Push-based Shuffle Service for Large-scale Data Processing
往期推荐
阿里云 PAI 大语言模型微调训练实践
大模型推荐系统的打怪升级之路!
从电商场景,看抖音集团数据治理实践
当大数据分析遇上方程式赛车:捕捉极速赛场上的时间印记
AI Agent 在 1688 电商平台中的应用
小红书图数据库在分布式并行查询上的探索
领域大模型的挑战与机遇:从构建到应用
电商知识图谱建设及大模型应用探索
大模型在推荐系统中的应用及挑战
点个在看你最好看
SPRING HAS ARRIVED