spark(8)

xiaoxiao2021-02-28  4

 

1. 多表关联join

 

 

DSL语法风格中的N多种join方式:

 

    val spark: SparkSession = SparkSession.builder()      .master("local[*]")      .appName(this.getClass.getSimpleName)      .getOrCreate()    import spark.implicits._    // userInfo name age sex pcode    val user: Dataset[String] = spark.createDataset(List("dingding 18 F henan", "dangdang 16 F henan", "dongdong 28 M shandong",      "laozhao 30 M heilongjiang", "cangls 36 F jp","meixi 30 M agt"))    // province pcode pname    val province: Dataset[String] = spark.createDataset(List("henan,河南省", "shandong,山东省", "heilongjiang,黑龙江", "jp,日本省", "tw,台湾省"))    // 对数据进行切分    val userSplit: Dataset[Row] = user.map(str => {      val split = str.split(" ")      (split(0), split(1).toInt, split(2), split(3))    }).toDF("name", "age", "gender", "pcode")    val proSplit: Dataset[Row] = province.map(str => {      val split = str.split(",")      (split(0), split(1))    }).toDF("pcode", "province")    // SQL    // 注册2个临时视图  关联查询    userSplit.createTempView("v_user")    proSplit.createTempView("v_pro")//    val result1: DataFrame = spark.sql("select * from v_user t1 join v_pro t2 on t1.pcode = t2.pcode ")//    result1.show()    //DSL join  添加条件    判断的标准是 ===    userSplit.join(proSplit).where(userSplit("pcode") === proSplit("pcode")) // .show()    userSplit.join(proSplit,userSplit("pcode") === proSplit("pcode"),"right").show()    // 如果参与join的字段名称一致 可以直接使用一个    userSplit.join(proSplit,"pcode")// .show()    // 默认使用的是inner join    spark.stop()

 

2. 根据IP地址求归属地,写mysql

val spark: SparkSession = SparkSession.builder()  .master("local[*]")  .appName(this.getClass.getSimpleName)  .getOrCreate()import spark.implicits._// 读取数据val ipruleDs: Dataset[String] = spark.read.textFile("f:/mrdata/ipdata/ip.txt")val logDs: Dataset[String] = spark.read.textFile("f:/mrdata/ipdata/ipaccess.log")// 数据切分val ipRules: Dataset[Row] = ipruleDs.map(str => {  val split = str.split("\\|")  (split(2).toLong, split(3).toLong, split(6))}).toDF("start", "end", "province")val longIps = logDs.map(str => {  val split = str.split("\\|")  val ip = split(1)  IpUtils.ip2Long(ip)}).toDF("longIp")// 数据匹配聚合  关联查询ipRules.createTempView("v_ipRules")longIps.createTempView("v_longIp")val result: DataFrame = spark.sql("select province,count(*) as cnts from v_ipRules t1 join " +  "v_longIp t2 on t2.longIp between t1.start and t1.end " +  "group by province " +  " order by cnts desc ")result.show()

 

 

 

 

 

 

 

添加mysql的驱动jar包:

<!--mysql的驱动jar--><dependency>    <groupId>mysql</groupId>    <artifactId>mysql-connector-java</artifactId>    <version>5.1.38</version></dependency>

 

// 写入mysqlval url = "jdbc:mysql://localhost:3306/scott?characterEncoding=utf-8"val table = "access_log10"val conn = new Properties()conn.setProperty("user", "root")conn.setProperty("password", "123")conn.setProperty("driver", "com.mysql.jdbc.Driver")result.write.jdbc(url, table, conn)

 

 

 

 

2.1. 读取配置文件

2.1.1. 利用类加载器获取配置文件

    // 传统的通过类加载器获取某一个配置文件//    val asStream = IpLocal.getClass.getClassLoader.getResourceAsStream("application.conf")//    val prop = new Properties()//    prop.load(asStream)//    prop.getProperty("")

 

 

2.1.2. 利用ConfigFactory来加载配置文件

 

必须导入依赖jar

<!-- <dependency>     <groupId>com.typesafe.akka</groupId>     <artifactId>akka-remote_2.11</artifactId>     <version>2.4.17</version> </dependency>--> <!--ConfigFactory所在的jar--> <dependency>     <groupId>com.typesafe</groupId>     <artifactId>config</artifactId>     <version>1.3.0</version> </dependency>

 

 

 

// 默认的加载顺序 application.conf  -->  application.json --->  application.propertiesval load = ConfigFactory.load()val conf1 = load.getString("db.url")println(conf1)

 

mysql的配置写入到配置文件中:

 

 

在代码中通过配置来获取参数:

// 写入mysqlval url = config.getString("db.url")val table = config.getString("db.table")val conn = new Properties()// user是关键字 ,不能用于keyconn.setProperty("user", config.getString("db.user"))conn.setProperty("password", config.getString("db.passwd"))conn.setProperty("driver", config.getString("db.driver"))result.write./*mode(SaveMode.Append).*/jdbc(url, table, conn)

 

 

 

不同的配置文件,有不同的写法:

 

详情可参考: https://blog.csdn.net/qq_21439395/article/details/80678524 

 

 

3. 保存模式

3.1. 简介

当第2次写入数据到mysql的时候,报错了:

 

 

原因: write 写的API,默认使用的模式是:SaveMode.ErrorIfExists 如果写入的路径存在,就会报错。

 

 

 

3.2. 4种模式的说明:

ErrorIfExists 默认模式,  如果存在,即报错。

Append : 追加模式, 如果存在,就追加

Ignore 忽略模式, 如果存在,不写了。

Overwrite: 覆盖模式,先删除,再创建

 

2种写法: SaveMode.Append “append”

 

 

 

推荐使用SaveMode.Append 写法。不容易写错。

 

只要使用writeAPI 进行数据存储,都可以使用mode方法。

4. 自定义函数

UDFUser  defined  Functions

UDAF  User  defined  aggregate  Function

UDTF

 

 

 

 

 

 

 

 

 

val config = ConfigFactory.load()val spark: SparkSession = SparkSession.builder()  .master("local[*]")  .appName(this.getClass.getSimpleName)  .getOrCreate()import spark.implicits._// 读取数据val ipruleDs: Dataset[String] = spark.read.textFile("f:/mrdata/ipdata/ip.txt")val logDs: Dataset[String] = spark.read.textFile("f:/mrdata/ipdata/ipaccess.log")// 数据切分val ipRules: Dataset[(Long, Long, String)] = ipruleDs.map(str => {  val split = str.split("\\|")  (split(2).toLong, split(3).toLong, split(6))})// 可以 先把收集收集起来  ds --> Array//    ipRules.rdd.collect()  DataFrame .collect  --->  Array[Row]val ipRulesArr: Array[(Long, Long, String)] = ipRules.collect()// ip规则库数据进行广播val bc: Broadcast[Array[(Long, Long, String)]] = spark.sparkContext.broadcast(ipRulesArr)val longIps = logDs.map(str => {  val split = str.split("\\|")  val ip = split(1)  IpUtils.ip2Long(ip)}).toDF("longIp")longIps.createTempView("v_longIp")// 自定义函数的名称spark.udf.register("ip2Province", (ip: Long) => {  val newIpRules: Array[(Long, Long, String)] = bc.value  // 根据10 进制的ip地址,得到省份  IpUtils.binarySearchFun(newIpRules, ip)})// 自定义函数val result = spark.sql("select ip2Province(longIp) province,count(*) cnts from v_longIp where ip2Province(longIp) != 'unknown' " +  "group by province" +  " order by cnts desc")result.printSchema()// 写入mysqlval url = config.getString("db.url")val table = config.getString("db.table")val conn = new Properties()// user是关键字 ,不能用于keyconn.setProperty("user", config.getString("db.user"))conn.setProperty("password", config.getString("db.passwd"))conn.setProperty("driver", config.getString("db.driver"))result.write.mode(SaveMode.Overwrite).jdbc(url, table, conn)

 

 

5. sparksql的任务提交到集群运行

 

通过—jars 导入jar包,或者把jar包放到sparkjars目录下。

spark-submit --master spark://hdp-01:7077 --class day08.IpLocalStandalone --jars mysql-connector-java-5.1.38.jar,config-1.3.0.jar /root/spark-sql33-1.0-SNAPSHOT.jar hdfs://hdp-01:9000/ip.txt hdfs://hdp-01:9000/ipaccess.log

 

 

 

6. UDAF-几何平均数

几何平均数:

 

掌握实现的思路即可。

 

 

7. SparkSql中的数据源

spark sql 读取 写入 数据的格式

file,jdbc,json,csv,parquet

 

 

7.1. file

 

val spark: SparkSession = SparkSession.builder()  .master("local[*]")  .appName(this.getClass.getSimpleName)  .getOrCreate()import spark.implicits._// 读取文件val file: Dataset[String] = spark.read.textFile("person.txt")//    val text: DataFrame = spark.read.text("person.txt")val df: DataFrame = file.map(t => {  val split = t.split(" ")  (split(0), split(1).toInt, split(2))}).toDF("name", "age", "fv")// 写文件  text 写的api 只支持一列df.select("name").write.text("output1")// saveApi 写的文件格式是Parquet文件 独有的读写APIdf.write.save("output2")spark.close()

 

 

7.2. JDBC数据源

JDBC去连接任意的数据库。

val spark: SparkSession = SparkSession.builder()  .master("local[*]")  .appName(this.getClass.getSimpleName)  .getOrCreate()import spark.implicits._val config = ConfigFactory.load()// 读取mysql数据库  ---》  操作 之后 ---》 写到mysqlval url = config.getString("db.url")val conn = new Properties()conn.setProperty("user", config.getString("db.user"))conn.setProperty("password", config.getString("db.passwd"))// 连接mysql数据库 设置参数url driver dbtable user passwordval empData: DataFrame = spark.read.format("jdbc").options(  Map(    "url" -> "jdbc:mysql://localhost:3306/scott?characterEncoding=utf-8",    "driver" -> "com.mysql.jdbc.Driver",    "dbtable" -> "emp",    "user" -> "root",    "password" -> "123"  )).load()spark.read.format("jdbc").jdbc(url,"emp",conn)// 读取数据val jdbc: DataFrame = spark.read.jdbc(url, "emp", conn)jdbc.printSchema()val result1: Dataset[Row] = jdbc.where("sal > 2500").select("empno")// 写数据result1.write.mode(SaveMode.Append).jdbc(url, "emp10", conn)spark.close()

 

注意: 读取mysql的时候,必须导入myql的驱动jar包。

 

7.3. JSON数据源

val spark: SparkSession = SparkSession.builder()      .master("local[*]")      .appName(this.getClass.getSimpleName)      .getOrCreate()    // dataset[Row]    val json: DataFrame = spark.read.json("product.json")    json.printSchema()    // json 格式的数据,生成的schema信息不是按照原有的数据顺序,是按照字典顺序得到的。//    json.toDF("pname1","price1","amount1").show()    val result: Dataset[Row] = json.filter("price > 1000")    // 读寄送 写 json    result.write.mode(SaveMode.Append).json("jsonout1")    spark.close()

 

 

 

{"name":"zs","address":{"city":"bj","street":"cp"}}

// dataset[Row]val json: DataFrame = spark.read.json("address.json")json.printSchema()json.createTempView("v_json")val result1 = spark.sql("select address.city,address.street from v_json")

 

 

{"name":"zs","address":[{"city":"bj"},{"city":"cp2"}]}

 

// dataset[Row]val json: DataFrame = spark.read.json("address2.json")json.printSchema()import org.apache.spark.sql.functions._json.select($"name", explode($"address")).toDF("name", "city").show()

 

7.4. csv

python

默认的分隔符是逗号, 可以直接使用excel 打开。

本身没有schema信息,读取和存入的数据都不带有schema

默认生成的shema信息是 _c0 _c1 …. 默认的类型都是String

val spark: SparkSession = SparkSession.builder()  .master("local[*]")  .appName(this.getClass.getSimpleName)  .getOrCreate()import spark.implicits._// 读取文件val file: Dataset[Row] = spark.read.csv("product.csv")file.printSchema()/**  * root  * |-- _c0: string (nullable = true)  * |-- _c1: string (nullable = true)  * |-- _c2: string (nullable = true)  */val config = ConfigFactory.load()// 读取mysql数据库  ---》  操作 之后 ---》 写到mysqlval url = config.getString("db.url")val conn = new Properties()conn.setProperty("user", config.getString("db.user"))conn.setProperty("password", config.getString("db.passwd"))// 读取数据val jdbc: DataFrame = spark.read.jdbc(url, "emp", conn)jdbc.filter("sal >2600").write.csv("csvpath1")spark.close()

 

 

7.5. parquet数据

实用

列式存储,

parquet数据格式的优势??

列式存储,使用时只需要读取需要的列,支持向量运算,能获得更好的扫描行。 压缩编码可以降低磁盘存储空间,由于同一列的数据类型是一样的,可以使用不同的压缩编码。 可以跳过不符合条件的数据,只读取需要的数据,降低IO的数据量 通用的,适配多种计算框架,查询引擎(hive,impala,pig等),计算框架(mapreduce,spark等),数据模型(avro,thrift,json等)

 

优点:

列式存,读取操作方便。

支持压缩编码,有效减少磁盘占用空间。压缩比高:

通用性。

 

缺点:

不支持update操作。生成之后,不能修改。

 

parquet有自己独特的API

 

 

 

val spark: SparkSession = SparkSession.builder()  .master("local[*]")  .appName(this.getClass.getSimpleName)  .getOrCreate()import spark.implicits._// 读取 parquet文件的数据val file: Dataset[Row] = spark.read.parquet("output2")file.printSchema()file.show()val result: Dataset[Row] = file.filter("age > 30")// 写到parquetresult.write.parquet("parquetout1")spark.close()

 

 

 

7.6. 日志数据量补充:

处理的数据量  1000gb   

1, 看每一条日志数据的大小, 有多少字段,看存储什么数据

0.5kb   80个字段

35M  ---  7万条数据

 

天的数据量 =  有多少用户  *  每一个用户 每天大概产生几条日志。

 

100000000 * 10 * 0.5KB  

条数 / 数据量的大小

 

1TB     1个亿

 

8. hiveOnSpark

 

 

利用Hive的规范,利用hive的元数据库,把数据存储在hdfs中。

HQL 编程,程序运行在 spark的集群中。

 

 

8.1. 有了hive

如果已经存在了hive。直接把hive的配置,hive-site.xml,放到spark的安装conf目录下即可。

 

8.2. 如果没有hive

mysql的元数据库   --- hdfs之间的映射关系。

 

 

 

8.3. spark-sql

相当于spark core 模式下的 spark-shell。

提供了一个编写sql语句的交互式命令行。

 

 

执行spark-sql,创建的数据库,的元数据? 数据? 都存放在哪里?

 

 

默认情况下,执行spark-sql,使用的元数据库是derby,元数据库的存储位置是metastore_db,数据存储位置是spark-warehouse。

在哪里执行spark-sql命令,就在哪个目录下生成这几个目录。

 

derby --à  mysql

spark-warehouse -à  hdfs的目录。

 

8.4. 创建一个mysql数据库:

只需要安装一台即可。

这里用的mysql版本比较低。   安装完成之后,没有默认的密码。

# 安装mysql

yum -y install mysql mysql-server

# 启动mysql

service mysqld start

#安全配置

/usr/bin/mysql_secure_installation

 

 

root用户进行授权:

# mysql -uroot -p111111

mysql> GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '111111' WITH GRANT OPTION;

Query OK, 0 rows affected (0.00 sec)

 

mysql> FLUSH PRIVILEGES;

Query OK, 0 rows affected (0.00 sec)

 

 

远程连接,确保防火墙是关闭的。

 

 

 

8.5. 配置hive-site.xml文件

如何让创建的数据库,使用自己的mysql呢?

 

启动spark-sql的地方,称之为客户端。 hive-site.xml 只需要在客户端配置。

假定客户端使用hdp-02机器。

 

 

在sparkconf目录下,创建hive-site.xml文件:

<configuration>    <property>        <name>javax.jdo.option.ConnectionURL</name>        <value>jdbc:mysql://hdp-04:3306/hivedb?createDatabaseIfNotExist=true</value>        <description>JDBC connect string for a JDBC metastore</description>    </property>    <property>        <name>javax.jdo.option.ConnectionDriverName</name>        <value>com.mysql.jdbc.Driver</value>        <description>Driver class name for a JDBC metastore</description>    </property>    <property>        <name>javax.jdo.option.ConnectionUserName</name>        <value>root</value>        <description>username to use against metastore database</description>    </property>    <property>        <name>javax.jdo.option.ConnectionPassword</name>        <value>111111</value>        <description>password to use against metastore database</description>    </property></configuration>

 

 

直接重新启动sprak-sql 即可为我们创建好数据库。

需要导入msyql的驱动jar包。

 

 

 

spark-sql –jars mysql..jar

 

 

spark-sql启动成功之后,可以发现,在hdp-04的数据库中,创建好了29张表。

mysql元数据库创建好了。

 

 

但是,目前再在spark-sql 中创建数据库表,依然不能使用hdfs的存储路径。(也就是数据库表的数据依然存在本地磁盘中)

 

修改表的路径:

 

新的hdfs的路径: hdfs://hdp-01:9000/user/root/spark-warehouse

 

 

执行测试:

Spark-sql > create table test2(name varchar(20));

Spark-sql > insert into test2 values("xxxooo");

hdfs的指定目录下,有文件产生。

 

现在,就把hive -on -spark的配置完全搞定。

 

最终的一点,就是hive-site.xml, 放到spark conf 目录下。

 

 

1.1. IDEA编程

环境准备:

 

导入jar包:

<!--必须导入sparkhive的支持jar--><dependency>    <groupId>org.apache.spark</groupId>    <artifactId>spark-hive_2.11</artifactId>    <version>${spark.version}</version></dependency>

 

开启sparkhive的支持:

val spark: SparkSession = SparkSession.builder()  .master("local[*]")  .appName(this.getClass.getSimpleName)  // 必须开始sparkhive的支持  .enableHiveSupport()  .getOrCreate()

 

 

本地测试:

需要把hive-site.xml文件,添加到本地资源目录下。

 

基本的HQL 操作:

    // 伪装用户身份    System.setProperty("HADOOP_USER_NAME", "root")    val spark: SparkSession = SparkSession.builder()      .master("local[*]")      .appName(this.getClass.getSimpleName)      // 必须开始sparkhive的支持      .enableHiveSupport()      .getOrCreate()    import spark.implicits._    // 查询    spark.sql("select * from t_access_times")    //    .show()    // 创建表    // 求每个用户的每月总金额    //    spark.sql("select username,month,sum(salary) as salary from t_access_times group by username,month")    // 创建表    //    spark.sql("create table t_access1(username string,month string,salary int) row format delimited fields terminated by ','")    // 删除表    //    spark.sql("drop table t_access1")    // 插入数据    //        spark.sql("insert into  t_access1  select * from t_access_times")    //    .show()    // 覆盖写数据    //    spark.sql("insert overwrite table  t_access1  select * from t_access_times where username='A'")    // 覆盖load新数据    //    C,2015-01,10    //    C,2015-01,20    //    spark.sql("load data local inpath 't_access_time_log' overwrite into table t_access1")    // 清空数据//        spark.sql("truncate table t_access1")    //      .show()    // 写入自定义数据    val access: Dataset[String] = spark.createDataset(List("b,2015-01,10", "c,2015-02,20"))    val accessdf = access.map({      t =>        val lines = t.split(",")        (lines(0), lines(1), lines(2).toInt)    }).toDF("username", "month", "salary")    //    .show()    accessdf.createTempView("t_ac")//    spark.sql("insert into t_access1 select * from t_ac")    // insertIntoapi  入库     // database.table    accessdf.write.mode(SaveMode.Append).insertInto("t_access1")    //    new HiveContext(new SparkContext())    spark.stop()

 

 

 

8.6. TOpK

row_number over( partition by xxx )

rank()  全局的排序 支持并列排序

dense_rank()  选中的进行全局排序。

 

object TopK {  val topK = 2  def main(args: Array[String]): Unit = {    val session = SparkSession.builder()      .master("local[*]")      .appName(this.getClass.getSimpleName)      .getOrCreate()    import session.implicits._    val file = session.read.textFile("f:/mrdata/teacher.log")    val st: DataFrame = file.map({      t =>        val index = t.lastIndexOf("/")        // 截取字符串        val tName = t.substring(index + 1)        // 封装数据为URL,然后获取Host内容        val uri = new URL(t.substring(0, index))        val host = uri.getHost        val hostArray = host.split("[.]") // 要对特殊的字符进行转义      // 获取学科名称      val sub = hostArray(0)        // 返回元组        (sub, tName)    }).toDF("subject", "teacher")    st.createTempView("t_sub_teacher")    // 该学科下的老师的访问次数    val sql = session.sql("select subject,teacher,count(*) cnts from t_sub_teacher group By subject,teacher ") //order by cnts desc    //    sql    //      sql.show()    // 全局topK    //    sql.limit(3).show()    sql.createTempView("v_tmp")    // 求分组topK  分学科的老师 排序    //    val groupedTop = session.sql("select subject,teacher,cnts,row_number() over(partition by subject order by cnts desc) sub_order from v_tmp  order by cnts desc")    //    groupedTop.show()    // 分学科的排序 取topK    //    val groupedTop = session.sql(s"select * from (select subject,teacher,cnts,    // row_number() over(partition by subject order by cnts desc) sub_order    // from v_tmp order by cnts desc ) where sub_order <= $topK")    //  分学科的排序  + 全局排序    //    val groupedTop = session.sql("select subject,teacher,cnts,    // row_number() over(partition by subject order by cnts desc) sub_order,    // row_number() over(order by cnts desc) g_order    // from v_tmp  order by cnts desc")    // 分学科的TopK 排序 + 全局排序  rank 支持并列排序//    val groupedTop = session.sql("select * from (select subject,teacher,cnts," +//      "row_number() over(partition by subject order by cnts desc) sub_order," +//      "rank() over(order by cnts desc) g_order " +//      s"from v_tmp   ) where sub_order <=$topK")//    groupedTop.show()    // 全局排序,分学科topk排序,选中的全局排序(可使用row_number over()  rank() over ()  dense_rank()       val groupedTop = session.sql("select * ,dense_rank() over(order by cnts desc) choose_num from (select subject,teacher,cnts," +         "row_number() over(partition by subject order by cnts desc) sub_order," +         "rank() over(order by cnts desc) g_order " +         s"from v_tmp) where sub_order <=$topK")        groupedTop.show()    // 释放资源    session.close()  }}

 

 

转载请注明原文地址: https://www.6miu.com/read-2050038.html

最新回复(0)