如何使用 OpenTracing 和 Jaeger 追踪 Pulsar 消息
🎙️阅读本文需要大约 5 分钟
OpenTracing(https://opentracing.io/) 是针对应用程序和 OSS(Open-Source Software)软件包的开放分布式追踪标准。许多追踪后端服务都支持 OpenTracing API,例如 Jaeger、Zipkin 和 SkyWalking。
本文详细介绍如何使用 Jaeger 通过 OpenTracing API 追踪 Pulsar 消息。
准备工作
在开始前,需要安装好 JDK 8、Maven 3 和 Pulsar(集群模式或单机模式)。如果还没有安装 Pulsar,可以查看下方链接,按照提示进行安装。
http://pulsar.apache.org/docs/en/standalone/
第 1 步:启动 Jaeger 后端
1. 在 Docker 中启动 Jaeger 后端。
docker run -d -p 6831:6831/udp -p 16686:16686 jaegertracing/all-in-one:latest
成功启动 Jaeger 后,就可以打开 Jaeger UI 网站。
📌 如何你没有 Jaeger Docker 环境,可以:
下载二进制文件
https://www.jaegertracing.io/download/通过源代码构建
https://www.jaegertracing.io/docs/1.17/getting-started/#from-source
2. 访问 `http://localhost:16686`,无需填写用户名或密码就可以打开 Jeager UI 网站。
第 2 步:添加 maven dependencies
本示例使用 Open Tracing Pulsar Client。
https://hub.streamnative.io/monitoring/opentracing-pulsar-client/0.1.0
它是 Pulsar Client 与 OpenTracing API(基于 Pulsar Client Interceptors)的集成,用于追踪 Pulsar 消息。OpenTracing Pulsar Client 由 StreamNative 研发,是 StreamNatvie Hub(https://hub.streamnative.io/) 中的监控工具。
添加 Jaeger client dependency 以连接到 Jaeger 后端。
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>io.streamnative</groupId>
<artifactId>opentracing-pulsar-client</artifactId>
<version>0.1.0</version>
</dependency>
<dependency>
<groupId>io.jaegertracing</groupId>
<artifactId>jaeger-client</artifactId>
<version>1.2.0</version>
</dependency>
第 3 步:使用 OpenTracing Pulsar Client
为便于理解,本示例假设有 2 个 Job 和 2 个 topic。Job-1 向 topic-A 发送消息,Job-2 从 topc-A 消费消息。当 Job 2 收到 topic-A 的消息后,Job 2 会向 topic-B 发送消息,然后 Job-3 从 topic-B 消费消息。因此,在这种情况下有 2 个 topic、2 个 producer 和 2 个 consumer。
要完成上述工作场景中的任务,需要启动三个应用程序。
Job-1:发布消息到 topic-A
Job-2:消费 topic-A 中的消息,并发布消息到 topic-B
Job-3:消费 topic-B 中的消息
以下示例为发布消息至 topic-A。
Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv().withType("const").withParam(1);
Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv().withLogSpans(true);
Configuration configuration = new Configuration("Job-1").withSampler(samplerConfig).withReporter(reporterConfig);
Tracer tracer = configuration.getTracer();
GlobalTracer.registerIfAbsent(tracer);
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Producer<String> producerA = client.newProducer(Schema.STRING)
.topic("topic-A")
.intercept(new TracingProducerInterceptor())
.create();
for (int i = 0; i < 10; i++) {
producerA.newMessage().value(String.format("[%d] Hello", i)).send();
}
以下示例为从 topic-A 消费消息,并将消息发布到 topic-B。
Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv().withType("const").withParam(1);
Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv().withLogSpans(true);
Configuration configuration = new Configuration("Job-2").withSampler(samplerConfig).withReporter(reporterConfig);
Tracer tracer = configuration.getTracer();
GlobalTracer.registerIfAbsent(tracer);
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("topic-A")
.subscriptionName("open-tracing")
.subscriptionType(SubscriptionType.Shared)
.intercept(new TracingConsumerInterceptor<>())
.subscribe();
Producer<String> producerB = client.newProducer(Schema.STRING)
.topic("topic-B")
.intercept(new TracingProducerInterceptor())
.create();
while (true) {
Message<String> received = consumer.receive();
SpanContext context = TracingPulsarUtils.extractSpanContext(received, tracer);
TypedMessageBuilder<String> messageBuilder = producerB.newMessage();
messageBuilder.value(received.getValue() + " Pulsar and OpenTracing!");
// Inject parent span context
tracer.inject(context, Format.Builtin.TEXT_MAP, new TypeMessageBuilderInjectAdapter(messageBuilder));
messageBuilder.send();
consumer.acknowledge(received);
}
📝Job-3
以下示例为从 topic-B 消费消息。
Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv().withType("const").withParam(1);
Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv().withLogSpans(true);
Configuration configuration = new Configuration("Job-3").withSampler(samplerConfig).withReporter(reporterConfig);
Tracer tracer = configuration.getTracer();
GlobalTracer.registerIfAbsent(tracer);
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("topic-B")
.subscriptionName("open-tracing")
.subscriptionType(SubscriptionType.Shared)
.intercept(new TracingConsumerInterceptor<>())
.subscribe();
while (true) {
Message<String> received = consumer.receive();
System.out.println(received.getValue());
consumer.acknowledge(received);
}
现在,可以分别运行 Job-3、Job-2 和 Job-1。控制台中会出现 Job-3 接收的日志,如下:
[0] Hello Pulsar and OpenTracing!
[1] Hello Pulsar and OpenTracing!
...
[9] Hello Pulsar and OpenTracing!
现在,你可以再次打开 Jaeger UI,页面中会出现十条消息追踪链路。
点击任务名称即可查看消息追踪链路的详细信息。
可以从 span 名称轻松辨别是 producer 还是 consumer 发布了此条消息,span 名称格式为 `To__<topic-name>` 和 `From__<topic-name>__<subscription_name>`。
总 结
OpenTracing Pulsar Client 集成了 Pulsar 客户端和 OpenTracing,可以实现轻松地追踪消息。如果你在应用程序中使用了 Pulsar 和 OpenTracing,赶快试一试吧!
https://hub.streamnative.io/monitoring/opentracing-pulsar-client/0.1.0
另外,我们还发布过一篇类似的博客《如何使用 Apache Skywalking 追踪 Apache Pulsar 消息》,可以点击文末「阅读原文」进行查看。
想要随时掌握 Pulsar 的研发进展、用户案例和热点话题吗?快来关注 Apache Pulsar 和 StreamNative 微信公众号,我们第一时间在这里分享与 Pulsar 有关的一切。