实战:如何实时采集上亿级别数据?
背景介绍
公司业务系统做优化改造,同时为了能够实现全链路监控,需收集所有业务系统之间的调用日志。
数据情况:每天20亿+
机器成本:3台kafka集群,2台logstash采集机器
技术:Java,MQ,MLSQL,Logstash
下图为最终结果图
采集流程
流程分解
流程一:MLSQL 消费MQ
原始日志产生侧通过protobuf进行序列化推送至mq,然后通过MLSQL进行反序列化并进行简单的etl处理后,再推送至MQ
流程二:通过Logstash进行消费MQ
通过logstash消费经过MLSQL处理后的数据,并在这里通过ruby进行再次的加工处理,最后写入es和hdfs
注意:这里一部分流程推送到es是业务侧使用,而另一部分写入hdfs是提供给数仓使用
流程三: 数仓建模
这里通过数仓建模,将最后的指标结果推送至es提供给业务侧使用
注意:本篇主要是借鉴这个需求讲解Logstash在实际场景中的使用以及优化,其他两个部分流程不做详细讲解
为什么这样设计?
原因一:
首先这个需求属于日志采集的范畴,但Logstash本身不支持反序列化功能,需要自定义开发ruby插件来支持,但这样一来开发成本较高,且不好维护,所以使用了MLSQL结合UDF的方式进行流式处理
原因二:
大家在对最后的输出流程可能会有疑惑,为什么不直接通过MLSQL来写入到hdfs和es呢,这里有两点:
1.MLSQL写入hdfs会产生大量的小文件,需要单独开发合并文件的功能
2.最后写入es的数据是需要数仓结合其他业务数据进行建模的,而MLSQL在这点做的不太好,所以这里走的是离线处理的方式
说到这里,具体的场景需要结合公司的实际情况来进行决策的,有些同学或许会想为什么不用flume进行日志采集呢?那这里就不做过多的解释了,白菜萝卜各有所爱,适合自己的才是最好的!废话不多说,接下来进入正题,结合该需求场景,如何使用较少的成本完成大数据量的采集呢?以及如何优化呢?
Logstash开发流程
1.确定日志格式
首先呢,一个日志文件里肯定是不止一种日志格式,也有可能是标准化的格式,这里需要跟日志产生侧进行确认格式
2.调试grok
确定好日志格式后,编写grok语法,然后进行调试,本人是通过kibana6自带的grok debug进行调试。结合该需求背景,最后经过logstash采集的时候,其实已经通过MLSQL进行了处理,最后Logstash消费的是格式就是一个json字符串,所以不需要grok语法,但是这里还是简单举个例子来说明一下
3.调试ruby
结合该需求,使用ruby进行一些清洗逻辑
4.优化
这里优化的工作在整个需求开发周期的比例较大,因为数据量较大,且资源比较少,具体优化思路如下:
1.MLSQL优化
这部分的优化工作主要是在反序化这块,剔除了一部分无用字段,以及提前过滤了一部分数据量,这里给出一部分注册UDF的代码
2.Kafka端优化
因kafka集群是集团共用,所以kafka端的优化其实只涉及到消费端的优化。这里只调节了两个参数
一:数据压缩
二:消费者线程数
3.hdfs优化
logstash写入hdfs的部分不用使用自带的webhdfs插件,而是自定义的插件。
因自定义插件中涉及到文件锁的问题,会通过比对前后两次文件是否一致来进行文件最后的刷写,所以这里只能通过减少文件的更新频率来减少上下文的切换以及刷写操作
4.ES优化
es部分的优化也只是涉及到写优化,比如批量写入、调大线程数、增加refresh间隔、禁止swapping交换内存、禁止refresh和replica操作,调大index buffer等操作