Spark: Spark SQL Built-in Functions, Source Walkthrough, and UDF/UDAF Examples (Spark Study Series, Part 6)


Reposted from: http://lib.csdn.net/article/spark/58325

1. Spark SQL Built-in Functions: Internals and Practice

Spark SQL's DataFrame ships with a large number of built-in functions, and most of them support code generation (CG), so they are heavily optimized at both compile time and execution time.
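To make that concrete, here is a minimal sketch (the DataFrame df and its amount column are assumed for illustration): a built-in function such as sum does not compute anything by itself; it merely builds a Column expression that Catalyst can optimize and code-generate once an action is executed.

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions._

// Hypothetical DataFrame "df" with an "amount" column.
def totalAmount(df: DataFrame): DataFrame = {
  val totalExpr: Column = sum(df("amount")) // only builds an expression, nothing runs yet
  df.agg(totalExpr.as("total_amount"))      // evaluated (with code generation) when an action runs
}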

Question: is Spark SQL operating on Hive the same thing as Hive on Spark?

=> No, they are different. When Spark SQL operates on Hive, Hive serves only as the source of the data warehouse, while the computation engine is Spark SQL itself. Hive on Spark is a subproject of Hive whose core idea is to swap Hive's execution engine for Spark. As is well known, Hive's execution engine (at the time of writing) is MapReduce, and because of its poor performance the Hive community has long wanted to replace it.

Spark SQL operating on data stored in Hive is called Spark on Hive, whereas Hive on Spark keeps Hive as the core and only replaces the execution engine, switching from MapReduce to Spark.
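A minimal Spark on Hive sketch (matching the HiveContext API used later in this article; the table name some_hive_table is a placeholder): Hive only provides the metastore and the data files, while the query itself is executed by Spark SQL.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object SparkOnHiveDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SparkOnHiveDemo"))
    val hiveContext = new HiveContext(sc) // reads Hive's metastore; execution stays in Spark SQL
    hiveContext.sql("SELECT * FROM some_hive_table LIMIT 10").show()
    sc.stop()
  }
}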

DataFrame API docs on the Spark website: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.package

Experimental  A distributed collection of data organized into named columns.  A DataFrame is equivalent to a relational table in Spark SQL. The following example creates a DataFrame by pointing Spark SQL to a Parquet data set.

val people = sqlContext.read.parquet("...")         // in Scala
DataFrame people = sqlContext.read().parquet("...") // in Java

Once created, it can be manipulated using the various domain-specific-language (DSL) functions defined in: DataFrame (this class), Column, and functions.

To select a column from the data frame, use the apply method in Scala and col in Java.

val ageCol = people("age")            // in Scala
Column ageCol = people.col("age")     // in Java

Note that the Column type can also be manipulated through its various functions.

// The following creates a new column that increases everybody's age by 10.
people("age") + 10                    // in Scala
people.col("age").plus(10);           // in Java

A more concrete example in Scala:

// To create DataFrame using SQLContext
val people = sqlContext.read.parquet("...")
val department = sqlContext.read.parquet("...")

people.filter("age > 30")
  .join(department, people("deptId") === department("id"))
  .groupBy(department("name"), "gender")
  .agg(avg(people("salary")), max(people("age")))

and in Java:

// To create DataFrame using SQLContext
DataFrame people = sqlContext.read().parquet("...");
DataFrame department = sqlContext.read().parquet("...");

people.filter(people.col("age").gt(30))
  .join(department, people.col("deptId").equalTo(department.col("id")))
  .groupBy(department.col("name"), "gender")
  .agg(avg(people.col("salary")), max(people.col("age")));

The join, groupBy, and agg used above are all Spark SQL built-in functions. Spark 1.5.x and later added a large number of built-in functions; by a rough count there are well over a hundred. Below we develop a hands-on aggregation example:

package com.dt.spark

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.functions._

/**
 * Use Spark SQL built-in functions to analyze data. Unlike the rest of the Spark SQL API, a DataFrame
 * built-in function returns a Column object, and a DataFrame is by definition "a distributed collection
 * of data organized into named columns". This lays a solid foundation for complex data analysis and is
 * very convenient: we can call built-in functions at any point while manipulating a DataFrame, which
 * greatly reduces unnecessary work when building business logic (it is essentially a direct mapping of
 * the actual model) and lets us focus on the analysis itself -- a real productivity win for engineers.
 * Spark 1.5.x introduced a large number of built-in functions, for example agg:
 *   def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
 *     groupBy().agg(aggExpr, aggExprs : _*)
 *   }
 * as well as max, mean, min, sum, avg, explode, size, sort_array, day, to_date, abs, acos, asin, atan.
 * Overall, the built-in functions fall into these categories:
 * 1. aggregate functions, e.g. countDistinct, sumDistinct;
 * 2. collection functions, e.g. sort_array, explode;
 * 3. date/time functions, e.g. hour, quarter, next_day;
 * 4. math functions, e.g. asin, atan, sqrt, tan, round;
 * 5. window functions, e.g. rowNumber;
 * 6. string functions, e.g. concat, format_number, regexp_extract;
 * 7. other functions, e.g. isNaN, sha, randn, callUDF.
 */
object SparkSQLAgg {
  def main(args: Array[String]) {
    System.setProperty("hadoop.home.dir", "G:/datarguru spark/tool/hadoop-2.6.0")
    val conf = new SparkConf()
    conf.setAppName("SparkSQLInnerFunctions")
    //conf.setMaster("spark://master:7077")
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc) // build the SQL context

    // To use Spark SQL built-in functions you must import the implicits of the SQLContext.
    import sqlContext.implicits._

    // Simulated e-commerce access data (real data is far more complex); it becomes an RDD below.
    val userData = Array(
      "2016-3-27,001,http://spark.apache.org/,1000",
      "2016-3-27,001,http://Hadoop.apache.org/,1001",
      "2016-3-27,002,http://fink.apache.org/,1002",
      "2016-3-28,003,http://kafka.apache.org/,1020",
      "2016-3-28,004,http://spark.apache.org/,1010",
      "2016-3-28,002,http://hive.apache.org/,1200",
      "2016-3-28,001,http://parquet.apache.org/,1500",
      "2016-3-28,001,http://spark.apache.org/,1800"
    )

    val userDataRDD = sc.parallelize(userData) // create the distributed collection

    // Preprocess the data into a DataFrame as required by the business logic. To convert an RDD into a
    // DataFrame, first map its elements to Row and provide the metadata describing the DataFrame's columns.
    val userDataRDDRow = userDataRDD.map(row => {
      val splited = row.split(",")
      Row(splited(0), splited(1).toInt, splited(2), splited(3).toInt)
    })
    val structType = StructType(Array(
      StructField("time", StringType, true),
      StructField("id", IntegerType, true),
      StructField("url", StringType, true),
      StructField("amount", IntegerType, true)
    ))
    val userDataDF = sqlContext.createDataFrame(userDataRDDRow, structType)

    // Use the built-in functions provided by Spark SQL on the DataFrame. Note that built-in functions
    // produce Column objects and are automatically code-generated.
    userDataDF.groupBy("time").agg('time, countDistinct('id))
      .map(row => Row(row(1), row(2))).collect().foreach(println)
    userDataDF.groupBy("time").agg('time, sum('amount))
      .map(row => Row(row(1), row(2))).collect().foreach(println)
  }
}
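A side note on the example above: passing 'time into agg after groupBy("time") and then re-wrapping the rows is not required. A minimal equivalent sketch, reusing the same userDataDF and imports (the aliases uv and totalAmount are just illustrative names):

// groupBy already carries the grouping column "time" through to the result
userDataDF.groupBy("time")
  .agg(countDistinct('id).as("uv"), sum('amount).as("totalAmount"))
  .show()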

2. Spark SQL Window Functions: Internals and Practice

Window functions include ranking functions, analytic functions, and aggregate functions. For a fairly complete overview of window functions, see: https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-windows.html

The most important window function is row_number, which ranks rows within a group, that is, it sorts on top of a grouping. Below we rewrite the earlier TopNGroup.scala program with Spark SQL and run it:

package com.dt.spark

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object SparkSQLWindowFunctionOps {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    conf.setMaster("spark://master:7077")
    conf.setAppName("SparkSQLWindowFunctionOps")
    val sc = new SparkContext(conf)
    val hiveContext = new HiveContext(sc)
    hiveContext.sql("DROP TABLE IF EXISTS scores")
    hiveContext.sql("CREATE TABLE IF NOT EXISTS scores(name STRING, score INT) "
      + "ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' LINES TERMINATED BY '\\n'")

    // Load the data to be processed into the Hive table.
    hiveContext.sql("LOAD DATA LOCAL INPATH 'G://datarguru spark/tool/topNGroup.txt' INTO TABLE SCORES")
    //hiveContext.sql("LOAD DATA LOCAL INPATH '/opt/spark-1.4.0-bin-hadoop2.6/dataSource' INTO TABLE SCORES")

    /**
     * Use a subquery to extract the target data, and inside it use the window function row_number
     * to rank within each group:
     * PARTITION BY: the key the window function groups by;
     * ORDER BY: the ordering within each group.
     */
    val result = hiveContext.sql("SELECT name, score FROM ("
      + "SELECT name, score, row_number() OVER (PARTITION BY name ORDER BY score DESC) rank FROM scores) sub_scores "
      + "WHERE rank <= 4")

    result.show() // print the result on the Driver's console

    // Save the result into the Hive data warehouse.
    hiveContext.sql("DROP TABLE IF EXISTS sortedResultScores")
    result.saveAsTable("sortedResultScores")
  }
}

Two errors showed up when this program was first run (the code above already contains the fixes):

ERROR metadata.Hive: NoSuchObjectException(message:default.scores table not found)
Exception in thread "main" org.apache.spark.sql.AnalysisException: missing BY at '' '' near '<EOF>'; line 1 pos 96

The first message is only the metastore noting that DROP TABLE IF EXISTS found no existing table and is harmless; the AnalysisException came from omitting the BY keyword after FIELDS TERMINATED in the CREATE TABLE statement. For details see: http://blog.csdn.net/slq1023/article/details/51138709
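For comparison, the same top-N-per-group logic can also be expressed with the DataFrame window API instead of raw SQL. A minimal sketch against the scores table loaded above, assuming a Spark 1.6-style API (in 1.4/1.5 the function is called rowNumber rather than row_number):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val scoresDF = hiveContext.table("scores")
val byNameDescScore = Window.partitionBy("name").orderBy(scoresDF("score").desc)
// rank rows within each name group and keep the top 4 per group
scoresDF.withColumn("rank", row_number().over(byNameDescScore))
  .where("rank <= 4")
  .select("name", "score")
  .show()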

3. Spark SQL UDF and UDAF: Internals and Practice

UDAF = User Defined Aggregate Function. Let's walk through the concrete usage of UDFs and UDAFs in Spark SQL with a case study:
* UDF: User Defined Function. Its input is a single data record; in implementation terms it is just an ordinary Scala function.
* UDAF: User Defined Aggregation Function. The function operates on a set of data and lets you perform custom logic on top of an aggregation.
* In essence, a UDF is wrapped by Catalyst in Spark SQL as an Expression, whose eval method is ultimately called to compute each input Row (this Row is unrelated to the Row in DataFrame).

1) Hands-on: writing a UDF and a UDAF:

package com.dt.spark

import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object SparkSQLUDFUDAF {
  def main(args: Array[String]) {
    System.setProperty("hadoop.home.dir", "G:/datarguru spark/tool/hadoop-2.6.0")
    val conf = new SparkConf()
    conf.setAppName("SparkSQLUDFUDAF")
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Simulated input data.
    val bigData = Array("Spark", "Spark", "Hadoop", "Spark", "Hadoop", "Spark", "Spark", "Hadoop", "Spark", "Hadoop")

    // Create a DataFrame from the provided data.
    val bigDataRDD = sc.parallelize(bigData)
    val bigDataRow = bigDataRDD.map(item => Row(item))
    val structType = StructType(Array(StructField("word", StringType, true)))
    val bigDataDF = sqlContext.createDataFrame(bigDataRow, structType)
    bigDataDF.registerTempTable("bigDataTable") // register as a temporary table

    // Register the UDF through the SQLContext; with Scala 2.10.x a UDF can take at most 22 input parameters.
    sqlContext.udf.register("computeLength", (input: String) => input.length)

    // Use the UDF directly in SQL, just like a built-in SQL function.
    sqlContext.sql("select word, computeLength(word) as length from bigDataTable").show()

    sqlContext.udf.register("wordCount", new MyUDAF)
    sqlContext.sql("select word, wordCount(word) as count, computeLength(word) " +
      "as length from bigDataTable group by word").show()
    while (true) {} // keep the application alive so the web UI can be inspected
  }
}

class MyUDAF extends UserDefinedAggregateFunction { // Ctrl+I to generate the overrides

  /**
   * The type of the input data.
   */
  override def inputSchema: StructType = StructType(Array(StructField("input", StringType, true)))

  /**
   * The type of the intermediate result used while aggregating.
   */
  override def bufferSchema: StructType = StructType(Array(StructField("count", IntegerType, true)))

  /**
   * The type of the UDAF's final result.
   */
  override def dataType: DataType = IntegerType

  override def deterministic: Boolean = true

  /**
   * The initial value of each group's buffer before the aggregation starts.
   */
  override def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0 }

  /**
   * How to update the per-group result when a new value arrives: a local aggregation,
   * comparable to a Combiner in the Hadoop MapReduce model
   * (this Row is unrelated to the DataFrame Row).
   */
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Int](0) + 1
  }

  /**
   * After the local (per-node) aggregation finishes, merge the partial results globally.
   */
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)
  }

  /**
   * Return the UDAF's final result.
   */
  override def evaluate(buffer: Row): Any = buffer.getAs[Int](0)
}
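The registered functions can also be invoked from the DataFrame API instead of SQL. A minimal sketch reusing bigDataDF from the code above (callUDF resolves the function by its registered name through the function registry; available in org.apache.spark.sql.functions since Spark 1.5):

import org.apache.spark.sql.functions.callUDF

// the same word-count query as the SQL above, expressed with the DataFrame API
bigDataDF.groupBy("word")
  .agg(callUDF("wordCount", bigDataDF("word")).as("count"))
  .show()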

2) The source of UDFRegistration:

/**
 * Functions for registering user-defined functions. Use [[SQLContext.udf]] to access this.
 *
 * @since 1.3.0
 */
class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging {

  private val functionRegistry = sqlContext.functionRegistry

  protected[sql] def registerPython(name: String, udf: UserDefinedPythonFunction): Unit = {
    log.debug(
      s"""
        | Registering new PythonUDF:
        | name: $name
        | command: ${udf.command.toSeq}
        | envVars: ${udf.envVars}
        | pythonIncludes: ${udf.pythonIncludes}
        | pythonExec: ${udf.pythonExec}
        | dataType: ${udf.dataType}
      """.stripMargin)

    functionRegistry.registerFunction(name, udf.builder)
  }

  /**
   * Register a user-defined aggregate function (UDAF).
   *
   * @param name the name of the UDAF.
   * @param udaf the UDAF needs to be registered.
   * @return the registered UDAF.
   */
  def register(
      name: String,
      udaf: UserDefinedAggregateFunction): UserDefinedAggregateFunction = {
    def builder(children: Seq[Expression]) = ScalaUDAF(children, udaf)
    functionRegistry.registerFunction(name, builder)
    udaf
  }

  // scalastyle:off

  /* register 0-22 were generated by this script

    (0 to 22).map { x =>
      val types = (1 to x).foldRight("RT")((i, s) => {s"A$i, $s"})
      val typeTags = (1 to x).map(i => s"A${i}: TypeTag").foldLeft("RT: TypeTag")(_ + ", " + _)
      val inputTypes = (1 to x).foldRight("Nil")((i, s) => {s"ScalaReflection.schemaFor[A$i].dataType :: $s"})
      println(s"""
        /**
         * Register a Scala closure of ${x} arguments as user-defined function (UDF).
         * @tparam RT return type of UDF.
         * @since 1.3.0
         */
        def register[$typeTags](name: String, func: Function$x[$types]): UserDefinedFunction = {
          val dataType = ScalaReflection.schemaFor[RT].dataType
          val inputTypes = Try($inputTypes).getOrElse(Nil)
          def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes)
          functionRegistry.registerFunction(name, builder)
          UserDefinedFunction(func, dataType, inputTypes)
        }""")
    }

    (1 to 22).foreach { i =>
      val extTypeArgs = (1 to i).map(_ => "_").mkString(", ")
      val anyTypeArgs = (1 to i).map(_ => "Any").mkString(", ")
      val anyCast = s".asInstanceOf[UDF$i[$anyTypeArgs, Any]]"
      val anyParams = (1 to i).map(_ => "_: Any").mkString(", ")
      println(s"""
        |/**
        | * Register a user-defined function with ${i} arguments.
        | * @since 1.3.0
        | */
        |def register(name: String, f: UDF$i[$extTypeArgs, _], returnType: DataType) = {
        |  functionRegistry.registerFunction(
        |    name,
        |    (e: Seq[Expression]) => ScalaUDF(f$anyCast.call($anyParams), returnType, e))
        |}""".stripMargin)
    }
  */

  /**
   * Register a Scala closure of 0 arguments as user-defined function (UDF).
   * @tparam RT return type of UDF.
   * @since 1.3.0
   */
  def register[RT: TypeTag](name: String, func: Function0[RT]): UserDefinedFunction = {
    val dataType = ScalaReflection.schemaFor[RT].dataType
    val inputTypes = Try(Nil).getOrElse(Nil)
    def builder(e: Seq[Expression]) = ScalaUDF(func, dataType, e, inputTypes)
    functionRegistry.registerFunction(name, builder)
    UserDefinedFunction(func, dataType, inputTypes)
  }

The source of FunctionRegistry is as follows:

object FunctionRegistry {

  type FunctionBuilder = Seq[Expression] => Expression

  val expressions: Map[String, (ExpressionInfo, FunctionBuilder)] = Map(
    // misc non-aggregate functions
    expression[Abs]("abs"),
    expression[CreateArray]("array"),
    expression[Coalesce]("coalesce"),
    expression[Explode]("explode"),
    expression[Greatest]("greatest"),
    expression[If]("if"),
    expression[IsNaN]("isnan"),
    expression[IsNull]("isnull"),
    expression[IsNotNull]("isnotnull"),
    expression[Least]("least"),
    expression[Coalesce]("nvl"),
    expression[Rand]("rand"),
    expression[Randn]("randn"),
    expression[CreateStruct]("struct"),
    expression[CreateNamedStruct]("named_struct"),
    expression[Sqrt]("sqrt"),
    expression[NaNvl]("nanvl"),

    // math functions
    expression[Acos]("acos"),
    expression[Asin]("asin"),
    expression[Atan]("atan"),
    expression[Atan2]("atan2"),
    expression[Bin]("bin"),
    expression[Cbrt]("cbrt"),
    expression[Ceil]("ceil"),
    expression[Ceil]("ceiling"),
    expression[Cos]("cos"),
    expression[Cosh]("cosh"),
    expression[Conv]("conv"),
    expression[EulerNumber]("e"),
    expression[Exp]("exp"),
    expression[Expm1]("expm1"),
    expression[Floor]("floor"),
    expression[Factorial]("factorial"),
    expression[Hypot]("hypot"),
    expression[Hex]("hex"),
    expression[Logarithm]("log"),
    expression[Log]("ln"),
    expression[Log10]("log10"),
    expression[Log1p]("log1p"),
    expression[Log2]("log2"),
    expression[UnaryMinus]("negative"),
    expression[Pi]("pi"),
    expression[Pow]("pow"),
    expression[Pow]("power"),
    expression[Pmod]("pmod"),
    expression[UnaryPositive]("positive"),
    expression[Rint]("rint"),
    expression[Round]("round"),
    expression[ShiftLeft]("shiftleft"),
    expression[ShiftRight]("shiftright"),
    expression[ShiftRightUnsigned]("shiftrightunsigned"),
    expression[Signum]("sign"),
    expression[Signum]("signum"),
    expression[Sin]("sin"),
    expression[Sinh]("sinh"),
    expression[Tan]("tan"),
    expression[Tanh]("tanh"),
    expression[ToDegrees]("degrees"),
    expression[ToRadians]("radians"),

    // aggregate functions
    expression[HyperLogLogPlusPlus]("approx_count_distinct"),
    expression[Average]("avg"),
    expression[Corr]("corr"),
    expression[Count]("count"),
    expression[First]("first"),
    expression[First]("first_value"),
    expression[Last]("last"),
    expression[Last]("last_value"),
    expression[Max]("max"),
    expression[Average]("mean"),
    expression[Min]("min"),
    expression[StddevSamp]("stddev"),
    expression[StddevPop]("stddev_pop"),
    expression[StddevSamp]("stddev_samp"),
    expression[Sum]("sum"),
    expression[VarianceSamp]("variance"),
    expression[VariancePop]("var_pop"),
    expression[VarianceSamp]("var_samp"),
    expression[Skewness]("skewness"),
    expression[Kurtosis]("kurtosis"),

    // string functions
    expression[Ascii]("ascii"),
    expression[Base64]("base64"),
    expression[Concat]("concat"),
    expression[ConcatWs]("concat_ws"),
    expression[Encode]("encode"),
    expression[Decode]("decode"),
    expression[FindInSet]("find_in_set"),
    expression[FormatNumber]("format_number"),
    expression[GetJsonObject]("get_json_object"),
    expression[InitCap]("initcap"),
    expression[JsonTuple]("json_tuple"),
    expression[Lower]("lcase"),
    expression[Lower]("lower"),
    expression[Length]("length"),
    expression[Levenshtein]("levenshtein"),
    expression[RegExpExtract]("regexp_extract"),
    expression[RegExpReplace]("regexp_replace"),
    expression[StringInstr]("instr"),
    expression[StringLocate]("locate"),
    expression[StringLPad]("lpad"),
    expression[StringTrimLeft]("ltrim"),
    expression[FormatString]("format_string"),
    expression[FormatString]("printf"),
    expression[StringRPad]("rpad"),
    expression[StringRepeat]("repeat"),
    expression[StringReverse]("reverse"),
    expression[StringTrimRight]("rtrim"),
    expression[SoundEx]("soundex"),
    expression[StringSpace]("space"),
    expression[StringSplit]("split"),
    expression[Substring]("substr"),
    expression[Substring]("substring"),
    expression[SubstringIndex]("substring_index"),
    expression[StringTranslate]("translate"),
    expression[StringTrim]("trim"),
    expression[UnBase64]("unbase64"),
    expression[Upper]("ucase"),
    expression[Unhex]("unhex"),
    expression[Upper]("upper"),
    ...

As you can see, Spark SQL's built-in functions are registered in exactly the same way as UDFs.

4. Spark SQL Thrift Server in Practice

The Thrift JDBC/ODBC server implemented here corresponds to HiveServer2 in Hive 1.2.1. You can test the JDBC server with the beeline script that comes with either Spark or Hive 1.2.1. Start the JDBC/ODBC server:

ps -aux | grep hive
hive --service metastore &    # start the Hive metastore first
[1] 28268
./sbin/start-thriftserver.sh
# Now you can use beeline to test the Thrift JDBC/ODBC server:
./bin/beeline
# Connect to the JDBC/ODBC server in beeline with:
beeline> !connect jdbc:hive2://master:10000
# username: root, password: empty
# after connecting you can run Hive SQL commands as usual

Accessing the Thrift Server from Java via JDBC:

package com.dt.sparksql;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * Demonstrates accessing the Thrift Server from Java via JDBC, which goes through Spark SQL and
 * in turn through Hive. This is the most common setup in enterprise development.
 * @author dt_sparl
 */
public class SparkSQLJDBC2ThriftServer {

    public static void main(String[] args) throws SQLException {
        String sqlTest = "select name from people where age = ?";
        Connection conn = null;
        ResultSet resultSet = null;
        try {
            Class.forName("org.apache.hive.jdbc.HiveDriver");
            conn = DriverManager.getConnection("jdbc:hive2://<master>:<10001>/<default>?"
                    + "hive.server2.transport.mode=http;hive.server2.thrift.http.path=<cliserver>",
                    "root", "");

            PreparedStatement preparedStatement = conn.prepareStatement(sqlTest);
            preparedStatement.setInt(1, 30);
            resultSet = preparedStatement.executeQuery();
            while (resultSet.next()) {
                System.out.println(resultSet.getString(1)); // the data here is expected to be stored as Parquet
            }
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } finally {
            resultSet.close();
            conn.close();
        }
    }
}
