MapReduce Example --- Partitioned Traffic Aggregation


1. Problem Description

You are given a data file containing mobile users' internet-access records. For each phone number, compute the total upstream traffic, total downstream traffic, and overall total traffic; then split the results by the province the phone number belongs to.
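The exact file layout is whatever the log source produces; the mapper later in this post only assumes tab-separated fields with the phone number in field 1 and the upstream/downstream byte counts in the third- and second-to-last fields. A purely hypothetical line (values invented for illustration, fields tab-separated in the real file) could look like:

1363157985066  13726230503  120.196.100.82  www.example.com  24  27  2481  24681  200

Here 13726230503 is the phone number, 2481 the upstream bytes, and 24681 the downstream bytes.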

Data flow: the mapper parses each log line into a (phone, FlowBean) pair; a custom partitioner routes each pair to a reduce task based on the phone-number prefix; each reducer sums the up/down traffic for its partition, so the job emits one result file per province group.

2. Required Jar Files

hadoop-2.4.1\share\hadoop\hdfs\hadoop-hdfs-2.4.1.jar
hadoop-2.4.1\share\hadoop\hdfs\lib\ (all jars)
hadoop-2.4.1\share\hadoop\common\hadoop-common-2.4.1.jar
hadoop-2.4.1\share\hadoop\common\lib\ (all jars)
hadoop-2.4.1\share\hadoop\mapreduce\ (all jars except hadoop-mapreduce-examples-2.4.1.jar)
hadoop-2.4.1\share\hadoop\mapreduce\lib\ (all jars)
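If the build is managed with Maven rather than by collecting jars from the Hadoop distribution, the single org.apache.hadoop:hadoop-client artifact at version 2.4.1 should pull in equivalent HDFS, Common, and MapReduce client libraries transitively.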

3. Code

Custom flow bean class:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/*
 * For a custom data type to be shuffled across a Hadoop cluster it must
 * plug into Hadoop's serialization framework, i.e. implement the
 * Writable interface.
 */
public class FlowBean implements Writable {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // A no-arg constructor is required because Hadoop instantiates
    // the class reflectively.
    public FlowBean() {}

    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getUpFlow() { return upFlow; }
    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
    public long getDownFlow() { return downFlow; }
    public void setDownFlow(long downFlow) { this.downFlow = downFlow; }
    public long getSumFlow() { return sumFlow; }
    public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }

    /*
     * Deserialization: restore each field, in order, from the byte stream.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    /*
     * Serialization: write the fields to the byte stream in the same order.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
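As a quick sanity check on the Writable contract, the following standalone sketch (not part of the original post) serializes a FlowBean into a byte buffer and reads it back, mimicking what Hadoop does during the shuffle:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws Exception {
        FlowBean original = new FlowBean(2481, 24681);

        // Serialize, as the map side does when spilling output.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buf));

        // Deserialize into a fresh bean, as the reduce side does.
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));

        System.out.println(copy);  // prints 2481, 24681, 27162 (tab-separated)
    }
}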

Mapper implementation:

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ProvinceFlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    /*
     * Held as member variables so the same objects are reused for every
     * record, which reduces garbage-collection pressure.
     */
    private Text k = new Text();
    private FlowBean bean = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = StringUtils.split(line, '\t');

        // Field 1 is the phone number; the up/down byte counts sit at
        // fixed offsets from the end of the record.
        String phone = fields[1];
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long downFlow = Long.parseLong(fields[fields.length - 2]);

        k.set(phone);
        bean.set(upFlow, downFlow);
        context.write(k, bean);
    }
}
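Note that reusing the k and bean members across map() calls is safe: context.write() serializes the key and value into Hadoop's output buffer immediately, so overwriting them on the next call cannot corrupt records already emitted.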

Reducer implementation:

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ProvinceFlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    private FlowBean bean = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        long upFlowSum = 0;
        long downFlowSum = 0;

        // Accumulate the up/down traffic over every record for this phone number.
        for (FlowBean value : values) {
            upFlowSum += value.getUpFlow();
            downFlowSum += value.getDownFlow();
        }

        bean.set(upFlowSum, downFlowSum);
        context.write(key, bean);
    }
}
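Because this reduce is a plain per-key sum and its input and output types are identical to the mapper's output types, the same class could also be registered as a combiner to pre-aggregate map output and shrink shuffle traffic. This is an optional optimization, not something the original post does; it would be one line in the driver below:

job.setCombinerClass(ProvinceFlowCountReducer.class);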

Job submission client:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/*
 * Client class used to submit this job.
 */
public class ProvinceFlowCountJobSubmitter {

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Incorrect arguments: <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(ProvinceFlowCountJobSubmitter.class);
        job.setMapperClass(ProvinceFlowCountMapper.class);
        job.setReducerClass(ProvinceFlowCountReducer.class);

        // When the map output kv types match the reduce output kv types,
        // these two lines can be omitted:
        // job.setMapOutputKeyClass(Text.class);
        // job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Explicitly register the custom partitioner so it replaces the
        // default HashPartitioner.
        job.setPartitionerClass(ProvincePartitioner.class);

        /*
         * Set the number of reduce tasks for this job; it should match the
         * number of partitions produced by the partitioner (the default is 1).
         * If they do not match:
         *  - more reduce tasks than partitions: the job still writes one
         *    output file per reduce task, but the extra files are empty;
         *  - fewer reduce tasks than partitions: with exactly one reduce task
         *    everything lands in a single file, as if no partitioning had
         *    been done; with any other count the job fails.
         */
        job.setNumReduceTasks(5);

        // Input path for the data file and output path for the results,
        // both taken from the command line.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}
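One operational detail worth remembering: FileOutputFormat fails the job if the output directory already exists, so remove it (for example with hadoop fs -rm -r) before re-running.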

Custom Partitioner implementation:

import java.util.HashMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/*
 * KEY is the mapper's output key type, VALUE the mapper's output value type.
 */
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    private static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>();

    // Load the external prefix-to-province dictionary into memory once,
    // when the partitioner class is initialized.
    static {
        provinceMap.put("136", 0);
        provinceMap.put("137", 1);
        provinceMap.put("138", 2);
        provinceMap.put("139", 3);
    }

    // numReduceTasks is the number of reduce tasks for the job.
    @Override
    public int getPartition(Text key, FlowBean value, int numReduceTasks) {
        // Take the first three digits of the phone number and look up its
        // partition number; unknown prefixes fall into the catch-all
        // partition 4.
        String prefix = key.toString().substring(0, 3);
        Integer provinceNum = provinceMap.get(prefix);
        if (provinceNum == null) {
            provinceNum = 4;
        }
        return provinceNum;
    }
}
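With numReduceTasks set to 5 and this partitioner, the job writes five result files, named part-r-00000 through part-r-00004 by Hadoop's output format. Based on the prefix map above, they should contain:

part-r-00000   totals for 136* numbers
part-r-00001   totals for 137* numbers
part-r-00002   totals for 138* numbers
part-r-00003   totals for 139* numbers
part-r-00004   totals for every other prefix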

4. Running the Job

1. Package the project as a jar and upload it to the virtual machine.

2. Upload the data file to HDFS.
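For example (the file name and HDFS paths here are placeholders, not from the original post):

hadoop fs -mkdir -p /flow/input
hadoop fs -put flowdata.log /flow/input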

3. Run the jar file.
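For example, assuming the exported jar is named flowcount.jar and the classes live in the default package (both assumptions, not stated in the original post):

hadoop jar flowcount.jar ProvinceFlowCountJobSubmitter /flow/input /flow/output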

4. Check the results in the output directory.

