在执行 Spark 的应用程序时,Spark 集群会启动 Driver 和 Executor 两种 JVM 进程,前者为主控进程,负责创建 Spark 上下文,提交 Spark 作业[Job],并将作业转化为计算任务[Task],在各个 Executor 进程间协调任务的调度;后者负责在工作节点上执行具体的计算任务,并将结果返回给 Driver, 同时为需要持久化的 RDD 提供存储功能。

1. Execuor 内存模型

1.1. 堆内和堆外内存

作为一个 JVM 进程,Executor 的内存管理建立在 JVM 的内存管理之上,Spark 对 JVM 的堆内 [On-heap]空间进行了更为详细的分配,以充分利用内存。

同时,Spark 引入了堆外[Off-heap]内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用。

堆内内存受到 JVM 统一管理,堆外内存是直接向操作系统进行内存的申请和释放。

1.1.1. 堆内内存

堆内内存的大小,由 Spark 应用程序启动时的 –executor-memoryspark.executor.memory 参数配置。

Executor 内运行的并发任务共享 JVM 堆内内存, 这些任务在存储 Shuffle 中间文件、缓存 RDD 数据和 Broadcast 数据时占用的内存被规划为存储 [Storage] 内存, 而这些任务在执行 Shuffle 时占用的内存被规划为执行 [Execution] 内存,剩余的部分不做特殊规划,Spark 内部的对象实例,或者用户定义的 Spark 应用程序中的对象实例,均占用剩余的空间 。不同的管理模式下, 这三部分占用的空间大小各不相同

1.1.2. 堆外内存

JVM 对于内存的清理无法准确指定时间点,因此无法实现精确的释放。为了进一步优化内存的使用以及提高 Shuffle 时排序的效率,Spark 引入了堆外[Off-heap]内存,使之可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据。由于内存的申请和释放不再通过 JVM 机制,而是直接向操作系统申请,减少了不必要的内存开销,以及频繁的 GC 扫描和回收,提升了处理性能。而且序列化的数据占用的空间可以被精确计算,所以相比堆内内存来说,堆外内存可以被精确地申请和释放,降低了管理的难度,也降低了误差

在默认情况下堆外内存并不启用,可通过配置 spark.memory.offHeap.enabled 参数启用,并由 spark.memory.offHeap.size 参数设定堆外空间的大小。除了没有 other 空间,堆外内存与堆内内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存 。

2. 内存空间分配

2.1. 静态内存管理

在 Spark 最初采用的静态内存管理机制下,**[存储内存][执行内存][其他内存]**的大小在 Spark 应用程序运行期间均为固定的,但用户可以应用程序启动前进行配置。

  • 可用的存储内存

    systemMaxMemory * spark.storage.memoryFraction * spark.storage.safetyFraction

  • 可用的执行内存

    systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction

其中 systemMaxMemory 取决于当前 JVM 堆内内存的大小,最后可用的执行内存或者存储内存要在此基础上与各自的 memoryFraction 参数和 safetyFraction 参数相乘得出。

上述计算公式中的两个 safetyFraction 参数,其意义在于在逻辑上预留出 [1-safetyFraction] 这么一块保险区域,降低因实际内存超出当前预设范围而导致 OOM 的风险。

值得注意的是,这个预留的保险区域仅仅是一种逻辑上的规划,在具体使用时 Spark 并没有区别对待,和”其它内存”一样交给了 JVM 去管理。

Storage 内存和 Execution 内存都有预留空间,目的是防止 OOM ,因为 Spark 堆内内存大小的记录是不准确的,需要留出保险区域。

堆外的空间分配较为简单,只有存储内存和执行内存。可用的执行内存和存储内存占用的空间大小直接由参数 spark.memory.storageFraction 决定,由于堆外内存占用的空间可以被精确计算,所以无需再设定保险区域

静态内存管理机制实现起来较为简单,但不熟悉 Spark 的存储机制,或没有根据具体的数据规模和计算任务或做相应的配置,很容易造成存储内存和执行内存中的一方剩余大量的空间,而另一方却早早被占满,不得不淘汰或移出旧的内容以存储新的内容。由于新的内存管理机制的出现,这种方式目前已经很少有开发者使用,出于兼容旧版本的应用程序的目的,Spark 仍然保留了它的实现。

2.2. 统一内存管理

统一内存管理机制,与静态内存管理的区别在于存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域

截屏2021-08-28 下午3.57.36

其中最重要的优化在于动态占用机制, 其规则如下:

  1. 设定基本的存储内存和执行内存区域(spark.storage.storageFraction参数),该设定确定了双方各自拥有的空间的范围;

  2. 双方的空间都不足时,则存储到硬盘

若己方空间不足而对方空余时,可借用对方的空间; [注:存储空间不足是指不足以放下一个完整的Block]

  1. 执行内存的空间被对方占用后,可让对方将占用的部分转存到磁盘,然后”归还”借用的空间;

  2. 存储内存的空间被对方占用后,无法让对方 “归还”,因为需要考虑 Shuffle 过程中的很多因素,实现起来较为复杂。

2.2.1. Reserved Memory

默认都是300MB,这个数字一般都是固定不变的,在系统运行的时候 Java Heap 的大小至少为 $Heap Reserved Memory * 1.5$

e.g. 300MB x 1.5 = 450MB 的 JVM 配置

2.2.2. User Memory

Spark 程序中产生的临时数据或者是自己维护的一些数据结构也需要给予它一部份的存储空间,可以认为是程序运行时用户可以主导的空间,叫用户操作空间。

它占用的空间是 $ (Java Heap - Reserved Memory) * 25%$ (默认是 25%,可以有参数供调优),这样设计可以让用户操作时所需要的空间与系统框架运行时所需要的空间分离开。

2.2.3. Spark Memeory

系统框架运行时需要使用的空间,这是从两部份构成的,分别是 Storage Memeory 和 Execution Memory。现在 Storage 和 Execution (Shuffle) 采用了 Unified 的方式共同使用了$ (Heap Size - 300MB) * 75% $,默认情况下 Storage 和 Execution 各占该空间的 50%