需求：对一组给定的文本文件，统计并输出其中每个单词出现的总次数。
(1)定义一个Mapper类
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ @Override protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException { 拿到一行数据转换为string String line = value.toString(); //将这一行切分出各个单词 String[] words = line.split(" "); //遍历得到(word,1) for (String word : words) { context.write(new Text(word), new IntWritable(1)); } } }
(2)定义一个Reducer类
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { int count = 0; for (IntWritable value : values) { count += value.get(); } context.write(key, new IntWritable(count)); } }
(3)定义一个主类（Driver），用来描述并提交Job
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordcountDriver { public static void main(String[] args) throws Exception { if (args == null || args.length == 0) { args = new String[2]; args[0] = "hdfs://master:9000/wordcount/input/wordcount.txt"; args[1] = "hdfs://master:9000/wordcount/output/"; } Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 指定本程序的jar包所在的本地路径 job.setJarByClass(WordcountDriver.class); // z指定job需要使用的mapper/Ruducer业务类 job.setMapperClass(WordcountMapper.class); job.setReducerClass(WordcountReducer.class); // 指出mapper输出数据的kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 指定最终数据的输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 指定job原始文件所在的目录 FileInputFormat.setInputPaths(job, new Path(args[0])); // 指定job输出结果所在的目录 FileOutputFormat.setOutputPath(job, new Path(args[1])); //FileInputFormat.addInputPath(job, new Path(path)); //FileOutputFormat.setOutputPath(job, new Path(path1)); // 将job中的配置的相参数、以及java类所在的jar包,提交给yarn去运行 boolean res = job.waitForCompletion(true); System.exit(res ? 0 : 1); } }