谁动了我的数据?去哪儿网MySQL嗅探程序
作者介绍
雷孝龙,去哪儿网资深DBA。2019年8月加入去哪儿网,负责公司的MySQL/Redis运维,以及自动化方案的设计与实施。曾就职于达梦数据库、映客直播;擅长于数据库管理、维护及优化等工作。
一、背景
作为 DBA ,在日常运维工作中,常常会被开发同学问到类似下面的这些问题:
我的这条数据不知道被谁改了,能帮我查一下吗? 能帮我查查这张表都有谁在访问吗? 昨天某个时间监控有个尖刺,能帮忙看看执行了哪些 SQL 吗? 某一个 SQL 语句中有手机号、身份证号,或者密码明文等敏感信息,这个语句是谁在执行? 某一张表中存储了手机号、密码等明文信息,又是谁在读写这张表呢?
面对这些问题,我们会想方设法去找找慢日志,或者查询日志,甚至分析 binlog ,运气好的话可以找到一些线索,但大多数时候都不尽人意。详细的访问信息,带用户名、客户端 IP 的只有慢日志会记录,生产环境又不可能把所有 SQL 都记录下来,那会极大的拖慢系统性能,甚至有几率会带来故障,因此这成为了一个让 DBA 头疼的问题。
那么,怎么获取到数据全量的访问日志呢,我们来分析一下常见的一些方法:
从 Server 端记录
就是前面提到的,通过慢日志、查询日志来记录。优缺点也很明显,优点就是记录的信息准确,开启简单;缺点就是会占用系统性能,同时带来风险,这种实现方式通常有几种方案:
General log:这种方案可以记录所有访问日志,但日志内容匮乏,只包括了 Query 语句,而缺少了访问 IP ,用户名等信息,并且如果线上都打开这个日志文件,对 MySQL 性能的冲击很大,容易产生故障,所以很难实现我们的需求。
Slow log:将 long_query_time 参数设置为 0 ,即可记录所有 MySQL Server 端执行的 SQL 语句,用户名、客户端 IP 、执行时间等信息均有记录,但是带来的问题与开启 General log 相同,生产环境不可能一直开启。
Audit plugin:使用 MySQL 自带的审计功能,其实是可以实现部分需求的,但问题点在于 audit 本身是 MySQL 的一个插件,与版本关系比较大,功能不尽相同,而最大的一个问题是,如果开启这个插件功能,需要重启数据库,这是比较要命的,所以也不能被采用。
从客户端记录
客户端发起请求和收到响应时,能够记录自己发出的 SQL 请求,结果集以及响应时间,但我们知道,客户端众多,汇总起来是一个问题,也不能保证每台机器执行 SQL 的时序性。更不能体现 Server 端的真实执行时间,网络开销、客户端程序处理耗时等因素没法排除。
MySQL中间层记录
通过中间层日志追加的方式,记录所有对于底层数据库的请求;但这种方式受限于中间层的特性,并且 MySQL 和中间层的关系往往是多对多,也存在一个汇总的问题,同时也会增加额外开销。更加重要的是,在我们使用的 MySQL 架构中,没有中间层这个模块,所以这种方案就已经被排除在外。
网络旁路抓取
这是一种常见的 SQL 审计手段,对服务是非入侵式的,通过网络抓包来获取对于数据库的全量访问日志。这种方式既不会对服务造成任何影响,同时也可以控制对于服务器资源的占用,准确性也有一定保障
根据上面所分析的方案,我们决定使用最为安全,运维最容易的方案,即网络旁路抓取的方案。
二、功能需求
抓包成功率:需要能够确保准确的抓取到所有对于数据库的请求,这也是决定这个程序是否可用的最重要因素。
多通道、自主探测发现:程序能够对于服务器数据库实例自主探测发现,当创建新集群新实例时,程序能够发现新的数据库端口,并对其数据流进行收集抓取,也就是说要实现自动化、智能化,尽可能少的人为干预。
数据便于分析处理:需要有一个合理的存储将数据汇总,从数据库维度、业务维度对访问日志进行分析汇总,既要保证数据分析的时效性,又能保证数据容量足够大,能存储线上所有抓取回来的数据。需要查明细的时候,还能很快地查出来。
便于开发维护:开发语言需要简单易懂,便于快速迭代,且兼容性好,适用于线上不同版本的操作系统;后期维护发布等也需要考虑到,一旦变更需要所有服务器重新部署。
稳定、轻量级、高性能:程序需要有能够在线上稳定运行,虽然挂了不影响线上服务,但收集到访问数据准确性会大打折扣;其次,抓包程序部署在数据库服务器,不能占用过多系统资源,否则得不偿失,所以需要在有限的资源范围内,还得保证程序的高性能,能够快速处理掉抓取到的大量数据。
开源程序调研
我们调研了几个目前已有的 MySQL 抓包程序,结果如下:
通过对比,目前没有比较适合于我们诉求的抓包工具,需要重新开发一个适用于 Qunar 自身的数据库抓包程序。
程序结构
通过上面的讲述,已经明确了我们现在需要做的事情,就是将某个 MySQL 端口的流量抓取到,然后按照 MySQL 公布的协议格式,分析出来客户端与 MySQL Server 之间往来流转的信息,包括最主要的 SQL 语句,执行时间,影响行数,用户名, IP 等信息。
程序结构图
主函数代码示例
type Sniffer struct {
sessionMgr *SessionManager
ports []int
device string
handle *pcap.Handle
decoders []*MySQLDecoder
packetChannels []chan gopacket.Packet
resultChannels []chan MySQLResult
writeChannels []chan Result
dbConnManager *DbConnManager
}
func (snf *Sniffer) Start() {
// 初始化会话
snf.sessionMgr.InitSession(snf.dbConnManager, snf.ports)
// 使用 gopacket 对数据包进行实时捕获
if handle, err := pcap.OpenLive(snf.device, 65535, true, pcap.BlockForever); err != nil {
...
...
return
} else {
snf.handle = handle
}
// 过滤MySQL实例端口
snf.setBPF()
go snf.Dispatch()
// 解析MySQL协议
go snf.Decoder()
// 对解析对包结果进行处理
go snf.handleResult()
// 结果投入到 kafka
go snf.Write()
// 定时任务,用于检测新建的数据库实例、抓包率统计等
go snf.ServerCron()
// 关闭会话
go snf.sessionMgr.deleteSession(snf.resultChannels[0])
}
2.1 如何实现抓包
Linux 操作系统对数据包的处理流程:
网卡->网卡驱动->数据链路层->IP层->传输层->应用程序
Libpcap
Libpcap 是 Packet Capture Libray 的英文缩写,即数据包捕获函数库。该库提供的C函数接口,用于捕捉经过指定网络接口的数据包。
我们熟知的 tcpdump 和 wireshark 就是在 Libpcap 的基础上开发而成的。Libpcap 提供的接口函数实现和封装了与数据包截获有关的过程。Libpcap 提供了用户级别的网络数据包捕获接口,并充分考虑到应用程序的可移植性。Libpcap 可以在绝大多数 Linux 平台上运行。
它的工作在上层应用程序与网络接口之间。
主要功能
数据包捕获:捕获流经网卡的原始数据包;
自定义数据包发送:构造任何格式的原始数据包;
流量采集与统计:采集网络中的流量信息;
规则过滤:提供自带规则过滤功能,按需要选择过滤规则。
2.2 MySQL协议解析
MySQL 客户端与服务器的完整交互过程如下
MySQL 数据包报文结构
主要分为登陆报文、客户端命令请求报文、服务器响应报文。
1、登陆报文
手初始化报文(服务器 -> 客户端)
登陆认证报文(客户端 -> 服务器)
2、客户端命令请求报文(客户端 -> 服务器)
命令:用于标识当前请求消息的类型,例如切换数据库(0x02)、查询命令(0x03)等,常见命令说明如下:
3、服务器响应报文
当客户端发起认证请求或命令请求后,服务器会返回相应的执行结果给客户端。客户端在收到响应报文后,需要首先检查第1个字节的值,来区分响应报文的类型。
2.3 抓包结果结果处理
解析报文后,我们需要保留的有用信息如下:
{
"client_host": "192.168.225.219",
"client_addr": "192.168.225.219:59154",
"user": "h_qta_rw",
"db": "qta_product_baseinfo",
"rows": 1,
"bytes_send": 625,
"query": "select\n \n id,\n wrapper_id,\n real_phone,\n phone_type,\n did_phone,\n did_prefix\n \n from\n wrapper_did_phone where wrapper_id='hta10850s5i'",
"thread_id": 3243074,
"request_time": "2021-09-26T02:40:24.459756+08:00",
"response_time": "2021-09-26T02:40:24.459874+08:00",
"error": "",
"duration": "0.118 ms",
"db_addr": "10.88.133.221:3306",
"port": 3306,
"host_name": "l-xxx.h.cn5",
"tables": "qta_product_baseinfo.wrapper_did_phone",
"checksum": "33353139414136414438343442394533",
"fingerPrint": "select id, wrapper_id, real_phone, phone_type, did_phone, did_prefix from wrapper_did_phone where wrapper_id=?"
}
2.4 结果存储
我们抓取到的数据,最终都需要存储起来,这样才能用来分析,才能被运维的时候使用到,但我们线上有上千个 MySQL 实例,总共的 QPS 有几十万,这么大的数据量如何存储呢?
我们最初考虑使用的是 Kafka+ES+Flink 的方案,使用了一段时间后,发现有些问题不能满足我们的需求,包括:
ES 查询速度慢,导致分析起来很费时间。
整条数据流,从 Kafka 出来之后,延迟就非常大,通常要达到好几个小时,不能接受。
聚合配置,修改起来非常困难,并且是由另一个团队来负责的,查问题的时候不太方便。
由于上面所述的问题,我们决定换一种实现方式,经过调研发现,现在的数据分析方案中,Clickhouse 是比较火的,并且速度非常快,也支持集群,所以我们尝试使用这个方案来解决这个问题,即:Kafka+Clickhouse。
经过一段时间的使用,发现这种方案非常好,基本没有延迟,并且数据保证的精度也很高,不会因为聚合丢失太多有用的数据,并且占用空间并不会太大,下面是我们使用 Clickhouse 方案中所涉及到的一些细节
如前面程序结构图所示,我们将抓包所产生的json串投递到对应机房kafka,再通过Clickhouse自带的 Kafka Engine 表接收 kafka 数据,Clickhouse 存储了所有线上 MySQL 实例的抓包结果。
Clickhouse接受收kafka数据流程如下:
在 ClickHouse 中建立 Kafka Engine 外表,作为 Kafka 数据源的一个接口。
CREATE TABLE sniffer.sniffer_kafka_queue_cn1
(
`client_host` String,
`client_addr` String,
`user` String,
`db` String,
`rows` Int32,
`bytes_send` UInt32,
`query` String,
`thread_id` UInt32,
`request_time` String,
`response_time` String,
`error` String,
`duration` String,
`db_addr` String,
`port` UInt32,
`host_name` String,
`tables` String,
`checksum` String,
`fingerPrint` String
)
ENGINE = Kafka()
SETTINGS kafka_broker_list = 'l-kafka1.data.cn1:9092,l-kafka2.data.cn1:9092,l-kafka3.data.cn1:9092,l-kafka4.data.cn1:9092', kafka_topic_list = 'logs.logger.d_dba_prod', kafka_group_name = 'clickhouse-reader', kafka_format = 'JSONEachRow', kafka_max_block_size = 524288
在 ClickHouse 中创建普通表(通常是 MergeTree 系列)存储 Kafka中 的数据。
CREATE TABLE sniffer.sniffer_view
(
`client_host` String,
`client_addr` String,
`user` String,
`db` String,
`rows` UInt32,
`bytes_send` UInt32,
`query` String,
`request_time` DateTime,
`response_time` DateTime,
`duration` String,
`db_addr` String,
`port` UInt32,
`host_name` String,
`tables` String,
`checksum` String,
`fingerPrint` String,
`times` UInt32,
`ts` DateTime,
`_topic` Array(String),
`_offset` Array(UInt64),
`_partition` Array(UInt64)
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/newshard1/sniffer_view', 'l-xxx.dba.cn1')
PARTITION BY (toYYYYMMDD(response_time), substr(host_name, length(host_name) - 2, 3))
ORDER BY (response_time, host_name, port, tables, checksum)
TTL response_time + toIntervalDay(7)
SETTINGS index_granularity = 8192
在 ClickHouse 中创建 Materialized View , 监听 Kafka 中的数据,并将数据写入 ClickHouse 存储表中。
CREATE MATERIALIZED VIEW sniffer.sniffer_kafka_queue_cn1_mv TO sniffer.sniffer_view
(
`client_host` String,
`client_addr` String,
`user` String,
`db` String,
`rows` UInt32,
`bytes_send` UInt32,
`query` String,
`request_time` DateTime,
`response_time` DateTime,
`duration` String,
`db_addr` String,
`port` UInt32,
`host_name` String,
`tables` String,
`checksum` String,
`fingerPrint` String,
`times` UInt64,
`ts` DateTime,
`_topic` Array(String),
`_partition` Array(UInt64),
`_offset` Array(UInt64)
) AS
SELECT
client_host,
client_addr,
user,
db,
max(rows) AS rows,
max(bytes_send) AS bytes_send,
any(query) AS query,
toDateTime(substr(max(request_time), 1, 19)) AS request_time,
toDateTime(substr(max(response_time), 1, 19)) AS response_time,
max(duration) AS duration,
any(db_addr) AS db_addr,
port,
host_name,
tables,
checksum,
any(fingerPrint) AS fingerPrint,
count(*) AS times,
now() AS ts,
groupArray(_topic) AS _topic,
groupArray(_partition) AS _partition,
groupArray(_offset) AS _offset
FROM sniffer.sniffer_kafka_queue_cn1
GROUP BY
client_host,
client_addr,
user,
db,
port,
host_name,
tables,
checksum
三、开发与运维
上图所展示的是,我们的 sniffer 程序在开发和运维过程中所涉及到的模块和单元,因为这个程序需要在每一台数据库服务器上需要部署,我们的目标是尽可能实现免维护的特性,所以需要在核心功能之外增加很多额外的功能,主要包括:
端口自我发现:主要用来实现新部署一个实例,sniffer 能自动地发现这个实例,并且抓取到对应的流量,而不需要 DBA 再去处理新实例的这方面的工作了。
心跳:主要是用来方便管理哪些还活着,哪些是不是由于某种原因已经死掉了。
运维状态:包括抓取队列,发送队列,实时抓包率等信息,还是方便我们在管理过程中,更精确地掌握 sniffer 的实时信息。
垃圾回收:这个主要是为了让 sniffer 更简洁,占用资源更少,更轻量而去做的。
实现了上面的功能之后,在运维中,就可以很容易地实现批量部署,灰度发布,异常报警,自动发现,平滑升级,异常恢复等功能了,目前我们的状态基本上就是免维护的,运行稳定。
四、问题分析
在开发使用过程中,遇到了很多问题,经过好几个同学的手,逐步解决掉的,主要包括:系统丢包问题,程序的内存泄漏问题,解析数据性能问题,SQL 语法解析问题,信息丢失或者不全的问题等。解决这些问题是一个复杂而且漫长的过程,也是程序的难点所在,由于本章篇幅有限,先带大家简单了解我们抓包程序的功能和大体实现方法及运维思路,这些问题留到之后来分享,尽请期待。
五、总结与展望
目前,我们所有 MySQL 服务器均已部署了抓包程序,对于线上约 95% 左右的实例,我们的抓包准确率能够达到 98% 以上,个别请求量大,SQL 复杂的实例,丢包仍然存在,我们仍在不断努力改进中,目前也基本满足了我们的需求。
有了数据库全量的访问数据,我们能够做很多事情,包括:
资源归属:我们可以很清楚地掌握哪些库、表、用户等是不是还在使用,或者被谁使用了,我们从此在 MySQL 运维中,做到了:“冤有头,债有主”。
SQL 统计报表:我们通过对 SQL 语句的访问记录,可以很清楚地了解到哪些SQL语句是高频访问的,哪些更新语句的影响行数大,哪些语句中有对大字段的操作,并且长度大于某个预定义的值的,都是可以很好的被抓出来的。
资源优化、业务变化分析:可以对不访问的库表,用户进行回收,从而节省资源。
安全审计:很明显,有了这样的利器之后,安全无忧。
调用栈打通:通过抓包分析,我们很容易确定某一个 IP 访问了什么数据库,什么表,使用的是什么语句,这样不管是从上到下查问题,还是从下到上查问题,都能很容易地定位,而不是像之前,一走到数据库这里,就断线了。
通知:以后如果要迁移一个数据库,或者做一个主动运维的工作,我们都可以很轻松地找到被影响的业务直接到负责人,简单高效。
责任归属:当发现一些由于语句写的不好影响数据库的故障,我们很容易可以定位是什么业务导致的,甚至可以提前预警一些不好的语句,直接找到语句的 Owner ,然后提前解决。
其它:还有很多可以做的功能,这里就不一一列举了。
很明显,有了我们这些工作,运维不再是难事儿。很多问题,原来是被动接受的,现在就有可能变为了主动“反击”。这也是我们团队现在推崇的被叫做是“基于统计的数据库运维”的基础,基于统计,也就是基于数据,基于数据,也可以被理解为智能,等到我们把所有的运维的这些事儿都掌握了之后,了然于胸之后,我们就可以很自信的说,我们已经是 AIDBA 了。到那时,我们不会再为开发的提问而不知所措,也不再为谁动了数据发愁,我们能够精准定位到每一刻数据的变更,防患于未然,保护好我们的数据,更主要的是,我们还可以让数据库资源,变得更加健康,DBA 的工作,变得更加高效。
招募贴:
END