Why use MapReduce
1. A single machine cannot process massive data sets because of hardware resource limits.
2. Yet extending a single-machine program to run distributed on a cluster greatly increases its complexity and development cost.
3. With the MapReduce framework, developers can concentrate almost entirely on business logic and leave the complexity of distributed computing to the framework.
4. Some complex logic is cumbersome to implement in Hive, and even when it can be done, later requirement changes are painful to accommodate.
How MapReduce works
1. The MapTask collects the k/v pairs emitted by our map() method and puts them into an in-memory buffer.
2. The buffer repeatedly spills to local disk files; multiple spill files may be produced.
3. The spill files are merged into larger spill files.
4. During both spilling and merging, the Partitioner is called to partition the records, and the records are sorted by key (a minimal Partitioner sketch follows this list).
5. Each ReduceTask fetches the data of its own partition from the result files on every MapTask machine.
6. A ReduceTask gathers the result files of the same partition from different MapTasks and merges them again (merge sort).
7. Once they are merged into one large file, the shuffle is over; the ReduceTask then runs its own logic, pulling key/value groups from the file one at a time and calling the user-defined reduce() method.
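To make step 4 concrete, here is a minimal sketch of a custom Partitioner for the WordCount job further down. WordCountPartitioner is a hypothetical class name that is not part of the original code, and its hash-and-modulo logic simply mirrors what Hadoop's default HashPartitioner does.

package com.zjs.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical partitioner for the WordCount job: each word is assigned to a
// reduce partition by the hash of its key, the same rule the default
// HashPartitioner applies.
public class WordCountPartitioner extends Partitioner<Text, IntWritable> {

    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // Mask off the sign bit so the partition index is never negative.
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}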
The MapReduce shuffle process
• How the data processed in the map phase is handed to the reduce phase is the most critical flow in the MapReduce framework; this flow is called shuffle.
• Shuffle, literally "shuffling and dealing the cards", centers on three mechanisms: data partitioning, sorting, and caching.
• Concretely, it distributes the MapTask output to the ReduceTasks, partitioning and sorting the data by key along the way (the job wiring sketch below shows where these pieces are configured).
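As a minimal sketch (not from the original post) of where these shuffle pieces plug into a job: it assumes the hypothetical WordCountPartitioner class above and reuses WordCountReduce from the WordCount example further down; the reducer count of 2 and the combiner are illustrative extras.

package com.zjs.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ShuffleConfigSketch {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        // Which reduce partition each map output key is sent to
        // (WordCountPartitioner is the hypothetical class sketched above).
        job.setPartitionerClass(WordCountPartitioner.class);
        // How many partitions (and therefore ReduceTasks) the map output is split into.
        job.setNumReduceTasks(2);
        // Optional: a combiner pre-aggregates map output locally so less data is shuffled;
        // WordCountReduce from the example below works here because summing is associative.
        job.setCombinerClass(WordCount.WordCountReduce.class);
    }
}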
MapTask input splits
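Each input split produced by FileInputFormat is handed to one MapTask, and the effective split size is max(minSize, min(maxSize, blockSize)). Below is a minimal sketch of the two size knobs; the 128 MB / 256 MB values are arbitrary examples, not taken from the original post.

package com.zjs.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeSketch {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        // Lower and upper bounds for the input split size; 128 MB / 256 MB are
        // arbitrary example values. Effective split size:
        // max(minSize, min(maxSize, blockSize)).
        FileInputFormat.setMinInputSplitSize(job, 128L * 1024 * 1024);
        FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);
    }
}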
MapReduce code example: WordCount
package com.zjs.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.log4j.BasicConfigurator;

import java.io.IOException;

public class WordCount {

    public static class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {

        // Reuse the output key/value objects instead of allocating new ones per record.
        Text k = new Text();
        IntWritable v = new IntWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Split each input line on spaces and emit (word, 1) for every word.
            String line = value.toString();
            String[] words = line.split(" ");
            for (String word : words) {
                k.set(word);
                v.set(1);
                context.write(k, v);
            }
        }
    }

    public static class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        IntWritable v = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Sum the counts for all values that share the same word.
            int count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }
            v.set(count);
            context.write(key, v);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        BasicConfigurator.configure();   // basic log4j console logging

        job.setJarByClass(WordCount.class);
        job.setMapperClass(WordCountMap.class);
        job.setReducerClass(WordCountReduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("/Users/zhangjishuai/zjs/wordcount/input"));

        // Delete the output directory if it already exists; otherwise the job would fail.
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("/Users/zhangjishuai/zjs/wordcount/output");
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        job.waitForCompletion(true);
    }
}