查看原文
其他

Flink SQL 写入MySQL、Kafka、DataStream

The following article is from 大数据老哥 Author 大数据老哥


前言


         表的输出,是通过将数据写入 TableSink 来实现的。TableSink 是一个通用接口,可以支持不同的文件格式、存储数据库和消息队列。    

         具体实现,输出表最直接的方法,就是通过 Table.insertInto() 方法将一个 Table 写入注册过的 TableSink 中。

一、输入到文件
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{DataTypesTable}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{CsvFileSystemSchema}

/**
 * @Package
 * @author 大数据老哥
 * @date 2020/12/18 0:16
 * @version V1.0
 */

object FlinkSqlSourceFileSinkFile {
  def main(args: Array[String]): Unit = {
    // 构建运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // 构建Table运行环境
    val tableEnv = StreamTableEnvironment.create(env)
    //通过connect 读取数据
    tableEnv.connect(new FileSystem().path("./data/user.txt"))
      .withFormat(new Csv()) // 设置格式
      .withSchema(new Schema() // 设置元数据信息
        .field("id"DataTypes.STRING())
        .field("name"DataTypes.STRING())
      ).createTemporaryTable("FileInput")

    // 查询数据
    val table: Table = tableEnv.sqlQuery("select * from FileInput")
    //通过connect 输出数据
    tableEnv.connect(new FileSystem().path("./data/output.csv"))
      .withFormat(new Csv()) // 设置格式
      .withSchema(new Schema() // 设置元数据信息
        .field("id"DataTypes.STRING())
        .field("name"DataTypes.STRING())
      ).createTemporaryTable("FileOutput")

    table.insertInto("FileOutput")

    env.execute("FlinkSqlSourceFileSinkFile")
  }
}
二、更新模式(Update Mode)

         在流处理过程中,表的处理并不像传统定义的那样简单。         对于流式查询(Streaming Queries),需要声明如何在(动态)表和外部连接器之间执行转换。与外部系统交换的消息类型,由更新模式(update mode)指定。

2.1 追加模式(Append Mode)

在追加模式下,表(动态表)和外部连接器只交换插入(Insert)消息。

2.2 撤回模式(Retract Mode)

撤回模式下,表和外部连接器交换的是:添加(Add)和撤回(Retract)消息。为插入(Insert)会被编码为添加消息;

为删除(Delete)则编码为撤回消息; 

为更新(Update)则会编码为,已更新行(上一行)的撤回消息,和更新行(新行)的添加消息。 

在此模式下,不能定义 key,这一点跟 upsert 模式完全不同。 

2.3 Upsert(更新插入)模式

在 Upsert 模式下,动态表和外部连接器交换 Upsert 和 Delete 消息。这个模式需要一个唯一的 key,通过这个 key 可以传递更新消息。为了正确应用消息,外部连接器需要知道这个唯一 key 的属性。在插入(Insert)和更新(Update)都被编码为 Upsert 消息;在删除(Delete)编码为 Delete 信息。这种模式和 Retract 模式的主要区别在于,Update 操作是用单个消息编码的,所以效率会更高

三、输出到Kafka

         除了输出到文件,也可以输出到 Kafka。我们可以结合前面 Kafka 作为输入数据,构建数据管道,kafka 进,kafka 出。

实现代码

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.descriptors.{CsvFileSystemKafkaSchema}

/**
 * @Package
 * @author 大数据老哥
 * @date 2020/12/18 16:51
 * @version V1.0
 */

object FlinkSqlSourceFileSinkKafka {
  def main(args: Array[String]): Unit = {
    // 构建运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 构建表运行环境
    val tableEnv = StreamTableEnvironment.create(env)
    // 读取文件数据
    tableEnv.connect(new FileSystem().path("./data/user.txt"))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id"DataTypes.STRING())
        .field("name"DataTypes.STRING())
      ).createTemporaryTable("FileInput")
    // 设置kafka的输出
    tableEnv.connect(new Kafka()
      .version("0.11"// 设置kafka的版本
      .topic("FlinkSqlTest"// 设置要连接的主题
      .property("zookeeper.connect""node01:2181,node02:2181,node03:2181"//设置zookeeper的连接地址跟端口号
      .property("bootstrap.servers""node01:9092,node02:9092,node03:9092"//设置kafka的连接地址跟端口号
    ).withFormat(new Csv())
      .withSchema(new Schema()
        .field("id"DataTypes.STRING())
        .field("name"DataTypes.STRING())
      ).createTemporaryTable("outPutKafka")

    val res = tableEnv.sqlQuery("select  * from  FileInput")

    res.insertInto("outPutKafka")
    env.execute("FlinkSqlSourceFileSinkKafka")
  }

}

消费kafka中的数据

./kafka-console-consumer.sh --from-beginning --topic FlinkSqlTest  --zookeeper node01:2181,node02:2181,node03:2181
四、输出到 MySQL

Flink 专门为 Table API 的 jdbc 连接提供了 flink-jdbc 连接器,我们需要先引入依赖:

  <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-jdbc_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>

         jdbc 连接的代码实现比较特殊,因为没有对应的 java/scala 类实现 ConnectorDescriptor,所以不能直接 tableEnv.connect()。不过 Flink SQL 留下了执行 DDL 的接口:tableEnv.sqlUpdate()。对于 jdbc 的创建表操作,天生就适合直接写 DDL 来实现,所以我们的代码可以这样写:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.descriptors.{CsvFileSystemSchema}

/**
* @Package
* @author 大数据老哥
* @date 2020/12/18 21:17
* @version V1.0
*/

object FlinkSQLSSourceFileSinkMySQL {
 def main(args: Array[String]): Unit = {
   // 构建运行环境
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   // 构建表运行环境
   val tableEnv = StreamTableEnvironment.create(env)
   // 读取文件数据
   tableEnv.connect(new FileSystem().path("./data/user.txt"))
     .withFormat(new Csv())
     .withSchema(new Schema()
       .field("id"DataTypes.STRING())
       .field("name"DataTypes.STRING())
     ).createTemporaryTable("FileInput")
   val result = tableEnv.sqlQuery("select * from  FileInput ")
   //定义sql语句
   var sqlinstall=
     """
       |create table jdbcOutputTable (
       |id varchar(10),
       |name varchar(20)
       |) with(
       |'connector.type'='jdbc',
       |'connector.url'='jdbc:mysql://node02:3306/test',
       |'connector.table'='user',
       | 'connector.driver' = 'com.mysql.jdbc.Driver',
       | 'connector.username' = 'root',
       | 'connector.password' = '123456'
       | )
       |"
"".stripMargin
       tableEnv.sqlUpdate(sqlinstall)
   result.insertInto("jdbcOutputTable")
   env.execute("FlinkSQLSSourceFileSinkMySQL")
 }
}

创建MySQL表

create table user(
    id varchar(10),
    name varchar(20)
)
五、将表转换成 DataStream

         表可以转换为 DataStream 或 DataSet。这样,自定义流处理或批处理 程序就可以继续在Table API 或 SQL 查询的结果上运行了。         

        将表转换为 DataStream 或 DataSet 时,需要指定生成的数据类型,即要将表的每一行转换成的数据类型。通常,最方便的转换类型就是 Row当然,因为结果的所有字段类型都是明确的,我们也经常会用元组类型来表示         

       表作为流式查询的结果,是动态更新的。所以,将这种动态查询转换成的数据流,同样需要对表的更新操作进行编码,进而有不同的转换模式。

5.1 Table API 中表到 DataStream 有两种模式

  • 追加模式(Append Mode) 用于表只会被插入(Insert)操作更改的场景。

  • 撤回模式(Retract Mode)

             用于任何场景。有些类似于更新模式中 Retract 模式,它只有 Insert 和 Delete 两类操作。        

        得到的数据会增加一个 Boolean 类型的标识位(返回的第一个字段),用它来表示到底是新增的数据(Insert),还是被删除的数据(老数据, Delete)。

代码实现如下:

val resultStream: DataStream[Row] = tableEnv.toAppendStream[Row](resultTable)
val aggResultStream: DataStream[(Boolean, (StringLong))] = tableEnv.toRetractStream[(StringLong)](aggResultTable)
resultStream.print("result")
aggResultStram.print("aggResult")

所以,没有经过groupby 之类聚合操作,可以直接用 toAppendStream 来转换;而如果经过了聚合,有更新操作,一般就必须用 toRetractDstream

六、Query 的解释和执行

         Table API 提供了一种机制来解释(Explain)计算表的逻辑和优化查询计划。这是通过TableEnvironment.explain(table)方法或 TableEnvironment.explain()方法完成的。

explain 方法会返回一个字符串,描述三个计划:

  • 未优化的逻辑查询计划
  • 优化后的逻辑查询计划
  • 实际执行计划

我们可以在代码中查看执行计划:

val explaination: String = tableEnv.explain(resultTable)
println(explaination)

         Query 的解释和执行过程,老 planner 和 blink planner 大体是一致的,又有所不同。整体来讲,Query 都会表示成一个逻辑查询计划,然后分两步解释:

  1. 优化查询计划
  2. 解释成 DataStream 或者 DataSet 程序

         而 Blink 版本是批流统一的,所以所有的 Query,只会被解释成 DataStream 程序;另外在批处理环境 TableEnvironment 下,Blink 版本要到 tableEnv.execute()执行调用才开始解释。

七、总结

         上述讲解了一些关于Flink SQL 输出的内容如我们常用的(kafka、MySQL、文件、DataStream)还有常用的hive的没有写出来,因为hive跟MySQL有点区别后续会单独出一片文章给大家讲解。喜欢的朋友一键三连呗~~  喜欢小伙伴可以关注公众号一起进步。

识别二维码回复关键词:经典、快手、数据治理、实时数仓、画像源码.....等等,都有资料奉上!

资源获取



往期推荐

10周拿下腾讯数据分析师认证


PPT-1219大数据技术沙龙-数据成本治理、数据仓库、数据湖、增长黑客、ABtest、实时平台.PPT


BATJ与小企业数据仓库面试题

点个 在看 你最好看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存