Spark-源码系列-SparkCore-Shuffle-Shuffle 管理器-实现-SortShuffleManager
一、概述SortShuffleManager 管理基于排序的 Shuffle,输入的记录按照目标分区 ID 排序,然后输出到一个单独的 map 输出文件中。reduce 为了读出 map 输出,需要获取 map 输出文件的连续内容。当 map 的输出数据太大已经不适合放在内存中时,排序后的输出子集将被溢出到文件中,这些磁盘上的文件将被合并生成最终的输出文件。
Spark-源码系列-SparkCore-Shuffle-ShuffleBlockResolver
一、概述https://www.jianshu.com/p/825ce0f30b54
二、实现特质 ShuffleBlockResolver 定义了对 Shuffle Block 进行解析的规范,包括获取 Shuffle 数据文件、获取 Shuffle 索引文件、删除指定的 Shuffle 数据文件和索引文件、生成 Shuffle 索引文件、获取 Shuffle 块的数据等。
ShuffleBlockResolver 目前只有 IndexShuffleBlockResolver 这唯一的实现类。IndexShuffleBlockResolver 用于创建和维护 Shuffle Block 与物理文件位置之间的映射关系。
2.1. IndexShuffleBlockResolverIndexShuffleBlockResolver 主要用 于shuffle blocks, 从逻辑 block 到物理文件之间的映射关系. 它会确保每个 Map 过程最终生成的 block(也就是blockManager 维护的那些 block) 会被按照 key sort 后放在同一个文件里, 然后另外 ...
Spark-源码系列-SparkCore-Shuffle-ShuffleRead-Reader-BlockStoreShuffleReader
一、概述https://blog.csdn.net/u011239443/article/details/56843264
ShuffleReader 实现了下游 Task 如何读取上游 ShuffleMapTask的Shuffle 输出的逻辑,通过 MapOutputTracker 获得数据的位置信息,如果数据在本地则调用BlockManager 的 $getBlockData()$ 读取本地数据。
Shuffle Read 的整体架构如图所示:
$ShuffledRDD.compute()$ 开始~
1234567override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = { val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]] val metrics = context.taskMetrics().createTempShuffleReadMetrics() SparkEnv. ...
Spark-源码系列-SparkCore-Shuffle-ShuffleBlockResolver
一、概述https://www.jianshu.com/p/825ce0f30b54
二、实现特质 ShuffleBlockResolver 定义了对 Shuffle Block 进行解析的规范,包括获取 Shuffle 数据文件、获取 Shuffle 索引文件、删除指定的 Shuffle 数据文件和索引文件、生成 Shuffle 索引文件、获取 Shuffle 块的数据等。
ShuffleBlockResolver 目前只有 IndexShuffleBlockResolver 这唯一的实现类。IndexShuffleBlockResolver 用于创建和维护 Shuffle Block 与物理文件位置之间的映射关系。
2.1. IndexShuffleBlockResolverIndexShuffleBlockResolver 主要用 于shuffle blocks, 从逻辑 block 到物理文件之间的映射关系. 它会确保每个 Map 过程最终生成的 block(也就是blockManager 维护的那些 block) 会被按照 key sort 后放在同一个文件里, 然后另外 ...
Spark-源码系列-SparkCore-Shuffle-ShuffleRead
一、概述Spark ShuffleRead 主要经历从获取数据,序列化流,添加指标统计,可能的聚合 (Aggregation) 计算以及排序等过程。当 Map 任务相关 Stage 的任务都执行完毕后,会唤起下游 Stage 的提交及任务的执行。ShuffleMapTask 计算过程最终会落实到 $ShuffledRDD.compute()$ 方法~
1234567override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = { val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]] val metrics = context.taskMetrics().createTempShuffleReadMetrics() SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1 ...
Spark-源码系列-SparkCore-Shuffle-ShuffleWrite-Writer-BypassMergeSortShuffleWriter
一、概述BypassMergeSortShuffleWriter 是一个相对更有效的 writer,它绕过了合并排序步骤,直接将一个分区写入一个单独的文件中,分区的数量必须很小,才能使这个 writer 在不引起 shuffle 文件数量失控的情况下工作。因此,只有当分区的数量小于 bypassMergeThreshold(默认为200)并且没有 map 侧的聚合时,选择 BypassMergeSortShuffleWriter。BypassMergeSortShuffleWriter 适用于 map 端不需要在持久化数据之前进行聚合、排序等操作的场景。
二、实现
三、Write
Spark-源码系列-SparkCore-Shuffle-ShuffleWrite-Writer-SortShuffleWriter
一、概述SortShuffleWriter 实现类是用于兜底的,在 ShuffleWrite 阶段,如果不满足 UnsafeShuffleWriter、BypassMergeSortShuffleWriter 两种条件,最后代码执行 SortShuffleWriter~,支持所有的 Shuffle 场景,包括 map 端的聚合,排序等操作。
https://zhuanlan.zhihu.com/p/469752748
二、实现SortShuffleWriter 是 ShuffleWriter 的实现类之一,提供了对 Shuffle 数据的排序功能以及聚合功能。
三、WriteSortShuffleWriter 的写入过程如图所示:
3.1. 数据写入内存缓存区3.1.1. 聚合&排序https://mp.weixin.qq.com/s/9zGHp3p6YJSskdsWH9PEEg
ShuffleMapTask 不断地把每个分区的数据填充到内存 buffer 中
有两种 buffer: 若需要通过 key 来进行聚合,则会使用 PartitionedAppend ...
Spark-源码系列-SparkCore-Shuffle-ShuffleWrite-Writer-UnsafeShuffleWriter
https://mp.weixin.qq.com/s/wDKB1IFOS4v1gUuiUDuLAA
https://mp.weixin.qq.com/s/UaVnI6yNAwIc2oPIedrFvg
https://mp.weixin.qq.com/s/IvgdE41TtkjnXHxQkj0ToQ
一、概述当使用 BypassMergeSortShuffleWriter 的条件不满足时,SortShuffleManager 会继续考虑 UnsafeShuffleWriter,它是 Tungsten 支持的,与基本 SortShuffleWriter 相比内存效率高。
使用 UnsafeShuffleWriter 的条件:
Shuffle 依赖不带有聚合(aggregation)操作
支持序列化值的重新定位,即使用 KryoSerializer 或者 SparkSQL 自定义的一些序列化方式
分区数目必须小于 (2^24)
由于 UnsafeShuffleWriter 排序的是二进制的数据,不会进数据进行反序列,所以不能进行聚合操作,另一方面 PartitionId 是占用 2 ...
Spark-源码系列-SparkCore-Shuffle-ShuffleWrite
一、概述https://mp.weixin.qq.com/s/wDKB1IFOS4v1gUuiUDuLAA
https://mp.weixin.qq.com/s/0zlqviF1lzUOrcBiVePLxA
https://www.ngui.cc/el/775149.html?action=onClick
https://maimai.cn/article/detail?fid=1752477164&efid=vgKPFrpp6HvV0pZtHvL3ug
Shuffle 发生与宽依赖的 Stage 间,由于 Stage 内的计算采用 $pipeline$ 的方式。Shuffle 发生的上一个 Stage 为 Map 节点,下游的stage为 Reduce 阶段。而 Shuffle 写的过程就发生在 Map 阶段,ShuffleWriter 的调用主要是在 ShuffleMapStage 中,每个 ShuffleMapStage 包含多个 ShuffleMapTask, mapTask 个数和分区数相关。
每个 ShuffleMapTask 都会在其 $runTask()$ ...
Spark-源码系列-SparkCore-Shuffle-内存管理-数据结构-AppendOnlyMap
一、概述二、实现2.1. 原理AppendOnlyMap 实际上是⼀个只⽀持 record 添加和对Value进⾏更新的HashMap。与Java HashMap采⽤“数组+链表”实现不同,AppendOnlyMap只使⽤数组来存储元素,根据元素的Hash值确定存储位置,如果存储元素时发⽣Hash值冲突,则使⽤⼆次地址探测法(Quadratic probing)来解决Hash值冲突。
对于每个新来的<K,V>record,先使⽤Hash(K)计算其存放位置,如果存放位置为空,就把 record 存放到该位置。如果该位置已经被占⽤,就使⽤⼆次探测法来找下⼀个空闲位置。如下图,插入数据<K6,V6>record来说,第1次找到的位置Hash(K6)已被K2占⽤。按照⼆次探测法向后递增1个record位置,也就是Hash(K6)+1×2,发现位置已被K3占⽤,然后向后递增4个record位置(指数递增,Hash(K6)+2×2),发现位置没有被占⽤,放进去即可









































