Spark-源码学习-SparkCore-调度机制-任务调度-Task 调度-SchedulerBackend
一、概述SchedulerBackend 是 TaskScheduler 的调度后端接口。TaskScheduler 给 Task 分配资源实际是通过 SchedulerBackend 来完成的,SchedulerBackend 给 Task 分配完资源后将与分配给 Task 的 Executor 通信,并要求后者运行 Task。
https://weread.qq.com/web/reader/0c832fb05e12e40c8ca6190k32b321d024832bb90e89958
https://zhuanlan.zhihu.com/p/354775031https://www.yisu.com/zixun/563198.html
1.1. LauncherBackend当 Spark 应用程序没有在用户应用程序中运行,而是运行在单独的进程中时,用户可以在用户应用程序中使用 LauncherServer 与Spark 应用程序通信。LauncherServer 将提供 Socket 连接的服务端,与 Spark 应用程序中的 Socket 连接的客户端通信。
TaskSc ...
Spark-源码学习-SparkCore-调度机制-任务调度-Task 调度-TaskSetManager
一、概述TaskSetManager 实现了 Schedulable 特质,并参与到调度池的调度中。TaskSetManager 对 TaskSet 进行管理,包括任务推断、Task 本地性,并对Task 进行资源分配。TaskSchedulerImpl 依赖于 TaskSetManager。
DAGScheduler 将 Stage 打包到 TaskSet 交给 TaskScheduler, TaskSet 调度池中对 Task 进行调度管理的基本单位, TaskScheduler 会 将 TaskSet 封装为 TaskSetManager, 负责监控管理同一个 Stage 中的 Tasks, TaskScheduler 就是以 TaskSetManager 为单元来调度任务。
TaskSetManager 负责监控管理同一个 Stage 中的 Tasks, TaskScheduler 会先把 DAGScheduler 给过来的 TaskSet 封装成,TaskSetManager 放到任务队列里,然后再按照指定的调度策略在调度队列中选择 TaskSetManager。
h ...
Spark-源码学习-SparkCore-调度机制-任务调度-Task 调度-本地性
Task 本地性的分配优先考虑有较高的本地性的级别,否则分配较低的本地性级别,直到 ANY。TaskSet 可以有一到多个本地性级别, 但在给 Task 分配本地性时只能是其中的一个。Taskset 中的所有 Task 都具有相同的允许使用的本地性级别,但在运行期可能因为资源不足、运行时间等因素,导致同一 Taskset 中的各个 Task 的本地性级别可能不同。TaskSet 中实现的本地性操作包括对 Taskset 的本地性级别进行计算、获取某个本地性级别的等待时间、给 Task 分配资源时获取允许的本地性级别等。
computeValidLocalityLevels()
computeValidLocalityLevels() 用于计算有效的本地性级别,将 Task 按照本地性级别,由高到低地分配给允许的 Executor。
如果存在 Executor 上待处理的 Task 的集合且 PROCESS_LOCAL 级别的等待时间不为0,还存在已被激活的 Executor (即pendingTasksForExecutor 中的 Executorld 有存在于 Task ...
Spark-源码学习-SparkCore-数据传输-块传输-架构设计
一、概述由于 Spark 是分布式部署,每个Task最终都运行在不同的机器节点上。map 任务的输出结果直接存储到map任务所在机器的存储体系中,reduce 任务有可能不在同一机器上运行,所以需要远程下载map任务的中间输出。NettyBlockTransferService提供了可以被其他节点的客户端访问的 Shuffle服务。
二、架构设计三、初始化BlockManager 在初始化的时候调用NettyBlockTransferService的init!)方法进行初始化,NettyBlockTransferService 只有在被初始化后才提供服务
创建 NettyBlockRpcServer。 NettyBlockRpcServer继承了RpcHandler,服务端对客户端的Block读写请求的处理都交给了RpcHandler的实现类,NettyBlockRpcServer 处理 Block块的 RPC 请求。
准备客户端引导程序TransportClientBootstrap和服务端引导程序TransportServer-Bootstrap。
创建TransportCo ...
Spark-源码学习-集群启动-standalone-worker-心跳机制
一、概述Worker 通过发送心跳实现的汇报运行状态
二、源码当 worker 向 Master 注册成功后会接收到 Master 回复的 RegisteredWorker 消息,Worker 使用 handleRegisterResponse 方法处理 RegisteredWorker 消息时,将会向 forworaMessageScheduler 提交以 HEARTBEAT_MILLIS 作为间隔向 Worker 自身发送 SendHeartbeat 消息的定时任务。Worker 的 receive 方法实现了对 SendHeartbeat 消息的处理
如果 connected 为 true,则调用 $sendToMaster$ 方法,向 Master 发送 Heartbeat 消息(此消息将携带Worker 的 ID 和 Worker 的 RpcEndpointRef)
三、总结
Spark-源码学习-集群启动-standalone-worker-注册机制
一、概述Worker 在启动后, 需要加入到 Master 管理的整个集群中, 以参与 Driver、Executor 的资源调度。Worker 要加入 Master 管理的集群, 就必须将 Worker 注册到 Master。在启动 Worker 的过程中需要调用 registerWithMaster 方法向 Master 注册 Worker
二、源码2.1. 向所有 Master 注册当前 Worker$tryRegisterAllMasters()$ 方法负责向所有的 Master 注册当前 Worker~
只有处于领导状态的 Master 来处理 Worker 的注册。
$tryRegisterAllMasters()$ 方法遍历 masterRpcAddresses 中的每个 Master 的 RPC 地址,然后向 registerMasterThreadPool 提交向 Master 注册 Worker 的任务。主要向 Master 发送 RegisterWorker 消息,并对返回的结果使用 handleRegisterResponse 方法处理。
12345 ...
Spark-理论笔记-任务调度
一、概述一个 Spark Application 包括 Job、 Stage 以及 Task 三个概念:
Job
由于 Spark 的懒执行,在 driver 调用一个action 之前,spark application 不会做任何事情。针对每个 action,Spark调度器就创建一个执行图(execution.graph) 并且启动一个 Spark Job。
stage
每个 job有多个 stage组成,这些stage就是实现最终的RDD所需的数据转换的步骤,以 RDD 宽依赖为界,遇到宽依赖即划分 stage,
task
每个stage由多个tasks组成,这些tasks就表示每个并行计算并且会在多个执行器上执行
在 Spark 中,调度执行一个Job 总体分两路进行:Stage 级的调度和 Task 级的调度~
二、Job 触发三、Stage 级调度https://blog.csdn.net/swg321321/article/details/125646366Spark 中 DAGScheduler 实现了面向 stage 的调度
3.1. sta ...
Spark-源码学习-SparkCore-存储服务-块元数据管理 BlockInfoManager
一、概述BlockInfoManager 是 BlockManager 内部的子组件之一,BlockInfoManager 对 BlockInfo 进行了一些简单的管理,但是 BlockInfoManager 主要对 Block 的锁资源进行管理
二、实现2.1. 结构2.1.1. 属性
blockInfoWrappers
BlockId 与 BlockInfoWrapper 之间映射关系的缓存。
1private[this] val blockInfoWrappers = new ConcurrentHashMap[BlockId, BlockInfoWrapper]
invisibleRDDBlocks
RDDBlock 可见性
1private[this] val invisibleRDDBlocks = new mutable.HashSet[RDDBlockId]
writeLocksByTask
每次任务执行尝试的标识 TaskAttemptId 与执行获取的 Block 的写锁之间的映射关系。TaskAttemptId 与写锁之间是一对多的关系,即一次任务尝试执行会获 ...
Spark-源码学习-SparkCore-存储服务-块管理器 BlockManager
一、概述BlockManager 是 Spark 存储体系中的核心组件,运行在每个节点(Driver和 Executors)上,提供接口用于读写本地和远程各种存储设备(内存、磁盘和 off-heap)
二、架构设计在 Spark 的 Driver 以及所有的 Executor 上,都存在一个 BlockManager、BlockManagerMaster。BlockManager 提供了存储模块与其他模块的交互接口,而 BlockManagerMaster 则是 Block 管理的接口类,通过调用 BlockManagerMasterEndpoint 和 RpcEndpointRef 进行通信。
无论是 Driver 还是 Executor 节点都会创建自己的 BlockManagerMaster,不过 Driver 上的 BlockManagerMaster 会实例化并且注册 BlockManagerMasterEndpoint, 在 BlockManagerMasterEndpoint 中维护了注册的 BlockManager, BlockManager 和 Executor ...
Spark-源码学习-SparkCore-存储服务-磁盘-DiskStore-DiskBlockManager
一、概述Block 数据存放在本地磁盘,每个 block 对应一个文件,文件名为 block 的 name 属性的值,DiskBlockManager 负责为逻辑的 Block 与数据写入磁盘的位置之间建立逻辑的映射关系~
二、实现2.1. 结构2.1.1. 属性
conf: SparkConf
deleteFilesOnStop: 停止 DiskBlockManager 的时候是否删除本地目录。
当不指定外部的 ShuffleClient(即 spark.shuffle.service.enabled 属性为 false) 或者当前实例是 Driver 时,此属性为 true
localDirs: 本地目录的数组,是 DiskBlockManager 管理的本地目录数组
1private[spark] val localDirs: Array[File] = createLocalDirs(conf)
localDirs 是 DiskBlockManager 管理的本地目录数组。localDirs 通过调用 $createLocalDirs()$ 方法创建的本地目录数组,在 ...