parquet 是面向分析型业务的列示存储格式. 列式存储比行式存储有哪些优势呢 1.可以跳过不符合条件的数据,只读取需要的数据,降低IO的数量。 2.压缩编码格式可以降低磁盘空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码进一步节省孔家 3.只要读取需要的列,支持向量运算能够获取更好的扫描性能。
查询用户数据中的用户姓名 下面给出java 示例
public class ParquetLoadData { public static void main(String[] args) { SparkConf conf=new SparkConf().setAppName("ParquetLoadData"); JavaSparkContext sc=new JavaSparkContext(conf); SQLContext sqlContext=new SQLContext(sc); DataFrameReader reader=sqlContext.read(); Dataset ds= reader.json("hdfs://hadoop:8020/data/users.parquet"); ds.show(); ds.registerTempTable("users"); Dataset userName=sqlContext.sql("select name from users"); //对查询出来的dataSet 进行操作,处理打印 List<String> userNameRDD=userName.javaRDD().map(new Function<Row,String>() { public String call(Row o) throws Exception { return "name:"+o.getString(0); } }).collect(); for(String usernmae:userNameRDD){ System.out.println(usernmae); } } }下面给出scala 示例
object ParquetLoadData { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("ParquetLoadData") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val reader = sqlContext.read val ds = reader.json("hdfs://hadoop:8020/data/users.parquet") ds.show() ds.registerTempTable("users") val userName = sqlContext.sql("select name from users") //对查询出来的dataSet 进行操作,处理打印 val userNameRDD = userName.javaRDD.map(line=>"name:"+line.getString(0)).collect import scala.collection.JavaConversions._ for (usernmae <- userNameRDD) { println(usernmae) } } }表分区是一种非常常见的优化方式,比如hive中就提供了表分区的特性,在一个分区表中,不同分区的数据通常存在不同的目录,分区列的值通常包含在分区目录名中。sparkSql中的parquet 支持自动更具目录名推断出分区信息。 如果将tableName 传入到SqlContext.read.parquet 或者SqlContext.read.load() 那么spark 会根据目录结构自动推断出分区信息。 此外 分区的数据类型也是自动被推断出来的。目前Spark Sql 仅支持自动推断出数字类型和字符类型,有时用户也许不希望SparkSql 自动判断分区列的数据类型,此时只需要设置一个配置即可,spark.sql.sources.partitionColumnTypeInference.enabled.默认为true,即自动推断分区类型,设置为false,不会推断。
下面给出java 示例:
public class ParquetDiscovery { public static void main(String[] args) { SparkConf conf=new SparkConf().setAppName("ParquetDiscovery"); JavaSparkContext sc=new JavaSparkContext(conf); SQLContext sqlContext=new SQLContext(sc); DataFrameReader reader=sqlContext.read(); Dataset ds= reader.json("hdfs://hadoop:8020/data/gender=male/country=us/users.parquet"); ds.printSchema(); ds.show(); sc.close(); } }下面给出scala 示例:
object ParquetDiscovery { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("ParquetDiscovery") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val reader = sqlContext.read val ds = reader.json("hdfs://hadoop:8020/data/gender=male/country=us/users.parquet") ds.printSchema() ds.show() } }parquet 支持合并元数据,用户可以在一开始的时候就定于一个简单的元数据,然后随着业务的发展逐渐的往元数据中添加更多的列,这种情况下用户,可能会创建多个partition 文件,有着不同但却互相兼容的元数据.parquet自动判断这种情况,并且进行多个文件的元数据合并。 因为合并元数据是一个比较耗时的操作大多情况下不太需要,所以默认是关闭的 可以通过如下的方式打开。 1.读取Parquet 将数据源选项,mergeSchema 设置为true. 2.使用SQlContext.setconf()将spark.sql.parquet.mergeSchema参数设置为true 下面给出scala 示例:
object ParquetMergeScaema { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("ParquetMergeScaema") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val reader = sqlContext.read //创建一个dataSet 作为学生信息 import sqlContext.implicits._ val studentWithNameAndAge=Array(("leo",20),("jack","25")).toSeq val studentWithNameAndAgeDF=sc.parallelize(studentWithNameAndAge,2).toDF("name","age"); studentWithNameAndAgeDF.write.save("hdfs://hadoop:8020/data/ParquetMergeScaemaTest1"); //再创建一个 val studentWithNameAndId=Array(("leo",20),("jack","25")).toSeq val studentWithNameAndIdDF=sc.parallelize(studentWithNameAndId,2).toDF("name","Id"); studentWithNameAndIdDF.write.save("hdfs://hadoop:8020/data/ParquetMergeScaemaTest2.json"); //合并两个dataset val student=sqlContext.read.option("mergeSchema","true").parquet("hdfs://hadoop:8020/data/ParquetMergeScaemaResult") student.printSchema() student.show() } }