Spark-源码系列-SparkCore-Shuffle-ShuffleBlockResolver
一、概述https://www.jianshu.com/p/825ce0f30b54
二、实现特质 ShuffleBlockResolver 定义了对 Shuffle Block 进行解析的规范,包括获取 Shuffle 数据文件、获取 Shuffle 索引文件、删除指定的 Shuffle 数据文件和索引文件、生成 Shuffle 索引文件、获取 Shuffle 块的数据等。
ShuffleBlockResolver 目前只有 IndexShuffleBlockResolver 这唯一的实现类。IndexShuffleBlockResolver 用于创建和维护 Shuffle Block 与物理文件位置之间的映射关系。
2.1. IndexShuffleBlockResolverIndexShuffleBlockResolver 主要用 于shuffle blocks, 从逻辑 block 到物理文件之间的映射关系. 它会确保每个 Map 过程最终生成的 block(也就是blockManager 维护的那些 block) 会被按照 key sort 后放在同一个文件里, 然后另外 ...
Spark-源码系列-SparkCore-Shuffle-ShuffleRead
一、概述Spark ShuffleRead 主要经历从获取数据,序列化流,添加指标统计,可能的聚合 (Aggregation) 计算以及排序等过程。当 Map 任务相关 Stage 的任务都执行完毕后,会唤起下游 Stage 的提交及任务的执行。ShuffleMapTask 计算过程最终会落实到 $ShuffledRDD.compute()$ 方法~
1234567override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = { val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]] val metrics = context.taskMetrics().createTempShuffleReadMetrics() SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1 ...
Spark-源码系列-SparkCore-Shuffle-ShuffleWrite-Writer-BypassMergeSortShuffleWriter
一、概述BypassMergeSortShuffleWriter 是一个相对更有效的 writer,它绕过了合并排序步骤,直接将一个分区写入一个单独的文件中,分区的数量必须很小,才能使这个 writer 在不引起 shuffle 文件数量失控的情况下工作。因此,只有当分区的数量小于 bypassMergeThreshold(默认为200)并且没有 map 侧的聚合时,选择 BypassMergeSortShuffleWriter。BypassMergeSortShuffleWriter 适用于 map 端不需要在持久化数据之前进行聚合、排序等操作的场景。
二、实现
三、Write
Spark-源码系列-SparkCore-Shuffle-ShuffleWrite-Writer-SortShuffleWriter
一、概述SortShuffleWriter 实现类是用于兜底的,在 ShuffleWrite 阶段,如果不满足 UnsafeShuffleWriter、BypassMergeSortShuffleWriter 两种条件,最后代码执行 SortShuffleWriter~,支持所有的 Shuffle 场景,包括 map 端的聚合,排序等操作。
https://zhuanlan.zhihu.com/p/469752748
二、实现SortShuffleWriter 是 ShuffleWriter 的实现类之一,提供了对 Shuffle 数据的排序功能以及聚合功能。
三、WriteSortShuffleWriter 的写入过程如图所示:
3.1. 数据写入内存缓存区3.1.1. 聚合&排序https://mp.weixin.qq.com/s/9zGHp3p6YJSskdsWH9PEEg
ShuffleMapTask 不断地把每个分区的数据填充到内存 buffer 中
有两种 buffer: 若需要通过 key 来进行聚合,则会使用 PartitionedAppend ...
Spark-源码系列-SparkCore-Shuffle-ShuffleWrite-Writer-UnsafeShuffleWriter
https://mp.weixin.qq.com/s/wDKB1IFOS4v1gUuiUDuLAA
https://mp.weixin.qq.com/s/UaVnI6yNAwIc2oPIedrFvg
https://mp.weixin.qq.com/s/IvgdE41TtkjnXHxQkj0ToQ
一、概述当使用 BypassMergeSortShuffleWriter 的条件不满足时,SortShuffleManager 会继续考虑 UnsafeShuffleWriter,它是 Tungsten 支持的,与基本 SortShuffleWriter 相比内存效率高。
使用 UnsafeShuffleWriter 的条件:
Shuffle 依赖不带有聚合(aggregation)操作
支持序列化值的重新定位,即使用 KryoSerializer 或者 SparkSQL 自定义的一些序列化方式
分区数目必须小于 (2^24)
由于 UnsafeShuffleWriter 排序的是二进制的数据,不会进数据进行反序列,所以不能进行聚合操作,另一方面 PartitionId 是占用 2 ...
Spark-源码系列-SparkCore-Shuffle-ShuffleWrite
一、概述https://mp.weixin.qq.com/s/wDKB1IFOS4v1gUuiUDuLAA
https://mp.weixin.qq.com/s/0zlqviF1lzUOrcBiVePLxA
https://www.ngui.cc/el/775149.html?action=onClick
https://maimai.cn/article/detail?fid=1752477164&efid=vgKPFrpp6HvV0pZtHvL3ug
Shuffle 发生与宽依赖的 Stage 间,由于 Stage 内的计算采用 $pipeline$ 的方式。Shuffle 发生的上一个 Stage 为 Map 节点,下游的stage为 Reduce 阶段。而 Shuffle 写的过程就发生在 Map 阶段,ShuffleWriter 的调用主要是在 ShuffleMapStage 中,每个 ShuffleMapStage 包含多个 ShuffleMapTask, mapTask 个数和分区数相关。
每个 ShuffleMapTask 都会在其 $runTask()$ ...
Spark-源码系列-SparkCore-Shuffle-内存管理-数据结构-AppendOnlyMap
一、概述二、实现2.1. 原理AppendOnlyMap 实际上是⼀个只⽀持 record 添加和对Value进⾏更新的HashMap。与Java HashMap采⽤“数组+链表”实现不同,AppendOnlyMap只使⽤数组来存储元素,根据元素的Hash值确定存储位置,如果存储元素时发⽣Hash值冲突,则使⽤⼆次地址探测法(Quadratic probing)来解决Hash值冲突。
对于每个新来的<K,V>record,先使⽤Hash(K)计算其存放位置,如果存放位置为空,就把 record 存放到该位置。如果该位置已经被占⽤,就使⽤⼆次探测法来找下⼀个空闲位置。如下图,插入数据<K6,V6>record来说,第1次找到的位置Hash(K6)已被K2占⽤。按照⼆次探测法向后递增1个record位置,也就是Hash(K6)+1×2,发现位置已被K3占⽤,然后向后递增4个record位置(指数递增,Hash(K6)+2×2),发现位置没有被占⽤,放进去即可
Spark-源码系列-SparkCore-Shuffle-外部排序器
一、概述Spark 中的外部排序器用于对 map 任务的输出数据在 map 端或 reduce 端进行排序。
二、实现Spark 中有两个外部排序器,分别是 ExternalSorter 和 ShuffleExternalSorter
2.1. ExternalSorterExternalSorter 是 SortShuffleManager 的底层组件,它提供了很多功能,包括将 map 任务的输出存储到 JVM 的堆中,如果指定了聚合函数,则还会对数据进行聚合;使用分区计算器首先将 Key 分组到各个分区中,然后使用自定义比较器对每个分区中的键进行可选的排序;可以将每个分区输出到单个文件的不同字节范围中,便于 reduce 端的 Shuffle 获取。https://www.zhihu.com/question/264364010/answer/2514170889?utm_id=0
2.1.1. 设计
属性
方法
map 端输出的缓存处理: $insertAll()$
map 任务在执行结束后会将数据写入磁盘,等待 reduce 任务获取。但在写入磁盘之前,Spark 可 ...
Spark-源码系列-SparkCore-Shuffle-排序
一、概述Spark 中的外部排序器用于对 map 任务的输出数据在 map 端或 reduce 端进行排序。
二、实现Spark 中有两个外部排序器,分别是 ExternalSorter 和 ShuffleExternalSorter
2.1. ExternalSorterExternalSorter 是 SortShuffleManager 的底层组件,它提供了很多功能,包括将 map 任务的输出存储到 JVM 的堆中,如果指定了聚合函数,则还会对数据进行聚合;使用分区计算器首先将 Key 分组到各个分区中,然后使用自定义比较器对每个分区中的键进行可选的排序;可以将每个分区输出到单个文件的不同字节范围中,便于 reduce 端的 Shuffle 获取。
2.1.1. 设计
属性
方法
map 端输出的缓存处理: $insertAll()$
map 任务在执行结束后会将数据写入磁盘,等待 reduce 任务获取。但在写入磁盘之前,Spark 可能会对 map 任务的输出在内存中进行一些排序和聚合。ExternalSorter 的 $insertAll()$ 方法是这一过程的入 ...