package com.uplooking.bigdata.mr.test; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import java.io.IOException; import java.util.Comparator; import java.util.TreeSet; /** 需求分析: orderid,userid,payment,productid seventeen_a.txt 1,9819,100,121 2,8918,2000,111 3,2813,1234,22 4,9100,10,1101 5,3210,490,111 6,1298,28,1211 7,1010,281,90 8,1818,9000,20 seventeen_b.txt 100,3333,10,100 101,9321,1000,293 102,3881,701,20 103,6791,910,30 104,8888,11,39 按照payment从大到小求出TopN,比如top10,结果如上图 当N为动态的话,如果来做,提示:参数控制 分析: 按照我们的分析,因为只需要求出top10,所以一种方式,我们只需要定义一个可排序的容器,同时控制这个容器的大小在10, 那么我们最后汇总得到的容器,其中的数据就是我们想要的top10 那么这个常见的可排序的容器有treeset|treemap,咱们这里就是用treeset就可以了 */ public class TopNApp { // static int topn; public static void main(String[] args) throws Exception { if(args == null || args.length < 3) { System.err.println("Parameter Error! Usage: <inputPath outputPath topn>"); System.exit(-1); } String inputPath = args[0]; Path outputPath = new Path(args[1]); String topn = args[2]; Configuration conf = new Configuration(); conf.set("TOP_N", topn); Job job = Job.getInstance(conf, TopNApp.class.getSimpleName()); job.setJarByClass(TopNApp.class); //设置输入 FileInputFormat.setInputPaths(job, inputPath); job.setInputFormatClass(TextInputFormat.class); //setmap job.setMapperClass(TopNMapper.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(NullWritable.class); //设置输出 outputPath.getFileSystem(conf).delete(outputPath, true); FileOutputFormat.setOutputPath(job, outputPath); job.setOutputFormatClass(TextOutputFormat.class); //设置 reducer job.setReducerClass(TopNReducer.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); job.setNumReduceTasks(1);//但凡是出现排序,topn等等这样的需求,一般的reduce的个数只能有一个 job.waitForCompletion(true); } static class TopNMapper extends Mapper<LongWritable, Text, IntWritable, NullWritable> { private TreeSet<Integer> ts; private int topn = 10; @Override protected void setup(Context context) throws IOException, InterruptedException { //按照刚才对mr中map方法特点的分析,我们应该确定,容器只应该被创建一次,所以我们需要将ts提到setUp中进行创建声明 ts = new TreeSet<Integer>(new Comparator<Integer>() { public int compare(Integer o1, Integer o2) { return o2 - o1;//和integer自身比较性相反即可 } }); //取出configuration中的参数 topn = Integer.valueOf(context.getConfiguration().get("TOP_N").trim()); } @Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { String line = v1.toString(); String[] splits = line.split(","); int payment = Integer.valueOf(splits[2].trim()); ts.add(payment); if(ts.size() > topn) { ts.pollLast();//这样每次经过排序之后,如果超过10个元素,删除最后一个元素,保证集合中只有十个元素 } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { for (int i : ts) {//在执行完一个inputSplits切片数据之后,将求出的top10写道shuffle context.write(new IntWritable(i), NullWritable.get()); } } } static class TopNReducer extends Reducer<IntWritable, NullWritable, IntWritable, IntWritable> { private TreeSet<Integer> ts; private int topn = 10; @Override protected void setup(Context context) throws IOException, InterruptedException { ts = new TreeSet<Integer>(new Comparator<Integer>() { public int compare(Integer o1, Integer o2) { return o2 - o1;//和integer自身比较性相反即可 } }); //reduce 取出configuration中的参数 topn = Integer.valueOf(context.getConfiguration().get("TOP_N").trim()); } @Override protected void reduce(IntWritable k2, Iterable<NullWritable> v2s, Context context) throws IOException, InterruptedException { ts.add(k2.get()); if(ts.size() > topn) { ts.pollLast(); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { int count = 1; for (int i : ts) { context.write(new IntWritable(count++), new IntWritable(i)); } } } }
转载请注明原文地址: https://www.6miu.com/read-71040.html