package com.uplooking.bigdata.mr.writablez; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /** * 关于自定义类型作为MR的key和value的练习,以统计电信数据为案例: * 需要统计每一个手机号码的upPackNum、downPackNum、upPayLoad、dwonPayLoad的总和。 据此来衡量每一个手机号码对流量的使用情况,依次为了用户推荐更加合理的套餐,方便你我他。 这个思路就是简单的数据分析/数据挖掘的一种体现 * 其中upPackNum对应的序号为6, * downPackNum对应的序号为7 * upPayLoad对应的需要为8 * downPayLoad对应的序号为9 * 同时因为统计的是每个手机号对应上述四个值,所以我们在提取数据核心 * 字段的时候,只保留需要的,其余过滤, * 在此只需要手机号(序号为1),上述四个值 * */ public class HttpDataWritablezApp { public static void main(String[] args) throws Exception { if(args == null || args.length < 2) { System.err.println("Parameter Errors ! 
Usage: <inputpath outputpath>"); System.exit(-1); } String inputpath = args[0]; Path outputpath = new Path(args[1]); Configuration conf = new Configuration(); String jobName = HttpDataWritablezApp.class.getSimpleName(); //创建作业 每一个mapreduce最后会封装成为一个job作业在集群中运行 Job job = Job.getInstance(conf, jobName); //job在集群中运行的方式是以jar的放来运行的 job.setJarByClass(HttpDataWritablezApp.class); //set mapper FileInputFormat.setInputPaths(job, inputpath); job.setInputFormatClass(TextInputFormat.class); //如果reducer输出类型和mapper的输出类型一致,可以省略mapper的输出, //啥意思?就是说下面两句话可以被省略 job.setMapperClass(HttpMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(UserTraffic.class); //set reducer outputpath.getFileSystem(conf).delete(outputpath, true);//如果输出路径已经存在,删除之 FileOutputFormat.setOutputPath(job, outputpath); job.setOutputFormatClass(TextOutputFormat.class); job.setReducerClass(HttpReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(UserTraffic.class); //设置有几个reducer来进行汇总操作 job.setNumReduceTasks(1); //提交job给集群,等结束返回结果 job.waitForCompletion(true); } static class HttpMapper extends Mapper<LongWritable, Text, Text, UserTraffic> { @Override protected void setup(Context context) throws IOException, InterruptedException { //这是mapper中的声明周期方法,每一个mapper task调用一次 //一般都做一些初始化的工作 } //1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200 @Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { String line = v1.toString(); String[] splits = line.split("\t");//对行内容进行拆分 //获取核心计算数据 String phone = splits[1]; long upPackNum = Long.valueOf(splits[6].trim()); long downPackNum = Long.valueOf(splits[7].trim()); long upPayLoad = Long.valueOf(splits[8].trim()); long downPayNum = Long.valueOf(splits[9].trim()); //将结果从map写出到shuffle中,shuffle中会根据key进行分区分组排序规约 context.write(new Text(phone), new UserTraffic(upPackNum, downPackNum, upPayLoad, downPayNum)); } @Override protected void 
cleanup(Context context) throws IOException, InterruptedException { //这也是一个声明周期方法,每一个mapper task调用一次 //一般都做一些关闭动作或者全局结果的输出 } } static class HttpReducer extends Reducer<Text, UserTraffic, Text, UserTraffic> { @Override protected void reduce(Text k2, Iterable<UserTraffic> v2s, Context context) throws IOException, InterruptedException { long upPackNum = 0; long downPackNum = 0; long upPayLoad = 0; long downPayNum = 0; for(UserTraffic ut : v2s) {//对相应的值进行累加 upPackNum += ut.upPackNum; downPackNum += ut.downPackNum; upPayLoad += ut.upPayLoad; downPayNum += ut.downPayNum; } context.write(k2, new UserTraffic(upPackNum, downPackNum, upPayLoad, downPayNum)); } } } /** * 自定义的类型,用来描述手机号对应的四个流量特征 */ class UserTraffic implements Writable { long upPackNum; long downPackNum; long upPayLoad; long downPayNum; public UserTraffic(long upPackNum, long downPackNum, long upPayLoad, long downPayNum) { this.upPackNum = upPackNum; this.downPackNum = downPackNum; this.upPayLoad = upPayLoad; this.downPayNum = downPayNum; } public UserTraffic() { } //序列化的方法如何来写? public void write(DataOutput out) throws IOException { out.writeLong(this.upPackNum); out.writeLong(this.downPackNum); out.writeLong(this.upPayLoad); out.writeLong(this.downPayNum); } //反序列化的方法如何来写? public void readFields(DataInput in) throws IOException { this.upPackNum = in.readLong(); this.downPackNum = in.readLong(); this.upPayLoad = in.readLong(); this.downPayNum = in.readLong(); } @Override public String toString() { return this.upPackNum + "\t" + this.downPackNum + "\t" + this.upPayLoad + "\t" + this.downPayNum; } }
// 转载请注明原文地址 (when reposting, please cite the original URL): https://www.6miu.com/read-72450.html