RisingWave 1.7 发布!新增自适应并行度、支持加密函数等多个功能特性
1自适应并行度
自 1.7 版开始,所有新增的流处理作业将默认启用自适应并行度。也就是说,如果您向集群添加了额外的节点或 CPU,RisingWave 将会自动调整并行度以充分利用新增的资源,为用户提供更为顺畅的扩展体验。如果您尚未准备好使用这一功能,可以将 disable_automatic_parallelism_control
设置为 true
,在整个会话中禁用此选项,以最大程度地确保系统的稳定性。
如有需要,可以为 Table、物化视图或 Sink 设置固定的并行度,以便控制流带宽,并方便预测资源分配。
ALTER TABLE table_name SET PARALLELISM = 8;
更多细节,请查看:
Cluster scaling[1] 「集群扩展」
2从外部 Source 读取所有列与 Schema
在 RisingWave 中,创建 Table 或 Source 以从外部读取数据时,必须定义 Schema,即指定每个列的列名和数据类型,除非数据采用 Protobuf 格式。
若存在大量列,逐一定义每列可能会很繁琐。此次更新后,您可使用 *
表示应在 Table 或 Source 中包含来自 Schema Registry 的所有列。额外生成的列也可以包括在内。请记住,此方法仅在定义了外部 Schema Registry 时才有效。
CREATE TABLE from_kafka (
*,
gen_i32_field int AS int32_field + 2)
WITH (
connector = 'kafka',
topic = 'test',
properties.bootstrap.server = 'message_queue:29092')
FORMAT UPSERT ENCODE AVRO (
schema.registry = 'http://message_queue:8081');
更多细节,请查看:
CREATE TABLE[2] 「CREATE TABLE 命令」 CREATE SOURCE[3]「CREATE SOURCE 命令」
3INCLUDE
子句
现在支持在 CREATE SOURCE
和 CREATE TABLE
使用 INCLUDE
子句,用于添加不属于有效负载部分的消息组件作为附加列。例如,INCLUDE
子句可以添加消息键(Message Key)、时间戳或主题(Topic Header)这样的列,以进行下一步分析。比如,您可以在创建 Table 或 Source 时添加时间戳列:
CREATE SOURCE s_name(
...
) INCLUDE timestamp AS include_ts
WITH (...)
FORMAT ... ENCODE ...;
使用 AS
指定别名是可选的,但如果没有指定,列名则将根据所使用的连接器类型自动生成。因此,如果上述 Source 连接的是 Kafka 消息队列,生成的列名将是 _rw_kafka_timestamp
。
在从数据流中摄取数据时,还有以下选项可选,包括键、分区、偏移、时间戳或 Header。
INCLUDE {key | partition | offest | timestamp | header} [AS col_name]
此外,对于 UPSERT
类型的 Source 和 Table,必须要用 INCLUDE KEY
,因为 RisingWave 将使用此列执行 UPSERT
语义。在这种情况下,不能将主键定义为多个列。
更多细节,请查看:
INCLUDE clause[4] 「 INCLUDE
子句」
4增强 UDF 支持
1.7 版本增强了对用户自定义函数(UDF)的支持。UDF 让您能在数据转换和计算方面更灵活。您可以选择使用编程语言定义外部 UDF,或在 RisingWave 内使用 SQL 定义 UDF。一旦创建了 UDF,就可以像使用任何其他内置函数一样在 SQL 查询中使用它们。此次更新,我们新增用 JavaScript 和 Rust 定义的外部 UDF,同时新增 SQL UDF。
JavaScript UDF
您可以通过使用 CREATE FUNCTION
命令在 RisingWave 中创建 JavaScript UDF。与其他语言定义的 UDF 相比,创建 JavaScript UDF 最为简单,因为 RisingWave 中内置了 QuickJS 虚拟机。您不需要在本地机器上进行额外设置,也不需要在单独的文件中定义函数。JavaScript UDF 仅限于计算任务,如果您的数据需要简单但重复的转换,那么它们是完美的选择。
CREATE FUNCTION gcd(a int, b int) RETURNS int LANGUAGE javascript AS $$
if(a == null || b == null) {
return null;
}
while (b != 0) {
let t = b;
b = a % b;
a = t;
}
return a;
$$;
Rust UDF
您也可以使用 Rust 在 RisingWave 中创建 UDF。您可以选择将 WebAssembly (WASM) 二进制文件以 base64 编码嵌入到 SQL 中,或从对象存储加载 WASM 二进制文件。与使用 Python 和 Java 定义的 UDF 相比,用 Rust 定义的 UDF 在性能方面有更好的表现,但仅限于计算任务。与使用 JavaScript 创建的 UDF 不同,Rust 需要额外的设置。具体请参见下方链接。
SQL UDF
您还可以用 SQL 在 RisingWave 内部创建 UDF。通过这种方式,您可以封装常用逻辑,并将复杂的操作和计算抽象为可重用的函数,提高查询的可读性。此外,创建 UDF 时,您可以调用内置的 SQL 函数和预定义的 SQL UDF,但目前不支持递归定义。
以下是一个简单的 SQL UDF 示例,用于将两个参数相加。对于输入参数,在定义时,您可以选择是否命名。以下例子中的参数是未命名的。
CREATE FUNCTION add(INT, INT)
RETURNS int
LANGUAGE SQL
AS $$select $1 + $2$$;
SELECT add(1, -1);
----返回结果
0
更多细节,请查看:
Use UDFs in JavaScript[5] 「用 JavaScript 使用 UDF」 Use UDFs in Rust[6] 「用 Rust 使用 UDF」 SQL UDFs[7]
5支持加密函数
加密函数是将输入内容转换为不可读格式的算法,用来保护数据安全。对于加密后的数据,可以通过特定的密钥或算法解密,将其还原回其原始形式。您现在可以使用原始加密函数 encrypt
和 decrypt
来加解密数据。需要注意的是,这两个函数只是根据给定的算法对输入内容加密,不提供额外的安全措施。以下是它们的句法:
encrypt(data bytea, key bytea, type text) -> bytea
decrypt(data bytea, key bytea, type text) -> bytea
加密或解密的具体算法由 type
指定,type
由 algorithm
、mode
和 padding
构成。其具体设置可参见下方链接。
algorithm [-mode] [/pad:padding]
以下是一个示例,用于加密”Hello, World!”,其使用 aes
算法、cbc
模式,并以 pkcs
方式填充。
SELECT encrypt('Hello, World!', 'my_secret_key', 'aes-cbc/pad:pkcs');
----返回结果
\\330\\317\\204\\357\\327\\367\\206\\241\\253\\024\\303\\013\\215\\030\\231\\257
(1 row)
更多细节,请查看:
Cryptographic functions[8] 「加密函数」
6CDC 连接器的增强功能
本次更新为 MySQL 和 PostgreSQL CDC 连接器引入了一些增强功能。
首先,创建共享的 MySQL 和 PostgreSQL Source 时,默认的 transactional
参数值已更改为 true
。如果创建 CDC Table,默认值则为 false
。此参数允许您对 CDC Table 或 Source 启用或禁用事务。
现在,在创建 MySQL 和 PostgreSQL CDC Table 时,也可使用 snapshot
参数。当此参数设置为 false
时,RisingWave 中的 CDC Table 将仅消费在 Table 创建后发生的上游事件。上游 MySQL 或 PostgreSQL Table 中的任何现有数据将不会出现在 RisingWave 的 Table 中。
CREATE TABLE orders_no_backfill (
order_id int,
order_date date,
customer_name string,
PRIMARY KEY (order_id)
) WITH (
snapshot = 'false'
) FROM pg_source TABLE 'public.orders_tx';
更多细节,请查看:
Ingest data from MySQL CDC[9]「从 MySQL CDC 导入数据」 Ingest data from PostgreSQL CDC[10]「从 PostgreSQL CDC 导入数据」
7总结
以上只是 RisingWave 1.7 版本新增的部分功能,如果您想了解本次更新的完整列表,包括新函数和集群设置等,请查看更详细的发布说明[11]。
如果您想提前了解下个月的版本及其新功能,请访问 RisingWave GitHub repository[12]。
如果您想了解 RisingWave 的所有动态,请在官网[13]订阅我们的邮件月刊,关注我们的 Twitter[14] 和 LinkedIn[15]。同时,也欢迎您加入我们的微信中文社群和 Slack[16] 英文社区,与我们的工程师还有全球各地的 RisingWave 爱好者交流!
Cluster scaling: https://docs.risingwave.com/docs/current/k8s-cluster-scaling/
[2]CREATE TABLE: https://docs.risingwave.com/docs/current/sql-create-table/
[3]CREATE SOURCE: https://docs.risingwave.com/docs/current/sql-create-source/
[4]INCLUDE clause: https://docs.risingwave.com/docs/current/include-clause/
[5]Use UDFs in JavaScript: https://docs.risingwave.com/docs/current/udf-javascript/
[6]Use UDFs in Rust: https://docs.risingwave.com/docs/current/udf-rust/
[7]SQL UDFs: https://docs.risingwave.com/docs/current/sql-create-function/#sql-udfs
[8]Cryptographic functions: https://docs.risingwave.com/docs/current/sql-function-cryptographic-functions/
[9]Ingest data from MySQL CDC: https://docs.risingwave.com/docs/current/ingest-from-mysql-cdc/
[10]Ingest data from PostgreSQL CDC: https://docs.risingwave.com/docs/current/ingest-from-postgres-cdc/
[11]发布说明: https://docs.risingwave.com/release-notes/
[12]RisingWave GitHub repository: https://github.com/risingwavelabs/risingwave
[13]官网: https://www.risingwave.com/
[14]Twitter: https://www.risingwave-labs.com/twitter?__hstc=32235681.25e2c16d83245fd21429e8d1b780a47c.1692637175278.1697232675775.1697659305484.23&__hssc=32235681.3.1697659305484&__hsfp=1531353701
[15]LinkedIn: https://www.risingwave-labs.com/linkedin?__hstc=32235681.25e2c16d83245fd21429e8d1b780a47c.1692637175278.1697232675775.1697659305484.23&__hssc=32235681.3.1697659305484&__hsfp=1531353701
[16]Slack: https://www.risingwave-labs.com/slack?__hstc=32235681.25e2c16d83245fd21429e8d1b780a47c.1692637175278.1697232675775.1697659305484.23&__hssc=32235681.3.1697659305484&__hsfp=1531353701
关于 RisingWave
RisingWave 是一款基于 Apache 2.0 协议开源的分布式流数据库,致力于为用户提供极致简单、高效的流数据处理与管理能力。RisingWave 采用存算分离架构,实现了高效的复杂查询、瞬时动态扩缩容以及快速故障恢复,并助力用户极大地简化流计算架构,轻松搭建稳定且高效的流计算应用。RisingWave 始终聆听来自社区的声音,并积极回应用户的反馈。目前,RisingWave 已汇聚了近 150 名开源贡献者和近 3000 名社区成员。全球范围内,已有上百个 RisingWave 集群在生产环境中部署。往期推荐
技术内幕