Hadoop 自定义序列化编程

xiaoxiao2021-02-28  8

一 自定义序列化需求 二 MapReduce代码编写 1 自定义序列化类 package com.cakin.hadoop.mr; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class UserWritable implements WritableComparable<UserWritable> { private Integer id; private Integer income; private Integer expenses; private Integer sum; public void write(DataOutput out) throws IOException { // TODO Auto-generated method stub out.writeInt(id); out.writeInt(income); out.writeInt(expenses); out.writeInt(sum); } public void readFields(DataInput in) throws IOException { // TODO Auto-generated method stub this.id=in.readInt(); this.income=in.readInt(); this.expenses=in.readInt(); this.sum=in.readInt(); } public Integer getId() { return id; } public UserWritable setId(Integer id) { this.id = id; return this; } public Integer getIncome() { return income; } public UserWritable setIncome(Integer income) { this.income = income; return this; } public Integer getExpenses() { return expenses; } public UserWritable setExpenses(Integer expenses) { this.expenses = expenses; return this; } public Integer getSum() { return sum; } public UserWritable setSum(Integer sum) { this.sum = sum; return this; } public int compareTo(UserWritable o) { // TODO Auto-generated method stub return this.id>o.getId()?1:-1; } @Override public String toString() { return id + "\t"+income+"\t"+expenses+"\t"+sum; } } 2 编写MapReduce package com.cakin.hadoop.mr; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.Reducer; /* * 测试数据 * 用户id 收入 支出 * 1 1000 0 * 2 500 300 * 1 2000 1000 * 2 500 200 * * 需求: * 用户id 总收入 总支出 总的余额 * 1 3000 1000 2000 * 2 1000 500 500 * */ public class CountMapReduce { public static class CountMapper extends Mapper<LongWritable,Text,IntWritable,UserWritable> { private UserWritable userWritable =new UserWritable(); private IntWritable id =new IntWritable(); @Override protected void map(LongWritable key,Text value, Mapper<LongWritable,Text,IntWritable,UserWritable>.Context context) throws IOException, InterruptedException{ String line = value.toString(); String[] words = line.split("\t"); if(words.length ==3) { userWritable.setId(Integer.parseInt(words[0])) .setIncome(Integer.parseInt(words[1])) .setExpenses(Integer.parseInt(words[2])) .setSum(Integer.parseInt(words[1])-Integer.parseInt(words[2])); id.set(Integer.parseInt(words[0])); } context.write(id, userWritable); } } public static class CountReducer extends Reducer<IntWritable,UserWritable,UserWritable,NullWritable> { /* * 输入数据 * <1,{[1,1000,0,1000],[1,2000,1000,1000]}> * <2,[2,500,300,200],[2,500,200,300]> * * */ private UserWritable userWritable = new UserWritable(); private NullWritable n = NullWritable.get(); protected void reduce(IntWritable key,Iterable<UserWritable> values, Reducer<IntWritable,UserWritable,UserWritable,NullWritable>.Context context) throws IOException, InterruptedException{ Integer income=0; Integer expenses = 0; Integer sum =0; for(UserWritable u:values) { income += u.getIncome(); expenses+=u.getExpenses(); } sum = income - expenses; userWritable.setId(key.get()) .setIncome(income) .setExpenses(expenses) .setSum(sum); context.write(userWritable, n); } } public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException { Configuration conf=new Configuration(); /* * 集群中节点都有配置文件 conf.set("mapreduce.framework.name.", "yarn"); conf.set("yarn.resourcemanager.hostname", "mini1"); */ Job job=Job.getInstance(conf,"countMR"); //jar包在哪里,现在在客户端,传递参数 //任意运行,类加载器知道这个类的路径,就可以知道jar包所在的本地路径 job.setJarByClass(CountMapReduce.class); //指定本业务job要使用的mapper/Reducer业务类 job.setMapperClass(CountMapper.class); job.setReducerClass(CountReducer.class); //指定mapper输出数据的kv类型 job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(UserWritable.class); //指定最终输出的数据kv类型 job.setOutputKeyClass(UserWritable.class); job.setOutputKeyClass(NullWritable.class); //指定job的输入原始文件所在目录 FileInputFormat.setInputPaths(job, new Path(args[0])); //指定job的输出结果所在目录 FileOutputFormat.setOutputPath(job, new Path(args[1])); //将job中配置的相关参数及job所用的java类在的jar包,提交给yarn去运行 //提交之后,此时客户端代码就执行完毕,退出 //job.submit(); //等集群返回结果在退出 boolean res=job.waitForCompletion(true); System.exit(res?0:1); //类似于shell中的$? } } 三 通过eclipse将程序打包为mapreduce.jar 四 MapReduce的自定义序列化测试 1 准备数据 [root@centos hadoop-2.7.4]# bin/hdfs dfs -cat /input/data 1 1000 0 2 500 300 1 2000 1000 2 500 200 2 运行MapReduce [root@centos hadoop-2.7.4]# bin/yarn jar /root/jar/mapreduce.jar /input/data /output3 17/12/20 21:24:45 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032 17/12/20 21:24:46 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 17/12/20 21:24:47 INFO input.FileInputFormat: Total input paths to process : 1 17/12/20 21:24:47 INFO mapreduce.JobSubmitter: number of splits:1 17/12/20 21:24:47 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1513775596077_0001 17/12/20 21:24:49 INFO impl.YarnClientImpl: Submitted application application_1513775596077_0001 17/12/20 21:24:49 INFO mapreduce.Job: The url to track the job: http://centos:8088/proxy/application_1513775596077_0001/ 17/12/20 21:24:49 INFO mapreduce.Job: Running job: job_1513775596077_0001 17/12/20 21:25:13 INFO mapreduce.Job: Job job_1513775596077_0001 running in uber mode : false 17/12/20 21:25:13 INFO mapreduce.Job: map 0% reduce 0% 17/12/20 21:25:38 INFO mapreduce.Job: map 100% reduce 0% 17/12/20 21:25:54 INFO mapreduce.Job: map 100% reduce 100% 17/12/20 21:25:56 INFO mapreduce.Job: Job job_1513775596077_0001 completed successfully 17/12/20 21:25:57 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read=94 FILE: Number of bytes written=241391 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=135 HDFS: Number of bytes written=32 HDFS: Number of read operations=6 HDFS: Number of large read operations=0 HDFS: Number of write operations=2 Job Counters Launched map tasks=1 Launched reduce tasks=1 Data-local map tasks=1 Total time spent by all maps in occupied slots (ms)=23672 Total time spent by all reduces in occupied slots (ms)=11815 Total time spent by all map tasks (ms)=23672 Total time spent by all reduce tasks (ms)=11815 Total vcore-milliseconds taken by all map tasks=23672 Total vcore-milliseconds taken by all reduce tasks=11815 Total megabyte-milliseconds taken by all map tasks=24240128 Total megabyte-milliseconds taken by all reduce tasks=12098560 Map-Reduce Framework Map input records=4 Map output records=4 Map output bytes=80 Map output materialized bytes=94 Input split bytes=94 Combine input records=0 Combine output records=0 Reduce input groups=2 Reduce shuffle bytes=94 Reduce input records=4 Reduce output records=2 Spilled Records=8 Shuffled Maps =1 Failed Shuffles=0 Merged Map outputs=1 GC time elapsed (ms)=157 CPU time spent (ms)=1090 Physical memory (bytes) snapshot=275660800 Virtual memory (bytes) snapshot=4160692224 Total committed heap usage (bytes)=139264000 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=41 File Output Format Counters Bytes Written=32 3 测试结果 [root@centos hadoop-2.7.4]# bin/hdfs dfs -cat /output3/part-r-00000 1 3000 1000 2000 2 1000 500 500 五 参考 http://www.jikexueyuan.com/course/2710.html
转载请注明原文地址: https://www.6miu.com/read-2150097.html

最新回复(0)