1) SQL: FoxPro, dBASE III, SQL Server (Microsoft's product: 2000, 2005, 2008, 2012), Sybase (the ancestor of SQL Server: around version 6 Microsoft licensed its code, much as it licensed DOS on the operating-system side; PowerDesigner is a Sybase product, and Sybase was built for large data volumes from the start), Oracle 6, 7, 8i (a notably mature release), 9i, 10g, 11g. This era trained a large pool of SQL programmers.
2) NoSQL: HBase, Redis
3) Hadoop MR; Hive (HQL, a SQL-like language); Shark (SQL-like, one more layer wrapped on top of Hive, a product of the Berkeley AMPLab). Around June 2014 Shark was discontinued, and in parallel a separate project was started: Spark SQL.
Current trend: Spark SQL will replace Hive and will replace MR.
The mainstream today is still Hadoop, with a small share of workloads migrating to Spark. Hive is still in use; those chasing performance move to Spark SQL. Spark 2.0 focused its optimization effort on Spark SQL: version 1.5.2 could not fully support the SQL-92/95 standards, while 2.0 supports them completely.
Spark SQL is fully compatible with Hive. When the Spark shell starts, it creates hiveContext, sc, and sqlContext.
It offers two styles of use:
1) The object-oriented API (not recommended): the code takes on an object-oriented structure, and once a query gets complex the code becomes messy, hard to read, and hard to maintain.
2) Plain SQL, much like the SQL of a relational database (recommended).
Spark SQL operates on DataFrames (DF objects); a DF is a further wrapper around an RDD.
Spark Streaming handles streaming-data analysis and operates on DStream objects, again a wrapper around RDDs.
Spark MLlib handles machine learning and operates on LabeledPoint objects (matrices, vectors, gradient descent, algorithms).
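As a minimal illustration (not part of the original notes; the values are made up), a LabeledPoint simply pairs a numeric label with a feature vector:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
// label 1.0 together with a dense 3-dimensional feature vector
val lp = LabeledPoint(1.0, Vectors.dense(0.5, 1.2, 3.3))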
After starting HDFS, launch spark-shell directly:
bin/spark-shell --master=local
bin/spark-shell --master=yarn-client

scala> val list = List(1,2,3,4,5)        # a plain Scala object
list: List[Int] = List(1, 2, 3, 4, 5)

scala> val rdd = sc.makeRDD(list)        # an RDD
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:23

scala> val df = rdd.toDF("id")           # a Spark SQL DataFrame
df: org.apache.spark.sql.DataFrame = [id: int]

scala> rdd.toDF("id")
res0: org.apache.spark.sql.DataFrame = [id: int]

scala> res0.show          # shows at most 20 rows by default
+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
+---+

scala> res0.printSchema   # inspect column types and other attributes
root
 |-- id: integer (nullable = true)

A DataFrame is essentially a database table.
scala> sc.parallelize(List( (1,"beijing"),(2,"shanghai") ) )
res3: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[5] at parallelize at <console>:22

scala> res3.toDF("id","name")
res4: org.apache.spark.sql.DataFrame = [id: int, name: string]

scala> res4.show
+---+--------+
| id|    name|
+---+--------+
|  1| beijing|
|  2|shanghai|
+---+--------+

scala> sc.parallelize(List( (1,"beijing",100780),(2,"shanghai",560090),(3,"xi'an",600329)))
res6: org.apache.spark.rdd.RDD[(Int, String, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:22

scala> res6.toDF("id","name","postcode")
res7: org.apache.spark.sql.DataFrame = [id: int, name: string, postcode: int]

scala> res7.show
+---+--------+--------+
| id|    name|postcode|
+---+--------+--------+
|  1| beijing|  100780|
|  2|shanghai|  560090|
|  3|   xi'an|  600329|
+---+--------+--------+

Data sources for building a DataFrame:
1) txt: not directly supported.
Note: txt data must first be turned into an RDD, then into tuples (see the sketch after this list).
2) json: directly supported.
3) Parquet: a binary, compressed format (safe over the network, small files, fast to transfer; adopted by the Hadoop stack underneath). Loosely, an upgraded, binary take on JSON-style data.
4) MySQL or Oracle tables imported to form a DF.
5) HDFS.
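A minimal sketch of the txt route from 1) above, run inside spark-shell; the path /root/cities.txt and the comma-separated "id,name" line format are assumptions for illustration:

// read plain text, split each line into fields, build tuples, then convert to a DataFrame
val lines = sc.textFile("/root/cities.txt")      // RDD[String]
val tuples = lines.map { l =>
  val p = l.split(",")
  (p(0).toInt, p(1))                             // RDD[(Int, String)]
}
val df = tuples.toDF("id", "name")
df.show()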
MySQL: upload the JDBC driver jar first.
Start spark-shell:
bin/spark-shell --master=local --driver-class-path=mysql-connector-java-5.1.28-bin.jar
scala> val prop = new java.util.Properties
prop: java.util.Properties = {}

scala> prop.put("user","root")
res0: Object = null

scala> prop.put("password","root")
res1: Object = null

scala> val df=sqlContext.read.jdbc("jdbc:mysql://192.168.239.1:3306/jtdb","tb_user",prop)
df: org.apache.spark.sql.DataFrame = [id: bigint, username: string, password: string, phone: string, email: string, created: timestamp, updated: timestamp]

scala> df.printSchema
root
 |-- id: long (nullable = false)
 |-- username: string (nullable = true)
 |-- password: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- email: string (nullable = true)
 |-- created: timestamp (nullable = true)
 |-- updated: timestamp (nullable = true)

scala> df.show
+---+-------------+--------------------+-----------+------------+--------------------+--------------------+
| id|     username|            password|      phone|       email|             created|             updated|
+---+-------------+--------------------+-----------+------------+--------------------+--------------------+
|  1|         tony|e10adc3949ba59abb...|12987828382|tony@263.com|                null|                null|
|  4|ll18831615918|39ee112f1db88af09...|18831615918|            |2018-03-11 12:59:...|2018-03-11 12:59:...|
+---+-------------+--------------------+-----------+------------+--------------------+--------------------+
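Writing goes the other way symmetrically. A hedged sketch (the target table name tb_user_copy is made up): the DataFrame just read can be stored back over the same JDBC connection:

// write the tb_user DataFrame back to MySQL as a new table
df.write.jdbc("jdbc:mysql://192.168.239.1:3306/jtdb", "tb_user_copy", prop)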
Looking at a larger table:

scala> val df=sqlContext.read.jdbc("jdbc:mysql://192.168.239.1:3306/jtdb","tb_item",prop)
df: org.apache.spark.sql.DataFrame = [id: bigint, title: string, sell_point: string, price: bigint, num: int, barcode: string, image: string, cid: bigint, status: int, created: timestamp, updated: timestamp]

scala> df.show
+------+--------------------+--------------------+-------+-----+-------+--------------------+---+------+--------------------+--------------------+
|    id|               title|          sell_point|  price|  num|barcode|               image|cid|status|             created|             updated|
+------+--------------------+--------------------+-------+-----+-------+--------------------+---+------+--------------------+--------------------+
|536563|阿尔卡特 (OT-927) 炭黑 ...|     清仓!仅北京,武汉仓有货!| 299000|99999|   null|http://image.taot...|560|     1|2015-03-08 21:33:...|2015-03-08 21:33:...|
|562379|三星 W999 黑色 电信3G手机...|下单送12000毫安移动电源!双3...|4299000|99999|   null|http://image.taot...|560|     1|2015-03-08 21:27:...|2015-03-08 21:27:...|
|605616|阿尔卡特 (OT-979) 冰川白...|         清仓!仅上海仓有货!| 309000|99999|   null|http://image.taot...|560|     1|2015-03-08 21:33:...|2015-03-08 21:33:...|
|635906|阿尔卡特 (OT-927) 单电版...|     清仓!仅北京,武汉仓有货!| 249000|99999|   null|http://image.taot...|560|     1|2015-03-08 21:33:...|2015-03-08 21:33:...|
|679532|阿尔卡特 (OT-986+) 玫红...| 仅上海,广州,沈阳仓有货!预购从速!| 499000|99999|   null|http://image.taot...|560|     1|2015-03-08 21:32:...|2015-03-08 21:32:...|
|679533|阿尔卡特 (OT-986+) 曜石...|少量库存,抢完即止!<a tar...| 499000|99999|   null|http://image.taot...|560|     1|2015-03-08 21:31:...|2015-03-08 21:31:...|
|691300|三星 B9120 钛灰色 联通3G...|下单即送10400毫安移动电源!再...|4399000|99999|   null|http://image.taot...|560|     1|2015-03-08 21:29:...|2015-03-08 21:29:...|
|738388|三星 Note II (N7100...|     经典回顾!超值价格值得拥有。|1699000|99999|   null|http://image.taot...|560|     1|2015-03-08 21:28:...|2015-03-08 21:28:...|
|741524|三星 Note II (N7100...|      下单赠12000毫安移动电源|1699000|99999|   null|http://image.taot...|560|     1|2015-03-08 21:28:...|2015-03-08 21:28:...|
|816448|三星 Note II (N7100...|         经典回顾!超值特惠!|1699000|99999|   null|http://image.taot...|560|     1|2015-03-08 21:28:...|2015-03-08 21:28:...|
|816753|夏普(SHARP)LCD-46DS...|要好屏,选夏普!日本原装面板,智能...|3799000|99999|   null|http://image.taot...| 76|     1|2015-03-08 21:27:...|2015-03-08 21:27:...|
|830972|飞利浦 老人手机 (X2560) ...|赠:九安血压计+8G内存!超长待机...| 489000|99999|   null|http://image.taot...|560|     1|2015-03-08 21:28:...|2015-03-08 21:28:...|
|832739| 中兴 U288 珠光白 移动3G手机|好评过万!超大彩屏,超大字体,超大...| 199000|99999|   null|http://image.taot...|560|     1|2015-03-08 21:28:...|2015-03-08 21:28:...|
|844022|三星 SCH-W899 亮金色 电...|双3.3英寸魔焕炫屏,CG双网双待...|2999000|99999|   null|http://image.taot...|560|     1|2015-03-08 21:28:...|2015-03-08 21:28:...|
|847276|飞利浦 老人手机 (X2560) ...|超长待机,关爱无限,更好用!飞利浦...| 489000|99999|   null|http://image.taot...|560|     1|2015-03-08 21:28:...|2015-03-08 21:28:...|
|847278|飞利浦 老人手机 (X2560) ...|超长待机,关爱无限,更好用!飞利浦...| 469000|99999|   null|http://image.taot...|560|     1|2015-03-08 21:28:...|2015-03-08 21:28:...|
|855739|三星 Galaxy S4 (I95...|三星经典旗舰机!5英寸1080P高...|1888000|99999|   null|http://image.taot...|560|     1|2015-03-08 21:27:...|2015-03-08 21:27:...|
|856645|三星 Galaxy S4 (I95...|    年货特价来袭!三星经典旗舰机!|1888000|99999|   null|http://image.taot...|560|     1|2015-03-08 21:27:...|2015-03-08 21:27:...|
|858025|三星 I8552 白色 联通3G手...|   经济实惠机器~~开春入手好时机~| 799000|99999|   null|http://image.taot...|560|     1|2015-03-08 21:27:...|2015-03-08 21:27:...|
|860275|长虹(CHANGHONG) 3D5...|智能安卓系统 可自由安装应用程序 ...|2699000|99999|   null|http://image.taot...| 76|     1|2015-03-08 21:27:...|2015-03-08 21:27:...|
+------+--------------------+--------------------+-------+-----+-------+--------------------+---+------+--------------------+--------------------+
only showing top 20 rows

Oracle:
Log in with PL/SQL Developer first to confirm the database is up.
Start spark-shell:
bin/spark-shell --master=local --driver-class-path=ojdbc14.jar
scala> val prop = new java.util.Properties
prop: java.util.Properties = {}

scala> prop.put("driver","oracle.jdbc.driver.OracleDriver")
res0: Object = null

scala> prop.put("user","ht")
res1: Object = null

scala> prop.put("password","ht")
res2: Object = null

scala> val df = sqlContext.read.jdbc("jdbc:oracle:thin:@192.168.239.1:1521:XE","dept_p",prop)
df: org.apache.spark.sql.DataFrame = [DEPT_ID: string, PARENT_ID: string, DEPT_NAME: string, STATE: decimal(38,0), CREATE_BY: string, CREATE_DEPT: string, CREATE_TIME: timestamp, UPDATE_BY: string, UPDATE_TIME: timestamp]

scala> df.show
+-------+---------+---------+-----+---------+-----------+--------------------+---------+--------------------+
|DEPT_ID|PARENT_ID|DEPT_NAME|STATE|CREATE_BY|CREATE_DEPT|         CREATE_TIME|UPDATE_BY|         UPDATE_TIME|
+-------+---------+---------+-----+---------+-----------+--------------------+---------+--------------------+
| 100100|      100|      人事部|    1|     null|       null|2018-01-11 01:36:...|     null|2018-01-11 02:01:...|
| 100300|      100|      研发部|    1|     null|       null|2018-01-21 10:30:...|     null|2018-01-21 10:30:...|
| 100400|      100|      销售部|    1|     null|       null|2018-01-21 10:30:...|     null|2018-01-21 10:30:...|
|    100|     null|     集团总部|    1|     null|       null|                null|     null|                null|
| 100200|      100|      财务部|    1|     null|       null|2018-01-21 10:07:...|     null|2018-01-21 10:07:...|
+-------+---------+---------+-----+---------+-----------+--------------------+---------+--------------------+

1. select
scala> val df = sc.makeRDD(List((1,"bj"),(2,"sh"),(3,"sz"),(4,"gz"))).toDF("id","name")
df: org.apache.spark.sql.DataFrame = [id: int, name: string]

scala> df.show
...(INFO scheduler/storage log lines trimmed)...
+---+----+
| id|name|
+---+----+
|  1|  bj|
|  2|  sh|
|  3|  sz|
|  4|  gz|
+---+----+

scala> df.select("id")
res1: org.apache.spark.sql.DataFrame = [id: int]

scala> res1.show
...(INFO log lines trimmed)...
+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
+---+

With a where condition:
scala> val prop = new java.util.Properties
prop: java.util.Properties = {}

scala> prop.put("user","root")
res0: Object = null

scala> prop.put("password","root")
res1: Object = null

scala> val df=sqlContext.read.jdbc("jdbc:mysql://192.168.239.1:3306/jtdb","tb_item",prop)
df: org.apache.spark.sql.DataFrame = [id: bigint, title: string, sell_point: string, price: bigint, num: int, barcode: string, image: string, cid: bigint, status: int, created: timestamp, updated: timestamp]

scala> df.show
...(INFO log lines trimmed; the output is the same 20 rows of tb_item listed earlier)...

scala> df.printSchema
root
 |-- id: long (nullable = false)
 |-- title: string (nullable = false)
 |-- sell_point: string (nullable = true)
 |-- price: long (nullable = false)
 |-- num: integer (nullable = false)
 |-- barcode: string (nullable = true)
 |-- image: string (nullable = true)
 |-- cid: long (nullable = false)
 |-- status: integer (nullable = false)
 |-- created: timestamp (nullable = false)
 |-- updated: timestamp (nullable = false)

scala> df.select("id","title","price")
res6: org.apache.spark.sql.DataFrame = [id: bigint, title: string, price: bigint]

scala> res6.show
...(INFO log lines trimmed)...
+------+--------------------+-------+
|    id|               title|  price|
+------+--------------------+-------+
|536563|阿尔卡特 (OT-927) 炭黑 ...| 299000|
|562379|三星 W999 黑色 电信3G手机...|4299000|
|605616|阿尔卡特 (OT-979) 冰川白...| 309000|
|635906|阿尔卡特 (OT-927) 单电版...| 249000|
|679532|阿尔卡特 (OT-986+) 玫红...| 499000|
|679533|阿尔卡特 (OT-986+) 曜石...| 499000|
|691300|三星 B9120 钛灰色 联通3G...|4399000|
|738388|三星 Note II (N7100...|1699000|
|741524|三星 Note II (N7100...|1699000|
|816448|三星 Note II (N7100...|1699000|
|816753|夏普(SHARP)LCD-46DS...|3799000|
|830972|飞利浦 老人手机 (X2560) ...| 489000|
|832739| 中兴 U288 珠光白 移动3G手机| 199000|
|844022|三星 SCH-W899 亮金色 电...|2999000|
|847276|飞利浦 老人手机 (X2560) ...| 489000|
|847278|飞利浦 老人手机 (X2560) ...| 469000|
|855739|三星 Galaxy S4 (I95...|1888000|
|856645|三星 Galaxy S4 (I95...|1888000|
|858025|三星 I8552 白色 联通3G手...| 799000|
|860275|长虹(CHANGHONG) 3D5...|2699000|
+------+--------------------+-------+
only showing top 20 rows

scala> df.select("id","title","price").where($"price"===469000).show
...(INFO log lines trimmed)...
+----------+--------------------+------+
|        id|               title| price|
+----------+--------------------+------+
|    847278|飞利浦 老人手机 (X2560) ...|469000|
|1439566018|爱诺德(AINUODE) TD13...|469000|
|1439566019|爱诺德(AINUODE) TD13...|469000|
+----------+--------------------+------+

scala> df.select("id","title","price").where(($"price"===469000) && ($"title" like "%飞利浦%")).show
...(INFO log lines trimmed)...
+------+--------------------+------+
|    id|               title| price|
+------+--------------------+------+
|847278|飞利浦 老人手机 (X2560) ...|469000|
+------+--------------------+------+

2. Sorting (sort and orderBy behave the same)
orderBy($"colName")  sorts in ascending order
orderBy($"colName".desc)  sorts in descending order
orderBy($"col1", $"col2".desc)  sorts by two columns
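For example, a two-column sort on the tb_item DataFrame could look like this (an illustrative variation, not from the original session):

scala> df.select("id","title","price").orderBy($"price".desc, $"id").show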
scala> df.select("id","title","price").orderBy($"title").show
...(INFO log lines trimmed)...
+----------+--------------------+-------+
|        id|               title|  price|
+----------+--------------------+-------+
|1022456996|21克 MC001S 白色 移动联...| 239000|
|1022456037|21克 MC001S 黑色 移动联...| 239000|
|1360740115|21克 MC002A 移动3G智能...| 799000|
|1356326372|21克 MC002A 移动3G智能...| 799000|
|1449174880|21克 MC002C 移动3G智能...| 399000|
|1449174881|21克 MC002C 移动3G智能...| 399000|
|1474391935|     990本色(精装图文版)真我非我|   4680|
|1351223972|E.XUN C688 移动/联通2...| 206000|
|1351223973|E.XUN C688 移动/联通2...| 206000|
|1356971398|E.XUN CM288 联通/移动...| 116000|
|1355143242|E.XUN E600 移动/联通2...| 129000|
|1355143244|E.XUN E600 移动/联通2...| 129000|
|1355143243|E.XUN E600 移动/联通2...| 129000|
|1353319901|E.XUN X10(骑士十五世) ...|2980000|
|1350380919|E.XUN X6+ 电信3G防水三...| 667000|
|1350239122|E.XUN X8 电信3G三防天翼...| 848000|
|1361746010|E.XUN kufone F1 移...| 499000|
|1361746009|E.XUN kufone F1 移...| 518000|
|1117472045|EMOTO E868 智能手机移动...| 599000|
|1117472044|EMOTO E868 智能手机移动...| 599000|
+----------+--------------------+-------+
only showing top 20 rows

scala> df.select("id","title","price").orderBy($"title".desc).show
...(INFO log lines trimmed)...
+----------+--------------------+-------+
|        id|               title|  price|
+----------+--------------------+-------+
|1472741569|魅族 魅蓝 8GB手机 官方标配 ...| 899000|
|1472061497|魅族 魅蓝 8GB 移动4G手机 ...| 969000|
|1459236505|魅族 MX4 银白 移动4G版(3...|2199000|
|1459236504|魅族 MX4 银白 移动4G版(1...|1969000|
|1453600783|魅族 MX4 灰色 联通4G版(3...|2129000|
|1459236502|魅族 MX4 灰色 联通4G版(3...|1998000|
|1453600782|魅族 MX4 灰色 联通4G版(1...|1919000|
|1459236499|魅族 MX4 灰色 联通4G版(1...|1868000|
|1459236500|魅族 MX4 灰色 移动4G版(1...|1868000|
|1453600780|魅族 MX4 灰色 移动4G版(1...|1949000|
|1457413534|魅族 MX4 前黑后白 联通4G版...|2159000|
|1457413531|魅族 MX4 前黑后白 联通4G版...|1949000|
|1459236511|魅族 MX4 前黑后白 联通4G版...|1869000|
|1459236513|魅族 MX4 前黑后白 移动4G版...|2099000|
|1457413532|魅族 MX4 前黑后白 移动4G版...|1959000|
|   1279453|魅族 MX4 Pro 16GB 灰...|2199000|
|   1279804|魅族 MX4 Pro 16GB 灰...|2199000|
|1361179509|首信(CAPITEL) S888 ...| 228000|
|1361179508|首信(CAPITEL) S888 ...| 228000|
|1166661935|首信CAPITEL S900 GS...| 298000|
+----------+--------------------+-------+
only showing top 20 rows

3. groupBy
groupBy("colName", ...).max(col)  returns the maximum
groupBy("colName", ...).min(col)  returns the minimum
groupBy("colName", ...).avg(col)  returns the average
groupBy("colName", ...).sum(col)  returns the sum
groupBy("colName", ...).count()  returns the row count
groupBy("colName", ...).agg(...)  combines several aggregate methods at once (see the sketch after the example below):
scala> sc.parallelize(List((1,"tony","bj"),(2,"tina","xa"),(3,"hellen","xa")))
res23: org.apache.spark.rdd.RDD[(Int, String, String)] = ParallelCollectionRDD[30] at parallelize at <console>:22

scala> res23.toDF("id","name","address")
res24: org.apache.spark.sql.DataFrame = [id: int, name: string, address: string]

scala> res24.groupBy("address").count()
res27: org.apache.spark.sql.DataFrame = [address: string, count: bigint]

scala> res27.show
+-------+-----+
|address|count|
+-------+-----+
|     bj|    1|
|     xa|    2|
+-------+-----+
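A hedged sketch of agg on the same DataFrame (the aliases cnt and maxId are made up), computing several aggregates in one pass:

import org.apache.spark.sql.functions._
res24.groupBy("address").agg(count("id").as("cnt"), max("id").as("maxId")).show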
4. join

scala> val dept=sc.parallelize(List((100,"财务部"),(200,"研发部"))).toDF("deptid","deptname")
dept: org.apache.spark.sql.DataFrame = [deptid: int, deptname: string]

scala> val emp=sc.parallelize(List((1,100,"张财务"),(2,100,"李会计"),(3,200,"王艳发"))).toDF("id","did","name")
emp: org.apache.spark.sql.DataFrame = [id: int, did: int, name: string]

scala> dept.join(emp, $"deptid" === $"did", "left").show
+------+--------+---+---+----+
|deptid|deptname| id|did|name|
+------+--------+---+---+----+
|   100|     财务部|  1|100| 张财务|
|   100|     财务部|  2|100| 李会计|
|   200|     研发部|  3|200| 王艳发|
+------+--------+---+---+----+

scala> dept.join(emp, $"deptid" === $"did", "inner").show
+------+--------+---+---+----+
|deptid|deptname| id|did|name|
+------+--------+---+---+----+
|   100|     财务部|  1|100| 张财务|
|   100|     财务部|  2|100| 李会计|
|   200|     研发部|  3|200| 王艳发|
+------+--------+---+---+----+

5. Arithmetic on columns
scala> dept.select($"deptid"+1).show
+------------+
|(deptid + 1)|
+------------+
|         101|
|         201|
+------------+

6. Getting an element out of an array column
scala> val df = sc.parallelize(List(("tony",Array(1,2,3)))).toDF("name","fn")
df: org.apache.spark.sql.DataFrame = [name: string, fn: array<int>]

scala> df.selectExpr("fn[0]").show
+-----+
|fn[0]|
+-----+
|    1|
+-----+

7. Nested objects
The user.json file:
{"name":"陈晨","address":{"city":"西安","street":"南二环甲字1号"}}
{"name":"娜娜","address":{"city":"西安","street":"南二环甲字2号"}}

scala> val df = sqlContext.read.json("/root/user.json")
df: org.apache.spark.sql.DataFrame = [address: struct<city:string,street:string>, name: string]

scala> df.printSchema
root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- street: string (nullable = true)
 |-- name: string (nullable = true)

scala> df.select("address.city").show
+----+
|city|
+----+
|  西安|
|  西安|
+----+

scala> df.select("address.city","address.street","name").show
+----+-------+----+
|city| street|name|
+----+-------+----+
|  西安|南二环甲字1号|  陈晨|
|  西安|南二环甲字2号|  娜娜|
+----+-------+----+

8. count
df.count
9. Fetching a single row
scala> val row = df.first()   # fetch the first row
row: org.apache.spark.sql.Row = [[西安,南二环甲字1号],陈晨]

scala> row.getString(1)       # fetch the value at index 1 (indices start at 0, so this is the second column)
res65: String = 陈晨

10. collect
Collects the data back to the driver.
Generally used only while testing; by habit, use show instead.
collect turns the distributed data into local Scala objects on the driver (an Array of Row).
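A hedged sketch of what collect returns for the user.json DataFrame above (the exact Row rendering can differ by Spark version):

scala> df.collect
res66: Array[org.apache.spark.sql.Row] = Array([[西安,南二环甲字1号],陈晨], [[西安,南二环甲字2号],娜娜])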
1. Query all rows

bin/spark-shell --master=local --driver-class-path=mysql-connector-java-5.1.28-bin.jar

scala> val df = sc.parallelize(List((1,"tony"),(2,"hellen"))).toDF("id","name")
df: org.apache.spark.sql.DataFrame = [id: int, name: string]

scala> df.select("id","name").show
+---+------+
| id|  name|
+---+------+
|  1|  tony|
|  2|hellen|
+---+------+

scala> df.registerTempTable("people")   // register the DF as a temporary table
scala> sqlContext.sql("select * from people").show
+---+------+
| id|  name|
+---+------+
|  1|  tony|
|  2|hellen|
+---+------+

2. Grouping: find each student's top score
scala> sc.makeRDD(List( (1,88),(2,77),(2,87),(1,99),(2,60),(1,97))).toDF("id","score")
res19: org.apache.spark.sql.DataFrame = [id: int, score: int]

scala> res19.registerTempTable("student")

scala> sqlContext.sql("select * from student").show
+---+-----+
| id|score|
+---+-----+
|  1|   88|
|  2|   77|
|  2|   87|
|  1|   99|
|  2|   60|
|  1|   97|
+---+-----+

scala> sqlContext.sql("select id,max(score) from student group by id").show
+---+---+
| id|_c1|
+---+---+
|  1| 99|
|  2| 87|
+---+---+
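The unnamed aggregate comes back as the auto-generated column _c1; an alias gives it a readable name (an illustrative variation, not from the original session):

scala> sqlContext.sql("select id, max(score) as maxScore from student group by id").show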
3. Sort and take the top three

scala> sqlContext.sql("select * from student order by score desc limit 3").show
+---+-----+
| id|score|
+---+-----+
|  1|   99|
|  1|   97|
|  1|   88|
+---+-----+

4. join
scala> val dept=sc.parallelize(List((100,"财务部"),(200,"研发部"))).toDF("deptid","deptname")
dept: org.apache.spark.sql.DataFrame = [deptid: int, deptname: string]

scala> val emp=sc.parallelize(List((1,100,"张财务"),(2,100,"李会计"),(3,200,"王艳发"))).toDF("id","did","name")
emp: org.apache.spark.sql.DataFrame = [id: int, did: int, name: string]

scala> dept.registerTempTable("dept")
scala> emp.registerTempTable("emp")

scala> sqlContext.sql("select * from emp e inner join dept d on e.did=d.deptid").show
+---+---+----+------+--------+
| id|did|name|deptid|deptname|
+---+---+----+------+--------+
|  1|100| 张财务|   100|     财务部|
|  2|100| 李会计|   100|     财务部|
|  3|200| 王艳发|   200|     研发部|
+---+---+----+------+--------+

scala> sqlContext.sql("select * from emp e left join dept d on e.did=d.deptid").show
+---+---+----+------+--------+
| id|did|name|deptid|deptname|
+---+---+----+------+--------+
|  1|100| 张财务|   100|     财务部|
|  2|100| 李会计|   100|     财务部|
|  3|200| 王艳发|   200|     研发部|
+---+---+----+------+--------+

5. Listing tables
scala> val teacher = sc.parallelize(List((1,"陈"),(2,"长"),(3,"赵"))).toDF("id","name")
teacher: org.apache.spark.sql.DataFrame = [id: int, name: string]

scala> teacher.saveAsTable("teacher")
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
[Stage 31:> (0 + 5) / 5]SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
16/05/28 21:59:59 WARN HiveMetaStore: Location: file:/user/hive/warehouse/teacher specified for non-external table:teacher

scala> sqlContext.sql("show tables").show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
|     dept|       true|
|      emp|       true|
|   people|       true|
|  student|       true|
|  teacher|      false|
+---------+-----------+

isTemporary marks whether a table is temporary. A temporary table lives only as long as the application: once spark-shell exits, these tables are gone.
6. Viewing a table's structure
scala> sqlContext.sql("desc teacher").show
+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|      id|      int|       |
|    name|   string|       |
+--------+---------+-------+

7. Hive-style table creation, loading, and querying
val sc = new SparkContext(new SparkConf().setAppName("HiveAPP"))
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
val df5 = sqlContext.sql("select key,value from src")

Whether the sql method queries a temporary table or a real one, its result is a DataFrame.
8.wordcount
(1)
scala> sc.textFile("/root/words.txt").flatMap{ _.split("\\s+") }.toDF("word").registerTempTable("words")

scala> sqlContext.sql("select * from words").show
+------+
|  word|
+------+
|   big|
|  data|
|   big|
| spark|
|  scla|
| spark|
|hadoop|
|hadoop|
|  hive|
| spark|
+------+

scala> sqlContext.sql("select word,count(*) c from words group by word").show
+------+---+
|  word|  c|
+------+---+
|  hive|  1|
|  scla|  1|
| spark|  3|
|   big|  2|
|  data|  1|
|hadoop|  2|
+------+---+

(2) The concise version:
What took four or five hard-to-follow lines in plain Scala takes just two lines here:
sc.textFile("/root/words.txt").flatMap{_.split("\\s+")}.toDF("word").registerTempTable("words")
sqlContext.sql("select word,count(*) c from words group by word").show

Note: this does not return everything; show returns only the first 20 rows by default.
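For comparison, a hedged sketch of the same count written with the DataFrame API instead of SQL (no temporary table needed):

sc.textFile("/root/words.txt").flatMap{_.split("\\s+")}.toDF("word").groupBy("word").count().orderBy($"count".desc).show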
Strip out common stop words, group, sort, and take the top ten:
sqlContext.sql("select word,count(*) countnum from words where word not in('the','of','to','and','a') group by word order by countnum desc limit 10").show

(3) Hot words, top three
scala> sqlContext.sql("select word,count(*) c from words group by word order by c desc limit 3").show
+------+---+
|  word|  c|
+------+---+
| spark|  3|
|   big|  2|
|hadoop|  2|
+------+---+

(4) The most frequent words in the novel The Black Tulip
scala> sc.textFile("/root/Tulip.txt").flatMap{ _.split("\\s") }.toDF("word").registerTempTable("words")

scala> sqlContext.sql("select word,count(*) c from words group by word order by c desc limit 10").show
+----+----+
|word|   c|
+----+----+
| the|3813|
|  of|2018|
|  to|1722|
| and|1231|
|   a| 983|
| his| 934|
|  in| 799|
|  he| 629|
| was| 586|
|with| 529|
+----+----+

The same count implemented in Java:
package com.liming.utils;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class WordCount {
    private static String path_src = "c:/output.txt";
    private static String path_result = "c:/result.txt";
    private static BufferedReader br = null;
    private static BufferedWriter bw = null;
    private static String line_current = null;
    private static String[] words = null;
    private static List<String> word_list = new ArrayList<String>();

    public static void main(String[] args) {
        Long startTime = System.currentTimeMillis();
        File file = new File(path_src);
        if (!file.exists()) {
            System.out.println("file " + file + " is not existed, exit");
            return;
        }
        try {
            // read the file line by line, split on spaces, commas, and periods
            br = new BufferedReader(new FileReader(file.getPath()));
            line_current = br.readLine();
            while (line_current != null) {
                words = line_current.split(" |,|\\.");
                for (String s : words) {
                    if (!s.equals(""))
                        word_list.add(s);
                }
                line_current = br.readLine();
            }
            for (String temp : word_list) {
                System.out.println(temp);
            }
            // HashSet: count each distinct word via Collections.frequency
            Set<String> hashSet = new HashSet<String>(word_list);
            for (String str : hashSet) {
                System.out.println("word: " + str + ", occur times: " + Collections.frequency(word_list, str));
            }
            // HashMap: count by accumulating per word
            Map<String, Integer> hashMap = new HashMap<String, Integer>();
            for (String temp : word_list) {
                Integer count = hashMap.get(temp);
                hashMap.put(temp, (count == null) ? 1 : count + 1);
            }
            // TreeMap: sort the counts by key
            TreeMap<String, Integer> treeMap = new TreeMap<String, Integer>(hashMap);
            // Record result to another file
            printMap(treeMap);
            System.out.println("elapsed: " + (System.currentTimeMillis() - startTime) / 1000 + " s");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeInputStream(br);
            closeOutputStream(bw);
        }
    }

    public static void printMap(Map<String, Integer> map) throws IOException {
        bw = new BufferedWriter(new FileWriter(path_result));
        Set<String> keys = map.keySet();
        for (String s : keys) {
            System.out.println("word: " + s + ", times: " + map.get(s));
            writeResult("word: " + s + ", times: " + map.get(s));
        }
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            System.out.println("word: " + entry.getKey() + ", number : " + entry.getValue());
            writeResult("word: " + entry.getKey() + ", number : " + entry.getValue());
        }
    }

    public static void writeResult(String line) throws IOException {
        try {
            if (bw != null) {
                bw.write(line);
                bw.newLine();
                bw.flush();
            }
        } catch (IOException e) {
            e.printStackTrace();
            closeOutputStream(bw);
        }
    }

    public static void closeOutputStream(BufferedWriter writer) {
        try {
            if (writer != null) {
                writer.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void closeInputStream(BufferedReader reader) {
        try {
            if (reader != null) {
                reader.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Result:
Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
	at java.util.Arrays.copyOfRange(Arrays.java:2694)
	at java.lang.String.<init>(String.java:203)
	at java.lang.String.substring(String.java:1913)
	at java.lang.String.subSequence(String.java:1946)
	at java.util.regex.Pattern.split(Pattern.java:1202)
	at java.lang.String.split(String.java:2313)
	at java.lang.String.split(String.java:2355)
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "main"

As you can see, Spark SQL runs the same job directly and simply. It differs from a traditional database in kind: it is distributed and supports data volumes that a single-machine relational database cannot hope to match.
9. Setting parameters
By default, Spark SQL uses 200 partitions for the shuffle performed by joins and aggregate functions.
You can see this in the console output during a join: the partition count shows as 200. To change it, use:
sqlContext.setConf("spark.sql.shuffle.partitions","2")

In one test, setting the value to 2 left only one task, and setting it to 3 gave two. If a shuffle is producing too many tasks and you want fewer, this is the parameter to configure.
10. Running a Spark SQL job
// Build the SparkContext. Be sure to call the constructor with new;
// otherwise you end up calling the companion apply method instead.
val sc = new SparkContext()
// Build the SQLContext
val sqlContext = new SQLContext(sc)
// Create a DF from the json data
val df = sqlContext.read.json("file:///root/user.json")
// Note the syntax differs from spark-shell: $("name") becomes df("name")
df.select("name", "address").where(df("name") === "陈晨").show
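A hedged sketch of how such a job might be submitted (the class name cn.example.SparkSqlJob and the jar name are made up for illustration):

bin/spark-submit --master local --class cn.example.SparkSqlJob sparksqljob.jar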
Chinese characters can come out garbled, so any file that contains Chinese should be saved as UTF-8.
Do not store every file as UTF-8, though. Use UTF-8 when the file contains Chinese, but plain ASCII is still fine for pure English. UTF-8 encodes English in one byte per character (it is ASCII-compatible), while each Chinese character takes 3 to 4 bytes, so files holding wide characters grow noticeably. With massive data sets being moved between physical nodes, size matters: if the data can stay English, keep it English.
