查看原文
其他

Airflow 101: 隐藏小技巧帮你快速上手!

Lia 大数据应用 2022-10-18

今日份知识你摄入了么?

目录


  • 1. 目标受众

  • 2. 背景

  • 3. 入门

  • 4. Airflow概念

  • 5.  DAG示例

  • 6. Airflow命令行界面(CLI)

  • 7. 注意事项、建议、一些提示


1.目标受众


本文的目标受众是那些对 Airflow 有少量经验、或没有经验,并希望快速上手的人。


2. 背景


Airflow


  • Airflow 是一个开源的工作流管理平台,用于管理复杂的管道

  • 它始于 Airbnb,可用于通过DAG(有向无环图)管理和调度 ETL 管道

  • 其中, Airflow 管道是定义 DAG 的 Python 脚本构成的

  • 许多人把Airflow作为调度数据管道的Cron的替代品


Crom不再用于 ETL 调度


  • Cron 可用于调度 ETL 作业,但它不提供管理 ETL 工作组件之间的相关性或故障的机制

  • 为了说明这一点,下面是Airflow DAG简单实例图

  • 本图的DAG 包含 了7 个 Airflow 任务,用于展示披萨烘焙中的逻辑。


图源:https://tech.curama.jp/entry/2018/03/23/130000


关键特性:任务相关性


  • 这个例子可能有点无聊,但上面强调的重点是相关性逻辑(dependency logic)

  • 可以看到,只有在 add_cheese、add_ham、add_pineapple 和 add_mushroom 全部完成后,才能执行执行bake_pizza


数据操作友好型:监控并提醒你的数据管道


  • Airflow 的一大卖点是,它可以用来创建自动化的、受监控的数据管道

  • 例如,请参考REA 公司博客文章中的 Airflow DAG 图例

  • DAG 中的最后 2 个任务的触发条件是根据任务的输出结果 ,即data_is_fresh :


图源:https://www.rea-group.com/blog/watching-the-watcher/


如果数据不是最新的,则触发 data_freshness_failure_alert,会让一条消息发布到slack平台:


使用 PostToSlack 操作符可以通过 Airflow 将消息发布至 Slack


我的动力/动机


在我过去参与的五六个项目中,同样的问题一次又一次地浮现在我的脑海中——“如果数据管道出现故障,我们是否可以设置某种监控和提醒来通知我们?比如,通过 slack 或电子邮件?”


Airflow 通常被人们推荐为候选解决方案,但讨论也到此为止了。但是,想法是永远不会自己实现的。


所以我决定自己动手……


3. 入门


3.1. 托管程序(Managed Provider)VS.本地安装(Local Install):本地安装可快速帮助你进行实际操作


  • 有多个托管提供程序可以用于运行 Airflow,例如 Astronomer, Cloud Composer, AWS MWAA等

  • 但根据我的经验,我强烈建议只在本地设置 Airflow ,这样能更方便了解概念和常见设计


要避免白忙一场


  • 由于每个托管服务解决方案都存在着细微的差别,我发现自己在评估每个托管服务时经常偏离主题,从而陷入困境。在AWS MWAA中尤其如此。

  • 我讲这些事情的目的是,在开始使用该工具之前,没有人想在设置/配置上浪费太多时间。


3.2. 本地安装步骤


  • Airflow 的网站文档中说明了如何在本地运行 Airflow。本地安装包括以下 5 个步骤:

安装 Airflow l 的步骤


接下来:我将介绍一些核心的Airflow概念/理念。


4. Airflow概念


Airflow 中的主要概念要么会被描述为核心理念,要么被描述为附加功能。


4.1. 核心理念


  • 从根本上说,Airflow 可用于构建和运行工作流

  • 这些工作流表示为 DAG(有向无环图),其中包含各个工作组件,我们称之为任务(Tasks)


DAG(有向无环图)


DAG 是 Airflow 的核心概念,它将任务组合在一起,通过相关性和关系组织起来,用于说明如何运行各项任务。这是从 Airflow 的文档中提取的一个基本示例图:



这个 DAG示例图定义了四项任务(A、B、C 和 D),并规定了运行的顺序,并描述了任务相关性。同时,还需要定义DAG的运行频率,例如“从明天开始每 5 分钟一次”。


运算符


运算符定义了针对某项任务而执行的操作。Airflow 中两个捆绑运算符包括:


  • BashOperator:执行shell命令/脚本

  • PythonOperator:执行Python代码


如果你需要的操作符没有默认安装在 Airflow里面,你可以在庞大的社区 Provider Package中找到。有一些运算符非常受人们欢迎,包括:


  • 支持 DBMS 相关的操作:MySqlOperator / PostgresOperator / MsSqlOperator / OracleOperator

  • JdbcOperator

  • SlackAPIOperator

  • S3FileTransformOperator


4.2. 附加功能


除了Airflow 的核心对象之外,还有许多相应的功能用于支持某些操作。


连接(Connections)和挂钩(Hooks)


连接(Connections)


  • 连接本质上是一组参数(例如:用户名、密码、主机名),以及它连接到的系统类型和唯一名称(称为 conn_id)

  • 最终,这些参数会用于存储某外部平台的预设凭据(predefined credentials)

  • 例如,你可以创建一个连接,用来捕获用于连接至特点 AWS 账户的相关参数


挂钩(Hooks)


  • 挂钩(Hooks)是一个外部平台的高级接口,可让你无需编写访问API /使用特殊库的代码,就能快速轻松地与这些平台沟通

  • 挂钩(Hooks)与连接(Connections)集成,共同收集凭证

  • 你可以在 Airflow 的 API 文档中查看Airflow挂钩的完整列表


XComs


  • XComs 是一种让任务相互交流的机制,因为默认情况下,任务是相互隔离开的

  • 当试图在某个DAG中获取之前的任务输出时,XComs会非常有用。

  • 更多详细信息,请参阅:https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html


变量(Varibles)


  • 变量(Varibles)是一个通用的键/值存储(Key/value store),可从你的任务中查询,并可通过 Airflow 的用户界面或 CLI 轻松设置

  • 你只需在变量模型上导入并调用它,即可使用:



接下来,我将展示一些DAG示例 ,例如让 Airflow 和Slack 、MSSQL 交互。


5. DAG 示例


在你设置了 Airflow 环境(本地安装或托管提供程序都可)之后,你就可以继续创建你的Airflow 工作流程了。


在开始之前,我们可以查看一些 Airflow DAG示例,了解每个 DAG 所涉及的内容。在这篇文章中,我将简单介绍一下一下我整理的DAGs:


  • 1.样板(模板)DAG

  • 2.MSSQL 查询 DAG

  • 3.“发布至 Slack” DAG

  • 4.任务失败时的“发布至 Slack”DAG


现在,让我们从样板 DAG 开始。


5.1. 样板 DAG


下面显示的是我开发 Airflow DAG 的初步操作:

 


这里需要为新手指出一些关键的部分:


  • 1. Python Airflow模块

  • 2. default_args(默认参数)

  • 3. Airflow任务(例如,example_task)


1. Python Airflow模块


你通常至少会使用到以下 Python Airflow 模块:



请注意,我把导入 PythonOperator 和 Airflow 变量模块的调用comment掉了,因为虽然这两者非常常见,但并非总是这么用(具体取决于你的用法)


2.default_args(默认参数)



  • 为 Airflow DAG 指定默认参数是常见的做法,这里的关键是实际参数本身

  • 我经常使用上面这 5 个参数


3.Airflow任务(例如,example_task)



  • 在 Airflow DAG 的底部,你可以列出 你的Airflow 任务

  • Airflow 任务要使用某种操作符,例如:在这个例子中,操作符是PythonOperator

  • 该任务是调用 Python 函数,eg_python_function

  • 同样,值得指出的是参数 provide_context


5.2. 样板 MSSQL 查询 DAG


下面显示的是 Airflow DAG 执行 MSSQL DB 查询所需的骨架结构:



其中需要注意的重要事项为:


1. 任务:mssql_select_all_query


上述 DAG 的关键是在任务 mssql_select_all_query 中执行(显示在脚本底部,如下所示),其中,脚本使用了 Python 模块 pymssql 执行 MSSQL 查询。



该任务简单使用了 PythonOperator 去调用 Python 函数 mssql_query(代码如下所示)



2. DAG 脚本与用于 Python mssql_query() 函数的代码非常相似


  • Python 函数 mssql_query() 使用的语法和在普通 Python 脚本中使用 Python 模块 pymssql 时使用的语法相同

  • 我展示这一点的目的,只是为了强调在 Airflow 中是否使用现有的 Python 脚本根本不重要


3. Python Airflow模块


请注意脚本顶部使用的 2 个 Airflow 模块:



正如在第一个样板 dag 的注释中所提到的,这两个模块很常用,但在该脚本中的注释为占位符。而在这里,确实使用了这两个模块!


特别值得指出的是,变量模块已用于检索数据库凭据,这些凭据可用于查询。


5.2. “发布到 Slack” DAG


这是一个非常简单/精简的脚本,用于将所需内容发布到 slack。例如:



1. SlackWebhookOperator


要在Slack上发布消息,你需要使用 Slack Webhook。此外,你还需要使用 Python Airflow 运算符—SlackWebhookOperator,在脚本顶部导入:



2. slack_token


  • 重要的是,要在Slack上发布消息,你需要一个 slack token。其中有很多步骤/细微差别,例如,你需要在 Slack 中为你的公司创建一个“应用程序”,然后创建一个 webhook 等。


3. 用于Airflow连接的密码字符串


  • 你的 Slack webhook URL 会类似于://hooks.slack.com/services//T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX

  • 你的密码将跟在 https://hooks.slack.com/services 之后

  • 重要提示:这个值就是你的 slack token 。因此,你会在 DAG 脚本中用到这个token。我会把值存在一个变量中:



5.3. 任务失败时的“发布到 Slack”DAG


之前的 DAG 只是演示了如何从将Airflow 的消息发送至 Slack,而下面的 DAG 则包含Airflow 任务失败发送消息至 Slack 的功能。


下面的 DAG 是一个简单的例子,如果函数 read_op() 中变量 i 的值为 1,则调用任务失败。


当任务失败时,会向 Slack 发布一条消息:



上面有一些关键事项:


1. on_failure_callback


这个脚本中要指出的关键是一个任务选项,on_failure_callback



该默认选项表示,“如果 Airflow 任务失败,则调用 Python 函数 slack_failure_msg”(如下所示)



2. 返回 failure_alert.execute(context=context)


这与最初的“post to DAG”脚本之间的另一个区别在于返回的对象。可以看到,在调用 Python 函数时,就会执行 Airflow failure_alert。最终发布类似于以下内容的帖子:



6. Airflow CLI


Airflow 的 CLI 可以让操作在UI上通过终端执行。这对于编写脚本/自动化活动非常有用。下表描述了一些我觉得很有用的 CLI 命令示例:



7. 注意事项、建议和一些提示


1. Airrflow 2.0 的变化——Airflow 核心和提供程序


在 Airflow 版本 1 中,有一个“统一”包,用于构建外部服务的导入模块,例如AWS、GCP 等。


但是,在版本 2 更改中,Airflow 的程序包结构(package structure)重新成为设计的核心,目前为止,有61个不同的程序包。每个程序包能用于某个外部服务(Google、Amazon、Microsoft、Snowflake)、数据库(Postgres、MySQL)或协议(HTTP/FTP)


因此,我们在利用外部服务库提出的在线解决方案需要特别注意,并注意它们所针对的 Airflow 版本。


2. 在线提出解决方案时,注意所用的Airflow版本


  • 刚开始开发 DAG 时,由于旧版 Airflow API 的一些功能已弃用,我遇到了许多问题

  • 特别是,许多在线解决方案的案例都引用了不具备旧版本兼容性的功能

  • 但Airflow DAG有一点很好,那就是基本上不再使用 Airflow 连接---S3,而是使用“AWS”连接

从 S3 → AWS 连接类型的转变


尽管现在普遍改用 AWS 连接,但很多 Airflow Hooks 还在用 S3 连接。有大量挂钩尚未更新,提供断回兼容性(break-back compatibility),例如 MSSQL 插件操作符 mssql_to_s3_operator


同样,这会导致 redshift_to_s3_operator 出现问题


所以我的建议是,注意在线解决方案的日期/最新更新日期。


2. XComs


最初研究 Airflow 时,我们并没有真正关注 XComs 的功能。


但结果是,我经常将XComs用于任务之间的传递输出,并且经常这样做。


下面显示的是上面列出的 DAG 的片段,“在任务失败时发布到 Slack DAG”,这个过程就用到了 XComs:



作为初步操作和示例,Python 函数返回的输出(即上面代码中的 return eg_op)将用作 XCom 值示例,在这个过程中,Airflow 任务中将调用该函数。


在后续的 Python 函数中,需要执行以下操作,读取 Airflow 任务的输出:


  • 在函数定义中传入 **kwargs,将未指定数量的参数传入函数

  • 创建任务变量(命名规则通常为,ti)

  • 读取特定任务的输出,使用以下命令:ip = ti.xcom_pull(task_ids='[name of task]')


3. 常用Airflow配置选项


你可以用 airflow.cfg 文件配置你的 Airflow 环境,该文件位于 ${AIRFLOW_HOME}/airflow.cfg 中。下面这些参数很有用:


通用Airflow配置选项


4. 时区感知 DAG


你可以在Airflow配置中设置默认时区。但是,如需在 DAG 代码中使用本地时区,你需要了解 DAG 时区。


Airflow 是一个调度程序,由于“下一个执行日期”和“开始日期”这些选项很重要,DAG 中的代码会使用 UTC 时区运行。要使你的DAG感知时区,你需要使用 Python 钟摆模块(pendulum module),它能提供start_date的时区感知。请参阅以下示例:



5. 一些提示

1. Airflow:非常冷门的提示、技巧和最佳实践


* 使用默认参数避免重复

* 使用 PythonOperator 时的,在连接中存储敏感数据以及 “context”字典


2. Airflow注意事项


* “不要更改 start_date + interval”

* “开发过程中刷新 DAG”,“不要忘记启动调度程序”


不更改 start_date 和 interval ,对于新手来说尤其重要。


3. Airflow提示、技巧和注意事项


我建议你在构建第一个 DAG 并掌握主要概念以后阅读本文。本文提供了一些有用的概念,如任务状态回调(Tasks States Callbacks),并提供了关于 UTC 日期问题的描述,都非常有用。


感谢阅读

原文作者:Paul Fry

翻译作者:Lia

美工编辑:过儿

校对审稿:Jiawei Tong

原文链接:https://paulfry999.medium.com/airflow-101-hints-and-tips-to-quickly-get-started-b48fe8948602

本周公开课预告



往期精彩回顾


在微软成功的数据科学家身上,我学到这5个习惯

如何用Python处理金融数据?

5个提升你分析Slide Decks的技巧

金融中的数据分析师,都做什么工作?

四种数据分析方法,帮你成功做决策!





点「在看」的人都变好看了哦

点击“阅读原文”查看数据应用学院核心课程

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存