package com.uplooking.bigdata.mr.format.out;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 
 import java.io.IOException;
 
 /**
  * Stores the input records in separate directories according to country name.
  * Sample input:
  *   3070818,1963,1096,,"US","IN",,1,,441,6,69,,4,,0.625,,,,,,,
      3070819,1963,1096,,"US","TN",,4,,12,6,63,,0,,,,,,,,,
      3070820,1963,1096,,"GB","",,2,,12,6,63,,0,,,,,,,,,
      3070821,1963,1096,,"US","IL",,2,,15,6,69,,1,,0,,,,,,,
      3070822,1963,1096,,"US","NY",,2,,401,1,12,,4,,0.375,,,,,,,
      3070823,1963,1096,,"US","MI",,1,,401,1,12,,8,,0.6563,,,,,,,
      3070824,1963,1096,,"US","IL",,1,,401,1,12,,5,,0.48,,,,,,,
      3070825,1963,1096,,"US","IL",,1,,401,1,12,,7,,0.6531,,,,,,,
      3070826,1963,1096,,"US","IA",,1,,401,1,12,,1,,0,,,,,,,
      3070827,1963,1096,,"US","CA",,4,,401,1,12,,2,,0.5,,,,,,,
      3070828,1963,1096,,"US","CT",,2,,16,5,59,,4,,0.625,,,,,,,
      3070829,1963,1096,,"FR","",,3,,16,5,59,,5,,0.48,,,,,,,
      3070830,1963,1096,,"US","NH",,2,,16,5,59,,0,,,,,,,,,
      3070831,1963,1096,,"US","CT",,2,,16,5,59,,0,,,,,,,,,
     The country name is in the fifth column.
     MultipleOutputs is used here to write the output.
  */
 public class MultiOutputsApp {
     public static void main(String[] args) throws Exception {
         if(args == null || args.length < 2) {
             System.err.println("Parameter Errors! Usage: <inputpath outputpath>");
             System.exit(-1);
         }
         String inputpath = args[0];
         Path outputpath = new Path(args[1]);
         Configuration conf = new Configuration();
         Job job = Job.getInstance(conf, MultiOutputsApp.class.getSimpleName());
         job.setJarByClass(MultiOutputsApp.class);
         FileInputFormat.setInputPaths(job, inputpath);
         job.setInputFormatClass(TextInputFormat.class);
 
         job.setMapperClass(MultiOutputsMapper.class);
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(NullWritable.class);
 
         job.setOutputFormatClass(TextOutputFormat.class);
         outputpath.getFileSystem(conf).delete(outputpath, true);
         FileOutputFormat.setOutputPath(job, outputpath);
         job.setNumReduceTasks(0);
         /**
          * Adds a named output for the job.
          * <p/>
          *
          *  job               job to add the named output
          *  namedOutput       named output name, it has to be a word, letters
          *                          and numbers only, cannot be the word 'part' as
          *                          that is reserved for the default output.
          *  outputFormatClass OutputFormat class.
          *  keyClass          key class
          *  valueClass        value class
          *
          *  Named output 'PART' not defined 这样在驱动中预先声明namebase,在后面的mr中就可以直接进行访问
          */
         MultipleOutputs.addNamedOutput(job, "US", TextOutputFormat.class, Text.class, NullWritable.class);
         MultipleOutputs.addNamedOutput(job, "GB", TextOutputFormat.class, Text.class, NullWritable.class);
         MultipleOutputs.addNamedOutput(job, "FR", TextOutputFormat.class, Text.class, NullWritable.class);
         job.waitForCompletion(true);
     }
 
     static class MultiOutputsMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
         private MultipleOutputs<Text, NullWritable> mos;
         @Override
         protected void setup(Context context) throws IOException, InterruptedException {
             mos = new MultipleOutputs<Text, NullWritable>(context);
         }
 
         @Override
         protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
 
 
             //按照国名对得到的一行数据进行分目录存放
             String line = v1.toString().replace("\"", "");
             String[] splits = line.split(",");
             if(splits == null || splits.length < 4) {
                 return;
             }
             String countryName = splits[4];
 
 
 
             if(countryName.equalsIgnoreCase("US")) {
                 mos.write("US", v1, NullWritable.get());
                 //String namedOutput, K key, V value, String baseOutputPath
                 mos.write("US", v1, NullWritable.get(), "US");
             } else if(countryName.equalsIgnoreCase("GB")){
                 mos.write("GB", v1, NullWritable.get());
                 mos.write("GB", v1, NullWritable.get(), "GB");
             } else if (countryName.equalsIgnoreCase("FR")) {
                 mos.write("FR", v1, NullWritable.get());
                 mos.write("FR", v1, NullWritable.get(), "FR");
             } else {//Named output 'PART' not defined 在mos多目录输出的时候,需要对输出制定的目录进行预先声明
                 mos.write("PART", v1, NullWritable.get());
             }
 
         }
 
         @Override
         protected void cleanup(Context context) throws IOException, InterruptedException {
             mos.close();
         }
     }
 }