案例一:基于MapReduce求每年的最大天气
一、准备条件:
1、数据源:两个“.gz”文件,分别为1901.gz/192.gz 2、Hadoop集群环境二、数据源分析:
1、第16-20位为年份信息 2、第88位为+号时89-93为温度信息,为-号时88-93为温度信息 3、提取后数据举例为: (1901,0) (1901,22) (1901,-11) 。。。。。。三、概念
1、并行计算: Map<Key,value> entry:条目(key-value) Key:行号,0为基址 K-V-->map-->K-V 2、shuffle:洗牌四、运行机制
五、功能实现
前提准备:
引入类库将Hadoop包解压,并进入相对路径为hadoop-2.7.2\share\hadoop目录下,搜索*.jar,全选复制到新建的lib目录下,并在该目录下搜索source.jar(为源码包),删除;并在该目录下搜索test(测试包),删除,剩下的就是可用的包。新建项目命名为Hadoop-mr-01 新建lib文件夹,将第一步所找到的jar包复制到该文件夹下,并引入到项目中目录结构图:
第一步
新建MaxTemperatureMapper.java类继承于org.apache.hadoop.mapreduce.Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>并将四个参数修改为org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, IntWritable>
说明:Mapper类下四个参数说明,输入的Key的值,输入的Value值,输出的Key的值,输出的Value值,也就是入的行号,输入的一行的文本,年份,温度代码如下:
package com.xt.hadoop; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> { //缺失 private static final int MISSING=9999; //提取信息 @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { //获取一行的数据 String line = value.toString(); //在一行数据中提取年份数据 也就是第16-20位 String year = line.substring(15,19); //气温变量 int airTemperature; if (line.charAt(87)=='+') { airTemperature=Integer.parseInt(line.substring(88,92)); } else { airTemperature=Integer.parseInt(line.substring(87,92)); } //质量 String quality=line.substring(92,93); //将有效数据写入到map的context中,注意类型务必要和泛型声明一致 if (airTemperature!=MISSING&&quality.matches("[01459]")) { context.write(new Text(year),new IntWritable(airTemperature)); } } }第二步
新建MaxTemperatureReducer.java类继承于org.apache.hadoop.mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>并将四个参数修改为org.apache.hadoop.mapreduce.Reducer<Text, IntWritable, Text, IntWritable>
说明:mapper工作完成之后要交给reduce,因为mapper的输出为Text, IntWritable所以Reducer的输入要为mapper的输出,即Text, IntWritable,输出为Text, IntWritable代码如下:
package com.xt.hadoop; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text keyin, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //定义最大值变量 int maxValue=Integer.MIN_VALUE; for (IntWritable intWritable : values) { maxValue=Math.max(maxValue, intWritable.get()); } //将Reduce写入context context.write(keyin, new IntWritable(maxValue)); } }第三步
新建MaxTemperature.java类,实现main方法
具体代码如下:
package com.xt.hadoop; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MaxTemperature { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //作业 Job job = new Job(); job.setJarByClass(MaxTemperature.class); //设置作业名称 便于调试 job.setJobName("Max Temperature"); //添加输入路径,可以添加多个路径 //输入不进可以是具体文件,还可以是目录,不会递归 FileInputFormat.addInputPath(job, new Path(args[0])); //设置输出路径,只能添加一个,而且不能存在 FileOutputFormat.setOutputPath(job, new Path(args[1])); //设置mapper job.setMapperClass(MaxTemperatureMapper.class); //设置reducer job.setReducerClass(MaxTemperatureReducer.class); //设置输出key类 job.setOutputKeyClass(Text.class); //设置输出Value类 job.setOutputValueClass(IntWritable.class); //等待作业完成 System.out.println(job.waitForCompletion(true)); } }六、项目部署
前提准备 1、将项目生成JAR文件,并将Jar文件放到master主机中 注:生成过程中JAR包全部不选择 在第三步需要设置主函数类 2、将天气数据上传到hdfs文件系统中第一步:通过WinSCP将生成的Jar文件拷贝到/usr/local目录下 第二步:通过WinSCP将天气数据文件靠背到/usr/local目录下
并执行命令: 1、在HDFS创建 `hadoop fs -mkdir -p /ncdc` 2、将天气数据文件上传到HDFS /ncdc目录下 hadoop fs -put 19*.gz /ncdc 3、执行Jar文件 hadoop jar hadoopmr01.jar /ncdc /out运行结果如下图所示
out目录如下图所示
part-r-00000为数据文件 查看数据文件内如如下hadoop fs -cat /out/part*