本文是读耿嘉安先生的《Spark内核设计的艺术:架构设计与实现》的读书笔记; 书中代码实现讲得少一些,更多的是对Spark内核架构的分析与解读 书讲得比较深看实在看不去,就一个字一个字地打出来,甚至图 都再画一遍。 打完之后再回过头来温习两遍,将书中比较有价值的、重要的东西梳理一下形成此文
在Spark中很多地方都涉及网络通信,比如Spark各个组件间的消息互通、用户文件与Jar包的上传,节点间的Shuffle过程,Block数据的复制与备份等。 在Spark0.x.x与Spark1.x.x版本中,组件间的消息通信主要借助于Akka,使用Akka可以轻松地构建强有力的高并发与分布式应用。 但是Akka在Spark2.0.0版本中被移除了,Spark官网文档对此的描述为:"Akka的依赖被移除了,因此用户可以使用任何版本的Akka来编程了。” Spark团队的决策者或许认为对Akka具体版本的依赖,限制了用户对Akka不同版本的使用。 在Spark 1.x.x版本中,用户文件与Jar包的上传采用了由Jetty实现的HttpFileServer,但在Spark2.0.0版本中它也被废弃了,现在使用的是基于Spark内置RPC框架的NettyStreamManager。节点间的Shuffle过程和Block数据的复制与备份这两个部分在Spark2.0.0版本中依然沿用了Netty,通过对接口和程序进行重新设计,将各个组件间的消息互通、用户文件与Jar包的上传等内容统一纳入Spark的RPC框架体系中。
RPC框架的基本架构:
TransportContext内部包含传输上下文的配置信息TransportConf和 对客户端请求消息进行处理的RpcHandler。TransportConf在创建TransportClientFactory和TransportServer时都是必须的,而RpcHandler只用于创建TransportServer。TransportClientFactory是RPC客户端的工厂类。TransportServer是RPC服务端的实现。 图 中记号1的含义: 通过调用TransportContext的createClientFactory方法创建传输客户端工厂TransportClientFactory的实例。在构造TransportClientFactory的实例时,还会传递客户端引导程序TransportClientBootstrap的列表。此外,TransportClientFactory内部还存在针对每个Socket地址的连接池ClientPool。这个连接池缓存的定义如下:
private final ConcurrentHashMap<SocketAddress, ClientPool> connectionPool;ClientPool的类型定义如下:
private static class ClientPool { TransportClient[] clients; Object[] locks; ClientPool(int size){ clients = new TransportClient[size]; locks = new Object[size]; for(int i = 0; i < size; i++){ locks[i] = new Object(); } } }由此可见,ClientPool实际是由TransportClient的数组构成的,而Locks数组中的Obejct与Clients数组中的TransportClient按照数组索引一一对应,通过对每个TransportClient分别采用不同的锁,降低并发情况下线程间对锁的争用,进而减少阻塞,提高并发度。
记号2表示通过调用TransportContext的createServer方法创建传输服务端TransportServer的实例。在构造TransportServer的实例时,需要传递TransportContext、host、port、RpacHandler及服务端引导程序TransportServerBootstrap的列表。
上面介绍了Spark内置RPC框架的基本架构,下面正式介绍一下Spark的RPC框架所包含的各个组件:
TransportContext 传输上下文,包含了用于创建传输服务端(TransportServer)和传输客户端工厂(TransportClientFactory)的上下文信息,并支持使用TransportChannelHandler设置Netty提供的SocketChannel的Pipeline的实现。
TransportConf 传输上下文的配置信息
RpcHandler 对调用传输客户端(TransportClient)的sendRPC方法发送的消息进行处理的程序。
MessageEncoder 在将消息放入管道前,先对消息内容进行编码,防止管道另一端读取时丢包和解析错误。
MessageDecoder 对从管道中读取的ByteBuf进行解析,防止丢包和解析错误。
TransportFrameDecoder 对从管道中读取的ByteBuf按照数据帧进行解析
RpcResponseCallback RpcHandler对请求的消息处理完毕后进行回调的接口。
TransportClientFactory 创建TransportClient的传输客户端工厂类。
ClientPool 在两个对等节点间维护的关于TransportClient的池子。ClientPool是TransportClientFactory的内部组件。
TransportClient RPC框架的客户端,用于获取预先协商好的流中连续块。TransportClient旨在允许有效传输大量数据,这些数据将被拆分成几百KB到几MB的块。当TransportClient处理从流中获取的块时,实际的设置是在传输层之外完成的。sendRPC方法能够在客户端和服务端的同一水平线的通信进行这些设置。
TransportClientBootstrap 当服务端响应客户端连接时在客户端进行一次的引导程序。
TransportRequestHandler 用于处理客户端的请求并在写完块数据后返回的处理程序。
TransportResponseHandler 用于处理服务端的响应,并且对发出请求的客户端进行响应的处理程序。
TransportChannelHandler 代理由TransportRequestHandler处理的请求和 由TransportResponseHandler处理的响应,并加入传输层的处理。
TransportServerBootstrap 当客户端连接到服务端执行一次的引导程序。
TransportServer RPC框架的服务端,提供高效的、低级别的流服务。
为什么需要MessageEncoder和MessageDecoder? 因为在基于流的传输里(比如TCP/IP),接收到的数据首先会被存储到一个Socket接收缓冲里。不幸的是,基于流的传输并不是一个数据包队列,而是一个字节队列,即使你发送了2个独立的数据包,操作系统也不会作为 2个消息处理,而仅仅认为是一连串的字节,因此不能保证远程写入的数据会被准确地读取,举个例子,我们假设操作系统的TCP/IP协议栈已经接收了3个数据包:ABC、DEF、GHI。由于基于流传输的的这种统一的性质,你的应用程序在读取数据的时候有很大的可能性被分成下面的片段:AB、CDEFG、H、I。因此,接收方不管是客户端还是服务端都应该把接收到的数据整理成一个或者多个更有意义并且让程序的逻辑更好理解的数据。
