在游戏运营行业,函数计算如何解决数据采集分析痛点?
新游戏:面向玩家需要提供更充足的推广资源和更完整的游戏内容。
老游戏:通过用户行为分析,投入更多的精力和成本,制作更优质的版本内容。
游戏研发商:研发游戏的公司,生产和制作游戏内容。比如王者荣耀的所有英雄设计、游戏战斗场景、战斗逻辑等,全部由游戏研发公司提供。 游戏发行商:游戏发行商的主要工作分三大块:市场工作、运营工作、客服工作。游戏发行商把控游戏命脉,市场工作核心是导入玩家,运营工作核心是将用户价值最大化、赚取更多利益。
游戏平台/渠道商:游戏平台和渠道商的核心目的就是曝光游戏,让尽量多的人能发现你的游戏。
游戏运营核心诉求
数据采集系统传统架构
当流量脉冲来的时候,这部分是否可以快速扩容以应对流量冲击。 游戏运营具备潮汐特性,并非天天都在进行,这就需要考虑如何优化资源利用率。
流量太大,节点加少了,导致一部分流量的数据没有采集到。 流量没有预期那么大,节点加多了,导致资源浪费。
数据采集系统Serverless架构
无需运维介入,研发同学就可以很快的搭建出来。 无论流量大小,均可以平稳的承接。 函数计算拉起的实例数量可以紧贴流量大小的曲线,做到资源利用率最优化,再加上按量计费的模式,可以最大程度优化成本。
架构解析
1. 接收数据函数
# -*- coding: utf-8 -*-
import logging
import json
import urllib.parse
HELLO_WORLD = b'Hello world!\n'
def handler(environ, start_response):
logger = logging.getLogger()
context = environ['fc.context']
request_uri = environ['fc.request_uri']
for k, v in environ.items():
if k.startswith('HTTP_'):
# process custom request headers
pass
try:
request_body_size = int(environ.get('CONTENT_LENGTH', 0))
except (ValueError):
request_body_size = 0
# 接收回传的数据
request_body = environ['wsgi.input'].read(request_body_size)
request_body_str = urllib.parse.unquote(request_body.decode("GBK"))
request_body_obj = json.loads(request_body_str)
logger.info(request_body_obj["action"])
logger.info(request_body_obj["articleAuthorId"])
status = '200 OK'
response_headers = [('Content-type', 'text/plain')]
start_response(status, response_headers)
return [HELLO_WORLD]
2. 处理数据的函数
1)Funcraft
2)安装 Fun
sudo npm install @alicloud/fun -g
$ fun --version
3.6.20
3)编写 template.yml
ROSTemplateFormatVersion: '2015-09-01'
Transform: 'Aliyun::Serverless-2018-04-03'
Resources:
FCBigDataDemo:
Type: 'Aliyun::Serverless::Service'
Properties:
Description: 'local invoke demo'
VpcConfig:
VpcId: 'vpc-xxxxxxxxxxx'
VSwitchIds: [ 'vsw-xxxxxxxxxx' ]
SecurityGroupId: 'sg-xxxxxxxxx'
LogConfig:
Project: fcdemo
Logstore: fc_demo_store
dataToKafka:
Type: 'Aliyun::Serverless::Function'
Properties:
Initializer: index.my_initializer
Handler: index.handler
CodeUri: './'
Description: ''
Runtime: python3
FCBigDataDemo:自定义的服务名称。通过下面的 Type 属性标明是服务,即Aliyun::Serverless::Service。
Properties:Properties下的属性都是该服务的各配置项。
VpcConfig:服务的 VPC 配置,包含:
VpcId:VPC ID。 VSwitchIds:交换机 ID,这里是数组,可以配置多个交换机。 SecurityGroupId:安全组 ID。
LogConfig:服务绑定的日志服务(SLS)配置,包含:
Project:日志服务项目。 Logstore:LogStore 名称。
dataToKafka:该服务下自定义的函数名称。通过下面的 Type 属性标明是函数,即Aliyun::Serverless::Function。
Properties:Properties下的属性都是该函数的各配置项。
Initializer:配置初始化函数。
Handler:配置入口函数。
Runtime:函数运行环境。
4)安装第三方依赖
fun install --runtime python3 --package-type pip kafka-python
5)部署函数
# -*- coding: utf-8 -*-
import logging
import json
import urllib.parse
from kafka import KafkaProducer
producer = None
def my_initializer(context):
logger = logging.getLogger()
logger.info("init kafka producer")
global producer
producer = KafkaProducer(bootstrap_servers='XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092')
def handler(event, context):
logger = logging.getLogger()
# 接收回传的数据
event_str = json.loads(event)
event_obj = json.loads(event_str)
logger.info(event_obj["action"])
logger.info(event_obj["articleAuthorId"])
# 向Kafka发送消息
global producer
producer.send('ikf-demo', json.dumps(event_str).encode('utf-8'))
producer.close()
return 'hello world'
my_initializer:函数实例被拉起时会先执行该函数,然后再执行 handler 函数 ,当函数实例在运行时,之后的请求都不会执行 my_initializer 函数 。一般用于各种连接的初始化工作,这里将初始化 Kafka Producer 的方法放在了这里,避免反复初始化 Produer。
handler:该函数只有两个逻辑,接收回传的数据和将数据发送至 Kafka 的指定 Topic。
下面通过 fun deploy 命令部署函数,该命令会做两件事:
根据 template.yml 中的配置创建服务和函数。
将 index.py 和 .fun 上传至函数中。
3. 函数之间调用
# -*- coding: utf-8 -*-
import logging
import json
import urllib.parse
import fc2
HELLO_WORLD = b'Hello world!\n'
client = None
def my_initializer(context):
logger = logging.getLogger()
logger.info("init fc client")
global client
client = fc2.Client(
endpoint="http://your_account_id.cn-hangzhou-internal.fc.aliyuncs.com",
accessKeyID="your_ak",
accessKeySecret="your_sk"
)
def handler(environ, start_response):
logger = logging.getLogger()
context = environ['fc.context']
request_uri = environ['fc.request_uri']
for k, v in environ.items():
if k.startswith('HTTP_'):
# process custom request headers
pass
try:
request_body_size = int(environ.get('CONTENT_LENGTH', 0))
except (ValueError):
request_body_size = 0
# 接收回传的数据
request_body = environ['wsgi.input'].read(request_body_size)
request_body_str = urllib.parse.unquote(request_body.decode("GBK"))
request_body_obj = json.loads(request_body_str)
logger.info(request_body_obj["action"])
logger.info(request_body_obj["articleAuthorId"])
global client
client.invoke_function(
'FCBigDataDemo',
'dataToKafka',
payload=json.dumps(request_body_str),
headers = {'x-fc-invocation-type': 'Async'}
)
status = '200 OK'
response_headers = [('Content-type', 'text/plain')]
start_response(status, response_headers)
return [HELLO_WORLD]
def my_initializer(context):
logger = logging.getLogger()
logger.info("init fc client")
global client
client = fc2.Client(
endpoint="http://your_account_id.cn-hangzhou-internal.fc.aliyuncs.com",
accessKeyID="your_ak",
accessKeySecret="your_sk"
)
通过函数计算Client调用第二个函数
global client
client.invoke_function(
'FCBigDataDemo',
'dataToKafka',
payload=json.dumps(request_body_str),
headers = {'x-fc-invocation-type': 'Async'}
)
使用两个函数的目的
4. 配置Kafka
....
producer = KafkaProducer(bootstrap_servers='XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092')
....
...
# 第一个参数为Topic名称
producer.send('ikf-demo', json.dumps(event_str).encode('utf-8'))
...
Flink Kafka消费者
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
String kafkaTopic = parameterTool.get("kafka-topic","ikf-demo");
String brokers = parameterTool.get("brokers", "XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092");
Properties kafkaProps = new Properties();
kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "ikf-demo");
FlinkKafkaConsumer<UserBehaviorEvent> kafka = new FlinkKafkaConsumer<>(kafkaTopic, new UserBehaviorEventSchema(), kafkaProps);
kafka.setStartFromLatest();
kafka.setCommitOffsetsOnCheckpoints(false);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<UserBehaviorEvent> dataStreamByEventTime = env.addSource(kafka);
压测验证
创建压测场景
总结
﹀
﹀
﹀