Spark-源码学习-SparkCore-存储服务-磁盘组件-DiskStore
一、概述

二、实现
https://mp.weixin.qq.com/s/Z-VOIkb-ubGlyxZxOMcYDA
2.1. 属性
minMemoryMapBytes: 使用内存映射(memory map)读取文件的最小阈值
由配置项 spark.storage.memoryMapThreshold 指定,默认值2M。当磁盘中的文件大小超过该值时,就不会直接读取,而用内存映射文件来读取,提高效率。
maxMemoryMapBytes: 使用内存映射读取文件的最大阈值
由配置项 spark.storage.memoryMapLimitForTests 指定。它是个测试参数,默认值为不限制。
blockSizes: 维护块ID与其对应大小之间的映射关系的ConcurrentHashMap。
三、BlockData
3.1. DiskBlockData
3.1.1. 方法
$toChunkedByteBuffer()$
在数据量比较大的时候,因为每次申请的内存块大小有限制
maxMemoryMapBytes
,因此须要切分红多个块$toChunkedByteBuffer()$ 方法会将文件转化为输入流 FileInputStream,并获取其 ReadableFileChannel,再调用$JavaUtils.readFully()$ 方法将从 Channel 中取得的数据填充到 ByteBuffer 中。每个 ByteBuffer 即为一个 Chunk,所有 Chunk 的数组形成最终的 ChunkedByteBuffer。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): ChunkedByteBuffer = {
// Utils.tryWithResource调用保证在使用完资源后关闭资源
// 基本等同于java中的try{}finally{}
Utils.tryWithResource(open()) { channel =>
var remaining = blockSize
val chunks = new ListBuffer[ByteBuffer]()
while (remaining > 0) {
// 这里取剩余大小和maxMemoryMapBytes的较小值,
// 也就是说每次申请的内存块大小不超过maxMemoryMapBytes
val chunkSize = math.min(remaining, maxMemoryMapBytes)
val chunk = allocator(chunkSize.toInt)
remaining -= chunkSize
JavaUtils.readFully(channel, chunk)
chunk.flip()
chunks += chunk
}
new ChunkedByteBuffer(chunks.toArray)
}
}$toByteBuffer()$
$toByteBuffer()$ 方法会检查块大小是否小于 spark.storage.memoryMapThreshold。如果小于的话,就会采用与 $toChunkedByteBuffer()$ 相同的方式直接填充 ByteBuffer。反之,就调用 $ReadableFileChannel.map()$ 方法将数据映射到MappedByteBuffer中,即进程的虚拟内存中。
1
2
3
4
5
6
7
8
9
10
11
12
13override def toByteBuffer(): ByteBuffer = {
Utils.tryWithResource(open()) { channel =>
if (blockSize < minMemoryMapBytes) {
// For small files, directly read rather than memory map.
val buf = ByteBuffer.allocate(blockSize.toInt)
JavaUtils.readFully(channel, buf)
buf.flip()
buf
} else {
channel.map(MapMode.READ_ONLY, 0, file.length)
}
}
}
四、数据块写入
4.1. put()
写入块的逻辑由 $put()$ 方法来实现。$put()$ 方法首先调用 $contains()$ 方法检查块是否已经以文件的形式写入了,只有没有写入才会继续操作。然后,调用 $DiskBlockManager.getFile()$ 方法打开块 ID 对应的文件,然后获取该文件的 WritableByteChannel (NIO中的写通道,表示可以通过调用$write()$ 方法向文件写入数据)。最后,调用参数中传入的 $writeFunc$ 函数,操作 WritableByteChannel 将数据写入,并将块 ID 与其对应的字节数加入 blockSizes 映射。
1 | def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit = { |
4.2. putBytes()
五、数据块读取
$getBytes()$
经过 DiskBlockManager 获取对应的文件名,而后将其包装成一个 BlockData 对象,分为加密和不加密两种。