如果需要将自定义的 bean 放在 key 中传输,则还需要实现 Comparable 接口,因为 MapReduce框中的 shuffle 过程一定会对 key 进行排序,此时,自定义的 bean 实现的接口应该是:public class FlowBean implements WritableComparable<FlowBean>:
例:
进行了序列化的 Flow 类:
package flow.pojo; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; /** * 用户自定义的POJO类如果充当key的话,那么必须要进行序列化操作和执行排序规则 * * 让 Flow实现Writable接口,就是让该类具有序列化和反序列化的能力 * * 真正的操作: 其实就是把当前的某个对象,进行序列化,就是把属性值通过流进行传输到其他的存储介质或者流 * * 实现序列化操作 * 实现反序列化操作 * 指定排序规则 * */ public class Flow implements WritableComparable<Flow>{ private String phone; private long upFlow; private long downFlow; private long sumFlow; public String getPhone() { return phone; } public void setPhone(String phone) { this.phone = phone; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } public Flow() { super(); } public Flow(String phone, long upFlow, long downFlow, long sumFlow) { super(); this.phone = phone; this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = sumFlow; } @Override public String toString() { return "Flow [phone=" + phone + ", upFlow=" + upFlow + ", downFlow=" + downFlow + ", sumFlow=" + sumFlow + "]"; } /** * 序列化方法 */ @Override public void write(DataOutput out) throws IOException { out.writeUTF(phone); out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } /** * 反序列化操作 */ @Override public void readFields(DataInput in) throws IOException { this.phone = in.readUTF(); this.upFlow = in.readLong(); this.downFlow = in.readLong(); this.sumFlow = in.readLong(); } /** * 排序规则 */ @Override public int compareTo(Flow o) { /** * 按照总流量 从大到小 */ long diff = o.getSumFlow() - this.getSumFlow(); if(diff == 0){ return 0; }else{ return diff > 0 ? 1 : -1; } } }统计上行流量和下行流量之和并且按照流量大小倒序排序的 MR 程序Flow2MR :
package flow.pojo; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 实现流量汇总并且按照流量大小倒序排序 前提:处理的数据是已经汇总过的结果文件 */ public class Flow2MR { // 在 kv 中传输我们自定义的对象是可以的,但是必须实现 hadoop 的序列化机制 implements Writable, 如果要排序, // 还要实现 Comparable 接口, hadoop 为 我 们 提 供 了 一 个 方 便 的 类 , 叫 做 WritableComparable,直接实现就好 public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); //集群 conf.set("fs.defaultFS", "hdfs://hadoop01:9000"); System.setProperty("HADOOP_USER_NAME", "hadoop"); Job job = Job.getInstance(conf); // 告诉框架,我们的程序所在 jar 包的路径 job.setJarByClass(Flow2MR.class); // 告诉框架,我们的程序所用的 mapper 类和 reducer 类 job.setMapperClass(Flow2MRMapper.class); // job.setReducerClass(Flow2MRReducer.class); // 告诉框架,我们的 mapperreducer 输出的数据类型 job.setMapOutputKeyClass(Flow.class); job.setMapOutputValueClass(NullWritable.class); // // 如果reducer阶段的输出的key-value的类型和mapper阶段的一致,那么可以省略前面的setMapOutClass() // job.setOutputKeyClass(Text.class); // job.setOutputValueClass(Text.class); // 框架中默认的输入输出组件就是这俩货,所以可以省略这两行代码 /* * job.setInputFormatClass(TextInputFormat.class); * job.setOutputFormatClass(TextOutputFormat.class); */ // 告诉框架,我们要处理的文件在哪个路径下 Path inputPath = new Path(args[0]); // 告诉框架,我们的处理结果要输出到哪里去 Path outputPath = new Path(args[1]); FileInputFormat.setInputPaths(job, inputPath); FileSystem fs = FileSystem.get(conf); if(fs.exists(outputPath)){ fs.delete(outputPath,true); } FileOutputFormat.setOutputPath(job, outputPath); boolean isDone = job.waitForCompletion(true); System.exit(isDone ? 0 : 1); } /** * Mapper阶段的业务逻辑 * * null也有对于的参与序列化的指定类型: NullWritable */ private static class Flow2MRMapper extends Mapper<LongWritable, Text, Flow, NullWritable>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 将读到的一行数据进行字段切分 String[] split = value.toString().split("\t"); // 抽取业务所需要的各字段 String phone = split[0]; long upFlow = Long.parseLong(split[1]); long downFlow = Long.parseLong(split[2]); long sumFlow = Long.parseLong(split[3]); Flow flow = new Flow(phone, upFlow, downFlow, sumFlow); context.write(flow, NullWritable.get()); } } /** * Reducer阶段的业务逻辑 */ private static class Flow2MRReducer extends Reducer<Text, Text, Text, Text>{ // reduce 方法接收到的 key 是某一组<a 手机号,bean><a 手机号,bean><a 手机号,bean>中的第一个手机号 // reduce 方法接收到的 vlaues 是这一组 kv 中的所有 bean 的一个迭代器 @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { /** * 在当前排序操作中,根本不需要 reducer阶段去指定 一些逻辑 * * 但是需要Reducer阶段: 因为只有有reducer阶段,最终的结果集才会按照key进行排序 */ } } }