MapReduce
资料
初识MapReduce
MapReduce思想
MapReduce的思想核心是“先分再合,分而治之”, 所谓“分而治之”就是把一个复杂的问题,按照一定的“分解”方法分为等价的规模较小的若干部分,然后逐个解决,分别找出各部分的结果,把各部分的结果组成整个问题的结果。
- Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。
- Reduce负责“合”,即对map阶段的结果进行全局汇总。
如何模拟实现分布式计算
分布式计算
分布式计算将该应用分解成许多小的部分,分配给多台计算机进行处理。这样可以节约整体计算时间,大大提高计算效率。
大数据场景下模拟实现
Hadoop MapReduce设计构思
- MapReduce是Hadoop的一个模块,是一个分布式运算程序的编程框架。
- 对许多开发者来说,自己完完全全实现一个并行计算程序难度太大,而MapReduce就是一种简化并行计算的编程模型,降低了开发并行应用的入门门槛。
如何对付大数据处理
- 对相互间不具有计算依赖关系的大数据计算任务,实现并行最自然的办法就是采取MapReduce分而治之的策略。
- 也就是Map阶段分的阶段,把大数据拆分成若干份小数据,多个程序同时并行计算产生中间结果;然后是Reduce聚合阶段,通过程序对并行的结果进行最终的汇总计算,得出最终的结果。
- 并行计算的第一个重要问题是如何划分计算任务或者计算数据以便对划分的子任务或数据块同时进行计算。不可分拆的计算任务或相互间有依赖关系的数据无法进行并行计算!
构建抽象模型
- MapReduce借鉴了函数式语言中的思想,用Map和Reduce两个函数提供了高层的并行编程抽象模型。
- Map: 对一组数据元素进行某种重复式的处理;
image.png
- Reduce: 对Map的中间结果进行某种进一步的结果整理。
image.png
- MapReduce中定义了如下的Map和Reduce两个抽象的编程接口,由用户去编程实现:
- map: (k1; v1) → [(k2; v2)]
- reduce: (k2; [v2]) → [(k3; v3)]
- Map和Reduce为程序员提供了一个清晰的操作接口抽象描述。通过以上两个编程接口,大家可以看出MapReduce处理的数据类型是**<key,value>键值对**。
统一架构、隐藏底层细节
如何提供统一的计算框架,如果没有统一封装底层细节,那么程序员则需要考虑诸如数据存储、划分、分发、结果收集、错误恢复等诸多细节;为此,MapReduce设计并提供了统一的计算框架,为程序员隐藏了绝大多数系统层面的处理细节。
- MapReduce最大的亮点在于通过抽象模型和计算框架把需要做什么(what need to do)与具体**怎么做(how to do)**分开了,为程序员提供一个抽象和高层的编程接口和框架。
- 程序员仅需要关心其应用层的具体计算问题,仅需编写少量的处理应用本身计算问题的程序代码。如何具体完成这个并行计算任务所相关的诸多系统层细节被隐藏起来,交给计算框架去处理:从分布代码的执行,到大到数千小到单个节点集群的自动调度使用。
Hadoop MapReduce简介
MapReduce介绍
- Hadoop MapReduce是一个软件框架,用于轻松编写应用程序,这些应用程序以可靠,容错的方式并行处理大型硬件集群(数千个节点)上的大量数据(多TB数据集)。
MapReduce特点
- 易于编程
- Mapreduce框架提供了用于二次开发得接口;简单地实现一些接口,就可以完成一个分布式程序。任务计算交给计算框架去处理,将分布式程序部署到hadoop集群上运行,集群节点可以扩展到成百上千个等。
- 良好的拓展性
- 当计算机资源不能得到满足的时候,可以通过增加机器来扩展它的计算能力。基于MapReduce的分布式计算得特点可以随节点数目增长保持近似于线性的增长,这个特点是MapReduce处理海量数据的关键,通过将计算节点增至几百或者几千可以很容易地处理数百TB甚至PB级别的离线数据。
- 高容错性
- Hadoop集群是分布式搭建和部署得,任何单一机器节点宕机了,它可以把上面的计算任务转移到另一个节点上运行,不影响整个作业任务得完成,过程完全是由Hadoop内部完成的。
- 适合海量数据的离线处理
MapReduce局限性
- 实时计算性能差
- MapReduce主要应用于离线作业,无法作到秒级或者是亚秒级得数据响应。
- 不能进行流式计算
- 流式计算特点是数据是源源不断得计算,并且数据是动态的;而MapReduce作为一个离线计算框架,主要是针对静态数据集得,数据是不能动态变化得。
Hadoop MapReduce编程
MapReduce架构体系
一个完整的mapreduce程序在分布式运行时有三类实例进程
- MRAppMaster:负责整个程序的过程调度及状态协调
- MapTask:负责map阶段的整个数据处理流程
- ReduceTask:负责reduce阶段的整个数据处理流程
MapReduce编程规范
MapReduce分布式的运算程序需要分成2个阶段,分别是Map阶段和Reduce阶段。Map阶段对应的是MapTask并发实例,完全并行运行。Reduce阶段对应的是ReduceTask并发实例,数据依赖于上一个阶段所有MapTask并发实例的数据输出结果。
- MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行。
- 用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行mr程序的客户端驱动)。
- 用户自定义的Mapper和Reducer都要继承各自的父类。Mapper中的业务逻辑写在map()方法中,Reducer的业务逻辑写在reduce()方法中。整个程序需要一个Driver来进行提交,提交的是一个描述了各种必要信息的job对象。
- 整个MapReduce程序中,数据都是以kv键值对的形式流转的。因此在实际编程解决各种业务问题中,需要考虑每个阶段的输入输出kv分别是什么。并且在MapReduce中数据会因为某些默认的机制进行排序进行分组。所以说kv的类型数据确定及其重要。
MapReduce工作执行流程
整个MapReduce工作流程可以分为3个阶段:map、shuffle、reduce。
- Map阶段
负责把从数据源读取来到数据进行处理,默认情况下读取数据返回的是kv键值对类型,经过自定义map方法处理之后,输出的也应该是kv键值对类型。
- Shuffle阶段
map输出的数据会经过分区、排序、分组等自带动作进行重组,相当于洗牌的逆过程。这是MapReduce的核心所在,也是难点所在。也是值得我们深入探究的所在。 默认分区规则:key相同的分在同一个分区,同一个分区被同一个reduce处理。 默认排序规则:根据key字典序排序 默认分组规则:key相同的分为一组,一组调用reduce处理一次。
- Reduce阶段
负责针对shuffle好的数据进行聚合处理。输出的结果也应该是kv键值对。
Hadoop序列化机制
什么是序列化
- 序列化 (Serialization)是将结构化对象转换成字节流以便于进行网络传输或写入持久存储的过程。
- 反序列化(Deserialization)是将字节流转换为一系列结构化对象的过程,重新创建该对象。
- 序列化的用途:
1、作为一种持久化格式。 2、作为一种通信的数据格式。 3、作为一种数据拷贝、克隆机制。
Java的序列化机制
Java中,一切都是对象,在分布式环境中经常需要将Object从这一端网络或设备传递到另一端。这就需要有一种可以在两端传输数据的协议。Java序列化机制就是为了解决这个问题而产生。
- Java对象序列化的机制,把对象表示成一个二进制的字节数组,里面包含了对象的数据,对象的类型信息,对象内部的数据的类型信息等等。通过保存或则转移这些二进制数组达到持久化、传递的目的。
- 要实现序列化,需要实现java.io.Serializable接口。反序列化是和序列化相反的过程,就是把二进制数组转化为对象的过程。
Hadoop的序列化机制
- Hadoop的序列化没有采用java的序列化机制,而是实现了自己的序列化机制。
- 原因在于java的序列化机制比较臃肿,重量级,是不断的创建对象的机制,并且会额外附带很多信息(校验、继承关系系统等)。但在Hadoop的序列化机制中,用户可以复用对象,这样就减少了java对象的分配和回收,提高了应用效率。
- Hadoop通过Writable接口实现的序列化机制,不过没有提供比较功能,所以和java中的Comparable接口合并,提供一个接口WritableComparable。(自定义比较)。
- Writable接口提供两个方法(write和readFields)。
- Hadoop序列化特点:高效、紧凑、扩展性强。
Hadoop中的数据类型
Hadoop提供了如下内容的数据类型,这些数据类型都实现了WritableComparable接口,以便用这些类型定义的数据可以被序列化进行网络传输和文件存储,以及进行大小比较。
Hadoop 数据类型 | Java数据类型 | 备注 |
---|---|---|
BooleanWritable | boolean | 标准布尔型数值 |
ByteWritable | byte | 单字节数值 |
IntWritable | int | 整型数 |
FloatWritable | float | 浮点数 |
LongWritable | long | 长整型数 |
DoubleWritable | double | 双字节数值 |
Text | String | 使用UTF8格式存储的文本 |
MapWritable | map | 映射 |
ArrayWritable | array | 数组 |
NullWritable | null | 当<key,value>中的key或value为空时使用 |
注意:如果需要将自定义的类放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序。
MapReduce案例
MapReduce程序运行
所谓的运行模式讲的是:mr程序是单机运行还是分布式运行?mr程序需要的运算资源是yarn分配还是单机系统分配?
运行在何种模式 取决于下述这个参数: mapreduce.framework.name=yarn 集群模式 mapreduce.framework.name=local 本地模式 默认是local模式 在mapred-default.xml中有定义。如果代码中、运行的环境中有配置,会默认覆盖default配置。
本地模式运行
- mapreduce程序是被提交给LocalJobRunner在本地以单进程的形式运行。而处理的数据及输出结果可以在本地文件系统,也可以在hdfs上。
- 本质是程序的conf中是否有mapreduce.framework.name=local
- 本地模式非常便于进行业务逻辑的debug。
- 右键直接运行main方法所在的主类即可。
集群模式运行
- 将mapreduce程序提交给yarn集群,分发到很多的节点上并发执行。处理的数据和输出结果应该位于hdfs文件系统
- 提交集群的实现步骤:
- 将程序打成jar包,然后在集群的任意一个节点上用命令启动
#指定主类 参数
hadoop jar wordcount.jar cn.itcast.bigdata.mrsimple.WordCountDriver args
yarn jar wordcount.jar cn.itcast.bigdata.mrsimple.WordCountDriver args
MapReduce输入输出梳理
MapReduce框架运转在<key,value>键值对上,也就是说,框架把作业的输入看成是一组<key,value>键值对,同样也产生一组<key,value>键值对作为作业的输出,这两组键值对可能是不同的。
输入特点
默认读取数据的组件叫做TextInputFormat。 关于输入路径:
- 如果指向的是一个文件 处理该文件
- 如果指向的是一个文件夹(目录) 就处理该目录所有的文件 当成整体来处理。
输出特点
默认输出数据的组件叫做TextOutputFormat。
- 输出路径不能提前存在否则执行报错对输出路径进行检测判断。
MapReduce流程简单梳理
Map阶段执行过程
- 第一阶段是把输入目录下文件按照一定的标准逐个进行逻辑切片,形成切片规划。默认情况下,Split size = Block size。每一个切片由一个MapTask处理。(getSplits)
- 第二阶段是对切片中的数据按照一定的规则解析成<key,value>对。默认规则是把每一行文本内容解析成键值对。key是每一行的起始位置(单位是字节)——行偏移量,value是本行的文本内容。(TextInputFormat)
- 第三阶段是调用Mapper类中的map方法。上阶段中每解析出来的一个<k,v>,调用一次map方法。每次调用map方法会输出零个或多个键值对。
- 第四阶段是按照一定的规则对第三阶段输出的键值对进行分区。默认是只有一个区。分区的数量就是Reducer任务运行的数量。默认只有一个Reducer任务。
- 第五阶段是对每个分区中的键值对进行排序。首先,按照键进行排序,对于键相同的键值对,按照值进行排序。比如三个键值对<2,2>、<1,3>、<2,1>,键和值分别是整数。那么排序后的结果是<1,3>、<2,1>、<2,2>。如果有第六阶段,那么进入第六阶段;如果没有,直接输出到文件中。
- 第六阶段是对数据进行局部聚合处理,也就是combiner处理。键相等的键值对会调用一次reduce方法。经过这一阶段,数据量会减少。本阶段默认是没有的。
Reduce阶段执行过程
- 第一阶段是Reducer任务会主动从Mapper任务复制其输出的键值对。Mapper任务可能会有很多,因此Reducer会复制多个Mapper的输出。
- 第二阶段是把复制到Reducer本地数据,全部进行合并,即把分散的数据合并成一个大的数据。再对合并后的数据排序。
- 第三阶段是对排序后的键值对调用reduce方法。键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对。最后把这些输出的键值对写入到HDFS文件中。
在整个MapReduce程序的开发过程中,我们最大的工作量是覆盖map函数和覆盖reduce函数。
MapReduce编程模型
MapReduce编程指南
引入数据分区Partition
默认情况下MR输出文件个数
在默认情况下,不管map阶段有多少个并发执行task,到reduce阶段,所有的结果都将有一个reduce来处理,并且最终结果输出到一个文件中。 此时,MapReduce的执行流程如下所示:
修改ReduceTask个数
在MapReduce程序的驱动类中,通过job提供的方法,可以修改reducetask的个数。
- 默认情况下不设置,reducetask个数为1,结果输出到一个文件中。
- 使用api修改reducetask个数之后,输出结果文件的个数和reducetask个数对应。
数据分区概念
当MapReduce中有多个reducetask执行的时候,此时maptask的输出就会面临一个问题:究竟将自己的输出数据交给哪一个reducetask来处理,这就是所谓的**数据分区(partition)**问题。
默认分区规则
MapReduce默认分区规则是HashPartitioner。跟map输出的数据key有关。
当然用户也可以自己自定义分区规则。后面案例中说。
MapReduce的key的重要性
- 在MapReduce编程中,核心是牢牢把握住每个阶段的输入输出key是什么。
- 因为mr中很多默认行为都跟key相关。
- 排序:key的字典序a-z 正序
- 分区:key.hashcode % reducetask 个数
- 分组:key相同的分为一组
最重要的是,如果觉得默认的行为不满足业务需求,MapReduce还支持自定义排序、分区、分组的规则,这将使得编程更加灵活和方便。
案例:美国新冠疫情COVID-19统计
MapReduce自定义分区
需求
将美国每个州的疫情数据输出到各自不同的文件中,即一个州的数据在一个结果文件中。
分析
输出到不同文件中-->reducetask有多个(>2)-->默认只有1个,如何有多个?--->可以设置,job.setNumReduceTasks(N)--->当有多个reducetask 意味着数据分区---->默认分区规则是什么? hashPartitioner--->默认分区规则符合你的业务需求么?---->符合,直接使用--->不符合,自定义分区。
MapReduce自定义排序
需求
将美国2021-01-28,每个州state的确诊案例数进行倒序排序。
分析
如果你的需求中需要根据某个属性进行排序 ,不妨把这个属性作为key。因为MapReduce中key有默认排序行为的。但是需要进行如下考虑:
- 如果你的需求是正序,并且数据类型是Hadoop封装好的基本类型。这种情况下不需要任何修改,直接使用基本类型作为key即可。因为Hadoop封装好的类型已经实现了排序规则。
- 如果你的需求是倒序,或者数据类型是自定义对象。需要重写排序规则。需要对象实现Comparable接口,重写ComparTo方法。
- compareTo方法用于将当前对象与方法的参数进行比较。
- 如果指定的数与参数相等返回0。
- 如果指定的数小于参数返回 -1。
- 如果指定的数大于参数返回 1。
例如:o1.compareTo(o2); 返回正数的话,当前对象(调用compareTo方法的对象o1)要排在比较对象(compareTo传参对象o2)后面,返回负数的话,放在前面。
MapReduce Combiner
每一个map都可能会产生大量的本地输出,Combiner的作用就是对map端的输出先做一次合并,以减少在map和reduce节点之间的数据传输量,以提高网络IO性能,是MapReduce的一种优化手段之一。
- combiner中文叫做数据规约。数据归约是指在尽可能保持数据原貌的前提下,最大限度地精简数据量。
- combiner是MR程序中Mapper和Reducer之外的一种组件,默认情况下不启用。
- combiner组件的父类就是Reducer,combiner和reducer的区别在于运行的位置:
- combiner是在每一个maptask所在的节点运行
- Reducer是接收全局所有Mapper的输出结果
- combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量
- 实现步骤:
- 自定义一个combiner继承Reducer,重写reduce方法
- 在job中设置: job.setCombinerClass(CustomCombiner.class)
- combiner能够应用的前提是不能影响最终的业务逻辑,而且,combiner的输出kv应该跟reducer的输入kv类型要对应起来。下述场景禁止使用,不仅优化了数据量,还改变了最终的结果
- 业务和数据个数相关的
- 业务和整体排序相关的
MapReduce自定义分组
分组概念和默认分组规则
- 分组在发生在reduce阶段,决定了同一个reduce中哪些数据将组成一组去调用reduce方法处理。默认分组规则是:key相同的就会分为一组(前后两个key直接比较是否相等)。
- 需要注意的是,在reduce阶段进行分组之前,因为进行数据排序行为,因此排序+分组将会使得key一样的数据一定被分到同一组,一组去调用reduce方法处理。
- 此外,用户还可以自定义分组规则:
- 写类继承WritableComparator,重写Compare方法。
- 只要Compare方法返回为0,MapReduce框架在分组的时候就会认为前后两个相等,分为一组。还需要在job对象中进行设置 才能让自己的重写分组类生效。
- job.setGroupingComparatorClass(xxxx.class);
需求
找出美国2021-01-28,每个州state的确诊案例数最多的县county是哪一个。该问题也是俗称的TopN问题。
分析
自定义对象,在map阶段将“州state和累计确诊病例数cases”作为key输出,重写对象的排序规则,首先根据州的正序排序,如果州相等,按照确诊病例数cases倒序排序,发送到reduce。
MapReduce并行度机制
MapTask并行度机制
概念
MapTask的并行度指的是map阶段有多少个并行的task共同处理任务。map阶段的任务处理并行度,势必影响到整个job的处理速度。那么,MapTask并行实例是否越多越好呢?其并行度又是如何决定呢?
原理机制
一个MapReducejob的map阶段并行度由客户端在提交job时决定,即客户端提交job之前会对待处理数据进行逻辑切片。切片完成会形成切片规划文件(job.split),每个逻辑切片最终对应启动一个maptask。 逻辑切片机制由FileInputFormat实现类的getSplits()方法完成。
- FileInputFormat中默认的切片机制:
- 简单地按照文件的内容长度进行切片
- 切片大小,默认等于block大小
- 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
比如待处理数据有两个文件: file1.txt 320M file2.txt 10M 经过FileInputFormat的切片机制运算后,形成的切片信息如下: file1.txt.split1—0M~128M file1.txt.split2—128M~256M file1.txt.split3—256M~320M file2.txt.split1—0M~10M
相关参数、优化
在FileInputFormat中,计算切片大小的逻辑:Math.max(minSize, Math.min(maxSize, blockSize));
切片主要由这几个值来运算决定: minsize:默认值:1 配置参数: mapreduce.input.fileinputformat.split.minsize maxsize:默认值:Long.MAXValue 配置参数:mapreduce.input.fileinputformat.split.maxsize blocksize
- 因此,默认情况下,split size=block size,在hadoop 2.x中为128M。
- 但是,不论怎么调参数,都不能让多个小文件“划入”一个split。
- **当bytesRemaining/splitSize > 1.1不满足的话,那么最后所有剩余的会作为一个切片。**从而不会形成例如129M文件规划成两个切片的局面。
ReduceTask并行度机制
reducetask并行度同样影响整个job的执行并发度和执行效率,与maptask的并发数由切片数决定不同,
- Reducetask数量的决定是可以直接手动设置:job.setNumReduceTasks(4);
- 如果数据分布不均匀,就有可能在reduce阶段产生数据倾斜。
- 注意: reducetask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个reducetask。
MapReduce工作流程详解
MapTask工作机制详解
流程图
执行步骤
简单概述:input File通过split被逻辑切分为多个split文件,通过Record按行读取内容给map(用户自己实现的)进行处理,数据被map处理结束之后交给OutputCollector收集器,对其结果key进行分区(默认使用hash分区),然后写入buffer,每个map task都有一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个map task结束后再对磁盘中这个map task产生的所有临时文件做合并,生成最终的正式输出文件,然后等待reduce task来拉数据。
详细步骤
- 首先,读取数据组件InputFormat(默认TextInputFormat)会通过getSplits方法对输入目录中文件进行逻辑切片规划得到splits,有多少个split就对应启动多少个MapTask。split与block的对应关系默认是一对一。
- 将输入文件切分为splits之后,由RecordReader对象(默认LineRecordReader)进行读取,以\n作为分隔符,读取一行数据,返回<key,value>。Key表示每行首字符偏移值,value表示这一行文本内容。
- 读取split返回<key,value>,进入用户自己继承的Mapper类中,执行用户重写的map函数。RecordReader读取一行这里调用一次。
- map逻辑完之后,将map的每条结果通过context.write进行collect数据收集。在collect中,会先对其进行分区处理,默认使用HashPartitioner。
- MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。
- 接下来,会将数据写入内存,内存中这片区域叫做环形缓冲区,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组。
- 环形缓冲区其实是一个数组,数组中存放着key、value的序列化数据和key、value的元数据信息,包括partition、key的起始位置、value的起始位置以及value的长度。环形结构是一个抽象概念。
- 缓冲区是有大小限制,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写。这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还可以往剩下的20MB内存中写,互不影响。
- 当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为,这里的排序也是对序列化的字节做的排序。
- 如果job设置过Combiner,那么现在就是使用Combiner的时候了。将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用。
- 那哪些场景才能使用Combiner呢?从这里分析,Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果。
- 每次溢写会在磁盘上生成一个临时文件(写之前判断是否有combiner),如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个临时文件存在。当整个数据处理结束之后开始对磁盘中的临时文件进行merge合并,因为最终的文件只有一个,写入磁盘,并且为这个文件提供了一个索引文件,以记录每个reduce对应数据的偏移量。
ReduceTask工作机制详解
执行步骤
Reduce大致分为copy、sort、reduce三个阶段,重点在前两个阶段。copy阶段包含一个eventFetcher来获取已完成的map列表,由Fetcher线程去copy数据,在此过程中会启动两个merge线程,分别为inMemoryMerger和onDiskMerger,分别将内存中的数据merge到磁盘和将磁盘中的数据进行merge。待数据copy完成之后,copy阶段就完成了,开始进行sort阶段,sort阶段主要是执行finalMerge操作,纯粹的sort阶段,完成之后就是reduce阶段,调用用户定义的reduce函数进行处理。
详细步骤
- Copy阶段,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求maptask获取属于自己的文件。
- Merge阶段。这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第一种形式不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件。
- 把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。
- 对排序后的键值对调用reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中。
MapReduce Shuffle机制
Shuffle的本意是洗牌、混洗的意思,把一组有规则的数据尽量打乱成无规则的数据。 而在MapReduce中,Shuffle更像是洗牌的逆过程,指的是将map端的无规则输出按指定的规则“打乱”成具有一定规则的数据,以便reduce端接收处理。
- 而在MapReduce中,Shuffle更像是洗牌的逆过程,指的是将map端的无规则输出按指定的规则“打乱”成具有一定规则的数据,以便reduce端接收处理。
- shuffle是Mapreduce的核心,它分布在Mapreduce的map阶段和reduce阶段。一般把从Map产生输出开始到Reduce取得数据作为输入之前的过程称作shuffle。
1).Partition阶段:将MapTask的结果输出到默认大小为100M的环形缓冲区,保存之前会对key进行分区的计算,默认Hash分区等。 2).Spill阶段:当内存中的数据量达到一定的阀值的时候,就会将数据写入本地磁盘,在将数据写入磁盘之前需要对数据进行一次排序的操作,如果配置了combiner,还会将有相同分区号和key的数据进行排序。 3).Merge阶段:把所有溢出的临时文件进行一次合并操作,以确保一个MapTask最终只产生一个中间数据文件。 4).Copy阶段: ReduceTask启动Fetcher线程到已经完成MapTask的节点上复制一份属于自己的数据,这些数据默认会保存在内存的缓冲区中,当内存的缓冲区达到一定的阀值的时候,就会将数据写到磁盘之上。 5).Merge阶段:在ReduceTask远程复制数据的同时,会在后台开启两个线程对内存到本地的数据文件进行合并操作。 6).Sort阶段:在对数据进行合并的同时,会进行排序操作,由于MapTask阶段已经对数据进行了局部的排序,ReduceTask只需保证Copy的数据的最终整体有效性即可。
课后作业
需求
源文件如下: 字段解释: 员工编号,姓名,岗位名称,员工上司编号,入职日期,工资,提成,部门编号
员工编号,姓名,岗位名称,员工上司编号,入职日期,工资,提成,部门编号
7369,SMITH,CLERK,7902,17-12月-80,800,0,20
7499,ALLEN,SALESMAN,7698,20-2月-81,1600,300,30
7521,WARD,SALESMAN,7698,22-2月-81,1250,500,30
7566,JONES,MANAGER,7839,02-4月-81,2975,0,20
7654,MARTIN,SALESMAN,7698,28-9月-81,1250,1400,30
7698,BLAKE,MANAGER,7839,01-5月-81,2850,0,30
null,JACK,SALESMAN,7698,23-2月-82,1400,0,10
7782,CLARK,MANAGER,7839,09-6月-81,2450,0,10
7839,KING,PRESIDENT,7839,17-11月-81,5000,0,10
7844,TURNER,SALESMAN,7698,08-9月-81,1500,0,30
,ALLEN,CLERK,7782,23-1月-82,1300,0,10
7900,JAMES,CLERK,7698,03-12月-81,950,0,30
7902,FORD,ANALYST,7566,03-12月-81,3000,0,20
7934,MILLER,CLERK,7782,23-1月-82,1300,0,10
需求1: 剔除源文件中员工编号为空的员工信息
- Mapper改造——判空校验
需求2:根据部分编号将源文件分类管理,同一个部门的员工信息放在一起
- 分区Partition,同一个部门放到同一个reduceTask、设置reduceTask个数
需求3: 将分类后的每个员工的薪资(基本工资 + 提成)进行降序排序
- 实体类实现hadoop序列化排序接口,重写compareTo方法,指定排序方法,在MapReduce框架中自动使用快排进行排序。
选做需求(可以不写): 需求4:将源数据中的入职日期:23-1月-82 转为 1982-01-23 注意年份范围限定在(1970-2020年)
请编写MapReduce代码,并附上集群运行成功的截图,实现以上三个需求!
Gitee链接
https://gitee.com/wx_be41dfed96/bigdata_ol_20220419/tree/maizhengyang