查看原文
其他

使用Golang实现Spark UDF

字符串拼接工程师 Go开发大全 2021-01-31

(给Go开发大全加星标)


【导读】如何实现go语言写spark udf提交到spark任务运行?本文做了详细介绍

Spark非常适合处理大量数据。但是对于我们Go语言开发者来说并不友好,我们公司的人对Golang技术栈更熟悉。Spark的原生Scala技术栈,提供了Python API,我们对JVM了解不深入,就通过Databricks平台用了Python api和spark交互。这样可以满足大多数需求了。

但是如果你想写一个UDF处理复杂任务要怎么做?我们是可以用Python做编程、但如果用已有的go语言库达到了目的,那该多好!但是这样就要求我们把go代码封装到jar包里,用来加载文件到Spark任务的classpath。

本文记录了如何实现Golang写spark udf。注意以下操作也可以对UDAF使用。就是聚合的UDF。

为何做这个操作

你可能会质疑这个方案能否落地。结论是这个方案很好用也很有效。我们来看看我们的用例:在AWS S3存有大量以二进制格式存储的Protobuf格式数据,但不能原生地读取它们。更麻烦的是, 基于Protobuf格式,需要以最琐碎的方式保持代码仓库的最新状态是和Protobuf定义一致的,每次protobuf更新都要触发代码更新。

我们已经构建了一套经过生产验证的Go语言库用来处理这些数据。另外还维护了一些自动化工具,来让Protobuf和Go仓库保持同步。我们的spark任务当然也需要集成这些特点:

  • 在生产环境运行了个一段时间、经过验证和测试的GO语言仓库
  • 维护了一些自动化pipeline,让GO仓库和Schema保持同步

权衡下来,与其重新建立自动化pipeline和仓库、CI任务,我们选择把go代码打进jar包可以作为spark udf使用这个操作。

Spark UDF

spark很灵活,用户可以对任务做定制化达到任意处理数据的目的。有一种可重复利用的数据处理办法,就是用户定义函数(UDF)。它可以从Spark任务和Spark sql里直接被调用。我们是把定制化的二进制输入和Protobuf输入转换成Spark更容易处理的Json格式。

实现如下:

import pyspark.sql.functions as F
import pyspark.sql.types as T

spark.udf.registerJavaFunction("decodeContent""com.community.EventsUDF.Decoder")

# Define a schema for the JSON fields we want (you must customize)
event_schema = T.StructType([
  T.StructField("wrapper", T.StructType([
    T.StructField("id", T.StringType()),
    T.StructField("somefield", T.StringType()),
    T.StructField("timestamp", T.StringType()),
    T.StructField("anotherfield", T.StringType())
  ]))
])

events_df = (
  spark.read.format("binaryFile")
  .load(f"s3a://path_to_data/*")
  .withColumn("content", F.expr("decodeContent(content)").cast(T.StringType()))
)

parsed_json_df = (
  (events_df)
  .withColumn("json_data", F.from_json("event", event_schema))
  # Break out part of the JSON into a column
  .withColumn("type", F.col("json_data.yourjsonfield.type"))
)

# Continue working with the dataframe

数据转换成Json格式后,spark处理数据就变得容易许多,因为spark对protobuf的原生支持不太好。由于的库已经有了日期,就不需要在spark内部做数据对象的绑定了,只需要更新spark加载的UDF。

用Go构建Spark UDF

首先要写好go代码,让go封装起来可以被java调用。另外还要写一点java代码,封装Go程序供Spark调用。

go代码封装进java库

我们从Spark里取一个值然后返回一个值。这两个值的类型都是[]byte,这样对go和java都很容易处理。下面是我实现的spark_udf.go文件:


import (
"bytes"
"compress/gzip"
"io/ioutil"

"github.com/company/yourlib"
)

func Decode(data []byte) []byte {
zReader, err := gzip.NewReader(bytes.NewReader(data))
if err != nil {
// We return an error as the body!
return []byte(err.Error())
}

uncompressedData, err := ioutil.ReadAll(zReader)
if err != nil {
// We return an error as the body!
return []byte(err.Error())
}

output, _, _, err := yourlib.DoSomeWork(uncompressedData)

// Handle errors, etc

return bytes.Join(output, []byte("\n"))
}

写好go程序,下一步要把它打进jar包供其他java程序调用。gomobile这个项目为Go可以在Android中运行做了封装,其中有一些通用的工具可以拿来做Java和Go的交互的。gobind这个项目就可以用来生成Go module的java绑定,但是gobind用着不方便。

我们最后选了gojava这个工具把gobind做不到的事情处理好,虽然gojava这个开源工具开发贡献方面不活跃,它依然是一个非常好用的工具,命令如下:

JAVA_HOME=<your_java_home> \
        GO111MODULE=off \
        gojava -v -o `pwd`/eventdecoder.jar build github.com/<your_module_path>

上面的命令会生成eventdecoder.jar文件,包含着go代码的Java封装。在运行gojava前用go mod vendor命令检查依赖是否已经加好。

添加接口

到这里还没完,go代码在jar包里可没有提供给spark udf的接口。还要做一些事把go代码包一层。我们用Java实现,当然也可以用Scala实现。Spark SQL定义了 UDF1 到 UDF22 共22个类,UDF 最多支持22个输入参数。这里只用了一个输入,就用UDF1接口。java封装的实现如下:

package com.community.EventsUDF;
import org.apache.spark.sql.api.java.UDF1;
import go.eventdecoder.Eventdecoder;

public class Decoder implements UDF1<byte[], byte[]> {
        private static final long serialVersionUID = 1L;

        @Override
        public byte[] call(byte[] input) throws Exception {
    // Call our Go code
                return Eventdecoder.Decode(input);
        }
}

代码是按照Java项目布局写的路径,如果spark_udf.go在当前路径下,就要把上面的代码放到java/com/community/EventsUDF/Decoder.java这个相对路径下。注意在java源文件中的名称需要和这些代码和路径匹配得上。

组合

即将大功告成,但我们需要一个spark jar包用来做编译。我写了一个Makefile,它会下载spark jar包、存储到./spark_jars目录下。有了spark jar包后就可以编译Java代码了:

javac -cp \
 spark_jars/spark-core_$(VERSION).jar:eventdecoder.jar:spark_jars/spark-sql_$(VERSION).jar \
 java/com/community/EventsUDF/Decoder.java

然后就可以用打好的jar包了:

cd java && jar uf ../eventdecoder.jar com/community/EventsUDF/*.class

上面的命令会上传jar包文件、嵌入我们的类里。你也可以把它加载到spark中,使用方式如文章开头所示。

注意

注意,你的编码和配置需要和你的Spark任务运行环境匹配。Spark可能是通过Scala执行程序,但是我们这些实现都是打进Java的jar包的,里面的代码还可能依然是原生的二进制的。也许你能通过配置GOOSGOARCH实现,但对于我们来说只能用gojava。我们编译的UDF在Databricks平台的Ubuntu 18.04版本、Go 1.14上运行。

另一个坑是Go的package不能有下划线,否则Java的class loader就不能正常运行了。

用Makefile让打包方便

上面有很多重复的操作,可以把这些对UDF打包的步骤集成到Makefile。这样执行一次命令就可以完成所有操作了。下面展示了我的Makefile,其中把所有Spark jar包都存到了S3存储,这样从任何地方都可以拉包了,如果需要Java的编译工具可以自取。

我写的Makefile如下,可以适合你的场景下改改用。

# Makefile to build and test the eventdecoder.jar containing the
# Apache Spark UDF for decoding the events files in the Community
# event store.
SCALA_VERSION := 2.11
SPARK_VERSION := 2.4.5
VERSION := $(SCALA_VERSION)-$(SPARK_VERSION)
BUCKET_PATH ?= somewhere-in-s3/spark
JAVA_HOME ?= /usr/lib/jvm/java-8-openjdk-amd64
TEMPFILE := $(shell mktemp)

all: ../vendor udf

../vendor:
 go mod vendor

.PHONY: gojava
gojava:
 go get -u github.com/sridharv/gojava
 go install github.com/sridharv/gojava

eventdecoder.jar: gojava
 JAVA_HOME=$(JAVA_HOME) \
  GO111MODULE=off \
  gojava -v -o `pwd`/eventdecoder.jar build github.com/my-package/eventdecoder

spark_jars/spark-sql_$(VERSION).jar:
 mkdir -p spark_jars
 aws s3 cp s3://$(BUCKET_PATH)/spark-sql_$(VERSION).jar spark_jars

spark_jars/spark-core_$(VERSION).jar:
 mkdir -p spark_jars
 aws s3 cp s3://$(BUCKET_PATH)/spark-core_$(VERSION).jar spark_jars

spark-binaries: spark_jars/spark-core_$(VERSION).jar spark_jars/spark-sql_$(VERSION).jar

# Build the UDF code and insert into the jar
udf: spark-binaries eventdecoder.jar
 javac -cp spark_jars/spark-core_$(VERSION).jar:eventdecoder.jar:spark_jars/spark-sql_$(VERSION).jar java/com/community/EventsUDF/Decoder.java
 cd java && jar uf ../eventdecoder.jar com/community/EventsUDF/*.class

.PHONY: clean
clean:
 rm -f eventdecoder.jar

总结

虽然看起来这个需求很怪,但结果这个实现有很多人需要。这个go库能提升不少效率。希望对你有用。


 - EOF -

推荐阅读(点击标题可打开)

1、Etcd 源码分析:Raft 实现

2、Golang 新手可能会踩的 50 个坑

3、在生产环境用了一年 k8s 的经验教训


Go 开发大全

参与维护一个非常全面的Go开源技术资源库。日常分享 Go, 云原生、k8s、Docker和微服务方面的技术文章和行业动态。

关注后获取

回复 Go 获取6万star的Go资源库



分享、点赞和在看

支持我们分享更多好文章,谢谢!


    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存