Flink 1.11: A More Usable Unified Stream and Batch SQL Engine
The following article is from the Flink 中文社区. Author: 陈玉兆 (玉兆).
Flink 1.11 brings a series of SQL usability improvements:

- more convenient ways to append to or modify table definitions
- flexible declaration of dynamic query parameters
- strengthened and unified SQL interfaces on the original TableEnvironment
- simplified connector property definitions
- native support for Hive DDL
- enhanced Python UDF support
Create Table Like
CREATE TABLE LIKE derives a new table definition from an existing one. Suppose we have the following base table:

CREATE [TEMPORARY] TABLE base_table (
  id BIGINT,
  name STRING,
  tstmp TIMESTAMP,
  PRIMARY KEY(id)
) WITH (
  'connector' = 'kafka'
)
A derived table that only adds a watermark on top of base_table can be declared as:

CREATE [TEMPORARY] TABLE derived_table (
  WATERMARK FOR tstmp AS tstmp - INTERVAL '5' SECOND
)
LIKE base_table;
This is equivalent to the fully expanded definition:

CREATE [TEMPORARY] TABLE derived_table (
  id BIGINT,
  name STRING,
  tstmp TIMESTAMP,
  PRIMARY KEY(id),
  WATERMARK FOR tstmp AS tstmp - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka'
)
Merge strategies for table properties
CREATE TABLE LIKE distinguishes several groups of table features:

- ALL: the complete table definition
- CONSTRAINTS: constraints such as primary keys and unique keys
- GENERATED: mainly computed columns and watermarks
- OPTIONS: the table options defined in the WITH (...) clause
- PARTITIONS: table partition information
Each group can be combined with one of three merge strategies:

- INCLUDING: include (the default)
- EXCLUDING: exclude
- OVERWRITING: overwrite
For example, given the following base table:

CREATE [TEMPORARY] TABLE base_table (
  id BIGINT,
  name STRING,
  tstmp TIMESTAMP,
  PRIMARY KEY(id)
) WITH (
  'connector' = 'kafka',
  'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',
  'format' = 'json'
)
a derived table can overwrite the startup offsets while dropping the primary key constraint:

CREATE [TEMPORARY] TABLE derived_table (
  WATERMARK FOR tstmp AS tstmp - INTERVAL '5' SECOND
)
WITH (
  'scan.startup.specific-offsets' = 'partition:0,offset:0'
)
LIKE base_table (OVERWRITING OPTIONS, EXCLUDING CONSTRAINTS);
The result is equivalent to:

CREATE [TEMPORARY] TABLE derived_table (
  id BIGINT,
  name STRING,
  tstmp TIMESTAMP,
  WATERMARK FOR tstmp AS tstmp - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'scan.startup.specific-offsets' = 'partition:0,offset:0',
  'format' = 'json'
)
Dynamic Table Options
Table options are normally fixed in the DDL. Consider the following Kafka table:

create table kafka_table (
id bigint,
age int,
name STRING
) WITH (
'connector' = 'kafka',
'topic' = 'employees',
'scan.startup.mode' = 'timestamp',
'scan.startup.timestamp-millis' = '123456',
'format' = 'csv',
'csv.ignore-parse-errors' = 'false'
)
In previous versions, a user who wanted to, say:

- consume from a specific timestamp, i.e. modify the scan.startup.timestamp-millis option, or
- ignore parse errors, i.e. flip csv.ignore-parse-errors to true,

had to re-declare the table with new DDL. Flink 1.11 adds dynamic table options, which override table options per query through a SQL hint of the form:
table_name /*+ OPTIONS('k1'='v1', 'aa.bb.cc'='v2') */
CREATE TABLE kafka_table1 (id BIGINT, name STRING, age INT) WITH (...);
CREATE TABLE kafka_table2 (id BIGINT, name STRING, age INT) WITH (...);
-- override table options in query source
select id, name from kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
-- override table options in join
select * from
kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t1
join
kafka_table2 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t2
on t1.id = t2.id;
-- override table options for INSERT target table
insert into kafka_table1 /*+ OPTIONS('sink.partitioner'='round-robin') */ select * from kafka_table2;
Because hints change execution behavior, dynamic table options are disabled by default and must be switched on explicitly:

// instantiate table environment
TableEnvironment tEnv = ...
// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.dynamic-table-options.enabled", "true");
SQL API improvements
The API changes address several pain points of the old interfaces:

- the old sqlUpdate() executed DDL statements immediately, while INSERT INTO statements only ran when execute() was called later
- the entry point of a Table program was unclear: both TableEnvironment.execute() and StreamExecutionEnvironment.execute() could trigger execution
- execute() had no return value, so statements such as SHOW TABLES had no good way to return results; meanwhile sqlUpdate() accepted more and more kinds of statements, blurring the interface definition (sqlUpdate() being able to run SHOW TABLES is a case in point)
- the Blink planner has long been able to optimize multiple sinks into a single job, but this capability was never exposed at the API level
Clearer execution semantics

The new executeSql() method executes any single statement (DDL, queries, SHOW/DESCRIBE, INSERT INTO) immediately and returns a TableResult.
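A minimal sketch of the new semantics, reusing the tEnv from the snippet above (imports omitted as in the other snippets; the table names src and sink1 are hypothetical):

// DDL and SHOW statements execute immediately and return a TableResult.
tEnv.executeSql("CREATE TABLE src (id BIGINT, name STRING) WITH (...)");
TableResult tables = tEnv.executeSql("SHOW TABLES");
tables.print();  // results can now be retrieved, e.g. printed to stdout

// INSERT INTO also executes immediately: the job is submitted
// asynchronously and the returned TableResult exposes its JobClient.
TableResult insert = tEnv.executeSql("INSERT INTO sink1 SELECT * FROM src");
insert.getJobClient().ifPresent(client -> System.out.println(client.getJobID()));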
Organizing multiple statements for one execution

Since executeSql() runs each statement immediately, the new StatementSet is the explicit way to buffer several INSERT statements and execute them together.
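A minimal sketch of StatementSet (the sink table names are hypothetical):

StatementSet stmtSet = tEnv.createStatementSet();

// buffered, not yet executed
stmtSet.addInsertSql("INSERT INTO sink1 SELECT id, name FROM src");
stmtSet.addInsertSql("INSERT INTO sink2 SELECT id, UPPER(name) FROM src");

// all buffered statements are optimized together (the source can be shared
// between the two sinks) and submitted as a single job
TableResult result = stmtSet.execute();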
Comparison of old and new APIs
| Current Interface | New Interface |
| --- | --- |
| tEnv.sqlUpdate("CREATE TABLE ..."); | TableResult result = tEnv.executeSql("CREATE TABLE ..."); |
| tEnv.sqlUpdate("INSERT INTO ... SELECT ..."); tEnv.execute("test"); | TableResult result = tEnv.executeSql("INSERT INTO ... SELECT ..."); |
execute vs createStatementSet

The old execute() call triggered whatever had been buffered by earlier sqlUpdate() calls; in the new API, executeSql() runs a single statement right away, and createStatementSet() (see the sketch above) is the explicit, deferred way to group multiple INSERT statements into one optimized job.
Enhanced Hive syntax compatibility

Flink 1.11 strengthens the Hive dialect: after switching the SQL dialect to HIVE and registering a Hive catalog, common Hive DDL can be executed as-is:
EnvironmentSettings settings = EnvironmentSettings.newInstance()...build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// to use hive dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
// use the hive catalog
tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
tableEnv.useCatalog(hiveCatalog.getName());
For example, the following Hive DDL statements now work directly (the '%s' location is a placeholder):

create external table tbl1 (
  d decimal(10,0),
  ts timestamp)
partitioned by (p string)
location '%s'
tblproperties('k1'='v1');
create table tbl2 (s struct<ts:timestamp,bin:binary>) stored as orc;
create table tbl3 (
m map<timestamp,binary>
)
partitioned by (p1 bigint, p2 tinyint)
row format serde 'org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe';
create table tbl4 (
x int,
y smallint)
row format delimited fields terminated by '|' lines terminated by '\n';
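Under the Hive dialect these statements can be submitted like any other SQL; a minimal sketch using the tableEnv from the setup snippet above:

// With the dialect set to HIVE, Hive DDL strings go straight to executeSql().
tableEnv.executeSql(
    "create table tbl4 (" +
    "  x int," +
    "  y smallint)" +
    " row format delimited fields terminated by '|' lines terminated by '\\n'");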
More concise connector properties
Flink 1.11 redesigns the connector property keys to be shorter and more consistent:

- connector is now the key for the connector type, with the version folded into the value, e.g. Kafka 0.11 becomes 'connector' = 'kafka-0.11'
- the redundant connector. prefix is removed from the remaining properties
- the scan and sink prefixes mark source-specific and sink-specific properties
- format.type is shortened to format, and a format's own properties use the format name as prefix, e.g. the CSV format's properties all start with csv
A Kafka table in the new property style looks like this:

CREATE TABLE kafkaTable (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'csv',
'scan.startup.mode' = 'earliest-offset'
)
JDBC catalog

Flink 1.11 ships a JDBC catalog (with a built-in Postgres implementation) that reads table metadata directly from the database, so existing JDBC tables no longer need to be re-declared by hand:
CREATE CATALOG mypg WITH(
'type' = 'jdbc',
'default-database' = '...',
'username' = '...',
'password' = '...',
'base-url' = '...'
);
USE CATALOG mypg;
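The catalog can also be registered programmatically; a minimal sketch, assuming the flink-connector-jdbc dependency is on the classpath (connection values are placeholders, as in the DDL above):

// register a JDBC catalog from Java and make it the current catalog
JdbcCatalog catalog = new JdbcCatalog(
    "mypg",   // catalog name
    "...",    // default database
    "...",    // username
    "...",    // password
    "...");   // base JDBC url, e.g. jdbc:postgresql://host:5432/
tEnv.registerCatalog("mypg", catalog);
tEnv.useCatalog("mypg");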
Python UDF enhancements
Defining Python UDFs in DDL

Java/Scala Table API programs can now register and call Python UDFs through DDL:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

// ship the Python file and pick the Python interpreter for the client
tEnv.getConfig().getConfiguration().setString("python.files", "/home/my/test1.py");
tEnv.getConfig().getConfiguration().setString("python.client.executable", "python3");

// register the Python UDF via DDL and use it immediately
tEnv.executeSql("create temporary system function func1 as 'test1.func1' language python");
Table table = tEnv.fromDataSet(env.fromElements("1", "2", "3")).as("str").select("func1(str)");
tEnv.toDataSet(table, String.class).collect();
Vectorized Python UDF support

Python UDFs can now be vectorized on top of pandas: the function receives batches of data as pandas.Series instead of processing rows one by one:
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
     result_type=DataTypes.BIGINT(), udf_type="pandas")
def add(i, j):
    # i and j are pandas.Series; the addition is vectorized
    return i + j
table_env = BatchTableEnvironment.create(env)
# register the vectorized Python scalar function
table_env.register_function("add", add)
# use the vectorized Python scalar function in Python Table API
my_table.select("add(bigint, bigint)")
# use the vectorized Python scalar function in SQL API
table_env.sql_query("SELECT add(bigint, bigint) FROM MyTable")
Flink 1.11 also adds metrics support for Python UDFs; for details, see: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/python/metrics.html
Looking ahead

More on the major changes and new features in Flink 1.11 can be found in the official release announcement.