package com.uplooking.bigdata.mr.wc; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import java.io.IOException; /** * 统计hdfs上面的一个文件hello.txt中每一个单词出现的次数 * hello.txt内容: hello wuhan hello xiao mei hello xiao hong map<LongWritable, Text, Text, IntWritable> reduce<Text, IntWritable, Text, IntWritable> */ public class WordCountApp { /* * 关于map和reduce的管理调度 */ public static void main(String[] args) throws Exception {//驱动代码 用于管理map和reduce //1、声明创建一个执行map和reduce的对象,这个对象我们一般称之为一个Job Configuration conf = new Configuration(); String jobName = WordCountApp.class.getSimpleName(); Job job = Job.getInstance(conf, jobName); //接下来的动作,将写好的代码打成一个jar //需要设置包含驱动程序的类的字节码 job.setJarByClass(WordCountApp.class); //2°、设置map,使用FileInputFormat设置map的输入,需要知道hdfs文件的路径,同时需要告诉从属于哪一个job //2.1 FileInputFormat.setInputPaths(job, "/hello.txt"); //2.2 job.setInputFormatClass(TextInputFormat.class);//使用相应的文件格式化方式进行解析 //2.3设置用哪一个map类执行相应操作 job.setMapperClass(WordCountMapper.class); //2.4设置map输出的key和value的类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //3°、设置reduce //3.1 设置reduce从属于哪一个类 job.setReducerClass(WordCountReducer.class); //3.2设置reduce的输出key和value的类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //3.3 设置将reduce的结果输出到hdfs上的路径,以及采用何种格式化方式输出到hdfs job.setOutputFormatClass(TextOutputFormat.class); FileOutputFormat.setOutputPath(job, new Path("/out"));//如果输出路径已经存在,则会抛出目录存在异常 //3.4设置有几个reduce来完成汇总 job.setNumReduceTasks(1); //4°、提交程序 job.waitForCompletion(true); } /** * map()相关 * Mapper类的四个泛型参数 * KEYIN---->k1--->当前行在文本中的偏移量,LongWritable * VALUEIN----->v1--->当前行的内容,Text * KEYOUT------>k2 Text * VALUEOUT------>v2 IntWritable */ static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ /* * 编写map阶段的业务逻辑 * map()函数的前两个参数就是K1,v1 * * */ @Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { String line = v1.toString();//得到每一行的内容 String[] splits = line.split(" ");//将字符串按照分隔符进行切分,得到字符串数组 for(String word : splits) {//使用map阶段的上下文对象将这些数据写出到shuffle中 Text k2 = new Text(word); IntWritable v2 = new IntWritable(1); context.write(k2, v2);//组装好数据写出 } } } /** * reduce相关 * <KEYIN---->k2 类型Text * VALUEIN ---->v2s 类型IntWritable * KEYOUT ---->k3 类型Text * VALUEOUT ---->v3 类型IntWritable * */ static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text k2, Iterable<IntWritable> v2s, Context context) throws IOException, InterruptedException { int sum = 0; for(IntWritable v2 : v2s) {//循环遍历v2s中的每一个数据 sum += v2.get(); } context.write(k2, new IntWritable(sum));//将汇总结果输出到hdfs文件中 } }
}
将上述使用idea打包生成mr-wc.jar,上传到hadoop服务器/opt/jars/hadoop
使用参数化的命令 yarn jar /opt/jars/hadoop/mr-wc.jar com.uplooking.bigdata.mr.WordCountApp2 /hello /out