全链路数据治理在网易严选的实践
The following article is from InfoQ Author 左琴
【大鱼提醒:公众号推送规则变了,如果您想及时收到推送,麻烦点个在看,或者把本号置顶】
正文开始
数据治理(Data Governance)的边界非常的宽泛,通常会把数据的定义、存储方式、使用规范、数据安全策略、数据质量等等都包括在其中。治理是一个很官僚化的术语,流程、评审、审计、规范这些令人昏昏欲睡的东西,构成了数据治理的常规手段和工具。因此,数据技术团队往往对“数据治理”有些抗拒,而数据治理团队往往觉得工作低效不好玩,吃力不讨好。因此,对于数据治理团队而言,我们除了要治理数据体系,提升其运转效率;还要把工作变得高效而精彩。
随着业务的快速发展,数据开发任务也会井喷式的增长,再加上大数据的服务链路是非常复杂多变的,涉及到收集、存储、访问、计算、数据加工、数据挖掘、对外服务等等场景。随着数据规模、数据结构、使用场景随时的变化,特别在互联网公司,开发效率优先的工作模式下,数据体系的快速腐化是不可避免的。数据体系的快速腐化最终又会影响开发效率,甚至影响整个公司的决策效率。
图一 网易严选数据技术体系 1.0
图一是网易严选数据技术体系,我们可以看到,数据从业务系统(业务日志 / 数据库)产生,经过数据的同步和集成,再经过数据开发的加工和整理,最后再被数据产品 / 算法 /BI 等服务和应用使用,整个链路非常长并且会依赖许多消息服务 / 计算 / 存储 / 任务调度 / 资源调度等服务和组件。
所以在网易严选,我们任务数据治理是贯穿整个数据生命周期的,而数据在整个生命周期所依赖的数据服务也是在数据治理的范畴之内,数据治理的目的是为了保障数据稳定高质地进行生产,并推进整体的资源优化,主要包括两个部分:
1、数据的治理和数据任务的治理。2、数据服务的治理。我们治理的对象不仅仅是数据本身,也包括数据生产和使用全链路的服务和组件。数据治理的核心目标是什么?这是我们首先需要回答的问题。
严选数据团队对数据治理这个议题进行过认真的探讨,我们认为数据治理是一种管理手段,而管理的重要目的是提升效率。
严选数据治理的第一个核心目标是提效,提升数据体系整体服务效率。 数据治理的第二个核心目标是可靠。提供可量化的能力指标对外提供数据支持。
明确了提效和可靠两个治理的核心目标后,有三个问题放在我们面前,需要立刻解开。
我们数据治理的边界在哪? 我们需要投入多少人力到数据治理中,是否需要组建团队持续的进行治理? 最酷 & 最高效的解决问题的方式是什么?
对于第一个问题,我们的回答是全链路数据治理。数据治理团队的边界感是非常重要的,我们所说的边界感意思是不要拘泥于大数据体系中的数据治理,而是要扩大治理的边界,才能真正达到治理的效果。因此,全链路是我们数据治理的一个核心策略。这个策略的核心原则是无边界的数据治理。
第二个问题 & 第三个问题,可以同时回答。我们不打算组建数据治理委员会或者数据治理办公室;也不打算通过治理周会,数据保障运动来做治理。作为很酷的团队,我们对此回答是 MetaData Driven Data Governance(DDDG)。首先会组建一个小规模的数据治理 Team,这个团队的主要工作是持续分析与观察严选数据现状,根据实际情况,不断构建 Auto/Smart 的工具,将数据治理的工作日常化、自动化、智能化。最好的数据治理就是润物细无声。
数据治理是一个系统工程,解决的是数据体系中相互影响的各类问题,这是一个典型的多变量多目标优化的系统。因此,在工程架构设计实践上需要从面到线再到点,否则很容易在细节中迷失方向。因此在将数据治理系统工程化的工作中,主要构建了以下几个部分:
1、Data System。在数据体系中,数据经过清洗与一系列的分析任务(etl)到最终被使用,会经历数据源(data source)、数据集成(data integrate)、数据存储(data storage)、计算引擎(compute framework)、数据服务(data service)等多个模块服务,我们需要将数据流转路径的关系识别和串联起来。
2、Data Measure System。数据治理要解决数据体系中的很多问题:资源消耗、数据安全、数据可靠性、效率提升等等。我们需要建构一个可衡量系统(measureable system),需要梳理出在数据治理的议题下,数据系统中各个模块和服务需要关心的核心指标是哪些,并精准和实时的计算这些指标,构建一个衡量系统。
3、Data Governace Model。有了第一步和第二步的工作基础,然后就可以针对不同的治理目标,利用这些变量和关系,构建不同的治理模型,这是数据治理工程化过程中比较复杂的一部分。将第一和第二步中的内容用一个公式来描述:
4、Data Governace System。整理清楚数据治理对象的各个模块及模块之间的影响关系后,下一部分需要做的工作是设计数据治理服务自身的工程化工作。数据治理的工程化设计和实现,核心目标是将治理工作变得日常化 ->自动化 -> 智能化。
首先,我们整体介绍下严选数据治理体系(图二),自底向上主要包括数据基础服务、数据治理模型、数据治理应用等 3 层。元数据服务主要用于存储全链路的数据元信息,监控服务主要用于收集和存储全链路数据任务和服务的运行状态信息,而血缘服务则是将数据表 / 数据任务的关系建立起来,由此我们可以追踪到数据转换的整个链路,在此基础上建立了数据治理模型,从而构建我们的自动化数据治理应用。
图二 数据治理整体设计图
数据治理基础服务主要包括:
1. 统一元数据服务(MetaService)
2. 全链路的血缘服务(LineageService),
3. 监控服务建设(MonitorService)。
图三 统一元数据服务
数据不是凭空产生的,它来源于线上业务系统,最终作用回业务,其运作模式类似反馈系统。而描述这些数据的 schema、流转过程、作用域等基础信息我们称为元数据(metadata)。元数据描述了大数据体系的运转状态和内部结构,我们通过元数据来创建各类数据治理模型,确定治理策略,也通过元数据来分析治理效果,元数据是严选数据治理体系最重要的驱动力。
严选的元数据来源广泛,主要包括以下几个部分:
Data Source Meta
严选数据体系主要的数据源就是日志和数据库。我们除了关心日志和数据库数据的 schema 之外,也收集了日志与线上服务;数据库与线上服务之间的关系,做为数据源元数据的一部分。比如,在严选,一个常见的数据源元数据类似:
data source meta = {
type:"db",
dbip:["127.0.0.1"],
dbport:3206,
table-name:"risk-user-score",
table-schema:"...",
service-tag:"risk-management-system",
maintainer:"zuoqin"
}
服务在 CMDB 中登记的名字和实际业务的技术负责人也是数据源元数据一部分。这些信息有助于帮我们快速定位产生异常数据源的业务和实际的处理人。
Data Message Meta
在严选流计算平台中运转的每一条数据都会产生一条动态更新的元数据。这条元数据记录着原始数据产生的时间,唯一 ID,原始数据经历过哪些关键节点(比如经过某个 kafka topic),被哪个流计算任务处理过,以及每个处理节点的耗时等等。Data Table Meta数据表的元数据,是最为关键的元数据之一。主要包括数据表的 schema,维护人,存储空间,格式等等。
Data Task Meta
数据进入数据湖后,会有大量的数据计算任务(Data Task)来处理这些数据。一个典型的数据任务元数据类似:
data task meta = {
task-name:"hello-world",
work-flow-name:"hello-world",
maintainer:"zuoqin",
jobtype:"sparkSQL",
dependencies:["table-1", "table-2",...],
tag:["dwd","trade-domain",...]
}
Data Service Meta
当数据被计算整理完毕后,需要回馈到业务系统或者决策系统发挥价值。数据在什么样的系统,发挥什么样的价值,重要性如何,相应数据服务的可用性指标等构成了 Data Service Meta。
图四 全链路监控服务
在数据体系中,everything is measurable,这也是全链路监控服务对我们数据体系的要求。我们开发了 yxMetricSystem 做为基础能力支撑,yxMetricSystem 的主要工作是让数据体系的所有服务都可以被实时衡量,这个系统包括了衡量指标的定义,实现,收集与计算等内容。
下面我以数据的流程过程为线索来介绍下严选的全链路数据监控服务。
数据收集阶段
数据主要的来源是数据库和日志。新产生的数据会实时的通过大数据集成服务 Datahub 流入到 kafka/pulsar 等消息组件,等待后续的处理。
数据流入到消息组件后,我们会为每一条数据分配一个唯一的消息 ID,并生成一条对应的 ProfileMeta,这条 ProfileMeta 用于描述这条数据当前的状态,包括所处的阶段,时间戳等等元信息。ProfileMeta 准备完毕后会被写入到一个新的消息队列,等待后续的监控服务进行处理与分析,所以每条进入大数据平台的数据,都会在这个阶段转化成{Data, ProfileMeta}二元组。在数据收集阶段,我们还会监控数据交换服务 Datahub 及其组件的服务状态,消息中间件 kafka/pulsar 的服务性能等。
数据流处理阶段
数据一旦进入到消息组件,就会有对应的流计算任务对他们进行处理。这个阶段的监控会比较复杂,监控服务需要获取 flink 任务内部的运行状态,包括资源开销,每秒处理消息数,pengding 待处理的消息数,消息队列等待延迟等等。通过这些信息来诊断当前 Flink 任务是否处于异常状态(比如处理过慢造成延迟,资源不足,等待外部依赖服务,OOM 等等),以便我们能尽快的感知异常并介入处理。
在这个阶段,消息组件 kafka/pulsar 的性能依然是我们关注的重点,同时我们还需要关注 Flink 平台的状态,外部依赖比如 HBase/Redis/ES 的性能与稳定性。
数据批处理阶段
严选的离线计算引擎主要使用 spark。在严选,有上万个批处理工作流在对数据进行密集的处理和分析。在这个阶段,监控服务需要跟踪每个批处理任务的执行状态,包括任务调度是否存在延迟,任务的运行状态,数据在计算过程中是否存在倾斜状态,跟踪记录任务的资源开销。
批处理阶段需要依赖大量的数据基础设施互相协作,监控服务会监控调度服务 AirFlow,Azkaban,HDFS,spark/hive 等。
数据对外服务阶段
批处理产生的数据,有很大一部分需要支撑线上的服务,如 CRM,自动决策系统,广告,风控等服务。数据通过 Datahub 输出到外部存储,然后通过数据服务网关,对外提供数据服务。
在这个阶段,监控服务需要保障 Datahub 服务的可靠;也需要监控外部存储,比如 redis,hbase 等服务的状态。
在严选早期,数据血缘主要用于数据依赖关系查询,这也是数据血缘最常见的一个使用场景。血缘描述了数据从收集,生产到服务的全链路的变化和存在形式。
血缘其实是 Table, Task(Service)多种实体(Entity)构成的网络图。Table 实体有多种类型,比如 kafka/hbase/redis/hive 表等,Task(Service)实体主要用来表示数据生产任务和数据访问服务。
结合元数据和监控数据,血缘同时也刻画了数据模型,数据开发者,计算引擎,数据指标等 数据体系关键要素之间的内在关联。
血缘的完整性和准确率直接决定了它的应用价值,是数据治理从人工走向自动化的基础。
准确率低于 99% 的数据血缘无法应用在自动化场景。在大数据体系中,提升数据血缘准确性的策略基本都类似,主要包括:
血缘服务能识别所有计算引擎的数据访问和生成 SQL。比如,hive,spark,presto 等等; 血缘服务能识别通过 Scala 代码等方式处理数据; 血缘服务不仅要能识别已经发生的计算产生的依赖,还要能识别将来会发生的计算产生的依赖。
图五是我们血缘服务的架构图,血缘服务主要包括 Lineage Agent 和 Bachelor Core 两个部分。
图五 血缘服务架构图
Lineage Agent
网易严选的数据平台中有多种计算和计算引擎,都对应具体的 Lineage Agent 实现。Lineage Agent 需要完成对应服务的血缘关系解析,并按照规定的数据格式组织好血缘模型,将其发送到 kafka 供后续处理。
Bachelor core
模块的功能是进行血缘数据和元数据的存储和管理,底层血缘关系的存储使用了图数据库 Neo4j。Bachelor core 负责从 kafka 拉取血缘关系的变更命令,然后根据不同类型的命令更改存储的血缘关系,支持的关系变更命令有:
UPSERT 创建 / 更新血缘关系 DELETE 删除血缘关系
Bahelor core 还提供了血缘关系的查询导出的功能,外部系统可以通过 HTTP/RPC 的方式来获取存储的血缘关系,如表的下游任务,任务的输入输出表等关系。
图六 网易严选数据技术体系 2.0
当数据治理从人工的管理手段转换成自动化的系统解决方案后,我们的数据技术体系就有了一定的自动修复和优化的能力(图六)。数据体系运转过程产生的 Meta 信息,成为数据治理系统的输入;而数据治理系统根据这些 Meta 信息来分析整个数据体系运转的状态,并自动化的去调整和优化数据系统,形成一个自动循环的闭环。数据模型、数据沙盘、数据治理服务是这闭环中关键的部分。
数据治理模型基于数据体系的元数据 / 监控数据 / 血缘服务,我们构建模型作为自动化治理的基础,准确的模型可以用于模拟,因此我们在此基础上实现了严选数据治理沙盘模拟系统,后面会详细展开。
在批数据生产链路,我们有几个数据治理模型,用于我们对数据生产链路进行自动化的治理,比如 Table Lifecycle Model, Task Governance Model, Data Batch Process Model 等。
图七 批数据链路数据治理模型
Table Lifecycle Model
主要用于识别表的访问热度,根据表的访问次数来进行自动化分类:Hot, Warn, Cold,数据治理服务中的 Task Governace Module 即是在 Table Lifecycle Model 的基础上实现模型,策略,执行等全流程自动化治理。
Task Health Model
数据生产任务,是数据治理的核心,这些任务如果不持续优化,就会不断出现各类问题。从任务维度,我们根据经验总结主要包括 7 种需要优化的类型,分别是:冷任务,错误的任务依赖,缺少任务依赖,任务配置不合理,耗时过长,耗资源过多,任务倾斜等。Task Health Model 主要是识别以上可以优化的数据计算任务,为任务优化的 action 提供基础。
Data batch-process Model
Data Batch Process Model 是一个系统模型。根据图七我们可以看到,在有向无环图中,任务和表的治理都是基于节点本身,而 Data Batch Process Model 关注的是整个数据生产的大盘,包含了整个数据生产链路的关键点,系统资源的瓶颈,整体调度策略信息等等。Data Batch Process Model 能告诉我们从哪些角度去优化和治理系统能得到最好的 ROI,是数据治理团队工作重要的输入。
图八
数据治理沙盘我们通过对建立的模型进行参数化,构建了数据治理沙盘系统,沙盘是数据体系中重要的决策支持工具,我们可以在策略落地之前就推演出它在未来产生的实际价值。
严选的数据治理沙盘系统提供了一个 TestRun 的能力,通过人工设定优化策略,执行 TestRun 之后,最终的结果报告能告知我们以下几个维度:
1、策略的最终效果,比如能把数仓的生产时间提升多少百分点,比如我们的优化策略能节省多少内存和 CPU 时间?
2、资源消耗的情况,比如当前系统的资源是否满足我们的优化需求?特别是大促前期,我们可以通过沙盘系统推演我们在大促期间要准备的机器资源。
沙盘系统是一个强大的工具,它的强大是建立在对系统实际运行情况的高度模拟和准确推演上。因此,沙盘模型的准确率是这个系统最为关键的部分。
数据系统的状态随时在变化,因此模型的效果也会随之衰退,因此为保证模型的准确率,我们会将每天生产系统的运行数据对我们的沙盘推演模型进行验证和修正。以下是我们的 DataProduce Progress Model 的准确率监控曲线,误差率在 [-4%,4%] 范围波动。
图九 沙盘准确率监控曲线
我们举个在严选真实的数据治理案例以便大家有更加直观的感知。
治理目标:提升整个数据链路的产出时间目标量化:凌晨 4 点半数仓 T+1 任务完成率 = 90% 优化到 凌晨 3 点半数仓 T+1 任务完成率 = 90%,即核心数据生产时间提前一个小时。治理模型:DataProduce Progress Model治理策略:减少每个任务执行时间 / 优化调度计划我们需要决策出哪个治理策略对目标有更好的效果,并优先落地实施。
基于这个治理模型,我们进行了沙盘模拟:
模拟一:
v1 - Base Model
v2 - 优化任务时间
我们将当前状态(V1)与实施任务执行减半后的模型(V2)进行对比。
模拟二:
v1 - Base Model
v2 - 优化调度时间
模拟三:
v1 - Base Model
v2 - 优化调度模型 且 同时优化任务执行时间
结论:
2、优化任务调度时间能有效的完成我们的目标;3、在优化任务调度时间的同时,优化计算引擎(减少任务运行时间)能把整体生产时间提前 2 个小时。
通过沙盘推演的结果可以看出,影响严选数据生产时间最主要的制约因此是核心任务的调度时间不合理。在不解决这个问题的情况下,无论是增加资源还是优化计算引擎都不会对目标产生明显的影响。
在任务的调度时间充分优化后,再去提升计算引擎的计算能力,能进一步加快数仓数据的生产。因此,我们的治理手段主要分成两步:
第一步:调整和优化调度计划; 第二步: 优化任务节点计算时间。
数据治理服务是多组策略(Strategy)和行为(Action)的集合。有了数据治理基础服务和治理模型之后,我们就可以在此基础上快速构建强大和成体系的数据治理服务。严选主要的数据治理服务都能对系统进行自动化的治理和优化,我们把这组服务称为自动化数据治理系统(yx Smart DataGovern System)。自动化数据治理系统包括 Table Governance、Task Governance、System Governance。Table Governance App 可以将 Hot 的数据自动化的放入到 Alluxio 等缓存服务中,来降低访问的延迟和对系统 IO 的消耗;对 Cold 的数据,自动化的将其放入高密度冷存储服务,从而有效降低系统的存储成本;对于 Small File Table,放入小文件合并 module 进行自动合并。
随着时间和业务的变化,数据计算任务会不断增加,其中有一些计算任务可能耗费大量计算资源,可能不断的生产无效数据,是数据体系中巨大的腐化源。还存在大量计算任务的实现方式不合理,数据存在倾斜导致计算无法充分分布式,任务依赖配置缺失导致数据异常的风险等问题,都需要数据任务治理服务来解决。
Task Governance App 基于 Task Health Model 自动的分析并识别这些有问题的任务,Task Governance App 对于尽可能的实现自动化的处置策略,比如:冷任务自动下线,任务调度依赖自动补全,任务参数自动化调优,而对于目前还需要人工干预的问题,比如耗时任务 / 耗资源任务 / 数据倾斜任务通知数据开发工程师并提供优化帮助。
表和任务治理关注的是个体,而系统性治理执行的是更为全局的治理策略,能够帮我们找到数据系统中的短板,也能帮我们找到演进与优化大数据体系的路径与方法。目前我们 System Governance 主要集中在数据的生产过程,比如更智能的数据生产调度策略,任务的自动分级和降级等。
数据生产调度优化(Data Produce Schedule Optimize)
根据系统沙盘模拟我们发现目前调度计划不合理是目前数据生产最大的瓶颈。这个问题本质上,是因为数仓任务的调度时间是通过人工设定的(比如某个任务安排再 01:30AM 执行),随着业务和数据的变化,依赖数据开发同学人工去管理和优化这个调度时间,在具体的管理实践中是一个不可能的目标,系统腐化是不可避免的。
所以我们的实现主要包括:
1、Goal:是尽可能的提升数据生产链路的产出时间
2、Limit:系统运转时对外部资源的需求,系统内各个服务对外部资源的需求。比如,spark 计算任务对内存和 CPU 的需求。
3、Model:Data batch-process Model ,用于计算 T+1 数据生产链路的运行数据
4、Action:根据 T+1 数据生产链路的运行数据(任务运行时间 / 资源消耗等)来自动设置当天调度策略。这本质上是一种 CBO 模型,对相对稳定的数据生成过程比较有效。
任务自动分级和降级(Task Classify and Degradation)
这块是我们目前在进行的工作,由于我们的数据计算资源无法在短期内进行灵活的和无条件的扩展,因此分级对我们来说就非常重要。有了分级,我们可以对不通级别的计算任务进行不同的处置策略,比如大促期间,资源有限的情况下优先保障重要的计算任务得以执行。
如何对任务进行自动分级呢?整个数据生产和使用链路是一个有向无环图,我们会根据数据最终的使用场景来区分数据的重要性,也就是图中的叶子节点,比如说:
实时或 T+1 指标的重要性(P0,P1)
线上服务 s1/s2 使用的数据(P0,P1)
我们采用 PageRank 算法,结合叶子节点标记分数 + 任务本身被依赖次数加权得分,即得出每个任务的分数,然后再根据任务得分,对计算任务进行分级。同时,我们可以把离线批处理链路的优先级加权计算一直追溯到流计算链路,这样从数据的收集 / 传输 / 流处理 / 批处理整个全链路的数据收集和计算任务都可以进行自动分级,设置自动处理策略。
除了在批处理链路上的数据治理工作,我们在流处理链路上的数据治理工作也在开展,主要以提前发现数据质量问题为目标,自动探测消息丢失 / 重复 / 延迟并预警修复等,篇幅关系暂时不展开。
数据治理的总结和展望全链路数据治理是一个系统工程(System Engineering),目的是为了让我们的大数据系统更高效低成本的运转,数据驱动和系统控制理论是全链路数据治理工作的重要理论依据。运动式的人工治理应该被视为临时的 Workaround,逐渐被摒弃,因为治理并不是我们的最终目的,除了事后的治理工作,我们应该去改进和优化数据平台的建设工作,使数据体系架构能够朝着更智能化(smart)的方向不断优化和演进。
—————— / END / ——————
分析最新的数据思想,与百万数据从业者一起成长
分享、点赞、在看,三连、三连!