Dependencies
<properties>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.10.6</scala.version>
    <scala.compat.version>2.10</scala.compat.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.5.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.5.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.5.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.2</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>
</dependencies>
1 Table operations by mapping an RDD to a case class (inferred schema)
package cn.itcast.spark.sql

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object InferringSchema {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SQL-1")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Read the input file (args(0)) and split each line on spaces
    val lineRDD = sc.textFile(args(0)).map(_.split(" "))
    // Map each line to a Person; the case class fields become the inferred schema
    val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))

    // Implicit conversions are required for the RDD-to-DataFrame conversion (toDF)
    import sqlContext.implicits._
    val personDF = personRDD.toDF

    // Register a temporary table and query it with SQL
    personDF.registerTempTable("t_person")
    val df = sqlContext.sql("select * from t_person order by age desc limit 2")

    // Write the result as JSON to the output path (args(1))
    df.write.json(args(1))
    sc.stop()
  }
}

// The case class whose fields define the inferred schema
case class Person(id: Int, name: String, age: Int)
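The same top-2-by-age query can also be written with the DataFrame API instead of SQL. A minimal sketch (not part of the original example; it reuses personDF and the sqlContext.implicits._ import from the code above):

// Equivalent to "select * from t_person order by age desc limit 2"
val top2 = personDF.orderBy($"age".desc).limit(2)
top2.show()

Both forms go through the same Catalyst optimizer, so the choice between SQL strings and the DSL is mostly a matter of readability.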
2 Mapping an RDD to a RowRDD and specifying the schema explicitly (specified schema)
package cn.itcast.spark.sql

import org.apache.spark.sql.{SaveMode, Row, SQLContext}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkContext, SparkConf}

object SpecifyingSchema {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SQL-2")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Read the input file (args(0)) and split each line on spaces
    val personRDD = sc.textFile(args(0)).map(_.split(" "))

    // Define the schema explicitly instead of inferring it from a case class
    val schema = StructType(
      List(
        StructField("id", IntegerType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )

    // Convert each array of fields into a Row that matches the schema
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
    val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)

    // Register a temporary table and query it with SQL
    personDataFrame.registerTempTable("t_person")
    val df = sqlContext.sql("select * from t_person order by age desc limit 4")

    // Write the result as JSON to the output path (args(1))
    df.write.json(args(1))
    sc.stop()
  }
}
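When the column list is not known at compile time (for example, it comes from configuration), the same StructType can be built programmatically. A minimal sketch, not from the original code; the column names and types are simply the ones used above:

import org.apache.spark.sql.types._

// Build the same schema from (name, type) pairs
val columns = Seq(("id", IntegerType), ("name", StringType), ("age", IntegerType))
val schema = StructType(columns.map { case (name, dataType) => StructField(name, dataType, nullable = true) })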
3 JDBC operations with a DataFrame
package cn.itcast.spark.sql

import java.util.Properties
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

object JdbcRDD {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("MySQL-Demo")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Build a small in-memory dataset and split each record into fields
    val personRDD = sc.parallelize(Array("1 tom 5", "2 jerry 3", "3 kitty 6")).map(_.split(" "))

    // Define the schema explicitly
    val schema = StructType(
      List(
        StructField("id", IntegerType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )

    // Convert the fields into Rows and build the DataFrame
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
    val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)

    // JDBC connection properties for MySQL
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "123456")

    // Append the DataFrame to the bigdata.person table over JDBC
    personDataFrame.write.mode("append").jdbc("jdbc:mysql://localhost:3306/bigdata", "bigdata.person", prop)
    sc.stop()
  }
}
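To verify the write, the table can be read back over JDBC as a DataFrame. A minimal sketch, assuming the same sqlContext, Properties import, connection URL, and credentials as in the example above:

// Read bigdata.person back as a DataFrame and print a few rows
val prop = new Properties()
prop.put("user", "root")
prop.put("password", "123456")
val personBack = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/bigdata", "bigdata.person", prop)
personBack.show()

Note that when the job runs on a cluster, the mysql-connector-java jar typically needs to be visible to both the driver and the executors, for example via the --jars option of spark-submit.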