有赞大数据离线集群迁移实战
The following article is from 有赞coder Author 有赞技术
一、背景
有赞是一家商家服务公司,向商家提供强大的基于社交网络的,全渠道经营的SaaS系统和一体化新零售解决方案。随着近年来社交电商的火爆,有赞大数据集群一直处于快速增长的状态。在2019年下半年,原有云厂商的机房已经不能满足未来几年的持续扩容的需要,同时考虑到提升机器扩容的效率(减少等待机器到位的时间)以及支持弹性伸缩容的能力,我们决定将大数据离线Hadoop集群整体迁移到其他云厂商。
在迁移前我们的离线集群规模已经达到200+物理机器,每天40000+调度任务,本次迁移的目标如下:
将Hadoop上的数据从原有机房在有限时间内全量迁移到新的机房
如果全量迁移数据期间有新增或者更新的数据,需要识别出来并增量迁移
对迁移前后的数据,要能对比验证一致性(不能出现数据缺失、脏数据等情况)
迁移期间(可能持续几个月),保证上层运行任务的成功和结果数据的正确
有赞大数据离线平台技术架构
上文说了Hadoop集群迁移的背景和目的,我们回过头来再看下目前有赞大数据离线平台整体的技术架构,如图1.1所示,从低往上看依次包括:
图1.1 有赞大数据离线平台的技术架构
Hadoop生态相关基础设施,包括HDFS、YARN、Spark、Hive、Presto、HBase、Kafka、Kylin等
基础组件,包括Airflow(调度)、DataX(离线数据同步)、基于binlog的增量数据同步、SQL解析/执行引擎选择服务、监控&诊断等
平台层面,包括:数据开发平台(下文简称DP)、资产管理平台、数据可视化平台、算法训练平台等
本次迁移会涉及到从底层基础设施到上层平台各个层面的工作。
二、方案调研
在开始迁移之前,我们调研了业界在迁移Hadoop集群时,常用的几种方案:
2.1 单集群
两个机房公用一个Hadoop集群(同一个Active NameNode,DataNode节点进行双机房部署),具体来讲有两种实现方式:
(记为方案A) 新机房DataNode节点逐步扩容,老机房DataNode节点逐步缩容,缩容之后通过HDFS原生工具Balancer实现HDFS Block 副本的动态均衡,最后将Active NameNode切换到新机房部署,完成迁移。这种方式最为简单,但是存在跨机房拉取Shuffle数据、HDFS文件读取等导致的专线带宽耗尽的风险,如图2.1所示
(记为方案B) 方案A由于两个机房之间有大量的网络传输,实际跨机房专线带宽较少情况下一般不会采纳,另外一种带宽更加友好的方案是:
通过Hadoop的Rack Awareness来实现HDFS Block N副本双机房按比例分布(通过调整HDFS数据块副本放置策略,比如常用3副本,两个机房比例为1:2)
通过工具(需要自研)来保证HDFS Block副本按比例在两个机房间的分布(思路是:通过NameNode拉取FSImage,读取每个HDFS Block副本的机房分布情况,然后在预定限速下,实现副本的均衡)
图2.1 单集群迁移方案
优点:
对用户透明,基本无需业务方投入
数据一致性好
相比多集群,机器成本比较低
缺点:
需要比较大的跨机房专线带宽,保证每天增量数据的同步和Shuffle数据拉取的需要
需要改造基础组件(Hadoop/Spark)来支持本机房优先读写、在限速下实现跨机房副本按比例分布等
最后在完成迁移之前,需要集中进行Namenode、ResourceManager等切换,有变更风险
2.2 多集群
在新机房搭建一套新的Hadoop集群,第一次将全量HDFS数据通过Distcp拷贝到新集群,之后保证增量的数据拷贝直至两边的数据完全一致,完成切换并把老的集群下线,如图2.2所示。
这种场景也有两种不同的实施方式:
(记为方案C) 两边HDFS数据完全一致后,一键全部切换(比如通过在DP上配置改成指向新集群),优点是用户基本无感知,缺点也比较明显,一键迁移的风险极大(怎么保证两边完全一致、怎么快速识别&快速回滚)
(记为方案D)按照DP上的任务血缘关系,分层(比如按照数据仓库分层依次迁移ODS/DW/DM层数据)、分不同业务线迁移,优点是风险较低(分治)且可控,缺点是用户感知较为明显
图2.2 多集群迁移方案
优点:
跨机房专线带宽要求不高(第一次全量同步期间不跑任务,后续增量数据同步,两边双跑任务不存在跨机房Shuffle问题)
风险可控,可以分阶段(ODS/DW/DM)依次迁移,每个阶段可以验证数据一致性后再开始下一阶段的迁移
不需要改造基础组件(Hadoop/Spark)
缺点:
对用户不透明,需要业务方配合
在平台层需要提供工具,来实现低成本迁移、数据一致性校验等
2.3 方案评估
从用户感知透明度来考虑,我们肯定会优先考虑单集群方案,因为单集群在迁移过程中,能做到基本对用户无感知的状态,但是考虑到如下几个方面的因素,我们最终还是选择了多集群方案:
(主因)跨机房的专线带宽大小不足。上述单集群的方案A在Shuffle 过程中需要大量的带宽使用;方案B虽然带宽更加可控些,但副本跨机房复制还是需要不少带宽,同时前期的基础设施改造成本较大
(次因)平台上的任务类型众多,之前也没系统性梳理过,透明的一键迁移可能会产生稳定性问题,同时较难做回滚操作
因此我们通过评估,最终采用了方案D。
三、实施过程
在方案确定后,我们便开始了有条不紊的迁移工作,整体的流程如图3.1所示
图3.1 离线Hadoop多集群跨机房迁移流程图
上述迁移流程中,核心要解决几个问题:
第一次全量Hadoop数据复制到新集群,如何保证过程的可控(有限时间内完成、限速、数据一致、识别更新数据)?(工具保证)
离线任务的迁移,如何做到较低的迁移成本,且保障迁移期间任务代码、数据完全一致?(平台保证)
完全迁移的条件怎么确定?如何降低整体的风险?(重要考虑点)
3.1 Hadoop 全量数据复制
首先我们在新机房搭建了一套Hadoop集群,在进行了性能压测和容量评估后,使用DistCp工具在老集群资源相对空闲的时间段做了HDFS数据的全量复制,此次复制HDFS数据时新集群只开启了单副本,整个全量同步持续了两周。基于DistCp本身的特性(带宽限制:-bandwidth/基于修改时间和大小的比较和更新:-update)较好的满足全量数据复制以及后续的增量更新的需求。
3.2 离线任务的迁移
目前有赞所有的大数据离线任务都是通过DP平台来开发和调度的,由于底层采用了两套Hadoop集群的方案,所以迁移的核心工作变成了怎么把DP平台上任务迁移到新集群。
3.2.1 DP 平台介绍
有赞的DP平台是提供用户大数据离线开发所需的环境、工具以及数据的一站式平台,目前支持的任务主要包括:
离线导入任务(MySQL全量/增量导入到Hive)
基于binlog的增量导入(数据流:binlog -> Canal -> NSQ -> Flume -> HDFS -> Hive)
导出任务(Hive -> MySQL、Hive -> ElasticSearch、Hive -> HBase 等)
Hive SQL、Spark SQL任务
Spark Jar、MapReduce任务
其他:比如脚本任务
本次由于采用多集群跨机房迁移方案(两个Hadoop集群),因此需要在新旧两个机房搭建两套DP平台,同时由于迁移周期比较长(几个月)且用户迁移的时间节奏不一样,因此会出现部分任务先迁完,部分任务还在双跑,还有一些任务没开始迁移的情况。
3.2.2 DP 任务状态一致性保证
在新旧两套DP平台都允许用户创建和更新任务的前提下,如何保证两边任务状态一致呢(任务状态不限于MySQL的数据、Gitlab的调度文件等,因此不能简单使用MySQL自带的主从复制功能)?我们采取的方案是通过事件机制来实现任务操作时间的重放,展开来讲:
用户在老DP产生的操作(包括新建/更新任务配置、任务测试/发布/暂停等),通过事件总线产生事件消息发送到Kafka,新系统通过订阅Kafka消息来实现事件的回放,如图 3.2 所示。
图3.2 通过事件机制,来保证两个平台之间的任务状态一致
3.2.3 DP 任务迁移状态机设计
D底层的改造对用户来说是透明的,最终暴露给用户的仅是一个迁移界面,每个工作流的迁移动作由用户来触发。工作流的迁移分为两个阶段:双跑和全部迁移,状态流转如图 3.3 所示
图 3.3 工作流迁移状态流转
双跑
工作流的初始状态为未迁移,然后用户点击迁移按钮,会弹出迁移界面,如图3.4所示,用户可以指定工作流的任意子任务的运行方式,主要选项如下:
两边都跑:任务在新老环境都进行调度 老环境跑:任务只在老环境进行调度 新环境跑:任务只在新环境进行调度
导入任务 (MySQL -> Hive):通常是双跑,也就是两个集群在调度期间都会从业务方的MySQL拉取数据(由于拉取的是Slave库,且全量拉取的一般是数据量不太大的表) Hive、SparkSQL任务:通常也是双跑,双跑时新老集群都会进行计算。 MapReduce、Spark Jar任务:需要业务方自行判断:任务的输出是否是幂等的、代码中是否配置了指向老集群的地址信息等 导出任务:一般而言无法双跑,如果两个环境的任务同时向同一个 MySQL表(或者同一个ElasticSearch索引)写入/更新数据,容易造成数据不一致,建议在验证了上游Hive表数据在两个集群一致性后进行切换(只在新环境跑)。 同时处于用户容易误操作导致问题的考虑,DP平台在用户设置任务运行方式后,进行必要的规则校验: 如果任务状态是双跑,则任务依赖的上游必须处于双跑的状态,否则进行报错。 如果任务是第一次双跑,会使用Distcp将其产出的Hive表同步到新集群,基于Distcp本身的特性,实际上只同步了在第一次同步之后的增量/修改数据。 如果工作流要全部迁移(老环境不跑了),则工作流的下游必须已经全部迁移完。
迁移过程中工作流操作的限制规则
由于某个工作流迁移的持续时间可能会比较长(比如DW层任务需要等到所有DM层任务全部迁移完),因此我们既要保证在迁移期间工作流可以继续开发,同时也要做好预防误操作的限制,具体规则如下:迁移中的工作流在老环境可以进行修改和发布的,新环境则禁止 工作流在老环境修改发布后,会将修改的元数据同步到新环境,同时对新环境中的工作流进行发布。 工作流全部迁移,需要所有的下游已经完成全部迁移。
3.3 有序推动业务方迁移
工具都已经开发好了,接下来就是推动DP上的业务方进行迁移,DP上任务数量大、种类多、依赖复杂,推动业务方需要一定的策略和顺序。有赞的数据仓库设计是有一定规范的,所以我们可以按照任务依赖的上下游关系进行推动:导入任务(MySQL全量/增量导入Hive) 一般属于数据仓库的ODS层,可以进行全量双跑。 数仓中间层任务主要是Hive / Spark SQL任务,也可以全量双跑,在验证了新老集群的Hive表一致性后,开始推动数仓业务方进行迁移。 数仓业务方的任务一般是Hive / Spark SQL任务和导出任务,先将自己的Hive任务双跑,验证数据一致性没有问题后,用户可以选择对工作流进行全部迁移,此操作将整个工作流在新环境开始调度,老环境暂停调度。 数仓业务方的工作流全部迁移完成后,将导入任务和数仓中间层任务统一在老环境暂停调度。 其他任务主要是MapReduce、Spark Jar、脚本任务,需要责任人自行评估。
3.4 过程保障
工具已经开发好,迁移计划也已经确定,是不是可以让业务进行迁移了呢?慢着,我们还少了一个很重要的环节,如何保证迁移的稳定呢?在迁移期间一旦出现bug那必将是一个很严重的故障。因此如何保证迁移的稳定性也是需要着重考虑的,经过仔细思考我们发现问题可以分为三类,迁移工具的稳定,数据一致性和快速回滚。迁移工具稳定
新DP的元数据同步不及时或出现Bug,导致新老环境元数据不一致,最终跑出来的数据必定天差地别。 应对措施:通过离线任务比对两套DP中的元数据,如果出现不一致,及时报警。 工作流在老DP修改发布后,新DP工作流没发布成功,导致两边调度的airflow脚本不一致。 应对措施:通过离线任务来比对airflow的脚本,如果出现不一致,及时报警。 全部迁移后老环境DP没有暂定调度,导致导出任务生成脏数据。 应对措施:定时检测全部迁移的工作流是否暂停调度。 用户设置的运行状态和实际airflow脚本的运行状态不一致,比如用户期望新环境空跑,但由于程序bug导致新环境没有空跑。 应对措施:通过离线任务来比对airflow的脚本运行状态和数据库设置的状态。
Hive表数据一致性
Hive表数据一致性指的是,双跑任务产出的Hive表数据,如何检查数据一致性以及识别出来不一致的数据的内容,具体方案如下(如图3.6所示):双跑的任务在每次调度运行完成后,我们会上报<任务T、产出的表A>信息,用于数据质量校验(DQC),等两个集群产出的表A都准备好了,就触发数据一致性对比 根据<表名、表唯一键K>参数提交一个MapReduce Job,由于我们的Hive表格式都是以Orc格式存储,提交的MapReduce Job在MapTask中会读取表的任意一个Orc文件并得到Orc Struct信息,根据用户指定的表唯一键,来作为Shuffle Key,这样新老表的同一条记录就会在同一个ReduceTask中处理,计算得到数据是否相同,如果不同则打印出差异的数据 表数据比对不一致的结果会发送给表的负责人,及时发现和定位问题
四、迁移过程中的问题总结
使用DistCp同步HDFS数据时漏配参数(-p),导致HDFS文件owner信息不一致。 使用DistCp同步HDFS数据时覆盖了HBase的clusterId,导致 Hbase 两个集群之间同步数据时发生问题。 在迁移开始后,新集群的Hive表通过export import表结构来创建,再使用DistCp同步表的数据。导致Hive meta信息丢失了totalSize属性,造成了SparkSQL由于读取不到文件大小信息无法做broadcast join,解决方案是在DistCp同步表数据之后,执行Hive命令ANALYZE TABLE TABLE_NAME COMPUTE STATISTICS来生成表相关属性。 迁移期间由于在夜间启动了大量的MapReduce任务,进行Hive表数据比对,占用太多离线集群的计算资源,导致任务出现了延迟,最后将数据比对任务放在资源相对空闲的时间段。 工作流之间存在循环依赖,导致双跑-全部迁移的流程走不下去,所以数仓建设的规范很重要,解决方案就是要么让用户对任务重新组织,来重构工作流的依赖关系,要么两个工作流双跑后,一起全部迁移。 迁移期间在部分下游已经全部迁移的情况下,上游出现了问题需要重刷所有下游,由于只操作了老DP,导致新环境没有重刷,使迁移到新环境的下游任务受到了影响。 MapReduce和Spark Jar类型的任务无法通过代码来检测生成的上下游依赖关系,导致这类任务只能由用户自己来判断,存在一定的风险,后续会要求用户对这类任务也配上依赖的Hive表和产出的Hive表。
五、总结与展望
本次的大数据离线集群跨机房迁移工作,时间跨度近6个月(包括4个月的准备工作和2个月的迁移),涉及PB+的数据量和4万日均调度任务。虽然整个过程比较复杂(体现在涉及的组件众多、任务种类和实现复杂、时间跨度长和参与人员众多),但通过前期的充分调研和探讨、中期的良好迁移工具设计、后期的可控推进和问题修复,我们做到了整体比较平稳的推进和落地。同时针对迁移过程中遇到的问题,在后续的类似工作中我们可以做的更好:做好平台的治理,比如代码不能对当前环境配置有耦合 完善迁移工具,尽量让上层用户无感知 单Hadoop集群方案的能力储备,主要解决跨机房带宽的受控使用
内存泄露、内存溢出和堆外内存,JVM优化参数配置参数SparkStreaming和Kafka集成解析HBase中Memstore存在的意义以及列族的设计深入解析KafkaHive优化篇深入探讨RedisHadoop调优HDFS重要技术点解答
Spark存储Parquet数据到Hive,对map、array、struct字段类型的处理
关注大数据学习与分享,获取更多技术干货