MapReduce Example --- Traffic Summation (Custom Class)


1: Problem Description

Given a data file containing the internet-usage records of mobile-phone users, compute each user's total upstream traffic, total downstream traffic, and total combined traffic.
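To make the record layout concrete, here is a hypothetical input line and the output row the job should produce for it (fields are tab-separated; the values are invented for illustration). The mapper below assumes only that the phone number is the second field and that the upstream and downstream flows are the third- and second-from-last fields:

input line:   1363157985066  13726230503  <other fields>  2481  24681  200
expected row: 13726230503  2481  24681  27162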

Data flow: the map phase extracts (phone number, up flow, down flow) from each log line; the shuffle groups records by phone number; the reduce phase sums the flows for each phone.

2: Required JAR Packages

hadoop-2.4.1\share\hadoop\hdfs\hadoop-hdfs-2.4.1.jar
hadoop-2.4.1\share\hadoop\hdfs\lib\ (all JARs)
hadoop-2.4.1\share\hadoop\common\hadoop-common-2.4.1.jar
hadoop-2.4.1\share\hadoop\common\lib\ (all JARs)
hadoop-2.4.1\share\hadoop\mapreduce\ (all JARs except hadoop-mapreduce-examples-2.4.1.jar)
hadoop-2.4.1\share\hadoop\mapreduce\lib\ (all JARs)

3: Code

Custom flow bean class:

/*
 * A custom data type that is passed around a Hadoop cluster must plug into
 * Hadoop's serialization framework, which means implementing the Writable interface.
 */
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // A no-arg constructor is required so Hadoop can instantiate the class via reflection
    public FlowBean() {}

    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getUpFlow() { return upFlow; }
    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
    public long getDownFlow() { return downFlow; }
    public void setDownFlow(long downFlow) { this.downFlow = downFlow; }
    public long getSumFlow() { return sumFlow; }
    public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }

    /*
     * Deserialization: restore the fields one by one from the byte stream,
     * in the same order they were written.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = upFlow + downFlow; // recompute, since sumFlow is not serialized
    }

    /*
     * Serialization: write the data we want to transmit into the byte stream.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
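A quick way to sanity-check the Writable methods is to round-trip a bean through a byte buffer, exactly as Hadoop does during the shuffle. This is a minimal sketch, not part of the original post; the class name FlowBeanRoundTripTest and the sample values are made up for illustration:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class FlowBeanRoundTripTest {
    public static void main(String[] args) throws Exception {
        FlowBean original = new FlowBean(1116, 954);

        // Serialize into an in-memory byte buffer
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // Deserialize into a fresh instance created via the no-arg constructor
        FlowBean restored = new FlowBean();
        restored.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(restored); // expected: 1116, 954, 2070 (tab-separated)
    }
}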

Mapper class implementation:

/*
 * KEYIN:    byte offset of the line within the log file, LongWritable
 * VALUEIN:  content of one line of the log file, Text
 * KEYOUT:   phone number, Text
 * VALUEOUT: flow information, FlowBean
 */
import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Grab one line of the log and convert it to a String
        String line = value.toString();
        // Split it into fields on the separator
        String[] flows = StringUtils.split(line, '\t');
        // Extract the fields we need; the data set is small, so invalid records are not filtered out
        String phone = flows[1];
        long upFlow = Long.parseLong(flows[flows.length - 3]);
        long downFlow = Long.parseLong(flows[flows.length - 2]);
        // Emit <phone, FlowBean>
        FlowBean flowBean = new FlowBean(upFlow, downFlow);
        context.write(new Text(phone), flowBean);
    }
}
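The field-extraction logic can be checked locally without a cluster. The sketch below is for illustration only: the class name and the sample line are invented, and the real log layout may contain more fields, which is exactly why the flow fields are indexed from the end of the array:

import org.apache.commons.lang.StringUtils;

public class MapperFieldExtractionDemo {
    public static void main(String[] args) {
        // Hypothetical tab-separated record: offset id, phone, other fields..., up, down, status
        String line = "1363157985066\t13726230503\tsome-field\t2481\t24681\t200";
        String[] flows = StringUtils.split(line, '\t');
        String phone = flows[1];                                 // second field
        long upFlow = Long.parseLong(flows[flows.length - 3]);   // third-from-last field
        long downFlow = Long.parseLong(flows[flows.length - 2]); // second-from-last field
        System.out.println(phone + " up=" + upFlow + " down=" + downFlow);
        // prints: 13726230503 up=2481 down=24681
    }
}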

Reducer class implementation:

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    /*
     * key is one phone number; values are all the FlowBeans emitted for that phone.
     */
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        long upFlowSum = 0;
        long downFlowSum = 0;
        // Hadoop reuses the same FlowBean instance while iterating over values,
        // so copy the primitive fields out rather than holding references to the beans
        for (FlowBean value : values) {
            upFlowSum += value.getUpFlow();
            downFlowSum += value.getDownFlow();
        }
        FlowBean flowBean = new FlowBean(upFlowSum, downFlowSum);
        context.write(key, flowBean);
    }
}
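For example, if the shuffle groups the hypothetical values (2481, 24681) and (120, 1320) under the key 13726230503, the loop accumulates upFlowSum = 2601 and downFlowSum = 26001, and the reducer writes the tab-separated row 13726230503  2601  26001  28602.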

Job submission client implementation:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCountJobSubmitter {

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job flowCountJob = Job.getInstance(conf);

        // Important: identify the jar that contains this job's classes
        flowCountJob.setJarByClass(FlowCountJobSubmitter.class);

        // Set the mapper class used by this job
        flowCountJob.setMapperClass(FlowCountMapper.class);
        // Set the reducer class used by this job
        flowCountJob.setReducerClass(FlowCountReducer.class);

        // Set the key/value types of the map output
        flowCountJob.setMapOutputKeyClass(Text.class);
        flowCountJob.setMapOutputValueClass(FlowBean.class);

        // Set the key/value types of the final output
        flowCountJob.setOutputKeyClass(Text.class);
        flowCountJob.setOutputValueClass(FlowBean.class);

        // Set the path of the input data to process, and the output path
        FileInputFormat.setInputPaths(flowCountJob, "hdfs://192.168.77.70:9000/flowcount/srcdata/");
        FileOutputFormat.setOutputPath(flowCountJob, new Path("hdfs://192.168.77.70:9000/flowcount/output/"));

        // Submit the job to the Hadoop cluster and wait for it to finish
        boolean success = flowCountJob.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}
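One practical note: MapReduce refuses to start if the output directory already exists. During repeated test runs it can help to delete it in the driver before calling FileOutputFormat.setOutputPath. A minimal sketch, not part of the original post; it assumes the conf variable above and additionally requires org.apache.hadoop.fs.FileSystem:

// Sketch: remove a stale output directory so repeated runs don't fail
Path outputPath = new Path("hdfs://192.168.77.70:9000/flowcount/output/");
FileSystem fs = outputPath.getFileSystem(conf);
if (fs.exists(outputPath)) {
    fs.delete(outputPath, true); // recursive delete
}
FileOutputFormat.setOutputPath(flowCountJob, outputPath);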

4: How to Run

The packaging and submission procedure is the same as in the MapReduce WordCount example.

