详解MapReduce(Spark和MapReduce对比铺垫篇)
本来笔者是不打算写MapReduce的,但是考虑到目前很多公司还都在用这个计算引擎,以及后续要讲的Hive原生支持的计算引擎也是MapReduce,并且为Spark和MapReduce的对比做铺垫,笔者今天详细阐述一下MapReduce。鉴于Hadoop1.X已过时,Hadoop3.X目前用的还不多,企业中目前大量运用的还是Hadoop2.X,所以以下都是基于Hadoop2.X版本的MapReduce(后续要讲的HDFS和Yarn也是)。
1. MRAppMaster:负责整个程序过程调度及状态协调
2. MapTask:负责map阶段整个数据处理流程
3. ReduceTask:负责reduce阶段整个数据处理流程
这里笔者还是要强调一点:MapTask和ReduceTask是进程级别,这一点很重要!
切片也就是把文件切成一个个block块,但是此处的切片是逻辑切片而非物理切片。切片的逻辑可以查看接口InputFormat<K, V>的getSplits方法,通过它的一个实现类FileInputFormat看看切片的默认实现机制,直接看源码:
这里咱们主要关注文件可切分的部分,通过分析源码,在FileInputFormat中,默认切片机制如下:
1. 简单的按照文件的内容长度进行切片
2. 切片大小,默认等于block大小
3. 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
因此默认情况下,切片大小等于blocksize。但是,不论怎么调参数,都不能让多个小文件“划入”一个split, 会影响性能, 后续讲HDFS时会说明一下小文件的问题
了解完切片机制之后,初学者容易陷入一个误区,就是比如我配置blocksize为128M,那么我一个文件就会按照128M等比例切分,切到最后不足128M部分单独作为一个切片,但笔者强调这是要分情况的。其实细心的小伙伴会看到我源码截图中的注释部分,关键的参数SPLIT_SLOP为1.1,同样以blocksize为128M为例,假如对于一个130M的可切分文件会产生几个block块呢?很显然130 < 128*1.1,就产生一个切片为130M的block,所以多看源码很重要。
1)MapTask并行度决定机制
2)ReduceTask并行度决定机制
ReduceTask设置方式就很简单了,可以直接手动设置:job.setNumReduceTasks(4);,默认值是1,手动设置为4。ReduceTask的并行度同样影响整个任务的执行效率,如果数据分布不均匀,就有可能产生数据倾斜。
注意:ReduceTask设置方式就很简单了,可以直接手动设置:数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个ReduceTask设置方式就很简单了,可以直接手动设置:。尽量不要运行太多的ReduceTask。对大多数任务来说,最好reduce的个数最多和集群中的reduce持平,或者比集群的reduce slots小。这个对于小集群而言,尤其重要。
并发数的选择受多方面因素影响,比如运算节点的硬件配置、运算任务的类型:CPU密集型还是IO密集型、运算任务的数据量,这个还是要根据实际情况而定。
3)map|reduce端核心组件
在map输出数据溢出到磁盘之前调用。默认根据key.hashcode%reduce数量(HashPartition),可以自定义分区组件
2 5 6:加入combiner,2+5+6/3=13/3
4 3 :加入combiner,4+3/2=7/2,最终(13/3+7/2)/2 = 47/12
不加combiner组件:2+5+6+4+3/5 = 4
public class ItemidGroupingComparator extends WritableComparator {
//传入作为key的bean的class类型,以及制定需要让框架做反射获取实例对象
protected ItemidGroupingComparator() {
super(Order.class,true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
//强转
Order aBean = (Order) a;
Order bBean = (Order) b;
//比较两个bean时,指定只比较bean中itemId
return aBean.getItemId().compareTo(bBean.getItemId());
}
}
对于MapReduce分布式缓存,很类似于Spark中的广播变量,后续讲到Spark广播变量和累计变量时再细说。
近期文章:
通过spark.default.parallelism谈Spark并行度