
Flink Series - ETL in Practice for a Real-Time Data Warehouse (Part 2)

 


Speaker: 小飞牛

Edited by: 仙子紫霞

Published by: 数据仓库与Python大数据




1. Overview


In the previous part we used OGG to sync MySQL data into Kafka in JSON format, which means the source-side data capture is already in place.


Next we can use Flink to process this data source. One thing worth pointing out: the JSON records synced by OGG are nested, and the data is not as simple as ordinary website logs, because it comes from a database and therefore carries inserts, updates, and deletes. So the first step is a simple ETL pass over the records that arrive from the source database.


Without further ado, here is what the data format looks like:

# Insert
{"table":"bms_st.employees","op_type":"I","op_ts":"2012-04-12 14:23:13.177344","current_ts":"2012-04-12T14:23:19.796000","pos":"00000000010000036968","after":{"EID":"101","ENAME":"changyin","ESAL":6666.66}}
{"table":"bms_st.employees","op_type":"I","op_ts":"2012-04-12 14:23:13.177344","current_ts":"2012-04-12T14:23:19.797000","pos":"00000000010000037147","after":{"EID":"102","ENAME":"siling","ESAL":1234.12}}

# Update
{"table":"bms_st.employees","op_type":"U","op_ts":"2012-04-12 14:24:37.789810","current_ts":"2012-04-12T14:24:44.247000","pos":"00000000010000037501","before":{"EID":"102","ENAME":"siling","ESAL":1234.12},"after":{"EID":"102","ENAME":"sunsiling","ESAL":1000.00}}

# Delete
{"table":"bms_st.employees","op_type":"D","op_ts":"2012-04-12 14:24:37.789810","current_ts":"2012-04-12T14:24:44.248000","pos":"00000000010000037636","before":{"EID":"101","ENAME":"changyin","ESAL":6666.66}}


From this format we can see that op_type flags whether the record is an insert, update, or delete, and the actual row data lives in the nested after or before objects. Next, we will use Flink to ETL this data and publish it to Kafka for the next warehouse layer to consume.
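As a quick standalone sanity check (not part of the original project), one of these records can be parsed with fastjson, the same JSON library used in the Flink job below, to see where the payload lives:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class OggRecordPeek {
    public static void main(String[] args) {
        // One of the sample update records from above
        String record = "{\"table\":\"bms_st.employees\",\"op_type\":\"U\","
                + "\"op_ts\":\"2012-04-12 14:24:37.789810\","
                + "\"before\":{\"EID\":\"102\",\"ENAME\":\"siling\",\"ESAL\":1234.12},"
                + "\"after\":{\"EID\":\"102\",\"ENAME\":\"sunsiling\",\"ESAL\":1000.00}}";

        JSONObject json = JSON.parseObject(record);
        // op_type tells us whether this is an insert (I), update (U) or delete (D)
        String opType = json.getString("op_type");
        // the actual row images live in the nested before/after objects
        JSONObject before = json.getJSONObject("before");
        JSONObject after = json.getJSONObject("after");

        System.out.println(opType + " : " + before.getString("ESAL") + " -> " + after.getString("ESAL"));
    }
}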


2. Project Structure



mmain: program entry point
utils: utility classes
entity: entity classes
commonbase: abstract base classes
achieve: implementation classes


3. Project Implementation


3.1 Static resource file holding the configuration, application.properties:

# source kafka config
PJbtServers1: cdh101:9092,cdh102:9092,cdh103:9092
PJgroupId1: test
PJoffsetReset1: latest
PJtopicStr1: piaoju-topic

# sink kafka config
pj-BtServers2: cdh101:9092,cdh102:9092,cdh103:9092
pj-ZkStr2: cdh101:2181,cdh102:2181,cdh102:2181
pj-GroupId2: test
pj-OffsetReset2: latest
pj-TopicStr2: piaoju-to-kafka-topic

# ---------------------------------------------------------------------------------------------------------
# daily salary change per employee
employee_tb_name: bms_st.employees
employee_job_name: EmployeeSource
#employee_create_table: employee_money
#employee_row_col: tb_name VARCHAR, op_type VARCHAR, ts VARCHAR, eId VARCHAR, eName VARCHAR, eSal VARCHAR

3.2 Under the utils package, create the class that reads these values, LoadPropertiesFile.java:

import java.io.InputStream;
import java.util.Properties;

/**
 * @author feiniu
 * @create 2020-03-26 9:37
 */
public class LoadPropertiesFile {

    public static String getPropertyFileValues(String proKey) {
        String proStr = "";
        try {
            // Read the configuration file from the classpath
            InputStream is = LoadPropertiesFile.class.getClassLoader()
                    .getResourceAsStream("application.properties");
            Properties properties = new Properties();
            properties.load(is);
            proStr = properties.getProperty(proKey);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
        return proStr;
    }
}
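For example, the source-side Kafka settings can then be looked up anywhere in the project like this (a minimal usage sketch, not a separate snippet in the original):

// Values are looked up by key from application.properties on the classpath
String bootstrapServers = LoadPropertiesFile.getPropertyFileValues("PJbtServers1");
String sourceTopic = LoadPropertiesFile.getPropertyFileValues("PJtopicStr1");
System.out.println(bootstrapServers + " -> " + sourceTopic);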


3.3 Under the commonbase package, create an abstract class that consumes the Kafka data and parses the key fields. The code skeleton looks like this:

package com.nfdwsyy.commonbase;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.nfdwsyy.utils.LoadPropertiesFile;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.text.ParseException;
import java.util.Properties;

/**
 * @author feiniu
 * @create 2020-04-03 20:47
 */
public abstract class SourceCommonBase {

    public void getDataStream(String jobName) throws Exception {
        // 1. Set up the execution environment
        // 2. Load the configuration from the resource file
        // 3. Consume the data and do a lightweight JSON parse
        // 4. Call the abstract methods implemented by the subclass
    }


1. Environment setup:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing, taking a checkpoint every 1000 ms
env.enableCheckpointing(1000);
// Use EXACTLY_ONCE checkpointing semantics
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Checkpoint timeout
env.getCheckpointConfig().setCheckpointTimeout(60000);
// Allow only one checkpoint in flight at a time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// At least 500 ms between two checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// Retain externalized checkpoints when the job is cancelled
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// Recover from the checkpoint even when a more recent savepoint exists
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
// Tolerate at most 1 checkpoint failure (supported since Flink 1.9)
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);

2. Loading the configuration from the resource file:

// Build the consumer properties from the resource file
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", LoadPropertiesFile.getPropertyFileValues("PJbtServers1"));
properties.setProperty("group.id", LoadPropertiesFile.getPropertyFileValues("PJgroupId1"));
// key deserializer
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// value deserializer
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", LoadPropertiesFile.getPropertyFileValues("PJoffsetReset1"));

3. Consuming the data and doing a lightweight JSON parse:

FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(
        LoadPropertiesFile.getPropertyFileValues("PJtopicStr1"),
        new SimpleStringSchema(),
        properties);

DataStream<String> stream = env.addSource(myConsumer).setParallelism(1);

// Parse the JSON and extract the key fields
DataStream<String> mStream = stream.map(new MapFunction<String, String>() {
    @Override
    public String map(String s) throws Exception {
        JSONObject jsonObject = JSON.parseObject(s);
        String table = jsonObject.getString("table");
        String op_type = jsonObject.getString("op_type");
        String op_ts = jsonObject.getString("op_ts");
        String before = jsonObject.getString("before");
        String after = jsonObject.getString("after");

        String resultStr = parseSourceKafkaJson(table, op_type, op_ts, before, after);
        return resultStr;
    }
});

// Let the child class ETL the stream and send it to the sink Kafka topic
sendToSinkKafka(mStream);

env.execute(jobName);



4. The abstract methods:

// Implemented by the child class: ETL a single record
public abstract String parseSourceKafkaJson(String table, String op_type, String op_ts,
                                            String before, String after) throws ParseException;

// Implemented by the child class: sink the processed stream to Kafka
public abstract void sendToSinkKafka(DataStream<String> mStream);

3.4 Under the achieve package, create the implementation class that performs the actual ETL. The class skeleton is as follows:

package com.nfdwsyy.achieve;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.nfdwsyy.commonbase.SourceCommonBase;
import com.nfdwsyy.entity.Employee;
import com.nfdwsyy.utils.LoadPropertiesFile;
import com.nfdwsyy.utils.MySinkKafka;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.shaded.netty4.io.netty.util.internal.StringUtil;
import org.apache.flink.streaming.api.datastream.DataStream;

import java.io.Serializable;
import java.text.ParseException;

/**
 * @author feiniu
 * @create 2020-07-23 10:12
 */
public class EmpSourceAchi extends SourceCommonBase implements Serializable {

    @Override
    public String parseSourceKafkaJson(String table, String op_type, String op_ts, String before, String after) throws ParseException {
        // 1. ETL of the data (depends on your actual business logic)
    }

    @Override
    public void sendToSinkKafka(DataStream<String> mStream) {
        // 2. Send the processed data to Kafka for downstream computation
    }

    // 3. Call the parent method so the main class can invoke it
}


1. ETL of the data:

String eId = "";
String eName = "";
double eSal = 0;
double after_money = 0;
double before_money = 0;

JSONObject jObjBefore = JSON.parseObject(before);
JSONObject jObjAfter = JSON.parseObject(after);

System.out.println("In parseSourceKafkaJson: table -> " + table + " , op_type -> " + op_type
        + " , op_ts -> " + op_ts + " , before -> " + before + " , after -> " + after);

String tb_name = LoadPropertiesFile.getPropertyFileValues("employee_tb_name");
Employee employee = null;
if (StringUtil.isNullOrEmpty(op_type) || StringUtil.isNullOrEmpty(table)) {
    System.out.println("op_type or table is empty -> " + op_type);
} else if (table.equals(tb_name)) {
    switch (op_type) {
        case "I":
            eId = jObjAfter.getString("EID");
            eName = jObjAfter.getString("ENAME");
            eSal = Double.parseDouble(jObjAfter.getString("ESAL"));
            break;
        case "U":
            eId = jObjAfter.getString("EID");
            eName = jObjAfter.getString("ENAME");
            after_money = Double.valueOf(jObjAfter.getString("ESAL"));
            before_money = Double.valueOf(jObjBefore.getString("ESAL"));
            eSal = after_money - before_money;
            break;
        case "D":
            eId = jObjBefore.getString("EID");
            eName = jObjBefore.getString("ENAME");
            eSal = Double.parseDouble("-" + jObjBefore.getString("ESAL"));
            break;
    }
    employee = new Employee(tb_name, op_type, op_ts, eId, eName, eSal);
}

// the entity must have tb_name
return JSONObject.toJSONString(employee);
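The Employee entity under the entity package is not shown in the original article. A minimal sketch that matches the constructor call above and the field names hinted at by the commented-out employee_row_col property (tb_name, op_type, ts, eId, eName, eSal) might look like the following; the field names and the use of public fields for fastjson serialization are assumptions:

package com.nfdwsyy.entity;

import java.io.Serializable;

/**
 * Minimal sketch of the Employee entity (assumed, not taken from the original article).
 * Public fields are used so that fastjson serializes them under these exact key names.
 */
public class Employee implements Serializable {

    public String tb_name;  // source table name, checked again in the Kafka sink
    public String op_type;  // I / U / D
    public String ts;       // op_ts from the OGG record
    public String eId;
    public String eName;
    public double eSal;     // signed salary delta computed in the ETL step

    public Employee(String tb_name, String op_type, String ts, String eId, String eName, double eSal) {
        this.tb_name = tb_name;
        this.op_type = op_type;
        this.ts = ts;
        this.eId = eId;
        this.eName = eName;
        this.eSal = eSal;
    }
}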


2. Sending the processed data to Kafka:

DataStream<String> mS = mStream.filter(new FilterFunction<String>() {
    @Override
    public boolean filter(String s) throws Exception {
        if (StringUtil.isNullOrEmpty(s)) {
            return false;
        } else {
            return true;
        }
    }
});

String broker_list = LoadPropertiesFile.getPropertyFileValues("pj-BtServers2");
String topic = LoadPropertiesFile.getPropertyFileValues("pj-TopicStr2");
String groupId = LoadPropertiesFile.getPropertyFileValues("pj-GroupId2");
String offsetReset = LoadPropertiesFile.getPropertyFileValues("pj-OffsetReset2");
// the entity must have tb_name
String tb_name = LoadPropertiesFile.getPropertyFileValues("employee_tb_name");
// Custom sink that writes to Kafka
mS.addSink(new MySinkKafka(broker_list, topic, groupId, offsetReset, tb_name)).name("employee_tb_name");


3. Call the parent's processing method so the main class can invoke it:

// Wrap the parent's template method
public void successKafka2KafkaMethod() {
    try {
        String jobName = LoadPropertiesFile.getPropertyFileValues("employee_job_name");
        getDataStream(jobName + " Source");
    } catch (Exception e) {
        e.printStackTrace();
    }
}


At this point the main flow is in place, but note that the class that sends data to Kafka is one we have to define ourselves. So next we create a utility class for writing data to Kafka:

package com.nfdwsyy.utils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * @author feiniu
 * @create 2020-04-04 10:29
 */
public class MySinkKafka extends RichSinkFunction<String> {

    private Properties props = null;
    private KafkaProducer producer = null;
    private ProducerRecord record = null;

    private String broker_list;
    private String topic;
    private String groupId;
    private String offsetReset;
    private String sourceTbName;

    public MySinkKafka(String broker_list, String topic, String groupId, String offsetReset, String sourceTbName) {
        this.broker_list = broker_list;
        this.topic = topic;
        this.groupId = groupId;
        this.offsetReset = offsetReset;
        this.sourceTbName = sourceTbName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        props = new Properties();
        props.put("bootstrap.servers", broker_list);
        // Note: group.id and auto.offset.reset are consumer-side settings and have no effect on a producer
        props.put("group.id", groupId);
        // key serializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("auto.offset.reset", offsetReset);

        producer = new KafkaProducer<String, String>(props);
    }

    @Override
    public void invoke(String value, Context context) {
        if (value.equals("") || value.equals("null")) {
            System.out.println("Empty or null value received in invoke -> " + value);
        } else {
            JSONObject jObjNew = JSON.parseObject(value);
            String tb_name = jObjNew.getString("tb_name");

            System.out.println("Comparing table names -> " + tb_name + " --- " + sourceTbName);

            if (tb_name.equals(sourceTbName)) {
                record = new ProducerRecord<String, String>(topic, null, null, value);
                producer.send(record);
                System.out.println("Sending record: " + value);
                producer.flush();
            }
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
    }
}
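As a side note: if the per-record table-name filtering were not needed, the hand-rolled sink could be replaced with Flink's bundled Kafka 0.11 producer from the same flink-connector-kafka-0.11 dependency already used for the consumer. A minimal sketch (assumed to sit inside sendToSinkKafka, reusing the same property keys), not part of the original project:

// Alternative: use FlinkKafkaProducer011 instead of a custom RichSinkFunction.
// It participates in Flink's checkpointing, unlike the hand-rolled KafkaProducer above.
FlinkKafkaProducer011<String> sinkProducer = new FlinkKafkaProducer011<>(
        LoadPropertiesFile.getPropertyFileValues("pj-BtServers2"),   // broker list
        LoadPropertiesFile.getPropertyFileValues("pj-TopicStr2"),    // target topic
        new SimpleStringSchema());                                   // pass the JSON string through unchanged

mS.addSink(sinkProducer).name("employee-to-kafka");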

Finally, create the main class that calls the ETL method:

package com.nfdwsyy.mmain;

import com.nfdwsyy.achieve.EmpSourceAchi;

/**
 * @author feiniu
 * @create 2020-07-23 10:55
 */
public class EmployeeMain01 {

    public static void main(String[] args) {
        EmpSourceAchi empAchi = new EmpSourceAchi();
        empAchi.successKafka2KafkaMethod();
    }
}


That's all the code. Now we can move on to testing it.


4. Local Testing and Deployment to YARN


4.1 Local testing

After running the program, perform inserts, updates, and deletes on the source table in the database; the records sent to Kafka will show up in the console. Local testing is not covered in detail here.


4.2 Deploying to the YARN cluster

Package the project, upload the jar to the target directory on the server, and deploy the application with the following command:

bin/flink run -m yarn-cluster -ynm oggsyncflinkjob -d -c com.nfdwsyy.mmain.EmployeeMain01 /opt/mycdhflink/myjar/Kafka2FlinkETL2Kafka.jar

At this point we can check the deployment status in the YARN web UI.



Next, start a consumer for the post-ETL topic:

bin/kafka-console-consumer.sh --bootstrap-server cdh101:9092,cdh102:9092,cdh103:9092 --topic piaoju-to-kafka-topic --from-beginning

Then perform some insert, update, and delete operations on the table in the source database.


The processed records then show up in the consumer console.
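For the sample insert, update, and delete records from the overview, the output would look roughly like this (illustrative only; the exact key set and ordering depend on how the Employee entity is serialized):

{"eId":"101","eName":"changyin","eSal":6666.66,"op_type":"I","tb_name":"bms_st.employees","ts":"2012-04-12 14:23:13.177344"}
{"eId":"102","eName":"siling","eSal":1234.12,"op_type":"I","tb_name":"bms_st.employees","ts":"2012-04-12 14:23:13.177344"}
{"eId":"102","eName":"sunsiling","eSal":-234.12,"op_type":"U","tb_name":"bms_st.employees","ts":"2012-04-12 14:24:37.789810"}
{"eId":"101","eName":"changyin","eSal":-6666.66,"op_type":"D","tb_name":"bms_st.employees","ts":"2012-04-12 14:24:37.789810"}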


Looking at the processed results, each record has been reduced to the simplest possible JSON string, with the insert, update, and delete semantics already resolved. Downstream computations on this data, such as aggregations, are now straightforward; how to build them is the topic of the next article in this series.




Author: 小飞牛_666
Link: https://www.jianshu.com/p/fbda25d993df
Copyright belongs to the author; this public account republishes it with the author's exclusive authorization.




