大家在接触mapreduce时,对于一个文件要分片,在分片末尾会不会有一条记录被拆开,从而导致map端在输入的时候,会不会有一行记录是不完整的疑惑,其实这肯定是不可能,如果这样的问题没有解决,那在生产生活中肯定是经常遇到,所以hadoop源码中肯定有这方面的处理,今天就带大家看看mapreduce中的源码,我们这次看的是hadoop2.8版本的,其他版本差距不大。
首先肯定想到的是读取文本文件的类是TextInputFormat,其中比较重要的方法是createRecordReader方法
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) { String delimiter = context.getConfiguration().get("textinputformat.record.delimiter"); byte[] recordDelimiterBytes = null; if(null != delimiter) { recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); } return new LineRecordReader(recordDelimiterBytes); }这里主要是从配置中读取了文本的记录之间的分隔符,如果不另外在设置别的分隔符,这里的recordDelimiterBytes就是空值,则默认CR和LF这些换行符为record的分隔符,将分隔符设成UTF-8的形式传入到LineRecordReader中去,意思是每次碰到一个分隔符,就为一条记录,然后我们再去LineRecordReader类中去看,我们去看nextKeyValue()方法,当其返回true时,就会调用getCurrentKey()和getCurrentValue()去获取key和value
public boolean nextKeyValue() throws IOException { if(this.key == null) { this.key = new LongWritable(); } this.key.set(this.pos); if(this.value == null) { this.value = new Text(); } int newSize = 0; //getFilePosition()当前文件的偏移量,end表示split结束的位置 while(this.getFilePosition() <= this.end || this.in.needAdditionalRecordAfterSplit()) { if(this.pos == 0L) { newSize = this.skipUtfByteOrderMark(); } else { newSize = this.in.readLine(this.value, this.maxLineLength, this.maxBytesToConsume(this.pos)); this.pos += (long)newSize; } if(newSize == 0 || newSize < this.maxLineLength) { break; } LOG.info("Skipped line of size " + newSize + " at pos " + (this.pos - (long)newSize)); } if(newSize == 0) { this.key = null; this.value = null; return false; } else { return true; } }这里的key值就this.pos也就是当前文本的偏移量,这里再判断当前位置是不是0,如果是的话执行skipUtfByteOrderMark()方法。这个方法是由于txt文件是UTF-8的时候文件前面会有三个字节的标识,这里是要跳过这个标识,如果不是的话,就读取下一行的newsize大小,当前位置pos加上这个newsize就是新的下一行的位置。我们再查看 newSize = this.in.readLine(this.value, this.maxLineLength, this.maxBytesToConsume(this.pos))这条语句,将当前value,最大读取行的长度,maxBytesToConsume()这个方法是判断当前文件是不是压缩的,不是就是默认值2147483647,紧接着我们去看readline()方法。这个方法在hadoop的util包里LineReader类中对应着这条语句
public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException { return this.recordDelimiterBytes != null?this.readCustomLine(str, maxLineLength, maxBytesToConsume):this.readDefaultLine(str, maxLineLength, maxBytesToConsume); }这里我们的this.recordDelimiterBytes由于我们没有新制定别的分隔符,这里的就为null,则会跳到默认的readDefaultLine中,我们跳到readDefaultLine方法中去看,这里应该就有我们想要的答案
private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException { str.clear(); //先清空str int txtLength = 0; int newlineLength = 0; //行结束分隔符的长度 boolean prevCharCR = false; //CR换行符 long bytesConsumed = 0L; //返回的一行字节的长度 //这里循环主要是由于设置了maxLineLength,所以每次str.append最多只能加maxLineLength长度,如果此时在buffer中大于maxLineLength就需要多次append,直到buffer完全被append do { int startPosn = this.bufferPosn; //buffPosn表示上一次到达的位置 //如果之前读取过一个buffer,此时的bufferLength=bufferPosn,则重新设置bufferPosn if(this.bufferPosn >= this.bufferLength) { startPosn = this.bufferPosn = 0; if(prevCharCR) { //如果上一个buffer的最后一个字符为‘\r’, ++bytesConsumed; } this.bufferLength = this.fillBuffer(this.in, this.buffer, prevCharCR); //从缓冲区读取新的buffer的长度 if(this.bufferLength <= 0) { break; } } while(this.bufferPosn < this.bufferLength) { //从缓冲区读取的数据中寻找换行符 if(this.buffer[this.bufferPosn] == 10) { //当前位置有分隔符LF时, newlineLength = prevCharCR?2:1; //且前面的字符是CR,则行结束符长度为2 ++this.bufferPosn;//读取位置向前进一位 break;//找到一行的结束符,跳出 } if(prevCharCR) {//如果上一个位置为CR,但是当前位置不是LF newlineLength = 1; break; } prevCharCR = this.buffer[this.bufferPosn] == 13;//判断当前位置是否为CR ++this.bufferPosn; } int readLength = this.bufferPosn - startPosn;//此次读取的数据长度 if(prevCharCR && newlineLength == 0) {//正好读到缓冲区的末尾且正好是CR --readLength; } bytesConsumed += (long)readLength;//加上此次读取的长度 int appendLength = readLength - newlineLength;去掉行结束符 if(appendLength > maxLineLength - txtLength) { appendLength = maxLineLength - txtLength; } if(appendLength > 0) { str.append(this.buffer, startPosn, appendLength); txtLength += appendLength; } } while(newlineLength == 0 && bytesConsumed < (long)maxBytesToConsume);//当没有遇到CR和LF等换行符时,且字节数没有超过上限 if(bytesConsumed > 2147483647L) { throw new IOException("Too many bytes before newline: " + bytesConsumed); } else { return (int)bytesConsumed; } }上面的for循环可以看出即使跨split.跨block也不能阻止它完整读取一行数据的决心,刚才那个 nextKeyValue()则是控制split读取数据的结束位置,while(this.getFilePosition() <= this.end )表示的是当前位置小于即使在等于split的末尾,还会去读取下一个分片第一行的数据,这个等号很重要,意思是即使我读到当前的位置末尾了,我也还要读取下一个分片的位置,当然下一个split在读取的时候是怎么读取的,在LineRecordReader的initialize()方法中会对每一个split进行初始化,start是这个split的开始位置,除了第一个split的start为0L外,其他分片均不为0。initialize()这段代码很重要
if(this.start != 0L) { this.start += (long)this.in.readLine(new Text(), 0, this.maxBytesToConsume(this.start)); }这段代码中指出如果start!=0也就是不是第一个分片的时候,都要默认从第二行的开始位置读起,这里start更新为第二行的起始位置,为什么是第二行开始的位置,是由于后面会返回遇到第一个换行符的位置,start即可更新为下一行的开始位置,又由于maxLineLeng=0,计算出的appendLength=0
if(appendLength > 0) { str.append(this.buffer, startPosn, appendLength); txtLength += appendLength; }这个方法不会执行,str自然就不会append,则不会出现重复数据的情况,设计的很巧妙,熬夜写的,之前一直看了好久,希望大家能够多多关注,谢谢。
