查看原文
其他

实战 | Flink 与 Hive 的磨合期

Editor's Note

在本文中,我们将一起学习通过 Flink SQL 比较详细地去实战操作 Hive 数据库,以及熟识 Flink SQL 提供的一些功能。同时祝读者朋友们情人节快乐!对你的ta好一点,早日脱单,早日走过磨合期哦~笔芯

The following article is from DataFlow范式 Author Jason

热文回顾:Apache Flink 1.10.0 重磅发布,合并blink,集成Hive!


有不少学习小密圈同学反馈,参考前面分享的文章《Hive 终于等来了 Flink》部署 Flink 并集成 Hive 时,出现一些 bug 以及兼容性等问题。

虽已等来,却未可用。

所以增加了这一篇文章,作为姊妹篇。

回顾

在上篇文章中,我们使用的 CDH 版本为 5.16.2,其中 Hive 版本为 1.1.0(CDH 5.x 系列 Hive 版本都不高于 1.1.0,是不是不可理解),Flink 源代码本身对 Hive 1.1.0 版本兼容性不好,存在不少问题。为了兼容目前版本,我们基于 CDH 5.16.2 环境,对 Flink 代码进行了修改,重新打包并部署。

其实经过很多开源项目的实战,比如 Apache Atlas,Apache Spark 等,Hive 1.2.x 和 Hive 1.1.x 在大部分情况下,替换一些 Jar 包,是可以解决兼容性的问题。对于我们的环境来说,可以使用 Hive 1.2.1 版本的一些 Jar 包来代替 Hive 1.1.0 版本的 Jar 包。在本篇文章的开始部分,我们会解决这个问题,然后再补充上篇文章缺少的实战内容。

剪不断理还乱的问题

根据读者的反馈,我们将所有的问题总结为三类:

  • 1. Flink 如何连接 Hive 除了 API 外,有没有类似 spark-sql 命令

  • 2. 识别不到 Hadoop 环境或配置文件找不到

  • 3. 依赖包、类或方法找不到

1. Flink 如何连接 Hive

有的读者不太清楚,如何配置 Flink 连接 Hive 的 Catalog,这里补充一个完整的 conf/sql-client-hive.yaml 示例:

  1. catalogs:

  2. - name: staginghive

  3. type: hive

  4. hive-conf-dir: /etc/hive/conf

  5. hive-version: 1.2.1


  6. execution:

  7. planner: blink

  8. type: batch

  9. time-characteristic: event-time

  10. periodic-watermarks-interval: 200

  11. result-mode: table

  12. max-table-result-rows: 1000000

  13. parallelism: 1

  14. max-parallelism: 128

  15. min-idle-state-retention: 0

  16. max-idle-state-retention: 0

  17. current-catalog: staginghive

  18. current-database: ssb

  19. restart-strategy:

  20. type: fallback


  21. deployment:

  22. response-timeout: 5000

  23. gateway-address: ""

  24. gateway-port: 0

  25. m: yarn-cluster

  26. yn: 2

  27. ys: 5

  28. yjm: 1024

  29. ytm: 2048

sql-client-hive.yaml 配置文件里面包含:

  • 1. Hive 配置文件 catalogs 中配置了 Hive 的配置文件路径。

  • 2. Yarn 配置信息 deployment 中配置了 Yarn 的配置信息。

  • 3. 执行引擎信息 execution 配置了 blink planner,并且使用 batch 模式。batch 模式比较稳定,适合传统的批处理作业,而且可以容错,另外中间数据落盘,建议开启压缩功能。除了 batch,Flink 也支持 streaming 模式。

Flink SQL CLI 工具

类似 spark-sql 命令,Flink 提供了 SQL CLI 工具,即 sql-client.sh 脚本。在 Flink 1.10 版本中,Flink SQL CLI 改进了很多功能,我们后面讲解。

sql-client.sh 使用方式如下:

  1. $ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml

2. 识别不到 Hadoop 环境或配置文件找不到

我们在上篇文章中提到过,在部署 Flink 的环境上部署 CDH gateway,包括 Hadoop、Hive 客户端,另外还需要配置一些环境变量,如下:

  1. export HADOOP_CONF_DIR=/etc/hadoop/conf

  2. export YARN_CONF_DIR=/etc/hadoop/conf

  3. export HIVE_HOME=/opt/cloudera/parcels/CDH/lib/hive

  4. export HIVE_CONF_DIR=/etc/hive/conf

3. 依赖包、类或方法找不到

先查看一下 Flink 家目录下的 lib 目录:

  1. $ tree lib

  2. lib

  3. ├── flink-connector-hive_2.11-1.10.0.jar

  4. ├── flink-dist_2.11-1.10.0.jar

  5. ├── flink-hadoop-compatibility_2.11-1.10.0.jar

  6. ├── flink-shaded-hadoop-2-2.6.0-cdh5.16.2-9.0.jar

  7. ├── flink-table_2.11-1.10.0.jar

  8. ├── flink-table-blink_2.11-1.10.0.jar

  9. ├── hive-exec-1.1.0-cdh5.16.2.jar

  10. ├── hive-metastore-1.1.0-cdh5.16.2.jar

  11. ├── libfb303-0.9.3.jar

  12. ├── log4j-1.2.17.jar

  13. └── slf4j-log4j12-1.7.15.jar

如果上面前两个问题都解决后,执行如下命令:

  1. $ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml

报错,报错,还是报错:

Caused by: java.lang.ClassNotFoundException: org.apache.commons.logging.LogFactory

其实在运行 sql-client.sh 脚本前,需要指定 Hadoop 环境的依赖包的路径,建议不要报错一个添加一个,除非有的读者喜欢。这里我们提示一个方便的方式,即设置 HADOOPCLASSPATH(可以添加到 ~/.bashprofile 中)环境变量:

  1. export HADOOP_CLASSPATH=`hadoop classpath`

再次执行:

  1. $ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml

很抱歉,继续报错:

Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context. at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:753) at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:228) at org.apache.flink.table.client.SqlClient.start(SqlClient.java:98) at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive Metastore client

这里就是 Hive 1.1.0 版本的 Jar 包与 Flink 出现版本不兼容性的问题了,解决方法是:

  • 1. 下载 apache-hive-1.2.1 版本

  • 2. 替换 Flink lib 目录下的 Hive Jar 包 删除掉 hive-exec-1.1.0-cdh5.16.2.jar、 hive-metastore-1.1.0-cdh5.16.2.jar 和 libfb303-0.9.3.jar,然后添加 hive-exec-1.2.1.jar、 hive-metastore-1.2.1.jar 和 libfb303-0.9.2.jar,再次查看 lib 目录


  1. $ tree lib

  2. lib

  3. ├── flink-connector-hive_2.11-1.10.0.jar

  4. ├── flink-dist_2.11-1.10.0.jar

  5. ├── flink-hadoop-compatibility_2.11-1.10.0.jar

  6. ├── flink-shaded-hadoop-2-2.6.0-cdh5.16.2-9.0.jar

  7. ├── flink-table_2.11-1.10.0.jar

  8. ├── flink-table-blink_2.11-1.10.0.jar

  9. ├── hive-exec-1.2.1.jar

  10. ├── hive-metastore-1.2.1.jar

  11. ├── libfb303-0.9.2.jar

  12. ├── log4j-1.2.17.jar

  13. └── slf4j-log4j12-1.7.15.jar

最后再执行:

  1. $ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml

这时,读者就可以看到手握栗子的可爱小松鼠了。 

Flink SQL CLI 实践

在 Flink 1.10 版本(目前为 RC1 阶段) 中,Flink 社区对 SQL CLI 做了大量的改动,比如支持 View、支持更多的数据类型和 DDL 语句、支持分区读写、支持 INSERT OVERWRITE 等,实现了更多的 TableEnvironment API 的功能,更加方便用户使用。

接下来,我们详细讲解 Flink SQL CLI。

0. Help

执行下面命令,登录 Flink SQL 客户端:

  1. $ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml

  2. Flink SQL>

执行 HELP,查看 Flink SQL 支持的命令,如下为大部分常用的:

  • CREATE TABLE

  • DROP TABLE

  • CREATE VIEW

  • DESCRIBE

  • DROP VIEW

  • EXPLAIN

  • INSERT INTO

  • INSERT OVERWRITE

  • SELECT

  • SHOW FUNCTIONS

  • USE CATALOG

  • SHOW TABLES

  • SHOW DATABASES

  • SOURCE

  • USE

  • SHOW CATALOGS

1. Hive 操作

1.1 创建表和导入数据

为了方便读者进行实验,我们使用 ssb-dbgen 生成测试数据,读者也可以使用测试环境已有的数据来进行实验。

具体如何在 Hive 中一键式创建表并插入数据,可以参考我们早期的项目 https://github.com/MLikeWater/ssb-kylin

2. Hive 表

查看上个步骤中创建的 Hive 表:

  1. 0: jdbc:hive2://xx.xxx.xxx.xxx:10000> show tables;

  2. +--------------+--+

  3. | tab_name |

  4. +--------------+--+

  5. | customer |

  6. | dates |

  7. | lineorder |

  8. | p_lineorder |

  9. | part |

  10. | supplier |

  11. +--------------+--+

读者可以对 Hive 进行各种查询,对比后面 Flink SQL 查询的结果。

2. Flink 操作

2.1 通过 HiveCatalog 访问 Hive 数据库

登录 Flink SQL CLI,并查询 catalogs:

  1. $ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml

  2. Flink SQL> show catalogs;

  3. default_catalog

  4. staginghive


  5. Flink SQL> use catalog staginghive;

通过 show catalogs 获取配置的所有 catalog。由于我们在 sql-client-hive.yaml 文件中设置了默认的 catalog,即为 staginghive。如果需要切换到其他 catalog,可以使用 usecatalog xxx

2.2 查询 Hive 元数据

通过 Flink SQL 查询 Hive 数据库和表:

  1. # 查询数据库

  2. Flink SQL> show databases;

  3. ...

  4. ssb

  5. tmp

  6. ...

  7. Flink SQL> use ssb;


  8. # 查询表

  9. Flink SQL> show tables;

  10. customer

  11. dates

  12. lineorder

  13. p_lineorder

  14. part

  15. supplier


  16. # 查询表结构

  17. Flink SQL> DESCRIBE customer;

  18. root

  19. |-- c_custkey: INT

  20. |-- c_name: STRING

  21. |-- c_address: STRING

  22. |-- c_city: STRING

  23. |-- c_nation: STRING

  24. |-- c_region: STRING

  25. |-- c_phone: STRING

  26. |-- c_mktsegment: STRING

这里需要注意,Hive 的元数据在 Flink catalog 中都以小写字母使用。

2.3 查询

接下来,在 Flink SQL CLI 中查询一些 SQL 语句,完整 SQL 参考 https://github.com/MLikeWater/ssb-kylin 的 README。

目前 Flink SQL 解析 Hive 视图元数据时,会遇到一些 Bug,比如执行 Q1.1 SQL:

  1. Flink SQL> select sum(v_revenue) as revenue

  2. > from p_lineorder

  3. > left join dates on lo_orderdate = d_datekey

  4. > where d_year = 1993

  5. > and lo_discount between 1and3

  6. > and lo_quantity < 25;


  7. [ERROR] Couldnot execute SQL statement. Reason:

  8. org.apache.calcite.sql.validate.SqlValidatorException: Table'lineorder'not found; did you mean 'LINEORDER'?

Flink SQL 找不到视图中的实体表。

p_lineorder 表是 Hive 中的一张视图,创建表的语句如下:

  1. CREATE VIEW P_LINEORDER AS

  2. SELECT LO_ORDERKEY,

  3. LO_LINENUMBER,

  4. LO_CUSTKEY,

  5. LO_PARTKEY,

  6. LO_SUPPKEY,

  7. LO_ORDERDATE,

  8. LO_ORDERPRIOTITY,

  9. LO_SHIPPRIOTITY,

  10. LO_QUANTITY,

  11. LO_EXTENDEDPRICE,

  12. LO_ORDTOTALPRICE,

  13. LO_DISCOUNT,

  14. LO_REVENUE,

  15. LO_SUPPLYCOST,

  16. LO_TAX,

  17. LO_COMMITDATE,

  18. LO_SHIPMODE,

  19. LO_EXTENDEDPRICE*LO_DISCOUNT AS V_REVENUE

  20. FROM ssb.LINEORDER;

但是对于 Hive 中视图的定义,Flink SQL 并没有很好地处理元数据。为了后面 SQL 的顺利执行,这里我们在 Hive 中删除并重建该视图:

  1. 0: jdbc:hive2://xx.xxx.xxx.xxx:10000> create view p_lineorder as

  2. select lo_orderkey,

  3. lo_linenumber,

  4. lo_custkey,

  5. lo_partkey,

  6. lo_suppkey,

  7. lo_orderdate,

  8. lo_orderpriotity,

  9. lo_shippriotity,

  10. lo_quantity,

  11. lo_extendedprice,

  12. lo_ordtotalprice,

  13. lo_discount,

  14. lo_revenue,

  15. lo_supplycost,

  16. lo_tax,

  17. lo_commitdate,

  18. lo_shipmode,

  19. lo_extendedprice*lo_discount as v_revenue

  20. from ssb.lineorder;

然后继续在 Flink SQL CLI 中查询 Q1.1 SQL:

  1. Flink SQL> select sum(v_revenue) as revenue

  2. > from p_lineorder

  3. > left join dates on lo_orderdate = d_datekey

  4. > where d_year = 1993

  5. > and lo_discount between 1and3

  6. > and lo_quantity < 25;


  7. revenue

  8. 894280292647

继续查询 Q2.1 SQL:

  1. Flink SQL> select sum(lo_revenue) as lo_revenue, d_year, p_brand

  2. > from p_lineorder

  3. > left join dates on lo_orderdate = d_datekey

  4. > left join part on lo_partkey = p_partkey

  5. > left join supplier on lo_suppkey = s_suppkey

  6. > where p_category = 'MFGR#12'and s_region = 'AMERICA'

  7. > groupby d_year, p_brand

  8. > order by d_year, p_brand;


  9. lo_revenue d_year p_brand

  10. 8196341281998 MFGR#1206

  11. 8776512321998 MFGR#1207

  12. 7544894281998 MFGR#1208

  13. 8163694881998 MFGR#1209

  14. 6684823061998 MFGR#1210

  15. 6603666081998 MFGR#1211

  16. 8629025701998 MFGR#1212

  17. ...

最后再查询一个 Q4.3 SQL:

  1. Flink SQL> select d_year, s_city, p_brand, sum(lo_revenue) - sum(lo_supplycost) as profit

  2. > from p_lineorder

  3. > left join dates on lo_orderdate = d_datekey

  4. > left join customer on lo_custkey = c_custkey

  5. > left join supplier on lo_suppkey = s_suppkey

  6. > left join part on lo_partkey = p_partkey

  7. > where c_region = 'AMERICA'and s_nation = 'UNITED STATES'

  8. > and(d_year = 1997or d_year = 1998)

  9. > and p_category = 'MFGR#14'

  10. > groupby d_year, s_city, p_brand

  11. > order by d_year, s_city, p_brand;


  12. d_year s_city p_brand profit

  13. 1998 UNITED ST9 MFGR#1440 6665681

如果读者感兴趣的话,可以查询剩余的 SQL,当然也可以和 Spark SQL 进行比较。另外 Flink SQL 也支持 EXPLAIN,查询 SQL 的执行计划。

2.4 创建视图

同样,可以在 Flink SQL CLI 中创建和删除视图,如下:

  1. Flink SQL> create view p_lineorder2 as

  2. > select lo_orderkey,

  3. > lo_linenumber,

  4. > lo_custkey,

  5. > lo_partkey,

  6. > lo_suppkey,

  7. > lo_orderdate,

  8. > lo_orderpriotity,

  9. > lo_shippriotity,

  10. > lo_quantity,

  11. > lo_extendedprice,

  12. > lo_ordtotalprice,

  13. > lo_discount,

  14. > lo_revenue,

  15. > lo_supplycost,

  16. > lo_tax,

  17. > lo_commitdate,

  18. > lo_shipmode,

  19. > lo_extendedprice * lo_discount as v_revenue

  20. > from ssb.lineorder;

  21. [INFO] View has been created.

这里我们需要特别强调的是,目前 Flink 无法删除 Hive 中的视图:

  1. Flink SQL> drop view p_lineorder;

  2. [ERROR] Couldnot execute SQL statement. Reason:

  3. The given view does not exist in the current CLI session. Only views created with a CREATE VIEW statement can be accessed.

2.5 分区操作

Hive 数据库中创建一张分区表:

  1. CREATE TABLE IF NOT EXISTS flink_partition_test (

  2. id int,

  3. name string

  4. ) PARTITIONED BY (day string, type string)

  5. stored as textfile;

接着,通过 Flink SQL 插入和查询数据:

  1. # 插入静态分区的数据

  2. Flink SQL> INSERT INTO flink_partition_test PARTITION (type='Flink', `day`='2020-02-01') SELECT 100001, 'Flink001';


  3. # 查询

  4. Flink SQL> select* from flink_partition_test;


  5. id name day type

  6. 100001Flink0012020-02-01Flink



  7. # 插入动态分区

  8. Flink SQL> INSERT INTO flink_partition_test SELECT 100002, 'Spark', '2020-02-02', 'SparkSQL';



  9. # 查询

  10. Flink SQL> select* from flink_partition_test;


  11. id name day type

  12. 100002Spark2020-02-02SparkSQL

  13. 100001FlinkSQL2020-02-01Flink



  14. # 动态和静态分区结合使用类似,不再演示

  15. # 覆盖插入数据

  16. Flink SQL> INSERT OVERWRITE flink_partition_test PARTITION (type='Flink') SELECT 100002, 'Spark', '2020-02-08', 'SparkSQL-2.4';


  17. id name day type

  18. 100002Spark2020-02-02SparkSQL

  19. 100001FlinkSQL2020-02-01Flink

字段 day 在 Flink 属于关键字,要特殊处理。

2.6 其他功能

2.6.1 函数

Flink SQL 支持内置的函数和自定义函数。对于内置的函数,可以执行 show functions 进行查看,这一块我们以后会单独介绍如何创建自定义函数。

2.6.2 设置参数

Flink SQL 支持设置环境参数,可以使用 set 命令查看和设置参数:

  1. Flink SQL> set;

  2. deployment.gateway-address=

  3. deployment.gateway-port=0

  4. deployment.m=yarn-cluster

  5. deployment.response-timeout=5000

  6. deployment.yjm=1024

  7. deployment.yn=2

  8. deployment.ys=5

  9. deployment.ytm=2048

  10. execution.current-catalog=staginghive

  11. execution.current-database=ssb

  12. execution.max-idle-state-retention=0

  13. execution.max-parallelism=128

  14. execution.max-table-result-rows=1000000

  15. execution.min-idle-state-retention=0

  16. execution.parallelism=1

  17. execution.periodic-watermarks-interval=200

  18. execution.planner=blink

  19. execution.restart-strategy.type=fallback

  20. execution.result-mode=table

  21. execution.time-characteristic=event-time

  22. execution.type=batch


  23. Flink SQL> set deployment.yjm = 2048;

总结

在本文中,我们通过 Flink SQL 比较详细地去操作 Hive 数据库,以及 Flink SQL 提供的一些功能。

当然,目前 Flink SQL 操作 Hive 数据库还是存在一些问题:

  • 目前只支持 TextFile 存储格式,还无法指定其他存储格式

    只支持 Hive 数据库中 TextFile 存储格式的表,而且 row format serde 是 org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe。虽然实现了 RCFile、ORC、Parquet、Sequence 等存储格式,但是无法自动识别 Hive 表的存储格式。如果要使用其他存储格式,需要修改源码,重新编译。不过社区已经对这些存储格式进行了测试,相信不久以后就可以在 Flink SQL 中使用。

  • OpenCSVSerde 支持不完善

    如果读者使用 TextFile 的 row format serde 为 org.apache.hadoop.hive.serde2.OpenCSVSerde 时,无法正确识别字段类型,会把 Hive 表的字段全部映射为 String 类型。

  • 暂时不支持 Bucket 表

  • 暂时不支持 ACID 表

  • Flink SQL 优化方面功能较少

  • 权限控制方面

    这方面和 Spark SQL 类似,目前基于 HDFS ACL 控制,暂时还没有实现 Sentry 或 Ranger 控制权限,不过目前 Cloudera 正在开发基于 Ranger 设置 Spark SQL 和 Hive 共享访问权限的策略,实现行/列级控制以及审计信息。

Flink 社区发展很快,所有这些问题只是暂时的,随着新版本的发布会被逐个解决。

如果 Flink SQL 目前不满足的需求,建议使用 API 方式来解决问题。





推荐阅读





爆炸消息 | Apache Flink 1.10.0 重磅发布,年度最大规模版本升级!



数仓社区

如有收获,请划至底部,点击“在看”,谢谢!

关注 社区,获取更多技术干货
你也「在看」吗?👇

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存