MapReduce之多文件输出

xiaoxiao2021-02-28  115

package com.uplooking.bigdata.mr.format.out;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

/**
 * Map-only job that routes each input line to an output chosen by its country code,
 * using {@link MultipleOutputs}.
 *
 * <p>Input is comma-separated text whose fifth column (index 4) holds a quoted
 * country code, for example:
 * <pre>
 *   3070818,1963,1096,,"US","IN",,1,,441,6,69,,4,,0.625,,,,,,,
 *   3070820,1963,1096,,"GB","",,2,,12,6,63,,0,,,,,,,,,
 *   3070829,1963,1096,,"FR","",,3,,16,5,59,,5,,0.48,,,,,,,
 * </pre>
 *
 * <p>Lines whose country matches one of {@link #NAMED_OUTPUTS} are written to the
 * named output of the same name; every other well-formed line is written to the
 * job's default output. Lines with fewer than five columns are skipped.
 *
 * <p>Usage: {@code MultiOutputsApp <inputpath> <outputpath>}. The output path is
 * deleted recursively before the job is submitted.
 */
public class MultiOutputsApp {

    /** Zero-based index of the country-code column in an input line. */
    private static final int COUNTRY_COLUMN = 4;

    /**
     * Named outputs declared by the driver and looked up by the mapper.
     * A named output must be registered with the job before a task may write to it,
     * so both sides share this single list.
     */
    private static final String[] NAMED_OUTPUTS = {"US", "GB", "FR"};

    /**
     * Configures and runs the job, then exits with 0 on success and 1 on failure.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if job setup or submission fails
     */
    public static void main(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            System.err.println("Parameter Errors! \nUsage: <inputpath outputpath>");
            System.exit(-1);
        }
        String inputpath = args[0];
        Path outputpath = new Path(args[1]);

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, MultiOutputsApp.class.getSimpleName());
        job.setJarByClass(MultiOutputsApp.class);

        FileInputFormat.setInputPaths(job, inputpath);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(MultiOutputsMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Remove any previous output so the job does not fail on an existing directory.
        outputpath.getFileSystem(conf).delete(outputpath, true);
        FileOutputFormat.setOutputPath(job, outputpath);

        // Map-only job: mapper output is the final output.
        job.setNumReduceTasks(0);

        // Declare every named output up front. A name may contain only letters and
        // digits and must not be "part", which is reserved for the default output.
        // Writing to an undeclared name fails with "Named output '...' not defined".
        for (String namedOutput : NAMED_OUTPUTS) {
            MultipleOutputs.addNamedOutput(job, namedOutput, TextOutputFormat.class,
                    Text.class, NullWritable.class);
        }

        // Propagate the job result to the caller instead of always exiting with 0.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Mapper that emits each input line unchanged, choosing the destination from the
     * line's country column.
     */
    static class MultiOutputsMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        private MultipleOutputs<Text, NullWritable> mos;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            mos = new MultipleOutputs<Text, NullWritable>(context);
        }

        /**
         * Routes one input line.
         *
         * @param k1      byte offset of the line (unused)
         * @param v1      the raw input line; written out unchanged, quotes included
         * @param context task context, used for the default output
         */
        @Override
        protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
            // Quotes are stripped only to read the country code; the emitted record is v1 as-is.
            String line = v1.toString().replace("\"", "");
            String[] splits = line.split(",");
            if (splits.length <= COUNTRY_COLUMN) {
                // Too few columns to contain a country code.
                return;
            }

            String namedOutput = findNamedOutput(splits[COUNTRY_COLUMN]);
            if (namedOutput != null) {
                // write(name, key, value) already uses the name as the base output path,
                // so a single call is enough; a second call would duplicate the record.
                mos.write(namedOutput, v1, NullWritable.get());
            } else {
                // No named output is declared for this country: use the default output.
                context.write(v1, NullWritable.get());
            }
        }

        /**
         * Returns the declared named output matching {@code countryName}
         * (case-insensitive), or {@code null} when none is declared for it.
         */
        private static String findNamedOutput(String countryName) {
            for (String namedOutput : NAMED_OUTPUTS) {
                if (namedOutput.equalsIgnoreCase(countryName)) {
                    return namedOutput;
                }
            }
            return null;
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // MultipleOutputs holds its own record writers; they must be closed explicitly.
            mos.close();
        }
    }
}
转载请注明原文地址: https://www.6miu.com/read-70477.html

最新回复(0)