技术解析|如何将 Pulsar 数据快速且无缝接入 Apache Doris
导读:Apache Doris Routine Load 支持了将 Kafka 数据接入 Apache Doris,并保障了数据接入过程中的事务性操作。Apache Pulsar 定位为一个云原生时代企业级的消息发布和订阅系统。那么 Apache Pulsar 用户如何将数据接入 Apache Doris 呢?本次分享将介绍利用 KoP 如何将 Pulsar 数据快速且无缝接入 Apache Doris。
KoP 架构介绍
KoP 是 Kafka on Pulsar 的简写,顾名思义就是如何在 Pulsar 上实现对 Kafka 数据的读写。KoP 将 Kafka 协议处理插件引入 Pulsar Broker 来实现 Apache Pulsar 对 Apache Kafka 协议的支持。将 KoP 协议处理插件添加到现有 Pulsar 集群后,用户不用修改代码就可以将现有的 Kafka 应用程序和服务迁移到 Pulsar。
利用企业级多租户特性简化运营。 避免数据搬迁,简化操作。 利用 Apache BookKeeper 和分层存储持久保留事件流。 利用 Pulsar Functions 进行无服务器化事件处理。
KoP 架构如下图,通过图可以看到 KoP 引入一个新的协议处理插件,该协议处理插件利用 Pulsar 的现有组件(例如 Topic 发现、分布式日志库-ManagedLedger、cursor 等)来实现 Kafka 传输协议。
Routine Load 订阅 Pulsar 数据思路
Apache Doris Routine Load 支持了将 Kafka 数据接入 Apache Doris,并保障了数据接入过程中的事务性操作。Apache Pulsar 定位为一个云原生时代企业级的消息发布和订阅系统,已经在很多线上服务使用。那么 Apache Pulsar 用户如何将数据接入 Apache Doris 呢,答案是通过 KoP 实现。
--------------------------
| Apache Doris |
| --------------- |
| | Routine Load | |
| --------------- |
--------------------------
|Kafka Protocol(librdkafka)
------------v--------------
| --------------- |
| | KoP | |
| --------------- |
| Apache Pulsar |
--------------------------
操作实践
JDK 安装:略 下载 Pulsar 二进制包,并解压:
#下载
wget https://archive.apache.org/dist/pulsar/pulsar-2.10.0/apache-pulsar-2.10.0-bin.tar.gz
#解压并进入安装目录
tar xvfz apache-pulsar-2.10.0-bin.tar.gz
cd apache-pulsar-2.10.0
组件编译和安装
1. 下载 KoP 源码
git clone https://github.com/streamnative/kop.git
cd kop
mvn clean install -DskipTests
mkdir apache-pulsar-2.10.0/protocols
# mv kop/kafka-impl/target/pulsar-protocol-handler-kafka-{{protocol:version}}.nar apache-pulsar-2.10.0/protocols
cp kop/kafka-impl/target/pulsar-protocol-handler-kafka-2.11.0-SNAPSHOT.nar apache-pulsar-2.10.0/protocols
4. 添加后的结果查看
[root@17a5da45700b apache-pulsar-2.10.0]# ls protocols/
pulsar-protocol-handler-kafka-2.11.0-SNAPSHOT.nar
KoP 配置添加
1. 在 standalone.conf 或者 broker.conf 添加如下配置
#kop适配的协议
messagingProtocols=kafka
#kop 的NAR文件路径
protocolHandlerDirectory=./protocols
#是否允许自动创建topic
allowAutoTopicCreationType=partitioned
2. 添加如下服务监听配置
# Use `kafkaListeners` here for KoP 2.8.0 because `listeners` is marked as deprecated from KoP 2.8.0
kafkaListeners=PLAINTEXT://127.0.0.1:9092# This config is not required unless you want to expose another address to the Kafka client.
# If it’s not configured, it will be the same with `kafkaListeners` config by default
kafkaAdvertisedListeners=PLAINTEXT://127.0.0.1:9092
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
brokerDeleteInactiveTopicsEnabled=false
当出现如下错误:
java.lang.IllegalArgumentException: Broker has disabled transaction coordinator, please enable it before using transaction.
添加如下配置,开启 transactionCoordinatorEnabled
kafkaTransactionCoordinatorEnabled=true
transactionCoordinatorEnabled=true
#前台启动
#bin/pulsar standalone
#后台启动
pulsar-daemon start standalone
#进入Doris
mysql -u root -h 127.0.0.1 -P 9030
# 创建数据库
create database pulsar_doris;
#切换数据库
use pulsar_doris;
#创建clicklog表
CREATE TABLE IF NOT EXISTS pulsar_doris.clicklog
(
`clickTime` DATETIME NOT NULL COMMENT "点击时间",
`type` String NOT NULL COMMENT "点击类型",
`id` VARCHAR(100) COMMENT "唯一id",
`user` VARCHAR(100) COMMENT "用户名称",
`city` VARCHAR(50) COMMENT "所在城市"
)
DUPLICATE KEY(`clickTime`, `type`)
DISTRIBUTED BY HASH(`type`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
CREATE ROUTINE LOAD pulsar_doris.load_from_pulsar_test ON clicklog
COLUMNS(clickTime,id,type,user)
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" = "127.0.0.1:9092",
"kafka_topic" = "test",
"property.group.id" = "doris"
);
pulsar_doris :Routine Load 任务所在的数据库
load_from_pulsar_test:Routine Load 任务名称
clicklog:Routine Load 任务的目标表,也就是配置 Routine Load 任务将数据导入到 Doris 哪个表中。
strict_mode:导入是否为严格模式,这里设置为 False。
format:导入数据的类型,这里配置为 Json。
kafka_broker_list:Kafka Broker 服务的地址
kafka_broker_list:Kafka Topic 名称,也就是同步哪个 Topic 上的数据。
property.group.id:消费组 ID
数据导入和测试
public class ClickLog {
private String id;
private String user;
private String city;
private String clickTime;
private String type;
... //省略getter和setter
}
消息构造和发送的核心代码逻辑如下:
String strDateFormat = "yyyy-MM-dd HH:mm:ss";
@Autowired
private Producer producer;
try {
for(int j =0 ; j<50000;j++){
int batchSize = 1000;
for(int i = 0 ; i<batchSize ;i++){
ClickLog clickLog = new ClickLog();
clickLog.setId(UUID.randomUUID().toString());
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(strDateFormat);
clickLog.setClickTime(simpleDateFormat.format(new Date()));
clickLog.setType("webset");
clickLog.setUser("user"+ new Random().nextInt(1000) +i);
producer.sendMessage(Constant.topicName, JSONObject.toJSONString(clickLog));
}
}
} catch (Exception e) {
e.printStackTrace();
}
2. ROUTINE LOAD 任务查看执行 SHOW ALL ROUTINE LOAD FOR load_from_pulsar_test \G;命令,查看导入任务的状态。
mysql> SHOW ALL ROUTINE LOAD FOR load_from_pulsar_test \G;
*************************** 1. row ***************************
Id: 87873
Name: load_from_pulsar_test
CreateTime: 2022-05-31 12:03:34
PauseTime: NULL
EndTime: NULL
DbName: default_cluster:pulsar_doris
TableName: clicklog1
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 1
JobProperties: {"partitions":"*","columnToColumnExpr":"clickTime,id,type,user","maxBatchIntervalS":"20","whereExpr":"*","dataFormat":"json","timezone":"Europe/London","send_batch_parallelism":"1","precedingFilter":"*","mergeType":"APPEND","format":"json","json_root":"","maxBatchSizeBytes":"209715200","exec_mem_limit":"2147483648","strict_mode":"false","jsonpaths":"","deleteCondition":"*","desireTaskConcurrentNum":"3","maxErrorNum":"0","strip_outer_array":"false","currentTaskConcurrentNum":"1","execMemLimit":"2147483648","num_as_string":"false","fuzzy_parse":"false","maxBatchRows":"300000"}
DataSourceProperties: {"topic":"test","currentKafkaPartitions":"0","brokerList":"127.0.0.1:9092"}
CustomProperties: {"group.id":"doris","kafka_default_offsets":"OFFSET_END","client.id":"doris.client"}
Statistic: {"receivedBytes":5739001913,"runningTxns":[],"errorRows":0,"committedTaskNum":168,"loadedRows":50000000,"loadRowsRate":23000,"abortedTaskNum":1,"errorRowsAfterResumed":0,"totalRows":50000000,"unselectedRows":0,"receivedBytesRate":2675000,"taskExecuteTimeMs":2144799}
Progress: {"0":"51139566"}
Lag: {"0":0}
ReasonOfStateChanged:
ErrorLogUrls:
OtherMsg:
1 row in set (0.00 sec)
ERROR:
No query specified
从上面结果可以看到 totalRows 为 50000000,errorRows 为 0。说明数据不丢不重的导入 Apache Doris 了。
3. 数据统计验证执行如下命令统计表中的数据,发现统计的结果也是 50000000,符合预期。
mysql> select count(*) from clicklog;
+----------+
| count(*) |
+----------+
| 50000000 |
+----------+
1 row in set (3.73 sec)
mysql>
通过 KoP 我们实现了将 Apache Pulsar 数据无缝接入 Apache Doris ,无需对 Routine Load 任务进行任何修改,并保障了数据导入过程中的事务性。与此同时,Apache Doris 社区已经启动了 Apache Pulsar 原生导入支持的设计,相信在不久后就可以直接订阅 Pulsar 中的消息数据,并保证数据导入过程中的 Exactly-Once 语义。
加入社区
如果你对 Apache Doris 感兴趣,请点击“阅读原文”了解并加入 Doris!我们也发起了征文活动,邀你讲讲与Doris “相遇 相知 相识”的故事,不仅有精美礼品相送,还可获得 SelectDB 全渠道传播曝光加持!最后,欢迎更多的开源技术爱好者加入 Apache Doris 社区,携手成长,共建社区生态。
相关链接:
SelectDB 官方网站:
https://selectdb.com
Apache Doris 官方网站:
http://doris.apache.org
Apache Doris Github:
https://github.com/apache/doris
Apache Doris 开发者邮件组:
dev@doris.apache.org