Spark2.x 快速入门教程 5

xiaoxiao2021-02-28  95

Spark 处理多种数据源

一、实验介绍

1.1 实验内容

Spark SQL 通过 DataFrame 接口可以支持 Parquet、JSON、Hive 等数据源,将 DataFrame 注册为临时视图,可以允许你在数据上运行 SQL 查询语句,并且可以通过 JDBC 连接外部数据源。前面的介绍中,我们已经涉及到了Hive,这里不再赘述。本节讲解Parquet,JSON,及 JDBC 数据库连接等。

1.2 实验知识点

Parquet 数据源Json 数据源JDBC 数据源

1.4 实验环境

spark-2.1.0-bin-hadoop2.6

Xfce 终端

1.5 适合人群

本课程属于初级难度级别,适合具有 spark 基础的用户,如果对 Spark SQL 了解能够更好的上手本课程。

二、实验步骤

2.1 准备工作

因为用的本地的环境,可以不用启动任何进程也可以完成本节实验。

su hadoop #密码 hadoop cd /opt/spark-2.1.0-bin-hadoop2.6

2.2 Parquet

Parquet 是语言无关的,面向分析型业务的列式存储格式,高效地存储具有嵌套字段的记录,Spark SQL 提供了可以自动保存原始数据模式的对 Parquet 文件读取和写入的操作,由 Twitter 和 Cloudera 合作开发。

我们可以在 Spark 的安装包里找到样例数据 users.parquet,无需自己创建:

ls examples/src/main/resources/users.parquet

您会发现这个文件比较特殊,无论你用 cat 命令,还是用 vi 编辑器查看内容都是一堆乱码,只有加载到程序,Spark 才会解析。

在Spark 安装包目录下输入 bin/spark-shell 打开 scala 会话。

bin/spark-shell

import spark.implicits._ #导入隐式转换 val parquetDF = spark.read.parquet("/opt/spark-2.1.0-bin-hadoop2.6/examples/src/main/resources/users.parquet") #加载数据 parquetDF.createOrReplaceTempView("pFile") #注册视图 val total = spark.sql("SELECT * FROM pFile")# sql扫描表 total.foreach(l =>println("Name: " + l(0)+" color: "+l(1)))#输出

不用关闭 spark-shell 终端,下节继续使用,若想关闭,请用 :quit命令退出即可。

2.3 JSON

Spark SQL可以自动推断 JSON 文件的元数据,并且加载其数据,创建一个 DataFrame。但是要注意的是,这里使用的 JSON 文件与传统意义上的 JSON 文件是不一样的。每行都必须也只能包含一个单独的有效的 JSON 对象,不能让一个 JSON 对象分散在多行,否则会报错。

我们可以在 Spark 的安装包里找到样例数据 people.json,无需自己创建:

more examples/src/main/resources/people.json

#导入隐式转换 scala> import spark.implicits._ #加载数据 scala> val peopleDF = spark.read.json("/opt/spark-2.1.0-bin-hadoop2.6/examples/src/main/resources/people.json") #保存为Parquet格式 scala> peopleDF.write.parquet("people.parquet") #加载 scala> val parquetDF = spark.read.parquet("people.parquet") #注册视图 scala> parquetDF.createOrReplaceTempView("parquetfile") scala> val nameandage = spark.sql("select * from parquetfile where age > 19") scala> nameandage.map(l=>("age:"+l(0)+" name:"+l(1))).show()

用 :quit 退出 spark-shell 终端。

2.4 JDBC连接数据库

1). 启动并配置 MySQL

实验楼环境默认已经安装 mysql ,启动即可,在 Hive on Spark 这节已经启动并设置密码,若已经完成请跳过此小节。

sudo service mysql start mysqladmin -uroot password "123456" mysql -uroot -p123456 exit

2). 创建数据库和表,插入数据

mysql -uroot -p123456 #创建 jdbc_spark 数据库 mysql> create database jdbc_spark; mysql> use jdbc_spark; #创建 depart 表 mysql> create table depart (id INT(4), name VARCHAR(20), gender VARCHAR(4)); mysql> insert into depart values(1,'Kitty','F'); mysql> insert into depart values(2,'John','M'; mysql> insert into depart values(3,'marry','F'); mysql> insert into depart values(4,'Jim','M');

查看数据是否存在:

show databases; use jdbc_spark; select * from depart;

不用关闭 MySQL 终端,稍后使用。

3). MySQL驱动

另外开启Xfce 终端

在 /opt 目录下下载 mysql-connector 并解压。

sudo wget http://labfile.oss.com/courses/809/mysql-connector-java-5.0.8.tar.gz sudo tar -zxvf mysql-connector-java-5.0.8.tar.gz sudo cp /opt/mysql-connector-java-5.0.8/mysql-connector-java-5.0.8-bin.jar .

4). 通过 JDBC 连接数据库

我们要启动一个 spark-shell,要附加一些参数,必须指定 mysql 连接驱动 jar 包:

#在一行的末尾加入斜杠\,是为了告诉spark-shell,命令还没有结束,全部复制即可 bin/spark-shell \ --driver-class-path /opt/mysql-connector-java-5.0.8-bin.jar \ --jars /opt/mysql-connector-java-5.0.8-bin.jar

启动进入spark-shell以后,可以执行以下命令连接数据库,读取数据,并显示:

scala> import org.apache.spark.sql.SQLContext scala> val sqlContext = new SQLContext(sc) scala> val jdbcDF = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:mysql://localhost:3306/jdbc_spark", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "depart", "user" -> "root", "password" -> "123456")).load() scala> jdbcDF.show()

接下来编写程序向 MySQL 中写入数据:

scala> import java.util.Properties scala> import org.apache.spark.sql.{SQLContext, Row} scala> import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType} scala> val sqlContext = new SQLContext(sc) #设置两条数据表示两个部门信息 scala> val departRDD = sc.parallelize(Array("5 Hot M ","6 Smath M ")).map(_.split(" ")) #设置模式信息 scala> val schema = StructType(List(StructField("id", IntegerType, true),StructField("name", StringType, true),StructField("gender", StringType, true))) #创建Row对象,每个Row对象都是rowRDD中的一行 scala> val rowRDD = departRDD.map(l => Row(l(0).toInt, l(1).trim, l(2).trim)) #建立起Row对象和模式之间的对应关系,即把数据和模式对应起来 scala> val departDataFrame = sqlContext.createDataFrame(rowRDD, schema)

#创建一个 properties 变量用来保存 JDBC 连接参数 scala> val properties = new Properties() #表示用户名是root scala> properties.put("user", "root") #表示密码是123456 scala> properties.put("password", "123456") #表示驱动程序是com.mysql.jdbc.Driver scala> properties.put("driver","com.mysql.jdbc.Driver") #采用append模式连接数据库,追加记录到数据库jdbc_spark的department表中 scala> departDataFrame.write.mode("append").jdbc("jdbc:mysql://localhost:3306/jdbc_spark", "jdbc_spark.depart", properties)

在 spark-shell 中执行完上述程序后,在刚才的 MySQL 终端命令提示符下面继续输入下面命令,查看 MySQL 数据库中的 jdbc_spark 表成功的插入了数据。

至此,本节实验完毕。

三、实验总结

Spark SQL可以支持 Parquet、JSON、Hive 等数据源,并且可以通过 JDBC 连接外部数据源。学完本节课,希望能帮助你学会如何处理 Spark 的多种数据源,很快入门。

四、参考阅读

http://spark.apache.org/docs/latest/sql-programming-guide.htmlhttp://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.package
转载请注明原文地址: https://www.6miu.com/read-64348.html

最新回复(0)