Spark-理论笔记-任务调度机制
在生产环境下, Spark 集群的部署方式一般为 YARN-Cluster 模式,因此本文基于 YARN-Cluster 模式
一、前言1.1. 基本概念首先说明下 Spark 里的几个概念:
一个 Spark 应用程序包括 Job、Stage 以及 Task 三个概念:
Job 是以 Action 方法为界,遇到一个 Action 方法则触发一个 Job
Stage 是 Job 的子集,以宽依赖为界。遇到 Shuffle 做一次划分
Task 是 Stage 的子集,以并行度(分区数)来衡量,分区数是多少,则有多少个 task
二、Spark 调度概述Spark 的任务调度总体来说分两路进行,一路是 Stage 级的调度, 一路是 Task 级的调度,总体调度流程如下图所示:
2.1. Stage 级调度DAGScheduler 负责 Stage 级的调度,将 Job 以宽依赖为界切分成若干 Stages,并将每个 Stage 打包成 TaskSet 交给 TaskScheduler 调度。
DAGScheduler 做的事情较为简单,在 Stage 层面上划分 DAG, ...
Spark-源码系列-SparkCore-Shuffle 设计
一、概述Spark 在 DAG 调度阶段会将一个 Job 划分为多个 Stage,上游 Stage 做 Map 工作,下游 Stage 做 Reduce 工作,其本质上还是MapReduce 计算框架。Shuffle 是连接 map 和 reduce 之间的桥梁,它将 map 的输出对应到 reduce 输入中,涉及到序列化反序列化、跨节点网络 I/O 以及磁盘读写 I/O 等,Shuffle 的性能高低直接影响了整个程序的性能和吞吐量。
引用本站文章
Spark-理论笔记-Shuffle 概述
Joker
https://mp.weixin.qq.com/s/cf4JUeB-JU0cLgn04ng3og
二、Shuffle 计算2.1. 触发 shuffle2.1.1. 可能导致 shuffle 算子在 Spark 作业中当父 RDD 与子 ...
Spark-源码系列-SparkCore-Shuffle-MapOutputTracker
一、概述MapOutputTracker 用于跟踪 Map 任务的输出状态,此状态便于 Reduce 任务定位 Map 任务输出结果所在的节点地址,进而获取中间输出结果。每个Map 任务或者 Reduce 任务都会有其唯一标识,分别为 mapId 和 reduceId。每个 Reduce 任务的输入可能是多个 Map 任务的输出,Reduce 会到各个 Map 任务所在的节点上拉取 Block(Shuffle)。每次 Shuffle 都有唯一的标识 shuffleId。
https://blog.csdn.net/LINBE_blazers/article/details/88919759
二、实现https://segmentfault.com/a/1190000040586936
https://masterwangzx.com/2020/09/22/schedule-mapTrack/#mapoutputtrackermaster
其功能有三方面:
DAGScheduler 使用 MapOutputTrackerMaster 来管理各个 ShuffleMapTask 的输出 ...
Spark-源码系列-SparkCore-Shuffle-MapStatus
一、概述MapStatus 用于表示 ShuffleMapTask 返回给 TaskScheduler 的执行结果
二、设计MapStatus 的伴生对象中定义了 $apply$ 函数,可以直接使用 MapStatus(BlockManagerId, partitionLengths) 创建 MapStatus 实例。
1234567def apply(loc: BlockManagerId, uncompressedSizes: Array[Long], mapTaskId: Long): MapStatus = { if (uncompressedSizes.length > minPartitionsToUseHighlyCompressMapStatus) { HighlyCompressedMapStatus(loc, uncompressedSizes, mapTaskId) } else { new CompressedMapStatus(loc, uncompressedSizes, mapTaskId) ...
Spark-源码系列-SparkCore-Shuffle-Shuffle 管理器-ShuffleReader-BlockStoreShuffleReader
一、概述https://blog.csdn.net/u011239443/article/details/56843264
ShuffleReader 实现了下游 Task 如何读取上游 ShuffleMapTask的Shuffle 输出的逻辑,通过 MapOutputTracker 获得数据的位置信息,如果数据在本地则调用BlockManager 的 $getBlockData()$ 读取本地数据。
Shuffle Read 的整体架构如图所示:
$ShuffledRDD.compute()$ 开始~
1234567override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = { val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]] val metrics = context.taskMetrics().createTempShuffleReadMetrics() SparkEnv. ...
Spark-源码系列-SparkCore-Shuffle-Shuffle 管理器-ShuffleWriter
一、概述ShuffleWriter 负责将 Map 任务的输出,写出到 Shuffle 系统的文件中。在基于排序的 Shuffle 框架中,ShuffleWriter 会合并文件,它为每个 Map Task生成一个数据文件和一个索引文件。
https://blog.51cto.com/u_15067227/2573455
https://blog.csdn.net/Christopher_L1n/article/details/122903200
二、实现抽象类 ShuffleWriter 定义了将 map 任务的中间结果输出到磁盘上的功能规范,包括将数据写入磁盘和关闭 ShuffleWriter。
ShuffleWriter 定义的 $write()$ 方法用于将 map 任务的结果写到磁盘,而 $stop()$ 方法可以关闭 ShuffleWriter。ShuffleWriter一共有三个子类,分别为SortShuffleWriter、UnsafeShuffleWriter 及 BypassMergeSortShuffleWriter。
https://weread ...
Spark-源码系列-SparkCore-Shuffle-Shuffle 管理器-SortShuffleManager
一、概述SortShuffleManager 管理基于排序的 Shuffle,输入的记录按照目标分区 ID 排序,然后输出到一个单独的 map 输出文件中。reduce 为了读出 map 输出,需要获取 map 输出文件的连续内容。当 map 的输出数据太大已经不适合放在内存中时,排序后的输出子集将被溢出到文件中,这些磁盘上的文件将被合并生成最终的输出文件。
二、注册 ShuffleDriver 和每个 Executor 都会持有一个 ShuffleManager,这个 ShuffleManager 可以通过配置项 spark.shuffle.manager 指定,并且由 SparkEnv 创建。Driver 中的 ShuffleManager 负责注册 Shuffle 的元数据,比如 shuffleId、MapTask 的数量等。Executor 中的 ShuffleManager 则负责读和写 Shuffle 的数据。
三、获取 ShuffleWriterShuffleWriter 负责将 Map 任务的输出,写出到 Shuffle 系统的文件中。在基于排序的 Shuffle ...
Spark-源码系列-SparkCore-Shuffle-Shuffle 管理器-实现-SortShuffleManager
一、概述SortShuffleManager 管理基于排序的 Shuffle,输入的记录按照目标分区 ID 排序,然后输出到一个单独的 map 输出文件中。reduce 为了读出 map 输出,需要获取 map 输出文件的连续内容。当 map 的输出数据太大已经不适合放在内存中时,排序后的输出子集将被溢出到文件中,这些磁盘上的文件将被合并生成最终的输出文件。
Spark-源码系列-SparkCore-Shuffle-ShuffleBlockResolver
一、概述https://www.jianshu.com/p/825ce0f30b54
二、实现特质 ShuffleBlockResolver 定义了对 Shuffle Block 进行解析的规范,包括获取 Shuffle 数据文件、获取 Shuffle 索引文件、删除指定的 Shuffle 数据文件和索引文件、生成 Shuffle 索引文件、获取 Shuffle 块的数据等。
ShuffleBlockResolver 目前只有 IndexShuffleBlockResolver 这唯一的实现类。IndexShuffleBlockResolver 用于创建和维护 Shuffle Block 与物理文件位置之间的映射关系。
2.1. IndexShuffleBlockResolverIndexShuffleBlockResolver 主要用 于shuffle blocks, 从逻辑 block 到物理文件之间的映射关系. 它会确保每个 Map 过程最终生成的 block(也就是blockManager 维护的那些 block) 会被按照 key sort 后放在同一个文件里, 然后另外 ...
Spark-源码系列-SparkCore-Shuffle-ShuffleRead-Reader-BlockStoreShuffleReader
一、概述https://blog.csdn.net/u011239443/article/details/56843264
ShuffleReader 实现了下游 Task 如何读取上游 ShuffleMapTask的Shuffle 输出的逻辑,通过 MapOutputTracker 获得数据的位置信息,如果数据在本地则调用BlockManager 的 $getBlockData()$ 读取本地数据。
Shuffle Read 的整体架构如图所示:
$ShuffledRDD.compute()$ 开始~
1234567override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = { val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]] val metrics = context.taskMetrics().createTempShuffleReadMetrics() SparkEnv. ...