【Spark】Spark 并行查询 Greenplum

xiaoxiao2021-03-01  57

本文结构如下:

前言Spark SQL 几个属性介绍Spark 并行查询总结

一、前言

Spark 支持通过 JDBC 连接关系型数据库,连接方式如下:

// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods // Loading data from a JDBC source val jdbcDF = spark.read .format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .load() val connectionProperties = new Properties() connectionProperties.put("user", "username") connectionProperties.put("password", "password") val jdbcDF2 = spark.read .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) // Specifying the custom data types of the read schema connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING") val jdbcDF3 = spark.read .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) // Saving data to a JDBC source jdbcDF.write .format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .save() jdbcDF2.write .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) // Specifying create table column data types on write jdbcDF.write .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

Spark 通过 JDBC 读取关系型数据库,默认查全表,只有一个 Task 去执行查询操作,大量数据情况下,效率是很慢的。

这时,可以通过构造多个 Task 并行连接 Greenplum 提升效率。

二、Spark SQL 几个属性介绍

如何构造多个 Task 来提升效率呢?首先想到的应该是 Spark SQL 本身的支持。

查看官网 Spark SQL 资料,定位到 JDBC To Other Databases:

在属性列表中找到了解决方法:

Property NameMeaningurlThe JDBC URL to connect to. The source-specific connection properties may be specified in the URL. e.g., jdbc:postgresql://localhost/test?user=fred&password=secretdbtableThe JDBC table that should be read. Note that anything that is valid in a FROM clause of a SQL query can be used. For example, instead of a full table you could also use a subquery in parenthesesdriverThe class name of the JDBC driver to use to connect to this URLpartitionColumn, lowerBound, upperBoundThese options must all be specified if any of them is specified. In addition, numPartitions must be specified. They describe how to partition the table when reading in parallel from multiple workers. partitionColumn must be a numeric column from the table in question. Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned. This option applies only to readingnumPartitionsThe maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing

更加详细参数参考:JDBC To Other Databases

主要解释三个参数:

dbtable:表名,可以是真实存在的关系表,也可以是通过查询语句 AS 出来的表。其实只要是在 SQL 语句里,FROM 后面能跟的语句用在 dbtable 属性都合法,其原理就是拼接 SQL 语句,dbtable 会填在 FROM 后面。numPartitions:读、写的最大分区数,也决定了开启数据库连接的数目。使用 numPartitions 有一点点限制, 如果指定了 numPartitions 大于1的值,但是没有指定分区规则,仍只有一个 task 去执行查询。partitionColumn, lowerBound, upperBound:指定读数据时的分区规则。要使用这三个参数,必须定义 numPartitions,而且这三个参数不能单独出现,要用就必须全部指定。而且 lowerBound, upperBound 不是过滤条件,只是用于决定分区跨度。在分区的时候,会根据 numPartitions 将 lowerBound 和 upperBound 拆分成,然后并行去执行查询。

三、Spark 并行查询

在知悉了上述信息之后,就可以通过增加 Task 数量来提升访问关系型数据的效率,大致有以下两种方法:

3.1、第一种:numPartitions,partitionColumn, lowerBound

val spark = SparkSession .builder() .config("spark.sql.warehouse.dir", warehouseLocation) .appName("load data from gp test") .getOrCreate() // 开始时间 val startTime = System.currentTimeMillis() val gpRDF = spark.read .format("jdbc") .option("driver", "com.pivotal.jdbc.GreenplumDriver") .option("url", "jdbc:pivotal:greenplum://192.168.11.72:5432;DatabaseName=testdb") .option("partitionColumn", "person_id") .option("lowerBound", lowerBound) .option("upperBound", upperBound) .option("numPartitions", numPartitions) .option("dbtable", "public.t_timing_face_person") .option("user", "gpadmin") .option("password", "gpadmin") .load()

3.2、第二种:dbtable

不用 numPartitions,partitionColumn, lowerBound, upperBound,可以通过 dbtable 构造子查询,并行执行多个查询得到多个结果 RDD,最后通过 reduce 合并成一个 RDD。

val stride = Math.ceil(dataNums / numPartitions).toInt val spark = SparkSession .builder() .config("spark.sql.warehouse.dir", warehouseLocation) .appName("load data from gp") .getOrCreate() // 创建 numPartitions 个 task val registerDF = Range(0, numPartitions) .map(index => { spark .read .format("jdbc") .option("driver", "com.pivotal.jdbc.GreenplumDriver") .option("url", "jdbc:pivotal:greenplum://192.168.11.72:5432;DatabaseName=testdb") .option("dbtable", s"(SELECT feature FROM public.t_timing_face_person WHERE person_id > ${stride * index} AND person_id <= ${stride * (index + 1)}) AS t_tmp_${index}") .option("user", "gpadmin") .option("password", "gpadmin") .load() }) .reduce((rdd1, rdd2) => rdd1.union(rdd2))

四、总结

对于上述的两种方式:

第一种有时会造成数据分布不均匀,有些 task 数据量很大,有些 task 数据量几乎为 0,这是因为 Spark 是根据指定的分区列 partitionColumn 来进行分区,如果指定的 partitionColumn 不是连续的数(分布不均匀),那么每个 task 中的数据量就会分配不均匀;

第二种自定义 sql,相对可控,当然自定义也就意味着代码要稍微复杂。

转载请注明原文地址: https://www.6miu.com/read-4149982.html

最新回复(0)