查看原文
其他

扩展RDD API三部曲之第二部自定义操作算子

浪尖 浪尖聊大数据 2022-09-06

扩展RDD API三部曲,主要是帮助大家掌握如下三个内容:

1). 回顾一下RDD的基础

2). 扩展Action,也即是自定义RDD算子

3). 扩展 transform及自定义RDD

本文主要是将自定义Spark RDD算子中的Action 类型操作。

1. 准备阶段

讲到自定义RDD的action操作,大家首先应该想到的就是那些RDD到key-value算子的隐式转换,具体一点也就是PairRDDFunctions这个类里包含的算子,比如reducebykey等操作算子。

具体实现肯定是要比较了解scala的隐式转换操作,这个浪尖也发过文章了,可以点击下文阅读:

Scala语法基础之隐式转换

首先,我们要进行准备操作,首先定义一个case class

class SalesRecord(val transactionId: String,
                 val customerId: String,
                 val itemId: String,
                 val itemValue: Double) extends Comparable[SalesRecord]
 with Serializable {

 override def compareTo(o: SalesRecord): Int = {
   return this.transactionId.compareTo(o.transactionId)
 }

 override def toString: String = {
   transactionId+","+customerId+","+itemId+","+itemValue
 }
}

然后,定义我们的主要函数:

val sparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")
     .set("yarn.resourcemanager.hostname", "mt-mdh.local")
     .set("spark.executor.instances","2")
     .set("spark.default.parallelism","4")
     .set("spark.sql.shuffle.partitions","4")
     .setJars(List("/opt/sparkjar/bigdata.jar"
       ,"/opt/jars/spark-streaming-kafka-0-10_2.11-2.3.1.jar"
       ,"/opt/jars/kafka-clients-0.10.2.2.jar"
       ,"/opt/jars/kafka_2.11-0.10.2.2.jar"))
   val sc = new SparkContext(sparkConf)
   val dataRDD = sc.textFile("file:///opt/bigdata/src/main/data/sales.csv")
   val salesRecordRDD = dataRDD.map(row => {
     val colValues = row.split(",")
     new SalesRecord(colValues(0),colValues(1),colValues(2),colValues(3).toDouble)
   })

这个时候加入我们需要对itemValue字段求和,常见的做法是

salesRecordRDD.map(_.itemValue).sum

其实,sum就是DoubleRDDFunctions内部的算子,也是通过隐式转换实现的。

2. 自定义算子实现

然后就是要定义RDD的操作算子本身,也即是一个工具类,我们叫他为CustomFunctions,内部包含求和函数如下:

import org.apache.spark.rdd.RDD

class CustomFunctions(rdd:RDD[SalesRecord]) {

 def totalSales = rdd.map(_.itemValue).sum

 def discount(discountPercentage:Double) = new DiscountRDD(rdd,discountPercentage)

}

这个仔细读一下上面已有的隐式转换算子,可以发现还不行,需要为自定义RDD的操作算子,自定义一个隐士转换的算子工具,内容如下:

object CustomFunctions {

 implicit def addCustomFunctions(rdd: RDD[SalesRecord]) = new CustomFunctions(rdd)
}

3. 使用算子

调用我们的转换方法:

println("Spark RDD API : "+salesRecordRDD.map(_.itemValue).sum)

import CustomFunctions._
println("Cunstom RDD API : "+salesRecordRDD.totalSales)

输出结果:

这就是自定义RDD的action操作。

下篇文章为自定义RDD和转换操作,这个就只会在星球里分享了欢迎加入浪尖的知识星球,与近420好友一起学习进步。

推荐阅读:

Spark源码系列之Standalone模式下Spark应用的整个启动过程

Spark源码系列之foreach和foreachPartition的区别

Hbase源码系列之BufferedMutator的Demo和源码解析

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存