### Spark SQL

1. SQL: ways to access it: programmatically (e.g. from Python), from the command line, or over JDBC/ODBC.

2. Datasets and DataFrames: the Dataset API is available only in Java and Scala (as of 2.2.0); a DataFrame can be constructed from structured data files, tables in Hive, external databases, or existing RDDs.

3. SparkSession: introduced in Spark 2.0; it can run Hive SQL and Hive UDFs directly, without requiring a running Hive installation.

```
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
```

4. createDataFrame: create DataFrames from an existing RDD, from a Hive table, or from Spark data sources.

```
# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
df.show()
```

5. Untyped Dataset Operations (aka DataFrame Operations)

```
df.printSchema()
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)

# Select only the "name" column
df.select("name").show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+

# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
# +-------+---------+
# |   name|(age + 1)|
# +-------+---------+
# |Michael|     null|
# |   Andy|       31|
# | Justin|       20|
# +-------+---------+

# Select people older than 21
df.filter(df['age'] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+

# Count people by age
df.groupBy("age").count().show()
```

More operations: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame

6. Running SQL Queries Programmatically

```
df.createOrReplaceTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
```

Global Temporary View:

```
# Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

# Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

# Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
```

7. Spark SQL supports two different methods for converting existing RDDs into Datasets.

First method: Inferring the Schema Using Reflection

```
from pyspark.sql import Row

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are DataFrame objects.
# rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
    print(name)
# Name: Justin
```
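The same teenagers query can also be expressed with the DataFrame API instead of SQL. A minimal sketch, reusing the `schemaPeople` DataFrame built above (the column names `name` and `age` are the ones inferred by reflection):

```
from pyspark.sql.functions import col

# Equivalent to the SQL query above, expressed with DataFrame methods.
teenagersDF = schemaPeople \
    .filter((col("age") >= 13) & (col("age") <= 19)) \
    .select("name")
teenagersDF.show()
# Expected to list Justin, as in the SQL version.
```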
Second method: Programmatically Specifying the Schema

```
# Import data types
from pyspark.sql.types import *

sc = spark.sparkContext

# Load a text file and convert each line to a tuple.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")
results.show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+
```

8. Data Sources

Generic Load/Save Functions:

```
df = spark.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
```

Manually specifying options (json, parquet, jdbc, orc, libsvm, csv, text):

```
df = spark.read.load("examples/src/main/resources/people.json", format="json")
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
```

Run SQL on files directly:

```
df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
```

9. Save Modes

```
// Create a DataFrame by reading JSON (Java)
SQLContext sqlContext = new SQLContext(sc);
DataFrame dataFrame = sqlContext.read().format("json").load("resources/people.json");

dataFrame.write().mode(SaveMode.ErrorIfExists).save("people2.json"); // fail if the data already exists
dataFrame.write().mode(SaveMode.Append).save("people2.json");        // append to existing data
dataFrame.write().mode(SaveMode.Ignore).save("people2.json");        // do nothing if the data already exists
dataFrame.write().mode(SaveMode.Overwrite).save("people2.json");     // overwrite existing data
```

10. Saving to Persistent Tables

```
df.write.option("path", "/some/path").saveAsTable("t")
```

If a path is specified manually, dropping the table does not delete the data at that path; only data at the default path is removed when the table is dropped. Starting with Spark 2.1, SparkSession also exposes partition metadata commands such as `ALTER TABLE PARTITION ... SET LOCATION`.

11. Bucketing, Sorting and Partitioning

```
# Bucketing
df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")

# Partitioning
df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")

# Partitioning and bucketing combined
df = spark.read.parquet("examples/src/main/resources/users.parquet")
(df
    .write
    .partitionBy("favorite_color")
    .bucketBy(42, "name")
    .saveAsTable("people_partitioned_bucketed"))
```

12. Parquet Files (columnar storage, supported by many systems)

Loading Data Programmatically:

```
peopleDF = spark.read.json("examples/src/main/resources/people.json")

# DataFrames can be saved as Parquet files, maintaining the schema information.
peopleDF.write.parquet("people.parquet")

# Read in the Parquet file created above.
# Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile = spark.read.parquet("people.parquet")

# Parquet files can also be used to create a temporary view and then used in SQL statements.
parquetFile.createOrReplaceTempView("parquetFile")
teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.show()
```

13. Partition Discovery (partition columns and their data types are inferred automatically)

1. Set `spark.sql.sources.partitionColumnTypeInference.enabled=false` to disable automatic type inference.
2. Since 1.6, partitions under a given path can be discovered by specifying `basePath` and then loading with `SparkSession.read.parquet` or `SparkSession.read.load`.

14. Schema Merging (disabled by default; enabling it is not recommended because the operation is expensive)

How to enable it (a small sketch follows this item):

1. set the data source option `mergeSchema` to `true` when reading Parquet files, or
2. set the global SQL option `spark.sql.parquet.mergeSchema` to `true`.
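A minimal sketch of option 1, loosely following the schema-merging example in the Spark docs; the `data/test_table` path and the extra columns are illustrative only:

```
from pyspark.sql import Row

sc = spark.sparkContext

# Create a simple DataFrame and store it in a partition directory.
squaresDF = spark.createDataFrame(
    sc.parallelize(range(1, 6)).map(lambda i: Row(single=i, double=i ** 2)))
squaresDF.write.parquet("data/test_table/key=1")

# Create another DataFrame in a new partition directory,
# adding a new column and dropping an existing one.
cubesDF = spark.createDataFrame(
    sc.parallelize(range(6, 11)).map(lambda i: Row(single=i, triple=i ** 3)))
cubesDF.write.parquet("data/test_table/key=2")

# Read the partitioned table with schema merging enabled.
mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()
# The merged schema should contain single, double, triple and the partition column key.
```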
15. Hive metastore Parquet table conversion

16. JSON Datasets (each line of a JSON file must be a single JSON object)

```
# spark is from the previous example.
sc = spark.sparkContext

# A JSON dataset is pointed to by path.
# The path can be either a single text file or a directory storing text files
path = "examples/src/main/resources/people.json"
peopleDF = spark.read.json(path)

# The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)

# Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

# SQL statements can be run by using the sql methods provided by spark
teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
# +------+
# |  name|
# +------+
# |Justin|
# +------+

# Alternatively, a DataFrame can be created for a JSON dataset represented by
# an RDD[String] storing one JSON object per string
jsonStrings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']
otherPeopleRDD = sc.parallelize(jsonStrings)
otherPeople = spark.read.json(otherPeopleRDD)
otherPeople.show()
```

17. Hive Tables (Hive support is configured via hive-site.xml)

```
from os.path import expanduser, join, abspath

from pyspark.sql import SparkSession
from pyspark.sql import Row

# warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()

# spark is an existing SparkSession
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show()
# +---+-------+
# |key|  value|
# +---+-------+
# |238|val_238|
# | 86| val_86|
# |311|val_311|
# ...

# Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show()
# +--------+
# |count(1)|
# +--------+
# |     500|
# +--------+

# The results of SQL queries are themselves DataFrames and support all normal functions.
sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

# The items in DataFrames are of type Row, which allows you to access each column by ordinal.
stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
for record in stringsDS.collect():
    print(record)
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# ...

# You can also use DataFrames to create temporary views within a SparkSession.
Record = Row("key", "value")
recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
recordsDF.createOrReplaceTempView("records")

# Queries can then join DataFrame data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
```

18. Specifying storage format for Hive tables (see the sketch below)
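The notes leave item 18 as a heading only. As a hedged sketch, following the storage-format options documented for `CREATE TABLE ... USING hive` (the table name is illustrative), the on-disk format of a Hive table can be chosen like this:

```
# Create a Hive table stored as Parquet instead of the default text format.
# The OPTIONS clause accepts Hive storage settings such as fileFormat.
spark.sql(
    "CREATE TABLE IF NOT EXISTS hive_parquet_src (key INT, value STRING) "
    "USING hive OPTIONS(fileFormat 'parquet')")
```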
19. JDBC To Other Databases

```
# Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods

# Loading data from a JDBC source
jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

jdbcDF2 = spark.read \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Saving data to a JDBC source
jdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .save()

jdbcDF2.write \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Specifying create table column data types on write
jdbcDF.write \
    .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})
```
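Large tables are usually read in parallel by partitioning on a numeric column. A minimal sketch using the standard `partitionColumn`/`lowerBound`/`upperBound`/`numPartitions` JDBC options; the column name `id` and the bounds are illustrative assumptions:

```
# Read the table with 10 parallel partitions, splitting the id range 0..100000.
partitionedDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .option("partitionColumn", "id") \
    .option("lowerBound", "0") \
    .option("upperBound", "100000") \
    .option("numPartitions", "10") \
    .load()
```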
20. Compatibility with Apache Hive (covers which Hive operations are supported and which are not)