hadoop之MapReduce基本原理及入门WordCount小例子(三)

xiaoxiao2021-02-28  24

一.MapReduce原理

分而治之,一个大任务拆成多个子任务就叫map,并行执行后合并结果(reduce) Job&Task 一个作业,比如说从100G的日志访问里找出访问次数最多的IP;一个JobTracker可能被拆分成多个task,task又分为MapTaskTracker和ReduceTaskTracker taskTracker常常和DataNode同一个节点,能保证计算跟着数据走,使开销最小化

JobTracker的角色: 1.作业调度 2.分配任务、监控任务执行进度 3.监控TaskTracker的状态

TaskTracker的角色: 1.执行任务 2.汇报任务状态

MapReduce运行流程: 输入数据分片,将分片的数据按照一定规则放到TaskTracker分配Map任务,然后产生一些中间结果,都是key-value对,然后再根据一些映射规则进行交换到Reduce区,运算完后数据写回到hdfs中。 MapReduce的容错机制: 1.重复执行出错的计算(重复执行4次后放弃) 2.推测执行:如果有一个TaskTracker计算的很慢,保持这个TaskTracker继续计算,然后MapReduce会重新启动一个TaskTracker来进行计算,两个TaskTracker哪个先算完就用哪一个的结果

二.计算单词出现次数实例

wordCount单词计数:计算文件中出现每个单词的次数,输入结果按照字母顺序进行排序,如下

WordConut代码

import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class WordCount { public static class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> { private final IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //key一般指的是偏移量,value是一行的数据,从下面的while循环看得出来就是一次分组过程 String line = value.toString(); StringTokenizer token = new StringTokenizer(line); while (token.hasMoreTokens()) { word.set(token.nextToken()); context.write(word, one);//统计某单词出现过一次,以键值对形式存放 } } } public static class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //这里代码就容易读懂了,key是某个单词,values是map之后的的数组,context相当于一个舞台环境 int sum = 0; for (IntWritable val : values) { sum += val.get(); } context.write(key, new IntWritable(sum)); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf); job.setJarByClass(WordCount.class); job.setJobName("wordcount"); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setMapperClass(WordCountMap.class); job.setReducerClass(WordCountReduce.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); } }

Map方法主要是Mapper类中主要承担处理作业的方法。 其中key是传入map的键值,value是对应键值的value值,context是环境对象参数,供程序访问Hadoop的环境对象,map()方法对输入的键值对进行处理,产生一系列的中间键值对,转换后的中间键值对可以有新的键值类型。输入的键值对可根据实际应用设定,例如文档数据记录可将文本文件中的行或数据表格中的行。

将WordCount代码用vim编辑器复制随便放入一个文件夹下,然后编译它:

javac -classpath /opt/hadoop-1.2.1/hadoop-core-1.2.1.jar:/opt/hadoop-1.2.1/lib/commons-cli-1.2.jar -d 你要存放的class文件夹/ WordCount.java 然后打包:jar -cvf wordcount.jar *.class 再新建个input文件夹,将要计算的原文件放进去: vim file1 hello world hello hadoop hadoop file system hadoop java api hello java vim file2 new file hdoop file hadoop new world hadoop free home hadoop free school 然后将file1,file2上传到hdfs,先在hdfs上创建一个文件夹用来接收file1,file2 hadoop fs -mkdir input_wordcount 然后上传file1到上面: hadoop fs -put input/* input_wordcount/

然后执行作业:

hadoop jar wordcount.jar WordCount input_wordcount output_wordcount

可以明显的看出是map执行完后,reduce才开始执行

查看output文件夹的结果:

hadoop fs -ls output_wordcount

上图箭头指向的文件夹是运行结果,我们查看下计算结果:

hadoop fs -cat output_wordcount/part-r-00000

转载请注明原文地址: https://www.6miu.com/read-2350221.html

最新回复(0)