转载请注明出处: http://blog.csdn.net/lonelytrooper/article/details/17040895
PutSortReducer:
[java] view plain copy // 对map阶段传递过来的puts中的KVs做排序,并将有序的KVs写到输出流(最终写的类是HFileWriterV1或HFileWriterV2的append方法)... public class PutSortReducer extends Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> { @Override protected void reduce(ImmutableBytesWritable row, java.lang.Iterable<Put> puts, Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue>.Context context) throws java.io.IOException, InterruptedException { // although reduce() is called per-row, handle pathological case // 设定一个RAM的阀值,用于应对非常规的情况.. 默认值2L * (1 << 30)为Integer.MAX_VALUE+1 long threshold = context.getConfiguration().getLong("putsortreducer.row.threshold", 2L * (1 << 30)); Iterator<Put> iter = puts.iterator(); while (iter.hasNext()) { TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR); // KVComparator long curSize = 0; // stop at the end or the RAM threshold // 用curSize累计当前puts的size,但这个size不能超过threshold... while (iter.hasNext() && curSize < threshold) { Put p = iter.next(); for (List<KeyValue> kvs : p.getFamilyMap().values()) { for (KeyValue kv : kvs) { map.add(kv); curSize += kv.getLength(); } } } // 记录已读取的map中的KV的个数,并将curSize转成易读的KB,MB,GB.. context.setStatus("Read " + map.size() + " entries of " + map.getClass() + "(" + StringUtils.humanReadableInt(curSize) + ")"); int index = 0; // 将当前有序的KV写到输出流.. for (KeyValue kv : map) { context.write(row, kv); if (index > 0 && index % 100 == 0) // 记录进度,每100个记录一次.. context.setStatus("Wrote " + index); } // if we have more entries to process //如果居然还有put没处理完..我们会通过context.write(null, null)强刷.. 这会关闭当前的Writer(StoreFile.Writer),并形成了一个StoreFile。 //在外层的下次循环中,会继续处理余下的数据,并创建新的StoreFile的Writer。 换言之,这种情况下相同rowkey的数据会被写到不同的StoreFile中... //细节部分可以看下HFileOutputFormat下RecordWriter类下的write方法.. if (iter.hasNext()) { // force flush because we cannot guarantee intra-row sorted // order context.write(null, null); } } } } KeyValueSortReducer: [java] view plain copy // 类比PutSortReducer,对map传递过来的KVs进行排序,并将有序的KVs写到输出流... // 如果一行包含的列非常多的话,有oom的风险.. public class KeyValueSortReducer extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> { protected void reduce(ImmutableBytesWritable row, java.lang.Iterable<KeyValue> kvs, org.apache.hadoop.mapreduce.Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context) throws java.io.IOException, InterruptedException { TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR); for (KeyValue kv: kvs) { map.add(kv.clone()); } context.setStatus("Read " + map.getClass()); int index = 0; for (KeyValue kv: map) { context.write(row, kv); if (index > 0 && index % 100 == 0) context.setStatus("Wrote " + index); } } } 简单说下TotalOrderPartitioner和SimpleTotalOrderPartitioner:TotalOrderPartitioner:
做全排序的东东,Hbase中的TOP其实就是Hadoop中TOP的直接拷贝,通过从外部文件中读取分区点来实现。 在bulk load中,这个外部文件即为从HTable中获取的region的startKeys处理之后得到的split points,这个split points文件被写到了路径Path partitionsPath = new Path(job.getWorkingDirectory(), "partitions_" + UUID.randomUUID())。
SimpleTotalOrderPartitioner:
简单的做全排序的东东,原则是根据输入的startkey和endkey进行均分,区间是左闭右开。