Spark2.0 新增 Structured Streaming,它是基于 SparkSQL 构建的可扩展和容错的流式数据处理引擎,使得实时流式数据计算可以和离线计算采用相同的处理方式(DataFrame&SQL)。本节课将介绍 Structured Streaming,并演示简单的 WordCount。
Spark 教程:https://www.shiyanlou.com/courses/?course_type=all&tag=Spark&fee=all
spark-2.1.0-bin-hadoop2.6
Xfce 终端
本课程属于初级难度级别,适合具有 scala 基础的用户,如果对 Spark1.x Streaming 了解能够更好的上手本课程。
因为用的本地的环境,可以不用启动任何进程也可以完成本节实验。
Structured Streaming 顾名思义,它将数据源和计算结果都映射成一张”结构化”的表,在计算的时候以结构化的方式去操作数据流,大大方便和提高了数据开发的效率。
首先让我们来看一下 spark1.x 的 Streaming 原理,如下图:
用 Spark Streaming 每次只能消费当前批次内的数据,当然可以通过 Window 操作,消费过去一段时间(多个批次)内的数据。举个简例子,需要每隔10秒,统计当前小时的 Pv 和 Uv,在数据量特别大的情况下,使用 Window 操作并不是很好的选择,我们通常是借助其它分布式数据库如 Redis,Hive 等完成数据统计。
Structured Streaming 的核心理念,将数据源和计算结果都看做是无限大的表,数据源中每个批次的数据,经过计算,都添加到结果表中作为行。这种模型跟其他很多流式计算引擎都不同,它不需要开发人员自己来维护新数据与历史数据的整合并进行聚合操作,不需要去考虑和实现容错机制、数据一致性的语义等。
Structured Streaming 设计将source、sink 和 execution engine 来追踪计算处理的进度,这样就可以在任何一个步骤出现失败时自动重试。
Spark 基于 checkpoint 和 wal 来持久化保存每个 trigger interval 内处理的 offset 的范围,同样的一批数据,无论多少次去更新 sink,都会保持一致和相同的状态,保证完整的一次且仅一次的语义。
我们可以定义每次结果表中的数据更新时,以何种方式,将哪些数据写入外部存储。我们有多种模式的 output:
complete modecomplete mode,被更新后的整个结果表中的数据,都会被写入外部存储。具体如何写入,是根据不同的外部存储自身来决定的。
append mode只有最近一次 trigger 之后,新增加到 result table 中的数据,会被写入外部存储。只有当我们确定,result table 中已有的数据是肯定不会被改变时,才应该使用 append mode。
update mode只有最近一次 trigger 之后,result table 中被更新的数据,包括增加的和修改的,会被写入外部存储中。spark 2.0中还不支持这种 mode。这种 mode 和 complete mode 不同,没有改变的数据是不会写入外部存储的。
#导入依赖 import org.apache.spark.sql.functions._ import org.apache.spark.sql.SparkSession import spark.implicits._
#创建SparkSession入口 val spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate() #创建DataFrame,指定格式,主机,端口号,这里设置为本地 val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 8888).load() #切分单词,聚合统计 val words = lines.as[String].flatMap(_.split(" ")) val wordCounts = words.groupBy("value").count()
#输出模式 complete val query = wordCounts.writeStream.outputMode("complete").format("console").start() #等待完成 query.awaitTermination() #回车
回到 spark-shell 终端中显示:
继续回到 NetCat 终端,输入 "hello spark"
回到 spark-shell 终端中显示:
可以看到这里 "hello" 出现两次,即Batch 0处理完的数据,也包含在Batch 1,因为我们指定了输出模式 complete mode。每个 Batch 显示的结果,都是完整的 WordCount 统计结果。
为了更好的演示 complete mode与append mode 的区别,按住Ctrl + C 结束 NetCat 与 spark-shell终端,只需修改设置模式为 append 依次重复步骤1,2,3节,
注意: 一定要先启动 NetCat,再启动 spark-shell。
import org.apache.spark.sql.functions._ import org.apache.spark.sql.SparkSession val spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate() import spark.implicits._ val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 8888).load() val words = lines.as[String].flatMap(_.split(" ")) #append mode 不支持 groupBy聚合操作 val query = words.writeStream.outputMode("append").format("console").start() query.awaitTermination()
返回 spark-shell 终端:
可以看到 Batch 0 处理完的数据 "world" 并没有在 Batch 2 出现。
双击打开浏览器,输入:localhost:4040 可以看到当前已经完成的 Jobs 点击去会有更详细的执行过程等。
本节课主要介绍了 Structured Streaming 理论概念,在此基础上,进行一个简单的 WordCount 实验,希望学完本节课能助你了解 Structured Streaming ,带您入门。