Spark RDD深入理解

xiaoxiao2021-02-28  49

通过RDDscala源码来解读什么是RDD Resilient Distributed DatasetRDD通过RDDscala中的注释来解读RDD的概念 RDD五大特性RDD五大特性在源码中的体现 分析RDDscala分析JdbcRDDscala 分析getPartitions方法分析compute方法 图解RDD


通过RDD.scala源码来解读什么是RDD

Spark Github: https://github.com/apache/spark RDD源码: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala

让开发者大大降低了开发分布式应用程序的门槛及提高执行效率

Resilient Distributed Dataset(RDD)

弹性 体现在计算之上; 它代表的是Spark在分布式计算的时候可以进行容错,比如说:数据丢失了,可以通过RDD的容错机制,来进行修复; 因此准确的说,弹性在计算层面体现的尤为明显分布式 数据归属在一个大的集群之上,可以拆开的存在每个机器上; 数据分布在不同的节点上 计算的时候也是一样,代码可以运行在多个节点之上数据集 可以通过不同的方式进行创建 会被拆分成很多分区,进行计算

通过RDD.scala中的注释来解读RDD的概念

the basic abstraction in Spark 在Spark中最基本的抽象单元 在Spark每个框架都应该有个抽象的编程模型

在Spark Core里是基于RDD进行编程在Spark Streaming里是基于DStream进行编程在Spark SQL里是基于SQLContext/HiveContext类似的进行编程

Represents an immutable,partitioned collection of elements that can be operated on in parallel RDD代表了不可变的(scala中不可变额用val进行修饰) 这也就意味着RDD一旦产生,就没有办法改变 举例: RDDA ==各种操作==> RDDB RDDA与RDDB都是独一无二的,因为它们是不可变的,可以通过一系列的操作转换得到

集合里的元素能够被拆成各个分区: 这句话也可以理解为HDFS中的Block或者MapReduce中的InputSplit; 集合中的元素被拆成各个分区之后,数据集能够以一种并行的方式进行操作 举例: 对RDDA执行”+1”的操作,实质上是对RDDA内的所有分区(在不同机器上的)都执行了相同的操作

我们通过RDD.scala源码可以发现对RDD的定义如下:

抽象类 代表RDD不能直接使用,这就好比在Java中抽象类不能直接new出来使用一样 因此RDD必然是有子类实现的,我们使用时直接使用其子类即可Serializable(序列化) 不序列化,提交上个区会报task未被序列化的错误(Spark开发中常见的错误) 因此分布式框架中序列化的好坏直接影响了性能的好坏Logging 在Spark 1.6.x版本中是可以直接使用的 在2.x版本中,是被移走了T(泛型) 意味着可以支持各种数据类型 也说明了RDD中存储的数据类型是不确定的SparkContext@transient

考察JdbcRDD.scala的定义: 继承了RDD,直接使用RDD是不能使用的,需要去实现该抽象类

RDD五大特性

每一个RDD都会有如下的五个特点:

A list of partitionsA function for computing each split(split理解为partition) 对RDD做的操作,即对每个分区做一个相同的functionA list of dependencies on other RDDs RDD之间有相互的依赖关系 RDDA ==> RDDB ==> RDDC ==> RDDD RDD中又有多个partition构成,那么如何体现RDD的弹性呢? 举例: RDDA中的一个partition挂掉了,把数据重新加载一下,就行了 如果RDDC中的一个partition挂掉了,倘若RDDB与RDDC之间的关系是窄依赖,那么直接通过一个转换操作就过来了 因此,当某个partition里面的数据丢失以后,Spark可以通过RDD之间的依赖关系,重新计算partition的分区数据; 【注意】不是对RDD中所有的partition的数据进行重新计算,而是通过计算出了问题的partition就可以了(前提是一个窄依赖关系)Optionally, a Partitioner for key-value RDDs (e.g. to say that RDD is hash-partitioned) 【可选】 K-V类型RDD的partition(比如hash-partitioned)Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file) 【可选】 对每一个分片机芯计算的时候,会有preferred locations 这个特性,体现了数据本地性

preferred locations: 数据在哪,就把task分配到数据所在地去运行(这样就可以大程度的避免数据的移动); 最后的s是副本的概念; 根据e.g. block locations for an HDFS file锁具的案例:HDFS在存放的时候会有多个Block,每个Block又会有它的副本,因此多出来的s就是副本的意思; 那么我们在计算的时候,对于一个split来说,每个Block又会有多个preferred location (因为对于1个Block来说副本会有多个,因此split的最佳位置也可能有多个,因为副本的大小是一样的,只是所在的机器位置不同而已)

移动计算而不是移动数据,数据在哪里,就优先把作业调度到哪里去执行,这样就能减少数据的读取、网络的传输,提升作业的执行时间。 数据存储的时候一定要切割,以多副本的形式进行存储:

一方面可以便于存储,可以进行容错另一方面在计算的时候,也是要进行切分的(默认按照blocksize的大小进行切分,当然也可以自己指定split的大小)

关于切片split的理解,可以参考Blog: [考究Hadoop中split的计算方法] http://blog.csdn.net/lemonzhaotao/article/details/77538289 注意:切片是指对文件进行切分,而Block也是指对文件进行切分 因此对移动计算的理解为: 分散在多台机器的某个大小为300的文件的Block大小分别为:128、128、44 副本数为3个; 切片大小为100,因此split作为输入分片的大小,但是在某个机器的时候,会发现数据不够,因此必然会涉及到数据的移动; 切片是指对要计算的文件进行切分,因而不管怎么设置该大小,肯定会在某个机器上遇到该文件的Block的数据不够作为分片输入task的情况,需要去其它机器上移动些数据过来,才能完成该台计算机计算任务的情况。 每个split作为task的输入,倘若需要组成该split的block在不同机器上,就必然会涉及到数据的传输(spark中的split可以理解为partition,MapReduce中也有split)

关于Spark数据本地性的理解: 分布式计算系统的精粹在于移动计算而非移动数据,但是在实际的计算过程中,总会存在移动数据的情况,除非是在集群的所有节点上都保存了数据的副本。(举个例子:如果要运算的数据在该节点没有或者是少了点量,就需要去其它节点拉数据了); 移动数据,将数据从一个节点移动到另一个节点进行计算,不但消耗了网络IO,也消耗了磁盘IO,降低了整个计算的效率; 为了提高数据的本地性,除了优化算法(也就是修改spark内存,难度有点高),就是设置合理的数据副本; 设置数据的副本,这需要通过配置参数并长期观察运行状态才能获取的一个经验值

总结 Spark的五大特性,完美诠释了Resilient Distributed Dataset

RDD五大特性在源码中的体现

分析RDD.scala

分析源码的地址: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala 对应第二点: 对RDD做计算,实质就是对分区做计算;传入了split,类型为Partition

对应第一点: 一个RDD由一系列的partition组成;因此传入了一个Array[]数组,泛型的类型为Partition

对应第三点: RDD之间有相互的依赖关系;因此需要得到Dependencies

对应第五点: 要计算一个Partition类型的split的时候,必然是需要得到它的 PreferredLocations,要知道它到底在哪个位置;因此最终返回的是一个Seq序列(类似于数组或者集合的东西) Seq[String]可以理解为 所对应的一个路径

对应第四点: key-value形式的partition

分析JdbcRDD.scala

分析源码的地址: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala

分析getPartitions方法

获取到分区信息/有起始和结束的位置/每一个分区处理了多少数据 假设有10个分区和1W条数据,1个粪污1000条数据,从第0个分区开始 第0个必然是前面1000条, 第1个必然是1000到1999条 …以此类推 对每个分区做一个map操作,通过开始和结束的位置结合起来 ==> 来构建出JdbcPartition 返回的是一个Partition数组,得到了一系列的Partition信息

分析compute方法

图解RDD

5个分区,3台机器

partition 有可能存在DISK上 有可能存在MEMORY上 有可能既存在DISK上又存在MEMORY上 有可能存多份 在Spark中,计算时有多少个partition就会对应有多少个task来执行 这里有5个partition,就会有5个task来执行

【注意】

如果core够的话,就是并行执行如果core不够的话,肯定就是先跑一轮,再跑一轮…以此类推,跑完为止

数据本地性 如果在NODE2上运行partition1肯定就涉及到了数据的传输,会影响作业的执行速度 partition1在NODE1上运行肯定最好(体现了数据的本地性)

转载请注明原文地址: https://www.6miu.com/read-2621894.html

最新回复(0)