MapReduce之多数据源输入

xiaoxiao2021-02-28  102

package com.uplooking.bigdata.mr.format.in; import com.uplooking.bigdata.mr.wc.WordCountApp; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import java.io.IOException; /**  * 如果我们读取的数据不只是一个文件,或者说数据源是多个目录下面的文件  * 还是来统计单词的出现的次数  */ public class MultiInFormatApp {     public static void main(String[] args) throws Exception {         if(args == null || args.length < 1) {             System.err.println("Parameter Error!Usage:<output_path>");             System.exit(-1);         }         Path outputpath = new Path(args[0]);         Configuration conf = new Configuration();         String jobName = MultiInFormatApp.class.getSimpleName();         Job job = Job.getInstance(conf, jobName);         job.setJarByClass(MultiInFormatApp.class);         /**          *  job                 The  Job          *  path                to be added to the list of inputs for the job          *  inputFormatClass    class to use for this path          *  mapperClass         class to use for this path          */         MultipleInputs.addInputPath(job, new Path("/hello"), TextInputFormat.class, MultiInFormatMapper.class);         MultipleInputs.addInputPath(job, new Path("/hello.txt"), TextInputFormat.class, MultiInFormatMapper.class); //        FileInputFormat.setInputPaths(job, inputpath); //        job.setInputFormatClass(TextInputFormat.class); //        job.setMapperClass(MultiInFormatMapper.class);         job.setMapOutputKeyClass(Text.class);         job.setMapOutputValueClass(LongWritable.class);         //reducer         job.setReducerClass(MultiInFormatReducer.class);         job.setOutputKeyClass(Text.class);         job.setOutputValueClass(LongWritable.class);         job.setOutputFormatClass(TextOutputFormat.class);         FileOutputFormat.setOutputPath(job, outputpath);         job.setNumReduceTasks(1);         job.waitForCompletion(true);     }     static class MultiInFormatMapper extends Mapper<LongWritable, Text, Text, LongWritable> {         @Override         protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {             String line = v1.toString();             String[] splits = line.split(" ");             for(String word : splits) {                 context.write(new Text(word), new LongWritable(1));             }         }     }     static class MultiInFormatReducer extends Reducer<Text, LongWritable, Text, LongWritable> {         @Override         protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {             long count = 0;             for(LongWritable lw : v2s) {                 count += lw.get();             }             context.write(k2, new LongWritable(count));         }     } }
转载请注明原文地址: https://www.6miu.com/read-70272.html

最新回复(0)