HBase-Spark生成hfile

xiaoxiao2021-02-28  6

import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import org.slf4j.{Logger, LoggerFactory}

/**
 * Spark job that reads '@'-delimited text records (rowKey@family@qualifier@value),
 * partitions them by the region start keys of an existing HBase table, sorts each
 * partition, and writes the result as HFiles through HFileOutputFormat2.
 *
 * yaojingyi 2017-08-16
 */
object HFile_test {

  val LOG: Logger = LoggerFactory.getLogger(this.getClass)

  /**
   * Entry point.
   *
   * @param args exactly four values: zookeeper quorum, table name, input path, output path
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.err.print("Usage:BulkLoad <zookeeper> <tablename> <inputPath> <outPath> \n")
      System.exit(1)
    }
    val zk = args(0)
    val tablename = args(1)
    val inputPath = args(2)
    val outPath = args(3)

    val sparkCtx = new SparkContext(new SparkConf().setAppName("TEST_YJY_HFile_BuildHFile"))
    try {
      val conf = HBaseConfiguration.create()
      conf.set("hbase.zookeeper.quorum", zk)
      conf.set("hbase.zookeeper.property.clientPort", "2181")

      val table = new HTable(conf, tablename)
      try {
        // One start key per region; these define the partition boundaries.
        // NOTE(review): toStringBinary escapes non-printable bytes, so the string
        // comparison in HFilePartitioner is only faithful for printable-ASCII split keys.
        val startkeys: Array[String] = table.getStartKeys.map(row => Bytes.toStringBinary(row))

        val allRdd = sparkCtx.textFile(inputPath)
          .map(line => line.split("@"))
          // Lines with fewer than four fields are dropped so that only well-formed
          // (key, value) pairs reach the shuffle.
          .filter(fields => fields.length >= 4)
          // Shuffle plain strings only: the HBase key/cell classes are built after the
          // shuffle, so the job does not rely on a non-default serializer for them.
          // The (row, family, qualifier) tuple sorts row first, then family, then qualifier.
          // NOTE(review): String ordering matches HBase byte ordering for ASCII keys only.
          .map(fields => ((fields(0), fields(1), fields(2)), fields(3)))
          .repartitionAndSortWithinPartitions(new HFilePartitioner(startkeys))
          .map { case ((rowKey, family, colum), value) =>
            val rowBytes = Bytes.toBytes(rowKey)
            (
              new ImmutableBytesWritable(rowBytes),
              new KeyValue(rowBytes, Bytes.toBytes(family), Bytes.toBytes(colum), Bytes.toBytes(value))
            ) // (ImmutableBytesWritable, KeyValue)
          }

        val job = Job.getInstance(conf)
        job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
        job.setMapOutputValueClass(classOf[KeyValue])
        HFileOutputFormat2.configureIncrementalLoad(job, table)

        // Job.getInstance copies `conf`, so the settings applied by
        // configureIncrementalLoad live in job.getConfiguration, not in `conf`.
        allRdd.saveAsNewAPIHadoopFile(
          outPath,
          classOf[ImmutableBytesWritable],
          classOf[KeyValue],
          classOf[HFileOutputFormat2],
          job.getConfiguration
        )
      } finally {
        table.close()
      }
    } finally {
      sparkCtx.stop()
    }
  }

  /**
   * Pair of an HFile output key and its cell.
   * Kept for source compatibility; `main` does not send it through the shuffle.
   */
  case class ResultKV(k: ImmutableBytesWritable, v: KeyValue)

  /**
   * Custom partitioner: distributes records across partitions according to the
   * table's region split points, one partition per region.
   *
   * @param startkeyArr start key of each region of the table, in ascending order
   */
  class HFilePartitioner(private val startkeyArr: Array[String]) extends Partitioner {

    override def numPartitions: Int = startkeyArr.length

    /**
     * Returns the index of the last region whose start key is not greater than the
     * record's row key.
     *
     * @param key either a (rowKey, family, qualifier) tuple, in which case only the
     *            row key is compared, or any other value, compared via its string form
     * @return a partition index; never negative for a non-empty start-key array
     */
    override def getPartition(key: Any): Int = {
      val rowKey = key match {
        case (row: String, _, _) => row
        case other => other.toString
      }
      val firstGreater = startkeyArr.indexWhere(start => rowKey.compareTo(start) < 0)
      if (firstGreater < 0) startkeyArr.length - 1 // not below any start key: last region
      else math.max(firstGreater - 1, 0) // clamp so a key below the first start key maps to 0
    }

    // Two partitioners are interchangeable only when they share the same split points.
    override def equals(other: Any): Boolean = other match {
      case h: HFilePartitioner => h.startkeyArr.sameElements(startkeyArr)
      case _ => false
    }

    override def hashCode(): Int = startkeyArr.toSeq.hashCode()
  }
}
转载请注明原文地址: https://www.6miu.com/read-1150253.html

最新回复(0)