Hadoop MapReduce: Secondary Sort


Hadoop Example 11: Secondary Sort Explained

In Hadoop 2.x and later, the relevant Job configuration methods are:

job.setPartitionerClass(Partitioner p);          // Partitions the map output; each partition maps to one reducer.
job.setSortComparatorClass(RawComparator c);     // Sort comparator for keys; if none is set via job.setSortComparatorClass, the key's own compareTo method is used.
job.setGroupingComparatorClass(RawComparator c); // Grouping comparator, set via job.setGroupingComparatorClass, that decides which keys are grouped into one reduce() call.

Hadoop interfaces:

1. Writable ===> custom value. A class that implements the Writable interface (Hadoop's serialization interface) can be used as a MapReduce value.
2. WritableComparable ===> custom key. MapReduce keys must be deduplicated and sorted, so any custom class used as a MapReduce key must implement WritableComparable.

2.1 equals: decides whether two keys are the same object when the class is used as a MapReduce output key.
2.2 hashCode: the default partitioner, HashPartitioner, calls the key's hashCode method to pick a partition (sketched below this list).
2.3 compareTo: the default sort calls the key's compareTo method.
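As context for 2.2, this is essentially what the stock HashPartitioner does; the class name HashLikePartitioner is ours, used only to make the sketch self-contained:

import org.apache.hadoop.mapreduce.Partitioner;

// A sketch of the default HashPartitioner's behavior: mask off the sign bit
// of hashCode() and take the remainder modulo the number of reducers.
public class HashLikePartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

This is why a well-distributed hashCode matters for a custom key: it determines which reducer each record reaches when no custom partitioner is set.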


Data

Each line holds a timestamp and a temperature. The two fields are presumably tab-separated, since KeyValueTextInputFormat (used in the driver below) splits each line at the first tab into key and value.

1949-10-01 14:21:02	34c
1949-10-02 14:01:02	36c
1950-01-01 11:21:02	32c
1950-10-01 12:21:02	37c
1951-12-01 12:21:02	23c
1950-10-02 12:21:02	41c
1950-10-03 12:21:02	27c
1951-07-01 12:21:02	45c
1951-07-02 12:21:02	46c
1951-07-03 12:21:03	47c


1. Custom key

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class MyKey implements WritableComparable<MyKey> {

    private int year;
    private int month;
    private double hot;

    public int getYear() { return year; }
    public void setYear(int year) { this.year = year; }
    public int getMonth() { return month; }
    public void setMonth(int month) { this.month = month; }
    public double getHot() { return hot; }
    public void setHot(double hot) { this.hot = hot; }

    // Deserialize fields in the same order they were written.
    public void readFields(DataInput in) throws IOException {
        this.year = in.readInt();
        this.month = in.readInt();
        this.hot = in.readDouble();
    }

    // Serialize fields.
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
        out.writeDouble(hot);
    }

    // Default sort order when this object is the map output key:
    // ascending year, then month, then temperature.
    public int compareTo(MyKey o) {
        int r1 = Integer.compare(this.year, o.getYear());
        if (r1 == 0) {
            int r2 = Integer.compare(this.month, o.getMonth());
            if (r2 == 0) {
                return Double.compare(this.hot, o.getHot());
            }
            return r2;
        }
        return r1;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        long temp = Double.doubleToLongBits(hot);
        result = prime * result + (int) (temp ^ (temp >>> 32));
        result = prime * result + month;
        result = prime * result + year;
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (obj == null || getClass() != obj.getClass()) return false;
        MyKey other = (MyKey) obj;
        return Double.doubleToLongBits(hot) == Double.doubleToLongBits(other.hot)
                && month == other.month
                && year == other.year;
    }
}
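To see the ordering compareTo defines, here is a minimal standalone sketch; the demo class, its main method, and the key() helper are ours for illustration, and MyKey is assumed to be in the same package:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class MyKeyDemo {
    // Hypothetical helper for building keys in the demo.
    static MyKey key(int year, int month, double hot) {
        MyKey k = new MyKey();
        k.setYear(year);
        k.setMonth(month);
        k.setHot(hot);
        return k;
    }

    public static void main(String[] args) {
        List<MyKey> keys = new ArrayList<>();
        keys.add(key(1950, 10, 41.0));
        keys.add(key(1949, 10, 34.0));
        keys.add(key(1950, 10, 37.0));

        Collections.sort(keys); // uses MyKey.compareTo
        for (MyKey k : keys) {
            System.out.println(k.getYear() + "-" + k.getMonth() + " " + k.getHot());
        }
        // Prints 1949-10 34.0, then 1950-10 37.0, then 1950-10 41.0:
        // ascending year, then month, then temperature.
    }
}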

2. Custom partitioner

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

/**
 * getPartition runs once for every record the Mapper emits,
 * so it should execute as quickly as possible.
 */
public class MyPartitioner extends HashPartitioner<MyKey, DoubleWritable> {

    @Override
    public int getPartition(MyKey key, DoubleWritable value, int numReduceTasks) {
        // Route each year to a fixed reducer.
        return (key.getYear() - 1949) % numReduceTasks;
    }
}
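With three reduce tasks, as configured in the driver below, the sample years land on fixed partitions: (1949 - 1949) % 3 = 0, (1950 - 1949) % 3 = 1, and (1951 - 1949) % 3 = 2, so each output file holds exactly one year. Note that this scheme assumes the data starts at 1949: an earlier year would produce a negative partition number (which Hadoop rejects), and with more than three distinct years, several years would share a reducer.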

3. Custom grouping comparator

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Determines the set of records a single reduce() call processes.
 * Grouping is, in essence, also a comparison: keys that compare
 * as equal here are fed to the same reduce() call.
 */
public class MyGroup extends WritableComparator {

    public MyGroup() {
        super(MyKey.class, true); // true: create key instances for comparison
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        MyKey k1 = (MyKey) a;
        MyKey k2 = (MyKey) b;
        int r1 = Integer.compare(k1.getYear(), k2.getYear());
        if (r1 == 0) {
            return Integer.compare(k1.getMonth(), k2.getMonth());
        }
        return r1;
    }
}
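Because this comparator ignores the temperature field, keys such as (1950, 10, 41.0) and (1950, 10, 37.0) compare as equal and land in the same reduce() call, with their temperatures delivered through the values iterator. A minimal check; the demo class and main method are ours, for illustration only:

public class MyGroupDemo {
    public static void main(String[] args) {
        MyKey a = new MyKey();
        a.setYear(1950); a.setMonth(10); a.setHot(41.0);
        MyKey b = new MyKey();
        b.setYear(1950); b.setMonth(10); b.setHot(37.0);

        // 0 means "same group": both records go to one reduce() call.
        System.out.println(new MyGroup().compare(a, b)); // prints 0
    }
}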

4. Custom sort comparator

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Sort comparator used during the shuffle phase.
 */
public class MySort extends WritableComparator {

    public MySort() {
        super(MyKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        MyKey k1 = (MyKey) a;
        MyKey k2 = (MyKey) b;
        int r1 = Integer.compare(k1.getYear(), k2.getYear());
        if (r1 == 0) {
            int r2 = Integer.compare(k1.getMonth(), k2.getMonth());
            if (r2 == 0) {
                // Negated: within the same year and month,
                // sort temperatures in descending order.
                return -Double.compare(k1.getHot(), k2.getHot());
            }
            return r2;
        }
        return r1;
    }
}
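On the sample data this means the three July 1951 records reach the reducer ordered 47, 46, 45 within their group rather than ascending. The secondary sort has therefore already done the work: the reducer below only needs to emit the first three values of each (year, month) group to produce that month's top-3 temperatures.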

5. MapReduce driver code

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RunJob {

    public static void main(String[] args) {
        Configuration config = new Configuration();
        config.set("fs.defaultFS", "hdfs://node1:8020");
        config.set("yarn.resourcemanager.hostname", "node1");
        // config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");
        // config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
        try {
            FileSystem fs = FileSystem.get(config);

            Job job = Job.getInstance(config);
            job.setJarByClass(RunJob.class);
            job.setJobName("weather");
            job.setMapperClass(WeatherMapper.class);
            job.setReducerClass(WeatherReducer.class);
            job.setMapOutputKeyClass(MyKey.class);
            job.setMapOutputValueClass(DoubleWritable.class);

            job.setPartitionerClass(MyPartitioner.class);    // custom partitioner
            job.setSortComparatorClass(MySort.class);        // custom sort comparator
            job.setGroupingComparatorClass(MyGroup.class);   // custom grouping comparator
            job.setNumReduceTasks(3);

            job.setInputFormatClass(KeyValueTextInputFormat.class);
            FileInputFormat.addInputPath(job, new Path("/usr/input/weather"));

            Path outpath = new Path("/usr/output/weather");
            if (fs.exists(outpath)) {
                fs.delete(outpath, true);
            }
            FileOutputFormat.setOutputPath(job, outpath);

            boolean f = job.waitForCompletion(true);
            if (f) {
                System.out.println("Job finished successfully.");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // KeyValueTextInputFormat: the text left of the first separator on each line
    // becomes the key (the timestamp), the rest becomes the value (e.g. "34c").
    static class WeatherMapper extends Mapper<Text, Text, MyKey, DoubleWritable> {

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            try {
                Date date = sdf.parse(key.toString());
                Calendar c = Calendar.getInstance();
                c.setTime(date);
                int year = c.get(Calendar.YEAR);
                int month = c.get(Calendar.MONTH) + 1; // Calendar.MONTH is zero-based
                // Strip the trailing "c" from the temperature, e.g. "34c" -> 34.0.
                double hot = Double.parseDouble(
                        value.toString().substring(0, value.toString().lastIndexOf("c")));
                MyKey k = new MyKey();
                k.setYear(year);
                k.setMonth(month);
                k.setHot(hot);
                context.write(k, new DoubleWritable(hot));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    static class WeatherReducer extends Reducer<MyKey, DoubleWritable, Text, NullWritable> {

        protected void reduce(MyKey key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            // Values arrive sorted by descending temperature (see MySort),
            // so the first three are the hottest records of the month.
            int i = 0;
            for (DoubleWritable v : values) {
                i++;
                String msg = key.getYear() + "\t" + key.getMonth() + "\t" + v.get();
                context.write(new Text(msg), NullWritable.get());
                if (i == 3) {
                    break;
                }
            }
        }
    }
}
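To run the job, one might package the classes into a jar and submit it with the standard hadoop commands; the jar name weather.jar and the local file weather.txt below are illustrative, not from the original post:

hadoop fs -mkdir -p /usr/input/weather
hadoop fs -put weather.txt /usr/input/weather
hadoop jar weather.jar RunJob
hadoop fs -cat /usr/output/weather/part-r-*

With three reduce tasks, the output directory will contain three part files, one per year, each listing up to three lines per month in descending temperature order.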
