MapReduce 详解

xiaoxiao2023-03-27  43

声明: https://blog.csdn.net/shujuelin/article/details/79119214 上面这篇博客真的写得很好,可以点击进去看

MapReduce是一个分布式运算程序的编程框架,是用户开发"基于hadoop的数据分析应用" MapReduce 核心功能是将用户编写的业务逻辑和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上

为什么要MAPREDUCE (1)海量数据在单机上处理因为硬件资源限制,无法胜任 (2)而一旦将单机版程序扩展到集群来分布式运行,将极大增加程序的复杂度和开发难度 (3)引入mapreduce框架后,开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将分布式计算中的复杂性交由框架来处理

设想一个海量数据场景下的wordcount需求:

单机版:内存受限,磁盘受限,运算能力受限 分布式: 1、文件分布式存储(HDFS) 2、运算逻辑需要至少分成2个阶段(一个阶段独立并发,一个阶段汇聚) 3、运算程序如何分发 4、程序如何分配运算任务(切片) 5、两阶段的程序如何启动?如何协调? 6、整个程序运行过程中的监控?容错?重试?

可见在程序由单机版扩成分布式时,会引入大量的复杂工作。为了提高开发效率,可以将分布式程序中的公共功能封装成框架,让开发人员可以将精力集中于业务逻辑。

而mapreduce就是这样一个分布式程序的通用框架,其应对以上问题的整体结构如下:

1、MRAppMaster(mapreduce application master) 2、MapTask 3、ReduceTask
分布式运算程序

1.分布式运算程序往往至少需要2个阶段,完全并行 2.第一个阶段的task并发实例各司其职互不相干,完全并行 3.第二个阶段的task并发实例互不相干,但是他们的数据依赖上一个阶段的task并发实例的输出,这个依赖是全局的依赖,每一个都依赖上一个阶段的所有的并发实例的输出 4.MapReduce是只有两个阶段:Map阶段和Reduce阶段 如果你的程序很复杂,两个阶段搞不定,那么就需要多个MapReduce程序,串行运行

第一个阶段就是Map阶段,第一个阶段的运行实例就是 Map task 第二个阶段就是Reduce阶段,第二个阶段的运行实例就是 Reduce task

如果需要统计文件中每一个单词出现的次数:

Map里面的逻辑: 1.读取数据 2.按行处理 3.按空格切分行内单词 4.计数 HashMap(单词, value+1)

等分给自己的数据片,全部读完之后

5.把HashMap 输出给下一个阶段 – 将HashMap按照首字母范围分成3个hashMap 6.将3个hashMap分别传递给3个 reduce task

若干复杂的细节问题: 1.你的Map task 如何进行任务分配 2.Reduce task 如何分配要处理的任务 3.Map task 和 Reduce task 之间如何衔接 4.如果某些Map task 运行失败,如何处理 5.Map task 如果都要自己负责输出数据的分区,那么会很麻烦

如果要解决上面的问题(两个阶段的实例难以协调) 我们就需要一个主管 : mr application master


MapReduce编程实例 wordcount 由于使用MapReduce,所以之前的hdfsjar是不能满足的,需要新的jar包. mapReduceJars 这些jar包在hadoop的源码里面的share文件里面都有. 需要的jar包: hadoop-common-2.6.4.jar 然后还需要common里面lib下的所有jar hadoop-hdfs-2.6.4.jar 然后还需要hdfs里面的lib下的所有的jar 还有mapReduce下面的jar包,但是不能包括 hadoop-mapreduce-examples-2.6.4.jar – 这个包里面有已经写好了的mapreduce例子 然后再家mapreduce的lib下的所有的jar 还需要yarn下的jar,不需要server

yarn下的lib所有的jar也需要

package com.thp.bigdata.wcdemo; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * KEYIN : 默认情况下是mr框架所读到的一行文本的起始偏移量 (默认是一行一行读取数据,读到一行就把起始偏移量传递到KEYIN里面) -- Long * VALUEIN : 默认情况下是mr框架所读到一行文本的内容 -- String * KEYOUT : 是用户自定义逻辑处理完成之后输出数据中的key, 是我们自己来决定的 在此处是单词 -- String * VALUEOUT : 是用户自定义逻辑处理完成之后输出数据的value 在此处是单词次数 -- Integer * @author 汤小萌 * */ /** * 我们要输出的数据,需要经过网络传输,既然要经过网络传输,那么这写数据就需要经过序列化 * 使用Serializable 进行序列化,比较冗余,会将类的继承信息全部序列化 * 但是在这里我们只需要将对象里面的数据进行序列化就可以 * hadoop里面有一个自己的序列化框架 -- 更加精简 Long - LongWritable String - Text Integer - IntWritable * @author 汤小萌 * */ public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ /** * map阶段的业务逻辑就写在自定义的map()方法中 * maptask会对每一行输入数据调用一次我们自定义的方法 * key 是偏移量,对我进行业务逻辑的处理是没有用的 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 将maptask传递给我们的文本内容转换成String String line = value.toString(); // 根据空格将这一行切分单词 String[] words = line.split(" "); // 输出的是 <单词, 1> for(String word : words) { // 将单词作为key,将次数1作为value,以便于后续的数据分发,可以根据单词分发,以便于相同的单词会到想的reduce task context.write(new Text(word), new IntWritable(1)); } } } package com.thp.bigdata.wcdemo; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * 输入的类型要跟Map阶段的输出的类型对应 * * KEYOUT, VALUEOUT 自定义Reduce逻辑处理结果的输出数据类型 * KEYOUT 是单词 * VALUEOUT 是总次数 * @author 汤小萌 * */ public class WordcountReduce extends Reducer<Text, IntWritable, Text, IntWritable>{ /** * 入参 key : 是一组相同单词kv对的key * 同一组 <hello,1> <hello,1> <hello,1> <hello,1> <hello,1> * key 指的是 hello , 只需要一个 因为一组的单词是一样的 , * values 就是迭代器了,可以迭代里面的每一个 1 进行计数 */ // 每一次调用reduce是完成了一组单词的统计,下一次调用就是统计另一组的单词的统计 @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; for(IntWritable value : values) { count += value.get(); // 这个count 最后就是某一个单词的汇总的值 } context.write(key, new IntWritable(count)); } } package com.thp.bigdata.wcdemo; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 驱动类 * 相当于一个yarn集群的客户端 * 需要在此封装mapreduce程序的相关运行参数,指定jar包 * 最后提交给yarn * @author 汤小萌 * */ public class WordcountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 指定本程序的jar包所在的本地路径 job.setJarByClass(WordcountDriver.class); // 指定本业务job要使用的mapper/Reduce业务类 job.setMapperClass(WordcountMapper.class); job.setReducerClass(WordcountReduce.class); // 指定mapper输出数据的kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 指定最终输出的数据的kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 指定job的输入原始文件所在的目录 // 待处理文件可以在多个目录里面 FileInputFormat.setInputPaths(job, new Path(args[0])); // 指定job的输出结果 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 将job中配置的相关参数,以及job所用的的java类所在的jar包,提交给yarn去运行 /*job.submit();*/ boolean res = job.waitForCompletion(true); // 会等待程序处理完成之后,程序才退出 System.exit(res ? 0 : 1); } }

先启动hdfs

start-dfs.sh

还必须启动 yarn , 需要运行在yarn 上

start-yarn.sh

启动完之后需要检查一下是否启动成功


一个完整的mapreduce程序在分布式运行时有三类实例进程:

1、MRAppMaster:负责整个程序的过程调度及状态协调 2、mapTask:负责map阶段的整个数据处理流程 2、mapTask:负责map阶段的整个数据处理流程


MapReduce 的编程规范: (1)用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行mr程序的客户端) (2)Mapper的输入数据是KV对的形式(KV的类型可自定义) (3)Mapper的输出数据是KV对的形式(KV的类型可自定义) (4)Mapper中的业务逻辑写在map()方法中 (5)map()方法(maptask进程)对每一个<K,V>调用一次 (6)Reducer的输入数据类型对应Mapper的输出数据类型,也是KV (7)Reducer的业务逻辑写在reduce()方法中 (8)Reducetask进程对每一组相同k的<k,v>组调用一次reduce()方法 (9)用户自定义的Mapper和Reducer都要继承各自的父类 (10)整个程序需要一个Drvier来进行提交,提交的是一个描述了各种必要信息的job对象


MAPREDUCE框架结构及核心运行机制

结构 一个完整的mapreduce程序在分布式运行时有三类实例进程: 1、MRAppMaster:负责整个程序的过程调度及状态协调 2、mapTask:负责map阶段的整个数据处理流程 3、ReduceTask:负责reduce阶段的整个数据处理流程

MR程序运行流程 1.流程示意图:

2.流程解析 a. 一个mr程序启动的时候,最先启动的是MRAppMaster,MRAppMaster启动后根据本次job的描述信息,计算出需要的maptask实例数量,然后向集群申请机器启动相应数量的maptask进程 b. maptask进程启动之后,根据给定的数据切片范围进行数据处理,主体流程为:

利用客户指定的inputformat来获取RecordReader读取数据,形成输入KV对 将输入KV对传递给客户定义的map()方法,做逻辑运算,并将map()方法输出的KV对收集到缓存 将缓存中的KV对按照K分区排序后不断溢写到磁盘文件

c. MRAppMaster监控到所有maptask进程任务完成之后,会根据客户指定的参数启动相应数量的reducetask进程,并告知reducetask进程要处理的数据范围(数据分区) d. Reducetask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台maptask运行所在机器上获取到若干个maptask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算,并收集运算输出的结果KV,然后调用客户指定的outputformat将结果数据输出到外部存储

转载请注明原文地址: https://www.6miu.com/read-4988254.html

最新回复(0)