揭开 Pulsar Flink connector 的非神秘面纱
本期 TGIP-CN 直播,我们邀请到了来自 StreamNative 的高级工程师赵建云,他为我们分享了一些关于 Pulsar Flink connector 的介绍和使用细节等。
最新 2.5.0 版本的 Pulsar Flink connector 更新介绍可以参考:Pulsar Flink Connector 2.5.0 正式发布
Pulsar 简介
Apache Pulsar 是云原生的分布式消息流系统,采用计算和存储分层的架构和以 Segment 为中心的分片存储,具有更好的性能、可扩展性和灵活性,是一款可以无限扩展的分布式消息队列。
Pulsar 的生态主要在三个层面进行体现:连接、存储和数据处理。区别于其他 MQ 流平台,Pulsar 在连接层面有 Pulsar clients 加持并附带多种跨语言版本,同时支持多种协议处理(KoP/AoP/MoP 等)等。
与大多数消息传递系统的单片架构不同,Pulsar 采用分层分片式的架构,服务层和存储层都能够独立扩展,以提供更好的性能、可扩展性和灵活性,这种设计对容器非常友好,使得 Pulsar 成为流原生平台的理想选择。
Apache Flink 简介
Pulsar+Flink=
使⽤流式连接器(Streaming Connectors)⽀持流式⼯作负载;
使⽤批式源连接器(Batch Source Connectors)⽀持批式⼯作负载;
Pulsar 还提供了对 Schema 的原⽣⽀持,可以与 Flink 集成并提供对数据的结构化访问,例如,使⽤ Flink SQL 在 Pulsar 中查询数据。
Pulsar Flink connector
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.setProperty("topic", "test-source-topic");
props.setProperty("partitiondiscoveryintervalmillis", "5000");
FlinkPulsarSource<String> source = new FlinkPulsarSource<>(serviceUrl, adminUrl, new SimpleStringSchema(), props);
source.setStartFromEarliest();
DataStream<String> stream = see.addSource(source);
FlinkPulsarSink<Person> sink = new FlinkPulsarSink(
serviceUrl,
adminUrl,
Optional.of(topic), // mandatory target topic or use `Optional.empty()` if sink to different topics for each record
props,
TopicKeyExtractor.NULL, // replace this to extract key or topic for each record
Person.class,
RecordSchemaType.AVRO);
stream.addSink(sink);
Pulsar 中的 MessageId 是全局唯一且有序的,并且对应 Pulsar 中的实际物理存储。因此想要实现 Exactly Once, 只需要结合 Flink 的 Checkpoint 机制,将 MessageId 存储到 Checkpoint 即可。
Topic 动态发现这一功能主要是为了解决流数据分析应用是长时间执行的,因此在分析应用执行期间,topic 的分区或者用户订阅的 topic 会动态增删。为了使我们的流计算应用可以感知这种变化,可以启动一个定时任务,定期检查是否有新增 topic,并启动 reader 处理数据。
Pulsar 支持自定义 Avro schema 和写入消费 Avro、JSON、Protobuf 等格式的消息,使得消息结构化。通过这样也带来了 Pulsar SQL、Function 等相关的生态支持,用户可以在更多的系统中操作数据。
对于连接器而言,良好的拓展性可以使得用户在使用的过程中,既可以简单操作,又可以创造更多可能性。
public FlinkPulsarSource(
String adminUrl,
ClientConfigurationData clientConf,
PulsarDeserializationSchema<T> deserializer,
Properties properties) {
\\ 初始化FlinkPulsarSource参数
}
public FlinkPulsarSink(
String adminUrl,
Optional<String> defaultTopicName,
ClientConfigurationData clientConf,
Properties properties,
TopicKeyExtractor<T> topicKeyExtractor,
Class<T> recordClazz,
RecordSchemaType recordSchemaType) {
}
Q&A
总结
👍 推荐阅读
➡️ Pulsar Flink Connector 2.5.0 正式发布
➡️ 如何使用 Apache Flink 查询 Pulsar 流
👇🏻 点击「阅读原文」来为 Pulsar 发光发热吧!