查看原文
其他

源码 | spark shuffle service在中通的优化实践

The following article is from 科技中通 Author 狄俊


热文推荐:☞ Spark面试题汇总及答案(推荐收藏)



 正文

一、背景

1.1、业务现状

中通快递是一家以快递为核心业务,集跨境、快运、商业、云仓、航空、金融、智能、传媒、冷链等生态版块于一体的综合物流服务企业。2020年,中通快递全年业务量突破170亿件,同比增长超40%。

随着中通业务量的不断突破,数据时效的需求不断提高,基于hive + mapreduce的离线计算无法满足我们的需求,2020年底我们开始尝试将hive计算迁移到spark 2.3.2上加速离线计算,随着hive计算迁移到spark计算的进行,夜间etl任务总耗时不断减少,yarn资源占用也开始降低,同时也暴露出了一些问题。

1.2、spark shuffle service

spark根据算子之间的宽窄依赖划分stage,同一个stage由多个task并行执行;stage与stage之间数据传输通过task写入本地文件实现,上一个stage的task执行完成后将结果数据写入executor所在机器的本地磁盘,下一个stage启动后根据相关的id获取上一个stage的结果文件,这一stage之间数据传输,文件写入读取的过程称之为spark的shuffle阶段。

spark shuffle

这一过程executor不仅承担着启动task完成计算的职责还承担着管理shuffle数据的职责,这就导致了两个致命的问题:

  • 一是:executor节点挂掉,会导致无法处理下一stage获取shuffle文件请求;

  • 二是:计算完成后,executor资源无法释放,需要等到shuffle文件全部读取完成后才能释放,导致资源浪费;

正是因为上述问题,所以spark在1.2版本引入了external shuffle service功能。

打开shuffle service后,executor在task完成计算将结果写入本地磁盘,并将文件路径告诉NodeManager后就退出将资源释放了,下一stage获取shuffle文件的请求由NodeManager完成,这样就完成了计算与shuffle文件管理的分离;

shuffle文件写入交由NodeManager管理:


shuffle文件读取:


二、shuffle service引起的NodeManager异常

2.1、缓存引起的NodeManager cpu异常

在开启spark external shuffle service后一段时间,我们发现夜间etl高峰期间NodeManager cpu利用率越来越高,随即我们在监控上加入异常自动触发脚本,在cpu利用率上来超过某一阈值后,会将NodeManager进程信息、线程信息、内存信息、磁盘io信息、打开文件信息等现场信息保存下来等待分析;

脚本信息:


通过脚本打印出的进程信息,我们发现cpu占用最高的线程都是shuffle server线程且在异常期间居然有128个shuffle server线程:

shuffle server线程信息:


为什么是128个线程呢?扒源码发现TransportServer创建netty EventLoopGroup时指定的nThreads为0,netty默认nThreads为0时的线程数为cpu核心数的两倍,而我们生产上机器核心数是64核,所以线程数是128;我们开始怀疑是不是线程数太多了导致的cpu利用率高,而spark standalone模式的配置中,nThreads指定为8,官方文档中也说8个线程可以满足万兆带宽的传输,所以我们将shuffle service的Transport线程数也改为8(后续定位到真正问题后,配置又恢复为原来的配置了)

发布后,通过监控发现,cpu利用率暂时已经恢复正常:


但是存在少部分任务出现超时与报错:“Unable to create executor due to Unable to register with external shuffle server due to : java.util.concurrent.TimeoutException: Timeout waiting for task”;

jstack信息显示,主要的时间花在shuffle反复读取index文件上。

在spark 2.1.0之前,shuffle fetch占用executor的10-15%时间,主要耗时在读取shuffle生成的索引文件上;spark 2.1.0基于文件数限制,添加索引文件信息缓存提速shuffle fetch,但是文件索引只限制了单个executor的shuffle fetch缓存文件数,如果executor数量很多,就会导致缓存占用的内存巨大;spark 2.3.0改为基于内存大小限制,默认100M;对于集群上大部分shuffle作业而言,这100M缓存杯水车薪,等于没有缓存,所以我们通过参数"spark.shuffle.service.index.cache.size"将缓存大小调整到1G;

正当我们以为问题已经解决事情已经过去了的时候,监控突然又告警了,发现cpu利用率又出现了异常,这次发现cpu利用率异常随gc波动;

cpu利用率异常:


gc异常:


gc时长异常:


有了第一次的经验,我们直接上服务器,分析进程信息后发现之前通过参数"spark.shuffle.service.index.cache.size"限制的内存远远不止参数限制的1G;

通过源码发现,spark缓存索引文件使用的是google guava框架,源码如下:

spark缓存索引文件

在guava缓存框架中,并没有配置缓存大小的配置,而是通过weigher来控制缓存能放入多少被缓存数据;

spark将"spark.shuffle.service.index.cache.size" key的值即上面配置的1G作为weigher参数创建guava缓存,而放入缓存数据计算weigher值时,只通过indexInfo获取到了文件本身的大小,而忽略了文件路径的大小,导致使用内存超出限制,频繁GC;

实际上在集群上的shuffle文件路径都是非常深,文件路径的长度往往都很长,所以文件路径大小也是不能忽略的;

找到问题后,要修复问题就很简单了,修复后的代码如下:

spark缓存索引文件bug 

第二天dump内存信息发现,缓存大小刚好为配置的大小:

验证

2.2、锁引起的NodeManager cpu异常

在解决完shuffle缓存引起的NodeManager cpu异常后我们又一次碰到了shuffle引起NodeManager的cpu异常,高cpu利用率线程jstack信息如下:


通过jstack信息发现,高cpu的线程都是在执行OneForOneStreamManager的chunksBeingTransferred方法;

chunksBeingTransferred方法:


streams是一个ConcurrentHashMap,保存了每一个读取shuffle数据流的状态,当channel读取数据时,由netty调用ChannelInboundHandlerAdapter的子类TransportChannelHandler的channelRead方法完成流注册到OneForOneStreamManager的过程,时序图如下:


OneForOneStreamManager主要是管理shuffle读取流的状态,而方法chunksBeingTransferred的作用是返回“正在传输的块数”;而这个方法的调用是在TransportRequestHandler类处理shuffle数据传输时,spark会去检查shuffle service上“允许同时传输的最大块数”与“正在传输的块数”,如果“正在传输的块数”大于“允许同时传输的最大块数”,新进来的连接就会被关闭;而这个控制“允许同时传输的最大块数”的参数是"spark.shuffle.maxChunksBeingTransferred"默认值为:Long.MAX_VALUE;

处理FetchRequest:


默认允许同时传输的最大块的数量:


我们认为这个默认的“最大允许传输块数”值是我们永远无法达到的数值,而每次处理Fetch请求的时候就要去遍历一次ConcurrentHashMap的所有值检查下“正在传输的块数”,而在etl峰值期间,块读取的流的注册非常频繁,ConcurrentHashMap是读写互斥的,这无故增加了很多不必要的锁的竞争。

抱着这样的疑问,我们上社区查找解决方案,幸运的是,发现网易与我们遇到了相同的问题:https://issues.apache.org/jira/browse/SPARK-31069,同时网易也向社区提交了patch,但是没有被官方合并:https://github.com/apache/spark/pull/30139。

其patch原理很简单,就是在Fetch检查正在传输块的数量前,判断"spark.shuffle.maxChunksBeingTransferred"是不是默认值,如果是默认值则跳过检查,减少不必要的锁的竞争;

patch源码:


而事实也证明,打上patch,发布后,发现cpu利用率已经恢复正常。

2.3、连接未正常释放引发的NodeManager内存泄露

虽然cpu异常的问题已经解决了,但是日常监控发现NodeManager内存又出现了异常,随即分析dump的内存发现,dump的内存主要是OneForOneStreamManager:

dump内存信息

进一步分析发现,OneForOneStreamManager中还留有已经关闭的连接的StreaStates存在;

StreaStates

分析源码发现OneForOneStreamManager的关闭连接的方法中,遍历streams拿到StreamState,循环StreamState关闭ManagedBuffer时,如果出现异常,此时则会跳出关闭连接方法,导致后续的stream无法被关闭,造成内存泄露;

内存泄露

通过社区搜索发现,这是一个spark的bug,在spark 2.4.5版本中已经修复,并且社区提供了patch。

合并patch后的代码如下:

patch

三、未来展望

spark shuffle service虽然解决了executor计算任务和shuffle数据管理任务解耦合,但是引入了新的问题,即shuffle的任务放到了NodeManager的身上,夜间etl任务繁忙shuffle数据量巨大时,可能会引起NodeManager抖动,严重甚至可能会造成NodeManager宕机;

鉴于以上问题,可以将shuffle service部署为一个独立的服务,减少对yarn集群的依赖;

目前业界,已有优秀的开源方案了:https://github.com/uber/RemoteShuffleService,京东也自研了一套Remote Shuffle Service

京东Remote Shuffle Service设计:


中通后续可能会尝试将shuffle service脱离yarn集群单独部署。


RECOMMEND
往期干货ClickHouse高可用架构与实践DB数据同步到数仓架构与实践数据开发,如何平衡效率与质量大数据架构师:数据平台开发工具链万字详解Spark 性能调优(建议收藏)Apache Doris在数仓中的应用实践

欢迎各位技术大佬向本公众号积极投稿,提升经验分享、信息互通的技术交流氛围,共同解决技术难题、共同进步(投稿咨询请联系加v:iom1128)

用户画像-标签体系(dwt层)

更多精彩,关注公众号回复:"spark",即可获得spark案例实战资料。


希望这篇文章可以帮到你~

欢迎大家点个在看,分享至朋友圈

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存