查看原文
其他

博文推荐|Flink 消费 Kafka 实时写入 Apache Doris(KFD)

ApacheDoris 2022-04-24

The following article is from 锋哥聊DORIS数仓 Author 张家锋

1.概述

Apache Doris(原百度 Palo )是一款基于大规模并行处理技术的分布式 SQL 数据仓库,由百度在 2017 年开源,2018 年 8 月进入 Apache 孵化器。

Apache Doris 是一个现代化的 MPP 分析型数据库产品。仅需亚秒级响应时间即可获得查询结果,有效地支持实时数据分析。Apache Doris 的分布式架构非常简洁,易于运维,并且可以支持 10PB 以上的超大数据集。

Apache Doris 可以满足多种数据分析需求,例如固定历史报表,实时数据分析,交互式数据分析和探索式数据分析等。令您的数据分析工作更加简单高效!

2.场景介绍

这里我们介绍的是通过 Doris 提供的 Stream Load 结合 Flink 计算引擎怎么实现数据实时快速入库操作。

使用环境如下:

  mysql 5.x/8.x (主要是业务数据库)
  kafka 2.11 (消息队列)
  flink 1.10.1 (流式计算引擎)
  doris 0.14.7 (核心数仓)
  Canal (Mysql binlog数据采集工具)


3.实现方案

这里我们采用的历史数据离线处理+增量数据实时处理的架构

3.1 历史数据离线处理

历史数据离线处理方式,这里我们使用是 Doris ODBC 外表方式,将 MySQL 的表映射到 Doris 里,然后使用

  insert into <doris_table_name>  select * from <mysql_odbc_external_table>

3.1.1 外表创建方法

  1. 首先 Apache Doris 0.13.x以上版本

  2. 要在所有的 BE 节点安装对应数据的 ODBC 驱动

  3. 创建外表

具体可以参考我的另外一篇文章,这里不多做介绍。

Apache doris ODBC外表使用方式

3.2 增量数据实时处理

增量数据的实时处理,这里我们是通过 Canal 监控 MySQL Binlog 解析并推送到指定的 Kafka 队列,然后通过 Flink 去实时消费 Kafka 队列的数据,然后你可以根据自己的需要对数据进行处理,算法等,最后将明细数据或者实时计算的中间结果保存到对应的 Doris 数据表中,这里使用的是 Stream Load,你可以使用 Flink Doris Connector 。

3.2.1 Doris Sink实现

这里我们首先实现一个 Flink Doris Sink

 import com.alibaba.fastjson.JSON;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
 /**
  * 自定义flink doris sink
  */
 public class DorisSink extends RichSinkFunction<String> {
 
     private static final Logger log = LoggerFactory.getLogger(DorisSink.class);
 
     private final static List<String> DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList("Success", "Publish Timeout"));
 
     private DorisStreamLoad dorisStreamLoad;
 
     private String columns;
 
     private String jsonFormat;
 
     public DorisSink(DorisStreamLoad dorisStreamLoad, String columns, String jsonFormat) {
         this.dorisStreamLoad = dorisStreamLoad;
         this.columns = columns;
         this.jsonFormat = jsonFormat;
    }
 
     @Override
     public void open(Configuration parameters) throws Exception {
         super.open(parameters);
    }
 
 
     /**
      * 判断StreamLoad是否成功
      *
      * @param respContent streamload返回的响应信息(JSON格式)
      * @return
      */
     public static Boolean checkStreamLoadStatus(RespContent respContent) {
         if (DORIS_SUCCESS_STATUS.contains(respContent.getStatus())
                 && respContent.getNumberTotalRows() == respContent.getNumberLoadedRows()) {
             return true;
        } else {
             return false;
        }
    }
 
     @Override
     public void invoke(String value, Context context) throws Exception {
         DorisStreamLoad.LoadResponse loadResponse = dorisStreamLoad.loadBatch(value, columns, jsonFormat);
         if (loadResponse != null && loadResponse.status == 200) {
             RespContent respContent = JSON.parseObject(loadResponse.respContent, RespContent.class);
             if (!checkStreamLoadStatus(respContent)) {
                 log.error("Stream Load fail{}:", loadResponse);
            }
        } else {
             log.error("Stream Load Request failed:{}", loadResponse);
        }
    }
 }

3.2.2 Stream Load 工具类

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 import java.io.Serializable;
 import java.io.IOException;
 import java.io.BufferedOutputStream;
 import java.io.InputStream;
 import java.io.BufferedReader;
 import java.io.InputStreamReader;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.util.Base64;
 import java.util.Calendar;
 import java.util.UUID;
 
 
 /**
  * doris streamLoad
  */
 
 public class DorisStreamLoad implements Serializable {
 
     private static final Logger log = LoggerFactory.getLogger(DorisStreamLoad.class);
 //连接地址,这里使用的是连接FE
     private static String loadUrlPattern = "http://%s/api/%s/%s/_stream_load?";
     //fe ip地址
   
 private String hostPort;
     //数据库
     private String db;
     //要导入的数据表名
     private String tbl;
     //用户名
     private String user;
     //密码
     private String passwd;
     private String loadUrlStr;
     private String authEncoding;
 
 
     public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
         this.hostPort = hostPort;
         this.db = db;
         this.tbl = tbl;

         this.user = user;
         this.passwd = passwd;
         this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
         this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
    }
 //获取http连接信息
     private HttpURLConnection getConnection(String urlStr, String label, String columns, String jsonformat) throws IOException {
         URL url = new URL(urlStr);
         HttpURLConnection conn = (HttpURLConnection) url.openConnection();
         conn.setInstanceFollowRedirects(false);
         conn.setRequestMethod("PUT");
         conn.setRequestProperty("Authorization", "Basic " + authEncoding);
         conn.addRequestProperty("Expect", "100-continue");
         conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
         conn.addRequestProperty("label", label);
         conn.addRequestProperty("max_filter_ratio", "0");
         conn.addRequestProperty("strict_mode", "true");
         conn.addRequestProperty("columns", columns);
         conn.addRequestProperty("format", "json");
         conn.addRequestProperty("jsonpaths", jsonformat);
         conn.addRequestProperty("strip_outer_array", "true");
         conn.setDoOutput(true);
         conn.setDoInput(true);

 
         return conn;
    }
 
     public static class LoadResponse {
         public int status;
         public String respMsg;
         public String respContent;
 
         public LoadResponse(int status, String respMsg, String respContent) {
             this.status = status;
             this.respMsg = respMsg;
             this.respContent = respContent;
        }
 

         @Override
         public String toString() {
             StringBuilder sb = new StringBuilder();
             sb.append("status: ").append(status);
             sb.append(", resp msg: ").append(respMsg);
             sb.append(", resp content: ").append(respContent);
             return sb.toString();
        }
    }
 //执行数据导入
     public LoadResponse loadBatch(String data, String columns, String jsonformat) {
         Calendar calendar = Calendar.getInstance();
         //导入的lable,全局唯一
         String label = String.format("flink_import_%s%02d%02d_%02d%02d%02d_%s",
                 calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),
                 calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
                 UUID.randomUUID().toString().replaceAll("-", ""));
 

         HttpURLConnection feConn = null;
         HttpURLConnection beConn = null;
         try {
             // build request and send to fe
             feConn = getConnection(loadUrlStr, label, columns, jsonformat);
             int status = feConn.getResponseCode();
             // fe send back http response code TEMPORARY_REDIRECT 307 and new be location
             if (status != 307) {
                 throw new Exception("status is not TEMPORARY_REDIRECT 307, status: " + status);
            }
             String location = feConn.getHeaderField("Location");
             if (location == null) {
                 throw new Exception("redirect location is null");
            }
             // build request and send to new be location
             beConn = getConnection(location, label, columns, jsonformat);
             // send data to be
             BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream());
             bos.write(data.getBytes());
             bos.close();
 
             // get respond
             status = beConn.getResponseCode();
             String respMsg = beConn.getResponseMessage();
             InputStream stream = (InputStream) beConn.getContent();
             BufferedReader br = new BufferedReader(new InputStreamReader(stream));
             StringBuilder response = new StringBuilder();
             String line;
             while ((line = br.readLine()) != null) {
                 response.append(line);
            }
             return new LoadResponse(status, respMsg, response.toString());
 
        } catch (Exception e) {
             e.printStackTrace();
             String err = "failed to load audit via AuditLoader plugin with label: " + label;
             log.warn(err, e);
             return new LoadResponse(-1, e.getMessage(), err);
        } finally {
             if (feConn != null) {
                 feConn.disconnect();
            }
             if (beConn != null) {
                 beConn.disconnect();
            }
        }
    }
 
 }

3.2.3 Flink Job

这个地方演示的是单表,如果是你通过 Canal 监听的多个表的数据,这里你需要根据表名进行区分,并和你 MySQL 表和 Doris 里的表建好对应关系,解析相应的数据即可

 import org.apache.doris.demo.flink.DorisSink;
 import org.apache.doris.demo.flink.DorisStreamLoad;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 
 import java.util.Properties;
 
 /**
  *
  * This example mainly demonstrates how to use flink to stream Kafka data.
  * And use the doris streamLoad method to write the data into the table specified by doris
  * <p>
  * Kafka data format is an array, For example: ["id":1,"name":"root"]
  */
 
 public class FlinkKafka2Doris {
     //kafka address
     private static final String bootstrapServer = "xxx:9092,xxx:9092,xxx:9092";
     //kafka groupName
     private static final String groupName = "test_flink_doris_group";
     //kafka topicName
     private static final String topicName = "test_flink_doris";
     //doris ip port
     private static final String hostPort = "xxx:8030";
     //doris dbName
     private static final String dbName = "db1";
     //doris tbName
     private static final String tbName = "tb1";
     //doris userName
     private static final String userName = "root";
     //doris password
     private static final String password = "";
     //doris columns
     private static final String columns = "name,age,price,sale";
     //json format
     private static final String jsonFormat = "[\"$.name\",\"$.age\",\"$.price\",\"$.sale\"]";
 
     public static void main(String[] args) throws Exception {
 
         Properties props = new Properties();
         props.put("bootstrap.servers", bootstrapServer);
         props.put("group.id", groupName);
         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         props.put("auto.offset.reset", "earliest");
         props.put("max.poll.records", "10000");
 
         StreamExecutionEnvironment blinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
         blinkStreamEnv.enableCheckpointing(10000);
         blinkStreamEnv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 
         FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(topicName,
                 new SimpleStringSchema(),
                 props);
 
         DataStreamSource<String> dataStreamSource = blinkStreamEnv.addSource(flinkKafkaConsumer);
 
         DorisStreamLoad dorisStreamLoad = new DorisStreamLoad(hostPort, dbName, tbName, userName, password);
 
         dataStreamSource.addSink(new DorisSink(dorisStreamLoad,columns,jsonFormat));
 
         blinkStreamEnv.execute("flink kafka to doris");
 
    }
 }

然后将 Flink Job 提交到集群上就可以运行了,数据就可以试试入库

这里其实是一个微批处理,你可以自己完善以下几部分:

  1. 每个批次最大入库记录数,或者每个多少秒进行一次入库,如果你的实时数据量比较小,或者你的数据比较大,这两条件哪个先到执行哪个

  2. 这里连接是 FE,你可以通过 FE 的 rest api 接口拿到所有的 BE 节点,直接连接 BE 进行入库,URL 地址只是将 FE 的 IP 和端口换成 BE 的 IP 及 HTTP 端口即可

  3. 为了避免你连接这个 BE 或者 FE 的时候,正好这个节点挂了,你可以进行重试其他 FE 或者 BE

  4. 为了避免单个节点压力,你可以进行轮询 BE 节点,不要每次都连接同一个 BE 节点

  5. 设置最大重试次数,如果超过这个次数,可以将导入失败的数据推送到 Kafka 队列,以方便后续人工手动处理


4.总结

本文只是抛砖引玉的方式给大家一个使用 Stream Load 进行数据接入的使用方式及示例,Doris还有很多数据接入的方式等待大家去探索。


您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存