MapReduce 二次排序（Secondary Sort）

xiaoxiao2021-02-28  69

package com.uplooking.bigdata.mr.test; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /**  * 对数据进行二次排序:  *  假如我们现在的需求是先按 cookieId 排序,然后按 time 排序,以便按 session 切分日志      cookieId    time    url      2    12:12:34    2_hao123      3    09:10:34    3_baidu      1    15:02:41    1_google      3    22:11:34    3_sougou      1    19:10:34    1_baidu      2    15:02:41    2_google      1    12:12:34    1_hao123      3    23:10:34    3_soso      2    05:02:41    2_google  结果:      ---------------------------------      1      12:12:34        1_hao123      1      15:02:41        1_google      1      19:10:34        1_baidu      ---------------------------------      2      05:02:41        2_google      2      12:12:34        2_hao123      2      15:02:41        2_google      ---------------------------------      3      09:10:34        3_baidu      3      22:11:34        3_sougou      3      23:10:34        3_soso     分析:         这里显然要比上一个案例复杂,上一个案例我们只对一列之进行了排序,该案例我们需要对两列值进行排序,把这种排序称之为二次排序。         按照我们之前在学习集合的时候的思路(在treeset或者treemap的时候对person先按照年龄自然顺序排序,然后在按照姓名自然顺序,为二次排序),         类比到我们这道题,因为在mr中要进行排序只对key进行排序的原则,同时我们知道key作为mr的泛型参数,需要具备以下原则:         1、key必须要实现序列化(implements Writable)         2、同时因为要经过shuffle,shuffle中有排序,所以key还有可比较(implements Comparable)             这二者综合就需要让key实现WritableComparable接口         
所以要完成二次排序,需要自定义key类型,key中包含了我们要进行排序的内容,  */ public class DataSecondSortApp {     public static void main(String[] args) throws Exception {         if(args == null || args.length < 2) {             System.err.println("Parameter Error! Usage: <inputPath outputPath>");             System.exit(-1);         }         String inputPath = args[0];         Path outputPath = new Path(args[1]);         Configuration conf = new Configuration();         Job job = Job.getInstance(conf, DataSecondSortApp.class.getSimpleName());         job.setJarByClass(DataSecondSortApp.class);         //设置输入         FileInputFormat.setInputPaths(job, inputPath);         job.setInputFormatClass(TextInputFormat.class);         //setmap         job.setMapperClass(DataSecondSortMapper.class);         job.setMapOutputKeyClass(SecondSortWritable.class);         job.setMapOutputValueClass(NullWritable.class);         //设置输出         outputPath.getFileSystem(conf).delete(outputPath, true);         FileOutputFormat.setOutputPath(job, outputPath);         job.setOutputFormatClass(TextOutputFormat.class);         //设置 reducer         job.setReducerClass(DataSecondSortReducer.class);         job.setOutputKeyClass(SecondSortWritable.class);         job.setOutputValueClass(NullWritable.class);         job.setNumReduceTasks(1);         job.waitForCompletion(true);     }     static class DataSecondSortMapper extends Mapper<LongWritable, Text, SecondSortWritable, NullWritable> {         @Override         protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {             String line = v1.toString();             String[] splits = line.split("\t");             //2    12:12:34    2_hao123             Integer cookieid = Integer.valueOf(splits[0].trim());             String time = splits[1].trim();             String url = splits[2].trim();             SecondSortWritable ssw = new SecondSortWritable(cookieid, time, url);             context.write(ssw, 
NullWritable.get());         }     }     static class DataSecondSortReducer extends Reducer<SecondSortWritable, NullWritable, SecondSortWritable, NullWritable> {         @Override         protected void reduce(SecondSortWritable k2, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {             context.write(k2, NullWritable.get());         }     } } /**  * 自定义key的类型  */ class SecondSortWritable implements WritableComparable<SecondSortWritable> {     private int cookieid;     private String time;     private String url;     public SecondSortWritable(int cookieid, String time, String url) {         this.cookieid = cookieid;         this.time = time;         this.url = url;     }     public SecondSortWritable() {     }     public int getCookieid() {         return cookieid;     }     public void setCookieid(int cookieid) {         this.cookieid = cookieid;     }     public String getTime() {         return time;     }     public void setTime(String time) {         this.time = time;     }     public String getUrl() {         return url;     }     public void setUrl(String url) {         this.url = url;     }     public void write(DataOutput out) throws IOException {         out.writeInt(this.cookieid);         out.writeUTF(this.time);         out.writeUTF(this.url);     }     public void readFields(DataInput in) throws IOException {         this.cookieid = in.readInt();         this.time = in.readUTF();         this.url = in.readUTF();     }     /*         核心的比较             先按照cookieid进行升序比较             在按照time进行比较     */     public int compareTo(SecondSortWritable sw) {         int ret = this.cookieid - sw.cookieid;         if(ret == 0) {             ret = this.time.compareTo(sw.time);         }         return ret;     }     @Override     public String toString() {         return this.cookieid + "\t" + this.time + "\t" + this.url;     } }
转载请注明原文地址: https://www.6miu.com/read-73009.html

最新回复(0)