一、概述

在执行 Spark 的应用程序时,Spark 集群会启动 Driver 和 Executor 两种 JVM 进程。

  • Driver 为主控进程,负责创建 SparkContext,提交 Spark Job,并将作业转化为 Task,在各个 Executor 间协调任务的调度

    Driver 的主要职责是任务调度,同时参与非常少量的任务计算,因此 Driver 的内存管理 与 JVM 进程没有太大区别~

  • Executor 负责执行具体的计算任务,并将结果返回给 Driver, 同时为需要持久化的 RDD 提供存储功能。

    Executor 内运行的并发任务共享 JVM 堆内内存, 这些任务在缓存 RDD 数据和 Broadcast 数据时占用的内存被规划为存储 Storage 内存, 而这些任务在执行 Shuffle 时占用的内存被规划为执行Execution 内存,不做特殊规划,存储 Spark 内部的和用户定义的对象实例,不同的管理模式下,占用的空间大小各不相同。

    作为一个 JVM 进程,Executor 的内存管理建立在 JVM 的内存管理之上,Spark 对 JVM 的堆内 [On-heap]空间进行了更为详细的分配,以充分利用内存~

    • 堆内 [On-heap] 内存

      内内存受到 JVM 统一管理,堆内内存的大小,由 Spark 应用程序启动时的 –executor-memory-spark.executor.memory 参数配置。

    • 堆外 [Off-heap] 内存

      JVM 对于堆内内存的清理无法准确指定时间点,因此无法实现精确的释放。为了进一步优化内存的使用,Spark 引入了堆外 [Off-heap] 内存,使之可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据。由于内存的申请和释放不再通过 JVM 机制,而是直接向操作系统申请,减少了不必要的内存开销,以及频繁的 GC 扫描和回收,提升了处理性能。

      序列化的数据占用的空间可以被精确计算,相比堆内内存,堆外内存可以被精确地申请和释放,降低了管理的难度,也降低了误差,在默认情况下堆外内存并不启用,通过配置 spark.memory.offHeap.enabled 参数启用,并由 spark.memory.offHeap.size 参数设定堆外空间的大小。除了没有 other 空间,堆外内存与堆内内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存

二、内存模型

MemoryStore 提供了一种宏观的内存模型,负责将 Block 存储到内存, 其底层依赖于 MemoryManager 的服务。

  1. Block 已使用内存

  2. Unroll 内存

  3. 未使用内存

三、内存空间管理

3.1. 数据结构

3.1.1. 内存池 MemoryPool

内存池是 Spark 内存的抽象,它记录了总内存大小,已使用内存大小,剩余内存大小,提供给 MemoryManager 进行分配/回收内存。

Spark 既将内存作为存储体系的一部分, 又作为计算引擎所需要的计算资源, 因此 MemoryPool 既有用于存储体系的实现类 StoregeMemoryPool, 又有用于计算的 ExecutionMemoryPool,分别对应 execution memory 和 storage memory。

MemoryPool 以及子类方法只是用来标记内存使用情况,而不实际分配/回收内存

  1. StorageMemoryPool

    StorageMemoryPool 是对用于存储的物理内存的逻辑抽象,通过对存储内存的逻辑管理,提高 Spark 存储体系对内存的使用效率。

  2. ExecutionMemoryPool

3.2. 内存管理器 MemoryManager

Spark 通过 MemoryManager 提供了一个内存管理接口。它实现了在任务之间划分可用内存以及在存储和执行之间分配内存的策略。

3.2.1. 内存池

MemoryManager 通过 4 块内存池管理内存空间: StorageMemoryPool(on/offHeapStorageMemoryPool) 用于缓存和广播数据,HeapExecutionMemoryPool(on/offHeapExecutionMemoryPool) 主要用于存储 shuffle、join、sort、aggregation 等中的临时数据。

1
2
3
4
protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)
protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)
protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)

3.2.2. 实现

MemoryManager 只有一个实现: UnifiedMemoryManager。

  1. UnifiedMemoryManager

    UniiedMemoryManager 在 MemoryManager 的內存模型交上,将计算內存和存镇内存之间的边界修改为“软”边界,即任何一方可以向另一方借用空闲的内存