一、概述

二、实现

https://mp.weixin.qq.com/s/Z-VOIkb-ubGlyxZxOMcYDA

2.1. 属性

  • minMemoryMapBytes: 使用内存映射(memory map)读取文件的最小阈值

    由配置项 spark.storage.memoryMapThreshold 指定,默认值2M。当磁盘中的文件大小超过该值时,就不会直接读取,而用内存映射文件来读取,提高效率。

  • maxMemoryMapBytes: 使用内存映射读取文件的最大阈值

    由配置项 spark.storage.memoryMapLimitForTests 指定。它是个测试参数,默认值为不限制。

  • blockSizes: 维护块ID与其对应大小之间的映射关系的ConcurrentHashMap。

三、BlockData

3.1. DiskBlockData

3.1.1. 方法

  1. $toChunkedByteBuffer()$

    在数据量比较大的时候,因为每次申请的内存块大小有限制 maxMemoryMapBytes,因此须要切分红多个块

    $toChunkedByteBuffer()$ 方法会将文件转化为输入流 FileInputStream,并获取其 ReadableFileChannel,再调用$JavaUtils.readFully()$ 方法将从 Channel 中取得的数据填充到 ByteBuffer 中。每个 ByteBuffer 即为一个 Chunk,所有 Chunk 的数组形成最终的 ChunkedByteBuffer。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): ChunkedByteBuffer = {
    // Utils.tryWithResource调用保证在使用完资源后关闭资源
    // 基本等同于java中的try{}finally{}
    Utils.tryWithResource(open()) { channel =>
    var remaining = blockSize
    val chunks = new ListBuffer[ByteBuffer]()
    while (remaining > 0) {
    // 这里取剩余大小和maxMemoryMapBytes的较小值,
    // 也就是说每次申请的内存块大小不超过maxMemoryMapBytes
    val chunkSize = math.min(remaining, maxMemoryMapBytes)
    val chunk = allocator(chunkSize.toInt)
    remaining -= chunkSize
    JavaUtils.readFully(channel, chunk)
    chunk.flip()
    chunks += chunk
    }
    new ChunkedByteBuffer(chunks.toArray)
    }
    }
  2. $toByteBuffer()$

    $toByteBuffer()$ 方法会检查块大小是否小于 spark.storage.memoryMapThreshold。如果小于的话,就会采用与 $toChunkedByteBuffer()$ 相同的方式直接填充 ByteBuffer。反之,就调用 $ReadableFileChannel.map()$ 方法将数据映射到MappedByteBuffer中,即进程的虚拟内存中。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    override def toByteBuffer(): ByteBuffer = {
    Utils.tryWithResource(open()) { channel =>
    if (blockSize < minMemoryMapBytes) {
    // For small files, directly read rather than memory map.
    val buf = ByteBuffer.allocate(blockSize.toInt)
    JavaUtils.readFully(channel, buf)
    buf.flip()
    buf
    } else {
    channel.map(MapMode.READ_ONLY, 0, file.length)
    }
    }
    }

四、数据块写入

4.1. put()

写入块的逻辑由 $put()$ 方法来实现。$put()$ 方法首先调用 $contains()$ 方法检查块是否已经以文件的形式写入了,只有没有写入才会继续操作。然后,调用 $DiskBlockManager.getFile()$ 方法打开块 ID 对应的文件,然后获取该文件的 WritableByteChannel (NIO中的写通道,表示可以通过调用$write()$ 方法向文件写入数据)。最后,调用参数中传入的 $writeFunc$ 函数,操作 WritableByteChannel 将数据写入,并将块 ID 与其对应的字节数加入 blockSizes 映射。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit = {
// 经过DiskBlockManager对象检查这个blockId对应的文件名的文件是否存在
if (contains(blockId)) {
throw new IllegalStateException(s"Block $blockId is already present in the disk store")
}
logDebug(s"Attempting to put block $blockId")
val startTime = System.currentTimeMillis
// 经过DiskBlockManager获取一个文件用于写入数据
val file = diskManager.getFile(blockId)
// 用CountingWritableChannel包装一下,以便于记录写入的字节数
val out = new CountingWritableChannel(openForWrite(file))
var threwException: Boolean = true
try {
writeFunc(out)
// 关键步骤,记录到内部的map结构中
blockSizes.put(blockId, out.getCount)
threwException = false
} finally {
try {
out.close()
} catch {
case ioe: IOException =>
if (!threwException) {
threwException = true
throw ioe
}
} finally {
if (threwException) {
remove(blockId)
}
}
}
val finishTime = System.currentTimeMillis
logDebug("Block %s stored as %s file on disk in %d ms".format(
file.getName,
Utils.bytesToString(file.length()),
finishTime - startTime))
}

4.2. putBytes()

五、数据块读取

  1. $getBytes()$

    经过 DiskBlockManager 获取对应的文件名,而后将其包装成一个 BlockData 对象,分为加密和不加密两种。