Spark连接hbase的步骤:
1. 构建sparkconf配置信息,设置spark主机位置,设置程序名称,资源数等
2. 构建sparkcontext
3. 构建Sqlcontext
4. 通过SparkContext的newAPIHadoopRDD读取HBase表,构建RDD
5. 将RDD转换为dataframe
6. 将DataFrame注册为临时表
7. 操作表进行处理
package cn.Dalong.test.DaLong_hbase

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.{Level, Logger}

/**
 * Created by DreamBoy on 2017/5/5.
 *
 * Scans the HBase table `deppon_test` through `TableInputFormat`, projects the
 * `basicmod:cookieid`, `basicmod:createtime` and `basicmod:pv` columns into
 * [[tbl_test]] rows, registers them as the temporary table `tbl` and prints
 * the result of `select * from tbl`.
 *
 * NOTE(review): an object named `scala` can shadow the root `scala` package for
 * code compiled in this package; the name is kept only so that existing
 * references to this entry point keep working. Consider renaming.
 */
object scala {

  /**
   * Program entry point.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Keep the console free of Spark's INFO/WARN chatter.
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    // Fall back to local[2] only when no master was supplied from outside
    // (e.g. spark-submit --master), so the same jar also runs on a cluster.
    val conf = new SparkConf()
      .setIfMissing("spark.master", "local[2]")
      .setAppName("HbaseTest")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)

      // HBase client configuration.
      val hbaseConf = HBaseConfiguration.create()
      // NOTE(review): the original value was "hdfs://http://192.168.10.228/hbase",
      // which is not a valid URI. Confirm the NameNode host/port for this cluster.
      hbaseConf.set("hbase.rootdir", "hdfs://192.168.10.228/hbase")
      hbaseConf.set(
        "hbase.zookeeper.quorum",
        "192.168.10.228,192.168.10.229,192.168.10.230,192.168.10.231,192.168.10.232")
      hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
      hbaseConf.set("hbase.master", "192.168.10.230")

      // Name of the HBase table to scan.
      val tableName = "deppon_test"
      hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)

      // The original code also opened a ConnectionFactory connection here that
      // was never used or closed; the scan below does not use it, so it
      // was removed to avoid leaking the connection.

      // Scan the table as an RDD of (ImmutableBytesWritable, Result) pairs.
      val hbaseRDD = sc.newAPIHadoopRDD(
        hbaseConf,
        classOf[TableInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
        classOf[org.apache.hadoop.hbase.client.Result])

      hbaseRDD.foreach(println(_))
      println("==========================================================")

      // Encode the column coordinates once; Bytes.toBytes always uses UTF-8,
      // unlike String.getBytes which depends on the platform default charset.
      val family = Bytes.toBytes("basicmod")
      val cookieidQualifier = Bytes.toBytes("cookieid")
      val createtimeQualifier = Bytes.toBytes("createtime")
      val pvQualifier = Bytes.toBytes("pv")

      // Project every Result to (row key, cookieid, createtime, pv).
      // A cell that is missing yields null for that field.
      val tblrdd = hbaseRDD.map { case (_, result) =>
        val key = Bytes.toString(result.getRow)
        val cookieid = Bytes.toString(result.getValue(family, cookieidQualifier))
        val createtime = Bytes.toString(result.getValue(family, createtimeQualifier))
        val pv = Bytes.toString(result.getValue(family, pvQualifier))
        (key, cookieid, createtime, pv)
      }

      // Brings the RDD -> DataFrame conversion (toDF) into scope.
      import sqlContext.implicits._

      // Wrap every tuple in a tbl_test row and convert to a DataFrame.
      val rowrdd = tblrdd.map(x => tbl_test(x._1, x._2, x._3, x._4)).toDF()

      // Register the DataFrame as a temporary table and query it.
      rowrdd.registerTempTable("tbl")
      // The original carried a commented-out attempt to write this result out
      // through the "org.apache.phoenix.spark" data source; it was never enabled.
      sqlContext.sql("select * from tbl").show()
    } finally {
      // Always release the SparkContext, even when the job fails.
      sc.stop()
    }
  }
}

/**
 * One row of the HBase table as exposed to Spark SQL.
 *
 * @param id         HBase row key decoded as a string
 * @param cookieid   value of `basicmod:cookieid`, or null when the cell is absent
 * @param createtime value of `basicmod:createtime`, or null when the cell is absent
 * @param pv         value of `basicmod:pv`, or null when the cell is absent
 */
case class tbl_test(id: String, cookieid: String, createtime: String, pv: String)
1. 本地运行时,需要将Hive的hive-site.xml和hdfs-site.xml文件放到项目的src/main/resources目录下,使本地程序能够找到集群,即可运行。
2. 服务器上运行的命令:
spark-submit --executor-memory 2g --driver-memory 200M --total-executor-cores 288 --executor-cores 2 --conf spark.kryoserializer.buffer.max=256m --conf spark.kryoserializer.buffer=64m --master yarn --class cn.deppon.sparkhive.Dalong /home/appman/DL_test/original-MavenTest-1.0-SNAPSHOT.jar
/**
 * Runs a Hive "create table ... as select" statement through Spark SQL:
 * copies the rows of `dl_test.test_e2` whose `country` is 'china' into the new
 * table `dl_test.DL_in_test3`.
 *
 * Requires `SparkConf`, `SparkContext` (org.apache.spark) and `HiveContext`
 * (org.apache.spark.sql.hive) to be imported by the enclosing file.
 */
object Dalong {

  /**
   * Program entry point.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Settings made on SparkConf in code take precedence over spark-submit
    // flags, so a hard-coded setMaster("local[2]") would silently override
    // "--master yarn". Only default to local mode when no master was supplied.
    val conf = new SparkConf()
      .setAppName("Dalong")
      .setIfMissing("spark.master", "local[2]")

    // Create the Spark context from the configuration.
    val sparkContext = new SparkContext(conf)
    try {
      // Hive-aware SQL context built on top of the Spark context.
      val sqlContext = new HiveContext(sparkContext)

      // Execute the statement directly; the result is written into Hive.
      sqlContext
        .sql("create table dl_test.DL_in_test3 as select * from dl_test.test_e2 where country = 'china'")
        .show()
    } finally {
      // Release the context even when the statement fails.
      sparkContext.stop()
    }
  }
}
3.spark连接hbase 读写操作
代码:
package cn.DaLong_hbase

import com.deppon.spark.hbase.HbaseTool.hbaseConf
import com.deppon.spark.hbase.{HbaseTool, SparkCont}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by DreamBoy on 2017/5/8.
 *
 * Reads `info:name` and `info:age` from the HBase table `member`, derives a new
 * row from every row read (row key suffixed with "20<<", age suffixed with
 * "==") and writes the derived rows back into the same table.
 */
object Hbase_read_write {

  /**
   * Program entry point.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Default to local[2] only when no master was supplied from outside
    // (e.g. spark-submit --master), so the job can also run on a cluster.
    val conf = new SparkConf()
      .setIfMissing("spark.master", "local[2]")
      .setAppName("Hbase_read_Write")
    val sc = new SparkContext(conf)

    try {
      // Source: scan the "member" table as (ImmutableBytesWritable, Result) pairs.
      // hbaseConf is the shared configuration imported from HbaseTool.
      hbaseConf.set(TableInputFormat.INPUT_TABLE, "member")
      val hbaseRead = sc.newAPIHadoopRDD(
        hbaseConf,
        classOf[TableInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
        classOf[org.apache.hadoop.hbase.client.Result])

      // Sink: old-API (mapred) job configuration that writes into "member".
      val writeJob = new JobConf(hbaseConf)
      writeJob.setOutputFormat(classOf[TableOutputFormat])
      writeJob.set(TableOutputFormat.OUTPUT_TABLE, "member")

      // Encode the column coordinates once; Bytes.toBytes always uses UTF-8,
      // unlike String.getBytes() which depends on the platform default charset.
      val family = Bytes.toBytes("info")
      val nameQualifier = Bytes.toBytes("name")
      val ageQualifier = Bytes.toBytes("age")

      // Result.getValue returns null for a missing cell. Wrapping it in Option
      // keeps "absent" explicit instead of passing null on to the write step,
      // where Bytes.toBytes(null: String) throws a NullPointerException.
      //
      // Note from the original author: some row keys and ages were stored as
      // 4-byte integers (e.g. \x00\x00\x00\x01), so decoding them as strings
      // yields unprintable characters rather than the number.
      val readData = hbaseRead.map { case (_, result) =>
        val row = Bytes.toString(result.getRow)
        val name = Option(result.getValue(family, nameQualifier)).map(b => Bytes.toString(b))
        val age = Option(result.getValue(family, ageQualifier)).map(b => Bytes.toString(b))
        (row, name, age)
      }

      // Build one Put per source row, writing only the columns that exist.
      // Rows with neither column are skipped: they would produce a Put with no
      // cells, which carries nothing to store.
      val puts = readData
        .filter { case (_, name, age) => name.isDefined || age.isDefined }
        .map { case (row, name, age) =>
          val put = new Put(Bytes.toBytes(s"${row}20<<"))
          name.foreach(n => put.addColumn(family, nameQualifier, Bytes.toBytes(n)))
          age.foreach(a => put.addColumn(family, ageQualifier, Bytes.toBytes(s"$a==")))
          // The key is an empty ImmutableBytesWritable, as in the original code.
          (new ImmutableBytesWritable, put)
        }

      puts.saveAsHadoopDataset(writeJob)
    } finally {
      // Always release the SparkContext, even when the job fails.
      sc.stop()
    }
  }
}