StreamNative 和中国移动宣布开源 AoP: Apache Pulsar 支持原生 AMQP 协议
我们很高兴地宣布 StreamNative 和中国移动开源了 “AoP”(AMQP on Pulsar)。AoP 将 AMQP 协议处理插件引入 Pulsar broker。这样一来,Apache Pulsar 就支持原生 AMQP 协议。
与 KoP(https://hub.streamnative.io/protocol-handlers/kop/0.2.0)相似,AoP 是一种可插拔的协议处理插件。
将 AoP 协议处理插件添加到现有 Pulsar 集群后,用户不用修改代码就可以将现有的 RabbitMQ 应用程序和服务迁移到 Pulsar。
这样,RabbitMQ 应用程序就可以使用 Pulsar 的强大功能,例如利用 Apache BookKeeper 保存事件流和 Pulsar 分层存储特性。
Apache Pulsar 是一个流数据平台。最初,Apache Pulsar 就采用云原生、分层分片的架构。该架构将服务和存储分离开来,使系统实现更友好的容器化。
Pulsar 的云原生架构具备强扩展性、高一致性和高弹性,使公司能通过实时数据解决方案扩展业务。自 2016 年开源以来,Pulsar 已得到广泛采用,并于 2018 年成为 Apache 顶级项目。
Plusar 为队列和流工作负载提供统一的消息模型。Pulsar 支持自己基于 protobuf 的二进制协议,以确保高性能和低延迟。protobuf 有利于实现 Pulsar 客户端。
而且,该项目也支持 Java,Go,Python 和 C ++ 语言以及社区提供的第三方客户端。
https://pulsar.apache.org/docs/en/client-libraries/#thirdparty-clients
但是,对于使用其他消息传输协议编写的应用程序,用户必须重写这些应用程序,否则这些应用程序无法采用 Pulsar 新的统一消息传输协议。
为了解决这一问题,Pulsar 社区开发了一些应用程序,以便将 AMQP 应用程序从其他消息系统迁移到 Pulsar。例如,Pulsar 提供了 RabbitMQ Source connector 和 RabbitMQ Sink connector,允许用户在 Pulsar 和 RabbitMQ 之间传输数据。但是,那些想要从其他 AMQP 应用程序切换到 Pulsar 的用户仍然有强烈的需求。
StreamNative 收到大量的用户请求,请求帮助从其他消息系统迁移到 Pulsar 。同时,StreamNative 也意识到在 Pulsar 上原生支持其他消息传输协议(例如 AMQP 和 Kafka)的必要性。
所以,StreamNative 开始致力于将通用协议处理插件框架引入到 Pulsar 中。该框架允许使用其他消息传输协议的开发人员使用 Pulsar。
中国移动是 OpenStack Foundation 的金牌会员,拥有全球最多的 OpenStack 集群部署实践。 RabbitMQ 是 OpenStack 消息中间件的缺省集成。在部署和维护 RabbitMQ 的过程中,中国移动遭遇了巨大挑战。
在 OpenStack 系统中,作为 RPC 通信组件,RabbitMQ 处理大量流入和流出的消息。在操作过程中,经常会积压消息。这将导致内存异常,从而进一步导致进程因为内存异常而卡住。
另一方面,RabbitMQ 的镜像队列用于确保数据的高可用性。当节点进入异常状态时,整个群集将无法正常使用。此外,RabbitMQ 的编程语言 erlang 晦涩难懂且难以解决。综上所述,考虑到 RabbitMQ 集群的不稳定性,以及操作、维护和故障排除的难度,中国移动计划开发一种可以替代 RabbitMQ 的中间件产品。
同时,中国移动的公有云中有很多需要使用 AMQP 消息队列的客户,但是现有的 RabbitMQ 不满足云访问的条件,因此,中国移动的中间件团队开始研究 AMQP 消息队列。在对比 Qpid,RocketMQ 和 Pulsar 之后,中国移动被 Pulsar 的计算存储分离架构所吸引。
在对 Pulsar 进行了一段时间的调查之后,ChinaMobile 发现 StreamNative 已经开源了 KoP,这使得中国移动确定基于 Pulsar 开发 AMQP 具有可行性。中国移动开始与 StreamNative 共同开发 AoP 协议处理插件。
实现方式
AoP 是一个可插拔的协议处理插件,可以通过使用 Pulsar 的 topics, cursors 等特性在 Pulsar 上支持原生 AMQP 协议。
下图展示了AoP 协议处理插件与 Pulsar 集群的结合。AMQP Proxy 服务和 AMQP 协议处理插件都与 Pulsar broker 一起运行。目前,AoP 是基于 AMQP 0.9.1 协议进行开发,我们也在考虑增加对 AMQP 1.0 协议的支持。
AMQP 0.9.1 引入了一些基础概念,例如 Exchagne, Queue 和 Router。这些与 Pulsar 的模型有着较大的区别。所以我们需要找到一种方法,支持利用 Pulsar 现有的一些特征,并将它们联系在一起。下图展示了消息在 AoP 中的流转,并讨论了关于消息持久化,消息路由,消息投递的细节。
当 Producer 发送消息到 AmqpExchange,AmqpExchange 将消息持久化到 Pulsar Topic (我们称之为存储原始消息的 Topic)。 AmqpExchange 的 replicator 会将消息传递给 Router。 Router 判断是否需要将消息路由给 AmqpQueue。如果是,会将原始消息的 ID 存入AmqpQueue 的 Topic 中 (我们称之为存储索引消息的 Topic)。 AmqpQueue 将消息传递给 consumer。
📒 AmqpExchange
📒 AmqpMessageRouter
📒 AmqpQueue
📒 Vhost 分配
📒 Proxy
AMQP 客户端建立与 AoP Proxy 的连接。
AoP Proxy 向 Pulsar cluster 发送查找请求,以便确定 Vhost 的 owner broker 的 URL 地址。
Pulsar 集群 将 owner broker 的 URL 地址返回给 AoP Proxy。
AoP Proxy 建立与 Vhose 的 owner broker 的连接并开始在 AMQP 客户端和 Vhost 的 owner broker 之间传输数据。
https://github.com/streamnative/aop/wiki
🙋♂️ 试用 AoP
https://github.com/streamnative/aop
http://streamnative.io/download
https://github.com/streamnative/aop/releases/download/v0.1.0/pulsar-protocol-handler-amqp-0.1.0-SNAPSHOT.nar
https://hub.streamnative.io/protocol-handlers/aop/0.1.0/
🎙️ 致 谢