查看原文
其他

如何获取流式应用程序中checkpoint的最新offset

fjs 大数据学习与分享 2022-07-09
对于流式应用程序,保证应用7*24小时的稳定运行,是非常必要的。因此对于计算引擎,要求必须能够适应与应用程序逻辑本身无关的问题(比如driver应用失败重启、网络问题、服务器问题、JVM崩溃等),具有自动容错恢复的功能。
目前,Spark(Spark Streaming/Structured Streaming)和Flink的checkpoint机制,就是处理类似情况,实现容错机制的核心利器。
对于Flink:
为了保证其高可用、Exactly Once的特性,提供了一套强大的checkpoint机制,它能够根据配置周期性地基于流中各个operator的状态来生成快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦出现故障时,能够将整个应用流程序恢复到故障前的某一种态,从而修正因为故障带来的程序数据状态中断。
对于Spark:

在流式应用中,Spark Streaming/Structured Streaming会将关于应用足够多的信息checkpoint到高可用、高容错的分布式存储系统,如HDFS中,以便从故障中进行恢复。checkpoint有两种类型的数据:

  1.  数据checkpoint
  2. 对于一些复杂程序,比如跨多个批次组合数据的有状态转换,生成的RDD依赖于先前批次的RDD,导致依赖链的长度随批次的增加而增加。因为故障恢复时间与依赖链成正比,从而导致恢复时间也跟着增长。因此就有必要周期性的将RDD checkpoint到可靠的分布式存储系统中,以此切断依赖链。

    这在Spark中的状态算子,如mapWithState、updateStateByKey中尤为常见。
  3. 元数据checkpoint

    顾名思义,就是将定义流式应用程序中的信息保存到容错系统中,用于从运行流应用程序的driver节点发生故障时,进行容错恢复。元数据包括:

    1. 配置:用于创建流应用程序DStream操作:

    2. 定义流应用程序的DStream操作集

    3. 未完成的批次:未完成的批次job

本文的重点不在于checkpoint具体含义,而是以Spark为例,阐述如何通过程序获取checkpoint中最新的offset,以此为思路,来解决生产中的实际问题。

通常我们会checkpoint到HDFS,首先来看一下checkpoint信息:

offsets目录记录了每个批次中的offset,此目录中的第N条记录表示当前正在处理,第N-1个及之前的记录指示哪些偏移已处理完成。
/bigdatalearnshare/checkpointLocation/binlog-2-kafka/commits/bigdatalearnshare/checkpointLocation/binlog-2-kafka/metadata/bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets/bigdatalearnshare/checkpointLocation/binlog-2-kafka/receivedData/bigdatalearnshare/checkpointLocation/binlog-2-kafka/sources
hdfs dfs -ls /bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets
/bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets/0/bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets/1/bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets/2
hdfs dfs -cat /bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets/2
v1{"batchWatermarkMs":0,"batchTimestampMs":1590632490083,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"1"}}2400000001667289

最终获取最新offset的程序示例:

/** * @Author bigdatalearnshare */object ReadOffsets {
def main(args: Array[String]): Unit = { val path = "/bigdatalearnshare/checkpointLocation/binlog-2-kafka/offsets/2"
val fs = FileSystem.get(new Configuration())
val lastFile = fs.listStatus(new Path(path)).filterNot(_.getPath.getName.endsWith(".tmp.crc")) .map { fileName => (fileName.getPath.getName.split("/").last.toInt, fileName.getPath) }.maxBy(_._1)._2
val offset = readFile(lastFile.toString).split("\n").last
assert("2400000001667289".equals(offset)) }
def readFile(path: String): String = { val fs = FileSystem.get(new Configuration()) var br: BufferedReader = null var line: String = null val result = ArrayBuffer.empty[String] try { br = new BufferedReader(new InputStreamReader(fs.open(new Path(path)))) line = br.readLine() while (line != null) { result += line line = br.readLine() } } finally { if (br != null) br.close() }
result.mkString("\n") }
}

这一点在生产环境中还是有一定应用场景的,比如,通过解析mysql binlog日志,将数据同步到kafka,然后再通过消费者程序消费kafka中的数据保存到存储系统中,如delta,通过offset信息对比来校验,binlog到kafka的延迟(如,通过获取binlog中的offset信息与流程序同步到kafka时进行checkpoint的offset做对比)、kafka到存储系统中的延迟。

此外,要注意commits目录下记录的是已完成的批次信息。在实际进行offset比对时,要以此为基准再去获取offsets目录下的offsets信息。
推荐文章:
Spark SQL中Not in Subquery为何低效以及如何规避
Spark流式状态管理
Spark中广播变量详解以及如何动态更新广播变量
SparkSQL与Hive metastore Parquet转换
Spark实现推荐系统中的相似度算法

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存