package com.uplooking.bigdata.mr.test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

/**
 * Secondary sort over tab-separated log records of the form:
 *
 * <pre>
 * cookieId  time      url
 * 2         12:12:34  2_hao123
 * 3         09:10:34  3_baidu
 * 1         15:02:41  1_google
 * ...
 * </pre>
 *
 * The desired output is sorted first by cookieId (ascending) and then by time,
 * so that each cookie's visits appear together in chronological order, e.g.:
 *
 * <pre>
 * 1  12:12:34  1_hao123
 * 1  15:02:41  1_google
 * 1  19:10:34  1_baidu
 * 2  05:02:41  2_google
 * ...
 * </pre>
 *
 * <p>Because MapReduce only sorts by key during the shuffle, sorting on two
 * columns requires a composite key. An MR key type must be:
 * <ol>
 *   <li>serializable ({@code Writable}) — it travels through the shuffle, and</li>
 *   <li>comparable ({@code Comparable}) — the shuffle sorts keys.</li>
 * </ol>
 * Both requirements are satisfied by implementing {@link WritableComparable},
 * which is what the custom key {@link SecondSortWritable} below does.
 */
public class DataSecondSortApp {

    /**
     * Configures and submits the secondary-sort job.
     *
     * @param args {@code args[0]} = input path, {@code args[1]} = output path
     *             (the output path is deleted first if it already exists)
     */
    public static void main(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            // NOTE: the original source had a raw line break inside this string
            // literal, which is not legal Java; "\n" restores the intent.
            System.err.println("Parameter Error!\nUsage: <inputPath outputPath>");
            System.exit(-1);
        }
        String inputPath = args[0];
        Path outputPath = new Path(args[1]);

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, DataSecondSortApp.class.getSimpleName());
        job.setJarByClass(DataSecondSortApp.class);

        // Input
        FileInputFormat.setInputPaths(job, inputPath);
        job.setInputFormatClass(TextInputFormat.class);

        // Mapper
        job.setMapperClass(DataSecondSortMapper.class);
        job.setMapOutputKeyClass(SecondSortWritable.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Output — delete any previous run's output so the job can start.
        outputPath.getFileSystem(conf).delete(outputPath, true);
        FileOutputFormat.setOutputPath(job, outputPath);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Reducer
        job.setReducerClass(DataSecondSortReducer.class);
        job.setOutputKeyClass(SecondSortWritable.class);
        job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(1);

        // Propagate job success/failure through the process exit code
        // (the original discarded waitForCompletion's return value).
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Parses each "cookieId \t time \t url" line into a {@link SecondSortWritable}
     * key; the value carries no information, so {@link NullWritable} is used.
     */
    static class DataSecondSortMapper
            extends Mapper<LongWritable, Text, SecondSortWritable, NullWritable> {

        @Override
        protected void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            // e.g. "2\t12:12:34\t2_hao123"
            String[] splits = v1.toString().split("\t");
            if (splits.length < 3) {
                // Skip malformed lines instead of failing the whole task
                // with an ArrayIndexOutOfBoundsException.
                return;
            }
            int cookieid = Integer.parseInt(splits[0].trim());
            String time = splits[1].trim();
            String url = splits[2].trim();
            context.write(new SecondSortWritable(cookieid, time, url), NullWritable.get());
        }
    }

    /**
     * Identity reducer: keys arrive already sorted by (cookieId, time), so each
     * is simply emitted once per occurrence.
     */
    static class DataSecondSortReducer
            extends Reducer<SecondSortWritable, NullWritable, SecondSortWritable, NullWritable> {

        @Override
        protected void reduce(SecondSortWritable k2, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(k2, NullWritable.get());
        }
    }
}

/**
 * Composite MapReduce key carrying (cookieId, time, url).
 *
 * <p>Sort order — the heart of the secondary sort — is cookieId ascending,
 * then time ascending (lexicographic, which works for zero-padded HH:mm:ss).
 * The url is carried along for output but does not participate in ordering.
 */
class SecondSortWritable implements WritableComparable<SecondSortWritable> {

    private int cookieid;
    private String time;
    private String url;

    public SecondSortWritable(int cookieid, String time, String url) {
        this.cookieid = cookieid;
        this.time = time;
        this.url = url;
    }

    /** No-arg constructor required by Hadoop's reflective deserialization. */
    public SecondSortWritable() {
    }

    public int getCookieid() {
        return cookieid;
    }

    public void setCookieid(int cookieid) {
        this.cookieid = cookieid;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.cookieid);
        out.writeUTF(this.time);
        out.writeUTF(this.url);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.cookieid = in.readInt();
        this.time = in.readUTF();
        this.url = in.readUTF();
    }

    /**
     * Core comparison: cookieId ascending first, then time ascending.
     * Uses {@link Integer#compare} instead of subtraction, which can
     * silently overflow for large/negative ids.
     */
    @Override
    public int compareTo(SecondSortWritable other) {
        int ret = Integer.compare(this.cookieid, other.cookieid);
        if (ret == 0) {
            ret = this.time.compareTo(other.time);
        }
        return ret;
    }

    /**
     * Hash on cookieId only so that HashPartitioner sends all records of one
     * cookie to the same reducer even when numReduceTasks &gt; 1. (Without an
     * override, identity hashing would scatter a cookie's records randomly.)
     */
    @Override
    public int hashCode() {
        return Integer.hashCode(cookieid);
    }

    /** Equality on the sort key (cookieId, time), consistent with compareTo. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SecondSortWritable)) {
            return false;
        }
        SecondSortWritable that = (SecondSortWritable) o;
        return this.cookieid == that.cookieid && Objects.equals(this.time, that.time);
    }

    @Override
    public String toString() {
        return this.cookieid + "\t" + this.time + "\t" + this.url;
    }
}
// Reprinted from: https://www.6miu.com/read-73009.html (please credit the original when redistributing)