Source-code analysis of map input splits in Hadoop


Many people assume that the number of map input splits is determined simply by the number of input files and by whether each file is larger than the block size, and that this directly fixes the number of map tasks. That is only true with the default configuration. In practice, more files does not automatically mean more map tasks — too many small files actually hurts efficiency — and large files are not necessarily cut exactly at block-size boundaries. This post looks at the two classes that control this behaviour: FileInputFormat, which splits large files, and CombineFileInputFormat, which merges small ones.

1. The FileInputFormat class. Its split method is shown below:

public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = (new StopWatch()).start(); // used to time this method
    /* These two values can be set via mapred.min.split.size and mapred.max.split.size;
       minSize is the larger of 1 and the configured minimum. */
    long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);
    List<InputSplit> splits = new ArrayList();      // holds the resulting splits
    List<FileStatus> files = this.listStatus(job);  // the list of input files
    Iterator i$ = files.iterator();                 // iterator over the input files
    while(true) {
        while(true) {
            while(i$.hasNext()) {
                FileStatus file = (FileStatus)i$.next();
                Path path = file.getPath();
                long length = file.getLen();        // length of this file
                if(length != 0L) {
                    BlockLocation[] blkLocations;
                    // if the listing already bundled block locations, use them directly
                    if(file instanceof LocatedFileStatus) {
                        blkLocations = ((LocatedFileStatus)file).getBlockLocations();
                    } else {
                        // otherwise ask the FileSystem for the file's block locations
                        FileSystem fs = path.getFileSystem(job.getConfiguration());
                        blkLocations = fs.getFileBlockLocations(file, 0L, length);
                    }
                    // check whether the file is splittable; most compression codecs make it non-splittable
                    if(this.isSplitable(job, path)) {
                        long blockSize = file.getBlockSize(); // block size of this file, 128 MB by default
                        /* The key step: splitSize is determined jointly by blockSize, minSize and maxSize.
                           computeSplitSize() is simply
                               return Math.max(minSize, Math.min(maxSize, blockSize));
                           i.e. the larger of minSize and the smaller of maxSize and blockSize. */
                        long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);
                        long bytesRemaining; // bytes not yet assigned to a split
                        int blkIndex;
                        // keep cutting while the remainder is more than 1.1x the split size; stop once it is not
                        for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
                            blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
                            splits.add(this.makeSplit(path, length - bytesRemaining, splitSize,
                                    blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
                        }
                        // whatever is left (less than 1.1x splitSize) becomes its own split
                        if(bytesRemaining != 0L) {
                            blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
                            splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining,
                                    blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
                        }
                    } else {
                        // not splittable: the whole file becomes a single split
                        splits.add(this.makeSplit(path, 0L, length,
                                blkLocations[0].getHosts(), blkLocations[0].getCachedHosts()));
                    }
                } else {
                    // zero-length file: add an empty split
                    splits.add(this.makeSplit(path, 0L, length, new String[0]));
                }
            }
            // record the number of input files
            job.getConfiguration().setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.size());
            sw.stop();
            if(LOG.isDebugEnabled()) {
                LOG.debug("Total # of splits generated by getSplits: " + splits.size()
                        + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
            }
            return splits;
        }
    }
}
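To make the computeSplitSize() formula and the 1.1 "slop" rule concrete, here is a small standalone sketch (not Hadoop code; the 300 MB file size and the default min/max values are made up for illustration):

public class SplitSizeDemo {
    // Mirrors FileInputFormat.computeSplitSize: max(minSize, min(maxSize, blockSize))
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024;   // default HDFS block size
        long minSize = 1L;                     // default minimum split size
        long maxSize = Long.MAX_VALUE;         // default maximum split size
        long splitSize = computeSplitSize(blockSize, minSize, maxSize); // 128 MB

        long length = 300L * 1024 * 1024;      // hypothetical 300 MB input file
        int splits = 0;
        long bytesRemaining = length;
        // Same 1.1 rule as getSplits(): keep cutting while the remainder
        // is more than 10% larger than one split.
        while ((double) bytesRemaining / (double) splitSize > 1.1D) {
            splits++;
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0L) {
            splits++;                          // the tail becomes its own split
        }
        System.out.println("splits = " + splits); // prints 3 for this example (128 + 128 + 44 MB)
    }
}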

From the code above we can see how large files are split. In practice you usually control the split size by setting mapred.max.split.size (mapreduce.input.fileinputformat.split.maxsize in Hadoop 2.x); simply configuring a desired number of map tasks often has no effect.
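As a sketch of how this is typically tuned in a Hadoop 2.x driver (the class and job names below are hypothetical; setMaxInputSplitSize/setMinInputSplitSize just write the split.maxsize/split.minsize properties):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitTuningDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "split-tuning-demo"); // hypothetical job

        // Cap each split at 64 MB: a 128 MB block now yields two splits,
        // i.e. more map tasks than the default.
        FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);
        // The minimum is left at its default of 1 here; raising it above the
        // block size would instead produce fewer, larger splits.
        FileInputFormat.setMinInputSplitSize(job, 1L);

        // ... set mapper/reducer, input/output paths, and submit as usual.
    }
}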

2. The CombineFileInputFormat class

Now for small-file merging. Merging here means packing several small files into a single InputSplit. The packed result is represented by CombineFileSplit, whose initSplit() method records, for each small file, its path, start offset and length, plus the split's preferred locations and total length (a small usage sketch follows the code below):

private void initSplit(Path[] files, long[] start, long[] lengths, String[] locations) {
    this.startoffset = start;
    this.lengths = lengths;
    this.paths = files;
    this.totLength = 0L;
    this.locations = locations;
    long[] arr$ = lengths;
    int len$ = lengths.length;
    for(int i$ = 0; i$ < len$; ++i$) {
        long length = arr$[i$];
        this.totLength += length; // total length of the combined split = sum of all file lengths
    }
}
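As a rough illustration of what initSplit() ends up recording, here is a standalone sketch that builds a CombineFileSplit by hand from three made-up small files (the paths, sizes and host name are hypothetical):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class CombineSplitDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical small files packed into a single split.
        Path[] files = { new Path("/data/a.txt"), new Path("/data/b.txt"), new Path("/data/c.txt") };
        long[] starts = { 0L, 0L, 0L };                        // each file is read from its beginning
        long[] lengths = { 10L * 1024, 20L * 1024, 5L * 1024 };
        String[] locations = { "node1" };                      // preferred host for the whole split

        CombineFileSplit split = new CombineFileSplit(files, starts, lengths, locations);
        // totLength computed by initSplit() is simply the sum of the per-file lengths.
        System.out.println("files in split: " + split.getNumPaths()); // 3
        System.out.println("total length:   " + split.getLength());  // 35840 bytes
    }
}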

The getSplits() method of CombineFileInputFormat:

public List<InputSplit> getSplits(JobContext job) throws IOException {
    long minSizeNode = 0L;
    long minSizeRack = 0L;
    long maxSize = 0L;
    Configuration conf = job.getConfiguration();
    if(this.minSplitSizeNode != 0L) {
        minSizeNode = this.minSplitSizeNode;
    } else {
        // minimum size of a split built from blocks on the same node
        minSizeNode = conf.getLong("mapreduce.input.fileinputformat.split.minsize.per.node", 0L);
    }
    if(this.minSplitSizeRack != 0L) {
        minSizeRack = this.minSplitSizeRack;
    } else {
        // minimum size of a split built from blocks on the same rack
        minSizeRack = conf.getLong("mapreduce.input.fileinputformat.split.minsize.per.rack", 0L);
    }
    if(this.maxSplitSize != 0L) {
        maxSize = this.maxSplitSize;
    } else {
        // maximum size of any combined split
        maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0L);
    }
    // the settings only make sense when maxSize >= minSizeRack >= minSizeNode
    if(minSizeNode != 0L && maxSize != 0L && minSizeNode > maxSize) {
        throw new IOException("Minimum split size pernode " + minSizeNode + " cannot be larger than maximum split size " + maxSize);
    } else if(minSizeRack != 0L && maxSize != 0L && minSizeRack > maxSize) {
        throw new IOException("Minimum split size per rack " + minSizeRack + " cannot be larger than maximum split size " + maxSize);
    } else if(minSizeRack != 0L && minSizeNode > minSizeRack) {
        throw new IOException("Minimum split size per node " + minSizeNode + " cannot be larger than minimum split " + "size per rack " + minSizeRack);
    } else {
        List<FileStatus> stats = this.listStatus(job); // all input files
        List<InputSplit> splits = new ArrayList();
        if(stats.size() == 0) {
            return splits;
        } else {
            // iterate over the filter pools and collect the files (myPaths) each pool accepts
            Iterator i$ = this.pools.iterator();
            while(i$.hasNext()) {
                CombineFileInputFormat.MultiPathFilter onepool = (CombineFileInputFormat.MultiPathFilter)i$.next();
                ArrayList<FileStatus> myPaths = new ArrayList();
                Iterator iter = stats.iterator();
                while(iter.hasNext()) {
                    FileStatus p = (FileStatus)iter.next();
                    if(onepool.accept(p.getPath())) {
                        myPaths.add(p); // this file belongs to the current pool
                        iter.remove();
                    }
                }
                // generate splits for the files of this pool
                this.getMoreSplits(job, myPaths, maxSize, minSizeNode, minSizeRack, splits);
            }
            // generate splits for the files that belong to no pool
            this.getMoreSplits(job, stats, maxSize, minSizeNode, minSizeRack, splits);
            this.rackToNodes.clear();
            return splits;
        }
    }
}

private void getMoreSplits(JobContext job, List<FileStatus> stats, long maxSize, long minSizeNode, long minSizeRack, List<InputSplit> splits) throws IOException {
    Configuration conf = job.getConfiguration();
    // packing small files must respect data locality, so three mappings are built
    HashMap<String, List<CombineFileInputFormat.OneBlockInfo>> rackToBlocks = new HashMap(); // rack  -> blocks
    HashMap<CombineFileInputFormat.OneBlockInfo, String[]> blockToNodes = new HashMap();      // block -> nodes
    HashMap<String, Set<CombineFileInputFormat.OneBlockInfo>> nodeToBlocks = new HashMap();   // node  -> blocks
    CombineFileInputFormat.OneFileInfo[] files = new CombineFileInputFormat.OneFileInfo[stats.size()];
    if(stats.size() != 0) {
        long totLength = 0L;
        int i = 0;
        for(Iterator i$ = stats.iterator(); i$.hasNext(); totLength += files[i].getLength()) {
            FileStatus stat = (FileStatus)i$.next();
            // one OneFileInfo per file; its constructor fills in the three mappings above
            files[i] = new CombineFileInputFormat.OneFileInfo(stat, conf, this.isSplitable(job, stat.getPath()),
                    rackToBlocks, blockToNodes, nodeToBlocks, this.rackToNodes, maxSize);
        }
        // build splits out of these files; createSplits() is shown below
        this.createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength, maxSize, minSizeNode, minSizeRack, splits);
    }
}

The createSplits() method (a simplified sketch of its packing logic follows the code):

void createSplits(Map<String, Set<CombineFileInputFormat.OneBlockInfo>> nodeToBlocks,
                  Map<CombineFileInputFormat.OneBlockInfo, String[]> blockToNodes,
                  Map<String, List<CombineFileInputFormat.OneBlockInfo>> rackToBlocks,
                  long totLength, long maxSize, long minSizeNode, long minSizeRack,
                  List<InputSplit> splits) {
    ArrayList<CombineFileInputFormat.OneBlockInfo> validBlocks = new ArrayList();
    long curSplitSize = 0L;
    int totalNodes = nodeToBlocks.size();
    long totalLength = totLength;
    Multiset<String> splitsPerNode = HashMultiset.create();
    HashSet completedNodes = new HashSet();
    /* Phase 1: prefer combining blocks that live on the same node; walk each node and accumulate its blocks */
    label170:
    do {
        Iterator iter = nodeToBlocks.entrySet().iterator();
        while(true) {
            while(true) {
                Entry one;
                String node;
                do {
                    if(!iter.hasNext()) {
                        continue label170;
                    }
                    one = (Entry)iter.next();
                    node = (String)one.getKey();
                } while(completedNodes.contains(node));
                Set<CombineFileInputFormat.OneBlockInfo> blocksInCurrentNode = (Set)one.getValue(); // the blocks on this node
                Iterator oneBlockIter = blocksInCurrentNode.iterator();
                /* loop over the blocks of this node */
                while(oneBlockIter.hasNext()) {
                    CombineFileInputFormat.OneBlockInfo oneblock = (CombineFileInputFormat.OneBlockInfo)oneBlockIter.next();
                    if(!blockToNodes.containsKey(oneblock)) {
                        oneBlockIter.remove();
                    } else {
                        validBlocks.add(oneblock);
                        blockToNodes.remove(oneblock); // the block is now claimed
                        curSplitSize += oneblock.length;
                        /* once the accumulated size reaches maxSize, turn these blocks into one split */
                        if(maxSize != 0L && curSplitSize >= maxSize) {
                            // build the combined split and add it to the result list
                            this.addCreatedSplit(splits, Collections.singleton(node), validBlocks);
                            totalLength -= curSplitSize;
                            curSplitSize = 0L;       // reset the accumulator
                            splitsPerNode.add(node); // remember that this node has produced a split
                            /* reset the working set */
                            blocksInCurrentNode.removeAll(validBlocks);
                            validBlocks.clear();
                            break;
                        }
                    }
                }
                /* blocks on this node that have not been turned into a split yet */
                if(validBlocks.size() != 0) {
                    /* if the leftover reaches minSizeNode and this node has not produced a split yet, make it a split */
                    if(minSizeNode != 0L && curSplitSize >= minSizeNode && splitsPerNode.count(node) == 0) {
                        this.addCreatedSplit(splits, Collections.singleton(node), validBlocks);
                        totalLength -= curSplitSize;
                        splitsPerNode.add(node);
                        blocksInCurrentNode.removeAll(validBlocks);
                    } else {
                        /* otherwise hand the leftover blocks back to blockToNodes so they can be handled later */
                        Iterator i$ = validBlocks.iterator();
                        while(i$.hasNext()) {
                            CombineFileInputFormat.OneBlockInfo oneblock = (CombineFileInputFormat.OneBlockInfo)i$.next();
                            blockToNodes.put(oneblock, oneblock.hosts);
                        }
                    }
                    validBlocks.clear();
                    curSplitSize = 0L;
                    completedNodes.add(node);
                } else if(blocksInCurrentNode.size() == 0) {
                    completedNodes.add(node);
                }
            }
        }
    } while(completedNodes.size() != totalNodes && totalLength != 0L);

    /* Phase 2: combine blocks that are not on the same node but are on the same rack */
    LOG.info("DEBUG: Terminated node allocation with : CompletedNodes: " + completedNodes.size() + ", size left: " + totalLength);
    ArrayList overflowBlocks = new ArrayList();
    HashSet racks = new HashSet();
    Iterator i$;
    label130:
    /* build rack-level splits from the blocks that the node phase left behind */
    while(blockToNodes.size() > 0) {
        i$ = rackToBlocks.entrySet().iterator();
        while(true) {
            while(true) {
                if(!i$.hasNext()) {
                    continue label130;
                }
                Entry<String, List<CombineFileInputFormat.OneBlockInfo>> one = (Entry)i$.next();
                racks.add(one.getKey());
                List<CombineFileInputFormat.OneBlockInfo> blocks = (List)one.getValue();
                boolean createdSplit = false;
                Iterator i$ = blocks.iterator();
                /* walk every block on this rack */
                while(i$.hasNext()) {
                    CombineFileInputFormat.OneBlockInfo oneblock = (CombineFileInputFormat.OneBlockInfo)i$.next();
                    /* only blocks still present in blockToNodes (i.e. not yet placed into a split) are considered */
                    if(blockToNodes.containsKey(oneblock)) {
                        validBlocks.add(oneblock);
                        blockToNodes.remove(oneblock);
                        curSplitSize += oneblock.length;
                        if(maxSize != 0L && curSplitSize >= maxSize) {
                            this.addCreatedSplit(splits, this.getHosts(racks), validBlocks);
                            createdSplit = true;
                            break;
                        }
                    }
                }
                /* if a split was created, move on to the next rack */
                if(createdSplit) {
                    curSplitSize = 0L;
                    validBlocks.clear();
                    racks.clear();
                } else {
                    /* leftovers on this rack: if they reach minSizeRack they become a split, otherwise they are kept for later */
                    if(!validBlocks.isEmpty()) {
                        if(minSizeRack != 0L && curSplitSize >= minSizeRack) {
                            this.addCreatedSplit(splits, this.getHosts(racks), validBlocks);
                        } else {
                            overflowBlocks.addAll(validBlocks);
                        }
                    }
                    curSplitSize = 0L;
                    validBlocks.clear();
                    racks.clear();
                }
            }
        }
    }

    assert blockToNodes.isEmpty();
    assert curSplitSize == 0L;
    assert validBlocks.isEmpty();
    assert racks.isEmpty();

    /* Phase 3: deal with the overflow blocks that neither the node phase nor the rack phase could place */
    i$ = overflowBlocks.iterator();
    while(i$.hasNext()) {
        CombineFileInputFormat.OneBlockInfo oneblock = (CombineFileInputFormat.OneBlockInfo)i$.next();
        validBlocks.add(oneblock);
        curSplitSize += oneblock.length;
        for(int i = 0; i < oneblock.racks.length; ++i) {
            racks.add(oneblock.racks[i]);
        }
        /* once the accumulated size reaches maxSize, create a split */
        if(maxSize != 0L && curSplitSize >= maxSize) {
            this.addCreatedSplit(splits, this.getHosts(racks), validBlocks);
            curSplitSize = 0L;
            validBlocks.clear();
            racks.clear();
        }
    }
    /* whatever is left — blocks that share neither a node nor a rack — becomes one final split, regardless of size */
    if(!validBlocks.isEmpty()) {
        this.addCreatedSplit(splits, this.getHosts(racks), validBlocks);
    }
}
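To make the decision rules easier to follow, here is a heavily simplified, standalone sketch of just the node-level phase: blocks are plain numbers, there is a single node, and everything the rack/overflow phases would handle is collected in a leftover list. It only illustrates the maxSize / minSizeNode comparisons and does not reproduce the real implementation:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CombinePackingSketch {

    /** Pack the blocks of one node; blocks that do not reach minSizeNode go into leftovers. */
    static List<List<Long>> packOneNode(List<Long> blocks, long maxSize, long minSizeNode, List<Long> leftovers) {
        List<List<Long>> splits = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long curSize = 0L;

        for (long block : blocks) {
            current.add(block);
            curSize += block;
            if (maxSize != 0L && curSize >= maxSize) {     // accumulated >= maxSize: emit a split
                splits.add(new ArrayList<>(current));
                current.clear();
                curSize = 0L;
            }
        }
        if (!current.isEmpty()) {
            if (minSizeNode != 0L && curSize >= minSizeNode) {
                splits.add(new ArrayList<>(current));      // big enough for a per-node split
            } else {
                leftovers.addAll(current);                 // deferred to rack-level / overflow merging
            }
        }
        return splits;
    }

    public static void main(String[] args) {
        // Hypothetical block sizes in MB on a single node; maxSize = 128, minSizeNode = 64.
        List<Long> leftovers = new ArrayList<>();
        List<List<Long>> splits = packOneNode(Arrays.asList(40L, 60L, 50L, 30L), 128L, 64L, leftovers);
        System.out.println("node-level splits:   " + splits);    // [[40, 60, 50]]  (150 >= 128)
        System.out.println("left for rack phase: " + leftovers); // [30]            (30 < 64)
    }
}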

To summarise small-file merging: you first set maxSize (mapreduce.input.fileinputformat.split.maxsize), the maximum split size; then split.minsize.per.node, the minimum size of a split built from blocks on a single node; and finally split.minsize.per.rack, the minimum size of a split built from blocks on a single rack.
(1) Splits are first formed per node: whenever the small-file blocks accumulated on one node reach maxSize they are combined into a split; if the remainder on that node is at least minsize.per.node it becomes its own split; anything smaller is set aside for later.
(2) The blocks left unsplit on each rack are then processed: whenever they accumulate to maxSize they form a split; a remainder of at least minsize.per.rack becomes its own split; the rest is set aside again.
(3) Finally, the blocks remaining across different nodes and racks are combined: each time they accumulate to maxSize they form a split, and whatever is left at the end is merged into one last split regardless of size.
(4) If none of the three parameters is set, all the input files end up in a single split.
The code analysed here is from Hadoop 2.8; other versions differ only in minor details. Reading through the source is tiring work — I hope this write-up is useful.
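Putting the three parameters together, a driver that merges many small text files might look roughly like the following sketch (the class name, paths and the 128/64/32 MB values are made up; CombineTextInputFormat is the text-oriented subclass of CombineFileInputFormat shipped with Hadoop 2.x):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SmallFileCombineJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // The three knobs discussed above; the ordering must satisfy
        // maxsize >= minsize.per.rack >= minsize.per.node, otherwise getSplits() throws an IOException.
        conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 128L * 1024 * 1024);
        conf.setLong("mapreduce.input.fileinputformat.split.minsize.per.rack", 64L * 1024 * 1024);
        conf.setLong("mapreduce.input.fileinputformat.split.minsize.per.node", 32L * 1024 * 1024);

        Job job = Job.getInstance(conf, "combine-small-files"); // hypothetical job name
        job.setJarByClass(SmallFileCombineJob.class);
        // Small text files are packed into combined splits instead of one split per file.
        job.setInputFormatClass(CombineTextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // directory of small files (hypothetical)
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Mapper/reducer classes are omitted here; the defaults simply pass records through.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}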

