Airflow 101: 隐藏小技巧帮你快速上手!
今日份知识你摄入了么?
目录
1. 目标受众
2. 背景
3. 入门
4. Airflow概念
5. DAG示例
6. Airflow命令行界面(CLI)
7. 注意事项、建议、一些提示
1.目标受众
本文的目标受众是那些对 Airflow 有少量经验、或没有经验,并希望快速上手的人。
2. 背景
Airflow
Airflow 是一个开源的工作流管理平台,用于管理复杂的管道
它始于 Airbnb,可用于通过DAG(有向无环图)管理和调度 ETL 管道
其中, Airflow 管道是定义 DAG 的 Python 脚本构成的
许多人把Airflow作为调度数据管道的Cron的替代品
Crom不再用于 ETL 调度
Cron 可用于调度 ETL 作业,但它不提供管理 ETL 工作组件之间的相关性或故障的机制
为了说明这一点,下面是Airflow DAG简单实例图
本图的DAG 包含 了7 个 Airflow 任务,用于展示披萨烘焙中的逻辑。
图源:https://tech.curama.jp/entry/2018/03/23/130000
关键特性:任务相关性
这个例子可能有点无聊,但上面强调的重点是相关性逻辑(dependency logic)
可以看到,只有在 add_cheese、add_ham、add_pineapple 和 add_mushroom 全部完成后,才能执行执行bake_pizza
数据操作友好型:监控并提醒你的数据管道
Airflow 的一大卖点是,它可以用来创建自动化的、受监控的数据管道
例如,请参考REA 公司博客文章中的 Airflow DAG 图例
DAG 中的最后 2 个任务的触发条件是根据任务的输出结果 ,即data_is_fresh :
图源:https://www.rea-group.com/blog/watching-the-watcher/
如果数据不是最新的,则触发 data_freshness_failure_alert,会让一条消息发布到slack平台:
使用 PostToSlack 操作符可以通过 Airflow 将消息发布至 Slack
我的动力/动机
在我过去参与的五六个项目中,同样的问题一次又一次地浮现在我的脑海中——“如果数据管道出现故障,我们是否可以设置某种监控和提醒来通知我们?比如,通过 slack 或电子邮件?”
Airflow 通常被人们推荐为候选解决方案,但讨论也到此为止了。但是,想法是永远不会自己实现的。
所以我决定自己动手……
3. 入门
3.1. 托管程序(Managed Provider)VS.本地安装(Local Install):本地安装可快速帮助你进行实际操作
有多个托管提供程序可以用于运行 Airflow,例如 Astronomer, Cloud Composer, AWS MWAA等
但根据我的经验,我强烈建议只在本地设置 Airflow ,这样能更方便了解概念和常见设计
要避免白忙一场
由于每个托管服务解决方案都存在着细微的差别,我发现自己在评估每个托管服务时经常偏离主题,从而陷入困境。在AWS MWAA中尤其如此。
我讲这些事情的目的是,在开始使用该工具之前,没有人想在设置/配置上浪费太多时间。
3.2. 本地安装步骤
Airflow 的网站文档中说明了如何在本地运行 Airflow。本地安装包括以下 5 个步骤:
安装 Airflow l 的步骤
接下来:我将介绍一些核心的Airflow概念/理念。
4. Airflow概念
Airflow 中的主要概念要么会被描述为核心理念,要么被描述为附加功能。
4.1. 核心理念
从根本上说,Airflow 可用于构建和运行工作流
这些工作流表示为 DAG(有向无环图),其中包含各个工作组件,我们称之为任务(Tasks)
DAG(有向无环图)
DAG 是 Airflow 的核心概念,它将任务组合在一起,通过相关性和关系组织起来,用于说明如何运行各项任务。这是从 Airflow 的文档中提取的一个基本示例图:
这个 DAG示例图定义了四项任务(A、B、C 和 D),并规定了运行的顺序,并描述了任务相关性。同时,还需要定义DAG的运行频率,例如“从明天开始每 5 分钟一次”。
运算符
运算符定义了针对某项任务而执行的操作。Airflow 中两个捆绑运算符包括:
BashOperator:执行shell命令/脚本
PythonOperator:执行Python代码
如果你需要的操作符没有默认安装在 Airflow里面,你可以在庞大的社区 Provider Package中找到。有一些运算符非常受人们欢迎,包括:
支持 DBMS 相关的操作:MySqlOperator / PostgresOperator / MsSqlOperator / OracleOperator
JdbcOperator
SlackAPIOperator
S3FileTransformOperator
4.2. 附加功能
除了Airflow 的核心对象之外,还有许多相应的功能用于支持某些操作。
连接(Connections)和挂钩(Hooks)
连接(Connections)
连接本质上是一组参数(例如:用户名、密码、主机名),以及它连接到的系统类型和唯一名称(称为 conn_id)
最终,这些参数会用于存储某外部平台的预设凭据(predefined credentials)
例如,你可以创建一个连接,用来捕获用于连接至特点 AWS 账户的相关参数
挂钩(Hooks)
挂钩(Hooks)是一个外部平台的高级接口,可让你无需编写访问API /使用特殊库的代码,就能快速轻松地与这些平台沟通
挂钩(Hooks)与连接(Connections)集成,共同收集凭证
你可以在 Airflow 的 API 文档中查看Airflow挂钩的完整列表
XComs
XComs 是一种让任务相互交流的机制,因为默认情况下,任务是相互隔离开的
当试图在某个DAG中获取之前的任务输出时,XComs会非常有用。
更多详细信息,请参阅:https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html
变量(Varibles)
变量(Varibles)是一个通用的键/值存储(Key/value store),可从你的任务中查询,并可通过 Airflow 的用户界面或 CLI 轻松设置
你只需在变量模型上导入并调用它,即可使用:
接下来,我将展示一些DAG示例 ,例如让 Airflow 和Slack 、MSSQL 交互。
5. DAG 示例
在你设置了 Airflow 环境(本地安装或托管提供程序都可)之后,你就可以继续创建你的Airflow 工作流程了。
在开始之前,我们可以查看一些 Airflow DAG示例,了解每个 DAG 所涉及的内容。在这篇文章中,我将简单介绍一下一下我整理的DAGs:
1.样板(模板)DAG
2.MSSQL 查询 DAG
3.“发布至 Slack” DAG
4.任务失败时的“发布至 Slack”DAG
现在,让我们从样板 DAG 开始。
5.1. 样板 DAG
下面显示的是我开发 Airflow DAG 的初步操作:
这里需要为新手指出一些关键的部分:
1. Python Airflow模块
2. default_args(默认参数)
3. Airflow任务(例如,example_task)
1. Python Airflow模块
你通常至少会使用到以下 Python Airflow 模块:
请注意,我把导入 PythonOperator 和 Airflow 变量模块的调用comment掉了,因为虽然这两者非常常见,但并非总是这么用(具体取决于你的用法)。
2.default_args(默认参数)
为 Airflow DAG 指定默认参数是常见的做法,这里的关键是实际参数本身
我经常使用上面这 5 个参数
3.Airflow任务(例如,example_task)
在 Airflow DAG 的底部,你可以列出 你的Airflow 任务
Airflow 任务要使用某种操作符,例如:在这个例子中,操作符是PythonOperator
该任务是调用 Python 函数,eg_python_function
同样,值得指出的是参数 provide_context
5.2. 样板 MSSQL 查询 DAG
下面显示的是 Airflow DAG 执行 MSSQL DB 查询所需的骨架结构:
其中需要注意的重要事项为:
1. 任务:mssql_select_all_query
上述 DAG 的关键是在任务 mssql_select_all_query 中执行(显示在脚本底部,如下所示),其中,脚本使用了 Python 模块 pymssql 执行 MSSQL 查询。
该任务简单使用了 PythonOperator 去调用 Python 函数 mssql_query(代码如下所示):
2. DAG 脚本与用于 Python mssql_query() 函数的代码非常相似
Python 函数 mssql_query() 使用的语法和在普通 Python 脚本中使用 Python 模块 pymssql 时使用的语法相同
我展示这一点的目的,只是为了强调在 Airflow 中是否使用现有的 Python 脚本根本不重要
3. Python Airflow模块
请注意脚本顶部使用的 2 个 Airflow 模块:
正如在第一个样板 dag 的注释中所提到的,这两个模块很常用,但在该脚本中的注释为占位符。而在这里,确实使用了这两个模块!
特别值得指出的是,变量模块已用于检索数据库凭据,这些凭据可用于查询。
5.2. “发布到 Slack” DAG
这是一个非常简单/精简的脚本,用于将所需内容发布到 slack。例如:
1. SlackWebhookOperator
要在Slack上发布消息,你需要使用 Slack Webhook。此外,你还需要使用 Python Airflow 运算符—SlackWebhookOperator,在脚本顶部导入:
2. slack_token
重要的是,要在Slack上发布消息,你需要一个 slack token。其中有很多步骤/细微差别,例如,你需要在 Slack 中为你的公司创建一个“应用程序”,然后创建一个 webhook 等。
3. 用于Airflow连接的密码字符串
你的 Slack webhook URL 会类似于://hooks.slack.com/services//T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX
你的密码将跟在 https://hooks.slack.com/services 之后
重要提示:这个值就是你的 slack token 。因此,你会在 DAG 脚本中用到这个token。我会把值存在一个变量中:
5.3. 任务失败时的“发布到 Slack”DAG
之前的 DAG 只是演示了如何从将Airflow 的消息发送至 Slack,而下面的 DAG 则包含Airflow 任务失败发送消息至 Slack 的功能。
下面的 DAG 是一个简单的例子,如果函数 read_op() 中变量 i 的值为 1,则调用任务失败。
当任务失败时,会向 Slack 发布一条消息:
上面有一些关键事项:
1. on_failure_callback
这个脚本中要指出的关键是一个任务选项,on_failure_callback
该默认选项表示,“如果 Airflow 任务失败,则调用 Python 函数 slack_failure_msg”(如下所示)
2. 返回 failure_alert.execute(context=context)
这与最初的“post to DAG”脚本之间的另一个区别在于返回的对象。可以看到,在调用 Python 函数时,就会执行 Airflow failure_alert。最终发布类似于以下内容的帖子:
6. Airflow CLI
Airflow 的 CLI 可以让操作在UI上通过终端执行。这对于编写脚本/自动化活动非常有用。下表描述了一些我觉得很有用的 CLI 命令示例:
7. 注意事项、建议和一些提示
1. Airrflow 2.0 的变化——Airflow 核心和提供程序
在 Airflow 版本 1 中,有一个“统一”包,用于构建外部服务的导入模块,例如AWS、GCP 等。
但是,在版本 2 更改中,Airflow 的程序包结构(package structure)重新成为设计的核心,目前为止,有61个不同的程序包。每个程序包能用于某个外部服务(Google、Amazon、Microsoft、Snowflake)、数据库(Postgres、MySQL)或协议(HTTP/FTP)
因此,我们在利用外部服务库提出的在线解决方案需要特别注意,并注意它们所针对的 Airflow 版本。
2. 在线提出解决方案时,注意所用的Airflow版本
刚开始开发 DAG 时,由于旧版 Airflow API 的一些功能已弃用,我遇到了许多问题
特别是,许多在线解决方案的案例都引用了不具备旧版本兼容性的功能
但Airflow DAG有一点很好,那就是基本上不再使用 Airflow 连接---S3,而是使用“AWS”连接
从 S3 → AWS 连接类型的转变
尽管现在普遍改用 AWS 连接,但很多 Airflow Hooks 还在用 S3 连接。有大量挂钩尚未更新,提供断回兼容性(break-back compatibility),例如 MSSQL 插件操作符 mssql_to_s3_operator
同样,这会导致 redshift_to_s3_operator 出现问题
所以我的建议是,注意在线解决方案的日期/最新更新日期。
2. XComs
最初研究 Airflow 时,我们并没有真正关注 XComs 的功能。
但结果是,我经常将XComs用于任务之间的传递输出,并且经常这样做。
下面显示的是上面列出的 DAG 的片段,“在任务失败时发布到 Slack DAG”,这个过程就用到了 XComs:
作为初步操作和示例,Python 函数返回的输出(即上面代码中的 return eg_op)将用作 XCom 值示例,在这个过程中,Airflow 任务中将调用该函数。
在后续的 Python 函数中,需要执行以下操作,读取 Airflow 任务的输出:
在函数定义中传入 **kwargs,将未指定数量的参数传入函数
创建任务变量(命名规则通常为,ti)
读取特定任务的输出,使用以下命令:ip = ti.xcom_pull(task_ids='[name of task]')
3. 常用Airflow配置选项
你可以用 airflow.cfg 文件配置你的 Airflow 环境,该文件位于 ${AIRFLOW_HOME}/airflow.cfg 中。下面这些参数很有用:
通用Airflow配置选项
4. 时区感知 DAG
你可以在Airflow配置中设置默认时区。但是,如需在 DAG 代码中使用本地时区,你需要了解 DAG 时区。
Airflow 是一个调度程序,由于“下一个执行日期”和“开始日期”这些选项很重要,DAG 中的代码会使用 UTC 时区运行。要使你的DAG感知时区,你需要使用 Python 钟摆模块(pendulum module),它能提供start_date的时区感知。请参阅以下示例:
5. 一些提示
1. Airflow:非常冷门的提示、技巧和最佳实践
* 使用默认参数避免重复
* 使用 PythonOperator 时的,在连接中存储敏感数据以及 “context”字典
2. Airflow注意事项
* “不要更改 start_date + interval”
* “开发过程中刷新 DAG”,“不要忘记启动调度程序”
不更改 start_date 和 interval ,对于新手来说尤其重要。
3. Airflow提示、技巧和注意事项
我建议你在构建第一个 DAG 并掌握主要概念以后阅读本文。本文提供了一些有用的概念,如任务状态回调(Tasks States Callbacks),并提供了关于 UTC 日期问题的描述,都非常有用。
感谢阅读
原文作者:Paul Fry
翻译作者:Lia
美工编辑:过儿
校对审稿:Jiawei Tong
原文链接:https://paulfry999.medium.com/airflow-101-hints-and-tips-to-quickly-get-started-b48fe8948602
本周公开课预告
往期精彩回顾
点击“阅读原文”查看数据应用学院核心课程