Join Operations in MapReduce


Suppose we have three tables:

    student: sid, sname, sclass, scid
    score:   scid, scname, score, sid
    class:   clz_id, clz_name

and we want each student's complete record: sid, sname, clz_name, scname, score. In SQL this is a join:

    select sid, sname, clz_name, scname, score
    from student s
    left join score sc on s.scid = sc.scid
    left join class c on s.sclass = c.clz_id;

Here we will not use SQL; we will compute the join with MapReduce. There are two ways to do it:

    Join on the map side: if some of the tables are very small (lookup tables, for example), those small tables can be loaded straight into memory, and the join against the big table can be done entirely in the map phase.

    Join on the reduce side: the weakness of this approach is that a large volume of data has to go through the shuffle and then travel over the network to the various reduce nodes, which is very costly in performance. (A sketch of this pattern on the sample data follows below.)

Case: count how many times each user logged in.

Login table login.txt:

    userId  sexId  login_date
    1       0      20121213
    2       0      20121213
    3       1      20121213
    4       1      20121213
    1       0      20121114
    2       0      20121114
    3       1      20121114
    4       1      20121114
    1       0      20121213
    1       0      20121114
    9       0      20121114

Sex table sex.txt:

    sexId  sexName
    0      女
    1      男

User table user.txt:

    userID  userName  user_province
    1       张三      河北
    2       李四      北京
    3       王五      天津
    4       赵六      广东

Expected result, the number of logins per user:

    userName  sexName  login_count
    张三      女       4
    赵六      男       2
    李四      女       2
    王五      男       2
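For contrast, here is a minimal sketch of the reduce-side pattern applied to this data, joining login.txt with user.txt on userId. It is not part of the original program, and the names ReduceSideJoinSketch, TaggingMapper and JoinReducer are illustrative assumptions: the mapper tags every record with the file it came from, and the reducer splits each key's records into the two sides and counts the login rows.

package com.uplooking.bigdata.mr.test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class ReduceSideJoinSketch {

    public static void main(String[] args) throws Exception {
        // args: <loginPath> <userPath> <outputPath>
        Job job = Job.getInstance(new Configuration(), ReduceSideJoinSketch.class.getSimpleName());
        job.setJarByClass(ReduceSideJoinSketch.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]), new Path(args[1]));
        job.setMapperClass(TaggingMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(JoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    static class TaggingMapper extends Mapper<LongWritable, Text, Text, Text> {
        private String tag;

        @Override
        protected void setup(Context context) {
            // Tag each record with its source file so the reducer can tell the sides apart
            String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
            tag = fileName.contains("user.txt") ? "U" : "L";
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // The join key, userId, is the first column in both files
            String[] fields = value.toString().split("\t");
            context.write(new Text(fields[0].trim()), new Text(tag + "\t" + value.toString()));
        }
    }

    static class JoinReducer extends Reducer<Text, Text, Text, IntWritable> {
        @Override
        protected void reduce(Text userId, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String userName = null;
            int loginCount = 0;
            // All records for one userId arrive together: pick the name off the
            // user-side record and count the login-side records in a single pass
            for (Text v : values) {
                String[] parts = v.toString().split("\t", 2);
                if ("U".equals(parts[0])) {
                    userName = parts[1].split("\t")[1]; // userName is the second column of user.txt
                } else {
                    loginCount++;
                }
            }
            if (userName != null) { // drop ids with no user record (inner-join semantics)
                context.write(new Text(userName), new IntWritable(loginCount));
            }
        }
    }
}

Note that every login record crosses the shuffle here; that network cost is exactly what the map-side join below avoids.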
Analysis: of these tables, two are quite small, sex.txt and user.txt. Since the final result requires joining several tables, we can perform the join on the map side: ship the two small tables to the cache so that every mapper task holds its own copy, load them into HashMaps in setup(), and then join the big table against them. The join boils down to calling get(key) on the HashMap with the join field: if a value comes back, the join succeeds; if the value is null, there is no matching record.

Submit the job with:

    yarn jar mr-mapjoin.jar com.uplooking.bigdata.mr.test.MapSideJoinApp /input/mr/join/login.txt /output/mr/join/ hdfs://ns1/input/mr/join/sex.txt hdfs://ns1/input/mr/join/user.txt

The full implementation:

package com.uplooking.bigdata.mr.test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class MapSideJoinApp {

    public static void main(String[] args) throws Exception {
        if (args == null || args.length < 4) {
            System.err.println("Parameter error!\nUsage: <inputPath> <outputPath> <sexURI> <userURI>");
            System.exit(-1);
        }
        String inputPath = args[0];
        Path outputPath = new Path(args[1]);
        String sexURI = args[2];
        String userURI = args[3];

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, MapSideJoinApp.class.getSimpleName());
        job.setJarByClass(MapSideJoinApp.class);

        // Ship the two small files to the cache on every mapper task's node
        job.addCacheFile(new URI(sexURI));
        job.addCacheFile(new URI(userURI));

        // Input
        FileInputFormat.setInputPaths(job, inputPath);
        job.setInputFormatClass(TextInputFormat.class);

        // Map
        job.setMapperClass(MapSideJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Output: remove any old output directory first so reruns do not fail
        outputPath.getFileSystem(conf).delete(outputPath, true);
        FileOutputFormat.setOutputPath(job, outputPath);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Reduce
        job.setReducerClass(MapSideJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setNumReduceTasks(1);

        // Run the job
        job.waitForCompletion(true);
    }

    static class MapSideJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        // The two small tables, held in memory by every mapper task
        private Map<String, String> sexMap = new HashMap<String, String>();
        private Map<String, String> userMap = new HashMap<String, String>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Local paths of the files shipped via addCacheFile()
            // (getLocalCacheFiles() is deprecated in newer Hadoop but still works)
            Path[] paths = context.getLocalCacheFiles();
            for (Path path : paths) {
                String filepath = path.toString();
                BufferedReader br = new BufferedReader(new FileReader(filepath));
                try {
                    String line;
                    while ((line = br.readLine()) != null) {
                        String[] splits = line.split("\t");
                        if (filepath.contains("sex.txt")) {
                            sexMap.put(splits[0].trim(), splits[1].trim());
                        } else if (filepath.contains("user.txt")) {
                            userMap.put(splits[0].trim(), splits[1].trim());
                        }
                    }
                } finally {
                    br.close(); // close every reader, not only the last one
                }
            }
        }

        @Override
        protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
            String[] splits = v1.toString().split("\t");
            // get(key) against the in-memory tables is the join itself; an id with
            // no match (userId 9, for instance) comes back as null, like a left join
            String name = userMap.get(splits[0].trim());
            String sexName = sexMap.get(splits[1].trim());
            context.write(new Text(name + "\t" + sexName), NullWritable.get());
        }
    }

    static class MapSideJoinReducer extends Reducer<Text, NullWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text k2, Iterable<NullWritable> v2s, Context context) throws IOException, InterruptedException {
            // Count how many times each "name<TAB>sexName" key occurred
            int sum = 0;
            for (NullWritable nw : v2s) {
                sum++;
            }
            context.write(k2, new IntWritable(sum));
        }
    }
}
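For the sample files above, the single reducer should produce output along the following lines (a derived illustration, not output quoted from the original post). userId 9 appears in login.txt but not in user.txt, so userMap.get() returns null in the mapper and Java's string concatenation turns it into the literal text "null"; Text keys sort by their UTF-8 bytes, which is why the "null" row comes first:

    null    女    1
    张三    女    4
    李四    女    2
    王五    男    2
    赵六    男    2

If that left-join-style "null" row is unwanted, the mapper can simply skip any record whose userId is missing from userMap.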
