2017年8月5日22:17:06
mapreduce解决数据倾斜的思路
我们知道在我们设定job.setNumReduceTasks(5); 表示有5个reducetask
我们不去设定的时候就默认为1,
当有多个reducetask 时候,我们如果不自定义 XXX extends Partitioner 组件时候,默认按照 key 的 hashcode%numreducetask去分配
我们自定义ProvincePartitioner extends Partitioner 可以指定分组
相同的key对应的values(多对去了相同的reducetask里, 看上去没什么问题.. ........ 但是不同的key对应的数量不一样啊 导致有的redeucetask任务量大,有的任务量的小,这样导致cpu没有合理利用,这样效率也比较低, 比如 <hello ,n> n从 1取到100000000000000000000000000000000000000,
<world,1> 就一个值
显然hello和world在不同的reducetask里边 任务量,运行时间,等等各种问题。。。
2.1
(1)map端的join逻辑不需要阶段,在默认情况下reducetask为1 ,所有要手动改为0 ,
(2)并且指定需要缓存一个文件到所有的maptask运行的节点目录
(3)将表文件缓存到task运行的节点目录中去
(4)重写setup setup作用
通过阅读父类的Mapper的源码,发现 setup() while(还有没读的行){map();} clearup(); setup调用一次 ,map调用多次,clearup调用一次 也就是maptask在处理数据之前就会调用一次setup setup 可以用来做一些初始化工作
(5)局部代码
//map端的join逻辑不需要阶段,在默认情况下reducetask为1 ,所有要手动改为0 job.setNumReduceTasks(0); //指定需要缓存一个文件到所有的maptask运行的节点目录 /*job.addArchiveToClassPath(archive);*/ //缓存jar包到task运行节点的classpath中 /*job.addFileToClassPath(file); */ //缓存普通文件到task运行的classpath中 /*job.addCacheArchive(uri);*/ //缓存压缩包文件到task运行的节点目录 /*job.addCacheFile(uri);*/ //缓存普通文件到task运行的节点目录 //将产品表文件缓存到task运行的节点目录中去 job.addCacheFile(new URI("file:///C:/mapreduce/file/pd.txt")); setup代码 @Override protected void setup(Context context)throws IOException, InterruptedException { BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("pd.txt"))); /*org.apache.commons.lang.StringUtils; * isNotBlank判断是不是空串 * isNOtempty判断是不是空串和null */ String line; while(StringUtils.isNotEmpty(line=br.readLine())) { String[] fields = line.split(","); pdInfoMap.put(fields[0], fields[1]); } br.close(); }3.案例
package cn.yzx.bigdata.mr.map_side_join; import java.io.BufferedReader; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MapSideJoin { static class MapSideJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable>{ //用hashmap加载产品信息表 Map<String,String> pdInfoMap=new HashMap<String,String>(); Text k=new Text(); /* 通过阅读父类的Mapper的源码,发现 setup() while(还有没读的行){map();} clearup(); setup调用一次 ,map调用多次,clearup调用一次 也就是maptask在处理数据之前就会调用一次setup setup 可以用来做一些初始化工作 */ @Override protected void setup(Context context)throws IOException, InterruptedException { BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("pd.txt"))); /*org.apache.commons.lang.StringUtils; * isNotBlank判断是不是空串 * isNOtempty判断是不是空串和null */ String line; while(StringUtils.isNotEmpty(line=br.readLine())) { String[] fields = line.split(","); pdInfoMap.put(fields[0], fields[1]); } br.close(); } //由于已经持有完整的产品信息表,所以在map方法中就能实现join逻辑了 @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { String orderLine = value.toString(); String[] fields = orderLine.split(","); String pdName=pdInfoMap.get(fields[1]); k.set(orderLine+"\t"+pdName); context.write(k, NullWritable.get() ); } } public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); conf.set("mapreduce.framework.name","local"); conf.set("fs.faultFS", "file:///"); Job job = Job.getInstance(conf); job.setJarByClass(MapSideJoin.class); job.setMapperClass(MapSideJoinMapper.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job, new Path("C:/mapreduce/mapjoininput")); FileOutputFormat.setOutputPath(job, new Path("C:/mapreduce/mapjoinoutput")); //map端的join逻辑不需要阶段,在默认情况下reducetask为1 ,所有要手动改为0 job.setNumReduceTasks(0); //指定需要缓存一个文件到所有的maptask运行的节点目录 /*job.addArchiveToClassPath(archive);*/ //缓存jar包到task运行节点的classpath中 /*job.addFileToClassPath(file); */ //缓存普通文件到task运行的classpath中 /*job.addCacheArchive(uri);*/ //缓存压缩包文件到task运行的节点目录 /*job.addCacheFile(uri);*/ //缓存普通文件到task运行的节点目录 //将产品表文件缓存到task运行的节点目录中去 job.addCacheFile(new URI("file:///C:/mapreduce/file/pd.txt")); boolean res = job.waitForCompletion(true); System.out.println(res ? 0 : 1); } }