查看原文
其他

揭开 Pulsar Flink connector 的非神秘面纱

赵建云 ApachePulsar 2021-10-18


本期 TGIP-CN 直播,我们邀请到了来自 StreamNative 的高级工程师赵建云,他为我们分享了一些关于 Pulsar Flink connector 的介绍和使用细节等。


最新 2.5.0 版本的 Pulsar Flink connector 更新介绍可以参考:Pulsar Flink Connector 2.5.0 正式发布





Pulsar 简介


Apache Pulsar 是云原生的分布式消息流系统,采用计算和存储分层的架构和以 Segment 为中心的分片存储,具有更好的性能、可扩展性和灵活性,是一款可以无限扩展的分布式消息队列。


Pulsar 的生态主要在三个层面进行体现:连接、存储和数据处理。区别于其他 MQ 流平台,Pulsar 在连接层面有 Pulsar clients 加持并附带多种跨语言版本,同时支持多种协议处理(KoP/AoP/MoP 等)等。


与大多数消息传递系统的单片架构不同,Pulsar 采用分层分片式的架构,服务层和存储层都能够独立扩展,以提供更好的性能、可扩展性和灵活性,这种设计对容器非常友好,使得 Pulsar 成为流原生平台的理想选择。



更多关于 Apache Pulsar 的详细介绍,可以参考:Apache Pulsar 介绍




Apache Flink 简介


Apache Flink 是⼀个在⽆界和有界数据流上进⾏状态计算的框架和分布式处理引擎,这几年内比较火。Flink 可以在所有常⻅的集群环境中运⾏,并以 in-memory 的速度和任意的规模进⾏计算。

Flink 的框架大致可以分为三块内容,从左到右依次为:数据输入、Flink 数据处理、数据输出。Pulsar Flink connector 对 Flink 中间的数据处理部分做了相当完备的支持。





Pulsar+Flink=


Pulsar 和 Flink 都是当下比较出色的计算框架,Apache Flink 是当下最流⾏的数据计算引擎,Apache Pulsar 是消息订阅系统中的翘楚。当他们强强联合会迸发出怎样的产品效果呢? 

Pulsar 跟 Flink 有一些相同的特性,比如它们的批处理都是有界的数据流。目前 Pulsar 跟 Flink 的集成方式有多种。

  • 使⽤流式连接器(Streaming Connectors)⽀持流式⼯作负载;

  • 使⽤批式源连接器(Batch Source Connectors)⽀持批式⼯作负载;

  • Pulsar 还提供了对 Schema 的原⽣⽀持,可以与 Flink 集成并提供对数据的结构化访问,例如,使⽤ Flink SQL 在 Pulsar 中查询数据。 



从架构的⻆度来看,我们可以想象两个框架之间的融合,使⽤ Apache Pulsar 作为统⼀的数据层视图,使⽤ Apache Flink 作为统⼀的计算、数据处理框架和 API。




Pulsar Flink connector


目前 Pulsar Flink connector 支持 Flink 1.9 及以上版本,同时支持 Flink Stream、Batch、Table、Catalog 等功能,以及 Pulsar Topic 动态发现和 Pulsar Schema 的功能支持。

Pulsar Flink connector 在使用上比较简单,由一个 Source 和一个 Sink 组成。Source 的功能是将一个或多个主题下的消息传入到 Flink source 中,Sink 的功能就是从 Flink Sink 中获取数据并放入到某些主题下,在使用方式上可以参考下方代码。
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();Properties props = new Properties();props.setProperty("topic", "test-source-topic");props.setProperty("partitiondiscoveryintervalmillis", "5000");
FlinkPulsarSource<String> source = new FlinkPulsarSource<>(serviceUrl, adminUrl, new SimpleStringSchema(), props);source.setStartFromEarliest(); DataStream<String> stream = see.addSource(source);
FlinkPulsarSink<Person> sink = new FlinkPulsarSink( serviceUrl, adminUrl, Optional.of(topic), // mandatory target topic or use `Optional.empty()` if sink to different topics for each record props, TopicKeyExtractor.NULL, // replace this to extract key or topic for each record Person.class, RecordSchemaType.AVRO);
stream.addSink(sink);

🧰 exactly-once

Pulsar 中的 MessageId 是全局唯一且有序的,并且对应 Pulsar 中的实际物理存储。因此想要实现 Exactly Once, 只需要结合 Flink 的 Checkpoint 机制,将 MessageId 存储到 Checkpoint 即可。



对于连接器的 Source 任务,在每次触发 Checkpoint 的时候,会将各个分区当前处理的 MessageId 保存到状态存储里面,这样在任务重启的时候,每个分区都可以通过 Pulsar 提供的 Reader seek 接口找到 MessageId 对应的消息位置,然后从这个位置之后读取消息数据。

通过 Checkpoint 机制,还能够向存储数据的节点发送数据使用完毕的通知,从而能准确删除过期的数据,做到存储的合理利用。

🧰 Topic 动态发现

Topic 动态发现这一功能主要是为了解决流数据分析应用是长时间执行的,因此在分析应用执行期间,topic 的分区或者用户订阅的 topic 会动态增删。为了使我们的流计算应用可以感知这种变化,可以启动一个定时任务,定期检查是否有新增 topic,并启动 reader 处理数据。


🧰 Pulsar Schema 支持

Pulsar 支持自定义 Avro schema 和写入消费 Avro、JSON、Protobuf 等格式的消息,使得消息结构化。通过这样也带来了 Pulsar SQL、Function 等相关的生态支持,用户可以在更多的系统中操作数据。


Pulsar Flink connector 中 Source、Sink 对于 Pulsar Schema 均做了相关技术支持,使数据在Pulsar、Flink 中流转并保持结构化一致。

🧰 良好的拓展性

对于连接器而言,良好的拓展性可以使得用户在使用的过程中,既可以简单操作,又可以创造更多可能性。


在 Source 层面,我们通常不会满足在消息中只拿到了 value 数据,而是会希望能获取到 message 的消息发布时间、扩展属性等。Pulsar Flink connector 提供了自定义的数据解码接口,用户选择可以实现 PulsarDeserializationSchema 来实现自己的需求。
public FlinkPulsarSource( String adminUrl, ClientConfigurationData clientConf, PulsarDeserializationSchema<T> deserializer, Properties properties) { \\ 初始化FlinkPulsarSource参数}

在 Sink 层面,Pulsar Flink connector 支持使用每条记录中的 topic 来输出消息。
public FlinkPulsarSink( String adminUrl, Optional<String> defaultTopicName, ClientConfigurationData clientConf, Properties properties, TopicKeyExtractor<T> topicKeyExtractor, Class<T> recordClazz, RecordSchemaType recordSchemaType) {}

在这里再多提一点关于 Source 使用的 Pulsar 消费模式为 Reader,并非常规的订阅者模式。Reader 模式和常规的订阅者模式不同的是,reader 启动时必须指定游标位置,产生一个随机的订阅名,会话结束后会取消订阅。

更多关于 Pulsar Flink connector 中 catalog、SQL、DDL 等特性的介绍,可以查看下方回放视频 42:00-50:00 时间段。





Q&A


Q:Pulsar Flink connector 底层都是用 reader 读的吧?
A:是的,对于 Stream 模式下是用 reader 读的。Batch 的话是新模型,不会直接使用 Pulsar client 进行交互,而是使用 bookie。




总结


本期直播主要为大家介绍了关于 Pulsar Flink connector 的相关内容,下期 TGIP-CN 直播我们将为大家带来关于 Apache Pulsar 管理工具的干货内容,可扫描下方二维码进行活动报名哦!


👍 推荐阅读

➡️ StreamNative 宣布开源 MoP!

➡️ Pulsar Flink Connector 2.5.0 正式发布

➡️ 如何使用 Apache Flink 查询 Pulsar 流


👇🏻 点击「阅读原文」来为 Pulsar 发光发热吧!

: . Video Mini Program Like ,轻点两下取消赞 Wow ,轻点两下取消在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存