hbase bulk load相关源码简析之HFileOutputFormat、LoadIncrementalHFiles

xiaoxiao2021-02-28  119

转载请注明出处: http://blog.csdn.net/lonelytrooper/article/details/17040895

PutSortReducer:

[java] view plain copy // 对map阶段传递过来的puts中的KVs做排序,并将有序的KVs写到输出流(最终写的类是HFileWriterV1或HFileWriterV2的append方法)...  public class PutSortReducer extends          Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> {        @Override      protected void reduce(ImmutableBytesWritable row, java.lang.Iterable<Put> puts,              Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue>.Context context)              throws java.io.IOException, InterruptedException {          // although reduce() is called per-row, handle pathological case          // 设定一个RAM的阀值,用于应对非常规的情况.. 默认值2L * (1 << 30)为Integer.MAX_VALUE+1          long threshold = context.getConfiguration().getLong("putsortreducer.row.threshold",                  2L * (1 << 30));          Iterator<Put> iter = puts.iterator();          while (iter.hasNext()) {              TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR); // KVComparator              long curSize = 0;              // stop at the end or the RAM threshold              // 用curSize累计当前puts的size,但这个size不能超过threshold...              while (iter.hasNext() && curSize < threshold) {                  Put p = iter.next();                  for (List<KeyValue> kvs : p.getFamilyMap().values()) {                      for (KeyValue kv : kvs) {                          map.add(kv);                          curSize += kv.getLength();                      }                  }              }              // 记录已读取的map中的KV的个数,并将curSize转成易读的KB,MB,GB..              context.setStatus("Read " + map.size() + " entries of " + map.getClass() + "("                      + StringUtils.humanReadableInt(curSize) + ")");              int index = 0;              // 将当前有序的KV写到输出流..              for (KeyValue kv : map) {                  context.write(row, kv);                  if (index > 0 && index % 100 == 0// 记录进度,每100个记录一次..                      context.setStatus("Wrote " + index);              }                // if we have more entries to process              //如果居然还有put没处理完..我们会通过context.write(null, null)强刷.. 这会关闭当前的Writer(StoreFile.Writer),并形成了一个StoreFile。              //在外层的下次循环中,会继续处理余下的数据,并创建新的StoreFile的Writer。 换言之,这种情况下相同rowkey的数据会被写到不同的StoreFile中...              //细节部分可以看下HFileOutputFormat下RecordWriter类下的write方法..              if (iter.hasNext()) {                  // force flush because we cannot guarantee intra-row sorted                  // order                  context.write(nullnull);              }          }      }  }   KeyValueSortReducer: [java] view plain copy // 类比PutSortReducer,对map传递过来的KVs进行排序,并将有序的KVs写到输出流...  // 如果一行包含的列非常多的话,有oom的风险..  public class KeyValueSortReducer extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {    protected void reduce(ImmutableBytesWritable row, java.lang.Iterable<KeyValue> kvs,        org.apache.hadoop.mapreduce.Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context)    throws java.io.IOException, InterruptedException {      TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);      for (KeyValue kv: kvs) {        map.add(kv.clone());      }      context.setStatus("Read " + map.getClass());      int index = 0;      for (KeyValue kv: map) {        context.write(row, kv);        if (index > 0 && index % 100 == 0) context.setStatus("Wrote " + index);      }    }  }   简单说下TotalOrderPartitioner和SimpleTotalOrderPartitioner:

TotalOrderPartitioner:

做全排序的东东,Hbase中的TOP其实就是Hadoop中TOP的直接拷贝,通过从外部文件中读取分区点来实现。 在bulk load中,这个外部文件即为从HTable中获取的region的startKeys处理之后得到的split points,这个split points文件被写到了路径Path partitionsPath = new Path(job.getWorkingDirectory(), "partitions_" + UUID.randomUUID())。

SimpleTotalOrderPartitioner:

简单的做全排序的东东,原则是根据输入的startkey和endkey进行均分,区间是左闭右开。
转载请注明原文地址: https://www.6miu.com/read-37902.html

最新回复(0)