
Hands-on: Writing Streaming Data to OSS with Apache Hudi DeltaStreamer

ApacheHudi 2022-04-23

The following article is from the Flink account, by vinoyang.

1. Build Hudi

First, clone Hudi from https://github.com/apache/incubator-hudi.git and build it locally (e.g. in IDEA) with clean install -DskipTests -DskipITs -Dcheckstyle.skip=true.

Notes: 1. Hudi currently builds against Hadoop 2.7.3; if you run Hadoop 3.x, build against Hadoop 3.0.0 instead. 2. At the time of writing, OSS write support exists only on Hudi's latest 0.6.0 branch.

  mvn clean install -DskipTests -DskipITs -Dcheckstyle.skip=true -Drat.skip=true -Dhadoop.version=3.0.0
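Putting the clone and build steps together as one sequence (the Maven invocation is the Hadoop 3 variant from the note above; drop -Dhadoop.version=3.0.0 if you stay on Hadoop 2.7.3):

```shell
# Clone Hudi and build it from source.
git clone https://github.com/apache/incubator-hudi.git
cd incubator-hudi
# Switch to the branch that contains OSS support (0.6.0) before building.
mvn clean install -DskipTests -DskipITs -Dcheckstyle.skip=true -Drat.skip=true -Dhadoop.version=3.0.0
```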

Required jars:

1. jdom-1.1.jar
2. hadoop-aliyun-3.2.1.jar
3. aliyun-sdk-oss-3.8.1.jar
4. hudi-utilities-bundle_2.11-0.6.0-SNAPSHOT.jar
5. hive-jdbc-2.1.1.jar (required for Hive sync)
6. hive-service-2.1.1.jar (required for Hive sync)

2. Modify core-site.xml

Add the configuration below to core-site.xml. You can either edit the core-site.xml under your Hadoop installation directory directly, or create a new core-site.xml containing these properties and package it into your jar.

  <property>
    <name>fs.oss.endpoint</name>
    <value>xxxxxx</value>
    <description>Aliyun OSS endpoint to connect to.</description>
  </property>
  <property>
    <name>fs.oss.accessKeyId</name>
    <value>xxxxx</value>
    <description>Aliyun access key ID</description>
  </property>
  <property>
    <name>fs.oss.accessKeySecret</name>
    <value>xxxxx</value>
    <description>Aliyun access key secret</description>
  </property>
  <property>
    <name>fs.oss.impl</name>
    <value>org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem</value>
  </property>
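A mistyped property name here fails only at job runtime, so it can be worth a quick sanity check that all four fs.oss.* properties made it into the file. A minimal sketch (the helper name and the example path are assumptions, not from the original post):

```shell
# check_oss_conf: verify that the four fs.oss.* properties appear in a
# core-site.xml passed as the first argument. Prints any missing key and
# returns non-zero if at least one is absent.
check_oss_conf() {
  conf="$1"
  rc=0
  for key in fs.oss.endpoint fs.oss.accessKeyId fs.oss.accessKeySecret fs.oss.impl; do
    if ! grep -q "<name>${key}</name>" "$conf"; then
      echo "missing: $key"
      rc=1
    fi
  done
  return $rc
}

# Example (path is illustrative; point it at your actual conf dir):
# check_oss_conf /etc/hadoop/conf/core-site.xml
```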

3. Run the DeltaStreamer job

  spark-submit --master yarn \
    --jars /home/t3cx/apps/hudi/hudi/hive-jdbc-2.1.1.jar,/home/t3cx/apps/hudi/hudi/hive-service-2.1.1.jar,/home/t3cx/apps/hudi/hudi/hadoop-aliyun-3.2.1.jar,/home/t3cx/apps/hudi/hudi/aliyun-sdk-oss-3.8.1.jar,/home/t3cx/apps/hudi/hudi/jdom-1.1.jar \
    --name t_business \
    --driver-memory 1G \
    --num-executors 2 \
    --executor-memory 1G \
    --executor-cores 4 \
    --deploy-mode cluster \
    --conf spark.driver.userClassPathFirst=true \
    --conf spark.yarn.executor.memoryOverhead=512 \
    --conf spark.yarn.driver.memoryOverhead=512 \
    --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls /home/t3cx/apps/hudi/hudi/hudi-utilities-bundle_2.11-0.6.0-SNAPSHOT.jar` \
    --props hdfs://xxxxxx/hudi/config/t3_trip.t_business_kafka.properties \
    --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
    --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
    --target-base-path oss://...../business_cow \
    --op UPSERT \
    --target-table oss_business_cow \
    --enable-hive-sync \
    --table-type COPY_ON_WRITE \
    --source-ordering-field create_time \
    --source-limit 5000000
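The --props file referenced above is not shown in the original post. A minimal sketch of what such a DeltaStreamer properties file typically contains; the record-key field, topic name, broker address, and schema paths below are all illustrative assumptions, not values from the original job:

```properties
# Record key and partitioning for the target Hudi table (field names are assumptions)
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=create_time
# Avro schemas read by FilebasedSchemaProvider (paths are illustrative)
hoodie.deltastreamer.schemaprovider.source.schema.file=hdfs:///hudi/config/source.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=hdfs:///hudi/config/target.avsc
# Kafka source consumed by JsonKafkaSource (topic/brokers are illustrative)
hoodie.deltastreamer.source.kafka.topic=t_business
bootstrap.servers=kafka-broker1:9092
auto.offset.reset=earliest
```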

4. Verify
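The original post verifies the result with console screenshots, which did not survive extraction. From the command line, one way to sketch the same check (the bucket placeholder is an assumption; the table name comes from the job above) is to list the Hudi table's base path on OSS and query the Hive table created by --enable-hive-sync:

```shell
# List the Hudi table's base path on OSS (bucket/path are placeholders)
hadoop fs -ls oss://<your-bucket>/business_cow

# Count rows through the synced Hive table
hive -e "select count(*) from oss_business_cow"
```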





