Kafka丢失数据问题优化总结以及重复消费原因分析
常见的Kafka环节丢失数据的原因有:
单批数据的长度超过限制会丢失数据,报kafka.common.MessageSizeTooLargeException异常解决:
Consumer side:fetch.message.max.bytes- this will determine the largest size of a message that can be fetched by the consumer.
Broker side:replica.fetch.max.bytes- this will allow for the replicas in the brokers to send messages within the cluster and make sure the messages are replicated correctly. If this is too small, then the message will never be replicated, and therefore, the consumer will never see the message because the message will never be committed (fully replicated).
Broker side:message.max.bytes- this is the largest size of the message that can be received by the broker from a producer.
Broker side (per topic):max.message.bytes- this is the largest size of the message the broker will allow to be appended to the topic. This size is validated pre-compression. (Defaults to broker'smessage.max.bytes.)
可以通过log.flush.interval.messages和log.flush.interval.ms来配置flush间隔,interval大丢的数据多些,小会影响性能但在0.8版本,可以通过replica机制保证数据不丢,代价就是需要更多资源,尤其是磁盘资源,Kafka当前支持GZip和Snappy压缩,来缓解这个问题是否使用replica取决于在可靠性和资源代价之间的balance。
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.flush.scheduler.interval.ms = 3000
设计上保证数据的可靠安全性,依据分区数做好数据备份,设立副本数等
push数据的方式:同步异步推送数据:权衡安全性和速度性的要求,选择相应的同步推送还是异步推送方式,当发现数据有问题时,可以改为同步来查找问题。
flush是Kafka的内部机制,Kafka优先在内存中完成数据的交换,然后将数据持久化到磁盘 Kafka首先会把数据缓存(缓存到内存中)起来再批量flush。可以通过log.flush.interval.messages和log.flush.interval.ms来配置flush间隔。
可以通过replica机制保证数据不丢 代价就是需要更多资源,尤其是磁盘资源,Kafka当前支持GZip和Snappy压缩,来缓解这个问题。是否使用replica(副本)取决于在可靠性和资源代价之间的balance(平衡)。
broker到Consumer kafka的consumer提供两种接口 high-level版本已经封装了对partition和offset的管理,默认是会定期自动commit offset,这样可能会丢数据的。
low-level版本自己管理spout线程和partition之间的对应关系和每个partition上的已消费的offset(定期写到zk)。
并且只有当这个offset被ack后,即成功处理后,才会被更新到zk,所以基本是可以保证数据不丢的即使spout线程crash(崩溃),重启后还是可以从zk中读到对应的offset。 异步要考虑到partition leader在未完成副本数follows的备份时就宕机的情况,即使选举出了新的leader但是已经push的数据因为未备份就丢失了 不能让内存的缓冲池太满,如果满了内存溢出,也就是说数据写入过快,Kafka的缓冲池数据落盘速度太慢,这时肯定会造成数据丢失。
尽量保证生产者端数据一直处于线程阻塞状态,这样一边写内存一边落盘。异步写入的话还可以设置类似flume回滚类型的batch数,即按照累计的消息数量,累计的时间间隔,累计的数据大小设置batch大小。
设置合适的方式,增大batch大小来减小网络IO和磁盘IO的请求,这是对于Kafka效率的思考 不过异步写入丢失数据的情况还是难以控制,还是得稳定整体集群架构的运行,特别是zookeeper,当然正对异步数据丢失的情况尽量保证broker端的稳定运作吧。
Kafka不像hadoop更致力于处理大量级数据,Kafka的消息队列更擅长于处理小数据。针对具体业务而言,若是源源不断的push大量的数据(eg:网络爬虫),可以考虑消息压缩。但是这也一定程度上对CPU造成了压力,还是得结合业务数据进行测试选择
broker端
topic设置多分区,分区自适应所在机器,为了让各分区均匀分布在所在的broker中,分区数要大于broker数。分区是Kafka进行并行读写的单位,是提升kafka速度的关键。
broker能接收消息的最大字节数的设置一定要比消费端能消费的最大字节数要小,否则broker就会因为消费端无法使用这个消息而挂起 broker可赋值的消息的最大字节数设置一定要比能接受的最大字节数大,否则broker就会因为数据量的问题无法复制副本,导致数据丢失
comsumer端
//producer用于压缩数据的压缩类型。默认是无压缩。正确的选项值是none、gzip、snappy。压缩最好用于批量处理,批量处理消息越多,压缩性能越好
props.put("compression.type", "gzip");
//增加延迟
props.put("linger.ms", "50");
//这意味着leader需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。,
props.put("acks", "all");
//无限重试,直到你意识到出现了问题,设置大于0的值将使客户端重新发送任何数据,一旦这些数据发送失败。注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。
props.put("retries ", MAX_VALUE);
props.put("reconnect.backoff.ms ", 20000);
props.put("retry.backoff.ms", 20000);
//关闭unclean leader选举,即不允许非ISR中的副本被选举为leader,以避免数据丢失
props.put("unclean.leader.election.enable", false);
//关闭自动提交offset
props.put("enable.auto.commit", false);
限制客户端在单个连接上能够发送的未响应请求的个数。设置此值是1表示kafka broker在响应请求之前client不能再向同一个broker发送请求。注意:设置此参数是为了避免消息乱序
props.put("max.in.flight.requests.per.connection", 1);
强行kill线程,导致消费后的数据,offset没有提交,partition就断开连接。比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么就会re-blance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。
如果在close之前调用了consumer.unsubscribe()则有可能部分offset没提交,下次重启会重复消费。
Kafka数据重复Kafka设计的时候是设计了(at-least-once)至少一次的逻辑,这样就决定了数据可能是重复的,Kafka采用基于时间的SLA(服务水平保证),消息保存一定时间(通常为7天)后会被删除。
Kafka的数据重复一般情况下应该在消费者端,这时log.cleanup.policy = delete使用定期删除机制。
Kafka集群消息积压问题及处理策略
如何为Kafka集群确定合适的分区数以及分区数过多带来的弊端
Kafka分区分配策略(Partition Assignment Strategy)
Kafka中sequence IO、PageCache、SendFile的应用详解
全面解析Kafka