一、概述

UnifiedMemoryManager 是从 1.6 开始的统一内存管理模型,统一内存管理机制,与静态内存管理的区别在于存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域

二、实现

2.1. 成员属性

  1. maxHeapMemory: 最大堆内存。大小为系统可用内存与 spark.memory.fraction 属性值的乘积
  2. onHeadpStorageRegionSize: 用于存储的堆内存大小。
  3. numCores: CPU 内核数

2.2. 主要方法

三、内存申请

3.1. 执行内存

$UnifiedMemoryManager.accquireExecutionMemory()$​ 方法申请执行内存~当任务尝试从 executor 中申请 numBytes 大小的内存
该方法直接向 ExecutionMemoryPool 索要所需内存:

  • 当 ExecutionMemory 内存充足,则不会触发向 Storage 申请内存: $maybeGrowExecutionPool()$

    UnifiedMemoryManager 其中最重要的优化在于动态占用机制, 其规则如下:

    1. 设定基本的存储内存和执行内存区域(spark.storage.storageFraction 参数),该设定确定了双方各自拥有的空间的范围;

    2. 双方的空间都不足时,则存储到硬盘,若己方空间不足而对方空余时,可借用对方的空间; [注: 存储空间不足是指不足以放下一个完整的Block]

    3. 执行内存的空间被对方占用后,可让对方将占用的部分转存到磁盘,然后”归还”借用的空间;

    4. 存储内存的空间被对方占用后,无法让对方 “归还”,因为需要考虑 Shuffle 过程中的很多因素,实现起来较为复杂。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
    if (extraMemoryNeeded > 0) {
    val memoryReclaimableFromStorage = math.max(storagePool.memoryFree, storagePool.poolSize - storageRegionSize)
    if (memoryReclaimableFromStorage > 0) {
    val spaceToReclaim = storagePool.freeSpaceToShrinkPool(math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
    storagePool.decrementPoolSize(spaceToReclaim)
    executionPool.incrementPoolSize(spaceToReclaim)
    }
    }
    }
    • $storagePool.freeSpaceToShrinkPool()$

      StoragePool 释放指定大小的空间,缩小内存池的大小。

  • 每个 Task 能够被使用的内存是被限制的

3.2. 存储内存

流程和 $acquireExecutionMemory()$ 类似,当 storage 的内存不足时,同样会向execution借内存,但区别是当且仅当ExecutionMemory有空闲内存时,StorageMemory 才能借走该内存

3.3. UnRoll 内存

1
2
3
override def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean = synchronized {
acquireStorageMemory(blockId, numBytes, memoryMode)
}