Hands-On: Writing a Data Stream to OSS with Apache Hudi DeltaStreamer
The following article is from the Flink account, by vinoyang.
1. Build Hudi
First, clone Hudi from https://github.com/apache/incubator-hudi.git into your local IDE and build it with:
mvn clean install -DskipTests -DskipITs -Dcheckstyle.skip=true
Notes:
1) Hudi currently builds against Hadoop 2.7.3. If your cluster runs Hadoop 3.x, build against Hadoop 3.0.0 instead:
mvn clean install -DskipTests -DskipITs -Dcheckstyle.skip=true -Drat.skip=true -Dhadoop.version=3.0.0
2) Writing to OSS is currently supported only on Hudi's latest 0.6.0 branch.
Required jars:
1. jdom-1.1.jar
2. hadoop-aliyun-3.2.1.jar
3. aliyun-sdk-oss-3.8.1.jar
4. hudi-utilities-bundle_2.11-0.6.0-SNAPSHOT.jar
5. hive-jdbc-2.1.1.jar (required for Hive sync)
6. hive-service-2.1.1.jar (required for Hive sync)
2. Edit the core-site.xml file
Add the following properties to core-site.xml. Either edit the core-site.xml under your Hadoop installation directory, or create a new core-site.xml containing these properties and package it into your jar.
<property>
<name>fs.oss.endpoint</name>
<value>xxxxxx</value>
<description>Aliyun OSS endpoint to connect to.</description>
</property>
<property>
<name>fs.oss.accessKeyId</name>
<value>xxxxx</value>
<description>Aliyun access key ID</description>
</property>
<property>
<name>fs.oss.accessKeySecret</name>
<value>xxxxx</value>
<description>Aliyun access key secret</description>
</property>
<property>
<name>fs.oss.impl</name>
<value>org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem</value>
</property>
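As an alternative to editing core-site.xml (a sketch not covered in the original article; all placeholder values are yours to fill in), the same settings can be passed at submit time: Spark copies any `--conf spark.hadoop.*` entry into the job's Hadoop Configuration.

```shell
# Hypothetical alternative: pass the OSS settings on the spark-submit command line.
# Spark strips the "spark.hadoop." prefix and places the remainder into the
# Hadoop Configuration seen by the OSS filesystem implementation.
spark-submit \
  --conf spark.hadoop.fs.oss.endpoint=<your-oss-endpoint> \
  --conf spark.hadoop.fs.oss.accessKeyId=<your-access-key-id> \
  --conf spark.hadoop.fs.oss.accessKeySecret=<your-access-key-secret> \
  --conf spark.hadoop.fs.oss.impl=org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem \
  ...
```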
3. Run
spark-submit --master yarn --jars /home/t3cx/apps/hudi/hudi/hive-jdbc-2.1.1.jar,/home/t3cx/apps/hudi/hudi/hive-service-2.1.1.jar,/home/t3cx/apps/hudi/hudi/hadoop-aliyun-3.2.1.jar,/home/t3cx/apps/hudi/hudi/aliyun-sdk-oss-3.8.1.jar,/home/t3cx/apps/hudi/hudi/jdom-1.1.jar \
--name t_business \
--driver-memory 1G \
--num-executors 2 \
--executor-memory 1G \
--executor-cores 4 \
--deploy-mode cluster \
--conf spark.driver.userClassPathFirst=true \
--conf spark.yarn.executor.memoryOverhead=512 \
--conf spark.yarn.driver.memoryOverhead=512 \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /home/t3cx/apps/hudi/hudi/hudi-utilities-bundle_2.11-0.6.0-SNAPSHOT.jar \
--props hdfs://xxxxxx/hudi/config/t3_trip.t_business_kafka.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--target-base-path oss://...../business_cow \
--op UPSERT \
--target-table oss_business_cow \
--enable-hive-sync \
--table-type COPY_ON_WRITE \
--source-ordering-field create_time \
--source-limit 5000000
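The properties file referenced by --props is not shown in the article. As a sketch only, a DeltaStreamer properties file for a JSON Kafka source typically looks like the following; the topic, broker address, schema file paths, and key/partition fields are all assumptions for illustration:

```properties
# Record key and partition path for the Hudi table (field names are hypothetical)
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=create_date
# Avro schemas read by FilebasedSchemaProvider (paths are hypothetical)
hoodie.deltastreamer.schemaprovider.source.schema.file=hdfs:///hudi/config/source.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=hdfs:///hudi/config/target.avsc
# Kafka settings for JsonKafkaSource (topic and brokers are hypothetical)
hoodie.deltastreamer.source.kafka.topic=t3_trip.t_business
bootstrap.servers=kafka-host:9092
auto.offset.reset=earliest
```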
4. Verify
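To confirm the write, as a sketch (the bucket path and table name are placeholders), list the target base path on OSS with the Hadoop CLI; since Hive sync is enabled, the synced table can also be queried:

```shell
# List the Hudi table files written to OSS (bucket path is a placeholder)
hadoop fs -ls oss://<your-bucket>/business_cow
# With --enable-hive-sync, the table is registered in Hive and can be queried:
spark-sql -e "SELECT COUNT(*) FROM oss_business_cow"
```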