Spark connecting to HBase and Hive


1.  Spark connecting to HBase

// More details to be added later; I don't fully understand everything yet. Corrections from more experienced readers are very welcome.

Steps for connecting Spark to HBase:

1. Build the SparkConf configuration: set the Spark master, the application name, the resources to use, etc.

2. Build the SparkContext.

3. Build the SQLContext.

4. Build the RDD (in the code below this is done with newAPIHadoopRDD on the SparkContext).

5. Convert the RDD to a DataFrame.

6. Register the DataFrame as a temporary table.

7. Query the table to do the processing.

package cn.Dalong.test.DaLong_hbase

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.{Level, Logger}

/**
  * Created by DreamBoy on 2017/5/5.
  */
object scala {
  def main(args: Array[String]): Unit = {
    // Suppress unnecessary console output
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    // Spark configuration
    val conf = new SparkConf().setMaster("local[2]").setAppName("HbaseTest")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    val hbaseConf = HBaseConfiguration.create()
    val sqlContext = new SQLContext(sc)
    // HBase configuration
    hbaseConf.set("hbase.rootdir", "hdfs://192.168.10.228/hbase")
    hbaseConf.set("hbase.zookeeper.quorum", "192.168.10.228,192.168.10.229,192.168.10.230,192.168.10.231,192.168.10.232")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set("hbase.master", "192.168.10.230")
    // Names of the HBase tables
    val tableName = "deppon_test"
    val out_tbl = "deppon_tt"
    // Table to scan in HBase
    hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)
    // Create the connection
    val connection = ConnectionFactory.createConnection(hbaseConf)
    // Scan: build a Hadoop RDD of (ImmutableBytesWritable, Result) tuples from HBase
    val hbaseRDD = sc.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    hbaseRDD.foreach(println(_))
    println("==========================================================")
    /*
    hbaseRDD.foreach { case (_, result) =>
      val key = Bytes.toString(result.getRow)
      val cookieid = Bytes.toString(result.getValue("basicmod".getBytes, "cookieid".getBytes))
      val createtime = Bytes.toString(result.getValue("basicmod".getBytes, "createtime".getBytes))
      val pv = Bytes.toString(result.getValue("basicmod".getBytes, "pv".getBytes))
      //println("Row key:" + key + " cookieid:" + cookieid + " createtime:" + createtime + " pv:" + pv)
      (key, cookieid, createtime, pv)
    }*/
    // Extract the columns we need from each Result
    val tblrdd = hbaseRDD.map { case (_, result) =>
      val key = Bytes.toString(result.getRow)
      val cookieid = Bytes.toString(result.getValue("basicmod".getBytes, "cookieid".getBytes))
      val createtime = Bytes.toString(result.getValue("basicmod".getBytes, "createtime".getBytes))
      val pv = Bytes.toString(result.getValue("basicmod".getBytes, "pv".getBytes))
      //println("Row key:" + key + " cookieid:" + cookieid + " createtime:" + createtime + " pv:" + pv)
      (key, cookieid, createtime, pv)
    }
    // Implicit conversions for turning RDDs into DataFrames
    import sqlContext.implicits._
    // Wrap the columns in the tbl_test case class and convert to a DataFrame
    val rowrdd = tblrdd.map(x => tbl_test(x._1, x._2, x._3, x._4)).toDF()
    // Register a temporary table
    rowrdd.registerTempTable("tbl")
    sqlContext.sql("select * from tbl").show() //.write.mode(SaveMode.Append).format("org.apache.phoenix.spark").insertInto("deppon_tt")
    //mode(SaveMode.Overwrite).options(Map("table" -> "USER_OVERVIEW", "zkUrl" -> conf.getString("Hbase.url"))).format("org.apache.phoenix.spark").save()
    sc.stop()
  }
}

case class tbl_test(id: String, cookieid: String, createtime: String, pv: String)
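Step 5 can also be done without a case class by supplying an explicit schema. A minimal sketch, reusing tblrdd and sqlContext from the listing above (the column names here are just the ones read from the basicmod family; the temporary table name is a placeholder):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Sketch: build the DataFrame from an explicit schema instead of a case class
// (tblrdd and sqlContext come from the listing above)
val schema = StructType(Seq(
  StructField("id", StringType),
  StructField("cookieid", StringType),
  StructField("createtime", StringType),
  StructField("pv", StringType)))

val rowRDD = tblrdd.map { case (id, cookieid, createtime, pv) => Row(id, cookieid, createtime, pv) }
val df = sqlContext.createDataFrame(rowRDD, schema)
df.registerTempTable("tbl_schema")
df.show()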

 

 

2.  Spark connecting to Hive

1. To run locally, copy Hive's hive-site.xml and Hadoop's hdfs-site.xml into the project's /main/resources folder so the local program can find the cluster.

2. Command to run it on the server:

spark-submit --executor-memory 2g --driver-memory 200M --total-executor-cores 288 --executor-cores 2 --conf spark.kryoserializer.buffer.max=256m --conf spark.kryoserializer.buffer=64m --master yarn --class cn.deppon.sparkhive.Dalong /home/appman/DL_test/original-MavenTest-1.0-SNAPSHOT.jar
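The two Kryo settings passed with --conf above can also be set in the driver code; a minimal sketch (the property keys are the standard Spark ones, the app name is just a placeholder):

import org.apache.spark.SparkConf

// Sketch: the same serializer settings applied programmatically instead of via --conf
val conf = new SparkConf()
  .setAppName("Dalong")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.max", "256m")
  .set("spark.kryoserializer.buffer", "64m")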

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object Dalong {
  def main(args: Array[String]): Unit = {
    // Spark configuration
    val conf = new SparkConf().setAppName("Dalong").setMaster("local[2]")
    // Create the Spark context and initialize the environment with the configuration
    val sparkContext = new SparkContext(conf)
    // Create a HiveContext on top of the Spark context to run Hive SQL
    val sqlContext = new HiveContext(sparkContext)
    // Run the statement directly; the result is written into Hive
    sqlContext.sql("create table dl_test.DL_in_test3 as select * from dl_test.test_e2 where country = 'china'").show()
  }
}
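The CREATE TABLE ... AS SELECT above can also be expressed through the DataFrame writer API; a rough sketch, reusing the sqlContext from the code above (the SaveMode choice and the "use dl_test" step are assumptions, not part of the original job):

import org.apache.spark.sql.SaveMode

// Sketch: same result through the DataFrame API instead of a CTAS statement
val china = sqlContext.sql("select * from dl_test.test_e2 where country = 'china'")
sqlContext.sql("use dl_test")  // switch to the target database
china.write.mode(SaveMode.Overwrite).saveAsTable("DL_in_test3")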

 

3.  Spark connecting to HBase: read and write operations

Code:

package cn.DaLong_hbase

import com.deppon.spark.hbase.HbaseTool.hbaseConf
import com.deppon.spark.hbase.{HbaseTool, SparkCont}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by DreamBoy on 2017/5/8.
  */
object Hbase_read_write {
  def main(args: Array[String]): Unit = {
    //val sc = SparkCont.getSparkContext("Hbase_read_Write")
    // Build the SparkContext
    val conf = new SparkConf().setMaster("local[2]").setAppName("Hbase_read_Write")
    // .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)

    // Build the RDD that reads the table
    //val hbase_read = HbaseTool.HbaseTool_Read("member", sc)
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "member")
    val hbase_read = sc.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result]
    )

    // Build the JobConf used to write the table
    //val hbase_write = HbaseTool.HbaseTool_Write("member", sc)
    val hbase_write = new JobConf(hbaseConf)
    hbase_write.setOutputFormat(classOf[TableOutputFormat])
    hbase_write.set(TableOutputFormat.OUTPUT_TABLE, "member")

    val h_Read_data = hbase_read.map {
      case (_, result) => {
        val row = Bytes.toString(result.getRow)
        val name = Bytes.toString(result.getValue("info".getBytes(), "name".getBytes()))
        val age = Bytes.toString(result.getValue("info".getBytes(), "age".getBytes()))
        (row, name, age)
      }
    }
    // val result = h_Read_data.collect()
    // println(result.toBuffer)
    // Because some cells are empty, the console may report NullPointerExceptions, e.g.:
    //   \x00\x00\x00\x01  column=info:age,  timestamp=1494219674019, value=\x00\x00\x00\x0F
    //   \x00\x00\x00\x01  column=info:name, timestamp=1494219674019, value=jack
    // In these two rows the row key was originally an Int but was stored in its binary (hex) form,
    // so reading it back as a string may produce unparseable blanks.
    // Rows like these are still handled below and are written to HBase correctly.

    // Transform the data that was read and write it back to the table
    val rdd = h_Read_data.map {
      arr => {
        val put = new Put(Bytes.toBytes(arr._1 + 20 + "<<"))
        put.addColumn("info".getBytes(), "name".getBytes(), Bytes.toBytes(arr._2))
        put.addColumn("info".getBytes(), "age".getBytes(), Bytes.toBytes(arr._3 + "=="))
        // Produce tuples of the form (ImmutableBytesWritable, Put)
        (new ImmutableBytesWritable, put)
      }
    }
    rdd.saveAsHadoopDataset(hbase_write)
    sc.stop()
  }
}
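The commented-out HbaseTool.HbaseTool_Read / HbaseTool_Write calls refer to a small helper object that the post never shows. A guess at what such a helper might look like, based only on how it is called above (the object name, method names, and signatures are assumptions, not the author's original code):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Sketch of the helper referenced above (names and shape are assumed)
object HbaseTool {
  val hbaseConf = HBaseConfiguration.create()

  // Build an RDD that scans the given HBase table
  def HbaseTool_Read(table: String, sc: SparkContext): RDD[(ImmutableBytesWritable, Result)] = {
    hbaseConf.set(TableInputFormat.INPUT_TABLE, table)
    sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])
  }

  // Build a JobConf suitable for saveAsHadoopDataset writes to the given table
  // (sc is kept only to match the call site above; it is not needed here)
  def HbaseTool_Write(table: String, sc: SparkContext): JobConf = {
    val jobConf = new JobConf(hbaseConf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, table)
    jobConf
  }
}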

 

