其他
实战 | 基于 Serverless 技术的视频截帧架构如何实现?
前言
视频直播是一种创新的在线娱乐形式,具有多人实时交互特性,在电商、游戏、在线教育、娱乐等多个行业都有着非常广泛的应用。随着网络基础设施的不断改善以及社交娱乐需求的不断增长,视频直播在持续渗透进大家的日常生活,并占据用户的零碎休闲时间。视频直播的技术支撑能力也在不断提高,从而促进视频直播市场规模从 2014年的 212.5 亿元增长到 2020年的 548.5 亿元,并将在未来五年继续以 12.8% 左右的增长率快速发展。
视频截帧需求概述
视频截帧技术架构分析
为了支持业务高峰,必须按照高峰期的用户量来评估集群规模,在业务低峰期就会造成巨大的浪费。 在某些场景下,比如明星效应的带动,业务量会有突增,有可能需要对集群进行临时扩容,这种情况下往往扩容速度会滞后于业务流的增速,造成部分业务的降级处理。
基于函数计算 FC 的 Serverless 架构
Serverless 架构视频截帧技术实现
输出视频流
ffmpeg -re -i test.flv -vcodec copy -acodec aac -ar 44100 -f flv rtmp://xxx.xxx.xxx.xxx:1935/stream/test
安装 Funcraft
fun --version
fun config
命令进行初始化配置,这个操作需要提供阿里云 Account ID、Access Key Id、Secret Access Key、 Default Region Name 等常规信息, 这些信息可以从函数计算控制台(https://fc.console.aliyun.com/)首页的右上方获得。其他的信息比如 timeout 等直接使用默认值即可。配置 OSS
配置日志服务 SLS
编写函数
通过 FFmpeg 命令截取1张图片; 保存到 OSS。
import json, oss2, subprocess
HELLO_WORLD = b'Snapshot OK!\n'
OSS_BUCKET_NAME = b'snapshot'
def handler(environ, start_response):
logger = logging.getLogger()
context = environ['fc.context']
request_uri = environ['fc.request_uri']
for k, v in environ.items():
if k.startswith('HTTP_'):
pass
try:
request_body_size = int(environ.get('CONTENT_LENGTH', 0))
except (ValueError):
request_body_size = 0
#获得直播流的地址
rtmp_url = request_body.decode("UTF-8")
#通过FFmpeg命令截取一张图片
cmd = ['/code/ffmpeg', '-i', rtmp_url, '-frames:v', '1', '/tmp/snapshot.png' ]
try:
subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
except subprocess.CalledProcessError as exc:
err_ret = {'returncode': exc.returncode, 'cmd': exc.cmd, 'output': exc.output.decode(),'stderr': exc.stderr.decode()}
print(json.dumps(err_ret))
raise Exception(context.request_id + ' transcode failure')
#上传到OSS
creds = context.credentials
auth = oss2.StsAuth(creds.access_key_id, creds.access_key_secret, creds.security_token)
bucket = oss2.Bucket(auth, 'http://oss-{}-internal.aliyuncs.com'.format(context.region), OSS_BUCKET_NAME)
logger.info('upload pictures to OSS ...')
for filename in os.listdir("/tmp"):
bucket.put_object_from_file("example/" + filename, "/tmp/" + filename)
status = '200 OK'
response_headers = [('Content-type', 'text/plain')]
start_response(status, response_headers)
return [HELLO_WORLD]
handler
,其中的 environ 参数携带了调用函数的客户端相关信息以及上下文信息。我们可以从 HTTP 请求 Body 中,解析出 STMP 直播流的地址,并通过 FFmpeg 命令截取一张图片。/code
目录,可以通过/code/ffmpeg
路径进行执行。这是因为我们在对函数进行部署的时候,已经将 FFmpeg 可执行程序和这段代码打包在了这个目录中,在接下来介绍函数部署的时候,我们会进一步介绍如何将函数代码与可执行程序一起打包。部署函数
code
目录中,这样可以在代码中通过路径/code/ffmpeg
调用 ffmpeg 命令。template.yml
文件,描述所有的部署信息。ROSTemplateFormatVersion: '2015-09-01'
Transform: 'Aliyun::Serverless-2018-04-03'
Resources:
#服务
snapshotService:
Type: 'Aliyun::Serverless::Service'
Properties:
Description: 'Snapshot Semo'
Policies:
- AliyunOSSFullAccess
#之前创建的日志项目和日志仓库
LogConfig:
Project: fc-bj-pro
Logstore: fc-log
#函数
snapshot:
Type: 'Aliyun::Serverless::Function'
Properties:
Handler: index.handler
Runtime: python3
MemorySize: 128
Timeout: 600
CodeUri: './code'
# HTTP触发器
Events:
http-test:
Type: HTTP
Properties:
AuthType: ANONYMOUS
Methods: ['POST']
snapshotService
,其拥有对 OSS 的全部操作权限,并引用了之前所创建的日志项目和日志仓库。snapshot
,对应的运行环境为 Python3,并且定义了一个名为http-test
的 HTTP 触发器。fun deploy
,如果看到提示server SnapshotService deploy success
,就代表代码和 ffmpeg 程序已经打包部署到云上了。执行函数
连续截帧
配置消息队列 Kafka
安装 Kafka 客户端 SDK
fun install --runtime python3 --package-type pip kafka-python
打通对 VPC 内资源的访问能力
代码实现
ffmpeg -i rtmp://xxx.xxx.xxx.xxx:1935/stream/test -r 1 -strftime 1 /tmp/snapshot/%Y%m%d%H%M%S.jpg
import logging, json, oss2, subprocess
from multiprocessing import Process
from kafka import KafkaProducer
HELLO_WORLD = b'Snapshot OK!\n'
OSS_BUCKET_NAME = b'snapshot'
logger = logging.getLogger()
output_dir = '/tmp/shapshot'
# 扫描图片目录
def scan(bucket, producer):
flag = 1
while flag:
for filename in os.listdir(output_dir):
if filename == 'over':
# ffmpeg命令完成,准备停止扫描
flag = 0
continue
logger.info("found image: %s", snapshotFile)
try:
full_path = os.path.join(output_dir, filename)
# 上传到OSS
bucket.put_object_from_file("snapshot/" + filename, full_path)
# 发送到Kafka
producer.send('snapshot', filename.encode('utf-8'))
# 删除图片
os.remove(full_path)
except Exception as e:
logger.error("got exception: %s for %s", e.message, filename)
time.sleep(1)
def handler(environ, start_response):
logger = logging.getLogger()
context = environ['fc.context']
#创建图片输出文件夹
if not os.path.exists(output_dir):
os.mkdir(output_dir)
#解析HTTP请求,获得直播流的地址
request_uri = environ['fc.request_uri']
for k, v in environ.items():
if k.startswith('HTTP_'):
pass
try:
request_body_size = int(environ.get('CONTENT_LENGTH', 0))
except (ValueError):
request_body_size = 0
rtmp_url = request_body.decode("UTF-8")
#启动Kafka Producer
producer = KafkaProducer(bootstrap_servers='XX.XX.XX.XX:9092,XX.XX.XX.XX:9092')
#启动OSS Bucket
creds = context.credentials
auth = oss2.StsAuth(creds.access_key_id, creds.access_key_secret, creds.security_token)
bucket = oss2.Bucket(auth, 'http://oss-{}-internal.aliyuncs.com'.format(context.region), OSS_BUCKET_NAME)
#启动扫描进程
scan_process = Process(target=scan, args=(bucket, producer))
#通过FFmpeg命令按每秒1帧的频繁连续截帧
cmd = ["/code/ffmpeg", "-y", "-i", rtmp_url, "-f", "image2", "-r", "1",
"-strftime", "1", os.path.join(output_dir, "%Y%m%d%H%M%S.jpg")]
try:
subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
except subprocess.CalledProcessError as exc:
err_ret = {'returncode': exc.returncode, 'cmd': exc.cmd, 'output': exc.output.decode(),'stderr': exc.stderr.decode()}
logger.error(json.dumps(err_ret))
raise Exception(context.request_id + ' transcode failure')
#写入标志文件,子进程结束工作
os.system("touch %s" % os.path.join(output_dir, 'over'))
scan_process.join()
producer.close()
status = '200 OK'
response_headers = [('Content-type', 'text/plain')]
start_response(status, response_headers)
return [HELLO_WORLD]
进一步优化
长视频截帧
每个函数只截 1 帧:当截帧频率比较低,或者只需要在某几个特定的时间点对视频流进行截帧的时候,我们不需要让函数的生命周期与视频流的播放周期保持一致,可以让每一个函数在启动后,只截取单帧图片。通过自定义的触发程序,可以在必要的时间点启动函数,也可以通过 Serverless 工作流来对函数进行更复杂的编排,更多关于 Serverless 工作流的介绍可以参考 https://www.aliyun.com/product/fnf。 通过多个函数接力完成:函数计算 FC 内置了 fc2 模块,可以用于函数之间的相互调用。这样我们可以控制每个截帧函数的运行时间控制在 10 分钟之内,比如 8 分钟为固定的运行周期。在一个函数结束前,启动另一个函数接力完成截帧任务,直到视频流结束。这种方案非常适合对于截帧频率的精确度要求不是特别高的场景,因为在两个函数进行任务交接的时候,会有一秒左右的时间无法严格保证截帧频率的精确度。 使用性能实例:除了默认的弹性实例以外,函数计算 FC 还提供了性能实例,性能实力属于大规格实例,资源上限更高,适配场景更多,能够突破 10 分钟的执行时长上限。性能实例扩容速度较慢,弹性伸缩能力不及弹性实例,但我们可以通过单实例多并发(https://help.aliyun.com/document_detail/144586.html)和预留模式(https://help.aliyun.com/document_detail/138103.html)的配合,来提升性能实例的弹性能力。具体介绍可以参考单实例多并发和预留模式。
费用优化
总结
﹀
﹀
﹀