其他
Flink流式计算在节省资源方面的简单分析
往期文章回顾:小米流式平台架构演进与实践Flink在小米的发展简介
信息流推荐业务是小米从Spark Streaming迁移到Flink流式计算最早也是使用Flink最深的业务之一,在经过一段时间的合作优化后,对方同学给我们提供了一些使用效果小结,其中有几个关键点:
对于无状态作业,数据处理的延迟由之前Spark Streaming的16129ms降低到Flink的926ms,有94.2%的显著提升(有状态作业也有提升,但是和具体业务逻辑有关,不做介绍); 对后端存储系统的写入延迟从80ms降低到了20ms左右,如下图(这是因为Spark Streaming的mini batch模式会在batch最后有批量写存储系统的操作,从而造成写请求尖峰,Flink则没有类似问题): 对于简单的从消息队列Talos到存储系统HDFS的数据清洗作业(ETL),由之前Spark Streaming的占用210个CPU Core降到了Flink的32个CPU Core,资源利用率提高了84.8%;
很显然,更低的资源占用帮助业务更好的节省了成本,节省出来的计算资源则可以让更多其他的业务使用;为了让节省成本能够得到“理论”上的支撑,我们尝试从几个方面研究并对比了Spark Streaming和Flink的一些区别:调度计算VS调度数据对于任何一个分布式计算框架而言,如果“数据”和“计算”不在同一个节点上,那么它们中必须有一个需要移动到另一个所在的节点。如果把计算调度到数据所在的节点,那就是“调度计算”,反之则是“调度数据”;在这一点上Spark Streaming和Flink的实现是不同的。
// RDD
/**
* Optionally overridden by subclasses to specify placement preferences.
*/
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
// KafkaRDD
override def getPreferredLocations(thePart: Partition): Seq[String] = {
val part = thePart.asInstanceOf[KafkaRDDPartition]
Seq(part.host)
// host: preferred kafka host, i.e. the leader at the time the rdd was created
}
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
总之,通过对比可以看出,Flink的streaming模式对于低延迟处理数据比较友好,Spark的mini batch模式则于异常恢复比较友好;如果在大部分情况下作业运行稳定的话,Flink在资源利用率和数据处理效率上确实更占优势一些。数据序列化
// Not a POJO demo.public class Person { private Logger logger = LoggerFactory.getLogger(Person.class); public String name; public int age;}
针对这一情况我们做了一些优化尝试,由于在小米内部很多业务是通过Thrfit定义的数据,正常情况下Thrift类是通过Kryo的默认序列化器进行序列化和反序列化的,效率比较低。虽然官方提供了优化文档,可以通过如下方式进行优化,但是对业务来讲也是存在一定使用门槛;final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// register the serializer included with Apache Thrift as the standard serializer
// TBaseSerializer states it should be initialized as a default Kryo serializer
env.getConfig().addDefaultKryoSerializer(MyCustomType.class, TBaseSerializer.class);