在游戏运营行业,Serverless 如何解决数据采集分析痛点?
作者 | 计缘
众所周知,游戏行业在当今的互联网行业中算是一棵常青树。在疫情之前的 2019 年,中国游戏市场营收规模约 2884.8 亿元,同比增长 17.1%。2020 年因为疫情,游戏行业更是突飞猛进。玩游戏本就是中国网民最普遍的娱乐方式之一,疫情期间更甚。据不完全统计,截至 2019 年,中国移动游戏用户规模约 6.6 亿人,占中国总网民规模 8.47 亿的 77.92%,可见游戏作为一种低门槛、低成本的娱乐手段,已成为大部分人生活中习以为常的一部分。
对于玩家而言,市面上的游戏数量多如牛毛,那么玩家如何能发现和认知到一款游戏,并且持续的玩下去恐怕是所有游戏厂商需要思考的问题。加之 2018 年游戏版号停发事件,游戏厂商更加珍惜每一个已获得版号的游戏产品,所以这也使得“深度打磨产品质量”和“提高运营精细程度”这两个游戏产业发展方向成为广大游戏厂商的发展思路,无论是新游戏还是老游戏都在努力落实这两点:
新游戏:面向玩家需要提供更充足的推广资源和更完整的游戏内容。老游戏:通过用户行为分析,投入更多的精力和成本,制作更优质的版本内容。这里我们重点来看新游戏。一家游戏企业辛辛苦苦研发三年,等着新游戏发售时一飞冲天。那么问题来了,新游戏如何被广大玩家看到?首先来看看游戏行业公司的分类:


游戏运营核心诉求
游戏厂商花钱买量,换来的用户信息以及新用户注册信息是为持续的游戏运营服务的,那么这个场景的核心诉求就是采集用户信息的完整性。比如说,某游戏厂商一天花 5000w 投放广告,在某平台某时段产生了每秒 1w 次的广告点击率,那么在这个时段内每一个点击广告的用户信息要完整的被采集到,然后入库进行后续分析。这就对数据采集系统提出了很高的要求。这其中,最核心的一点就是系统暴露接口的环节要能够平稳承载买量期间不定时的流量脉冲。在买量期间,游戏厂商通常会在多个平台投放广告,每个平台投放广告的时间是不一样的,所以就会出现全天不定时的流量脉冲现象。如果这个环节出现问题,那么相当于买量的钱就打水漂了。数据采集系统传统架构

数据采集系统 Serverless 架构
我们可以通过函数计算 FC 来取代传统架构中暴露 HTTP 回传数据这部分,从而完美解决传统架构中存在问题,参考文章:《资源成本双优化!看 Serverless 颠覆编程教育的创新实践》。先来看架构图:
架构解析
从上面的架构图可以看到,整个采集数据阶段,分了两个函数来实现,第一个函数的作用是单纯的暴露 HTTP 接口接收数据,第二个函数用于处理数据,然后将数据发送至消息队列 Kafka 和数据库 RDS。1. 接收数据函数
我们打开函数计算控制台,创建一个函数:函数类型:HTTP(即触发器为 HTTP)
函数名称:receiveData
运行环境:Python3

函数实例类型:弹性实例
函数执行内存:512MB
函数运行超时时间:60 秒
函数单实例并发度:1

触发器类型:HTTP 触发器
触发器名称:defaultTrigger
认证方式:anonymous(即无需认证)
请求方式:GET,POST






2. 处理数据的函数
第一个函数是通过在函数计算控制台在界面上创建的,选择了运行环境是 Python3,我们可以在官方文档中查看预置的 Python3 运行环境内置了哪些模块,因为第二个函数要操作 Kafka 和 RDS,所以需要我们确认对应的模块。
1)Funcraft
Funcraft 是一个用于支持 Serverless 应用部署的命令行工具,能帮助我们便捷地管理函数计算、API 网关、日志服务等资源。它通过一个资源配置文件(template.yml),协助我们进行开发、构建、部署操作。所以第二个函数我们需要使用 Fun 来进行操作,整个操作分为四个步骤:• 安装 Fun 工具。• 编写 template.yml 模板文件,用来描述函数。• 安装我们需要的第三方依赖。• 上传部署函数。2)安装 Fun
Fun 提供了三种安装方式:• 通过 npm 包管理安装 —— 适合所有平台(Windows/Mac/Linux)且已经预装了 npm 的开发者。• 通过下载二进制安装 —— 适合所有平台(Windows/Mac/Linux)。• 通过 Homebrew 包管理器安装 —— 适合 Mac 平台,更符合 MacOS 开发者习惯。文本示例环境为 Mac,所以使用 npm 方式安装,非常的简单,一行命令搞定:sudo npm install @alicloud/fun -g安装完成之后。在控制终端输入 fun 命令可以查看版本信息:$ fun --version3.6.20在第一次使用 fun 之前需要先执行 fun config 命令进行配置,按照提示,依次配置 Account ID、Access Key Id、Secret Access Key、 Default Region Name 即可。其中 Account ID、Access Key Id 你可以从函数计算控制台首页的右上方获得:fun config? Aliyun Account ID ***01? Aliyun Access Key ID ***qef6j? Aliyun Access Key Secret ***UFJG? Default region name cn-hangzhou? The timeout in seconds for each SDK client invoking 60? The maximum number of retries for each SDK client 33)编写 template.yml
新建一个目录,在该目录下创建一个名为 template.yml 的 YAML 文件,该文件主要描述要创建的函数的各项配置,说白了就是将函数计算控制台上配置的那些配置信息以 YAML 格式写在文件里:ROSTemplateFormatVersion: 2015-09-01Transform: Aliyun::Serverless-2018-04-03Resources:FCBigDataDemo:Type: Aliyun::Serverless::ServiceProperties:Description: local invoke demoVpcConfig:VpcId: vpc-xxxxxxxxxxxVSwitchIds: [ vsw-xxxxxxxxxx ]SecurityGroupId: sg-xxxxxxxxxLogConfig:Project: fcdemoLogstore: fc_demo_storedataToKafka:Type: Aliyun::Serverless::FunctionProperties:Initializer: index.my_initializerHandler: index.handlerCodeUri: ./Description: Runtime: python3我们来解析以上文件的核心内容:FCBigDataDemo:自定义的服务名称。通过下面的 Type 属性标明是服务,即 Aliyun::Serverless::Service。Properties:Properties 下的属性都是该服务的各配置项。VpcConfig:服务的 VPC 配置,包含:VpcId:VPC ID。
VSwitchIds:交换机 ID,这里是数组,可以配置多个交换机。
SecurityGroupId:安全组 ID。
LogConfig:服务绑定的日志服务(SLS)配置,包含:Project:日志服务项目。
Logstore:LogStore 名称。
dataToKafka:该服务下自定义的函数名称。通过下面的 Type 属性标明是函数,即 Aliyun::Serverless::Function。
Properties:Properties下的属性都是该函数的各配置项。Initializer:配置初始化函数。Handler:配置入口函数。Runtime:函数运行环境。目录结构为:
4)安装第三方依赖
服务和函数的模板创建好之后,我们来安装需要使用的第三方依赖。在这个示例的场景中,第二个函数需要使用 Kafka SDK,所以可以通过 fun 工具结合 Python 包管理工具 pip 进行安装:fun install --runtime python3 --package-type pip kafka-python执行命令后有如下提示信息:

5)部署函数
现在编写好了模板文件以及安装好了我们需要的 Kafka SDK 后,还需要添加我们的代码文件 index.py,代码内容如下:# -*- coding: utf-8 -*-import loggingimport jsonimport urllib.parsefrom kafka import KafkaProducerproducer = Nonedef my_initializer(context): logger = logging.getLogger() logger.info("init kafka producer") global producer producer = KafkaProducer(bootstrap_servers=XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092)def handler(event, context): logger = logging.getLogger() # 接收回传的数据 event_str = json.loads(event) event_obj = json.loads(event_str) logger.info(event_obj["action"]) logger.info(event_obj["articleAuthorId"]) # 向Kafka发送消息 global producer producer.send(ikf-demo, json.dumps(event_str).encode(utf-8)) producer.close() return hello world代码很简单,这里做以简单的解析:my_initializer:函数实例被拉起时会先执行该函数,然后再执行 handler 函数 ,当函数实例在运行时,之后的请求都不会执行 my_initializer 函数 。一般用于各种连接的初始化工作,这里将初始化 Kafka Producer 的方法放在了这里,避免反复初始化 Produer。 handler:该函数只有两个逻辑,接收回传的数据和将数据发送至 Kafka 的指定 Topic。下面通过 fun deploy 命令部署函数,该命令会做两件事:根据 template.yml 中的配置创建服务和函数。将 index.py 和 .fun 上传至函数中。


3. 函数之间调用
目前两个函数都创建好了,下面的工作就是由第一个函数接收到数据后拉起第二个函数发送消息给 Kafka。我们只需要对第一个函数做些许改动即可:# -*- coding: utf-8 -*-import loggingimport jsonimport urllib.parseimport fc2HELLO_WORLD = bHello world!\nclient = Nonedef my_initializer(context): logger = logging.getLogger() logger.info("init fc client") global client client = fc2.Client( endpoint="http://your_account_id.cn-hangzhou-internal.fc.aliyuncs.com", accessKeyID="your_ak", accessKeySecret="your_sk" )def handler(environ, start_response): logger = logging.getLogger() context = environ[fc.context] request_uri = environ[fc.request_uri] for k, v in environ.items(): if k.startswith(HTTP_): # process custom request headers pass try: request_body_size = int(environ.get(CONTENT_LENGTH, 0)) except (ValueError): request_body_size = 0 # 接收回传的数据 request_body = environ[wsgi.input].read(request_body_size) request_body_str = urllib.parse.unquote(request_body.decode("GBK")) request_body_obj = json.loads(request_body_str) logger.info(request_body_obj["action"]) logger.info(request_body_obj["articleAuthorId"]) global client client.invoke_function( FCBigDataDemo, dataToKafka, payload=json.dumps(request_body_str), headers = {x-fc-invocation-type: Async} ) status = 200 OK response_headers = [(Content-type, text/plain)] start_response(status, response_headers) return [HELLO_WORLD]如上面代码所示,对第一个函数的代码做了三个地方的改动:导入函数计算的库:import fc2
添加初始化方法,用于创建函数计算 Client:
def my_initializer(context): logger = logging.getLogger() logger.info("init fc client") global client client = fc2.Client( endpoint="http://your_account_id.cn-hangzhou-internal.fc.aliyuncs.com", accessKeyID="your_ak", accessKeySecret="your_sk")这里需要注意的时,当我们在代码里增加了初始化方法后,需要在函数配置中指定初始化方法的入口:
通过函数计算 Client 调用第二个函数
global client client.invoke_function( FCBigDataDemo, dataToKafka, payload=json.dumps(request_body_str), headers = {x-fc-invocation-type: Async})invoke_function 函数有四个参数:第一个参数:调用函数所在的服务名称。
第二个参数:调用函数的函数名称。
第三个参数:向调用函数传的数据。
第四个参数:调用第二个函数 Request Header 信息。这里主要通过 x-fc-invocation-type 这个 Key 来设置是同步调用还是异步调用。这里设置 Async 为异步调用。
如此设置,我们便可以验证通过第一个函数提供的 HTTP 接口发起请求→采集数据→调用第二个函数→将数据作为消息传给 Kafka 这个流程了。使用两个函数的目的
到这里有些同学可能会有疑问,为什么需要两个函数,而不在第一个函数里直接向 Kafka 发送数据呢?我们先来看这张图:
4. 配置 Kafka
在游戏运营这个场景中,数据量是比较大的,所以对 Kafka 的性能要求也是比较高的,相比开源自建,使用云上的 Kafka 省去很多的运维操作,比如:• 我们不再需要再维护 Kafka 集群的各个节点。• 不需要关心主从节点数据同步问题。• 可以快速、动态扩展 Kafka 集群规格,动态增加 Topic,动态增加分区数。• 完善的指标监控功能,消息查询功能。总的来说,就是一切 SLA 都有云上兜底,我们只需要关注在消息发送和消息消费即可。所以我们可以打开 Kafka 开通界面,根据实际场景的需求一键开通 Kafka 实例,开通 Kafka 后登录控制台,在基本信息中可以看到 Kafka 的接入点:• 默认接入点:走 VPC 内网场景的接入点。• SSL 接入点:走公网场景的接入点。将默认接入点配置到函数计算的第二个函数中即可。....producer = KafkaProducer(bootstrap_servers=XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092)....然后点击左侧控制台 Topic 管理,创建 Topic:



Flink Kafka 消费者
在这个场景中,Kafka 后面往往会跟着 Flink,所以这里简要给大家介绍一下在 Flink 中如何创建 Kafka Consumer 并消费数据。代码片段如下:final ParameterTool parameterTool = ParameterTool.fromArgs(args);String kafkaTopic = parameterTool.get("kafka-topic","ikf-demo");String brokers = parameterTool.get("brokers", "XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092");Properties kafkaProps = new Properties();kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "ikf-demo");FlinkKafkaConsumer<UserBehaviorEvent> kafka = new FlinkKafkaConsumer<>(kafkaTopic, new UserBehaviorEventSchema(), kafkaProps);kafka.setStartFromLatest();kafka.setCommitOffsetsOnCheckpoints(false);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<UserBehaviorEvent> dataStreamByEventTime = env.addSource(kafka);以上就是构建 Flink Kafka Consumer 和添加 Kafka Source 的代码片段,还是非常简单的。压测验证
至此,整个数据采集的架构就搭建完毕了,下面我们通过压测来检验一下整个架构的性能。这里使用阿里云 PTS 来进行压测。创建压测场景
打开 PTS 控制台,点击左侧菜单创建压测/创建PTS场景:



压力模式:
并发模式:指定有多少并发用户同时发请求。
RPS模式:指定每秒有多少请求数。
递增模式:在压测过程中可以通过手动调节压力,也可以自动按百分比递增压力。
最大并发:同时有多少个虚拟用户发起请求。
递增百分比:如果是自动递增的话,按这里的百分比递增。
单量级持续时长:在未完全达到压力全量的时候,每一级梯度的压力保持的时长。
压测总时长:一共需要压测的时长。
这里因为资源成本原因,并发用户数设置为 2500 来进行验证。

总结
至此,整个基于 Serverless 搭建的大数据采集传输的架构就搭建好了,并且进行了压测验证,整体的性能也是不错的,并且整个架构搭建起来也非常简单和容易理解。这个架构不光适用于游戏运营行业,其实任何大数据采集传输的场景都是适用的,目前也已经有很多客户正在基于 Serverless 的架构跑在生产环境,或者正走在改造 Serverless 架构的路上。基于 Serverless 还有很多其他的应用场景,之后我会一一分享给大家,大家如果有任何疑问也可以加入钉钉群:35712134 来寻找答案,我们不见不散!扫一扫,关注我们