大数据 (012)Hadoop-MR执行环境之---定义多个resuce task

xiaoxiao2021-02-28  92

引言

        之前我们说道,默认情况下reduce task只有一个,当我们需要对大量数据进行统计时,一个reduce task已经捉襟见肘,那么我们就有必要配置多哦reduce task进行并行任务执行。

实例

        例子:我们有一个每年每月温度的统计数据需要进行分析。如下图数据。我们需要统计每年中每个月的最高温度的前三名。

       

        自定义分区:我们需要 分配多个reduce task,根据年来进行reduce task的分区,每个reduce task统计每一年的数据。这就引入了自定义分区。

        自定义分组:因为我们要统计每年每月气温的前三名。所以我们应该按照月分组,然后找这个组气温前三名。这就引入了自定义分组。

       自定义 排序:统计温度的前三名,则需要对数据进行排序,取前三。这就引入了自定义排序。

        之前我们已经知道分区、分组、排序规则如下

                分区:hashcode模reduce数量

                分组:根据key比较进行分组      

                排序:根据key的hashcode

        所以我们要将需要自定义分组、排序、分区的数据封装进入key中,所以我们需要对key进行多态封装。

编码

一、总述

        这个测试模块的类全在包com.zjt.mapreducer.weather下。

        因为分区、分组、排序都是自定义,所以关于分区、分组和排序以及其依赖的键都需要我们手动编码完成。我们需要一共完成下方七个类的编码:

        1、MyKey.java

                因为分区、分组、排序都需要我们自己完成,而这三个都是依赖于键进行其工作的。所以我们要根据我们时间需求自定义键。

        2、WeatherMapper.java

                hdfs输入数据的mapper处理函数。其将接收到的数据封装为MyKey实体类,传递给下一步进行分区

        3、MyPartitioner.java

                我们自己实现的分区类,给WeatherMapper.java传来的数据 通过 取模 运算,返回其分区号

        4、MySort.java

                自定义排序类。通过比较MyKey.java中年、月、日来比较其实体类是否相等。

        5、MyGroup.java

                自定义分组。通过比较MyKey.java中年、月来比较其实体类是否相等。

        6、WeatherReducer.java

                接收到被排序、分区、分组后的数据,取其最大的三个,则为每年每月气温最高的前三个。

        7、RunJob.java

                运行类,其中对代码项进行配置,启动任务。

二、各个类源码     

 

        1、MyKey.java

                因为分区、分组、排序都需要我们自己完成,而这三个都是依赖于键进行其工作的。所以我们要根据我们时间需求自定义键。

package com.zjt.mapreducer.weather; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; /**  *  * @author ZhangJintao  */ public class MyKey implements WritableComparable<MyKey>{     private int year;  //年     private int month;   //月     private double hot;   //温度     public int getYear() {         return year;     }     public void setYear(int year) {         this.year = year;     }     public int getMonth() {         return month;     }     public void setMonth(int month) {         this.month = month;     }     public double getHot() {         return hot;     }     public void setHot(double hot) {         this.hot = hot;     }     /**      * 反序列化【从对象流中读出】      */     public void readFields(DataInput arg0) throws IOException {         this.year = arg0.readInt();         this.month = arg0.readInt();         this.hot = arg0.readDouble();     }     /**      * 序列化      */     public void write(DataOutput arg0) throws IOException {         arg0.writeInt(year);         arg0.writeInt(month);         arg0.writeDouble(hot);     }     /**      * 比较两个对象是否相等方法,分组时用到      */     public int compareTo(MyKey o) {         int r1 = Integer.compare(this.year, o.getYear());         if (r1 == 0) {             int r2 = Integer.compare(this.month, o.getMonth());             if (r2 == 0) {                 return Double.compare(this.hot, o.getHot());             }else {                 return r2;             }         }else{             return r1 ;         }     }     @Override     public String toString() {         return "MyKey [year=" + year + ", month=" + month + ", hot=" + hot                 + "]";     } }

        2、WeatherMapper.java

                hdfs输入数据的mapper处理函数。其将接收到的数据封装为MyKey实体类,传递给下一步进行分区

 

package com.zjt.mapreducer.weather; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class WeatherMapper extends Mapper<Text, Text, MyKey, DoubleWritable>{     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-HH-dd hh:mm:ss");          /**      * 根据键和值,将我们的数据拿出来封装成MyKey      *     key/value:每一行第一个隔开符 左边为key,右边为value      */ //    @Override     protected void map(Text key, Text value,             Mapper<Text, Text, MyKey, DoubleWritable>.Context context)             throws IOException, InterruptedException {         System.out.println("WeatherMapper.map()");         try {             Date date = sdf.parse(key.toString());             Calendar c = Calendar.getInstance();             c.setTime(date);             int year = c.get(Calendar.YEAR);             int month = c.get(Calendar.MONTH);             double hot = Double.parseDouble(value.toString().substring(0, value.toString().lastIndexOf("c")));             MyKey myKey = new MyKey();             myKey.setHot(hot);             myKey.setYear(year);             myKey.setMonth(month);             System.out.println("===WeatherMapper.map() 开始输出数据");             System.out.println("===   【"+myKey.toString() + " : " + hot +"】");             context.write(myKey, new DoubleWritable(hot));         } catch (Exception e) {             e.printStackTrace();         }     } }

 

        3、MyPartitioner.java

                我们自己实现的分区类,给WeatherMapper.java传来的数据 通过 取模 运算,返回其分区号

 

package com.zjt.mapreducer.weather; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; public class MyPartitioner extends HashPartitioner<MyKey, DoubleWritable>{     /**      * 自定义分区      * MyTask每输出一个数据调用一次,所以这个方法越短越好      *     返回分区号      */ //    @Override     public int getPartition(MyKey key, DoubleWritable value, int numReduceTasks) {         System.out.println("MyPartitioner.getPartition()");         System.out.println("===MyPartitioner.getPartition()  接收到到数据");         System.out.println("===   【" + key.toString() + "】");         System.out.println("===MyPartitioner.getPartition()  开始返回分区号");         System.out.println("===   【" + (key.getYear()-1949) % numReduceTasks + "】");         //当前年份减去1949年,对ReduceTasks数量取模         return (key.getYear()-1949) % numReduceTasks;     } }

 

        4、MySort.java

                自定义排序类。通过比较MyKey.java中年、月、日来比较其实体类是否相等。

 

package com.zjt.mapreducer.weather; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; /**  * 自定义排序函数,继承默认的排序  * @author ZhangJintao  */ public class MySort extends WritableComparator{     public MySort() {         super(MyKey.class,true);     }      //    @Override     public int compare(WritableComparable a, WritableComparable b) {         System.out.println("MySort.compare()");         MyKey k1 = (MyKey)a;         MyKey k2 = (MyKey)b;         System.out.println("===MySort.compare()   开始比较数据");         System.out.println("===   k1=【"+k1.toString()+"】");         System.out.println("===   k2=【"+k2.toString()+"】");         int r1 = Integer.compare(k1.getYear(), k2.getYear());         if (r1 == 0) {             int r2 = Integer.compare(k1.getMonth(), k2.getMonth());             if (r2 == 0) {                 System.out.println("===MySort.compare()   比较结果");                 System.out.println("===   【"+-Double.compare(k1.getHot(), k2.getHot())+"】");                 return -Double.compare(k1.getHot(), k2.getHot());             }else {                 System.out.println("===MySort.compare()   比较结果");                 System.out.println("===   【"+r2+"】");                 return r2;             }         }else{             System.out.println("===MySort.compare()   比较结果");             System.out.println("===   【"+r1+"】");             return r1 ;         }     } }

 

        5、MyGroup.java

                自定义分组。通过比较MyKey.java中年、月来比较其实体类是否相等。

 

package com.zjt.mapreducer.weather; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; /**  * 自定义分组函数  * @author ZhangJintao  */ public class MyGroup extends WritableComparator{     public MyGroup() {         super(MyKey.class,true);     }      //    @Override     public int compare(WritableComparable a, WritableComparable b) {         System.out.println("MyGroup.compare()");         System.out.println("===MyGroup.compare()   开始比较数据");         MyKey k1 = (MyKey)a;         MyKey k2 = (MyKey)b;         System.out.println("===   k1=【"+k1.toString()+"】");         System.out.println("===   k2=【"+k2.toString()+"】");                  int r1 = Integer.compare(k1.getYear(), k2.getYear());         if (r1 == 0) {             System.out.println("===MyGroup.compare()   比较结果");             System.out.println("===   【"+Integer.compare(k1.getMonth(), k2.getMonth())+"】");             return Integer.compare(k1.getMonth(), k2.getMonth());         }else{             System.out.println("===MyGroup.compare()   比较结果");             System.out.println("===   【"+r1+"】");             return r1 ;         }     } }

 

        6、WeatherReducer.java

                接收到被排序、分区、分组后的数据,取其最大的三个,则为每年每月气温最高的前三个。

 

package com.zjt.mapreducer.weather; import java.io.IOException; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WeatherReducer extends Reducer<MyKey, DoubleWritable, Text, NullWritable>{ //    @Override     protected void reduce(MyKey arg0, Iterable<DoubleWritable> arg1,             Reducer<MyKey, DoubleWritable, Text, NullWritable>.Context arg2)             throws IOException, InterruptedException {         System.out.println("WeatherReducer.reduce()");         System.out.println("===WeatherReducer.reduce()   处理收到的数据");         System.out.println("===WeatherReducer.reduce()   【"+arg0.toString()+"】");         System.out.println("===WeatherReducer.reduce()   打印当前组的所有数据");         for (DoubleWritable v :arg1) {             String msg = arg0.getYear() + "\t" + arg0.getMonth() + "\t" + v.get();             System.out.println("===WeatherReducer.reduce()   【"+msg+"】");         }         int i = 0 ;         for (DoubleWritable v :arg1) {             i ++ ;             String msg = arg0.getYear() + "\t" + arg0.getMonth() + "\t" + v.get();             arg2.write(new Text(msg), NullWritable.get());             if (i == 3) {                 break;             }         }     } }

 

        7、RunJob.java

                运行类,其中对代码项进行配置,启动任务。 

 

package com.zjt.mapreducer.weather; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class RunJob {     public static void main(String[] args) {         Configuration config =new Configuration();         config.set("fs.defaultFS", "hdfs://node1:8020");         config.set("yarn.resourcemanager.hostname", "node1"); //        config.set("mapred.jar", "C:\\Users\\ZhangJintao\\Desktop\\wc.jar");         try {             FileSystem fs =FileSystem.get(config);                          Job job =Job.getInstance(config);             job.setJarByClass(RunJob.class);                          job.setJobName("weather");             job.setMapperClass(WeatherMapper.class);             job.setReducerClass(WeatherReducer.class);             job.setMapOutputKeyClass(MyKey.class);             job.setOutputValueClass(DoubleWritable.class);                          job.setPartitionerClass(MyPartitioner.class);             job.setSortComparatorClass(MySort.class);             job.setGroupingComparatorClass(MyGroup.class);                          job.setNumReduceTasks(3);                          job.setInputFormatClass(KeyValueTextInputFormat.class);                          FileInputFormat.addInputPath(job, new Path("/usr/input/weather"));                          Path outpath =new Path("/usr/output/weather");                                       if(fs.exists(outpath)){                 fs.delete(outpath, true);             }             FileOutputFormat.setOutputPath(job, outpath);                          boolean f= job.waitForCompletion(true);             if(f){                 System.out.println("JOB 执行成功");             }         } catch (Exception e) {             e.printStackTrace();         }     } }

三、运行项目

 

        给hdfs上传/usr/input/weather,执行RunJob.java 的 main方法,执行成功后,刷新output得到如下结果

四、运行分析

        我们可以给每个类的执行方法内加入System.out.print("类名+方法名");从控制台得到的信息筛选出来,如下所示(即为代码的执行顺序)

 

WeatherMapper.map()        WeatherMapper.map() 开始输出数据           【MyKey [year=1949, month=0, hot=34.0] : 34.0】     MyPartitioner.getPartition()        MyPartitioner.getPartition()  接收到到数据           【MyKey [year=1949, month=0, hot=34.0]】        MyPartitioner.getPartition()  开始返回分区号           【0】     WeatherMapper.map()        WeatherMapper.map() 开始输出数据           【MyKey [year=1949, month=0, hot=36.0] : 36.0】     MyPartitioner.getPartition()        MyPartitioner.getPartition()  接收到到数据           【MyKey [year=1949, month=0, hot=36.0]】        MyPartitioner.getPartition()  开始返回分区号           【0】     WeatherMapper.map()        WeatherMapper.map() 开始输出数据           【MyKey [year=1950, month=0, hot=32.0] : 32.0】     MyPartitioner.getPartition()        MyPartitioner.getPartition()  接收到到数据           【MyKey [year=1950, month=0, hot=32.0]】        MyPartitioner.getPartition()  开始返回分区号           【1】     WeatherMapper.map()        WeatherMapper.map() 开始输出数据           【MyKey [year=1950, month=0, hot=37.0] : 37.0】     MyPartitioner.getPartition()        MyPartitioner.getPartition()  接收到到数据           【MyKey [year=1950, month=0, hot=37.0]】        MyPartitioner.getPartition()  开始返回分区号           【1】     WeatherMapper.map()        WeatherMapper.map() 开始输出数据           【MyKey [year=1951, month=0, hot=23.0] : 23.0】     MyPartitioner.getPartition()        MyPartitioner.getPartition()  接收到到数据           【MyKey [year=1951, month=0, hot=23.0]】        MyPartitioner.getPartition()  开始返回分区号           【2】     WeatherMapper.map()        WeatherMapper.map() 开始输出数据           【MyKey [year=1950, month=0, hot=41.0] : 41.0】     MyPartitioner.getPartition()        MyPartitioner.getPartition()  接收到到数据           【MyKey [year=1950, month=0, hot=41.0]】        MyPartitioner.getPartition()  开始返回分区号           【1】     WeatherMapper.map()        WeatherMapper.map() 开始输出数据           【MyKey [year=1950, month=0, hot=27.0] : 27.0】     MyPartitioner.getPartition()        MyPartitioner.getPartition()  接收到到数据           【MyKey [year=1950, month=0, hot=27.0]】        MyPartitioner.getPartition()  开始返回分区号           【1】     WeatherMapper.map()        WeatherMapper.map() 开始输出数据           【MyKey [year=1951, month=0, hot=45.0] : 45.0】     MyPartitioner.getPartition()        MyPartitioner.getPartition()  接收到到数据           【MyKey [year=1951, month=0, hot=45.0]】        MyPartitioner.getPartition()  开始返回分区号           【2】     WeatherMapper.map()        WeatherMapper.map() 开始输出数据           【MyKey [year=1951, month=0, hot=46.0] : 46.0】     MyPartitioner.getPartition()        MyPartitioner.getPartition()  接收到到数据           【MyKey [year=1951, month=0, hot=46.0]】        MyPartitioner.getPartition()  开始返回分区号           【2】     WeatherMapper.map()        WeatherMapper.map() 开始输出数据           【MyKey [year=1951, month=0, hot=47.0] : 47.0】     MyPartitioner.getPartition()        MyPartitioner.getPartition()  接收到到数据           【MyKey [year=1951, month=0, hot=47.0]】        MyPartitioner.getPartition()  开始返回分区号           【2】     MySort.compare()        MySort.compare()   开始比较数据           k1=【MyKey [year=1951, month=0, hot=47.0]】           k2=【MyKey [year=1951, month=0, hot=46.0]】        MySort.compare()   比较结果           【-1】     MySort.compare()        MySort.compare()   开始比较数据           k1=【MyKey [year=1951, month=0, hot=46.0]】           k2=【MyKey [year=1951, month=0, hot=45.0]】        MySort.compare()   比较结果           【-1】     MySort.compare()        MySort.compare()   开始比较数据           k1=【MyKey [year=1950, month=0, hot=27.0]】           k2=【MyKey [year=1950, month=0, hot=41.0]】        MySort.compare()   比较结果           【1】     MySort.compare()        MySort.compare()   开始比较数据           k1=【MyKey [year=1951, month=0, hot=45.0]】           k2=【MyKey [year=1951, month=0, hot=23.0]】        MySort.compare()   比较结果           【-1】     MySort.compare()        MySort.compare()   开始比较数据           k1=【MyKey [year=1950, month=0, hot=27.0]】           k2=【MyKey [year=1950, month=0, hot=37.0]】        MySort.compare()   比较结果           【1】     MySort.compare()        MySort.compare()   开始比较数据           k1=【MyKey [year=1950, month=0, hot=41.0]】           k2=【MyKey [year=1950, month=0, hot=37.0]】        MySort.compare()   比较结果           【-1】     MySort.compare()        MySort.compare()   开始比较数据           k1=【MyKey [year=1950, month=0, hot=27.0]】           k2=【MyKey [year=1950, month=0, hot=32.0]】        MySort.compare()   比较结果           【1】     MySort.compare()        MySort.compare()   开始比较数据           k1=【MyKey [year=1950, month=0, hot=37.0]】           k2=【MyKey [year=1950, month=0, hot=32.0]】        MySort.compare()   比较结果           【-1】     MySort.compare()        MySort.compare()   开始比较数据           k1=【MyKey [year=1949, month=0, hot=36.0]】           k2=【MyKey [year=1949, month=0, hot=34.0]】        MySort.compare()   比较结果           【-1】     MyGroup.compare()        MyGroup.compare()   开始比较数据           k1=【MyKey [year=1949, month=0, hot=36.0]】           k2=【MyKey [year=1949, month=0, hot=34.0]】        MyGroup.compare()   比较结果           【0】     WeatherReducer.reduce()        WeatherReducer.reduce()   处理收到的数据        WeatherReducer.reduce()   【MyKey [year=1949, month=0, hot=36.0]】        WeatherReducer.reduce()   打印当前组的所有数据        WeatherReducer.reduce()   【1949    0    36.0】        WeatherReducer.reduce()   【1949    0    34.0】     MyGroup.compare()        MyGroup.compare()   开始比较数据           k1=【MyKey [year=1950, month=0, hot=41.0]】           k2=【MyKey [year=1950, month=0, hot=37.0]】        MyGroup.compare()   比较结果           【0】     WeatherReducer.reduce()        WeatherReducer.reduce()   处理收到的数据        WeatherReducer.reduce()   【MyKey [year=1950, month=0, hot=41.0]】        WeatherReducer.reduce()   打印当前组的所有数据        WeatherReducer.reduce()   【1950    0    41.0】     MyGroup.compare()        MyGroup.compare()   开始比较数据           k1=【MyKey [year=1950, month=0, hot=37.0]】           k2=【MyKey [year=1950, month=0, hot=32.0]】        MyGroup.compare()   比较结果           【0】        WeatherReducer.reduce()   【1950    0    37.0】     MyGroup.compare()        MyGroup.compare()   开始比较数据           k1=【MyKey [year=1950, month=0, hot=32.0]】           k2=【MyKey [year=1950, month=0, hot=27.0]】        MyGroup.compare()   比较结果           【0】        WeatherReducer.reduce()   【1950    0    32.0】        WeatherReducer.reduce()   【1950    0    27.0】     MyGroup.compare()        MyGroup.compare()   开始比较数据           k1=【MyKey [year=1951, month=0, hot=47.0]】           k2=【MyKey [year=1951, month=0, hot=46.0]】        MyGroup.compare()   比较结果           【0】     WeatherReducer.reduce()        WeatherReducer.reduce()   处理收到的数据        WeatherReducer.reduce()   【MyKey [year=1951, month=0, hot=47.0]】        WeatherReducer.reduce()   打印当前组的所有数据        WeatherReducer.reduce()   【1951    0    47.0】     MyGroup.compare()        MyGroup.compare()   开始比较数据           k1=【MyKey [year=1951, month=0, hot=46.0]】           k2=【MyKey [year=1951, month=0, hot=45.0]】        MyGroup.compare()   比较结果           【0】        WeatherReducer.reduce()   【1951    0    46.0】     MyGroup.compare()        MyGroup.compare()   开始比较数据           k1=【MyKey [year=1951, month=0, hot=45.0]】           k2=【MyKey [year=1951, month=0, hot=23.0]】        MyGroup.compare()   比较结果           【0】        WeatherReducer.reduce()   【1951    0    45.0】        WeatherReducer.reduce()   【1951    0    23.0】

        从上方打印的控制台信息可以看出执行步骤如下

 

        1、WeatherMapper.map将hdfs传来的数据封装为MyKey实体,传递给MyPartitioner.getPartition进行分区

        2、MySort.compare进行排序

        3、MyGroup进行分组

        4、WeatherReducer.reduce接受到分组且拍好顺序的数据后,拿出前三位。

 

转载请注明原文地址: https://www.6miu.com/read-45190.html

最新回复(0)