7个实例全面掌握Hadoop MapReduce
作者介绍
杜亦舒,创业中,技术合伙人,喜欢研究分享技术。个人订阅号:性能与架构。
本文旨在帮您快速了解 MapReduce 的工作机制和开发方法,解决以下几个问题:
MapReduce 基本原理是什么?
MapReduce 的执行过程是怎么样的?
MapReduce 的核心流程细节
如何进行 MapReduce 程序开发?(通过7个实例逐渐掌握)
文章中提供了程序实例中涉及到的测试数据文件,可以直接下载使用。
关于实践环境,如果您不喜欢自己搭建Hadoop环境,可以下载使用本教程提供的环境,实践部分内容中会介绍具体使用方法。
通过学习并实践完成后,可以对 MapReduce 工作原理有比较清晰的认识,并掌握 MapReduce 的编程思路。
大纲:
一、MapReduce 基本原理
二、MapReduce 入门示例 - WordCount 单词统计
三、MapReduce 执行过程分析
实例1 - 自定义对象序列化
实例2 - 自定义分区
实例3 - 计算出每组订单中金额最大的记录
实例4 - 合并多个小文件
实例5 - 分组输出到多个文件
四、MapReduce 核心流程梳理
实例6 - join 操作
实例7 - 计算出用户间的共同好友
五、下载方式
一、MapReduce基本原理
MapReduce是一种编程模型,用于大规模数据集的分布式运算。
1、MapReduce通俗解释
图书馆要清点图书数量,有10个书架,管理员为了加快统计速度,找来了10个同学,每个同学负责统计一个书架的图书数量。
张同学统计 书架1
王同学统计 书架2
刘同学统计 书架3
……
过了一会儿,10个同学陆续到管理员这汇报自己的统计数字,管理员把各个数字加起来,就得到了图书总数。
这个过程就可以理解为MapReduce的工作过程。
2、MapReduce中有两个核心操作
(1)map
管理员分配哪个同学统计哪个书架,每个同学都进行相同的“统计”操作,这个过程就是map。
(2)reduce
每个同学的结果进行汇总,这个过程是reduce。
3、MapReduce工作过程拆解
下面通过一个景点案例(单词统计)看MapReduce是如何工作的。
有一个文本文件,被分成了4份,分别放到了4台服务器中存储
Text1:the weather is good
Text2:today is good
Text3:good weather is good
Text4:today has good weather
现在要统计出每个单词的出现次数。
(1)拆分单词
map节点1
输入:“the weather is good”
输出:(the,1),(weather,1),(is,1),(good,1)
map节点2
输入:“today is good”
输出:(today,1),(is,1),(good,1)
map节点3
输入:“good weather is good”
输出:(good,1),(weather,1),(is,1),(good,1)
map节点4
输入:“today has good weather”
输出:(today,1),(has,1),(good,1),(weather,1)
(2)排序
map节点1
map节点2
map节点3
map节点4
(3)合并
map节点1
map节点2
map节点3
map节点4
(4)汇总统计
每个map节点都完成以后,就要进入reduce阶段了。
例如使用了3个reduce节点,需要对上面4个map节点的结果进行重新组合,比如按照26个字母分成3段,分配给3个reduce节点。
Reduce节点进行统计,计算出最终结果。
这就是最基本的MapReduce处理流程。
4、MapReduce编程思路
了解了MapReduce的工作过程,我们思考一下用代码实现时需要做哪些工作?
在4个服务器中启动4个map任务
每个map任务读取目标文件,每读一行就拆分一下单词,并记下来次单词出现了一次
目标文件的每一行都处理完成后,需要把单词进行排序
在3个服务器上启动reduce任务
每个reduce获取一部分map的处理结果
reduce任务进行汇总统计,输出最终的结果数据
但不用担心,MapReduce是一个非常优秀的编程模型,已经把绝大多数的工作做完了,我们只需要关心2个部分:
map处理逻辑——对传进来的一行数据如何处理?输出什么信息?
reduce处理逻辑——对传进来的map处理结果如何处理?输出什么信息?
编写好这两个核心业务逻辑之后,只需要几行简单的代码把map和reduce装配成一个job,然后提交给Hadoop集群就可以了。
至于其它的复杂细节,例如如何启动map任务和reduce任务、如何读取文件、如对map结果排序、如何把map结果数据分配给reduce、reduce如何把最终结果保存到文件等等,MapReduce框架都帮我们做好了,而且还支持很多自定义扩展配置,例如如何读文件、如何组织map或者reduce的输出结果等等,后面的示例中会有介绍。
二、MapReduce入门示例:WordCount单词统计
WordCount是非常好的入门示例,相当于helloword,下面就开发一个WordCount的MapReduce程序,体验实际开发方式。
1、安装Hadoop实践环境
您可以选择自己搭建环境,也可以使用打包好的Hadoop环境(版本2.7.3)。
这个Hadoop环境实际上是一个虚机镜像,所以需要安装virtualbox虚拟机、vagrant镜像管理工具,和我的Hadoop镜像,然后用这个镜像启动虚机就可以了,下面是具体操作步骤:
(1)安装virtualbox
下载地址:https://www.virtualbox.org/wiki/Downloads
(2)安装vagrant
因为官网下载较慢,我上传到了云盘
Windows版
链接: https://pan.baidu.com/s/1pKKQGHl
密码: eykr
Mac版
链接: https://pan.baidu.com/s/1slts9yt
密码: aig4
安装完成后,在命令行终端下就可以使用vagrant命令。
(3)下载Hadoop镜像
链接: https://pan.baidu.com/s/1bpaisnd
密码: pn6c
(4)启动
加载Hadoop镜像
vagrant box add {自定义镜像名称} {镜像所在路径}
例如您想命名为Hadoop,镜像下载后的路径为d:\hadoop.box,加载命令就是这样:
vagrant box add hadoop d:\hadoop.box
创建工作目录,例如d:\hdfstest。
进入此目录,初始化
cd d:\hdfstest
vagrant init hadoop
启动虚机
启动完成后,就可以使用SSH客户端登录虚机了
IP 127.0.0.1
端口 2222
用户名 root
密码 vagrant
在Hadoop服务器中启动HDFS和Yarn,之后就可以运行MapReduce程序了
start-dfs.sh
start-yarn.sh
2、创建项目
注:流程是在本机开发,然后打包,上传到Hadoop服务器上运行。
新建项目目录wordcount,其中新建文件pom.xml,内容:
然后创建源码目录src/main/java
现在的目录结构
3、代码
mapper程序:src/main/java/WordcountMapper.java
内容:
这里定义了一个mapper类,其中有一个map方法。MapReduce框架每读到一行数据,就会调用一次这个map方法。
map的处理流程就是接收一个key value对儿,然后进行业务逻辑处理,最后输出一个key value对儿。
Mapper<LongWritable, Text, Text, IntWritable>
其中的4个类型分别是:输入key类型、输入value类型、输出key类型、输出value类型。
MapReduce框架读到一行数据侯以key value形式传进来,key默认情况下是mr矿机所读到一行文本的起始偏移量(Long类型),value默认情况下是mr框架所读到的一行的数据内容(String类型)。
输出也是key value形式的,是用户自定义逻辑处理完成后定义的key,用户自己决定用什么作为key,value是用户自定义逻辑处理完成后的value,内容和类型也是用户自己决定。
此例中,输出key就是word(字符串类型),输出value就是单词数量(整型)。
这里的数据类型和我们常用的不一样,因为MapReduce程序的输出数据需要在不同机器间传输,所以必须是可序列化的,例如Long类型,Hadoop中定义了自己的可序列化类型LongWritable,String对应的是Text,int对应的是IntWritable。
reduce程序:src/main/java/WordCountReducer.java
这里定义了一个Reducer类和一个reduce方法。
当传给reduce方法时,就变为:
4个类型分别指:输入key的类型、输入value的类型、输出key的类型、输出value的类型。
需要注意,reduce方法接收的是:一个字符串类型的key、一个可迭代的数据集。因为reduce任务读取到map任务处理结果是这样的:
当传给reduce方法时,就变为:
key:good
value:(1,1,1,1)
所以,reduce方法接收到的是同一个key的一组value。
主程序:src/main/java/WordCountMapReduce.java
这个main方法就是用来组装一个job并提交执行
4、编译打包
在pom.xml所在目录下执行打包命令:
执行完成后,会自动生成target目录,其中有打包好的jar文件。
现在项目文件结构:
5、运行
先把target中的jar上传到Hadoop服务器,然后在Hadoop服务器的HDFS中准备测试文件(把Hadoop所在目录下的txt文件都上传到HDFS)
cd $HADOOP_HOME
hdfs dfs -mkdir -p /wordcount/input
hdfs dfs -put *.txt /wordcount/input
执行wordcount jar
hadoop jar mapreduce-wordcount-0.0.1-SNAPSHOT.jar WordCountMapR
educe /wordcount/input /wordcount/output
执行完成后验证
可以看到单词数量统计结果。
三、MapReduce执行过程分析
下面看一下从job提交到执行完成这个过程是怎样。
(1)客户端提交任务
Client提交任务时会先到HDFS中查看目标文件的大小,了解要获取的数据的规模,然后形成任务分配的规划,例如:
a.txt 0-128M交给一个task,128-256M 交给一个task,b.txt 0-128M交给一个task,128-256M交给一个task ...,形成规划文件job.split。
然后把规划文件job.split、jar、配置文件xml提交给yarn(Hadoop集群资源管理器,负责为任务分配合适的服务器资源)
(2)启动appmaster
注:appmaster是本次job的主管,负责maptask和reducetask的启动、监控、协调管理工作。
yarn找一个合适的服务器来启动appmaster,并把job.split、jar、xml交给它。
(3)启动maptask
Appmaster启动后,根据固化文件job.split中的分片信息启动maptask,一个分片对应一个maptask。
分配maptask时,会尽量让maptask在目标数据所在的datanode上执行。
(4)执行maptask
Maptask会一行行地读目标文件,交给我们写的map程序,读一行就调一次map方法,map调用context.write把处理结果写出去,保存到本机的一个结果文件,这个文件中的内容是分区且有序的。
分区的作用就是定义哪些key在一组,一个分区对应一个reducer。
(5)启动reducetask
Maptask都运行完成后,appmaster再启动reducetask,maptask的结果中有几个分区就启动几个reducetask。
(6)执行reducetask
reducetask去读取maptask的结果文件中自己对应的那个分区数据,例如reducetask_01去读第一个分区中的数据。
reducetask把读到的数据按key组织好,传给reduce方法进行处理,处理结果写到指定的输出路径。
四、实例1:自定义对象序列化
1、需求与实现思路
(1)需求
需要统计手机用户流量日志,日志内容实例:
要把同一个用户的上行流量、下行流量进行累加,并计算出综合。
例如上面的13897230503有两条记录,就要对这两条记录进行累加,计算总和,得到:
13897230503,500,1600,2100
(2)实现思路
map
接收日志的一行数据,key为行的偏移量,value为此行数据。
输出时,应以手机号为key,value应为一个整体,包括:上行流量、下行流量、总流量。
手机号是字符串类型Text,而这个整体不能用基本数据类型表示,需要我们自定义一个bean对象,并且要实现可序列化。
key: 13897230503
value: < upFlow:100, dFlow:300, sumFlow:400 >
reduce
接收一个手机号标识的key,及这个手机号对应的bean对象集合。
例如:
key:
13897230503
value:
< upFlow:400, dFlow:1300, sumFlow:1700 >,
< upFlow:100, dFlow:300, sumFlow:400 >
迭代bean对象集合,累加各项,形成一个新的bean对象,例如:
< upFlow:400+100, dFlow:1300+300, sumFlow:1700+400 >
最后输出:
key: 13897230503
value: < upFlow:500, dFlow:1600, sumFlow:2100 >
2、代码实践
(1)创建项目
新建项目目录serializebean,其中新建文件pom.xml,内容:
然后创建源码目录src/main/java
现在项目目录的文件结构
(2)代码
自定义bean:src/main/java/FlowBean
MapReduce程序:src/main/java/FlowCount
(3)编译打包
在pom.xml所在目录下执行打包命令:
mvn package
执行完成后,会自动生成target目录,其中有打包好的jar文件。
现在项目文件结构:
(4)运行
先把target中的jar上传到Hadoop服务器,然后下载测试数据文件:
链接: https://pan.baidu.com/s/1skTABlr
密码:tjwy
上传到HDFS
hdfs dfs -mkdir -p /flowcount/input
hdfs dfs -put flowdata.log /flowcount/input
运行
hadoop jar mapreduce-serializebean-0.0.1-SNAPSHOT.jar FlowCount
/flowcount/input /flowcount/output2
检查
五、实例2:自定义分区
1、需求与实现思路
(1)需求
还是以上个例子的手机用户流量日志为例:
在上个例子的统计需要基础上添加一个新需求:按省份统计,不同省份的手机号放到不同的文件里。
例如137表示属于河北,138属于河南,那么在结果输出时,他们分别在不同的文件中。
(2)实现思路
map和reduce的处理思路与上例相同,这里需要多做2步:
自定义一个分区器Partitioner
根据手机号判断属于哪个分区。有几个分区就有几个reducetask,每个reducetask输出一个文件,那么,不同分区中的数据就写入了不同的结果文件中。
在main程序中指定使用我们自定义的Partitioner即可
2、代码实践
(1)创建项目
新建项目目录custom_partion,其中新建文件pom.xml,内容:
然后创建源码目录src/main/java
现在项目目录的文件结构
(2)代码
自定义bean:src/main/java/FlowBean.java
自定义分区器:src/main/java/ProvincePartitioner.java
这段代码是本示例的重点,其中定义了一个hashmap,假设其是一个数据库,定义了手机号和分区的关系。
getPartition取得手机号的前缀,到数据库中获取区号,如果没在数据库中,就指定其为“其它分区”(用4代表)
MapReduce程序:src/main/java/FlowCount.java
main程序中指定了使用自定义的分区器
job.setPartitionerClass(ProvincePartitioner.class);
(3)编译打包
在pom.xml所在目录下执行打包命令:
执行完成后,会自动生成target目录,其中有打包好的jar文件
现在项目文件结构
(4)运行
先把target中的jar上传到Hadoop服务器
运行
hadoop jar mapreduce-custompartion-0.0.1-SNAPSHOT.jar FlowCount
/flowcount/input /flowcount/output-part
检查
六、实例3:计算出每组订单中金额最大的记录
1、需求与实现思路
(1)需求
有如下订单数据:
需要求出每一个订单中成交金额最大的一笔交易。
(2)实现思路
先介绍一个概念GroupingComparator组比较器,通过WordCount来理解它的作用。
WordCount中map处理完成后的结果数据是这样的:
<good,1>
<good,1>
<good,1>
<is,1>
<is,1>
Reducer会把这些数据都读进来,然后进行分组,把key相同的放在一组,形成这样的形式:
<good, [1,1,1]>
<is, [1,1]>
然后对每一组数据调用一次reduce( key, Iterable, ...)方法。
其中分组的操作就需要用到GroupingComparator,对key进行比较,相同的放在一组。
注:上例中的Partitioner是属于mapDuang的,GroupingComparator是属于reduce端的。
下面看整体实现思路。
1)定义一个订单bean
属性包括:订单号、金额
要实现可序列化,与比较方法compareTo,比较规则:订单号不同的,按照订单好比较,相同的,按照金额比较。
2)定义一个Partitioner
根据订单号的hashcode分区,可以保证订单号相同的在同一个分区,以便reduce中接收到同一个订单的全部记录。
同分区的数据是序的,这就用到了bean中的比较方法,可以让订单号相同的记录按照金额从大到小排序。
在map方法中输出数据时,key就是bean,value为null。
map的结果数据形式例如:
3)定义一个GroupingComparator
因为map的结果数据中key是bean,不是普通数据类型,所以需要使用自定义的比较器来分组,就使用bean中的订单号来比较。
例如读取到分区1的数据:
<{ Order_0000001 222.8 }, null>,
<{ Order_0000001 25.8 }, null>,
<{ Order_0000003 222.8 }, null>
进行比较,前两条数据的订单号相同,放入一组,默认是以第一条记录的key作为这组记录的key。
分组后的形式如下:
<{ Order_0000001 222.8 }, [null, null]>,
<{ Order_0000003 222.8 }, [null]>
在reduce方法中收到的每组记录的key就是我们最终想要的结果,所以直接输出到文件就可以了。
2、代码实践
(1)创建项目
新建项目目录groupcomparator,其中新建文件pom.xml,内容:
然后创建源码目录src/main/java
现在项目目录的文件结构
(2)代码
**自定义bean:** src/main/java/OrderBean.java
自定义分区器:src/main/java/ItemIdPartitioner.java
自定义比较器:src/main/java/MyGroupingComparator.java
MapReduce程序:src/main/java/GroupSort.java
(3)编译打包
在pom.xml所在目录下执行打包命令:
执行完成后,会自动生成target目录,其中有打包好的jar文件
现在项目文件结构
(4)运行
先把target中的jar上传到Hadoop服务器
下载测试数据文件
链接:https://pan.baidu.com/s/1pKKlvh5
密码: 43xa
上传到HDFS
运行
hadoop jar mapreduce-groupcomparator-0.0.1-SNAPSHOT.jar GroupSo
rt /orders.txt /outputOrders
检查
hdfs dfs -ls /outputOrders
hdfs dfs -cat /outputOrders/*
七、实例4:合并多个小文件
1、需求与实现思路
(1)需求
要计算的目标文件中有大量的小文件,会造成分配任务和资源的开销比实际的计算开销还打,这就产生了效率损耗。
需要先把一些小文件合并成一个大文件。
(2)实现思路
文件的读取由map负责,在前面的示意图中可以看到一个inputformat用来读取文件,然后以key value形式传递给map方法。
我们要自定义文件的读取过程,就需要了解其细节流程:
所以我们需要自定义一个inputformat和RecordReader。
Inputformat使用我们自己的RecordReader,RecordReader负责实现一次读取一个完整文件封装为key value。
map接收到文件内容,然后以文件名为key,以文件内容为value,向外输出的格式要注意,要使用SequenceFileOutPutFormat(用来输出对象)。
因为reduce收到的key value都是对象,不是普通的文本,reduce默认的输出格式是TextOutputFormat,使用它的话,最终输出的内容就是对象ID,所以要使用SequenceFileOutPutFormat进行输出。
2、代码实践
(1)创建项目inputformat,其中新建文件pom.xml,内容:
然后创建源码目录src/main/java
现在项目目录文件结构
(2)代码
自定义inputform:src/main/java/MyInputFormat.java
createRecordReader方法中创建一个自定义的reader
自定义reader:src/main/java/MyRecordReader.java
其中有3个核心方法:nextKeyValue、getCurrentKey、getCurrentValue。
nextKeyValue负责生成要传递给map方法的key和value。getCurrentKey、getCurrentValue是实际获取key和value的。所以RecordReader的核心机制就是:通过nextKeyValue生成key value,然后通过getCurrentKey和getCurrentValue来返回上面构造好的key value。这里的nextKeyValue负责把整个文件内容作为value。
MapReduce程序:src/main/java/ManyToOne.java
main程序中指定使用我们自定义的MyInputFormat,输出使用SequenceFileOutputFormat。
(3)编译打包
在pom.xml所在目录下执行打包命令:
执行完成后,会自动生成target目录,其中有打包好的jar文件。
现在项目文件结构
(4)运行
先把target中的jar上传到Hadoop服务器。
准备测试文件,把Hadoop目录中的配置文件上传到HDFS
hdfs dfs -mkdir /files
hdfs dfs -put $HADOOP_HOME/etc/hadoop/*.xml /files
运行
hadoop jar mapreduce-inputformat-0.0.1-SNAPSHOT.jar ManyToOne /
files /onefile
检查
八、实例5:分组输出到多个文件
1、需求与实现思路
(1)需求
需要把相同订单id的记录放在一个文件中,并以订单id命名。
(2)实现思路
这个需求可以直接使用MultipleOutputs这个类来实现。
默认情况下,每个reducer写入一个文件,文件名由分区号命名,例如'part-r-00000',而 MultipleOutputs可以用key作为文件名,例如‘Order_0000001-r-00000’。
所以,思路就是map中处理每条记录,以‘订单id’为key,reduce中使用MultipleOutputs进行输出,会自动以key为文件名,文件内容就是相同key的所有记录。
例如‘Order_0000001-r-00000’的内容就是:
Order_0000001,Pdt_05,25.8
Order_0000001,Pdt_01,222.8
2、代码实践
(1)创建项目
新建项目目录multioutput,其中新建文件pom.xml,内容:
然后创建源码目录src/main/java
现在项目目录的文件结构
(2)代码
MapReduce程序:src/main/java/MultipleOutputTest.java
(3)编译打包
在pom.xml所在目录下执行打包命令:
执行完成后,会自动生成target目录,其中有打包好的jar文件。
现在项目文件结构
(4)运行
先把target中的jar上传到Hadoop服务器
然后运行
hadoop jar mapreduce-multipleOutput-0.0.1-SNAPSHOT.jar Multiple
OutputTest /orders.txt /output-multi
检查
九、MapReduce核心流程梳理
我们已经了解了MapReduce的大概流程:
(1)maptask从目标文件中读取数据
(2)mapper的map方法处理每一条数据,输出到文件中
(3)reducer读取map的结果文件,进行分组,把每一组交给reduce方法进行处理,最后输出到指定路径。
这是最基本的流程,有助于快速理解MapReduce的工作方式。
通过上面的几个示例,我们要经接触了一些更深入的细节,例如mapper的inputform中还有RecordReader、reducer中还有GroupingComparator。
下面就看一下更加深入的处理流程。
1、Maptask中的处理流程
(1)读文件流程
目标文件会被按照规划文件进行切分,inputformat调用RecordReader读取文件切片,RecordReader会生成key value对儿,传递给Mapper的mao方法。
(2)写入结果文件的流程
从Mapper的map方法调用context.write之后,到形成结果数据文件这个过程是比较复杂的。
context.write不是直接写入文件,而是把数据交给OutputCollector,OutputCollector把数据写入‘环形缓冲区’。‘环形缓冲区’中的数据会进行排序。
因为缓冲区的大小是有限制的,所以每当快满时(达到80%)就要把其中的数据写出去,这个过程叫做数据溢出。
溢出到一个文件中,溢出过程会对这批数据进行分组、比较操作,然后吸入文件,所以溢出文件中的数据是分好区的,并且是有序的。每次溢出都会产生一个溢出数据文件,所以会有多个。
当map处理完全数据后,就会对各个溢出数据文件进行合并,每个文件中相同区的数据放在一起,并再次排序,最后得到一个整体的结果文件,其中是分区且有序的。
这样就完成了map过程,读数据过程和写结果文件的过程联合起来如下图:
2、Reducetask的处理流程
reducetask去读每个maptask产生的结果文件中自己所负责的分区数据,读到自己本地。对多个数据文件进行合并排序,然后通过GroupingComparator进行分组,把相同key的数据放到一组。对每组数据调一次reduce方法,处理完成后写入目标路径文件。
3、整体流程
把map和reduce的过程联合起来:
十、实例6:join操作
1、需求与实现思路
(1)需求
有2个数据文件:订单数据、商品信息。
订单数据表order
商品信息表product
需要用MapReduce程序来实现下面这个SQL查询运算:
select o.id order_id, o.date, o.amount, p.id p_id, p.pname, p.c
ategory_id, p.price
from t_order o join t_product p on o.pid = p.id
(2)实现思路
SQL的执行结果是这样的:
实际上就是给每条订单记录补充上商品表中的信息。
实现思路:
1)定义bean
把SQL执行结果中的各列封装成一个bean对象,实现序列化。
bean中还要有一个另外的属性flag,用来标识此对象的数据是订单还是商品。
2)map处理
map会处理两个文件中的数据,根据文件名可以知道当前这条数据是订单还是商品。
对每条数据创建一个bean对象,设置对应的属性,并标识flag(0代表order,1代表product)
以join的关联项“productid”为key,bean为value进行输出。
3)reduce处理
reduce方法接收到pid相同的一组bean对象。
遍历bean对象集合,如果bean是订单数据,就放入一个新的订单集合中,如果是商品数据,就保存到一个商品bean中。然后遍历那个新的订单集合,使用商品bean的数据对每个订单bean进行信息补全。
这样就得到了完整的订单及其商品信息。
2、代码实践
(1)创建项目
新建项目目录jointest,其中新建文件pom.xml,内容:
然后创建源码目录src/main/java
现在项目目录的文件结构
(2)代码
**封装bean:** src/main/java/InfoBean.java
MapReduce程序:src/main/java/JoinMR.java
(3)编译打包
在pom.xml所在目录下执行打包命令:
执行完成后,会自动生成target目录,其中有打包好的jar文件。
现在项目文件结构
(4)运行
先把target中的jar上传到Hadoop服务器
下载产品和订单的测试数据文件
链接: https://pan.baidu.com/s/1pLRnm47
密码: cg7x
链接: https://pan.baidu.com/s/1pLrvsfT
密码: j2zb
上传到HDFS
hdfs dfs -mkdir -p /jointest/input
hdfs dfs -put order.txt /jointest/input
hdfs dfs -put product.txt /jointest/input
运行
hadoop jar joinmr.jar com.dys.mapreducetest.join.JoinMR /jointe
st/input /jointest/output
检查
十一、实例7:计算出用户间的共同好友
1、需求与实现思路
(1)需求
下面是用户的好友关系列表,每一行代表一个用户和他的好友列表。
需要求出哪些人两两之间有共同好友,及他俩的共同好友都有谁。
例如从前2天记录中可以看出,C、E是A、B的共同好友,最终的形式如下:
(2)实现思路
之前的示例中都是一个MapReduce计算出来的,这里我们使用2个MapReduce来实现。
1)第1个MapReduce
map
找出每个用户都是谁的好友,例如:
读一行A:B,C,D,F,E,O(A的好友有这些,反过来拆开,这些人中的每一个都是A的好友)
输出<B,A> <C,A> <D,A> <F,A> <E,A> <O,A>
再读一行B:A,C,E,K
输出<A,B> <C,B> <E,B> <K,B>
……
reduce
key相同的会分到一组,例如:
<C,A><C,B><C,E><C,F><C,G>......
Key:C
value: [ A, B, E, F, G ]
意义是:C是这些用户的好友。
遍历value就可以得到:
A B 有共同好友C
A E 有共同好友C
...
B E有共同好友 C
B F有共同好友 C
输出:
<A-B,C>
<A-E,C>
<A-F,C>
<A-G,C>
<B-E,C>
<B-F,C>
.....
2)第2个MapReduce
对上一步的输出结果进行计算。
map
读出上一步的结果数据,组织成key value直接输出
例如:
读入一行<A-B,C>
直接输出<A-B,C>
reduce
读入数据,key相同的在一组
输出:
这样就得出了两个用户间的共同好友列表
2、代码实践
(1)创建项目
新建项目目录jointest,其中新建文件pom.xml,内容:
然后创建源码目录src/main/java
现在项目目录的文件结构
(2)代码
第一步的MapReduce程序:src/main/java/StepFirst.java
第二步的MapReduce程序:src/main/java/StepSecond.java
(3)编译打包
在pom.xml所在目录下执行打包命令:
执行完成后,会自动生成target目录,其中有打包好的jar文件。
现在项目文件结构
(4)运行
先把target中的jar上传到Hadoop服务器
下载测试数据文件
链接: https://pan.baidu.com/s/1o8fmfbG
密码: kbut
上传到HDFS
hdfs dfs -mkdir -p /friends/input
hdfs dfs -put friendsdata.txt /friends/input
运行第一步
hadoop jar mapreduce-friends-0.0.1-SNAPSHOT.jar StepFirst /frie
nds/input/friendsdata.txt /friends/output01
运行第二步
hadoop jar mapreduce-friends-0.0.1-SNAPSHOT.jar StepSecond /fri
ends/output01/part-r-00000 /friends/output02
查看结果
十二、小结
MapReduce的基础内容介绍完了,希望可以帮助您快速熟悉MapReduce的工作原理和开发方法。如有批评与建议(例如内容有误、不足的地方、改进建议等),欢迎留言讨论。
提示:如需下载本文,点击文末【阅读原文】或登录云盘 http://pan.baidu.com/s/1bpxSCZt进行下载。
相关专题:
◆ 近期热文 ◆
干货!谈自动化运维平台的地基如何打牢
前聚美优品运维负责人:CMDB的那些事儿
解锁MySQL备份恢复的4种正确姿势
DBA要失业了?看ML如何自动优化数据库
从HPE净亏损超6亿$看企业家精神
◆ MVP专栏 ◆
◆ 近期活动 ◆
DAMS中国数据资产管理峰会上海站
峰会官网:www.dams.org.cn