package com.uplooking.bigdata.mr.test; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.filecache.DistributedCache; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.net.URI; import java.net.URL; import java.util.HashMap; import java.util.Map; /** student sid sname sclass scid score scid scname score sid class clz_id clz_name join 求出学生的完整的信息 sid sname clz_name scname score select sid, sname, clz_name, scname, score from student s left join score sc on s.scid = sc.scid left join class c s.sclass = c.clz_id; 我们不用sql了,mr来计算了 两种方式 在map端完成join操作 如果其中的几张表非常小,比如说功能表,就可以将这些小表直接加载到内存里面, 在map端就可以实现和大表的join操作 在reduce完成join操作 reduce不好的一个地方就是大量数据要经过shuffle, 然后经过网络传输到各个reduce节点,这非常消耗性能 案例: 登录表login.txt userId sexId login_date 1 0 20121213 2 0 20121213 3 1 20121213 4 1 20121213 1 0 20121114 2 0 20121114 3 1 20121114 4 1 20121114 1 0 20121213 1 0 20121114 9 0 20121114 性别表sex.txt sexId sexName 0 女 1 男 用户表user.txt userID userName user_province 1 张三 河北 2 李四 北京 3 王五 天津 4 赵六 广东 结果: 用户登录的次数 userName sexName login_count 张三 男 4 赵六 女 2 李四 男 2 王五 女 2 分析: 通过上述的分析,我们发现其中有两张表的内容相对较少,sex.txt和user.txt内容相对较少, 因为需要进行多表关联才能求出最后想要的结果,那么我们可以采取在map端进行关联操作, 将这两张小表加到缓存中,可以在每一个mapper task中有一份备份,在每个mapper task中取出小表,放置到hashmap 然后和大表进行关联,其实质就是让二者的关联字段到hashmap中get(key)判断value是否存在,如果存在,关联成功 如果vlaue为null,没有相关的值 yarn jar mr-mapjoin.jar com.uplooking.bigdata.mr.test.MapSideJoinApp /input/mr/join/login.txt /output/mr/join/ hdfs://ns1/input/mr/join/sex.txt hdfs://ns1/input/mr/join/user.txt */ public class MapSideJoinApp { public static void main(String[] args) throws Exception { if(args == null || args.length < 4) { System.err.println("parameter Errors ! Usage: <inputPath outputPath sexURL userURL>"); System.exit(-1); } String inputPath = args[0]; Path outputPath = new Path(args[1]); String sexURI = args[2]; String userURI = args[3]; Configuration conf = new Configuration(); Job job = Job.getInstance(conf, MapSideJoinApp.class.getSimpleName()); job.setJarByClass(MapSideJoinApp.class); //将两个小文件加入到每个mappertask所在节点的缓存里面 job.addCacheFile(new URI(sexURI)); job.addCacheFile(new URI(userURI)); //设置输入 FileInputFormat.setInputPaths(job, inputPath); job.setInputFormatClass(TextInputFormat.class); //设置map job.setMapperClass(MapSideJoinMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); //设置输出 outputPath.getFileSystem(conf).delete(outputPath, true); FileOutputFormat.setOutputPath(job, outputPath); job.setOutputFormatClass(TextOutputFormat.class); //设置reducer job.setReducerClass(MapSideJoinReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //设置reducer task数量 job.setNumReduceTasks(1); //启动job job.waitForCompletion(true); } static class MapSideJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> { //声明两张hash表 private Map<String, String> sexMap = new HashMap<String, String>(); private Map<String, String> userMap = new HashMap<String, String>(); @Override protected void setup(Context context) throws IOException, InterruptedException { JobContext jobContext = context; Path[] paths = jobContext.getLocalCacheFiles(); BufferedReader br = null; for(Path path : paths) { String filepath = path.toString(); br = new BufferedReader(new FileReader(filepath)); String line = null; while((line = br.readLine()) != null) { String[] splits = line.split("\t"); if (filepath.contains("sex.txt")) { sexMap.put(splits[0].trim(), splits[1].trim()); } else if (filepath.contains("user.txt")) { userMap.put(splits[0].trim(), splits[1].trim()); } } } if(br != null) { br.close(); } } @Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { String line = v1.toString(); String[] splits = line.split("\t"); String name = userMap.get(splits[0].trim()); String sexName = sexMap.get(splits[1].trim()); context.write(new Text(name + "\t" + sexName), NullWritable.get()); } } static class MapSideJoinReducer extends Reducer<Text, NullWritable, Text, IntWritable> { @Override protected void reduce(Text k2, Iterable<NullWritable> v2s, Context context) throws IOException, InterruptedException { int sum = 0; for(NullWritable nw : v2s) { sum++; } context.write(k2, new IntWritable(sum)); } } }
转载请注明原文地址: https://www.6miu.com/read-71293.html