查看原文
其他

在生鲜零售业,DolphinScheduler 还能这么玩!

单葛尧 海豚调度 2022-10-28

点击上方 蓝字关注我们



 


✎ 编 者 按 


2021 年,Apache DolphinScheduler 社区又迎来了新的蓬勃发展,社区活跃度持续提高。目前,项目 GitHub Star 已达 6.7k,贡献者达 260+,fork 数 2.4k+。仅在过去一个月内,社区就发布了两个新版本,并迎来 33 位作者向 dev 提交 85 次 commits,向所有分支提交 161 次 commits。在 dev 上,有 504 个文件发生了变化,共计新增 10,421 行代码,删除 12,376 行代码。此外,27 人 merge 了 113 个 Pull Request,102 个 issue 得到解决。


为了让更多活跃参与者的贡献被看到,社区特推出《社区星力量》专栏,分享他们挥洒汗水所得的开源成果,以及在参与开源过程中发生的小趣事。如果你也想让大家看见闪闪发光的你,欢迎投稿告诉社区你的故事,都有可能会得到刊登报道,优秀故事还会获得社区提供的走心奖品和纪念品哦!


投稿请发邮箱:xiyan@whaleops.com 加微信(Leonard-ds)。


今天,我们故事的主人公是来自食行生鲜的研发中心数据平台开发工程师 单葛尧。



01

团队介绍



大家好,我叫单葛尧,目前就职于食行生鲜,担任研发中心数据平台开发工程师,Base 苏州。


食行生鲜研发中心数据平台的研发方向为 DMP(Data Management Platform,数据管理平台),以及以 Flink 为基准的实时平台的研发。


食行生鲜研发中心数据平台团队


食行生鲜是一家采用“预订制”模式,通过全程冷链配送和社区智能冷柜自提方式,为用户提供优质生鲜服务的中国新零售领军企业。公司从家庭消费更多属于计划性消费这一底层逻辑出发,在深刻理解生鲜零售的第一性原理是要不断降低交易费用、提升用户体验的基础上,首创“预订制+全程冷链+冷柜自提”的运营模式,实现生鲜零库存,并通过集约化冷链配送,降低损耗和配送成本,成功解决生鲜电商领域一直以来普遍存在的高成本难题。

      

因上述原因,需要数据部门提供大量数据处理、业务报表、用户标签画像等。于是我司于六月立项更换调度系统,契机主要是因为原自研调度系统为单体架构,在分布式、高可用、可容错方面存在欠缺,因单体崩溃影响数据产出。于是数仓部门的小伙伴经过长时间调研,比较了 Airflow、Azkaban 等,最终定下来选择了DolphinScheduler,原因如下:



02

Azkaban


Azkaban 是LinkedIn开源的一个分布式工作流调度系统,比较成熟,开箱即用。


项目地址:https://azkaban.github.io/


先上一张架构图 :


主要由如下几种组件构成:


  • Web Server : 主要包括工作流配置管理,用户认证,定时调度,触发任务执行功能

  • Executor:处理实际工作流和任务的执行

  • Database: 存储工作流和任务的元信息


具体执行流程:


【1】调度器触发或者人工触发,生成工作流实例信息出入数据库

【2】更具LB选择一个Executor来执行该工作流实例

【3】执行状态和结果信息存入数据

【4】Web server查询数据库告知任务状态信息


架构特点:


  • Web server : 通过quartz来实现分布式调度 ,所有可以水平扩展,不存在单点问题

  • Executor : 可以水平扩展,不存在单点问题

  • 工作流上的任务只能在一个executor执行,不能分散到多个executor上执行,这点限制了调度能力,可能导致资源分布不均衡

 


03

Airflow



Apache 顶级项目,基于 Python 编写。


项目地址:http://airflow.apache.org/


以非常灵活的方式来支持数据的 ETL 过程,同时还支持非常多的插件来完成诸如 HDFS 监控、邮件通知等功能。Airflow 支持单机和分布式两种模式,支持 Master-Slave 模式,支持 Mesos 等资源调度,有非常好的扩展性,但不足之处是不支持 sqoop命令。

 

Airflow 的概念相对比较复杂,比较核心的有 DAG 、Operators 、Tasks 三个概念。DAG 表示的是由很多个 Task 组成有向无环图,可以理解为 DAG 里面的一个节点,Task 由 Operators 具体执行,Operators 有很多种,比如运行 Bash 任务的 Operators 。


架构图如下:


主要有如下几种组件构成:


  • web server: 主要包括工作流配置,监控,管理等操作

  • scheduler: 工作流调度进程,触发工作流执行,状态更新等操作

  • 消息队列:存放任务执行命令和任务执行状态报告

  • worker: 执行任务和汇报状态

  • mysql: 存放工作流,任务元数据信息


具体执行流程:


【1】scheduler扫描dag文件存入数据库,判断是否触发执行

【2】到达触发执行时间的dag,生成dag_run,task_instance 存入数据库

【3】发送执行任务命令到消息队列

【4】worker从队列获取任务执行命令执行任务

【5】worker汇报任务执行状态到消息队列

【6】schduler获取任务执行状态,并做下一步操作

【7】schduler更具状态更新数据库


架构特点:


  • web server 主要起管理功能可以横向扩展 ,不存在单点

  • scheduler: 只能启动一个,存在单点问题。但有第三方开源方案来解决这个问题,热备方案:https://github.com/teamclairvoyant/airflow-scheduler-failover-controller

  • worker:可以启动多个 横向扩展

 


04

DolphinScheduler



Dolphin 原名 Easy Scheduler,2019 年 9 月份改名为 Dolphin Scheduler, 并进入 Apache 孵化器进行孵化。


项目地址:https://dolphinscheduler.apache.org/ 


Apache Dolphin Scheduler 是一个分布式、易扩展的可视化 DAG 工作流任务调度系统。致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理流程中开箱即用,基于 java 开发。


架构图如下:


主要组件构成:


  • api server: api接口,管理,执行工作流,任务等。

  • Master: 拆分工作流并按照顺序执行任务

  • 消息队列:存储任务执行命名消息

  • worker: 执行具体任务并更新任务状态

 

具体执行流程:


【1】触发工作执行,生产待执行工作流命令存入数据库

【2】master扫描执行待执行工作流命令,执行工作流

【3】拆分工作流为一个个执行任务并生成任务实例存入数据库

【4】发送执行任务命令到消息队列

【5】worker 从消息队列里获取执行任务,执行

【6】更新执行任务的状态回数据库

【7】master更具任务执行状态,进行下一步操作


架构特点:


  • api server : 可横向扩展

  • master:有状态服务 通过zk 来选举一个leader,支持横向扩展

  • worker: 可横向扩展

 

不足之处是没有采用 Airflow 的状态汇报到消息队列的方式,这样会导致master 需要维护很多任务实例的线程来跟踪任务状态,这样可能导致master cpu 压力。


详细介绍可参考:https://www.itsiv.com/archives/551

 

功能对比:


功能

Azkaban
Airflow
DolphinScheduler

调度模块

Quartz
自实现
quartz

Job类型

Command、HadoopShell、Java、HadoopJava、Pig、Hive等,支持插件式扩展
Python、Bash、HTTP、Mysql等,支持Operator的自定义扩展。
支持传统的shell任务,同时支持大数据平台任务调度:MR、Spark、SQL(mysql、postgresql、hive、sparksql)、Python、Procedure、Sub_Process

Executor触发

Restful
Restful
Restful

工作流

project - - flows - - tasks
dag - - tasks
project - - flows - - tasks

部署运维

简单
较复杂,包括WebServer、Scheduler、Worker和Flower(可选,用于)
简单

单点故障

Web Server存在单点故障风险
Scheduler存在单点故障风险。
去中心化的多Master和多Worker

HA额外要求

DB
Celery / Dask / Mesos + Load Balancer + DB
不需要(本身就支持HA)

过载处理

任务太多时会卡死服务器
任务太多时会卡死服务器
任务队列机制,单个机器上可调度的任务数量可以灵活配置,当任务过多时会缓存在任务队列中,不会造成机器卡死

DAG监控界面

只能看到任务状态
不能直观区分任务类型
任务状态、任务类型、重试次数、任务运行机器、可视化变量等关键信息一目了然

可视化流程定义

通过自定义DolphinSchedulerL绘制DAG并打包上传
通过python代码来绘制DAG,使用不便,特别是对不会写代码的业务人员基本无法使用。
所有流程定义操作都是可视化的,通过拖拽任务来绘制DAG,配置数据源及资源。同时对于第三方系统,提供api方式的操作。

快速部署

集群化部署复杂
集群化部署复杂
一键部署

是否能暂停和恢复

只能先将工作流杀死再重新运行
只能先将工作流杀死再重新运行
支持暂停(非真暂停),恢复操作,支持停止操作

是否支持多租户

支持
easyscheduler上的用户可以通过租户和hadoop用户实现多对一或一对一的映射关系,这对大数据作业的调度是非常重要的。

是否支持集群扩展

是,但是复杂
Executor水平扩展
是,但是复杂
Executor水平扩展
调度器使用分布式调度,整体的调度能力会随便集群的规模线性增长,Master和Worker支持动态上下线

开发语言

Java
Python
java

二次开发

一般
英文文档,难度较大
容易,中文支持比较好。

社区活跃

有点活跃100+
1000+
500+

依赖组件

Mysql、python2.7
mysql数据库
Mysql或者pg

注:以上调研为 2021.6 数据

        

之后数月,数据部门齐心合力,多次压测,于 2021.9.25 正式投入生产,版本为 1.3.6。目前我们有 2000+ 的工作流定义,使用三台机器部署DolphinScheduler,平稳运行一个月,极大地保障了数据部门的数据产出效率。



05

顺利投产



生产环境的基本产出效率得到了满足,我又和数仓的小伙伴们开始琢磨起了易用性的问题,因 1.3.x 工作流定义表大 Json 的复杂性,导致如果工作流里的任务庞大,就会造成该 Json 也非常大,可读性非常差,使用时也需解析 Json,非常耗费性能且任务没法重用,致使数据大量冗余,我们想基于任务级别开发血缘链路可视化也难度重重。

       

正愁着此事,社区的小伙伴仿佛看到了我们的痛点,10 月初,我通过社区了解了 2.0 版本即将上线,在做了简单的调研后,我们断定 2.0 版本解决了我们目前困扰我们的几个大难题:


1.Json 的拆分,大大提升了性能和后续开发的可能性

2.工作流版本控制,不再担心误操作无法溯源归因

3.告警模块实例插件化,帮助我们更好的集成入我司的数据告警中心

4.引入工作流血缘可视化,帮助我们得以了解整个数据生产链路的上下游关系


(当然,2.0版本给我们的惊喜不仅仅于此)


说干就干,我花了一周的时间对比新老数据结构的差异,写了一个长长的Python 脚本去做迁移(后续社区在 release 版会有正式的迁移方案),我编译了 GitHub上 dev 分支的源码就上了测试环境,并于 10 月最后两天投入了压测,一切顺利,于是我们于 10 月 31 日晚发布上线,准备投入生产。

 

生成全局唯一键 code 的 Python 版实现,用于新老版本数据结构的迁移

 


06

出问题,我惊出一身汗



结果天不遂人愿,次日早上六点,我接到了数仓小伙伴的电话求助,Hadoop 面板展示没有一个任务在运行!我一激灵,今天一号,会有大量月度报表与数据处理会在今日产出,一旦调度系统出问题,将会影响到很多业务部门。我打开 DolphinScheduler,发现当时所有的任务都处于运行状态,所以没有触发失败告警,但是到计算集群里看,这些任务早早已经结束,已完成的 task 状态不被更新,新调起的任务实例也不能成功运行,一直处于提交成功状态,查看各项服务的日志,毫无报错。



07

社区给出解决方案,有惊无险



我没法短时间定位错误,正一筹莫展之际,想到了前段时间社区协助我解决问题的 caishunfeng 小伙伴,shunfeng 同学十分热情,根据现象给出了两种猜测:


1.创建了大量的 Netty Channel,造成网络负载高,导致后续任务不能正常通信;

2.Zookeeper 的 session expire 了,导致临时节点被清掉;

 

他还贴心地贴出了代码,与我共同讨论这个事故现象,我更改后重新打包,并且成功发布至了生产,重启了服务,后续至发布今日,生产环境未再出现宕机状态。

 

生产环境稳定下来了,我的工作也从保证生产稳定性变成了业务功能的开发,我们在后续也切到了alpha-release版本,使用中发现了一个问题,当工作流存在全局变量时,补数功能就会失效,定位到问题是commandType 为 COMPLEMENT_DATA 时,没法正确处理 runTime为 NULL 时的情况。一个简单的 BugFix,在shunfeng 小伙伴的帮助下,我提交了 PR,https://github.com/apache/dolphinscheduler/pull/6759,也在后续 merge 到了 release-prepare 中。



08

提交PR,参与贡献一条龙



之后我还和 shunfeng 提了好多意见并展开了讨论,如我希望后续调起的高优先级工作流可以先被执行,在排队展现出一种插队的效果,观察到原worker 服务的线程池等待队列由 LinkedBlockingQueue 实现,它是 FIFO 的,不能满足我们的需求,我们更换了线程池的等待队列的实现为 PriorityBlockingQueue,在 TaskExecuteThread 的 compareTo 中获取到工作流或是任务的优先级,得以实现这个“插队”效果。

 

后续我司会根据数仓小伙伴的业务需求,不断完善生产环境的 DolphinScheduler,并将共性需求化成 feature 与 PR,回馈社区。我们很欣喜地看到由国人主导的顶级项目不断蓬勃壮大,也在参与 DolphinScheduler 迭代的过程中深刻理解了 Apache 的“社区大于代码”之道,观察到 Datahub 已经集成 Airflow,后期我司也将推行数据治理平台,希望可以集成 DolphinScheduler,共建良好的 DolphinScheduler 生态!






参与贡献



随着国内开源的迅猛崛起,Apache DolphinScheduler 社区迎来蓬勃发展,为了做更好用、易用的调度,真诚欢迎热爱开源的伙伴加入到开源社区中来,为中国开源崛起献上一份自己的力量,让本土开源走向全球。

参与 DolphinScheduler 社区有非常多的参与贡献的方式,包括:


贡献第一个PR(文档、代码) 我们也希望是简单的,第一个PR用于熟悉提交的流程和社区协作以及感受社区的友好度。


社区汇总了以下适合新手的问题列表:https://github.com/apache/dolphinscheduler/issues/5689


非新手问题列表:https://github.com/apache/dolphinscheduler/issues?q=is%3Aopen+is%3Aissue+label%3A%22volunteer+wanted%22


如何参与贡献链接:https://dolphinscheduler.apache.org/zh-cn/docs/development/contribute.html


来吧,DolphinScheduler开源社区需要您的参与,为中国开源崛起添砖加瓦吧,哪怕只是小小的一块瓦,汇聚起来的力量也是巨大的。


参与开源可以近距离与各路高手切磋,迅速提升自己的技能,如果您想参与贡献,我们有个贡献者种子孵化群,可以添加社区小助手

微信(Leonard-ds) 手把手教会您( 贡献者不分水平高低,有问必答,关键是有一颗愿意贡献的心 )。添加小助手微信时请说明想参与贡献。


来吧,开源社区非常期待您的参与。







社区官网

https://dolphinscheduler.apache.org/

代码仓地址https://github.com/apache/dolphinscheduler

您的 Star,是 Apache DolphinScheduler 为爱发电的动力❤️ ~

投稿请添加社区小助手微信

(Leonard-ds)




☞荔枝机器学习平台与大数据调度系统“双剑合璧”,打造未来数据处理新模式!

最佳实践|联通数科基于 DolphinScheduler 的二次开发

DolphinScheduler 荣获 2021 中国开源云联盟优秀开源项目奖!

议题征集令 | Apache DolphinScheduler Meetup 2021 来啦,议题征集正式开启!

☞重构、插件化、性能提升 20 倍,Apache DolphinScheduler 2.0 alpha 发布亮点太多!

☞巨变!a16z 关于新一代数据基础设施架构的深度洞察

手把手教你 Apache DolphinScheduler 本地开发环境搭建 | 中英文视频教程

☞Apache DolphinScheduler使用规范与使用技巧分享


点击阅读原文,加入开源!



点个在看你最好看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存