查看原文
其他

《从0到1学习Flink》—— 如何自定义 Data Sink ?

zhisheng zhisheng 2021-05-26


前言

前篇文章 《从0到1学习Flink》—— Data Sink 介绍 介绍了 Flink Data Sink,也介绍了 Flink 自带的 Sink,那么如何自定义自己的 Sink 呢?这篇文章将写一个 demo 教大家将从 Kafka Source 的数据 Sink 到 MySQL 中去。

准备工作

我们先来看下 Flink 从 Kafka topic 中获取数据的 demo,首先你需要安装好了 FLink 和 Kafka 。

运行启动 Flink、Zookepeer、Kafka,

好了,都启动了!

数据库建表

1DROP TABLE IF EXISTS `student`;
2CREATE TABLE `student` (
3  `id` int(11unsigned NOT NULL AUTO_INCREMENT,
4  `name` varchar(25COLLATE utf8_bin DEFAULT NULL,
5  `password` varchar(25COLLATE utf8_bin DEFAULT NULL,
6  `age` int(10DEFAULT NULL,
7  PRIMARY KEY (`id`)
8ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

实体类

Student.java

1package com.zhisheng.flink.model;
2
3/**
4 * Desc:
5 * weixin: zhisheng_tian
6 * blog: http://www.54tianzhisheng.cn/
7 */

8public class Student {
9    public int id;
10    public String name;
11    public String password;
12    public int age;
13
14    public Student() {
15    }
16
17    public Student(int id, String name, String password, int age) {
18        this.id = id;
19        this.name = name;
20        this.password = password;
21        this.age = age;
22    }
23
24    @Override
25    public String toString() {
26        return "Student{" +
27                "id=" + id +
28                ", name='" + name + '\'' +
29                ", password='" + password + '\'' +
30                ", age=" + age +
31                '}';
32    }
33
34    public int getId() {
35        return id;
36    }
37
38    public void setId(int id) {
39        this.id = id;
40    }
41
42    public String getName() {
43        return name;
44    }
45
46    public void setName(String name) {
47        this.name = name;
48    }
49
50    public String getPassword() {
51        return password;
52    }
53
54    public void setPassword(String password) {
55        this.password = password;
56    }
57
58    public int getAge() {
59        return age;
60    }
61
62    public void setAge(int age) {
63        this.age = age;
64    }
65}

工具类

工具类往 kafka topic student 发送数据

1import com.alibaba.fastjson.JSON;
2import com.zhisheng.flink.model.Metric;
3import com.zhisheng.flink.model.Student;
4import org.apache.kafka.clients.producer.KafkaProducer;
5import org.apache.kafka.clients.producer.ProducerRecord;
6
7import java.util.HashMap;
8import java.util.Map;
9import java.util.Properties;
10
11/**
12 * 往kafka中写数据
13 * 可以使用这个main函数进行测试一下
14 * weixin: zhisheng_tian
15 * blog: http://www.54tianzhisheng.cn/
16 */

17public class KafkaUtils2 {
18    public static final String broker_list = "localhost:9092";
19    public static final String topic = "student";  //kafka topic 需要和 flink 程序用同一个 topic
20
21    public static void writeToKafka() throws InterruptedException {
22        Properties props = new Properties();
23        props.put("bootstrap.servers", broker_list);
24        props.put("key.serializer""org.apache.kafka.common.serialization.StringSerializer");
25        props.put("value.serializer""org.apache.kafka.common.serialization.StringSerializer");
26        KafkaProducer producer = new KafkaProducer<String, String>(props);
27
28        for (int i = 1; i <= 100; i++) {
29            Student student = new Student(i, "zhisheng" + i, "password" + i, 18 + i);
30            ProducerRecord record = new ProducerRecord<String, String>(topic, nullnull, JSON.toJSONString(student));
31            producer.send(record);
32            System.out.println("发送数据: " + JSON.toJSONString(student));
33        }
34        producer.flush();
35    }
36
37    public static void main(String[] args) throws InterruptedException {
38        writeToKafka();
39    }
40}

SinkToMySQL

该类就是 Sink Function,继承了 RichSinkFunction ,然后重写了里面的方法。在 invoke 方法中将数据插入到 MySQL 中。

1package com.zhisheng.flink.sink;
2
3import com.zhisheng.flink.model.Student;
4import org.apache.flink.configuration.Configuration;
5import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
6
7import java.sql.Connection;
8import java.sql.DriverManager;
9import java.sql.PreparedStatement;
10
11/**
12 * Desc:
13 * weixin: zhisheng_tian
14 * blog: http://www.54tianzhisheng.cn/
15 */

16public class SinkToMySQL extends RichSinkFunction<Student> {
17    PreparedStatement ps;
18    private Connection connection;
19
20    /**
21     * open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接
22     *
23     * @param parameters
24     * @throws Exception
25     */

26    @Override
27    public void open(Configuration parameters) throws Exception {
28        super.open(parameters);
29        connection = getConnection();
30        String sql = "insert into Student(id, name, password, age) values(?, ?, ?, ?);";
31        ps = this.connection.prepareStatement(sql);
32    }
33
34    @Override
35    public void close() throws Exception {
36        super.close();
37        //关闭连接和释放资源
38        if (connection != null) {
39            connection.close();
40        }
41        if (ps != null) {
42            ps.close();
43        }
44    }
45
46    /**
47     * 每条数据的插入都要调用一次 invoke() 方法
48     *
49     * @param value
50     * @param context
51     * @throws Exception
52     */

53    @Override
54    public void invoke(Student value, Context context) throws Exception {
55        //组装数据,执行插入操作
56        ps.setInt(1, value.getId());
57        ps.setString(2, value.getName());
58        ps.setString(3, value.getPassword());
59        ps.setInt(4, value.getAge());
60        ps.executeUpdate();
61    }
62
63    private static Connection getConnection() {
64        Connection con = null;
65        try {
66            Class.forName("com.mysql.jdbc.Driver");
67            con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8""root""root123456");
68        } catch (Exception e) {
69            System.out.println("-----------mysql get connection has exception , msg = "+ e.getMessage());
70        }
71        return con;
72    }
73}

Flink 程序

这里的 source 是从 kafka 读取数据的,然后 Flink 从 Kafka 读取到数据(JSON)后用阿里 fastjson 来解析成 student 对象,然后在 addSink 中使用我们创建的 SinkToMySQL,这样就可以把数据存储到 MySQL 了。

1package com.zhisheng.flink;
2
3import com.alibaba.fastjson.JSON;
4import com.zhisheng.flink.model.Student;
5import com.zhisheng.flink.sink.SinkToMySQL;
6import org.apache.flink.api.common.serialization.SimpleStringSchema;
7import org.apache.flink.streaming.api.datastream.DataStreamSource;
8import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
9import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
10import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
11import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
12import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
13
14import java.util.Properties;
15
16/**
17 * Desc:
18 * weixin: zhisheng_tian
19 * blog: http://www.54tianzhisheng.cn/
20 */

21public class Main3 {
22    public static void main(String[] args) throws Exception {
23        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
24
25        Properties props = new Properties();
26        props.put("bootstrap.servers""localhost:9092");
27        props.put("zookeeper.connect""localhost:2181");
28        props.put("group.id""metric-group");
29        props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
30        props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
31        props.put("auto.offset.reset""latest");
32
33        SingleOutputStreamOperator<Student> student = env.addSource(new FlinkKafkaConsumer011<>(
34                "student",   //这个 kafka topic 需要和上面的工具类的 topic 一致
35                new SimpleStringSchema(),
36                props)).setParallelism(1)
37                .map(string -> JSON.parseObject(string, Student.class)); //Fastjson 解析字符串成 student 对象
38
39        student.addSink(new SinkToMySQL()); //数据 sink 到 mysql
40
41        env.execute("Flink add sink");
42    }
43}

结果

运行 Flink 程序,然后再运行 KafkaUtils2.java 工具类,这样就可以了。

如果数据插入成功了,那么我们查看下我们的数据库:

数据库中已经插入了 100 条我们从 Kafka 发送的数据了。证明我们的 SinkToMySQL 起作用了。是不是很简单?

项目结构

怕大家不知道我的项目结构,这里发个截图看下:

最后

本文主要利用一个 demo,告诉大家如何自定义 Sink Function,将从 Kafka 的数据 Sink 到 MySQL 中,如果你项目中有其他的数据来源,你也可以换成对应的 Source,也有可能你的 Sink 是到其他的地方或者其他不同的方式,那么依旧是这个套路:继承 RichSinkFunction 抽象类,重写 invoke 方法。

关注我

转载请务必注明原创地址为:http://www.54tianzhisheng.cn/2018/10/31/flink-create-sink/

另外我自己整理了些 Flink 的学习资料,目前已经全部放到微信公众号了。你可以加我的微信:zhisheng_tian,然后回复关键字:Flink 即可无条件获取到。

相关文章

1、《从0到1学习Flink》—— Apache Flink 介绍

2、《从0到1学习Flink》—— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门

3、《从0到1学习Flink》—— Flink 配置文件详解

4、《从0到1学习Flink》—— Data Source 介绍

5、《从0到1学习Flink》—— 如何自定义 Data Source ?

6、《从0到1学习Flink》—— Data Sink 介绍

7、《从0到1学习Flink》—— 如何自定义 Data Sink ?

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存