
Summary of Several Ways to Implement WordCount in Spark (with Source Code)






map + reduceByKey -


package com.cw.bigdata.spark.wordcount

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount1 {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount1")
    val sc: SparkContext = new SparkContext(config)
    val lines: RDD[String] = sc.textFile("in")

    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(println)
  }
}
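The chain above runs three transformations before the action. As a reading aid, here is the same pipeline expanded into named intermediate RDDs (a sketch only; the variable names words, pairs and counts are illustrative and continue from the lines RDD defined above):

// Same map + reduceByKey pipeline, expanded step by step
val words: RDD[String] = lines.flatMap(_.split(" "))       // split each line into words
val pairs: RDD[(String, Int)] = words.map((_, 1))          // pair every word with an initial count of 1
val counts: RDD[(String, Int)] = pairs.reduceByKey(_ + _)  // sum counts per word, with map-side combining
counts.collect().foreach(println)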




countByValue instead of map + reduceByKey -


package com.cw.bigdata.spark.wordcount

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount2 {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount2")
    val sc: SparkContext = new SparkContext(config)
    val lines: RDD[String] = sc.textFile("in")

    lines.flatMap(_.split(" ")).countByValue().foreach(println)
  }
}
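Note that countByValue is an action, not a transformation: it computes the counts and returns them to the driver as a local Map, so it only suits cases where the number of distinct words fits in driver memory. A minimal sketch making the result type explicit (continuing from the lines RDD above):

// countByValue returns a local scala.collection.Map[String, Long] on the driver
val wordCounts: scala.collection.Map[String, Long] = lines.flatMap(_.split(" ")).countByValue()
wordCounts.foreach { case (word, count) => println(s"$word -> $count") }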




aggregateByKey or foldByKey -


package com.cw.bigdata.spark.wordcount

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 * WordCount, third approach: aggregateByKey or foldByKey
 *
 * def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
 * 1. zeroValue: the initial value given to each key in each partition;
 * 2. seqOp: folds the values into the initial value within each partition (intra-partition aggregation);
 * 3. combOp: merges the per-partition results (inter-partition aggregation).
 *
 * foldByKey is a simplified form of aggregateByKey in which seqOp and combOp are the same function.
 */
object WordCount3 {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount3")
    val sc: SparkContext = new SparkContext(config)
    val lines: RDD[String] = sc.textFile("in")

    lines.flatMap(_.split(" ")).map((_, 1)).aggregateByKey(0)(_ + _, _ + _).collect().foreach(println)
    lines.flatMap(_.split(" ")).map((_, 1)).foldByKey(0)(_ + _).collect().foreach(println)
  }
}
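To make the three parameters from the comment visible at the call site, the aggregateByKey call can be spelled out with labelled functions. This is a sketch continuing from the lines RDD above; the names are illustrative:

val pairs: RDD[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1))
val counts: RDD[(String, Int)] = pairs.aggregateByKey(0)(  // zeroValue: initial count for each key in each partition
  (partialSum, one) => partialSum + one,                   // seqOp: intra-partition aggregation
  (sumA, sumB) => sumA + sumB                              // combOp: inter-partition aggregation
)
counts.collect().foreach(println)

Because both functions are plain addition here, foldByKey(0)(_ + _) expresses exactly the same computation.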




groupByKey + map -



package com.cw.bigdata.spark.wordcount

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 * WordCount, fourth approach: groupByKey + map
 */
object WordCount4 {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount4")
    val sc: SparkContext = new SparkContext(config)
    val lines: RDD[String] = sc.textFile("in")

    val groupByKeyRDD: RDD[(String, Iterable[Int])] = lines.flatMap(_.split(" ")).map((_, 1)).groupByKey()
    groupByKeyRDD.map(tuple => (tuple._1, tuple._2.sum)).collect().foreach(println)
  }
}
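Since the key is left untouched, the final map can also be written with mapValues, which transforms only the grouped values. Keep in mind that groupByKey shuffles every (word, 1) pair before any summing happens, whereas reduceByKey pre-aggregates within each partition, so this is the most shuffle-heavy of the approaches shown. A minimal sketch of the mapValues variant, continuing from groupByKeyRDD above:

// mapValues keeps the word as the key and sums the grouped ones
groupByKeyRDD.mapValues(_.sum).collect().foreach(println)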




Plain Scala implementation of wordcount -


package com.cw.bigdata.spark.wordcount

/**
 * WordCount implemented with plain Scala collections
 */
object WordCount5 {
  def main(args: Array[String]): Unit = {
    val list = List("cw is cool", "wc is beautiful", "andy is beautiful", "mike is cool")

    /**
     * Step 1: split each element on the separator (a space here) and flatten the result.
     * map(_.split(" ")) splits each element on spaces, then flatten expands the arrays;
     * flatMap combines both steps into one.
     */
    val res0 = list.map(_.split(" ")).flatten
    val res1 = list.flatMap(_.split(" "))
    println("Step 1 result")
    println(res0)
    println(res1)

    /**
     * Step 2: turn each word into a tuple.
     * The key is the word; the value can be any marker, 1 here.
     */
    val res3 = res1.map((_, 1))
    println("Step 2 result")
    println(res3)

    /**
     * Step 3: group the tuples by the same key.
     */
    val res4 = res3.groupBy(_._1)
    println("Step 3 result")
    println(res4)

    /**
     * Final step: take the size of each key's grouped values, i.e. the number of occurrences of the word.
     */
    val res5 = res4.mapValues(_.size)
    println("Final result")
    println(res5.toBuffer)
  }
}
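The step-by-step version above can also be condensed into a single expression with groupBy(identity); a minimal sketch over the same list:

// Concise plain-Scala word count over the same input list
val wordCounts: Map[String, Int] = list.flatMap(_.split(" ")).groupBy(identity).mapValues(_.size).toMap
println(wordCounts)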




combineByKey -


package com.cw.bigdata.spark.wordcount

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 * WordCount, sixth approach: combineByKey
 */
object WordCount6 {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("combineByKey")
    val sc: SparkContext = new SparkContext(config)
    val lines: RDD[String] = sc.textFile("in")

    val mapRDD: RDD[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1))

    // combineByKey-based word count
    mapRDD.combineByKey(
      x => x,
      (x: Int, y: Int) => x + y,
      (x: Int, y: Int) => x + y
    ).collect().foreach(println)
  }
}
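combineByKey generalizes aggregateByKey: createCombiner builds the accumulator from the first value seen for a key in a partition, mergeValue folds further values of that key within the partition, and mergeCombiners merges accumulators from different partitions. The call above can be rewritten with the three functions labelled; a sketch continuing from mapRDD above, with illustrative parameter names:

val counts: RDD[(String, Int)] = mapRDD.combineByKey(
  (first: Int) => first,                  // createCombiner: start the accumulator from the first count of a key
  (acc: Int, count: Int) => acc + count,  // mergeValue: fold further counts within a partition
  (accA: Int, accB: Int) => accA + accB   // mergeCombiners: merge accumulators across partitions
)
counts.collect().foreach(println)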

- END -