Mapreduce是一个分布式的运算编程框架,核心功能是将用户编写的核心逻辑代码分布式地运行在一个集群的很多服务器上;
一:为什么要用Mapreduce
(1)海量数据在单机上处理因为硬件资源限制,无法胜任,因为需要采用分布式集群的方式来处理。
(2)而一旦将单机版程序扩展到集群来分布式运行,将极大地增加程序的复杂度和开发难度。
(3)引入mapreduce框架后,开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将分布式计算中的复杂性交由框架来处理。
二:Mapreduce处理流程
三个阶段:map阶段、shuffle阶段和reduce阶段。
两个核心进程:map task进程和reduce task进程。
shuffle详解:
shuffle是map和reduce中间的数据调度过程,主要包含:缓存、分区、排序。
它的每一个处理步骤是分散在各个map task和reduce task节点上完成的,整体来看,分为3个操作:
1、分区partition
2、Sort根据key排序
3、Combiner进行局部value的合并
整个shuffle的大流程如下:
1、map task输出结果到一个内存缓存,并溢出为磁盘文件
2、分区/排序/合并
3、reduce task 拉取map输出文件中对应的分区数据
4、reduce端归并排序,产生聚合values迭代器来传递给reduce方法,并把这组聚合kv(聚合的依据是GroupingComparator)中排序最前的kv的key传给reduce方法的入参 key。
三:Mapreduce编程规范
(1)用户程序会分成三个部分:Mapper,Reducer,Driver
(2)Mapper的输入数据是KV对的形式,KV的类型可以设置
(3)Mapper的输出数据是KV对的形式,KV的类型可以设置
(4)Mapper中的业务逻辑写在map方法中
(5)map方法是每进来一个KV对调用一次
(6)Reducer的输入数据应该对应Mapper的输出数据,也是KV
(7)Reducer的业务逻辑写在reduce方法中
(8)reduce方法是对每一个<key,valueList>调用一次
(9)用户的Mapper和Reducer都要继承各自的父类
(10)整个程序需要一个Drvier来进行提交,提交的是一个描述了各种必要信息的job对象
四:Mapreduce运行模式
1、本地运行模式
(1)mapreduce程序是被提交给LocalJobRunner在本地运行
(2)而处理的数据及输出结果可以在本地文件系统,也可以在hdfs上
(3)怎样实现本地运行?:写一个程序,不要带集群的配置文件(本质是你的mr程序的conf中是否有mapreduce.framework.name=local以及yarn.resourcemanager.hostname参数)
2、集群运行模式
(1)mapreduce程序会提交给yarn集群的resourcemanager,分发到很多的节点上并发执行
(2)处理的数据和输出结果应该位于hdfs文件系统
(3)怎样实现集群运行:三种方式
A、将程序打成JAR包,然后在集群的任意一个节点上用hadoop命令启动
$ hadoop jar wordcount.jarcn.itcast.bigdata.mrsimple.WordCountDriver inputpath outputpath
B、直接在linux的eclipse中运行main方法
(项目中要带参数:mapreduce.framework.name=yarn以及yarn的两个基本配置)
C、如果要在windows的eclipse中提交job给集群,则要修改YarnRunner类
public class WordCountJobSubmitter { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); /* * 有这两行代码时为集群运行模式 * conf.set("mapreduce.framework.name", "yarn"); * conf.set("yarn.resourcemanager.hostname", "hdfs://hadoop-server-00:9000"); */ Job wordCountJob = Job.getInstance(conf); //重要:指定本job所在的jar包 wordCountJob.setJarByClass(WordCountJobSubmitter.class); //设置wordCountJob所用的mapper逻辑类为哪个类 wordCountJob.setMapperClass(WordCountMapper.class); //设置wordCountJob所用的reducer逻辑类为哪个类 wordCountJob.setReducerClass(WordCountReducer.class); //设置map阶段输出的kv数据类型 wordCountJob.setMapOutputKeyClass(Text.class); wordCountJob.setMapOutputValueClass(IntWritable.class); //设置最终输出的kv数据类型 wordCountJob.setOutputKeyClass(Text.class); wordCountJob.setOutputValueClass(IntWritable.class); //设置要处理的文本数据所存放的路径 FileInputFormat.setInputPaths(wordCountJob, "hdfs://192.168.77.70:9000/wordcount/srcdata/"); FileOutputFormat.setOutputPath(wordCountJob, new Path("hdfs://192.168.77.70:9000/wordcount/output/")); //提交job给hadoop集群 wordCountJob.waitForCompletion(true); } }
五:Mapreduce高级特性
1、Partitioner编程
Partition就是对map输出的key进行分组,不同的组可以指定不同的reduce task处理;
Partition功能由partitioner的实现子类来实现
示例:不同省份流量数据汇总到不同文件中,如: Mapreduce实例---分区流量汇总
2、Mapreduce的排序
Mapreduce中的常见排序机制:partial/total/secondary排序
Mapreduce中排序的基本要素:
a、排序是在map阶段输出之后,reduce处理之前
(通过无reduce的MR程序示例观察)
b、只针对key进行排序
c、 Key要实现WritableComparable接口
简单示例:对流量汇总数据进行倒序排序,如: Mapreduce实例---流量汇总并按流量大小倒序排序
3、Mapreduce程序运行并发度
(1)reduce task数量的决定机制
a、业务逻辑需要
b、数据量大小
设置方法:
job.setNumReduceTasks(5)
(2)map task数量的决定机制
由于map task之间没有协作关系,每一个map task都是各自为政,在map task的处理中没法做“全局”性的聚合操作,所以map task的数量完全取决于所处理的数据量的大小
决定机制:
a、对待处理数据一个文件一个文件的进行“切片”
b、每一个切片分配一个maptask来处理
Mapreduce框架中默认的切片机制:
TextInputFormat.getSplits()继承自FileInputFormat.getSplits()
1:定义一个切片大小:可以通过参数来调节,默认情况下等于“hdfs中设置的blocksize”,通常是128M
2:获取输入数据目录下所有待处理文件List
3:遍历文件List,逐个逐个文件进行切片
for(file:List)
对file从0偏移量开始切,每到128M就构成一个切片,比如a.txt(200M),就会被切成两个切片: a.txt: 0-128M, a.txt :128M-256M
再比如b.txt(80M),就会切成一个切片, b.txt :0-80M
注意:如果要处理的数据是大量的小文件,使用上述这种默认切片机制,就会导致大量的切片,从而maptask进程数特别多,但是每一个切片又非常小,每个maptask的处理数据量就很小,从而,整体的效率会很低。(因为启动maptask需要时间和资源)
通用解决方案:就是将多个小文件划分成一个切片;实现办法就是自定义一个Inputformat子类重写里面的getSplits方法;
Mapreduce框架中自带了一个用于此场景的Inputformat实现类:CombineFileInputformat
六:实例
Mapreduce实例---数单词个数(wordcount)
Mapreduce实例---流量汇总(flowcount)
Mapreduce实例---分区流量汇总
Mapreduce实例---流量汇总并按流量大小倒序排序
Mapreduce实例---共同好友
Mapreduce实例---倒排索引(含job串联)