3.3 Hadoop MapReduce: Temperature Statistics


Goal: for each year and month, output the three records with the highest temperature (a secondary-sort example using a custom key, sort comparator, grouping comparator, and partitioner).

Project structure: the package com.test.weather contains WeatherKey, WeatherMapper, WeatherReduer, MyGroup, MyPartioner, MySort, and RunJob.

WeatherKey.java:

package com.test.weather;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Composite map-output key: year, month, and temperature.
 */
public class WeatherKey implements WritableComparable<WeatherKey> {

    private int year;    // year
    private int month;   // month
    private double hot;  // temperature

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getMonth() {
        return month;
    }

    public void setMonth(int month) {
        this.month = month;
    }

    public double getHot() {
        return hot;
    }

    public void setHot(double hot) {
        this.hot = hot;
    }

    /** Deserialize the fields in the same order they were written. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.year = in.readInt();
        this.month = in.readInt();
        this.hot = in.readDouble();
    }

    /** Serialize the fields. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
        out.writeDouble(hot);
    }

    /**
     * Natural order: year asc, month asc, temperature desc.
     * (The job overrides sorting with MySort, but compareTo should still be consistent.)
     */
    @Override
    public int compareTo(WeatherKey o) {
        int r = Integer.compare(this.year, o.year);
        if (r != 0) {
            return r;
        }
        r = Integer.compare(this.month, o.month);
        if (r != 0) {
            return r;
        }
        return -Double.compare(this.hot, o.hot);
    }
}

WeatherMapper.java:

package com.test.weather;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WeatherMapper extends Mapper<Text, Text, WeatherKey, Text> {

    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        String dateString = key.toString(); // the timestamp portion of the input line
        try {
            // Parse the timestamp string into a Date
            Date date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(dateString);
            Calendar c = Calendar.getInstance();
            c.setTime(date);
            int year = c.get(Calendar.YEAR);        // year
            int month = c.get(Calendar.MONTH) + 1;  // Calendar.MONTH is zero-based, so add 1

            // Strip the trailing unit character from the value to get the numeric temperature
            double hot = Double.parseDouble(value.toString().substring(0, value.toString().length() - 1));

            WeatherKey outKey = new WeatherKey();
            outKey.setYear(year);
            outKey.setMonth(month);
            outKey.setHot(hot);
            context.write(outKey, new Text(key.toString() + "\t" + value.toString()));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

WeatherReduer.java:

package com.test.weather;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WeatherReduer extends Reducer<WeatherKey, Text, NullWritable, Text> {

    @Override
    protected void reduce(WeatherKey key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        int i = 0;
        for (Text text : values) {
            if (i < 3) { // keep only the first three (hottest) records of each group
                context.write(NullWritable.get(), text);
            } else {
                break;
            }
            i++;
        }
    }
}

MyGroup.java:

package com.test.weather;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator: records with the same year and month go to the same reduce() call.
 *
 * @author root
 */
public class MyGroup extends WritableComparator {

    public MyGroup() {
        super(WeatherKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        WeatherKey k1 = (WeatherKey) a;
        WeatherKey k2 = (WeatherKey) b;

        int r1 = Integer.compare(k1.getYear(), k2.getYear());
        if (r1 == 0) {
            return Integer.compare(k1.getMonth(), k2.getMonth());
        }
        return r1;
    }
}

MyPartioner.java:

package com.test.weather;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

/**
 * Partitioner: all records of the same year go to the same reducer.
 */
public class MyPartioner extends HashPartitioner<WeatherKey, Text> {

    @Override
    public int getPartition(WeatherKey key, Text value, int numReduceTasks) {
        // Assumes the data starts in 1949, so years map onto reducers 0..numReduceTasks-1
        return (key.getYear() - 1949) % numReduceTasks;
    }
}
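WeatherKey only works as a map-output key if write() and readFields() agree on field order. The following round-trip check is my own sketch rather than part of the original project; it assumes the WeatherKey class above is compiled into the same package.

package com.test.weather;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class WeatherKeyRoundTrip {

    public static void main(String[] args) throws IOException {
        WeatherKey in = new WeatherKey();
        in.setYear(1950);  // illustrative values only
        in.setMonth(7);
        in.setHot(38.5);

        // Serialize with write(), then deserialize into a fresh key with readFields()
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bytes));

        WeatherKey out = new WeatherKey();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // Expect: 1950-7 38.5
        System.out.println(out.getYear() + "-" + out.getMonth() + " " + out.getHot());
    }
}

If the printed values match what was set, the two methods agree on field order; a mismatch usually means the read and write sequences have drifted apart.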
MySort.java:

package com.test.weather;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Sort comparator: year asc, month asc, then temperature desc,
 * so the hottest record of each year/month comes first.
 *
 * @author root
 */
public class MySort extends WritableComparator {

    public MySort() {
        super(WeatherKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        WeatherKey k1 = (WeatherKey) a;
        WeatherKey k2 = (WeatherKey) b;

        int r1 = Integer.compare(k1.getYear(), k2.getYear());
        if (r1 == 0) {
            int r2 = Integer.compare(k1.getMonth(), k2.getMonth());
            if (r2 == 0) {
                return -Double.compare(k1.getHot(), k2.getHot()); // descending by temperature
            }
            return r2;
        }
        return r1;
    }
}

RunJob.java:

package com.test.weather;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver: for each year and month, output the three records with the highest temperature.
 *
 * @author root
 */
public class RunJob {

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://node1:9000");
        // KeyValueTextInputFormat splits each line into key and value on this separator
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");
        try {
            FileSystem fs = FileSystem.get(conf);
            Job job = Job.getInstance(conf);
            job.setJobName("Weather");
            job.setJarByClass(RunJob.class);

            job.setMapperClass(WeatherMapper.class);
            job.setReducerClass(WeatherReduer.class);

            job.setMapOutputKeyClass(WeatherKey.class);
            job.setMapOutputValueClass(Text.class);

            job.setPartitionerClass(MyPartioner.class);
            job.setSortComparatorClass(MySort.class);
            job.setGroupingComparatorClass(MyGroup.class);

            job.setNumReduceTasks(3); // start three reducers (the default is one)

            // Input format: KeyValueTextInputFormat treats the text before the first
            // separator as the key and the rest of the line as the value
            job.setInputFormatClass(KeyValueTextInputFormat.class);

            // Input and output directories on HDFS.
            // Upload the data file (wc.txt) to /usr/input/weather beforehand;
            // results are written to /usr/output/weather.
            FileInputFormat.addInputPath(job, new Path("/usr/input/weather"));

            // The output directory must not exist; the job creates it when it runs.
            // If it already exists, delete it first, otherwise the job fails.
            Path output = new Path("/usr/output/weather");
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);

            boolean f = job.waitForCompletion(true);
            if (f) {
                System.out.println("Job completed successfully!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
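RunJob reads the input with KeyValueTextInputFormat and a tab separator, so each input line is expected to hold a timestamp, a tab, and a temperature whose last character is a unit suffix; a purely illustrative line (not taken from the original data set) would be "1949-10-01 14:21:02", a tab, then "34c". The stand-alone sketch below is my own addition rather than part of the project; it repeats the same parsing WeatherMapper performs and can be used to sanity-check a data file locally before uploading it to /usr/input/weather and submitting the jar with hadoop jar.

package com.test.weather;

import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

public class ParseCheck {

    public static void main(String[] args) throws Exception {
        // Illustrative record in the format KeyValueTextInputFormat splits on '\t'
        String line = "1949-10-01 14:21:02\t34c";
        String[] kv = line.split("\t", 2);

        // Same extraction as WeatherMapper: year, month, and the numeric part of the value
        Date date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(kv[0]);
        Calendar c = Calendar.getInstance();
        c.setTime(date);
        int year = c.get(Calendar.YEAR);
        int month = c.get(Calendar.MONTH) + 1; // Calendar.MONTH is zero-based
        double hot = Double.parseDouble(kv[1].substring(0, kv[1].length() - 1));

        System.out.println(year + "-" + month + " " + hot); // expected: 1949-10 34.0
    }
}

With lines in that shape, each reducer writes at most three lines per year/month group, ordered hottest first.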
Please credit the original article when reposting: https://www.6miu.com/read-2800222.html
