引言
之前我们说道,默认情况下reduce task只有一个,当我们需要对大量数据进行统计时,一个reduce task已经捉襟见肘,那么我们就有必要配置多哦reduce task进行并行任务执行。
实例
例子:我们有一个每年每月温度的统计数据需要进行分析。如下图数据。我们需要统计每年中每个月的最高温度的前三名。
自定义分区:我们需要 分配多个reduce task,根据年来进行reduce task的分区,每个reduce task统计每一年的数据。这就引入了自定义分区。
自定义分组:因为我们要统计每年每月气温的前三名。所以我们应该按照月分组,然后找这个组气温前三名。这就引入了自定义分组。
自定义 排序:统计温度的前三名,则需要对数据进行排序,取前三。这就引入了自定义排序。
之前我们已经知道分区、分组、排序规则如下
分区:hashcode模reduce数量
分组:根据key比较进行分组
排序:根据key的hashcode
所以我们要将需要自定义分组、排序、分区的数据封装进入key中,所以我们需要对key进行多态封装。
编码
一、总述
这个测试模块的类全在包com.zjt.mapreducer.weather下。
因为分区、分组、排序都是自定义,所以关于分区、分组和排序以及其依赖的键都需要我们手动编码完成。我们需要一共完成下方七个类的编码:
1、MyKey.java
因为分区、分组、排序都需要我们自己完成,而这三个都是依赖于键进行其工作的。所以我们要根据我们时间需求自定义键。
2、WeatherMapper.java
hdfs输入数据的mapper处理函数。其将接收到的数据封装为MyKey实体类,传递给下一步进行分区
3、MyPartitioner.java
我们自己实现的分区类,给WeatherMapper.java传来的数据 通过 取模 运算,返回其分区号
4、MySort.java
自定义排序类。通过比较MyKey.java中年、月、日来比较其实体类是否相等。
5、MyGroup.java
自定义分组。通过比较MyKey.java中年、月来比较其实体类是否相等。
6、WeatherReducer.java
接收到被排序、分区、分组后的数据,取其最大的三个,则为每年每月气温最高的前三个。
7、RunJob.java
运行类,其中对代码项进行配置,启动任务。
二、各个类源码
1、MyKey.java
因为分区、分组、排序都需要我们自己完成,而这三个都是依赖于键进行其工作的。所以我们要根据我们时间需求自定义键。
package com.zjt.mapreducer.weather; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; /** * * @author ZhangJintao */ public class MyKey implements WritableComparable<MyKey>{ private int year; //年 private int month; //月 private double hot; //温度 public int getYear() { return year; } public void setYear(int year) { this.year = year; } public int getMonth() { return month; } public void setMonth(int month) { this.month = month; } public double getHot() { return hot; } public void setHot(double hot) { this.hot = hot; } /** * 反序列化【从对象流中读出】 */ public void readFields(DataInput arg0) throws IOException { this.year = arg0.readInt(); this.month = arg0.readInt(); this.hot = arg0.readDouble(); } /** * 序列化 */ public void write(DataOutput arg0) throws IOException { arg0.writeInt(year); arg0.writeInt(month); arg0.writeDouble(hot); } /** * 比较两个对象是否相等方法,分组时用到 */ public int compareTo(MyKey o) { int r1 = Integer.compare(this.year, o.getYear()); if (r1 == 0) { int r2 = Integer.compare(this.month, o.getMonth()); if (r2 == 0) { return Double.compare(this.hot, o.getHot()); }else { return r2; } }else{ return r1 ; } } @Override public String toString() { return "MyKey [year=" + year + ", month=" + month + ", hot=" + hot + "]"; } }2、WeatherMapper.java
hdfs输入数据的mapper处理函数。其将接收到的数据封装为MyKey实体类,传递给下一步进行分区
package com.zjt.mapreducer.weather; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class WeatherMapper extends Mapper<Text, Text, MyKey, DoubleWritable>{ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-HH-dd hh:mm:ss"); /** * 根据键和值,将我们的数据拿出来封装成MyKey * key/value:每一行第一个隔开符 左边为key,右边为value */ // @Override protected void map(Text key, Text value, Mapper<Text, Text, MyKey, DoubleWritable>.Context context) throws IOException, InterruptedException { System.out.println("WeatherMapper.map()"); try { Date date = sdf.parse(key.toString()); Calendar c = Calendar.getInstance(); c.setTime(date); int year = c.get(Calendar.YEAR); int month = c.get(Calendar.MONTH); double hot = Double.parseDouble(value.toString().substring(0, value.toString().lastIndexOf("c"))); MyKey myKey = new MyKey(); myKey.setHot(hot); myKey.setYear(year); myKey.setMonth(month); System.out.println("===WeatherMapper.map() 开始输出数据"); System.out.println("=== 【"+myKey.toString() + " : " + hot +"】"); context.write(myKey, new DoubleWritable(hot)); } catch (Exception e) { e.printStackTrace(); } } }
3、MyPartitioner.java
我们自己实现的分区类,给WeatherMapper.java传来的数据 通过 取模 运算,返回其分区号
package com.zjt.mapreducer.weather; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; public class MyPartitioner extends HashPartitioner<MyKey, DoubleWritable>{ /** * 自定义分区 * MyTask每输出一个数据调用一次,所以这个方法越短越好 * 返回分区号 */ // @Override public int getPartition(MyKey key, DoubleWritable value, int numReduceTasks) { System.out.println("MyPartitioner.getPartition()"); System.out.println("===MyPartitioner.getPartition() 接收到到数据"); System.out.println("=== 【" + key.toString() + "】"); System.out.println("===MyPartitioner.getPartition() 开始返回分区号"); System.out.println("=== 【" + (key.getYear()-1949) % numReduceTasks + "】"); //当前年份减去1949年,对ReduceTasks数量取模 return (key.getYear()-1949) % numReduceTasks; } }
4、MySort.java
自定义排序类。通过比较MyKey.java中年、月、日来比较其实体类是否相等。
package com.zjt.mapreducer.weather; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; /** * 自定义排序函数,继承默认的排序 * @author ZhangJintao */ public class MySort extends WritableComparator{ public MySort() { super(MyKey.class,true); } // @Override public int compare(WritableComparable a, WritableComparable b) { System.out.println("MySort.compare()"); MyKey k1 = (MyKey)a; MyKey k2 = (MyKey)b; System.out.println("===MySort.compare() 开始比较数据"); System.out.println("=== k1=【"+k1.toString()+"】"); System.out.println("=== k2=【"+k2.toString()+"】"); int r1 = Integer.compare(k1.getYear(), k2.getYear()); if (r1 == 0) { int r2 = Integer.compare(k1.getMonth(), k2.getMonth()); if (r2 == 0) { System.out.println("===MySort.compare() 比较结果"); System.out.println("=== 【"+-Double.compare(k1.getHot(), k2.getHot())+"】"); return -Double.compare(k1.getHot(), k2.getHot()); }else { System.out.println("===MySort.compare() 比较结果"); System.out.println("=== 【"+r2+"】"); return r2; } }else{ System.out.println("===MySort.compare() 比较结果"); System.out.println("=== 【"+r1+"】"); return r1 ; } } }
5、MyGroup.java
自定义分组。通过比较MyKey.java中年、月来比较其实体类是否相等。
package com.zjt.mapreducer.weather; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; /** * 自定义分组函数 * @author ZhangJintao */ public class MyGroup extends WritableComparator{ public MyGroup() { super(MyKey.class,true); } // @Override public int compare(WritableComparable a, WritableComparable b) { System.out.println("MyGroup.compare()"); System.out.println("===MyGroup.compare() 开始比较数据"); MyKey k1 = (MyKey)a; MyKey k2 = (MyKey)b; System.out.println("=== k1=【"+k1.toString()+"】"); System.out.println("=== k2=【"+k2.toString()+"】"); int r1 = Integer.compare(k1.getYear(), k2.getYear()); if (r1 == 0) { System.out.println("===MyGroup.compare() 比较结果"); System.out.println("=== 【"+Integer.compare(k1.getMonth(), k2.getMonth())+"】"); return Integer.compare(k1.getMonth(), k2.getMonth()); }else{ System.out.println("===MyGroup.compare() 比较结果"); System.out.println("=== 【"+r1+"】"); return r1 ; } } }
6、WeatherReducer.java
接收到被排序、分区、分组后的数据,取其最大的三个,则为每年每月气温最高的前三个。
package com.zjt.mapreducer.weather; import java.io.IOException; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WeatherReducer extends Reducer<MyKey, DoubleWritable, Text, NullWritable>{ // @Override protected void reduce(MyKey arg0, Iterable<DoubleWritable> arg1, Reducer<MyKey, DoubleWritable, Text, NullWritable>.Context arg2) throws IOException, InterruptedException { System.out.println("WeatherReducer.reduce()"); System.out.println("===WeatherReducer.reduce() 处理收到的数据"); System.out.println("===WeatherReducer.reduce() 【"+arg0.toString()+"】"); System.out.println("===WeatherReducer.reduce() 打印当前组的所有数据"); for (DoubleWritable v :arg1) { String msg = arg0.getYear() + "\t" + arg0.getMonth() + "\t" + v.get(); System.out.println("===WeatherReducer.reduce() 【"+msg+"】"); } int i = 0 ; for (DoubleWritable v :arg1) { i ++ ; String msg = arg0.getYear() + "\t" + arg0.getMonth() + "\t" + v.get(); arg2.write(new Text(msg), NullWritable.get()); if (i == 3) { break; } } } }
7、RunJob.java
运行类,其中对代码项进行配置,启动任务。
package com.zjt.mapreducer.weather; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class RunJob { public static void main(String[] args) { Configuration config =new Configuration(); config.set("fs.defaultFS", "hdfs://node1:8020"); config.set("yarn.resourcemanager.hostname", "node1"); // config.set("mapred.jar", "C:\\Users\\ZhangJintao\\Desktop\\wc.jar"); try { FileSystem fs =FileSystem.get(config); Job job =Job.getInstance(config); job.setJarByClass(RunJob.class); job.setJobName("weather"); job.setMapperClass(WeatherMapper.class); job.setReducerClass(WeatherReducer.class); job.setMapOutputKeyClass(MyKey.class); job.setOutputValueClass(DoubleWritable.class); job.setPartitionerClass(MyPartitioner.class); job.setSortComparatorClass(MySort.class); job.setGroupingComparatorClass(MyGroup.class); job.setNumReduceTasks(3); job.setInputFormatClass(KeyValueTextInputFormat.class); FileInputFormat.addInputPath(job, new Path("/usr/input/weather")); Path outpath =new Path("/usr/output/weather"); if(fs.exists(outpath)){ fs.delete(outpath, true); } FileOutputFormat.setOutputPath(job, outpath); boolean f= job.waitForCompletion(true); if(f){ System.out.println("JOB 执行成功"); } } catch (Exception e) { e.printStackTrace(); } } }
三、运行项目
给hdfs上传/usr/input/weather,执行RunJob.java 的 main方法,执行成功后,刷新output得到如下结果
四、运行分析
我们可以给每个类的执行方法内加入System.out.print("类名+方法名");从控制台得到的信息筛选出来,如下所示(即为代码的执行顺序)
WeatherMapper.map() WeatherMapper.map() 开始输出数据 【MyKey [year=1949, month=0, hot=34.0] : 34.0】 MyPartitioner.getPartition() MyPartitioner.getPartition() 接收到到数据 【MyKey [year=1949, month=0, hot=34.0]】 MyPartitioner.getPartition() 开始返回分区号 【0】 WeatherMapper.map() WeatherMapper.map() 开始输出数据 【MyKey [year=1949, month=0, hot=36.0] : 36.0】 MyPartitioner.getPartition() MyPartitioner.getPartition() 接收到到数据 【MyKey [year=1949, month=0, hot=36.0]】 MyPartitioner.getPartition() 开始返回分区号 【0】 WeatherMapper.map() WeatherMapper.map() 开始输出数据 【MyKey [year=1950, month=0, hot=32.0] : 32.0】 MyPartitioner.getPartition() MyPartitioner.getPartition() 接收到到数据 【MyKey [year=1950, month=0, hot=32.0]】 MyPartitioner.getPartition() 开始返回分区号 【1】 WeatherMapper.map() WeatherMapper.map() 开始输出数据 【MyKey [year=1950, month=0, hot=37.0] : 37.0】 MyPartitioner.getPartition() MyPartitioner.getPartition() 接收到到数据 【MyKey [year=1950, month=0, hot=37.0]】 MyPartitioner.getPartition() 开始返回分区号 【1】 WeatherMapper.map() WeatherMapper.map() 开始输出数据 【MyKey [year=1951, month=0, hot=23.0] : 23.0】 MyPartitioner.getPartition() MyPartitioner.getPartition() 接收到到数据 【MyKey [year=1951, month=0, hot=23.0]】 MyPartitioner.getPartition() 开始返回分区号 【2】 WeatherMapper.map() WeatherMapper.map() 开始输出数据 【MyKey [year=1950, month=0, hot=41.0] : 41.0】 MyPartitioner.getPartition() MyPartitioner.getPartition() 接收到到数据 【MyKey [year=1950, month=0, hot=41.0]】 MyPartitioner.getPartition() 开始返回分区号 【1】 WeatherMapper.map() WeatherMapper.map() 开始输出数据 【MyKey [year=1950, month=0, hot=27.0] : 27.0】 MyPartitioner.getPartition() MyPartitioner.getPartition() 接收到到数据 【MyKey [year=1950, month=0, hot=27.0]】 MyPartitioner.getPartition() 开始返回分区号 【1】 WeatherMapper.map() WeatherMapper.map() 开始输出数据 【MyKey [year=1951, month=0, hot=45.0] : 45.0】 MyPartitioner.getPartition() MyPartitioner.getPartition() 接收到到数据 【MyKey [year=1951, month=0, hot=45.0]】 MyPartitioner.getPartition() 开始返回分区号 【2】 WeatherMapper.map() WeatherMapper.map() 开始输出数据 【MyKey [year=1951, month=0, hot=46.0] : 46.0】 MyPartitioner.getPartition() MyPartitioner.getPartition() 接收到到数据 【MyKey [year=1951, month=0, hot=46.0]】 MyPartitioner.getPartition() 开始返回分区号 【2】 WeatherMapper.map() WeatherMapper.map() 开始输出数据 【MyKey [year=1951, month=0, hot=47.0] : 47.0】 MyPartitioner.getPartition() MyPartitioner.getPartition() 接收到到数据 【MyKey [year=1951, month=0, hot=47.0]】 MyPartitioner.getPartition() 开始返回分区号 【2】 MySort.compare() MySort.compare() 开始比较数据 k1=【MyKey [year=1951, month=0, hot=47.0]】 k2=【MyKey [year=1951, month=0, hot=46.0]】 MySort.compare() 比较结果 【-1】 MySort.compare() MySort.compare() 开始比较数据 k1=【MyKey [year=1951, month=0, hot=46.0]】 k2=【MyKey [year=1951, month=0, hot=45.0]】 MySort.compare() 比较结果 【-1】 MySort.compare() MySort.compare() 开始比较数据 k1=【MyKey [year=1950, month=0, hot=27.0]】 k2=【MyKey [year=1950, month=0, hot=41.0]】 MySort.compare() 比较结果 【1】 MySort.compare() MySort.compare() 开始比较数据 k1=【MyKey [year=1951, month=0, hot=45.0]】 k2=【MyKey [year=1951, month=0, hot=23.0]】 MySort.compare() 比较结果 【-1】 MySort.compare() MySort.compare() 开始比较数据 k1=【MyKey [year=1950, month=0, hot=27.0]】 k2=【MyKey [year=1950, month=0, hot=37.0]】 MySort.compare() 比较结果 【1】 MySort.compare() MySort.compare() 开始比较数据 k1=【MyKey [year=1950, month=0, hot=41.0]】 k2=【MyKey [year=1950, month=0, hot=37.0]】 MySort.compare() 比较结果 【-1】 MySort.compare() MySort.compare() 开始比较数据 k1=【MyKey [year=1950, month=0, hot=27.0]】 k2=【MyKey [year=1950, month=0, hot=32.0]】 MySort.compare() 比较结果 【1】 MySort.compare() MySort.compare() 开始比较数据 k1=【MyKey [year=1950, month=0, hot=37.0]】 k2=【MyKey [year=1950, month=0, hot=32.0]】 MySort.compare() 比较结果 【-1】 MySort.compare() MySort.compare() 开始比较数据 k1=【MyKey [year=1949, month=0, hot=36.0]】 k2=【MyKey [year=1949, month=0, hot=34.0]】 MySort.compare() 比较结果 【-1】 MyGroup.compare() MyGroup.compare() 开始比较数据 k1=【MyKey [year=1949, month=0, hot=36.0]】 k2=【MyKey [year=1949, month=0, hot=34.0]】 MyGroup.compare() 比较结果 【0】 WeatherReducer.reduce() WeatherReducer.reduce() 处理收到的数据 WeatherReducer.reduce() 【MyKey [year=1949, month=0, hot=36.0]】 WeatherReducer.reduce() 打印当前组的所有数据 WeatherReducer.reduce() 【1949 0 36.0】 WeatherReducer.reduce() 【1949 0 34.0】 MyGroup.compare() MyGroup.compare() 开始比较数据 k1=【MyKey [year=1950, month=0, hot=41.0]】 k2=【MyKey [year=1950, month=0, hot=37.0]】 MyGroup.compare() 比较结果 【0】 WeatherReducer.reduce() WeatherReducer.reduce() 处理收到的数据 WeatherReducer.reduce() 【MyKey [year=1950, month=0, hot=41.0]】 WeatherReducer.reduce() 打印当前组的所有数据 WeatherReducer.reduce() 【1950 0 41.0】 MyGroup.compare() MyGroup.compare() 开始比较数据 k1=【MyKey [year=1950, month=0, hot=37.0]】 k2=【MyKey [year=1950, month=0, hot=32.0]】 MyGroup.compare() 比较结果 【0】 WeatherReducer.reduce() 【1950 0 37.0】 MyGroup.compare() MyGroup.compare() 开始比较数据 k1=【MyKey [year=1950, month=0, hot=32.0]】 k2=【MyKey [year=1950, month=0, hot=27.0]】 MyGroup.compare() 比较结果 【0】 WeatherReducer.reduce() 【1950 0 32.0】 WeatherReducer.reduce() 【1950 0 27.0】 MyGroup.compare() MyGroup.compare() 开始比较数据 k1=【MyKey [year=1951, month=0, hot=47.0]】 k2=【MyKey [year=1951, month=0, hot=46.0]】 MyGroup.compare() 比较结果 【0】 WeatherReducer.reduce() WeatherReducer.reduce() 处理收到的数据 WeatherReducer.reduce() 【MyKey [year=1951, month=0, hot=47.0]】 WeatherReducer.reduce() 打印当前组的所有数据 WeatherReducer.reduce() 【1951 0 47.0】 MyGroup.compare() MyGroup.compare() 开始比较数据 k1=【MyKey [year=1951, month=0, hot=46.0]】 k2=【MyKey [year=1951, month=0, hot=45.0]】 MyGroup.compare() 比较结果 【0】 WeatherReducer.reduce() 【1951 0 46.0】 MyGroup.compare() MyGroup.compare() 开始比较数据 k1=【MyKey [year=1951, month=0, hot=45.0]】 k2=【MyKey [year=1951, month=0, hot=23.0]】 MyGroup.compare() 比较结果 【0】 WeatherReducer.reduce() 【1951 0 45.0】 WeatherReducer.reduce() 【1951 0 23.0】
从上方打印的控制台信息可以看出执行步骤如下
1、WeatherMapper.map将hdfs传来的数据封装为MyKey实体,传递给MyPartitioner.getPartition进行分区
2、MySort.compare进行排序
3、MyGroup进行分组
4、WeatherReducer.reduce接受到分组且拍好顺序的数据后,拿出前三位。