Spark-源码学习-SparkCore-存储服务-内存-存储内存池 StorageMemoryPool
一、概述存储内存池主要用于 RDD 的缓存,广播以及备份中。不像执行内存池需要维护每个 Task 的内存占用情况,存储内存池只提供了一个 _memoryUsed 的变量来进行当前内存的使用情况。
二、实现2.1. 结构2.1.1. 属性
_memoryUsed: 已经使用的内存大小
_memoryStore: 当前 StorageMemoryPool 所关联的 MemoryStore
三、内存管理3.1. 申请内存$acquireMemoryacquireMemory()$ 用于给 BlockId 对应的 Block 获取 numBytes 指定大小的内存。
首先计算要申请的内存大小 numBytes 与空闲空间 memoryFree 的差值 numBytesToFree,然后调用重载的 $acquireMemory()$ 方法申请获得内存。
1234def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized { val numBytesToFree = mat ...
Spark-源码学习-SparkCore-存储服务-内存-执行内存池 ExecutionMemoryPool
一、概述执行内存池内存只会分配给 Task 进行使用,主要用于 Task 中的 Shuffle、Join、Aggregation 等操作时候的内存提供。由于执行内存会由多个 Task 进行共享,所以为了保证 Task 合理地进行内存使用,避免某些 Task 过度使用内存导致其它的Task频繁将数据溢写到磁盘,拖垮整体执行速度,执行内存池需要保证在 N 个 Task 的情况下,每个 Task 所能分配到的内存在总内存的 1/2N~1/N 之间,由于 Task 数量是动态的,因此会跟踪所有激活的Task的数量以便重新计算 1/2N 和 1/N 的值。
二、实现2.1. 结构2.1.1. 属性ExecutionMemoryPool 用一个 HashMap 来维护一个 TaskAttempt[身份标识为 taskAttemptId]与所消费内存的大小之间的映射关系。
12345/** * Map from taskAttemptId -> memory consumption in bytes */@GuardedBy("lock")private val ...
Spark-源码学习-通信服务-RpcEnv
一、概述RpcEnv 是各个组件之间通信的执行环境,每个节点之间(Driver 或者 Worker/Executor) 组件的 Endpoint 和对应的 EndpointRef 之间的信息通信和方法调用都是通过 RpcEnv 作协调。
RpcEnv 类似于 ActorSystem,服务端和客户端都可以使用它来做通信:
对于 server 端来说,RpcEnv 是 RpcEndpoint 的运行环境,负责 RpcEndPoint 的生命周期管理,解析 Tcp 层的数据包以及反序列化数据封装成 RpcMessage,然后根据路由传送到对应的 Endpoint;
对于 client 端来说,可以通过 RpcEnv 获取 RpcEndpoint 的引用 RpcEndpointRef,然后通过 RpcEndpointRef 与对应的 Endpoint 通信。
二、RpcEnv 初始化RpcEnv 的初始化由 RpcEnvFactory 负责, RpcEnvFactory 目前只有一个子类实现: NettyRpcEnvFactory,初始化 NettyRpcEnv 的过程其实就是对内部各 ...
Spark-源码学习-SparkCore-通信服务-架构设计-传输层-服务端
一、概述RPC 传输层服务端负责接收客户端序列化后的请求数据,同时把响应数据序列化后传给客户端。
二、架构设计2.1. 服务端引导程序 TransportServerBootstrap接口 TransportServerBootstrap 定义了服务端引导程序的规范,服务端引|导程序旨在当客户端与服务端建立连接之后,在服务端持有的客户端管道上执行的引导程序。TransportServer 的构造器中的 bootstraps 是 TransportServerBootstrap 的列表。
2.2. 组件2.2.1. TransportServerTransportServer 是 RPC 框架的服务端,可提供高效、低级别的流服务。
三、实现3.1. TransportServer3.1.1. 构造器TransportServer 的构造器中的各个变量如下。
12345678public TransportServer( TransportContext context, String hostToBind, int portToBind, RpcHandler ...
Spark-源码学习-通信服务-架构设计-客户端
一、概述客户端请求程序通过本地调用的方式调用服务,服务调用过程中,真正的方法逻辑存在于服务端中,客户端保存就是服务端真实方法的一个存根 stub(也可以认为是服务端的代理,存放服务端的地址等信息); 当客户端需要远程访问服务端方法的时候,可以凭借服务端在客户端中的存根来组装发起远程调用所需要的信息。Spark 远程通信全部使用 netty 进行了替换, TransportClientFactory 是 RPC 客户端的工厂类。
二、架构设计2.1. 请求程序本地程序通过本地调用的方式调用远程服务。
2.2. stub 程序当客户端需要远程访问服务端方法的时候,可以凭借服务端在客户端中的存根来组装发起远程调用所需要的信息。
2.2.1. 协议层不同的请求本质上是不同的协议。因此,有众多不同的协议实现类。在这些实现类中,主要负责对各自请求的结构体进行序列化与反序列化的操作。
引用本站文章
Spark-源码学习-SparkCore-通信服务-架构设计-协议层
...
Spark-源码学习-SparkCore-通信服务-架构设计-服务端
一、概述
二、架构设计2.1. 传输层 Server数据传输全部使用 Netty 进行了替换。
引用本站文章
Spark-源码学习-通信服务-架构设计-传输层
Joker
2.2. Stub 程序2.2.1. 序列化层2.2.2. 协议层
引用本站文章
Spark-源码学习-通信服务-架构设计-协议层
Joker
2.2.3. 路由层Spark RPC 路由层负责将 RPC 消息路由到要该对此消息处理的 RpcEndpoint。
引用本站文章
...
Spark-源码学习-SparkCore-通信服务-架构设计
一、概述在 Spark 中很多地方都涉及网络通信,比如 Spark 各个组件间的消息互通 用户文件与 Jar 包的上传、节点间的 Shuffle 过程、Block 数据的复制与备份等。
1.1. 分布式通信概述
引用本站文章
分布式通信概述
Joker
1.2. Spark RPC在 Spark 2.x 版本之前,组件间的消息通信主要借助于 Akka,使用 Akka 可以轻松地构建强有力的高并发与分布式应用。但是Akka 在 Spark 2.0.0 版本中被移除了。
引用站外地址,不保证站点的可用性和安全性
SPARK-5293
Jira
Spark 官网文档对此的描述为 Akka 的 ...
Spark-源码学习-SparkSQL-架构设计-SQL 引擎-Analyzer 模块-Rule-Cleanup-CleanupAlias
一、概述表达式一般指的是不需要触发执行引擎而能够直接进行计算的单元,例如加减乘除四则运算、逻辑操作、转换操作、过滤操作等。
二、实现Catalyst 实现了完善的表达式体系,与各种算子(QueryPlan)占据同样的地位。算子执行前通常都会进行”绑定”操作,将表达式与输入的属性对应起来,同时算子也能够调用各种表达式处理相应的逻辑。
在 Expression 类中,主要定义了5个方面的操作,包括基本属性、核心操作、输入输出、字符串表示和等价性判断
核心操作 eval 函数实现了表达式对应的处理逻辑,也是其他模块调用该表达式的主要接口,而 genCode 和 doGenCode 用于生成表达式对应的 Java 代码。字符串表示用于查看该 Expression 的具体内容,如表达式名和输入参数等。
Spark-源码学习-SparkSQL-架构设计-SQL 引擎-Planner 模块-Rule-Preparations-EnsureRequirements
一、概述表达式一般指的是不需要触发执行引擎而能够直接进行计算的单元,例如加减乘除四则运算、逻辑操作、转换操作、过滤操作等。
二、实现Catalyst 实现了完善的表达式体系,与各种算子(QueryPlan)占据同样的地位。算子执行前通常都会进行”绑定”操作,将表达式与输入的属性对应起来,同时算子也能够调用各种表达式处理相应的逻辑。
在 Expression 类中,主要定义了5个方面的操作,包括基本属性、核心操作、输入输出、字符串表示和等价性判断
核心操作 eval 函数实现了表达式对应的处理逻辑,也是其他模块调用该表达式的主要接口,而 genCode 和 doGenCode 用于生成表达式对应的 Java 代码。字符串表示用于查看该 Expression 的具体内容,如表达式名和输入参数等。
Spark-源码学习-SparkSQL-架构设计-SQL 引擎-Rule 体系
一、概述对逻辑算子树的操作(绑定,解析,优化等)主要都是基于规则的,通过 Scala 的语言模式匹配进行树结构转换或节点改写。由 RuleExecutor 来调用规则,所有涉及树形结构转换过程的都继承自RuleExecutor[TreeType] 抽象类。
Analyzer、Optimizer 定义了一系列 rule,而 RuleExecutor 定义了一个 rules 执行框架,即怎么把一批批规则应用在一个 plan 上得到一个新的 plan。Spark sql 通过 Analyzer 中定义的 rule 把 Parsed Logical Plan 解析成 Analyzed Logical Plan; 通过 Optimizer 定义的 rule 把 Analyzed Logical Plan 优化成 Optimized Logical Plan。
二、实现RuleExecutor 内部提供一个 Seq[Batch] 定义了改 RuleExecutor 的处理步骤,每个 Batch 代表一套规则;$RuleExecutor.apply()$ 方法会按照 batches 和 bat ...