DeltaLake在工业大脑的实践分享
The following article is from Apache Spark技术交流社区 Author EMR团队
前言
异地异构流消息的处理
支持使用正则消费多个 Kafka Topic
使用 SubscribePattern ,可以使用正则实现同时消费多个 Topic 的数据,在一个园区有许多个 Topic 需要消费的场景下非常方便
对 HDFS 的支持和小文件合并的封装
使用 ubscribePattern,可以使用正则实现同时消费多个 Topic 的数据,在一个园区有许多个 Topic 需要消费的场景下非常方便
天然对写 HDFS 的支持,可以免去使用 Flink 的时候需要编写 HDFS Sinker,或者额外运维 Flume 集群带来的麻烦
每一个流式入库的场景,对于数据架构师来说都是一个性能与时效性的权衡取舍过程,不管是 Flink、Flume 还是 SparkStreaming,都会有“滚动写入容量(或条数)阈值”和“滚动写入时间阈值”的设计,在实际的实施过程中,根据业务对于数据延迟和性能的需求不同,来权衡二者。例如对于延迟容忍度很低的场景,可以将容量或条数阈值设置的很小(甚至为1)来让新的数据快速滚动写入,但是这样带来的副作用是 Sinker 的频繁IO,比如在 HDFS 产生很多的小文件,影响数据读写或 DataNode 的性能;在延迟容忍度较高的场景下,交付工程师则往往选择将条数阈值和时间阈值加大,带来更好的IO性能,但牺牲数据延迟。这是一种通用的方法,但在实际生产过程中,你会发现,要为许多的流作业维护许多不同的配置,这项工作的成本依然不小。使用 DeltaLake 来处理,则可以轻松很多,你可以将所有的流作业的滚动写入阈值设置成一样的(比如都比较小),这样所有的流作业都可以得到比较好的数据延迟,同时结合使用 DeltaLake 的特性功能 Optimize 和 Vacuum ,配置定时调度任务来周期执行,对小文件进行合并或删除,来保障 HDFS 的性能,这样可以使整个数据开发工作简单很多,也更好运维。关于 Optimize 特性可以参考:https://help.aliyun.com/document_detail/148373.html
流批融合的数据分析
在生产制造环节,机器设备的稳定运行对于产成品质量至关重要,而判定设备是否稳定运行的最直观方法,就是查看某些传感器的历史长时间历史趋势,在实际项目实施过程中,交付工程师往往使用流作业,将 Kafka 中大量的传感器时序数据加工后写入 OLAP 存储(例如阿里云 ADB、TSDB 或 HBase 等),来支撑上层数据分析应用的高并发、低响应时间的实时查询需求。
但是实际情况往往比这复杂得多,由于工业企业的信息化和数字化水平普遍不高,不同行业的生产过程自动化程度也参次不齐,有许多的设备实时数据其实并不准确,它们需要在若干时间以后(数分钟或者数小时),经过人工干预或者重新计算较正后才能使用。
所以在实际实施过程中,往往采用一种“滚动覆盖”的模式来不断改写 OLAP 存储中的数据,将 OLAP 分为“实时增量区”和“周期覆盖区”,例如下图所示:
上图使一个 OLAP 存储,所有的数据被分为橙色和蓝色两部分,上层数据应用可以无差别地查询这两个区域的数据,唯一的差别是:橙色的最新数据,由流计算作业实时从 Kafka 获取,做加工后写入;而蓝色区域,则由历史数据周期性计算(加入矫正逻辑)后写入,对昨天或更久之前的实时数据进行订正,这样周期往复,在保障数据时效性的同时,对历史数据做订正覆盖,来保障数据的正确性。
在以往的做法中,往往使用一个流+批的 Lambda 架构,用两种不同的计算引擎来处理流与批,如下图所示:
Lambda 架构的弊端也可由此可见,在两个不同的平台维护两台代码,还要保障它们两的计算逻辑完全一致,是比较费功夫的事情,在引入 DeltaLake 之后,事情变得相对简单,Spark 天生的流批一体设计,就很好地解决了代码复用和跨平台逻辑统一的问题,结合 DeltaLake 的特性(例如 ACID,OPTIMIZE 等),可以更优雅地完成这项工作,如下图:
另外值得一提的是,流批一体并不是 Spark 的独有特性,但是阿里云 EMR 在 SparkSQL 和 Spark Streaming 之上又对 SQL 进行了一层封装,使得业务人员能够更低门槛地使用类似Flink SQL 的语法来进行作业开发,使得流批场景下的代码复用和运维工作变得更加简单,这一点对于项目交付提效意义很大,具体可参考:
https://help.aliyun.com/document_detail/124704.html
对事务的处理和对算法的支持
传统的数据仓库,很少会在建模过程中引入事务,由于数据仓库要反映数据的变化情况,所以往往使用缓慢变化维度等方法来记录数据的状态变化,而并不会用 ACID 来让数据仓库与业务系统保持一致。
但是在工业数据中台的实施过程中,事务有他独特的使用场景,例如排产排程,是每一个工业企业都关心的重大问题,排产,往往从集团级别进行,根据客户订单、物料库存和工厂产能等角度来对当期的生产需求进行合理的分解和编排,来达到产能合理分配;排程则往往更加微观,在工厂级别,根据工单、物料和实际的生产情况来实时动态调整生产计划,达到资源利用率最大。它们都是需要众多数据融合求解的规划问题,如下图所示:
排产排程算法所需要的原始数据,往往来自多个业务系统,例如 ERP 提供订单和计划数据, WMS 提供物料数据,MES 提供工单和工序数据,这些数据必须融合到一起(物理上和逻辑上),才能作为排产排程算法的有效输入,所以在实施过程中往往需要一个统一的存储来存放来自各系统的数据。同时排产排程算法对数据的实效性也有一定的要求,它需要输入的数据能够尽量与各个业务系统保持一致,这样才能真实地反映出当时的生产情况,以便更好的进行排程。
在以往,我们这么处理这种场景:
利用各个业务系统的 CDC 能力,或者单独编写程序来轮询,准实时地获取数据变化
写入关系型数据库,在此过程中处理数据 Merge 的逻辑,让关系型数据库中的数据与业务系统数据准实时地保持一致
排产排程引擎在被触发的时候,从 RDB 拉取数据进行运算
这种架构有一些显而易见的问题,主要有:
用 RDB 替代大数据存储,计算的时候把数据 Query 到内存中,对于数据量比较大的情况会很困难
如果用 Hive 引擎来替代中间的 RDB,虽然在 Hive3.X 支持 ACID,但是实时性和 MapReduce 编程框架对于算法(求解器)的支持都难以满足工程需求
目前我们正在尝试引入 DeltaLake,结合 Spark 的特性来优化这个架构,如下图:
优化后的架构,有如下优点:
使用 HDFS+Spark 替代 RDB 作为中台存储,解决数据量大时候的存储问题
使用 Spark Streaming+DeltaLake 来对接原始数据,利用 DeltaLake 的 ACID 特性来处理数据进入中台存储时的 Merge 逻辑,同时在流式入库的时候同时对数据进行Merge+Optimize,保障读写性能
排产排程引擎不再从中台 Query 数据到内存计算,而是把算法任务封装成 Spark 作业,下发到计算平台完成计算,这样利用 Spark ML 编程框架对算法和 Python 的良好支持,以及 Spark 本身的分布式计算能力,对需要多轮迭代的规划算法进行分布式处理
利用 DeltaLake 的 Time Travel 特性,对数据版本进行管理或回滚,这对于算法模型的调试和评估是非常有利的
1)DeltaLake 的核心能力 ACID 对于数据实时性和准确性要求较高的应用很有帮助,尤其是算法应用,可以更有效地利用 Spark 对 ML 的天然支持
2)结合使用 DeltaLake 的 Optimize+Vacuum 和 Streaming 的流式入库能力,在大批量对接上游 Kafka 数据的时候会有更好的兼容性,同时可以有效的降低运维成本
3)利用阿里云 EMR 团队封装的 Streaming SQL 开发流作业,在大规模的数据中台项目实施过程中可以有效降低开发门槛和成本
目前,DeltaLake 在工业大脑的应用尚在实验阶段,例如流式入库、排产排程引擎、流批融合等多个场景正在工业大脑多个项目中应用,同时这些场景也在逐渐沉淀为工业大脑的标准产品,后续结合工业大脑3.0的数据+算法场景的可视化编辑和复制能力,可以快速复制到离散制造、汽车、钢铁等多个行业的场景中,用AI能力普惠中国工业。
感兴趣的同学可以参考:
https://www.aliyun.com/solution/industry/home