Spark SQL: reading specified JSON fields


The problem with reading JSON in Spark SQL

Spark SQL supports JSON as a data source, but JSON is a loose data structure while SQL needs a fixed one. When reading JSON, Spark SQL therefore traverses the entire dataset to infer the widest schema covering every record. On small data this is harmless, but on large datasets the inference pass is a full scan, so even putting a limit on the query does not avoid it. The fix is to supply a schema when reading JSON: Spark then parses records against the schema you provide, and any attribute missing from a record's JSON path simply comes back as null. When reading large volumes of JSON, this is well worth doing.
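To make the contrast concrete, here is a minimal sketch of the two read paths (Spark 1.x API, matching the usage later in this post; the file path and field names are hypothetical):

    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // 1. Inference: Spark scans the whole dataset up front to derive the schema.
    val inferred = sqlContext.read.json("/data/events.json")

    // 2. Explicit schema: no inference scan; records are parsed against it,
    //    and missing paths yield null.
    val explicitSchema = StructType(Seq(
      StructField("props", StructType(Seq(
        StructField("name", StringType, nullable = true)
      )), nullable = true)
    ))
    val explicit = sqlContext.read.schema(explicitSchema).json("/data/events.json")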

A custom configuration DSL

    props.name as realName,
    event.info.eventId,
    props.age,
    event.own.telnum as tel

We list the attributes to extract from the JSON, parse that configuration into a StructType, and pass it to the reader as the JSON schema.
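For intuition, a single entry such as props.name as realName expands to the nested structure below (hand-written here as a sketch; the alias is not part of the schema, it is only applied later by selectExpr):

    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // "props.name as realName" -> struct<props: struct<name: string>>
    val single = StructType(
      StructField("props", StructType(
        StructField("name", StringType, nullable = true) :: Nil
      ), nullable = true) :: Nil
    )

The parser below automates this expansion and merges entries that share a path prefix.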

The parser

    package com.ximalaya.spark.sql.hfile.json

    import java.util.regex.Pattern

    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    import scala.collection.mutable.ArrayBuffer

    /**
     * @author todd.chen at 7/28/16 8:14 AM.
     *         email: todd.chen@ximalaya.com
     */
    object JsonSchemaBuilder {

      // all leaf fields are read as nullable strings
      private final val DEFAULT_NULLABLE = true

      final val columnSplitPattern = Pattern.compile("\\s*,\\s*")
      private final val fieldSplitPattern = Pattern.compile("\\.")
      private final val fieldPattern = Pattern.compile("([\\w\\.]+)(?:\\s+as\\s+\\w+)?")

      def getJsonSchema(schema: String): StructType =
        getSchemaByFieldsList(columnSplitPattern.split(schema).map(getFieldList).toList)

      // "event.own.telnum as tel" -> List("event", "own", "telnum")
      private def getFieldList(singleField: String): List[String] = {
        val fieldMatch = fieldPattern.matcher(singleField)
        if (fieldMatch.matches()) {
          val fieldSource = fieldMatch.group(1)
          fieldSplitPattern.split(fieldSource).toList
        } else {
          throw new IllegalArgumentException(
            s"field format error: $singleField, expected parent.children (as aliasName)")
        }
      }

      private def getSchemaByFieldsList(fieldsList: List[List[String]]): StructType =
        fieldsList.map(getStructType).reduce(mergeStructType)

      // build a single-path nested struct, e.g. List("props", "name") ->
      // struct<props: struct<name: string>>
      private def getStructType(fields: List[String]): StructType = fields match {
        case head :: Nil  => StructType(StructField(head, StringType, DEFAULT_NULLABLE) :: Nil)
        case head :: tail => StructType(StructField(head, getStructType(tail), DEFAULT_NULLABLE) :: Nil)
        case Nil          => throw new IllegalArgumentException("empty field path")
      }

      // merge two nested structs, folding fields that share a name
      private def mergeStructType(left: StructType, right: StructType): StructType = {
        val newFields = ArrayBuffer.empty[StructField]
        val leftFields = left.fields
        val rightFields = right.fields
        val rightMapped = fieldsMap(rightFields)
        leftFields.foreach {
          case leftField @ StructField(leftName, leftType, leftNullable, _) =>
            rightMapped.get(leftName)
              .map {
                case StructField(_, rightType, rightNullable, _) =>
                  leftField.copy(
                    dataType = mergeStructType(
                      leftType.asInstanceOf[StructType],
                      rightType.asInstanceOf[StructType]),
                    nullable = leftNullable || rightNullable)
              }
              .orElse(Some(leftField))
              .foreach(newFields += _)
        }
        val leftMapped = fieldsMap(leftFields)
        rightFields
          .filterNot(f => leftMapped.contains(f.name))
          .foreach(newFields += _)
        StructType(newFields)
      }

      private def fieldsMap(fields: Array[StructField]): Map[String, StructField] = {
        import scala.collection.breakOut
        fields.map(s => (s.name, s))(breakOut)
      }
    }

The mergeStructType method is adapted from a private method of StructType, with the branches we don't need removed.
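Its effect is easiest to see on two entries that share a prefix: props.name and props.age each expand to a one-field struct, and the merge folds them under a single props node:

    // Both paths start with "props", so mergeStructType combines them
    // into one parent struct instead of two top-level fields.
    JsonSchemaBuilder.getJsonSchema("props.name, props.age").printTreeString()
    // root
    //  |-- props: struct (nullable = true)
    //  |    |-- name: string (nullable = true)
    //  |    |-- age: string (nullable = true)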

Test case

    val schema = "props.name as realName,\nevent.info.eventId,\nprops.age,\nevent.own.telnum as tel"
    JsonSchemaBuilder.getJsonSchema(schema).printTreeString()

Output:

    root
     |-- props: struct (nullable = true)
     |    |-- name: string (nullable = true)
     |    |-- age: string (nullable = true)
     |-- event: struct (nullable = true)
     |    |-- info: struct (nullable = true)
     |    |    |-- eventId: string (nullable = true)
     |    |-- own: struct (nullable = true)
     |    |    |-- telnum: string (nullable = true)

Usage

    sqlContext.read
      .format("json")
      .schema(JsonSchemaBuilder.getJsonSchema(schemaString))
      .load(path)
      .selectExpr(JsonSchemaBuilder.columnSplitPattern.split(schemaString): _*)

Note that the same schemaString drives both steps: the schema restricts what is parsed, and selectExpr reuses the entries verbatim, which is where the as aliases finally take effect.

With this in place, Spark no longer scans the entire JSON dataset just to infer a schema.
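And, as noted at the start, a path that is absent from a record does not break the read; it simply comes back as null. A quick sketch with hypothetical inline data (Spark 1.x, where DataFrameReader.json accepts an RDD[String]):

    val schemaString = "props.name as realName, event.own.telnum as tel"
    // The second record has no event.own.telnum; the missing path becomes null.
    val rdd = sc.parallelize(Seq(
      """{"props":{"name":"a"},"event":{"own":{"telnum":"123"}}}""",
      """{"props":{"name":"b"}}"""
    ))
    sqlContext.read
      .schema(JsonSchemaBuilder.getJsonSchema(schemaString))
      .json(rdd)
      .selectExpr(JsonSchemaBuilder.columnSplitPattern.split(schemaString): _*)
      .show()
    // +--------+----+
    // |realName| tel|
    // +--------+----+
    // |       a| 123|
    // |       b|null|
    // +--------+----+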

Reposted from: http://blog.csdn.net/cjuexuan/article/details/52078223

