Airflow从零到神学习心得
拖延了很久,终于把 Airflow 系统学了,这得益 B 站的一个非常棒的 airflow 培训教程,Airflow 从零到神[1]。UP 主不但有完整的视频,还提供了GitHub 库[2],放了教程所用的代码和 PPT,如果你觉得 UP 主的 PPT 不够细,你可以使用我之前介绍的工具:视频提取 PPT 开箱即用版本发布,自己动手抽取视频的 PPT(关于 B 站视频集合的下载方法,我在最后会附上)。
通过这次学习,期间踩到的一些雷,基本与 airflow 无关。
yfinance 抓取股票数据问题
本教程唯一(最后一节课关于 dbt 与系列课程关系不大)引入的外部组件就是yfinance
, 就需要访问 Yahoo 拿股票数据。作者的视频是去年拍的,那个时候 Yahoo 还允许国内的 IP 访问。现在访问 https://finance.yahoo.com/ 会看到这样的提示
2021 年 11 月 1 日起,用户将无法从中国大陆使用 Yahoo 的产品与服务。这并不影响 Yahoo 在全球其他地方的产品及服务。我们感谢你一直以来的支持。
这个别怪某墙,是 Yahoo 主动屏蔽了国内用户的访问。
解决办法,
使用外网的虚拟机,强烈建议该方案 放弃 yfinance,改用下载 csv 模拟 使用代理,这样会踩到另外一个雷,也是我本次学习最浪费时间,让我最绝望的地方。使用代理会碰到这样一个报错,
[2022-11-05, 11:07:44 CST] {local_task_job.py:164} INFO - Task exited with return code Negsignal.SIGSEGV
[2022-11-05, 11:07:44 CST] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check
作者给的解决办法设置环境变量
export OBJC_DISABLE_INITIALIZE_FORK_SAFETY = YES
但我这里无效, 经过近一步搜索得到解决办法, 运行 schedule 时制定一个环境变量(不用 proxy,这其实是Python的一个bug),
no_proxy='*' airflow scheduler
而在 yfinance 里使用 prxoy,
msft = yf.Ticker("msft")
data = msft.history(period="1mo",proxy='127.0.0.1:6789')
云主机
UP 主用的 AWS 的 EC2 我推荐使用 Gitpod.io(需要点英语基础),理由: 功能强大 注册方便 有 GitHub 账号即可 每个月 50 小时免费时间
Snowflake
这个可以申请一个月的免费使用, 个人建议使用别的数据库替代,比如本教程用到的 MySQL。
使用 Minio 替代 AWS S3 练习 S3 Operator
如果你没有 S3 账号,强烈建议使用MinIO替代,我之前也多次介绍到 MinIO,这里也不展开了。MiniO 兼容 S3,不需要注册。另外因为版本的问题, 新的版本已经没有 S3 这个连接类型,统一叫Amazon Web Services
安装 providers
pip install apache-airflow-providers-amazon
配置也很简单
# Extra 里需要添加
{"endpoint_url": "https://play.min.io"}
# 这些信息有play.min.io提供
AWS Access Key ID : Q3AM3UQ867SPQQA43P2F,
AWS Secret Access Key : zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG
MySQL 建库、建表脚本
CREATE database if NOT EXISTS demodb;
use demodb;
CREATE TABLE stock_prices_stage (
ticker VARCHAR(30),
as_of_date DATE,
open_price DOUBLE,
high_price DOUBLE,
low_price DOUBLE,
close_price DOUBLE
);
CREATE index ids_stockpricestage ON stock_prices_stage(ticker, as_of_date);
DROP TABLE stock_prices;
CREATE TABLE stock_prices(
id INT NOT NULL AUTO_INCREMENT,
ticker VARCHAR(30),
as_of_date DATE,
open_price DOUBLE,
high_price DOUBLE,
low_price DOUBLE,
close_price DOUBLE,
created_at TIMESTAMP DEFAULT now(),
updated_at TIMESTAMP DEFAULT now(),
PRIMARY key(id)
);
CREATE index ids_stockprices ON stock_prices(ticker, as_of_date);
邮件发送
UP 主的例子是使用 Gmail 的 SMTP 发送邮件,国内的用户可以使用 QQ 邮箱替代,需要先开启 SMTP 服务才可以使用,官网有关于如何设置 SMTP 收发邮件的教程,
课程代码
这个也是本视频略显遗憾的地方,没有按课程章节准备独立的目录,只有最后的代码。当然自己敲一遍,印象更深。
B 站集合视频下载方法
安装工具 you-get
pip install you-get
-下载单个视频
!you-get -i https://www.bilibili.com/video/BV19f4y1V7UG
下载视频合集 网上提供了 playlist 的下载办法, 但不适用于本课程合集的情况,我这里提供一个简单的办法 访问https://space.bilibili.com/1311328717/channel/collectiondetail?sid=43276 在 console 里运行下面代码(具体参考只会 JavaScript,也可以在浏览器里写爬虫),
xx =$("#page-collection-detail ul li.small-item")
for(i=0;i<xx.length;i++)
output.push(xx[i].attributes['data-aid'].value)
output
得到一个列表
output = ['BV19f4y1V7UG', 'BV1xy4y1V7DW', 'BV1V341167Ci',
'BV1tg41157dv', 'BV1Cf4y1n7QD', 'BV1Hf4y1G7EJ',
'BV1Kq4y1T71P', 'BV12v411A7fC', 'BV1rQ4y1h7an',
'BV1hv411P7sA', 'BV1yg411V7a7', 'BV1mq4y1U7vi',
'BV1K44y1e7qN', 'BV18g411K7zu', 'BV1dr4y1r71c',
'BV1Fu411R7yP']
for k in output:
!you-get https://www.bilibili.com/video/{k}
这样就可以把 16 个视频下载到本地了。当然你可以可以使用我前些时间刚刚介绍到Playwright 来获得下载列表。
参考资料
Airflow从零到神: https://space.bilibili.com/1311328717/channel/collectiondetail?sid=43276
[2]GitHub库: https://github.com/harrytandata/airflow_course