// MapReduce之自定义Key和Value (MapReduce: custom key and value types)

// xiaoxiao2021-02-28  58

package com.uplooking.bigdata.mr.writablez; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /**  * 关于自定义类型作为MR的key和value的练习,以统计电信数据为案例:  * 需要统计每一个手机号码的upPackNum、downPackNum、upPayLoad、dwonPayLoad的总和。     据此来衡量每一个手机号码对流量的使用情况,依次为了用户推荐更加合理的套餐,方便你我他。  这个思路就是简单的数据分析/数据挖掘的一种体现  * 其中upPackNum对应的序号为6,  *  downPackNum对应的序号为7  *  upPayLoad对应的需要为8  *  downPayLoad对应的序号为9  *  同时因为统计的是每个手机号对应上述四个值,所以我们在提取数据核心  *  字段的时候,只保留需要的,其余过滤,  *  在此只需要手机号(序号为1),上述四个值  *  */ public class HttpDataWritablezApp {     public static void main(String[] args) throws Exception {         if(args == null || args.length < 2) {             System.err.println("Parameter Errors ! 
Usage: <inputpath outputpath>");             System.exit(-1);         }         String inputpath = args[0];         Path outputpath = new Path(args[1]);         Configuration conf = new Configuration();         String jobName = HttpDataWritablezApp.class.getSimpleName();         //创建作业 每一个mapreduce最后会封装成为一个job作业在集群中运行         Job job = Job.getInstance(conf, jobName);         //job在集群中运行的方式是以jar的放来运行的         job.setJarByClass(HttpDataWritablezApp.class);         //set mapper         FileInputFormat.setInputPaths(job, inputpath);         job.setInputFormatClass(TextInputFormat.class);         //如果reducer输出类型和mapper的输出类型一致,可以省略mapper的输出,         //啥意思?就是说下面两句话可以被省略         job.setMapperClass(HttpMapper.class);         job.setMapOutputKeyClass(Text.class);         job.setMapOutputValueClass(UserTraffic.class);         //set reducer         outputpath.getFileSystem(conf).delete(outputpath, true);//如果输出路径已经存在,删除之         FileOutputFormat.setOutputPath(job, outputpath);         job.setOutputFormatClass(TextOutputFormat.class);         job.setReducerClass(HttpReducer.class);         job.setOutputKeyClass(Text.class);         job.setOutputValueClass(UserTraffic.class);         //设置有几个reducer来进行汇总操作         job.setNumReduceTasks(1);         //提交job给集群,等结束返回结果         job.waitForCompletion(true);     }     static class HttpMapper extends Mapper<LongWritable, Text, Text, UserTraffic> {         @Override         protected void setup(Context context) throws IOException, InterruptedException {             //这是mapper中的声明周期方法,每一个mapper task调用一次             //一般都做一些初始化的工作         }         //1363157985066     13726230503    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i02.c.aliimg.com        24    27    2481    24681    200         @Override         protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {             String line = v1.toString();             String[] splits = line.split("\t");//对行内容进行拆分             //获取核心计算数据            
 String phone = splits[1];             long upPackNum = Long.valueOf(splits[6].trim());             long downPackNum = Long.valueOf(splits[7].trim());             long upPayLoad = Long.valueOf(splits[8].trim());             long downPayNum = Long.valueOf(splits[9].trim());             //将结果从map写出到shuffle中,shuffle中会根据key进行分区分组排序规约             context.write(new Text(phone), new UserTraffic(upPackNum, downPackNum, upPayLoad, downPayNum));         }         @Override         protected void cleanup(Context context) throws IOException, InterruptedException {             //这也是一个声明周期方法,每一个mapper task调用一次             //一般都做一些关闭动作或者全局结果的输出         }     }     static class HttpReducer extends Reducer<Text, UserTraffic, Text, UserTraffic> {         @Override         protected void reduce(Text k2, Iterable<UserTraffic> v2s, Context context) throws IOException, InterruptedException {             long upPackNum = 0;             long downPackNum = 0;             long upPayLoad = 0;             long downPayNum = 0;             for(UserTraffic ut : v2s) {//对相应的值进行累加                 upPackNum += ut.upPackNum;                 downPackNum += ut.downPackNum;                 upPayLoad += ut.upPayLoad;                 downPayNum += ut.downPayNum;             }             context.write(k2, new UserTraffic(upPackNum, downPackNum, upPayLoad, downPayNum));         }     } } /**  * 自定义的类型,用来描述手机号对应的四个流量特征  */ class UserTraffic implements Writable {     long upPackNum;     long downPackNum;     long upPayLoad;     long downPayNum;     public UserTraffic(long upPackNum, long downPackNum, long upPayLoad, long downPayNum) {         this.upPackNum = upPackNum;         this.downPackNum = downPackNum;         this.upPayLoad = upPayLoad;         this.downPayNum = downPayNum;     }     public UserTraffic() {     }     //序列化的方法如何来写?     
public void write(DataOutput out) throws IOException {         out.writeLong(this.upPackNum);         out.writeLong(this.downPackNum);         out.writeLong(this.upPayLoad);         out.writeLong(this.downPayNum);     }     //反序列化的方法如何来写?     public void readFields(DataInput in) throws IOException {         this.upPackNum = in.readLong();         this.downPackNum = in.readLong();         this.upPayLoad = in.readLong();         this.downPayNum = in.readLong();     }     @Override     public String toString() {         return this.upPackNum + "\t" + this.downPackNum + "\t" + this.upPayLoad + "\t" + this.downPayNum;     } }
// 转载请注明原文地址 (when reposting, please credit the original): https://www.6miu.com/read-72450.html

// 最新回复(0) (latest replies: 0)