Hadoop-组件-HDFS-源码学习-数据读写-写文件-DataStreamer
一、概述
DataStreamer 线程会从 dataQueue 中取出 Packet 对象,然后通过底层 I/O 流将这个 DFSPacket 发送到数据流管道中的第一个 Datanode 上。发送完毕后,将 DFSPacket 从 dataQueue 中移除,放入 ackQueue 中等待下游节点的确认消息。
二、实现
2.1. 初始化
2.1.1. toNewBlock
1 | private DataStreamer(HdfsFileStatus stat, ExtendedBlock block) { |
2.1.2. append
2.2. 获取数据包
2.2.1. 阻塞,等待数据包…
1 | try { |
2.2.2. 唤醒线程
启动 DataStreamer 线程后一直阻塞等待获取数据包,一直监听这个dataQueue,DFSOutputStream 会调用 waitAndQueueCurrentPacket() 方法将数据包放入发送队列 dataQueue 后,唤醒 DataStreamer 等待线程,获取数据包。
1 | dataQueue.notifyAll(); |
2.2.3. 获取数据包
1 | //如果流关闭 || 有错误 || dfsClient 没有运行,则跳过当前循环 |
2.3. 建立数据流管道
2.3.1. 上传
如果当前阶段是 PIPELINE_SETUP_CREATE
,申请数据块,设置 pipeline,初始化数据;
1 | if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {...} |
DataStreamer 在将数据包发送到 Datanode 之前,首先要在 Namenode 的命名空间(Namespace)中分配数据块,建立写数据块的数据流管道。这些操作都是在 DataStreamer.run() 方法中触发的。
首先调用 nextBlockOutputStream() 方法向 Namenode 申请分配新的数据块,然后建立到新分配数据块的输出流。
nextBlockOutputStream() 方法首先会调用 locateFollowingBlock() 在 Namenode 上分配一个新的数据块,这个方法返回存储新数据块的 Datanode 位置信息。然后调用 createBlockOutputStream() 方法创建到数据流管道中第一个 Datanode 的输出流,如果创建成功则返回 true,创建失败则返回 false。当创建输出流失败后,nextBlockOutputStream() 方法会调用 ClientProtocol.abadonBlock() 放弃这个数据块,并将这个 Datanode 加入故障节点队列中,以避免再次访问这个节点。然后重试申请操作,重试超过一定的次数后(由配置项 dfs.client.block.write.retries 配置,默认为3次),将抛出异常。
在 Namenode 上申请新的数据块
调用 ClientProtocol.addBlock() 方法向 Namenode 申请分配一个新的数据块
addBlock() 方法除了分配新的数据块之外,还会提交上一个数据块,然后返回存储新数据块的 Datanode 位置信息。这里还要注意异常处理,对于 NotReplicatedYetException,也就是该文件的上一个提交数据块还没有达到 HDFS 最小副本数的情况,需要 Client 线程睡一段时间之后重试,等待上一个提交数据块有足够的副本。
1
return dfsClient.namenode.addBlock(src, dfsClient.clientName, block, excludedNodes, fileId, favoredNodes);
建立数据流管道
1
2
3
4
5
6success = createBlockOutputStream(nodes, storageTypes, 0L, false);
if (!success) {
dfsClient.namenode.abandonBlock(block, fileId, src, dfsClient.clientName);
block = null;
excludedNodes.put(nodes[errorIndex], nodes[errorIndex]);
}
调用 setPipeline() 方法记录数据流管道信息(包括存储数据的 Datanode,以及它们的 storageIDs)
1
2
3
4
5private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes, String[] storageIDs) {
this.nodes = nodes;
this.storageTypes = storageTypes;
this.storageIDs = storageIDs;
}
2.3.2. 追加
当前阶段是 PIPELINE_SETUP_APPEND
,append 操作时使用 setupPipelineForAppendOrRecovery() 创建数据流管道
1 | else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {} |
- setupPipelineForAppendOrRecovery
2.4. 发送数据包
如果当前数据包是数据块中的最后一个,这个包只有在之前发送的所有数据包确认获得 ack 之后,才可以发送。
最后一个数据包是一个空包,用来标识数据块中的所有数据已经发送完毕
将数据包从 dataQueue 中移出,移入 ackQueue 队列,等待 ack 响应。
将数据包写入底层 I/O 流中。
如果发送的数据包是最后一个空隔离包,则调用 endBLock() 执行清理工作
2.5. 处理响应信息
确认消息是由 DataStreamer 的内部线程类 ResponseProcessor 处理的。调用 initDataStreaming() 启动 ResponseProcessor 线程处理来自 Datanode 的响应信息,并将数据块构建状态(BlockConstructionStage) 设置为 DATA_STREAMING。
1 | private void initDataStreaming() { |
ResponseProcessor 线程等待下游节点的响应 ack,判断 ack 状态码,如果是失败状态,则记录出错 Datanode 的索引(errorIndex &restartIndex),并设置错误状态位(hasError);如果 ack 状态是成功,则将数据包从 ack 队列中移除,整个数据包发送过程完成。
如果在数据块发送过程中出现错误,那所有 ackQueue 队列中等待确认的 DFSPacket 都会被重新放回 dataQueue 队列中重新发送。客户端会执行错误处理流程,将出现错误的 Datanode 从数据流管道中删除,然后向 Namenode 申请新的 Datanode 重建数据流管道。接着 DataStreamer 线程会从 dataQueue 队列中取出 DFSPacket 重新发送。
2.6. 关闭数据流通道
当 DataStreamer 线程完成一个数据块的写入操作后,会调用 endBlock() 方法关闭到这个数据块的数据流管道,包括关闭 responder 线程,关闭数据流管道的输出流 blockStream 以及输入流 blockReplyStream,然后更新数据流管道状态为 PIPELINE_SETUP_CREATE
。
1 | private void endBlock() { |