使用scala开发SparkSql程序

xiaoxiao2021-02-28  45

依赖

<properties> <maven.compiler.source>1.7</maven.compiler.source> <maven.compiler.target>1.7</maven.compiler.target> <encoding>UTF-8</encoding> <scala.version>2.10.6</scala.version> <scala.compat.version>2.10</scala.compat.version> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.5.2</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> <version>1.5.2</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>1.5.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.6.2</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.38</version> </dependency> </dependencies>

1 使用RDD和Case Class关联进行表操作(推断模式)

package cn.itcast.spark.sql import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.SQLContext object InferringSchema { def main(args: Array[String]) { //创建SparkConf()并设置App名称 val conf = new SparkConf().setAppName("SQL-1") //SQLContext要依赖SparkContext val sc = new SparkContext(conf) //创建SQLContext val sqlContext = new SQLContext(sc) //从指定的地址创建RDD val lineRDD = sc.textFile(args(0)).map(_.split(" ")) //创建case class //将RDD和case class关联 val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt)) //导入隐式转换,如果不到人无法将RDD转换成DataFrame //将RDD转换成DataFrame import sqlContext.implicits._ val personDF = personRDD.toDF //注册表 personDF.registerTempTable("t_person") //传入SQL val df = sqlContext.sql("select * from t_person order by age desc limit 2") //将结果以JSON的方式存储到指定位置 df.write.json(args(1)) //停止Spark Context sc.stop() } } //case class一定要放到外面 case class Person(id: Int, name: String, age: Int)

2 RDD映射到rowRDD并指定schema(指定模式)

package cn.itcast.spark.sql import org.apache.spark.sql.{SaveMode, Row, SQLContext} import org.apache.spark.sql.types._ import org.apache.spark.{SparkContext, SparkConf} object SpecifyingSchema { def main(args: Array[String]) { //创建SparkConf()并设置App名称 val conf = new SparkConf().setAppName("SQL-2") //SQLContext要依赖SparkContext val sc = new SparkContext(conf) //创建SQLContext val sqlContext = new SQLContext(sc) //从指定的地址创建RDD val personRDD = sc.textFile(args(0)).map(_.split(" ")) //通过StructType直接指定每个字段的schema val schema = StructType( List( StructField("id", IntegerType, true), StructField("name", StringType, true), StructField("age", IntegerType, true) ) ) //将RDD映射到rowRDD val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt)) //将schema信息应用到rowRDD上 val personDataFrame = sqlContext.createDataFrame(rowRDD, schema) //注册表 personDataFrame.registerTempTable("t_person") //执行SQL val df = sqlContext.sql("select * from t_person order by age desc limit 4") //将结果以JSON的方式存储到指定位置 df.write.json(args(1)) //停止Spark Context sc.stop() } }

3 DataFrame的JDBC操作

package cn.itcast.spark.sql import java.util.Properties import org.apache.spark.sql.{SQLContext, Row} import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType} import org.apache.spark.{SparkConf, SparkContext} object JdbcRDD { def main(args: Array[String]) { val conf = new SparkConf().setAppName("MySQL-Demo") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) //通过并行化创建RDD val personRDD = sc.parallelize(Array("1 tom 5", "2 jerry 3", "3 kitty 6")).map(_.split(" ")) //通过StructType直接指定每个字段的schema val schema = StructType( List( StructField("id", IntegerType, true), StructField("name", StringType, true), StructField("age", IntegerType, true) ) ) //将RDD映射到rowRDD val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt)) //将schema信息应用到rowRDD上 val personDataFrame = sqlContext.createDataFrame(rowRDD, schema) //创建Properties存储数据库相关属性 val prop = new Properties() prop.put("user", "root") prop.put("password", "123456") //将数据追加到数据库 personDataFrame.write.mode("append").jdbc("jdbc:mysql://localhost:3306/bigdata", "bigdata.person", prop) //停止SparkContext sc.stop() } }
转载请注明原文地址: https://www.6miu.com/read-2628573.html

最新回复(0)