查看原文
其他

深入理解 Apache Pulsar Connector 与 Functions 的关系

tuteng ApachePulsar 2021-10-18


🎙️阅读本文需要约 10 分钟。




📖 背景知识


  • Connector:Apache Pulsar 的连接器,包括 Source 和 Sink 两个组件。

  • Functions:Apache Pulsar 的轻量级计算组件。




📐 Instance 架构


使用 pulsar-admin source、pulsar-admin sink 或 pulsar-admin function 命令操作 Source、Sink 或 Function 时,在 Worker 上会启动一个 Instance,Instance 架构如下图所示。



前文提到,使用 pulsar-admin 启动一个 Source、Sink 或 Function 时,实际是启动了一个 Instance,该 Instance 跑在 Worker(图中标注 Worker Service 的深蓝色方块)上。


🙋上图有两个「Worker Service」:
  • Worker Service1 上有两个 Instance,分别是 Source 和 Function。 

  • Worker Service2 上有一个 Instance,即 Sink。


另外,图中的 PulsarSource、IdentityFunction 和 PulsarSink 组件都是 Instance 的一部分。

当外部系统向 Pulsar 输入数据时,会使用 pulsar-admin source 启动 Source 的 Instance,该 Instance 会初始化三个组件,代码如下:
// sink 组件,负责数据输出 setupOutput(contextImpl);// source 组件,负责数据输入setupInput(contextImpl);// function 组件,负责简单计算和过滤等return new JavaInstance(contextImpl, object);

首先初始化数据的出口,如果数据目的地有问题,就不会继续进行。


每个组件内部有以下逻辑判断:
if (sinkSpec.getClassName().isEmpty()) { // 未指定 className,则使用系统默认的组件初始化,即前文提到的 PulsarSink} else { // 如果可以找到 className,则使用它进行初始化}// If source classname is not set, we default pulsar sourceif (sourceSpec.getClassName().isEmpty()) { // 未指定 className,则使用系统默认的组件初始化,即前文提到的 PulsarSource} else { // 如果可以找到 className,则使用它进行初始化}// create the functionsif (userClassObject instanceof Function) { // 使用默认的的 IdentityFunction 进行初始化 this.function = (Function) userClassObject;} else { // 使用用户定义的 Function this.javaUtilFunction = (java.util.function.Function) userClassObject;}

Source 命令启动后,Instance 发现提供了 className,则会使用 className 替换系统默认的 Source 组件接收外部系统的数据。收到数据后,会将这些数据放在 queue 中。


无论是用户自定义的 source,还是系统默认的 PulsarSource,收到数据之后都会执行以下代码:
consume(record);public void consume(Record<T> record) { try { queue.put(record); } catch (InterruptedException e) { throw new RuntimeException(e); } }

Pulsar 会把数据放至队列中,等待 Function 处理。这里已指定了 Source ,因此使用系统默认的 IdentityFunction。

🙋PulsarSource 和用户自定义 source 有以下区别:
  • 用户自定义 Source:用于与外部系统集成(图中的红色箭头)。例如,数据库和日志等。
  • 系统默认的 PulsarSource:会启动 Consumer ,用于消费来自 Pulsar topic 的数据(图中指向 Consumer 的黄色箭头)。


数据流入至 Function 时,会执行以下逻辑:
// process the messageresult = javaInstance.handleMessage(currentRecord, currentRecord.getValue());public JavaExecutionResult handleMessage(Record<?> record, Object input) { ... if (function != null) { // 用户自定义 function 的处理逻辑 output = function.process(input, context); } else { // 系统默认 IdentityFunction 的处理逻辑 output = javaUtilFunction.apply(input); } ...}

数据在 Function 处理完成之后,会进入到下一个组件,即 Sink,Sink 同样会执行类似的判断。

如果是用户自定义 Sink,则会调用用户的类执行初始化;如果没有用户自定义 Sink,则使用系统默认的 PulsarSink 执行初始化,从而完成数据的输出,这里使用了 PulsarSink。


🙋PulsarSink 与用户自定义 Sink 有以下区别:

  • 用户自定义 Sink:将数据输出至外部系统(图中第二个红色箭头)。
  • 系统默认的 PulsarSink:会初始化 Pulsar 的 Producer,将数据输出至 Pulsar Topic(图中指向外部的黄色箭头)。


以上流程是在命令行中执行 pulsar-admin source 时进行的逻辑,当执行 Function 时,会将 IdentityFunction 替换为用户的 Function 对象。

但是 Source 和 Sink 会使用系统默认的 PulsarSource 和 PulsarSink 进行初始化(图中标注 Function 的蓝色方块)。pulsar-admin sink 同样如此,此处不再赘述。



📣总结一下

本文分享了 Source、Sink 和 Function 之间的关系以及数据流通的过程。

Instance 实际包含了三个组件:Source、Sink 和 Function,这三个组件是不同的,因此,「Source 和 Sink 是一种特殊的 Function」 的说法是不准确的

同时,本文还介绍了每个 Instance 对 pub/sub 模型的封装。Source 和 Sink 作为 Instance 中单独的组件,在构建与外部系统的生态时有重要意义。

🙋‍♀️点击查看更多 Apache Pulsar Connector 相关系列文章



作者 | tuteng

审校 | Anonymitaet

编辑 | Sylvia




📣Join Pulsar 📣


Apache Pulsar 鼓励大家积极参与开源社区,欢迎大家积极提交 PR,具体 contribute 流程可以点击「阅读原文」查看哦~



: . Video Mini Program Like ,轻点两下取消赞 Wow ,轻点两下取消在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存