Spark-源码学习-SparkCore-调度机制-任务调度-Stage 调度
一、概述
DAGScheduler 实现了面向 DAG 的高层次调度,通过计算将 DAG 中的一系列 RDD 划分到不同的 Stage,然后构建 Stage 之间的父子关系,最后将每个 Stage 按照 Partition 切分为多个 Task,并以 Task 集合(TaskSet)的形式提交给底层的 TaskScheduler。
二、实现
2.1. DAGSchedulerEvent
所有的组件都通过向 DAGScheduler 投递 DAGSchedulerEvent 来使用 DAGScheduler。

DAGScheduler 内部的 DAGSchedulerEventProcessLoop 根据不同的 DAGSchedulerEvent 事件(JobSubmitted、MapStageSubmitted…),并调用 DAGScheduler 的不同方法。
1 | private def doOnReceive(event: DAGSchedulerEvent): Unit = event match { |
2.2. JobListener
JobListener 用于对作业中每个 Task 执行成功或失败进行监听,JobWaiter 实现了 JobListener 并最终确定作业的成功与失败。
三、初始化
TaskScheduler 和 DAGScheduler 都是在 SparkContext 创建时初始化。其中 TaskScheduler 通过 $SparkContext.createTaskScheduler()$ 创建。而 DAGScheduler 是直接调用的它的构造函数创建。
DAGScheduler 保存了 TaskScheduler 的引用,因此需要在 TaskScheduler 创建之后进行创建。
四、Job 提交
SparkContext 初始化 DAGScheduler->创建 EventProcessLoop->调用 $eventLoop.start()$ 方法开启事件监听,action 调用 $SparkContext.runJob()$
调用 RDD 的 $partitions$ 函数来获取当前 Job 的最大分区数 maxPartitions。 根据 maxPartitions,确认没有在一个不存在的 partition 上运行任务。
1
val maxPartitions = rdd.partitions.length
$eagerlyComputePartitionsForRddAndAncestors()$
在 Spark 中,RDD 是惰性求值的,意味着它们的转换不会立即执行,而是在调用操作时才执行。
$eagerlyComputePartitionsForRddAndAncestors()$ 函数的作用是主动计算这些分区,即立即执行转换操作,当希望在等待操作被触发之前强制计算 RDD 分区时通过主动计算 RDD 及其祖先的分区,Spark 确保所有 RDD 的转换和操作在单个步骤中执行,优化执行过程,可能减少整体执行时间。创建 Jobwaiter,此 Jobwaiter 被阻塞,直到 Job 完成或者被取消
1
2val jobId = nextJobId.getAndIncrement()
val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)向 eventProcessActor 发送
JobSubmitted
事件返回 JobWaiter
eventLoop 监听到 JobSubmitted
事件,调用 $handleJobSubmitted()$ 开始处理 Job 提交~
4.1. Stage 划分
4.1.1. 概述
DAGScheduler 会将 Job 的 RDD 划分到不同的 Stage,并构建这些 Stage 的依赖关系。这样可以使得没有依赖关系的 Stage 并行执行,并保证有依赖关系的 Stage 顺序执行。 并行执行能够有效利用集群资源,提升运行效率,而串行执行则适用于那些在时间和数据资源上存在强制依赖的场景。
划分策略
DAGScheduler 具体划分策略是,由最终的 RDD 不断通过依赖回溯判断父依赖是否是宽依赖,窄依赖的 RDD 被划分到同一个 Stage 中, 划分的 Stages 分两类, 需要处理 Shuffle 的 ShuffleMapStage 和最下游的 ResultStage, 上游 Stage 先于下游 Stage 执行, ResultStage 是最后执行的 Stage。
ResultStage
可以使用指定的函数对 RDD 中的分区进行计算并得出最终结果。ResultStage 是最后执行的 Stage,此阶段主要进行作业的收尾工作
例如,对各个分区的数据收拢、打印到控制台或写入到 HDFS
ShuffleMapStage
ShuffleMapStage 是 DAG 调度流程的中间 Stage, 包括一到多个 ShuffleMapTask, ShuffleMapStage 一般是 ResultStage 或者其他 ShuffleMapStage 的前置 Stage
4.1.2. 实现
$handleJobSubmitted()$ 方法内部会根据当前的 RDD 创建一个 ResultStage,然后根据这个 ResultStage 对象创建 Job。
1 | finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) |
$getShuffleDependenciesAndResourceProfiles()$
获取所有的 ShuffleDependency 列表,以及此阶段与 RDD 关联的 ResourceProfiles。从 finalRDD 开始向前回溯不断获取当前 rdd 的所有宽依赖列表,如果遍历到的依赖是 ShuffleDependency,则放入 parents 中
1
val parents = new HashSet[ShuffleDependency[_, _, _]]
如果遍历到的是窄依赖则继续遍历该窄依赖的 rdd 的所有 dependency,直到找到下一个 ShuffleDependency 并放到 parents 列表中或者遍历了其所有父 rdd 无法找父 rdd 为止,得到 rdd 所有的 ShuffleDependency。
1
2
3
4
5
6
7
8
9
10
11
12waitingForVisit += rdd
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.remove(0)
if (!visited(toVisit)) {
visited += toVisit
Option(toVisit.getResourceProfile).foreach(resourceProfiles += _)
toVisit.dependencies.foreach {
case shuffleDep: ShuffleDependency[_, _, _] => parents += shuffleDep
case dependency => waitingForVisit.prepend(dependency.rdd)
}
}
}$mergeResourceProfilesForStage()$
Barrier 相关
$getOrCreateParentStages()$
根据 $getShuffleDependenciesAndResourceProfiles()$ 获取的 ShuffleDependency 列表,然后根据该 rdd 的 shuffleDeps 列表中的每个 shuffle 依赖创建/获取出相应的父 stage 列表。
当前 RDD 的直接或间接的依赖是 ShuffleDependency 且己经注册过的 Stage。当前 RDD 的直接或间接的依赖是 ShuffleDependency 且没有注册过 Stage 的,则根据 ShuffleDependency 本身的 RDD,找到它的直接或间接的依赖是 ShuffleDependency 且没有注册过 Stage 的所有 ShuffleDependency,为他们生成 Stage 并注册。
当前 RDD 的直接或间接的依赖是 ShuffleDependency 且没有注册过 Stage 的,为它们生成 Stage 并注册,最后也添加此 Stage 到 List
拼
好了上面这一步就获取动作操作 RDD 两个分支的第一个 Shuffer 操作的RDD来,此时回到上一步循环这两
个 ShufferRDD 来创建 stage,先是依赖A进來 进行模式匹配 看是不是已经建好stage了,如果已经建好
那就直接返回这个stage, 如果没有这个stage,那么就要调用 $getMissingAncestorShuffleDependencies()$ 函数,而这个函数的作用是获取该 rdd 的祖先或者是下一
个 shufferRDD,而且返回的是一个栈结构,这里创建的 ShufferMapStage 公用一个 firstJobId, 最后面将自己的这个 rdd 创建返回 stage 就划分好了现在已经完成了 stage 的划分 并将消息都存存到一个队列了,现在就只需要提交 stage 任务了
$updateJobIdStageIdMaps()$
4.2. Stage 提交
4.2.1. 概述
一个 Stage 是否被提交,需要判断 Stage 是否执行,只有在公 Stage 执行完毕才能提交当前 Stage,如果一个 Stage 没有父 Stage,那么从该 Stage 开始提交。Stage提交时会将Task 信息[分区信息以及方法等]序列化并被打包成 TaskSet 交给 TaskScheduler, 一个 Partition 对应一 Task。
DAGScheduler 划分 Stage,通过调用 $submitStage()$ 来提交一个 Stage 对应的 tasks, $submitStage()$ 通过调用
$getPreferrdeLocations()$ 得到 task 的优先位置,根据每个 task 的优先位置, 确定 task 的 Locality 级别,Locality 一共有五种,优
先级由高到低顺序。
PROCESS_LOCAL | 进程本地化,task 和数据在同一个 Executor 中,性能最好。 |
---|---|
NODE_LOCAL | 节点本地化,task 和数据在同一个节点中,但是 task 和数据不在同一个 Executor 中,数据需要在进程间进行传输。 |
RACK_LOCAL | 机架本地化,task 和数据在同一个机架的两个节点上,数据需要通过网络在节点之间进行传输。 |
NO_PREF | 数据从哪里访问都一样快,不需要位置优先 |
ANY | task 和数据不在一个机架中,性能最差。 |
4.2.2. 实现
构造 ActiveJob 对象
1
val job = new ActiveJob(jobId, finalStage, callSite, listener, artifacts, properties)
清除缓存的 block location 信息
1
clearCacheLocs()
记录映射关系
记录 jobId 和 job 对象的映射关系到 jobIdToActiveJobmap 集合中,井且将该 jobId 记录到活动的 jobId 集合中。
1
2
3jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)listenerBus
获取到 Job 所有的 stage 的唯一标识,并且根据唯一标识来获取 stage 对象,调用其 $lastestInfo()$ 方法获取其 StageInfo 对象,进一步封装成
SparkListenerJobStart
事件对象,并post 到 listenerBus 中。1
2
3val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, Utils.cloneProperties(properties)))提交 Stage
最后调用 $submitStage()$ 方法执行 Stage 的提交~
获取 JobId
1
val jobId = activeJobForStage(stage)
判断:
waitingStages
,runningStages
,failedStages
中是否已经存在该 stage,防止重复提交 stag,$getMissingParentStages()$
通过 $getMissingParentStages()$ 深度遍历地从后往前判断当前 stage 是否存在需要重新计算的 stage,加入
missing
集合中1
2
3
4
5
6
7
8
9
10
11
12
13
14
15val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
if (rddHasUncachedPartitions) {
for (dep <- rdd.dependencies) {
dep match {
case shufDep: ShuffleDependency[_, _, _] =>
val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
if (!mapStage.isAvailable || !mapStage.shuffleDep.shuffleMergeFinalized) {
missing += mapStage
} else {
mapStage.increaseAttemptIdOnFirstSkip()
}
case narrowDep: NarrowDependency[_] => waitingForVisit.prepend(narrowDep.rdd)
}
}
}$DagScheduler.getCacheLocs()$
判断当前 rdd 是否被 cache 了是通过 $DagScheduler.getCacheLocs()$ 获取缓存的 location,cacheLocs 的数据结构是一个 HashMap,key 为 rdd id,value 为TaskLocation 集合。
1
2
3
4
5
6
7
8
9
10
11
12private[scheduler] def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized {
if (!cacheLocs.contains(rdd.id)) {
val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) {
IndexedSeq.fill(rdd.partitions.length)(Nil)
} else {
val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
blockManagerMaster.getLocations(blockIds).map { bms => bms.map(bm => TaskLocation(bm.host, bm.executorId)) }
}
cacheLocs(rdd.id) = locs
}
cacheLocs(rdd.id)
}如果当前 rdd 本身没有设置 StorageLevel,无需查找缓存了,直接返回,否则通过 $blockManagerMaster.getLocations()$ 查找具体 block 对应的位置
BlockManagerMaster 上存储了所有 Executor 汇报上来的所有 block 位置元数据信息
$getOrCreateShuffleMapStage()$
对于 rdd 如果没有显式缓存的情况,需要遍历 rdd 所有的依赖,对于是 ShuffleDependency 的stage,调用 $getOrCreateShuffleMapStage()$ 获取或者创建 mapStage,
$mapStage.isAvailable$
通过 $isAvailable$ 判断所有 output 是否都已经准备好,,对于 $isAvailable$ 为 false 的情况,说明 output 没有,或丢失,需要重新计算
$isAvailable$ 是通过查询 mapOutputTracker 已经注册的 task output 信息得到