使用Golang实现Spark UDF
(给Go开发大全
加星标)
【导读】如何实现go语言写spark udf提交到spark任务运行?本文做了详细介绍
Spark非常适合处理大量数据。但是对于我们Go语言开发者来说并不友好,我们公司的人对Golang技术栈更熟悉。Spark的原生Scala技术栈,提供了Python API,我们对JVM了解不深入,就通过Databricks平台用了Python api和spark交互。这样可以满足大多数需求了。
但是如果你想写一个UDF处理复杂任务要怎么做?我们是可以用Python做编程、但如果用已有的go语言库达到了目的,那该多好!但是这样就要求我们把go代码封装到jar包里,用来加载文件到Spark任务的classpath。
本文记录了如何实现Golang写spark udf。注意以下操作也可以对UDAF使用。就是聚合的UDF。
为何做这个操作
你可能会质疑这个方案能否落地。结论是这个方案很好用也很有效。我们来看看我们的用例:在AWS S3存有大量以二进制格式存储的Protobuf格式数据,但不能原生地读取它们。更麻烦的是, 基于Protobuf格式,需要以最琐碎的方式保持代码仓库的最新状态是和Protobuf定义一致的,每次protobuf更新都要触发代码更新。
我们已经构建了一套经过生产验证的Go语言库用来处理这些数据。另外还维护了一些自动化工具,来让Protobuf和Go仓库保持同步。我们的spark任务当然也需要集成这些特点:
在生产环境运行了个一段时间、经过验证和测试的GO语言仓库 维护了一些自动化pipeline,让GO仓库和Schema保持同步
权衡下来,与其重新建立自动化pipeline和仓库、CI任务,我们选择把go代码打进jar包可以作为spark udf使用这个操作。
Spark UDF
spark很灵活,用户可以对任务做定制化达到任意处理数据的目的。有一种可重复利用的数据处理办法,就是用户定义函数(UDF)。它可以从Spark任务和Spark sql里直接被调用。我们是把定制化的二进制输入和Protobuf输入转换成Spark更容易处理的Json格式。
实现如下:
import pyspark.sql.functions as F
import pyspark.sql.types as T
spark.udf.registerJavaFunction("decodeContent", "com.community.EventsUDF.Decoder")
# Define a schema for the JSON fields we want (you must customize)
event_schema = T.StructType([
T.StructField("wrapper", T.StructType([
T.StructField("id", T.StringType()),
T.StructField("somefield", T.StringType()),
T.StructField("timestamp", T.StringType()),
T.StructField("anotherfield", T.StringType())
]))
])
events_df = (
spark.read.format("binaryFile")
.load(f"s3a://path_to_data/*")
.withColumn("content", F.expr("decodeContent(content)").cast(T.StringType()))
)
parsed_json_df = (
(events_df)
.withColumn("json_data", F.from_json("event", event_schema))
# Break out part of the JSON into a column
.withColumn("type", F.col("json_data.yourjsonfield.type"))
)
# Continue working with the dataframe
数据转换成Json格式后,spark处理数据就变得容易许多,因为spark对protobuf的原生支持不太好。由于的库已经有了日期,就不需要在spark内部做数据对象的绑定了,只需要更新spark加载的UDF。
用Go构建Spark UDF
首先要写好go代码,让go封装起来可以被java调用。另外还要写一点java代码,封装Go程序供Spark调用。
go代码封装进java库
我们从Spark里取一个值然后返回一个值。这两个值的类型都是[]byte,这样对go和java都很容易处理。下面是我实现的spark_udf.go
文件:
import (
"bytes"
"compress/gzip"
"io/ioutil"
"github.com/company/yourlib"
)
func Decode(data []byte) []byte {
zReader, err := gzip.NewReader(bytes.NewReader(data))
if err != nil {
// We return an error as the body!
return []byte(err.Error())
}
uncompressedData, err := ioutil.ReadAll(zReader)
if err != nil {
// We return an error as the body!
return []byte(err.Error())
}
output, _, _, err := yourlib.DoSomeWork(uncompressedData)
// Handle errors, etc
return bytes.Join(output, []byte("\n"))
}
写好go程序,下一步要把它打进jar包供其他java程序调用。gomobile
这个项目为Go可以在Android中运行做了封装,其中有一些通用的工具可以拿来做Java和Go的交互的。gobind
这个项目就可以用来生成Go module的java绑定,但是gobind用着不方便。
我们最后选了gojava
这个工具把gobind做不到的事情处理好,虽然gojava这个开源工具开发贡献方面不活跃,它依然是一个非常好用的工具,命令如下:
JAVA_HOME=<your_java_home> \
GO111MODULE=off \
gojava -v -o `pwd`/eventdecoder.jar build github.com/<your_module_path>
上面的命令会生成eventdecoder.jar
文件,包含着go代码的Java封装。在运行gojava
前用go mod vendor
命令检查依赖是否已经加好。
添加接口
到这里还没完,go代码在jar包里可没有提供给spark udf的接口。还要做一些事把go代码包一层。我们用Java实现,当然也可以用Scala实现。Spark SQL定义了 UDF1 到 UDF22 共22个类,UDF 最多支持22个输入参数。这里只用了一个输入,就用UDF1
接口。java封装的实现如下:
package com.community.EventsUDF;
import org.apache.spark.sql.api.java.UDF1;
import go.eventdecoder.Eventdecoder;
public class Decoder implements UDF1<byte[], byte[]> {
private static final long serialVersionUID = 1L;
@Override
public byte[] call(byte[] input) throws Exception {
// Call our Go code
return Eventdecoder.Decode(input);
}
}
代码是按照Java项目布局写的路径,如果spark_udf.go
在当前路径下,就要把上面的代码放到java/com/community/EventsUDF/Decoder.java
这个相对路径下。注意在java源文件中的名称需要和这些代码和路径匹配得上。
组合
即将大功告成,但我们需要一个spark jar包用来做编译。我写了一个Makefile
,它会下载spark jar包、存储到./spark_jars
目录下。有了spark jar包后就可以编译Java代码了:
javac -cp \
spark_jars/spark-core_$(VERSION).jar:eventdecoder.jar:spark_jars/spark-sql_$(VERSION).jar \
java/com/community/EventsUDF/Decoder.java
然后就可以用打好的jar包了:
cd java && jar uf ../eventdecoder.jar com/community/EventsUDF/*.class
上面的命令会上传jar包文件、嵌入我们的类里。你也可以把它加载到spark中,使用方式如文章开头所示。
注意
注意,你的编码和配置需要和你的Spark任务运行环境匹配。Spark可能是通过Scala执行程序,但是我们这些实现都是打进Java的jar包的,里面的代码还可能依然是原生的二进制的。也许你能通过配置GOOS
和GOARCH
实现,但对于我们来说只能用gojava。我们编译的UDF在Databricks平台的Ubuntu 18.04版本、Go 1.14上运行。
另一个坑是Go的package不能有下划线,否则Java的class loader就不能正常运行了。
用Makefile让打包方便
上面有很多重复的操作,可以把这些对UDF打包的步骤集成到Makefile
。这样执行一次命令就可以完成所有操作了。下面展示了我的Makefile,其中把所有Spark jar包都存到了S3存储,这样从任何地方都可以拉包了,如果需要Java的编译工具可以自取。
我写的Makefile
如下,可以在适合你的场景下改改用。
# Makefile to build and test the eventdecoder.jar containing the
# Apache Spark UDF for decoding the events files in the Community
# event store.
SCALA_VERSION := 2.11
SPARK_VERSION := 2.4.5
VERSION := $(SCALA_VERSION)-$(SPARK_VERSION)
BUCKET_PATH ?= somewhere-in-s3/spark
JAVA_HOME ?= /usr/lib/jvm/java-8-openjdk-amd64
TEMPFILE := $(shell mktemp)
all: ../vendor udf
../vendor:
go mod vendor
.PHONY: gojava
gojava:
go get -u github.com/sridharv/gojava
go install github.com/sridharv/gojava
eventdecoder.jar: gojava
JAVA_HOME=$(JAVA_HOME) \
GO111MODULE=off \
gojava -v -o `pwd`/eventdecoder.jar build github.com/my-package/eventdecoder
spark_jars/spark-sql_$(VERSION).jar:
mkdir -p spark_jars
aws s3 cp s3://$(BUCKET_PATH)/spark-sql_$(VERSION).jar spark_jars
spark_jars/spark-core_$(VERSION).jar:
mkdir -p spark_jars
aws s3 cp s3://$(BUCKET_PATH)/spark-core_$(VERSION).jar spark_jars
spark-binaries: spark_jars/spark-core_$(VERSION).jar spark_jars/spark-sql_$(VERSION).jar
# Build the UDF code and insert into the jar
udf: spark-binaries eventdecoder.jar
javac -cp spark_jars/spark-core_$(VERSION).jar:eventdecoder.jar:spark_jars/spark-sql_$(VERSION).jar java/com/community/EventsUDF/Decoder.java
cd java && jar uf ../eventdecoder.jar com/community/EventsUDF/*.class
.PHONY: clean
clean:
rm -f eventdecoder.jar
总结
虽然看起来这个需求很怪,但结果这个实现有很多人需要。这个go库能提升不少效率。希望对你有用。
- EOF -
Go 开发大全
参与维护一个非常全面的Go开源技术资源库。日常分享 Go, 云原生、k8s、Docker和微服务方面的技术文章和行业动态。
关注后获取
回复 Go 获取6万star的Go资源库
分享、点赞和在看
支持我们分享更多好文章,谢谢!