无缝对接Spark与R:Sparklyr系列—探讨属于数据科学家的Spark
作者:黄天元,复旦大学博士在读,目前研究涉及文本挖掘、社交网络分析和机器学习等。希望与大家分享学习经验,推广并加深R语言在业界的应用。
邮箱:huang.tian-yuan@qq.com
为什么Spark需要与R对接?
解决这个问题,也许应该先问:为什么要学习Spark?对于数据科学家而言,Spark只是一个工具,我们大可绕过Hadoop生态系统和RDD之类的基本概念识,直截了当地回答:成本低、速度快。也就是企业里面的数据很多是利用分布式架构存储的,可以连接多个低性能的设备来快速存储、调度和计算数据,而能够实现这些功能的软件框架,就是Spark。
然后再问一个问题:为什么要学习R?R语言是一门完全面向数据科学家的语言,它的设计之初就是面向统计与图形表达。也正因如此,以计算机起家的做机器学习的数据工程师可能会偏向Python,这样能够更加方便地设计一些应用程序。但是对于数据本身而言,特别是创新型探索性数据分析方面,R有得天独厚的优势。应该说,Python是属于工程师的,而R属于科学家。
那么如果要在大规模数据集中进行探索性数据分析,怎么办?目前很多企业招聘的要求都是大家会Spark,但是会Spark是个什么概念呢?是要会Scala?要能够开发Spark程序?对于数据科学家而言,显然不是。我们要专注的就是,公司目前存在什么问题?如何量化这些问题?如何进行预测?结果如何解释?如果推广模型?至于底层的问题,不用特别深究。如果能够让R与Spark对接,可以大大提高数据科学家在解决大数据问题的工作效率,让数据科学家专注业务和数据支撑的实务,更好地解决如何为企业提供价值的根本问题。
为什么学习Sparklyr?
R × Spark = Sparklyr.目前的框架中,Rstudio的Sparklyr是最优秀的,能够熟练利用dplyr进行ETL的数据科学家,甚至可以摈弃传统的SQL,走向可重复性、可追溯性极强的ETL程序编写。目前版本已经推出到0.9.2,还在持续更新中,大家可以参考官网https://spark.rstudio.com.
有了这个利器,今后的数据科学家可以利用高效的R语言工具在Spark框架下轻松调度数据,并在Spark中进行复杂的机器学习和图运算。而且有了Extention之后,R用户能够对Spark更加细化的功能进行实现。
配置
首先这里是针对个人初学者的配置,也就是自己的学习。如果有企业愿意提供云计算的环境,请直接联系我。目前支持这个框架的云我用过的是Databricks,不过因为社区版非常不稳定,装包也慢,不是特别推荐(不过实在本地条件不佳,也可以用一下,官网有成套的介绍)。 要在R中使用Sparklyr,首先计算机中应该有Spark。如果没有,也没关系,Sparklyr中可以全自动安装,不过这样的话计算机应该安装有相应版本的Java。有的同学已经有了Java,整个过程就无比顺畅;有的同学什么也没有,可能要多花一点时间。但是只要你足够想要学习,相信完全不是问题。
#install.packages("pacman") #包管理的模块
library(pacman)
p_load(sparklyr,dplyr)
#spark_install(version="2.1.0")
#第一次安装而没有Spark的情况需要运行,可以通过参数version选择版本.
#因为我电脑版本的Java没有更新,所以采用2.1.0的Spark.默认安装最新版本.
数据导入
首先需要连接Spark.
#连接Spark
spark_connect(master = "local",
version = "2.1.0") -> sc
然后,让我们放一点数据到Spark中。sparklyr和dplyr都有可以把R环境中数据框变量放进Spark的方法,我们都展示了一下。这里用base里面自带的两个数据集:cars和iris.
sc %>% sdf_copy_to(cars) -> sc_cars #用了sparklyr的方法
sc %>% copy_to(iris) -> sc_iris #用了dplyr的方法
注意我们这里其实完成了两个步骤,一是把数据集放进了Spark的集群中,二是把这个数据集给了sc_cars和sc_iris两个变量,这样一来我们可以通过访问sc_*变量来直接对Spark里面的变量进行操作。 如果没有在命名的时候直接赋值,也没有关系,sparklyr中有函数能够直接取出已经放进Spark中的数据,如下所示:
tbl(sc,"cars") -> cars_tbl
这里的cars_tbl等价于之前的sc_cars.
下面,让我们查看一下在Spark中的数据集:
src_tbls(sc)
## [1] "cars" "iris"
可以看到我们现在有两个数据集在里面,分别命名为“cars”和“iris”.如果想要使用外部数据,可以采用spark_read_*系列函数,能够在Spark中导入各类格式的数据。 最后,需要注意的是,不能多次把同名变量放进Spark,否则会报错。如果想要覆盖之前的数据,请在copy_to/sdf_copy_to函数的参数中补充overwrite=T.
ETL
Extract,transfer,load,让我们在sparklyr中进行数据作业的调度。 说到ETL,大家可能首先想到的就是SQL。sparklyr是支持SQL语言来调数据的,例子如下:
p_load(DBI)
iris_preview <- dbGetQuery(sc, "SELECT * FROM iris LIMIT 10")
iris_preview
## Sepal_Length Sepal_Width Petal_Length Petal_Width Species
## 1 5.1 3.5 1.4 0.2 setosa
## 2 4.9 3.0 1.4 0.2 setosa
## 3 4.7 3.2 1.3 0.2 setosa
## 4 4.6 3.1 1.5 0.2 setosa
## 5 5.0 3.6 1.4 0.2 setosa
## 6 5.4 3.9 1.7 0.4 setosa
## 7 4.6 3.4 1.4 0.3 setosa
## 8 5.0 3.4 1.5 0.2 setosa
## 9 4.4 2.9 1.4 0.2 setosa
## 10 4.9 3.1 1.5 0.1 setosa
这样我们就提取了iris前十行的数据。不过,实话说,我学dplyr在SQL之前,会了dplyr之后真的觉得SQL可读性不是特别强,不过还是先用base把上面的再实现一次,因为需要秀一波如何把R的语句反转为SQL语句:
p_load(dbplyr)
sc_iris %>%
head(10) -> get_first_10
get_first_10
## # Source: spark<?> [?? x 5]
## Sepal_Length Sepal_Width Petal_Length Petal_Width Species
## * <dbl> <dbl> <dbl> <dbl> <chr>
## 1 5.1 3.5 1.4 0.2 setosa
## 2 4.9 3 1.4 0.2 setosa
## 3 4.7 3.2 1.3 0.2 setosa
## 4 4.6 3.1 1.5 0.2 setosa
## 5 5 3.6 1.4 0.2 setosa
## 6 5.4 3.9 1.7 0.4 setosa
## 7 4.6 3.4 1.4 0.3 setosa
## 8 5 3.4 1.5 0.2 setosa
## 9 4.4 2.9 1.4 0.2 setosa
## 10 4.9 3.1 1.5 0.1 setosa
## # ... with more rows
sql_render(get_first_10)
## <SQL> SELECT *
## FROM `iris`
## LIMIT 10
也就是说,我们在R里面实现的数据操纵,能够变成SQL语言。如果你不幸只会dplyr甚至是base-r,只要有了这个函数,就能够把自己的语言“翻译”成SQL。这样一来你也可以用SQL来找数据,甚至可以教只会SQL的ETL工程师怎么来写他们的SQL.是不是有点太神了?
只是取一个前10列也许不足以说服你,那么我们来一个复杂的例子,分组求Sepal.Length的最大值、平均值、最小值:
sc_iris %>%
group_by(Species) %>%
summarise(max_sl=max(Sepal_Length,na.rm = T),
mean_sl=mean(Sepal_Length,na.rm = T),
min_sl=min(Sepal_Length,na.rm = T)) -> group_max_min_avg
group_max_min_avg
## # Source: spark<?> [?? x 4]
## Species max_sl mean_sl min_sl
## * <chr> <dbl> <dbl> <dbl>
## 1 versicolor 7 5.94 4.9
## 2 virginica 7.9 6.59 4.9
## 3 setosa 5.8 5.01 4.3
sql_render(group_max_min_avg)
## <SQL> SELECT `Species`, MAX(`Sepal_Length`) AS `max_sl`, AVG(`Sepal_Length`) AS `mean_sl`, MIN(`Sepal_Length`) AS `min_sl`
## FROM `iris`
## GROUP BY `Species`
注意数据集中有缺失值,所以使用了na.rm=T来忽略它。dplyr能够完成非常复杂的数据调度任务,而它的语法结构简洁而明晰,方便交流,如今还能够与SQL语句对接,前途不可限量!在Spark中能够使用dplyr乃是Spark用户的一大福音。
线性回归
这一节不是为了介绍机器学习,而是想给大家讲解R与Spark的关系,所以直接引用了官网上面的一个例子。不过需要注意的是,直接用R的lm肯定是无法正常进行回归的,需要用到sparklyr专门的函数:ml_linear_regression
lm_model <- sc_iris %>%
ml_linear_regression(Petal_Length ~ Petal_Width)
summary(lm_model)
## Deviance Residuals:
## Min 1Q Median 3Q Max
## -1.33542 -0.30347 -0.02955 0.25776 1.39453
##
## Coefficients:
## (Intercept) Petal_Width
## 1.083558 2.229940
##
## R-Squared: 0.9271
## Root Mean Squared Error: 0.475
下面要解释的是画图:
p_load(ggplot2)
sc_iris %>%
select(Petal_Width, Petal_Length) %>%
collect %>%
ggplot(aes(Petal_Length, Petal_Width)) +
geom_point(aes(Petal_Width, Petal_Length), size = 2, alpha = 0.5) +
geom_abline(aes(slope = coef(lm_model)[["Petal_Width"]],
intercept = coef(lm_model)[["(Intercept)"]]),
color = "red") +
labs(
x = "Petal Width",
y = "Petal Length",
title = "Linear Regression: Petal Length ~ Petal Width",
subtitle = "Use Spark.ML linear regression to predict petal length as a function of petal width."
)
Spark本身是不具备强大的图形表达功能的,因此这个东西肯定要在R里面做。把Spark的计算结果放到R里面,有一个重要的通道,没错,就是collect.只要collect之后,spark对象就正式转化为R中的对象。我们所有操作里面,sc/sc_cars/sc_iris都是spark的对象,我们仅仅是可以使用sparklyr的函数直接进行操作而已,在深层次其实语言会先变化为Spark能够“听得懂”的语言,然后进行操作。但是Spark在存储、运算的优势是针对大数据而言的,我们把大数据高度概括之后的图形表示,用ggplot2是再好不过的了。
大家都在看
公众号后台回复关键字即可学习
回复 爬虫 爬虫三大案例实战
回复 Python 1小时破冰入门回复 数据挖掘 R语言入门及数据挖掘
回复 人工智能 三个月入门人工智能
回复 数据分析师 数据分析师成长之路
回复 机器学习 机器学习的商业应用
回复 数据科学 数据科学实战
回复 常用算法 常用数据挖掘算法