快来看!Schema 真的好简单啊!
每周一份 Pulsar 的直播干货,不知道大家吸收的怎么样鸭?上周末斯杰大佬又给大家带来了 Pulsar 的基础干货——关于 Pulsar Schema 的相关内容。
首先回顾一些最近一周 Pulsar 的相关进展:
1. Pulsar Summit 2020 线上峰会将在 6 月 17-18 日举行。想要参与的小伙伴可以去官网查看更多细节。
🔗https://pulsar-summit.org
2. Pulsar 2.5.2 版本已发布!具体详情参考👉🏻《Apache Pulsar 发布 2.5.2 版本》。
3. Pulsar 2.6.0 版本也已在计划中,敬请期待!
4. 新增 PIP-64: Introduce REST endpoints for producing, consuming and reading messages
5. 新增 PIP-65:Adapting Pulsar IO Sources to support Batch Sources
6. 5 月 26 日,来自 Nutanix 的 Shivji Kumar Jha 带来关于「Lessons From Managing A Pulsar Cluster」的线上研讨会。
如果你想参与此活动,可以通过以下链接进行申请,当然也可以期待后续的视频回放。
https://us02web.zoom.us/webinar/register/2015887356422/WN_gZqEn6KTRu25wh4d36bdCw
接下来就一起看看关于 Pulsar Schema 的相关细节吧!
抛砖引玉
在 schema 层面,我们假设用户已经开始使用 Pulsar producer API 发送消息,形式为 byte 字节组。
像下图事例中,要发出一个「用户」相关数据(姓名、年龄),就需要一套序列化和反序列化工具,将其变成字节数组再发送出去。
这就是典型的 producer-consume 的结构运行,也是大部分消息中间件都会用到的结构化处理流程。
这个流程下可能还需要额外考虑一些问题,比如这个信息对象里是否有字段缺失、结构里是否有字段类型发生改变等情况。
所以就会导致出现多个 producer 或者 consumer 都采取类似的操作,来保证 consumer 能消费到正确格式的数据。
那么如果提前设计好该结构的数据格式,是不是所有的操作就可以一步到位更简单化了呢?
这就是 schema 的效果啦。
Schema 介绍
在前边介绍的这种情况下,就需要系统去提供相应的 schema 去定义数据格式。Schema 的作用就是「如何序列化和反序列化数据」。以前,你可能需要人工去制定使用哪个序列化框架,但现在就可以用 schema 解决掉了。同时它还可以更新迭代数据结构,以及兼容多数据格式。
在 Pulsar 里,通常用 「JSON」类型去存储 schema 信息。其中用到的几个数据在这里解释一下:
`type`: 所使用的 schema 类型,后续会提到。
`schema`:你要使用的 schema 完整定义细节,内容则是取决于 schema type。
`properties`:跟 schema 关联的部分属性。
具体代码实例可参考回放视频 18:20-23:10 时间段。
在上边代码实例中,schema type 决定该数据如何进行序列化与反序列化。在 Pulsar 里,主要支持两类 schema,简单型和复杂型。
简单型的话,其实在大部分语言里都能看到。
简单型应用举例👆🏻
复杂型里用最多的两种就是 Key-Value 和 Struct。
Key-Value 是一个复合型,要同时定义 key schema 和 value schema,将这两者的信息组合在一起放置在 schema 信息文件内。通过 key schema 进行反序列化消息里的 key,通过 value schema 去反序列化消息里的 value。
Struct 则是存储在 topic 里的结构化数据。
目前 struct 支持三种类型, AVRO、JSON、Protobuf 这三种。真正把 struct 里的信息结构化还是通过 AVRO schema definition 来定义的。
定义 struct 时通常有两种方式。一种是静态的,struct 为固定的(已提前定义好),比如已经设置好的 POJO/AVRO/Protobuf。
第二种是通用的,即之前并不知道要消费消息的结构,这时通过这个通用的接口/结构去消费即可。
虽然是通用型的,但仍需 build 过程。对于 consumer 而言,可能真的不知道我要消费的 schema 是什么样子,能不能消费?这种情况下仍然是可以消费的。
在 Pulsar 里提供了 Auto schema 来进行此方式的消费。一个是 `AUTO_PRODUCE`,producer 根据 topic 中的模式验证字节,只有符合消息才会被生产。
另一个是 `AUTO_CONSUME`,consumer 将消息反序列化的结果纪录为“GenericRecord”。这个模式下就是 consumer 完全不知道 topic schema 且无法提前定义。
Schema 的运作流程
在上述过程或情况下,schema 究竟是如何运作来发挥作用的呢?
想要理解 schema 运作的详情,我们就一定要先知道,schema 信息只会存在于 producer 创建和连接 broker;以及 consumer 被创建并连接到 broker 的时间点内。之后的消息是不需要 schema 检验的。
以上方示例来简单说一下基本流程。
在创建 producer 时,提前声明表示出 `Schema.AVRO(User.class)` 为当前 producer 使用的 schema,这样 broker 就只需进行一次校验就可以,之后便可以顺畅的使用此 schema 来进行序列化消息。
当在客户端执行 `Schema.AVRO` 时,会根据当前 user 去生成相对应 schema 并放置到一个 「SchemaInfo」的 JSON 文件内。当创建一个新的 producer 时,第一次连接时也会带入 schema 的相关信息。
当 broker 接收到 schema 信息后,会检查此 topic 是否存在 schema。如果没有,则会直接创建。Topic 有 schema 后就会立刻进入第二步操作了。
这时 broker 会校验当前 schema 跟以往的 schema 是否兼容。如果兼容,则会生成一个全新版本的 schema,然后将 schema 信息返回给 producer。
如果不兼容,它就会将 producer 连接拒绝,同理 consumer 端也会进行相应操作。
所以, schema 的优点在于它可以在 broker 层面进行数据完整性的校验,避免了 producer 会产生一些 consumer 无法解析的数据,进而将流程进行清晰化的梳理。
Schema Evolution
在前边提到了 schema 兼容性检查,也就是 schema evolution。通常,它需要检查字段的增加/减少、类型的更改等。这里就出现了「Schema compatibility check」的工具,每个 schema 都会对应一个检查器。
其中,每个模式类型都会有一个检查器,同时由 `schemaRegistryCompatibilityCheckers` 进行配置。目前,只有 AVRO 和 JSON 格式文件支持 schema evolution。
关于 schema 兼容性检测,主要是以下几种:
具体实际代码操作可以参考回放视频 48:00-52:00 时间段。
管理 schema
在管理 schema 层面就很简单了,因为 Pulsar 本身就提供了丰富的管理端程序。有了 Schema Restful API,你就可以轻松上传、获取、删除 schema 信息了。
同时 schema 在与其他的大数据处理系统结合时,也会很方便很实用。目前 Pulsar 支持 Presto、Hive、Flink、Spark 的 schema 集成。
直播过程中斯杰大佬也为我们演示了 Pulsar 与 Presto 集成的代码端操作,具体详情可查看回放视频 53:30-64:00 时间段。
Q & A
Q:使用 schema 之后,底层默认使用的哪种方式序列化消息?
A:Schema 的类型决定了你如何进行序列化。比如你的 schema 是 JSON,那它用的是 JSON 的序列化与反序列化;如果是 AVRO,就是 AVRO 方式的序列化。在反序列化结束后,需要通过 Protobuf 进行中间传输。
总 结
通过本讲我们可以了解到,在 Pulsar 系统中,更推荐大家使用 schema 进行生产和消费。因为通过它可以节省很多对于序列化、反序列化处理过程,同时能保证数据的统一性和管理性,以及方便与其他系统结构的融合搭配。
在下周的分享中,将会为大家带来「Queuing and Streaming - Learn Pulsar Subscription Types」相关的内容,敬请期待!
👇🏻报名直播可直接点击「阅读原文」进行