一、概述

Master 的资源调度是 Spark 的一级资源调度,主要包括: 对 Driver 的资源调度以及对 Executor 的资源调度~

二、Driver 调度

2.1. Driver 启动

Master 在接受到 RequestSubmitDriver 后,将 Driver 信息封装成 DriverInfo,然后添加待调度列表 waitingDrivers 中,然后调用 $Master.schedule()$ 为处于待分配资源的 Application 分配资源。在每次有新的 Application 加入或者新的资源加入时都会调用 $schedule()$ 进行调度。

注意: Master 在接受到 RequestSubmitDriver 后,将 Driver 信息封装成 DriverInfo,然后添加待调度列表 waitingDrivers 中,但此时 waitingApps 为空,所以并不会启动 Executor,当 Master 在接收到 RegisterApplication 消息后完成 $registerApplication(app)$,此时才会启动 Executor。

1
2
3
4
5
6
7
8
9
10
11
case RequestSubmitDriver(description) =>
if (state != RecoveryState.ALIVE) {
context.reply(SubmitDriverResponse(self, false, None, msg))
} else {
val driver = createDriver(description)
persistenceEngine.addDriver(driver)
waitingDrivers += driver
drivers.add(driver)
schedule()
context.reply(SubmitDriverResponse(self, true, Some(driver.id), s"Driver successfully submitted as ${driver.id}"))
}

2.2. 执行用户程序 mainClass

Worker 接到执行 Driver 指令后创建 DriverRunner 执行 Driver 应用程序 mainClass,mainClass 执行时其会初始化 Spark 执行上下文环境 SparkContext,SparkContext 是用户应用和 Spark 集群的交换的主要接口,用户应用一般首先要创建它。

如果使用 SparkShell,不必显式地创建它,系统会自动创建一个名为 sc 的 SparkContext 的实例。

伴随 SparkContext 会创建 DAGScheduler、TaskScheduler 和 StandaloneSchedulerBackend 分别用于 Stage 调度和任务调度,并会触发 RDD 的 Action 算子提交job。

2.2.1. 注册 Application

在 Standalone 模式下集群启动时,Worker 会向 Master注册,使得 Master 可以感知进而管理整个集群;Master 通过借助zookeeper,可以简单实现高可用性;而应用方通过 SparkContext 这个与集群的交互接口,在创建 SparkContext 时完成了 Application 的注册,由 Master 为其分配 Executor。

在 StandaloneSchedulerBackend 中初始化 StandaloneAppClient,并调用 $StandaloneAppClient.start()$~

1
2
3
def start(): Unit = {
endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
}

ClientEndpoint 继承了 ThreadSafeRpcEndpoint,初始化时会调用 $onStart()$~

$StandaloneAppClient.tryRegisterAllMasters()$ 会向 Master 发送 RegisterApplication 消息~

2.2.2. Executor 调度

当 Master 在接收到 RegisterApplication 消息后注册 Application,最后 $schedule()$ 方法调用 $startExecutorsOnWorkers()$ 方法,进而引发对 Executor 资源的调度。

1
2
3
4
5
6
7
8
9
case RegisterApplication(description, driver) =>
if (state == RecoveryState.STANDBY) {
} else {
val app = createApplication(description, driver)
registerApplication(app)
persistenceEngine.addApplication(app)
driver.send(RegisteredApplication(app.id, self))
schedule()
}

Standlone 模式下的资源调度逻辑,Standlone 的作业调度是采用的简单的 FIFO 的方式调度,通过遍历 $waitingApps$ 数组中的 app 逐一进行executor 调度;针对每个需要调度的作业,如果还有未分配完成的资源,则持续为该作业分配资源并启动 executor 实例:首先找出所有 active 的 worker 节点,过滤出所有的可用计算资源足以调度 executor 的 worker 节点,然后对这些 worker 节点中可调度的 core 数进行倒序排序获得一个可用的 worker 列表,并得到 executor 的调度计划用于启动 executor 实例。

  1. Executor 启动

    $schedule()$ 方法的最后调用了 $startExecutorsOnWorkers()$ 方法,进而引发对 Application 的 Executor 资源的调度。

  2. Executor 动态管理

    ExecutorAllocationManager 用于对已分配的 Executor 进行管理。

2.2.3. 任务调度

在应用方创建了 RDD 并且在这个 RDD 上进行了很多的转换后,触发 Action,触发 $SparkContext.runJob()$ 方法开始进行任务调度~通过 DAGScheduler 将 DAG 划分为不同的 Stage, 并将 Stage 转换为 TaskSet 交给 TaskSchedulerImpl; 再由
TaskSchedulerImpl 通过 $SchedulerBackend.reviveOffers()$ 向 ExecutorBackend 发送 LaunchTask 的消息;ExecutorBackend 接收到消息后,启动 Task,开始在集群中启动计算~