import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import org.slf4j.{Logger, LoggerFactory}
/**
 * Builds HFiles for HBase bulk loading: reads "@"-delimited text with Spark,
 * converts each line to a KeyValue, repartitions and sorts by region
 * boundaries, and writes the result with HFileOutputFormat2.
 *
 * yaojingyi 2017-08-16
 */
object HFile_test {

  val LOG: Logger = LoggerFactory.getLogger(this.getClass)
  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.err.print("Usage: BulkLoad <zookeeper> <tablename> <inputPath> <outPath>\n")
      System.exit(1)
    }
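
    // Example invocation (jar name, quorum and paths below are placeholders,
    // not taken from the original project):
    //   spark-submit --class HFile_test bulkload.jar \
    //     zk1,zk2,zk3 my_table /input/cells.txt /tmp/hfile_out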
    val zk = args(0)
    val tablename = args(1)
    val inputPath = args(2)
    val outPath = args(3)

    val sparkCtx = new SparkContext(
      new SparkConf().setAppName("TEST_YJY_HFile_BuildHFile"))

    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", zk)
    conf.set("hbase.zookeeper.property.clientPort", "2181")

    val table = new HTable(conf, tablename)
    // Collect the start key of every region so the partitioner can mirror the
    // table's region layout.
    val startkeys: Array[String] = table.getStartKeys().map(k => Bytes.toStringBinary(k))
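
    // Expected input: one cell per line, "@"-delimited as
    // rowKey@family@qualifier@value. The sample line below is illustrative,
    // not from the original data set:
    //   row001@cf@name@alice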
    val allRdd = sparkCtx.textFile(inputPath)
      .map(_.split("@"))
      // Drop malformed lines instead of mapping them to Unit, which the
      // original if-without-else silently did.
      .filter(_.length >= 4)
      .map { arr =>
        val rowKey = arr(0)
        val family = arr(1)
        val column = arr(2)
        val value = arr(3)
        // The composite key rowKey + "_" + column is what gets partitioned and
        // sorted within each partition.
        (rowKey + "_" + column,
          ResultKV(
            new ImmutableBytesWritable(rowKey.getBytes()),
            new KeyValue(rowKey.getBytes(), family.getBytes(), column.getBytes(), value.getBytes())))
      }
      .repartitionAndSortWithinPartitions(new HFilePartitioner(startkeys))
      .map(x => (x._2.k, x._2.v))
    val job = Job.getInstance(conf)
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setMapOutputValueClass(classOf[KeyValue])
    HFileOutputFormat2.configureIncrementalLoad(job, table)
    // configureIncrementalLoad writes its output settings into the Job's own
    // configuration (a copy of conf), so pass job.getConfiguration here rather
    // than the original conf.
    allRdd.saveAsNewAPIHadoopFile(
      outPath,
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      job.getConfiguration)
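
    // The HFiles under outPath still have to be handed off to HBase; a typical
    // follow-up (not part of this job as written) is LoadIncrementalHFiles,
    // e.g. new LoadIncrementalHFiles(conf).doBulkLoad(new Path(outPath), table),
    // or the completebulkload command-line tool.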
    sparkCtx.stop()
  }

  // Simple holder pairing the HBase row key with its KeyValue.
  case class ResultKV(k: ImmutableBytesWritable, v: KeyValue)
  /**
   * Custom partitioner: distributes records into partitions according to the
   * table's region split boundaries, so each partition maps to one region.
   *
   * @param startkeyArr the start key of every region of the table
   */
  class HFilePartitioner(startkeyArr: Array[String]) extends Partitioner {

    override def numPartitions: Int = startkeyArr.length
    override def getPartition(key: Any): Int = {
      val domain = key.asInstanceOf[String]
      // Region start keys are sorted and the first one is the empty string, so
      // scanning from index 1 never yields a negative partition (the original
      // i - 1 could return -1 for i == 0).
      for (i <- 1 until startkeyArr.length) {
        if (domain.compare(startkeyArr(i)) < 0) {
          return i - 1
        }
      }
      startkeyArr.length - 1
    }
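
    // Example with hypothetical start keys Array("", "g", "q"):
    //   getPartition("alpha_c1") == 0
    //   getPartition("hello_c1") == 1
    //   getPartition("zebra_c1") == 2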
    override def equals(other: Any): Boolean = other match {
      case h: HFilePartitioner => h.numPartitions == numPartitions
      case _ => false
    }
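
    // equals is overridden above, so define a hashCode consistent with it.
    override def hashCode: Int = numPartitions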
  }
}