其他
深入理解 Apache Pulsar Connector 与 Functions 的关系
Connector:Apache Pulsar 的连接器,包括 Source 和 Sink 两个组件。
Functions:Apache Pulsar 的轻量级计算组件。
Worker Service1 上有两个 Instance,分别是 Source 和 Function。
Worker Service2 上有一个 Instance,即 Sink。
// sink 组件,负责数据输出
setupOutput(contextImpl);
// source 组件,负责数据输入
setupInput(contextImpl);
// function 组件,负责简单计算和过滤等
return new JavaInstance(contextImpl, object);
if (sinkSpec.getClassName().isEmpty()) {
// 未指定 className,则使用系统默认的组件初始化,即前文提到的 PulsarSink
} else {
// 如果可以找到 className,则使用它进行初始化
}
// If source classname is not set, we default pulsar source
if (sourceSpec.getClassName().isEmpty()) {
// 未指定 className,则使用系统默认的组件初始化,即前文提到的 PulsarSource
} else {
// 如果可以找到 className,则使用它进行初始化
}
// create the functions
if (userClassObject instanceof Function) {
// 使用默认的的 IdentityFunction 进行初始化
this.function = (Function) userClassObject;
} else {
// 使用用户定义的 Function
this.javaUtilFunction = (java.util.function.Function) userClassObject;
}
consume(record);
public void consume(Record<T> record) {
try {
queue.put(record);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
用户自定义 Source:用于与外部系统集成(图中的红色箭头)。例如,数据库和日志等。
系统默认的 PulsarSource:会启动 Consumer ,用于消费来自 Pulsar topic 的数据(图中指向 Consumer 的黄色箭头)。
// process the message
result = javaInstance.handleMessage(currentRecord, currentRecord.getValue());
public JavaExecutionResult handleMessage(Record<?> record, Object input) {
...
if (function != null) {
// 用户自定义 function 的处理逻辑
output = function.process(input, context);
} else {
// 系统默认 IdentityFunction 的处理逻辑
output = javaUtilFunction.apply(input);
}
...
}
🙋PulsarSink 与用户自定义 Sink 有以下区别:
用户自定义 Sink:将数据输出至外部系统(图中第二个红色箭头)。
系统默认的 PulsarSink:会初始化 Pulsar 的 Producer,将数据输出至 Pulsar Topic(图中指向外部的黄色箭头)。
🙋♀️点击查看更多 Apache Pulsar Connector 相关系列文章
Pulsar Connector 预览篇 Pulsar Source 入门篇 Pulsar Sink 入门指南 如何使用 Apache Flume 发送日志数据至 Apache Pulsar 使用 Elastic Beats 搜集日志到 Pulsar
作者 | tuteng
审校 | Anonymitaet
编辑 | Sylvia
📣Join Pulsar 📣
Apache Pulsar 鼓励大家积极参与开源社区,欢迎大家积极提交 PR,具体 contribute 流程可以点击「阅读原文」查看哦~