Spark-理论笔记-Join 策略
数据关联总共有 3 种 Join 实现方式。按照出现的时间顺序,分别是嵌套循环连接(NLJ,Nested Loop Join)、排序归并连接(SMJ,Shuffle Sort Merge Join) 和哈希连接(HJ,Hash Join)
1. Join 的实现方式
现在有事实表 orders 和维度表 users。其中,users 表存储用户属性信息,orders 记录着用户的每一笔交易。两张表的 Schema 如下:
1 | // 订单表orders关键字段 |
基于两张表做内关联(Inner Join),同时把用户名、单价、交易额等字段投影出来。
1 | // SQL 查询语句 |
1.1. NLJ
对于参与关联的两张数据表,其中,体量较大、主动扫描数据的表,称作外表或是驱动表;体量较小、被动参与数据扫描的表,为内表或是基表
NLJ 是采用嵌套循环的方式来实现关联
NLJ 会使用内、外两个嵌套的 for 循环依次扫描外表和内表中的数据记录,判断关联条件是否满足,比如例子中的 orders.userId = users.id
,如果满足就把两边的记录拼接在一起,然后对外输出。
在这个过程中,外层的 for 循环负责遍历外表中的每一条数据
如图中的步骤 1 所示。而对于外表中的每一条数据记录,内层的 for 循环会逐条扫描内表的所有记录,依次判断记录的 Join Key 是否满足关联条件,如步骤 2 所示。假设,外表有 M 行数据,内表有 N 行数据,那么 NLJ 算法的计算复杂度是 O(M * N)
1.2. SMJ
因为 NLJ 极低的执行效率,所以在它推出之后没多久之后,就有人用排序、归并的算法代替 NLJ 实现了数据关联。SMJ 的思路是先排序、再归并,参与 Join 的两张表先分别按照 Join Key 做升序排序。然后,SMJ 会使用两个独立的游标对排好序的两张表完成归并关联。
SMJ 刚开始工作的时候,内外表的游标都会先锚定在两张表的第一条记录上,然后再对比游标所在记录的 Join Key。对比结果以及后续操作主要分为 3 种情况
- 外表 Join Key 等于内表 Join Key,满足关联条件,把两边的数据记录拼接并输出,然后把外表的游标滑动到下一条记录外表
- 外表 Join Key 小于内表 Join Key,不满足关联条件,把外表的游标滑动到下一条记录外表
- 外表 Join Key 大于内表 Join Key,不满足关联条件,把内表的游标滑动到下一条记录
SMJ 正是基于这 3 种情况,不停地向下滑动游标,直到某张表的游标滑到头,即宣告关联结束。对于 SMJ 中外表的每一条记录,由于内表按 Join Key 升序排序,且扫描的起始位置为游标所在位置,因此 SMJ 算法的计算复杂度为 $O(M + N)$
SMJ 计算复杂度的降低,仰仗的是两张表已经事先排好序。要知道,排序本身就是一项非常耗时的操作,更何况,为了完成归并关联,参与 Join 的两张表都需要排序
1.3. HJ
SMJ 对排序的要求比较苛刻,所以后来又有人提出了效率更高的关联算法: HJ。HJ 把内表扫描的计算复杂度降低至 O(1)。把一个数据集合的访问效率提升至 O(1),也只有 Hash Map 能做到了。也正因为 Join 的关联过程引入了 Hash 计算,所以它叫 HJ
HJ 的计算分为两个阶段,分别是 Build 阶段和 Probe 阶段。
- 在 Build 阶段,基于内表,使用既定的哈希函数构建哈希表,如上图的步骤 1 所示。哈希表中的 $Key$ 是 $Join\ \ Key$ 应用哈希函数之后的哈希值,表中的 Value 同时包含了原始的 Join Key 和 Payload。
- 在 Probe 阶段,遍历外表数据记录,使用同样的哈希函数计算 Join Key 的哈希值。然后,用计算得到的哈希值去查询刚刚在 Build 阶段创建好的哈希表。如果查询失败,说明该条记录不存在关联关系;如果查询成功,则继续对比两边的 Join Key。如果 Join Key 一致,就把两边的记录进行拼接并输出,从而完成数据关联。
2. 分布式环境下的 Join
相比单机环境,分布式环境中的数据关联在计算环节依然遵循着 NLJ、SMJ 和 HJ 这 3 种实现方式,只不过是增加了网络分发这一变数。在 Spark 的分布式计算环境中,数据在网络中的分发主要有两种方式,分别是 Shuffle 和 Broadcast。
2.1. Shuffle
采用 Shuffle 的分发方式来完成数据关联,那么外表和内表都需要按照 Join Key 在集群中做数据分发。因为只有这样,两个数据表中 Join Key 相同的数据记录才能分配到同一个 Executor 进程,从而完成关联计算
2.2. Broadcast
采用广播机制下,Spark 只需要把内表封装到广播变量,然后进行分发。由于广播变量中包含了内表的全量数据,因此体量较大的外表只要“待在原地、保持不动”,就能完成关联计算
结合 Shuffle、广播这两种网络分发方式和 NLJ、SMJ、HJ 这 3 种计算方式,对于分布式环境下的数据关联,就能组合出 6 种 Join 策略

从执行性能来说,6 种策略从上到下由弱变强。相比之下,CPJ 的执行效率是所有实现方式当中最差的,网络开销、计算开销都很大。BHJ 是最好的分布式数据关联机制,网络开销和计算开销都是最小的。
此外 Spark 并没有选择支持 Broadcast + Sort Merge Join 这种组合方式
当数据能以广播的形式在网络中进行分发时,说明被分发的数据,也就是基表的数据足够小,完全可以放到内存中去。这个时候,相比 NLJ、SMJ,HJ 的执行效率是最高的。因此,在可以采用 HJ 的情况下,Spark 自然就没有必要再去用 SMJ 这种前置开销比较大的方式去完成数据关联。
3. Spark Join 选择策略
在不同的数据关联场景中,对于这 5 种 Join 策略来说,也就是 CPJ、BNLJ、SHJ、SMJ 以及 BHJ,Spark 会基于一定的逻辑取舍
3.1. 等值 Join
等值 Join 是指两张表的 Join Key 是通过等值条件连接在一起的
在等值数据关联中,$Spark$ 会尝试按照 $BHJ$ > $SMJ$ > $SHJ$ 的顺序依次选择 $Join$ 策略
在这三种策略中,执行效率最高的是 BHJ,其次是 SHJ,再次是 SMJ。其中,SMJ 和 SHJ 策略支持所有连接类型,如全连接、Anti Join 等等。
BHJ 尽管效率最高,但是有两个前提条件:
- 连接类型不能是全连接(Full Outer Join)
- 基表要足够小,可以放到广播变量里面去。
SHJ 比 SMJ 执行效率高,排名却不如 SMJ 靠前呢?
相比 SHJ,Spark 优先选择 SMJ 的原因在于,SMJ 的实现方式更加稳定,更不容易 OOM
在 Build 阶段,算法根据内表创建哈希表。在 Probe 阶段,为了让外表能够成功“探测”(Probe)到每一个 Hash Key,哈希表要全部放进内存才行。在不同的计算场景中,数据分布的多样性很难保证内表一定能全部放进内存。而且在 Spark 中,SHJ 策略要想被选中必须要满足两个先决条件
- 外表大小至少是内表的 3 倍
- 内表数据分片的平均大小要小于广播变量阈值。
第一个条件只有当内外表的尺寸悬殊到一定程度时,HJ 的优势才会比 SMJ 更显著。第二个限制的目的是,确保内表的每一个数据分片都能全部放进内存。
和 $SHJ$ 相比,$SMJ$ 无论是单表排序,还是两表做归并关联,都可以借助磁盘来完成。内存中放不下的数据,可以临时溢出到磁盘。
- 单表排序的过程,可以参考 Shuffle Map 阶段生成中间文件的过程。
- 归并关联的时候,算法可以把磁盘中的有序数据用合理的粒度,依次加载进内存完成计算。这个粒度可大可小,大到以数据分片为单位,小到逐条扫描。
正是考虑到这些因素,相比 SHJ,Spark SQL 会优先选择 SMJ
在配置项
spark.sql.join.preferSortMergeJoin
默认为 True 的情况下,Spark SQL 会用 SMJ 策略来兜底,确保作业执行的稳定性,不会去尝试 SHJ。如果想通过配置项来调整 Join 策略,需要把这个参数改为 False,Spark SQL 才有可能去尝试 SHJ
3.2. 不等值 Join
由于不等值 $Join$ 只能使用 NLJ 来实现,因此 Spark SQL 可选的 Join 策略只剩下 BNLJ 和 CPJ。在同一种计算模式下,相比 Shuffle,广播的网络开销更小。显然,在两种策略的选择上,Spark SQL 一定会按照 BNLJ > CPJ 的顺序进行尝试。当然,BNLJ 生效的前提自然是内表小到可以放进广播变量。如果这个条件不成立,那么 Spark SQL 用 CPJ 策略去完成关联计算
4. Join Hints
Spark3.0 为开发者提供了多样化的 Join Hints。在满足前提条件的情况下,如等值条件、连接类型、表大小等等,Spark 会优先尊重开发者的意愿,去选取开发者通过 Join Hints 指定的 Join 策略。