查看原文
其他

无缝对接Spark与R:Sparklyr系列—探讨属于数据科学家的Spark

黄天元 R语言中文社区 2019-04-22

作者黄天元,复旦大学博士在读,目前研究涉及文本挖掘、社交网络分析和机器学习等。希望与大家分享学习经验,推广并加深R语言在业界的应用。

邮箱:huang.tian-yuan@qq.com


为什么Spark需要与R对接?

解决这个问题,也许应该先问:为什么要学习Spark?对于数据科学家而言,Spark只是一个工具,我们大可绕过Hadoop生态系统和RDD之类的基本概念识,直截了当地回答:成本低、速度快。也就是企业里面的数据很多是利用分布式架构存储的,可以连接多个低性能的设备来快速存储、调度和计算数据,而能够实现这些功能的软件框架,就是Spark。
然后再问一个问题:为什么要学习R?R语言是一门完全面向数据科学家的语言,它的设计之初就是面向统计与图形表达。也正因如此,以计算机起家的做机器学习的数据工程师可能会偏向Python,这样能够更加方便地设计一些应用程序。但是对于数据本身而言,特别是创新型探索性数据分析方面,R有得天独厚的优势。应该说,Python是属于工程师的,而R属于科学家。
那么如果要在大规模数据集中进行探索性数据分析,怎么办?目前很多企业招聘的要求都是大家会Spark,但是会Spark是个什么概念呢?是要会Scala?要能够开发Spark程序?对于数据科学家而言,显然不是。我们要专注的就是,公司目前存在什么问题?如何量化这些问题?如何进行预测?结果如何解释?如果推广模型?至于底层的问题,不用特别深究。如果能够让R与Spark对接,可以大大提高数据科学家在解决大数据问题的工作效率,让数据科学家专注业务和数据支撑的实务,更好地解决如何为企业提供价值的根本问题。

为什么学习Sparklyr?

R × Spark = Sparklyr.目前的框架中,Rstudio的Sparklyr是最优秀的,能够熟练利用dplyr进行ETL的数据科学家,甚至可以摈弃传统的SQL,走向可重复性、可追溯性极强的ETL程序编写。目前版本已经推出到0.9.2,还在持续更新中,大家可以参考官网https://spark.rstudio.com.
有了这个利器,今后的数据科学家可以利用高效的R语言工具在Spark框架下轻松调度数据,并在Spark中进行复杂的机器学习和图运算。而且有了Extention之后,R用户能够对Spark更加细化的功能进行实现。


配置

首先这里是针对个人初学者的配置,也就是自己的学习。如果有企业愿意提供云计算的环境,请直接联系我。目前支持这个框架的云我用过的是Databricks,不过因为社区版非常不稳定,装包也慢,不是特别推荐(不过实在本地条件不佳,也可以用一下,官网有成套的介绍)。 要在R中使用Sparklyr,首先计算机中应该有Spark。如果没有,也没关系,Sparklyr中可以全自动安装,不过这样的话计算机应该安装有相应版本的Java。有的同学已经有了Java,整个过程就无比顺畅;有的同学什么也没有,可能要多花一点时间。但是只要你足够想要学习,相信完全不是问题。

#install.packages("pacman")  #包管理的模块 library(pacman) p_load(sparklyr,dplyr) #spark_install(version="2.1.0")       #第一次安装而没有Spark的情况需要运行,可以通过参数version选择版本. #因为我电脑版本的Java没有更新,所以采用2.1.0的Spark.默认安装最新版本.


数据导入

首先需要连接Spark.

#连接Spark spark_connect(master = "local",              version = "2.1.0") -> sc

然后,让我们放一点数据到Spark中。sparklyr和dplyr都有可以把R环境中数据框变量放进Spark的方法,我们都展示了一下。这里用base里面自带的两个数据集:cars和iris.

sc %>% sdf_copy_to(cars) -> sc_cars #用了sparklyr的方法 sc %>% copy_to(iris) -> sc_iris #用了dplyr的方法

注意我们这里其实完成了两个步骤,一是把数据集放进了Spark的集群中,二是把这个数据集给了sc_cars和sc_iris两个变量,这样一来我们可以通过访问sc_*变量来直接对Spark里面的变量进行操作。 如果没有在命名的时候直接赋值,也没有关系,sparklyr中有函数能够直接取出已经放进Spark中的数据,如下所示:

tbl(sc,"cars") -> cars_tbl

这里的cars_tbl等价于之前的sc_cars.

下面,让我们查看一下在Spark中的数据集:

src_tbls(sc) ## [1] "cars" "iris"

可以看到我们现在有两个数据集在里面,分别命名为“cars”和“iris”.如果想要使用外部数据,可以采用spark_read_*系列函数,能够在Spark中导入各类格式的数据。 最后,需要注意的是,不能多次把同名变量放进Spark,否则会报错。如果想要覆盖之前的数据,请在copy_to/sdf_copy_to函数的参数中补充overwrite=T.


ETL

Extract,transfer,load,让我们在sparklyr中进行数据作业的调度。 说到ETL,大家可能首先想到的就是SQL。sparklyr是支持SQL语言来调数据的,例子如下:

p_load(DBI) iris_preview <- dbGetQuery(sc, "SELECT * FROM iris LIMIT 10") iris_preview ##    Sepal_Length Sepal_Width Petal_Length Petal_Width Species ## 1           5.1         3.5          1.4         0.2  setosa ## 2           4.9         3.0          1.4         0.2  setosa ## 3           4.7         3.2          1.3         0.2  setosa ## 4           4.6         3.1          1.5         0.2  setosa ## 5           5.0         3.6          1.4         0.2  setosa ## 6           5.4         3.9          1.7         0.4  setosa ## 7           4.6         3.4          1.4         0.3  setosa ## 8           5.0         3.4          1.5         0.2  setosa ## 9           4.4         2.9          1.4         0.2  setosa ## 10          4.9         3.1          1.5         0.1  setosa

这样我们就提取了iris前十行的数据。不过,实话说,我学dplyr在SQL之前,会了dplyr之后真的觉得SQL可读性不是特别强,不过还是先用base把上面的再实现一次,因为需要秀一波如何把R的语句反转为SQL语句:

p_load(dbplyr) sc_iris %>%  head(10) -> get_first_10 get_first_10 ## # Source: spark<?> [?? x 5] ##    Sepal_Length Sepal_Width Petal_Length Petal_Width Species ##  *        <dbl>       <dbl>        <dbl>       <dbl> <chr>   ##  1          5.1         3.5          1.4         0.2 setosa ##  2          4.9         3            1.4         0.2 setosa ##  3          4.7         3.2          1.3         0.2 setosa ##  4          4.6         3.1          1.5         0.2 setosa ##  5          5           3.6          1.4         0.2 setosa ##  6          5.4         3.9          1.7         0.4 setosa ##  7          4.6         3.4          1.4         0.3 setosa ##  8          5           3.4          1.5         0.2 setosa ##  9          4.4         2.9          1.4         0.2 setosa ## 10          4.9         3.1          1.5         0.1 setosa ## # ... with more rows sql_render(get_first_10) ## <SQL> SELECT * ## FROM `iris` ## LIMIT 10

也就是说,我们在R里面实现的数据操纵,能够变成SQL语言。如果你不幸只会dplyr甚至是base-r,只要有了这个函数,就能够把自己的语言“翻译”成SQL。这样一来你也可以用SQL来找数据,甚至可以教只会SQL的ETL工程师怎么来写他们的SQL.是不是有点太神了?
只是取一个前10列也许不足以说服你,那么我们来一个复杂的例子,分组求Sepal.Length的最大值、平均值、最小值:

sc_iris %>%  group_by(Species) %>%  summarise(max_sl=max(Sepal_Length,na.rm = T),            mean_sl=mean(Sepal_Length,na.rm = T),            min_sl=min(Sepal_Length,na.rm = T)) -> group_max_min_avg group_max_min_avg ## # Source: spark<?> [?? x 4] ##   Species    max_sl mean_sl min_sl ## * <chr>       <dbl>   <dbl>  <dbl> ## 1 versicolor    7      5.94    4.9 ## 2 virginica     7.9    6.59    4.9 ## 3 setosa        5.8    5.01    4.3 sql_render(group_max_min_avg) ## <SQL> SELECT `Species`, MAX(`Sepal_Length`) AS `max_sl`, AVG(`Sepal_Length`) AS `mean_sl`, MIN(`Sepal_Length`) AS `min_sl` ## FROM `iris` ## GROUP BY `Species`

注意数据集中有缺失值,所以使用了na.rm=T来忽略它。dplyr能够完成非常复杂的数据调度任务,而它的语法结构简洁而明晰,方便交流,如今还能够与SQL语句对接,前途不可限量!在Spark中能够使用dplyr乃是Spark用户的一大福音。


线性回归

这一节不是为了介绍机器学习,而是想给大家讲解R与Spark的关系,所以直接引用了官网上面的一个例子。不过需要注意的是,直接用R的lm肯定是无法正常进行回归的,需要用到sparklyr专门的函数:ml_linear_regression

lm_model <- sc_iris %>%  ml_linear_regression(Petal_Length ~ Petal_Width) summary(lm_model) ## Deviance Residuals: ##      Min       1Q   Median       3Q      Max ## -1.33542 -0.30347 -0.02955  0.25776  1.39453 ## ## Coefficients: ## (Intercept) Petal_Width ##    1.083558    2.229940 ## ## R-Squared: 0.9271 ## Root Mean Squared Error: 0.475


下面要解释的是画图:

p_load(ggplot2) sc_iris %>%  select(Petal_Width, Petal_Length) %>%  collect %>%  ggplot(aes(Petal_Length, Petal_Width)) +    geom_point(aes(Petal_Width, Petal_Length), size = 2, alpha = 0.5) +    geom_abline(aes(slope = coef(lm_model)[["Petal_Width"]],                    intercept = coef(lm_model)[["(Intercept)"]]),                color = "red") +  labs(    x = "Petal Width",    y = "Petal Length",    title = "Linear Regression: Petal Length ~ Petal Width",    subtitle = "Use Spark.ML linear regression to predict petal length as a function of petal width."  )


Spark本身是不具备强大的图形表达功能的,因此这个东西肯定要在R里面做。把Spark的计算结果放到R里面,有一个重要的通道,没错,就是collect.只要collect之后,spark对象就正式转化为R中的对象。我们所有操作里面,sc/sc_cars/sc_iris都是spark的对象,我们仅仅是可以使用sparklyr的函数直接进行操作而已,在深层次其实语言会先变化为Spark能够“听得懂”的语言,然后进行操作。但是Spark在存储、运算的优势是针对大数据而言的,我们把大数据高度概括之后的图形表示,用ggplot2是再好不过的了。


大家都在看

R语言二分类问题案例分析:以泰坦尼克号沉船为例

公众号后台回复关键字即可学习

回复 爬虫             爬虫三大案例实战  
回复 
Python        1小时破冰入门

回复 数据挖掘      R语言入门及数据挖掘
回复 
人工智能      三个月入门人工智能
回复 数据分析师   数据分析师成长之路 
回复 机器学习      机器学习的商业应用
回复 数据科学      数据科学实战
回复 常用算法      常用数据挖掘算法

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存