spark 教程4 (dataset 基础操作)

xiaoxiao2021-02-28  18


1 配置好maven依赖,依赖如下,注意spark的依赖要选择对应scal版本的依赖 <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.11.12</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.2.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.2.1</version> </dependency> </dependencies> 2 新建一个scalaObject,在代码里面初始化好环境,初始代码如下 package com import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession object TestDateSet01 { def main(args: Array[String]): Unit = { System.setProperty("HADOOP_USER_NAME", "hdfs") val conf = new SparkConf().setAppName("testDataSet").setMaster("local") val sparkSession = SparkSession.builder().config(conf).getOrCreate() } } 这样我们就可以用本地模式运行代码,并且已经得到了SparkSession对象,接下来可以创建dataset了 3 从一个简单的dataset开始 我们在初始代码的main里面追如下代码并运行 val rangeDs = sparkSession.range(50,55) rangeDs.describe().show 运行得到如下结果: +---+ | id| +---+ | 50| | 51| | 52| | 53| | 54| +---+ (这是我们通过range方法得到的dataset rangeDs) +-------+------------------+ |summary| id| +-------+------------------+ | count| 5| | mean| 52.0| | stddev|1.5811388300841898| | min| 50| | max| 54| +-------+------------------+ (这是我们的dataset rangeDs的描述信息,包含平均值、标准偏差等) 4 对象转成dataset 我们在初始代码的main方法里面追加如下代码 import sparkSession.implicits._ val seq = Seq(Person("lily",33,0),Person("jack",22,1)) sparkSession.createDataset(seq).show() 并在main方法的外面加上如下代码 case class Person(name:String,age:Int,gender:Int) 运行得到如下结果 +----+---+------+ |name|age|gender| +----+---+------+ |lily| 33| 0| |jack| 22| 1| +----+---+------+ 4.wordcount例子(读取文件到dataset、清洗标点符号、单词次数排序、持久化到文件) 我们要统计的wordcount文件内容如下,可以复制一下内容到E:\wordcount文件中: A Dataset is a strongly typed collection of domain-specific objects that can be transformed in parallel using functional or relational operations. Each Dataset also has an untyped view called a DataFrame, which is a Dataset of Row. Operations available on Datasets are divided into transformations and actions. Transformations are the ones that produce new Datasets, and actions are the ones that trigger computation and return results. Example transformations include map, filter, select, and aggregate (groupBy). Example actions count, show, or writing data out to file systems. Datasets are "lazy", i.e. computations are only triggered when an action is invoked. Internally, a Dataset represents a logical plan that describes the computation required to produce the data. When an action is invoked, Spark's query optimizer optimizes the logical plan and generates a physical plan for efficient execution in a parallel and distributed manner. To explore the logical plan as well as optimized physical plan, use the explain function. To efficiently support domain-specific objects, an Encoder is required. The encoder maps the domain specific type T to Spark's internal type system. For example, given a class Person with two fields, name (string) and age (int), an encoder is used to tell Spark to generate code at runtime to serialize the Person object into a binary structure. This binary structure often has much lower memory footprint as well as are optimized for efficiency in data processing (e.g. in a columnar format). To understand the internal binary representation for data, use the schema function. There are typically two ways to create a Dataset. The most common way is by pointing Spark to some files on storage systems, using the read function available on a SparkSession. val people ="...").as[Person] // Scala Dataset<Person> people ="...").as(Encoders.bean(Person.class)); // Java Datasets can also be created through transformations available on existing Datasets. For example, the following creates a new Dataset by applying a filter on the existing one: val names = // in Scala; names is a Dataset[String] Dataset<String> names = p) ->, Encoders.STRING)); // in Java 8 Dataset operations can also be untyped, through various domain-specific-language (DSL) functions defined in: Dataset (this class), Column, and functions. These operations are very similar to the operations available in the data frame abstraction in R or Python. To select a column from the Dataset, use apply method in Scala and col in Java. val ageCol = people("age") // in Scala Column ageCol = people.col("age"); // in Java Note that the Column type can also be manipulated through its various functions. // The following creates a new column that increases everybody's age by 10. people("age") + 10 // in Scala people.col("age").plus(10); // in Java A more concrete example in Scala: // To create Dataset[Row] using SparkSession val people ="...") val department ="...") people.filter("age > 30") .join(department, people("deptId") === department("id")) .groupBy(department("name"), "gender") .agg(avg(people("salary")), max(people("age"))) and in Java: // To create Dataset<Row> using SparkSession Dataset<Row> people ="..."); Dataset<Row> department ="..."); people.filter("age".gt(30)) .join(department, people.col("deptId").equalTo(department("id"))) .groupBy(department.col("name"), "gender") .agg(avg(people.col("salary")), max(people.col("age"))); 然后我们在初始代码的main方法里面追加如下代码 import sparkSession.implicits._ //定义符号集合(需要过滤这些无意义的符号) val punctuations = """ ,."|{}:“”!/()= """.stripMargin.toCharArray //wordcount文件初始dataset val wcDs ="E:\\wordcount.txt").as[String] //过滤标点并把行切分成单词 .flatMap(_.split(punctuations)).flatMap(_.split("\\s+")) //数据清洗,主要是转换成小写 .map(_.trim().toLowerCase()).filter(_!="") //统计每个单词出现次数 .groupByKey(v=>v).count() //根据次数排序,取前50 .orderBy(org.apache.spark.sql.functions.col("count(1)").desc).limit(10) //持久化到文件 .write.csv("E:\\result")