深度 | EB级规模大数据平台核心技术揭秘(下)
凌云时刻
编者按:作为业界少有的EB级别数据分布式平台,MaxCompute系统每天支撑上千万个分布式作业的运行。DAG作为MaxCompute执行引擎的核心技术之一,通过调整计算节点和数据连接边的不同物理特性,对其进行更通用化的扩展后,还可以探索一种全新的混合运行模式,也就是Bubble Execution。
Bubble 的切分与执行
采用Bubble Execution的作业(以下简称Bubble作业)和传统的离线作业一样,会通过一个DAG master(aka. Application Master)来管理整个DAG的生命周期。AM负责对DAG进行合理的bubble切分,以及对应的资源申请和调度运行。
整体而言,Bubble内部的计算节点,将按照计算加速度原则,包括同时使用预拉起的计算节点以及数据传输通过内存/网络直传进行pipeline加速。而不切在bubble内部的计算节点则通过经典离线模式执行,不在bubble内部的连接边(包括横跨bubble boundary的边)上的数据,均通过落盘方式进行传输。
Bubble切分方法,决定了作业的执行时间和资源利用率。需要根据计算节点的并发规模,节点内部算子属性等信息综合考虑。而在切分出bubble之后,Bubble的执行则涉及到节点的执行,与数据pipeline/barrier的shuffle方式怎么做到有机的结合,这里分开做一下描述。
Bubble 切分原理
Bubble Execution的核心思想在于将一个离线作业拆分成多个Bubble来执行。为了切分出有利于作业整体高效运行的bubble,有几个因素需要综合考虑:
计算节点内部算子特性:对于同时拉起bubble所有计算节点的调度模式而言,数据在bubble内部的上下游节点之间能否有效的进行pipeline处理,很大程度上决定了在bubble内部,下游节点是否会因处于空转状态带来资源浪费。所以在切分bubble的逻辑中,当节点包含barrier特性的算子而可能阻塞数据的pipeline时,将考虑不将该节点与其下游切入同一个bubble。
单个Bubble内部计算节点数目的多少:如同之前讨论的,一体化的资源申请/运行,当包含的计算节点过多时,可能无法申请到资源,或者即使能申请到其failure代价也可能无法控制。限定Bubble的大小,可以避免过大的一体化运行带来的负面作用。
聚合计算节点,切割Bubble的迭代方向:考虑到bubble大小的限制,从上而下切分bubble与从下而上切分bubble两种方式,可能导致切分的结果的不同。对于线上大部分作业而言,处理的数据往往呈倒三角型,对应的DAG也大多数是倒三角形态,所以默认采用自底向上的算法来切割bubble,也就是从距离root vertex最远的节点开始迭代。
在上述的几个因素中,算子的barrier属性由上层计算引擎(e.g., MaxCompute的optimizer)给出。一般而言,依赖global sort操作的算子(比如MergeJoin, SorteAggregate等),会被认为会造成数据阻塞(barrier),而基于hash特性操作的算子则对于pipeline更加友好。对于单个Bubble内部允许的计算节点数目,根据我们对线上准实时作业特点的分析和Bubble作业的实际灰度实验,选定的默认上限在500。
这是一个在大多数场景下比较合理的值,既能保证比较快速的拿到全量资源,同时由于处理数据量和DoP基本成正相关关系,这个规模的bubble一般也不会出现内存超限的问题。当然这些参数和配置,均允许作业级别通过配置进行微调,同时Bubble执行框架也会后继提供作业运行期间动态实时调整的能力。
在DAG的体系中,边连接的物理属性之一,就是边连接的上下游节点,是否有运行上的前后依赖关系。对于传统的离线模式,上下游先后运行,对应的是sequential的属性,我们称之为sequential edge。而对于bubble内部的上下游节点,是同时调度同时运行的,我们称连接这样的上下游节点的边,为concurrent edge。
可以注意到,这种concurrent/sequential的物理属性,在bubble应用场景上,实际与数据的传送方式(网络/内存直传 vs 数据落盘)的物理属性是重合的(Note: 但这两种依然是分开的物理属性,比如在必要的时候concurrent edge上也可以通过数据落盘方式传送数据)。
基于这样的分层抽象,Bubble切分算法,本质上就是尝试聚合DAG图的节点,将不满足bubble准入条件的concurrent edge还原成sequential edge的过程。最终,由concurrent edge联通的子图即为bubble。
在这里我们通过一个实际的例子来展示Bubble切分算法的工作原理。假设存在下图所示的DAG图,图中的圆圈表示计算顶点(vertex),每个圆圈中的数字表示该vertex对应的实际计算节点并发度。
在这个初始DAG基础上,按照上面介绍过的整体原则,以及本章节最后描述的一些实现细节,上图描述的初始状态,可以经过多轮算法迭代,最终产生如下的Bubble切分结果。在这个结果中产生了两个Bubbles: Bubble#0 [V2, V4, V7, V8],Bubble#1 [V6, V10], 而其他的节点则被判断将使用离线模式运行。
若当前vertex不能加入bubble,将其输入edge均还原为sequential edge(比如DAG图中的V9);
若当前vertex能够加入bubble,执行广度优先遍历算法聚合生成bubble,先检索输入edge连接的vertex,再检索输出edge连接的,对于不能联通的vertex,将edge还原为sequential edge(比如DAG图中遍历V2的输出vertex V5时会因为total task count超过500触发edge还原)。
而对任意一个vertex,只有当满足以下条件才能被添加到bubble中:
vertex和当前bubble之间不存在sequential edge连接;
vertex和当前bubble不存在循环依赖,即:
Case#1:该vertex的所有下游vertex中不存在某个vertex是当前bubble的上游;
Case#2:该vertex的所有上游vertex中不存在某个vertex是当前bubble的下游;
Case#3:该vertex的所有下游bubble中不存在某个vertex是当前bubble的上游;
Case#4:该vertex的所有上游bubble中不存在某个vertex是当前bubble的下游;
注:这里的上游/下游不仅仅代表当前vertex的直接后继/前驱,也包含间接后继/前驱
Bubble的调度与执行
1. Bubble调度
为了实现计算的加速,Bubble内部的计算节点的来源默认均来自常驻的预热资源池,这一点与准实时执行框架相同。与此同时我们提供了灵活的可插拔性,在必要的情况下,允许Bubble计算节点从Resource Manager当场申请(可通过配置切换)。
从调度时机上来看,一个Bubble内部的节点调度策略与其对应的输入边特性相关,可以分成下面几种情况:
不存在任何input edge的bubble root vertext(比如 Fig.9中的V2):作业一运行就被调度拉起。
只有sequential edge输入bubble root vertex(比如 Fig.9中的V6):等待上游节点完成度达到配置的min fraction比例(默认为100%,即所有上游节点完成)才被调度。
Bubble内部的vertex(即所有输入边都是concurrent edge,比如 Fig.9中的V4, V8, V10),因为其完全是通过concurrent edge进行连接的,会自然的被与上游同时触发调度。
Bubble边界上存在mixed-inputs的bubble root vertex(比如 Fig.9中的V7)。这种情况需要一些特殊处理,虽然V7与V4是通过concurrent edge链接,但是由于V7的调度同时被V3通过sequential edge控制,所以事实上需要等待V3完成min-fraction后才能调度V7。对于这种场景,可以将V3的min-fraction配置为较小(甚至0)来提前触发;此外Bubble内部我们也提供了progressive调度的能力,对这种场景也会有帮助。
比如图7中的Bubble#1,只有一条SequentialEdge外部依赖边,当V2完成后,就会触发V6 + V10(通过concurrent edge)的整体调度,从而将整个Bubble#1运行起来。
在Bubble被触发调度后,会直接向SMODE Admin申请资源,默认使用的是一体化Gang-Scheduling(GS)的资源申请模式,在这种模式下,整个Bubble会构建一个request,发送给Admin。当Admin有足够的资源来满足这个申请时,会将,再包含预拉起worker信息的调度结果发送给bubble作业的AM。
Fig.11 Bubble与Admin之间的资源交互
在准实时执行框架升级后,SMODE服务中的资源管理(Admin)和多DAG作业管理逻辑(MultiJobManager)已经解耦,因此bubble模式中的资源申请逻辑,只需要和Admin进行交互,而不会对于正常准实时作业的DAG执行管理逻辑带来任何影响。另外,为了支持线上灰度热升级能力,Admin管理的资源池中的每个常驻计算节点均通过Agent+多Labor模式运行,在调度具体资源时,还会根据AM版本,进行worker版本的匹配,并调度满足条件的labor给Bubble作业。
2. Bubble数据Shuffle
对于穿越Bubble bourndary上的sequential edge,其上传输的数据和普通离线作业相同,都是通过落盘的方式来进行数据传输。这里我们主要讨论在Bubble内部的数据传输方式。根据之前描述的作业bubble切分原则,bubble内部的通常具备充分的数据pipeline特性,且数据量不大。因此对于bubble内部concurrent edge上的数据,均采用执行速度最快的网络/内存直传方式来进行shuffle。
这其中网络shuffle的方式和经典的准实时作业相同,通过上游节点和下游节点之间建立TCP链接,进行网络直连发送数据。这种push-based的网络传送数据方式,要求上下游必须同时拉起,根据链式的依赖传递,这种网络push模式强依赖于Gang-Scheduling,此外在容错,长尾规避等问题上也限制了bubble的灵活性。
为了更好的解决以上问题,在Bubble模式上,探索了内存shuffle模式。在这一模式下,上游节点将数据直接写到集群ShuffleAgent(SA)的内存中,而下游节点则从SA中读取数据。内存shuffle模式的容错,扩展,包括在内存不够的时候将部分数据异步落盘保证更高的可用性等能力,由ShuffleService独立提供。
这种模式可以同时支持Gang-Scheduling/Progressive两种调度模式,也使其具备了较强的可扩展性,比如可以通过SA Locality调度实现更多的Local数据读取,通过基于血缘的instance level retry实现粒度更精细的容错机制等等。
Fault-Tolerance
作为一种全新的混合执行模式,Bubble执行探索了在离线作业和一体化调度的准实时作业间的各种细粒度平衡。在线上复杂的集群中,运行过程中各种各样的失败在所难免。而bubble这种全新模式下,为了保证失败的影响最小,并在可靠性和作业性能之间取得最佳的平衡,其对于失败处理的策略也更加的多样化。
针对不同的异常问题,我们设计了各种针对性容错策略,通过各种从细到粗的力度,处理执行过程中可能涉及的各种异常场景处理,比如:向admin申请资源失败、bubble中的task执行失败(bubble-rerun)、bubble多次执行失败的回退(bubble-renew),执行过程中AM发生failover等等。
1. Bubble Rerun
目前Bubble在内部计算节点失败时,默认采用的retry策略是rerun bubble。即当bubble内的某个节点的本次执行(attempt)失败,会立即rerun整个bubble,取消正在执行的同一版本的attempt。在归还资源的同时,触发bubble重新执行。通过这种方式,保证bubble内所有计算节点对应的(retry) attempt版本一致。
触发bubble rerun的场景有很多,比较常见的有以下几种:
Instance Failed:计算节点执行失败,通常由上层引擎的runtime错误触发(比如抛出retryable-exception)。
Resource Revoked:在线上生产环境,有很多种场景会导致资源节点重启。比如所在的机器整机oom、机器被加黑等。在worker被杀之后,重启之后的worker会依照最初的启动参数重新连回admin。此时,admin会将这个worker重启的消息封装成Resource Revoked发送给对应的AM,触发bubble rerun。
Admin Failover: 由于Bubble作业所使用的计算资源来自于SMODE的admin资源池,当admin由于某些原因Failover,或者SMODE整体服务被重启时,分配给AM的计算节点会被停止。Admin在Failover之后不感知当前各个节点被分配的AM信息,无法将这些重启的消息发送给AM。目前的处理方法是,每个AM订阅了admin对应的nuwa,在admin重启之后会更新这个文件. AM感知到信息更新后,会触发对应的taskAttempt Failed,从而rerun bubble。
Input Read Error:在计算节点执行时,读不到上游数据是一个很常见的错误,对于bubble来说,这个错误实际上有三种不同的类型:
Bubble内的InputReadError:由于shuffle数据源也在bubble内,在rerun bubble时,对应上游task也会重跑。不需要再做针对性的处理。
Bubble边界处的InputReadError: shuffle数据源是上游离线vertex(或也可能是另一个bubble)中的task产生,InputReadError会触发上游的task重跑,当前bubble rerun之后会被delay住,直到上游血缘(lineage)的新版本数据全部ready之后再触发调度。
Bubble下游的InputReadError: 如果bubble下游的task出现了InputReadError,这个事件会触发bubble内的某个task重跑,此时由于该task依赖的内存shuffle数据已经被释放,会触发整个bubble rerun。
2. Bubble Renew
在Admin资源紧张时, Bubble从Admin的资源申请可能等因为等待而超时。在一些异常情况下,比如bubble申请资源时刚好onlinejob服务处于重启的间隔,也会出现申请资源失败的情况。在这种情况下,bubble内所有vertex都将回退成纯离线vertex状态执行。
此外对于rerun次数超过上限的bubble,也会触发bubble renew。在bubble renew发生后,其内部所有边都还原成sequential edge,并在所有vertex重新初始化之后,通过回放内部所有调度状态机触发事件,重新以纯离线的方式触发这些vertex的内部状态转换。确保当前bubble内的所有vertex在回退后,均会以经典离线的模式执行,从而有效的保障了作业能够正常terminated。
Fig. 13 Bubble Renew
3. Bubble AM Failover
对于正常的离线作业,在DAG框架中,每个计算节点相关的内部调度事件都会被持久化存储,方便做计算节点级别的增量failover。但是对于bubble作业来说,如果在bubble执行过程发生了AM failover重启,通过存储事件的replay来恢复出的bubble,有可能恢复到running的中间状态。然而由于内部shuffle数据可能存储在内存而丢失,恢复成中间running状态的bubble内未完成的计算节点,会因读取不到上游shuffle数据而立刻失败。
这本质上是因为在Gang-Scheduled Bubble的场景上,bubble整体是作为failover的最小粒度存在的,所以一旦发生AM的failover,恢复粒度也应该在bubble这个层面上。所以对于bubble相关的所有调度事件,在运行中都会被当作一个整体,同时当bubble开始和结束的时候分别刷出bubbleStartedEvent和bubbleFInishedEvent。
一个bubble所有相关的events在failover后恢复时会被作为一个整体,只有结尾的bubbleFInishedEvent才表示这个bubble可以被认为完全结束,否则将重跑整个bubble。
比如在下图这个例子中,DAG中包含两个Bubble(Bubble#0: {V1, V2}, Bubble#1: {V3, V4}),在发生AM重启时,Bubble#0已经TERMINATED,并且写出BubbleFinishedEvent。而Bubble#1中的V3也已经Terminated,但是V4处于Running状态,整个Bubble #1并没有到达终态。AM recover之后,V1,V2会恢复为Terminated状态,而Bubble#1会重头开始执行。
上线效果
当前Bubble模式已经在公共云全量上线,SQL作业中34%执行Bubble,日均执行包含176K个Bubble。
我们针对signature相同的query在bubble execution关闭和打开时进行对比,我们发现在整体的资源消耗基本不变的基础上,作业的执行性能提升了34%,每秒处理的数据量提升了54%。
Fig 15. 执行性能/资源消耗对比
除了整体的对比之外,我们针对VIP用户也进行了针对性的分析,用户Project在打开了Bubble开关之后(下图中红色标记的点为打开Bubble的时间点),作业的平均执行性能有非常明显的提升。
Fig 16. VIP用户开启Bubble后平均执行时间对比