查看原文
其他

Airflow从零到神学习心得

alitrack alitrack 2022-11-12

拖延了很久,终于把 Airflow 系统学了,这得益 B 站的一个非常棒的 airflow 培训教程,Airflow 从零到神[1]。UP 主不但有完整的视频,还提供了GitHub 库[2],放了教程所用的代码和 PPT,如果你觉得 UP 主的 PPT 不够细,你可以使用我之前介绍的工具:视频提取 PPT 开箱即用版本发布,自己动手抽取视频的 PPT(关于 B 站视频集合的下载方法,我在最后会附上)。

通过这次学习,期间踩到的一些雷,基本与 airflow 无关。

yfinance 抓取股票数据问题

本教程唯一(最后一节课关于 dbt 与系列课程关系不大)引入的外部组件就是yfinance, 就需要访问 Yahoo 拿股票数据。作者的视频是去年拍的,那个时候 Yahoo 还允许国内的 IP 访问。现在访问 https://finance.yahoo.com/ 会看到这样的提示

2021 年 11 月 1 日起,用户将无法从中国大陆使用 Yahoo 的产品与服务。这并不影响 Yahoo 在全球其他地方的产品及服务。我们感谢你一直以来的支持。


这个别怪某墙,是 Yahoo 主动屏蔽了国内用户的访问。

解决办法,

  • 使用外网的虚拟机,强烈建议该方案
  • 放弃 yfinance,改用下载 csv 模拟
  • 使用代理,这样会踩到另外一个雷,也是我本次学习最浪费时间,让我最绝望的地方。使用代理会碰到这样一个报错,
[2022-11-05, 11:07:44 CST] {local_task_job.py:164} INFO - Task exited with return code Negsignal.SIGSEGV
[2022-11-05, 11:07:44 CST] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check

作者给的解决办法设置环境变量

export OBJC_DISABLE_INITIALIZE_FORK_SAFETY = YES

但我这里无效, 经过近一步搜索得到解决办法, 运行 schedule 时制定一个环境变量(不用 proxy,这其实是Python的一个bug),

no_proxy='*' airflow scheduler

而在 yfinance 里使用 prxoy,

msft = yf.Ticker("msft")
data = msft.history(period="1mo",proxy='127.0.0.1:6789')

云主机

  • UP 主用的 AWS 的 EC2
  • 我推荐使用 Gitpod.io(需要点英语基础),理由:
    • 功能强大
    • 注册方便
    • 有 GitHub 账号即可
    • 每个月 50 小时免费时间

Snowflake

这个可以申请一个月的免费使用, 个人建议使用别的数据库替代,比如本教程用到的 MySQL。

使用 Minio 替代 AWS S3 练习 S3 Operator

如果你没有 S3 账号,强烈建议使用MinIO替代,我之前也多次介绍到 MinIO,这里也不展开了。MiniO 兼容 S3,不需要注册。另外因为版本的问题, 新的版本已经没有 S3 这个连接类型,统一叫Amazon Web Services

  • 安装 providers
pip install apache-airflow-providers-amazon
  • 配置也很简单


# Extra 里需要添加
{"endpoint_url""https://play.min.io"}
# 这些信息有play.min.io提供
AWS Access Key ID : Q3AM3UQ867SPQQA43P2F,
AWS Secret Access Key : zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG

MySQL 建库、建表脚本

CREATE database if NOT EXISTS demodb;
use demodb;
CREATE TABLE stock_prices_stage (
    ticker VARCHAR(30),
    as_of_date DATE,
    open_price DOUBLE,
    high_price DOUBLE,
    low_price DOUBLE,
    close_price DOUBLE
);
CREATE index ids_stockpricestage ON stock_prices_stage(ticker, as_of_date);
DROP TABLE stock_prices;
CREATE TABLE stock_prices(
    id INT NOT NULL AUTO_INCREMENT,
    ticker VARCHAR(30),
    as_of_date DATE,
    open_price DOUBLE,
    high_price DOUBLE,
    low_price DOUBLE,
    close_price DOUBLE,
    created_at TIMESTAMP DEFAULT now(),
    updated_at TIMESTAMP DEFAULT now(),
    PRIMARY key(id)
);
CREATE index ids_stockprices ON stock_prices(ticker, as_of_date);

邮件发送

UP 主的例子是使用 Gmail 的 SMTP 发送邮件,国内的用户可以使用 QQ 邮箱替代,需要先开启 SMTP 服务才可以使用,官网有关于如何设置 SMTP 收发邮件的教程,

课程代码

这个也是本视频略显遗憾的地方,没有按课程章节准备独立的目录,只有最后的代码。当然自己敲一遍,印象更深。

B 站集合视频下载方法

  • 安装工具you-get
pip install you-get

-下载单个视频

!you-get -i  https://www.bilibili.com/video/BV19f4y1V7UG
  • 下载视频合集 网上提供了 playlist 的下载办法, 但不适用于本课程合集的情况,我这里提供一个简单的办法 访问https://space.bilibili.com/1311328717/channel/collectiondetail?sid=43276 在 console 里运行下面代码(具体参考只会 JavaScript,也可以在浏览器里写爬虫),
xx =$("#page-collection-detail ul li.small-item")
for(i=0;i<xx.length;i++)
    output.push(xx[i].attributes['data-aid'].value)
output


得到一个列表

output  = ['BV19f4y1V7UG''BV1xy4y1V7DW''BV1V341167Ci',
 'BV1tg41157dv''BV1Cf4y1n7QD''BV1Hf4y1G7EJ',
 'BV1Kq4y1T71P''BV12v411A7fC''BV1rQ4y1h7an',
 'BV1hv411P7sA''BV1yg411V7a7''BV1mq4y1U7vi',
 'BV1K44y1e7qN''BV18g411K7zu''BV1dr4y1r71c',
 'BV1Fu411R7yP']

for k in output:
    !you-get https://www.bilibili.com/video/{k}

这样就可以把 16 个视频下载到本地了。当然你可以可以使用我前些时间刚刚介绍到Playwright 来获得下载列表。

参考资料

[1]

Airflow从零到神: https://space.bilibili.com/1311328717/channel/collectiondetail?sid=43276

[2]

GitHub库: https://github.com/harrytandata/airflow_course


您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存