最佳实践|如何写出简单高效的 Flink SQL?
1. Flink SQL Insight 2. Best Practices 3. Future Works
01
Flink SQL Insight
02
Best Practices
2.1 Sub-Plan Reuse
2.2 Fast Aggregation
table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 5s // 用户按需配置
作业对延迟没有严格要求(因为攒 Batch 本身会带来延迟);
当前 Aggregate 算子访问 State 是瓶颈;
下游算子的处理能力不足(开启 MiniBatch 会减少往下游输出的数据量)。
所有 Aggregate Function 都实现了 merge 方法,不然没法在 GlobalAgg 在对 LocalAgg 的结果进行合并;
LocalAgg 聚合度比较高,或者 GlobalAgg 存在数据倾斜,否则开启 LocalAgg 会引入额外的计算开销。
table.optimizer.distinct-agg.split.enabled: true;
table.optimizer.distinct-agg.split.bucket-num:1024
Query 中存在 Distinct Function,且存在数据倾斜;
非 Distinct Function 只能是如下函数:count,sum,avg,max,min;
数据集比较大的时候效果更好,因为 Partial/Final 两层 Agg 之间会引入额外的网络 Shuffle 开销;
Partial Agg 还会引入额外的计算和 State 的开销。
2.3 Fast Join
当 Join Key 中带有 PrimaryKey(以下简称 PK) 时,State 中存储的数据较少,它只存了输入数据和对应的关联次数。当 Join 的输入存有 PK 时,State 中存储了一个 Map,Map Key 是 PK,Value 是输入数据以及关联的次数。当 Join Key 和 Join 输入都没有 PK 时,State 仍用 Map 存储,但它的 Key 是输入的数据,Value 是这一行数据的出现次数和关联次数。虽然它们都是用 Map 存储,Join 输入带 PK 时的 State 访问效率高于没有 PK 的情况。因此建议用户在 Query 中尽量定义 PK 信息,帮助优化器更好的优化。
建议在 Join 之前只保留必要的字段,让 Join State 里面存储的数据更少。
将 Regular Join 改写成 Lookup Join。主流来一条数据会触发 Join 计算,Join 会根据主流的数据查找维表中相关最新数据,因此 Lookup Join 不需要 State 存储输入数据。目前很多维表 Connector 提供了点查机制和缓存机制,执行性能非常好,在生产中被大量使用。后面章节会单独介绍 Lookup Join 相关优化。Lookup Join 的缺点是当维表数据有更新时,无法触发 Join 计算。
将 Regular Join 改写为 Interval Join。Interval Join 是在 Regluar Join 基础上对 Join Condition 加了时间段的限定,从而在 State 中只需要存储该时间段的数据即可,过期数据会被及时清理。因此 Interval Join 的 State 相比 Regular Join 要小很多。
把 Regular Join 改写成 Window Join。Window Join 是在 Regluar Join 基础上定义相关 Window 的数据才能被 Join。因此,State 只存放的最新 Window,过期数据会被及时清理。
把 Regular Join 改写成 Temporal Join。Temporal Join 是在 Regular Join 上定义了相关版本才能被 Join。Temporal Join 保留最新版本数据,过期数据会被及时清理。
2.4 Fast Lookup Join
Full Caching,将所有数据全部 Cache 到内存中。该方式适合小数据集,因为数据量过大会导致 OOM。Flink 提供了 Table Hints 开启。同时,还可以通过 Hints 定义 reload 策略。
Partial Caching,适合大数据集使用,框架底层使用 LRU Cache 保存最近被使用的数据。当然,也可以通过 Hint 定义 LRU Cache 的大小和 Cache 的失效时间。
No Caching,即关闭 Cache。
2.5 Fast Deduplication
2.6 Fast TopN
不要输出 row_number 字段,这样可以大大减少下游处理的数据量。如果下游需要排序,可以在前端拿到数据后重排。
增加 TopN 算子中 Cache 大小,减少对 State 的访问。Cache 命中率的计算公式为:cache_hit = cache_size * parallelism / top_n_num / partition_key_num。由此可见,增加 Cache 大小可以增加 Cache 命中率(可以通过 table.exec.rank.topn-cache-size 修改 Cache 大小,默认值是 1 万)。需要注意的是,增加 Cache 大小时,TaskManager 的内存也需要相应增加。
分区字段最好与时间相关。如果 Partition 字段不与时间属性关联,无法通过 TTL 进行清理,会导致 State 无限膨胀。(配置了 TTL,数据被过期清理可能导致结果错误,需要慎重)
2.7 Efficient User Defined Connector
SupportsFilterPushDown,将 Filter 条件下推到 Scan 里,从而减少 Scan 的读 IO,以提高性能。
SupportsProjectionPushDown,告诉 Scan 只读取必要的字段,减少无效的字段读取。
SupportsPartitionPushDown,在静态优化时,告诉 Scan 只需要读取有效分区,避免无效分区读取。
SupportsDynamicFiltering,在作业运行时,动态识别出哪些分区是有效的,避免无效分区读取。
SupportsLimitPushDown,将 limit 值下推到 Scan 里,只需要读取 limit 条数据即可,大大减少了 Scan I/O。
SupportsAggregatePushDown,直接从 Scan 中读取聚合结果,减少 Scan 的读 I/O,同时输出给下游的数据更少。
SupportsStatisticReport,Connector 汇报统计信息给优化器,以便优化器产生更优的执行计划。
2.8 Use Hints Well
Table Hints,主要用于改变 Table 表的配置,例如 Lookup Table 的缓存策略可以通过 Table Hint 进行修改。
Query Hints,当前它可以建议优化器选择合适的 Join 策略。Lookup Join 可以通过 Hint 修改 Lookup 策略,请参考上述 Lookup Join 相关优化的介绍;也可以通过 Hint 让优化器为批作业选择对应的 Join 算法,具体请参考上图所示。
03
Future Works
最后,我们希望优化更加智能,会探索做一些动态优化相关工作,例如根据流量变化,在线自动优化 plan。
往期精选