package com.uplooking.bigdata.mr.format.in; import com.uplooking.bigdata.mr.wc.WordCountApp; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import java.io.IOException; /** * 如果我们读取的数据不只是一个文件,或者说数据源是多个目录下面的文件 * 还是来统计单词的出现的次数 */ public class MultiInFormatApp { public static void main(String[] args) throws Exception { if(args == null || args.length < 1) { System.err.println("Parameter Error!Usage:<output_path>"); System.exit(-1); } Path outputpath = new Path(args[0]); Configuration conf = new Configuration(); String jobName = MultiInFormatApp.class.getSimpleName(); Job job = Job.getInstance(conf, jobName); job.setJarByClass(MultiInFormatApp.class); /** * job The Job * path to be added to the list of inputs for the job * inputFormatClass class to use for this path * mapperClass class to use for this path */ MultipleInputs.addInputPath(job, new Path("/hello"), TextInputFormat.class, MultiInFormatMapper.class); MultipleInputs.addInputPath(job, new Path("/hello.txt"), TextInputFormat.class, MultiInFormatMapper.class); // FileInputFormat.setInputPaths(job, inputpath); // job.setInputFormatClass(TextInputFormat.class); // job.setMapperClass(MultiInFormatMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); //reducer job.setReducerClass(MultiInFormatReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); job.setOutputFormatClass(TextOutputFormat.class); FileOutputFormat.setOutputPath(job, outputpath); job.setNumReduceTasks(1); job.waitForCompletion(true); } static class MultiInFormatMapper extends Mapper<LongWritable, Text, Text, LongWritable> { @Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { String line = v1.toString(); String[] splits = line.split(" "); for(String word : splits) { context.write(new Text(word), new LongWritable(1)); } } } static class MultiInFormatReducer extends Reducer<Text, LongWritable, Text, LongWritable> { @Override protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException { long count = 0; for(LongWritable lw : v2s) { count += lw.get(); } context.write(k2, new LongWritable(count)); } } }
转载请注明原文地址: https://www.6miu.com/read-70272.html