大数据 | Spark机器学习工作流开发指南
点击蓝字关注我
Spark.ml是在Spark 1.2开始引入的一个包,它旨在提供一套统一的高级API,帮助用户创建和优化实用的机器学习工作流,它在原来的MLlib的基础上进行了大量的改进和优化,让Spark生态更见坚不可摧,本文就来详细介绍一下Spark机器学习工作流的基本概念和用法。
—▼—
QQ群:164660119;QQ号:498073774;
前言
之前我曾写过一篇介绍Spark集群搭建和基本使用的文章,在文中详细的介绍了Spark的来历、优势及搭建过程,Spark以其低时延、速度快、通用性强等优势在大数据处理领域备受欢迎,但是它的强大之处绝不仅仅是因为强大的分布式计算能力。如果你仅仅想借助Spark的并行计算能力,那样我觉得大可不必费尽周折去搭建Spark和HDFS,可以直接借助多线程实现数据的并行处理和读取。
Spark之所以如此强大还得益于它完善的生态系统,它在仓储工具、机器学习等领域也非常完备,这让它能够进一步在大数据上进行一些查找、训练等工作,而不仅仅是作为一个数据读取工具使用。
在Spark 1.2之前它携带的机器学习库是MLlib,它包含了聚类、逻辑回归、SVM等常用的机器学习算法,因此,它也得到了广泛的应用。但是,在机器学习工作流、数据格式、性能方面有所限制,因此,在Spark 1.2之后加入了Spark.ml机器学习库,这是在MLlib之上更高阶的API,它不仅面向DataFrame,在RDD基础上进一步封装,提供更强大更方便的API。另外,比较重要的一点是,它引入了工作流(Pipeline)的概念,将多个单独的算法组合成一个统一的流水线形式,便于实现复杂的机器学习模型,本文就来详细介绍一下Spark.ml机器学习工作流的一些专业术语和基本使用方法。
Spark.ml简介
Spark.ml是在Spark 1.2开始引入的一个包,它旨在提供一套统一的高级API,帮助用户创建和优化实用的机器学习工作流(machine learning pipelines)。
一个完整的机器学习工作流包括多个阶段,
数据预处理
特征抽取
模型训练
交叉验证
......
spark.ml对机器学习算法的API进行了标准化,使得更加容易的将多个算法组合成单个工作流。
在spark.ml中引入了一些关键的专业术语,它们分别是,
ML Dataset:spark.ml不像MLlib那样使用RDD作为数据输入,而是使用更加高阶的DataFrame,它相对于RDD包含了schema信息,它可以支持多种不同的数据类型,例如,文本、特征向量、真实标签、预测标签。
Transformer:它可以被翻译为转换器,是一个工作流阶段(PipelineStage),它的作用是将一个DataFrame转换成另一个DataFrame,从技术上讲Transformer实现了transform()方法。转换器是一种抽象,它包括特征转换器和机器学习模型。
Estimator:它可以被译为评估器,作用是将输入的DataFrame转化成Transformer,在技术上它实现了fit()方法。例如,一个机器学习算法就是Estimator,它的输入为数据集,输出是机器学习模型,这个机器学习模型就是一个Transformer,它能够将一个数据转换成另一个数据。
Pipeline:可以翻译为工作流或者管道,个人认为反义词工作流更加贴切、容易理解。它把多个工作流阶段连接起来,组成一个完整的工作流。例如,简单的文本文档处理工作流可能包括如下几个阶段,
将每个文档的文本拆分为单词。
将每个文档的单词转换成数字特征向量。
使用特征向量和标签学习预测模型。
spark.ml将这样的工作流表示为Pipeline,它按特定的顺序运行一系列Pipeline(Transformer和Estimator)组成。
Param:Param被用来设置 Transformer 或者 Estimator 的参数。
如何运行?
工作流(Pipeline)是只由多个工作阶段组成的序列,每个阶段可以是一个Transformer或者Estimator,这些阶段按照顺序运行,并且输入的数据集在每个阶段时都会被修改。
在Transformer阶段,在数据集上调用transform()方法,在Estimator阶段,调用fit()方法以生成一个Transformer(它将成为PipelineModel或者拟合Pipeline的一部分), 并且在数据集上调用该Transformer的transform()方法。
下图是工作流训练时的使用情况,用来说明简单的文本文档处理工作流程,
在上图中,第一行代表了三个阶段的工作流,前两个(Tokenizer和HashingTF)是转换器(蓝色框),第三个阶段(Logistic Regression)是评估器(红色框)。
第二行代表了流经工作流的数据情况,其中圆柱体代表DataFrame。Tokenizer.transform()方法将原始文本文档(Raw text)拆分为单词,在数据集中添加带有单词的新列。HashingTF.transform()方法将words列转换为特征向量,并将带有这些向量的新列添加到数据集。然后,由于LogisticRegression是Estimator,因此管道首先调用LogisticRegression.fit()来生成LogisticRegressionModel。如果管道有更多阶段,则在将数据集传递到下一阶段之前,将在数据集中调用LogisticRegressionModel的transform()方法。
经过上述工作流,将产生一个PipelineModel,它是一个Transformer,这个模型在测试数据上使用的工作流如下,
在上图中,PipelineModel和原始Pipeline阶段数相同,但是原始Pipeline中的所有Estimator都已变为Transformer。在测试数据集上调用PipelineModel的transform()方法时,数据将按顺序传递给工作流。每个阶段的transform()方法都会更新数据集,并将其传递到下一个阶段。
Spark.ML实战
下面就以一个简单的实例来构造一个逻辑回归(LogisticRegression)工作流,来加深一下对上述流程的理解和印象。
在spark.ml实战中假设这样一个场景,一个句子(sentence)中,如果包含单词"flower",那么它的标签为1,否则为0。
导入模块
从前面讲解中我们可以清楚的知道,在构造这个工作流中,我们会用到Tokenizer、HashingTF、DataFrame、LogisticRegression、Pipeline,我们在程序开头先导入这些模块,
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import HashingTF
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
构造数据集
了解了示例背景,下面我们分别来构造训练集DataFrame和测试集DataFrame,
# 训练集
TRAINING = spark.createDataFrame([
(0, "one two three", 0.0),
(1, "flower Kafka Hadoop Flink", 1.0),
(2, "a b c d e", 0.0),
(3, "hello flower", 1.0),
(4, "this is flower", 1.0)
], ["id", "sentence", "label"])
# 测试集
TEST = spark.createDataFrame([
(5, "flower hadoop"),
(6, "a b m n k"),
(7, "one two"),
(8, "flower is good"),
(9, "1 2 3 flower")
], ["id", "sentence"])
构建Pipeline
我们首先要来构造上面第一个图中的Pipeline(Estimator),在构建这个工作流时我们需要用到Tokenizer、HashingTF进行分词和抽取特征,然后调用逻辑回归算法,生成PipelineModel,
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=20, regParam=0.001)
然后通过Pipeline把上述三个阶段连接起来,构成工作流,
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(TRAINING)
print(model)
# 输出
PipelineModel_kl091lz1a
可以看出,上面输出一个PipelineModel。
构建PipelineModel
下面要完成的就是上面第二幅图中的工作流,在测试集上面首先调用PipelineModel,然后通过多个阶段的数据传递,最终输出预测结果,
prediction = model.transform(TEST)
selected = prediction.select("id", "sentence", "probability", "prediction")
for row in selected.collect():
rid, text, prob, prediction = row
print("({}, {}) --> prob={}, prediction={}".format(rid, text, str(prob), prediction))
# 输出
(5, flower hadoop) --> prob=[0.00764555307059,0.992354446929], prediction=1.0
(6, a b m n k) --> prob=[0.899006169016,0.100993830984], prediction=0.0
(7, one two) --> prob=[0.978454225878,0.021545774122], prediction=0.0
(8, flower is good) --> prob=[0.00534341313327,0.994656586867], prediction=1.0
(9, 1 2 3 flower) --> prob=[0.0210414429823,0.978958557018], prediction=1.0
上述就是基本的Spark.ml应用实例。
▲
END
长按扫码可关注