MapReduce之TopN

xiaoxiao2021-02-28  101

package com.uplooking.bigdata.mr.test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;
import java.util.Comparator;
import java.util.TreeSet;

/**
 * MapReduce job that extracts the top N payment values from order records.
 *
 * <p>Each input line has the form {@code orderid,userid,payment,productid}, for example:
 *
 * <pre>
 * seventeen_a.txt
 *     1,9819,100,121
 *     2,8918,2000,111
 *     3,2813,1234,22
 *     4,9100,10,1101
 *     5,3210,490,111
 *     6,1298,28,1211
 *     7,1010,281,90
 *     8,1818,9000,20
 * seventeen_b.txt
 *     100,3333,10,100
 *     101,9321,1000,293
 *     102,3881,701,20
 *     103,6791,910,30
 *     104,8888,11,39
 * </pre>
 *
 * <p>Goal: list the payments from largest to smallest and keep only the top N (e.g. top 10).
 * N is dynamic: it is passed on the command line and forwarded to the tasks through the job
 * {@link Configuration}.
 *
 * <p>Approach: because only N values are needed, every task keeps a sorted container whose size
 * is capped at N. Each mapper emits its local top N and the single reducer merges those into the
 * global top N. A {@link TreeSet} is used as the sorted container.
 *
 * <p>Note: a {@code TreeSet} stores each value once, so equal payments are collapsed and the
 * result is the top N <em>distinct</em> payment values.
 */
public class TopNApp {

    /** Configuration key under which the requested N is handed to the map and reduce tasks. */
    private static final String TOP_N_KEY = "TOP_N";

    /** Value of N used by a task when the configuration does not carry {@link #TOP_N_KEY}. */
    private static final int DEFAULT_TOP_N = 10;

    /** Zero-based index of the payment column in a comma-separated input record. */
    private static final int PAYMENT_COLUMN = 2;

    /**
     * Configures and runs the job.
     *
     * @param args {@code inputPath outputPath topn}; {@code topn} must be a positive integer
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        if (args == null || args.length < 3) {
            System.err.println("Parameter Error! Usage: <inputPath outputPath topn>");
            System.exit(-1);
        }
        String inputPath = args[0];
        Path outputPath = new Path(args[1]);

        // Validate N up front so that a bad argument fails here, not inside every task.
        int topn;
        try {
            topn = parseTopN(args[2]);
        } catch (IllegalArgumentException e) {
            System.err.println("Parameter Error! " + e.getMessage());
            System.exit(-1);
            return;
        }

        Configuration conf = new Configuration();
        conf.setInt(TOP_N_KEY, topn);
        Job job = Job.getInstance(conf, TopNApp.class.getSimpleName());
        job.setJarByClass(TopNApp.class);

        // Input
        FileInputFormat.setInputPaths(job, inputPath);
        job.setInputFormatClass(TextInputFormat.class);

        // Mapper
        job.setMapperClass(TopNMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Output: any existing output directory is removed so the job can be re-run.
        outputPath.getFileSystem(conf).delete(outputPath, true);
        FileOutputFormat.setOutputPath(job, outputPath);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Reducer
        job.setReducerClass(TopNReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        // A global ordering / top-N needs all candidates in one place, hence a single reducer.
        job.setNumReduceTasks(1);

        // Propagate job failure to the caller through the process exit code.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Parses the command-line value of N.
     *
     * @param raw the textual value supplied by the user
     * @return the parsed, strictly positive N
     * @throws IllegalArgumentException if {@code raw} is not an integer or is not positive
     */
    private static int parseTopN(String raw) {
        int value;
        try {
            value = Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("topn must be an integer: " + raw, e);
        }
        if (value <= 0) {
            throw new IllegalArgumentException("topn must be positive: " + raw);
        }
        return value;
    }

    /** Creates an empty set that iterates its elements from largest to smallest. */
    private static TreeSet<Integer> newDescendingSet() {
        return new TreeSet<Integer>(new Comparator<Integer>() {
            @Override
            public int compare(Integer o1, Integer o2) {
                // Reverse of natural order; Integer.compare avoids the overflow of "o2 - o1".
                return Integer.compare(o2, o1);
            }
        });
    }

    /** Adds {@code value} to {@code set} and drops the smallest element if more than {@code limit} remain. */
    private static void offer(TreeSet<Integer> set, int value, int limit) {
        set.add(value);
        if (set.size() > limit) {
            // The set is ordered descending, so the last element is the smallest one.
            set.pollLast();
        }
    }

    /**
     * Collects the top N distinct payments of one input split and emits them in {@code cleanup}.
     */
    static class TopNMapper extends Mapper<LongWritable, Text, IntWritable, NullWritable> {
        private TreeSet<Integer> ts;
        private int topn = DEFAULT_TOP_N;

        /** Creates the container once per task and reads N from the job configuration. */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // map() is called once per record, so the container must be created here, not there.
            ts = newDescendingSet();
            topn = context.getConfiguration().getInt(TOP_N_KEY, DEFAULT_TOP_N);
        }

        /**
         * Extracts the payment column of one record and offers it to the local top-N set.
         * Records without a parseable payment column are skipped and counted.
         */
        @Override
        protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
            String[] splits = v1.toString().split(",");
            if (splits.length <= PAYMENT_COLUMN) {
                context.getCounter("TopN", "MALFORMED_RECORDS").increment(1);
                return;
            }
            int payment;
            try {
                payment = Integer.parseInt(splits[PAYMENT_COLUMN].trim());
            } catch (NumberFormatException e) {
                // A single bad record must not fail the whole task; make the skip visible instead.
                context.getCounter("TopN", "MALFORMED_RECORDS").increment(1);
                return;
            }
            offer(ts, payment, topn);
        }

        /** After the whole split has been processed, writes the local top N to the shuffle. */
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            for (int payment : ts) {
                context.write(new IntWritable(payment), NullWritable.get());
            }
        }
    }

    /**
     * Merges the per-mapper candidates into the global top N and writes {@code rank, payment}
     * pairs, rank starting at 1 for the largest payment.
     */
    static class TopNReducer extends Reducer<IntWritable, NullWritable, IntWritable, IntWritable> {
        private TreeSet<Integer> ts;
        private int topn = DEFAULT_TOP_N;

        /** Creates the container once per task and reads N from the job configuration. */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            ts = newDescendingSet();
            topn = context.getConfiguration().getInt(TOP_N_KEY, DEFAULT_TOP_N);
        }

        /** Offers one candidate payment (the key) to the global top-N set; values carry no data. */
        @Override
        protected void reduce(IntWritable k2, Iterable<NullWritable> v2s, Context context) throws IOException, InterruptedException {
            offer(ts, k2.get(), topn);
        }

        /** Writes the final result as (rank, payment), largest payment first. */
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            int rank = 1;
            for (int payment : ts) {
                context.write(new IntWritable(rank++), new IntWritable(payment));
            }
        }
    }
}
转载请注明原文地址: https://www.6miu.com/read-71040.html

最新回复(0)