Hadoop-组件-HDFS-源码学习-数据读写-写文件-ResponseProcessor
一、概述
对于 DataStreamer 发出的每一个数据包,数据流管道中的 Datanode 都会发送 ack 响应给客户端。ResponseProcessor 线程就是处理 ack 响应的线程类。
二、实现
ResponseProcessor 线程的处理逻辑比较简单,它从数据流管道下游节点的输入流中读入响应消息。然后判断响应状态,如果下游数据节点执行写入数据包失败,则通过 ack 消息中的应答码记录错误节点(errorIndex),并设置错误标志位(hashError)。最后会在 DataStreamer.run() 方法中调用 processDatanodeEroor() 处理这个错误信息。如果下游节点写入数据包成功,则把当前数据包信息从ackQucue 中移除。至此,一个数据包就成功地写入了。
从输入流中读取响应 ack
处理所有 DataNode 的响应状态
处理重启的 Datanode
1
2
3
4
5
6
7if (PipelineAck.isRestartOOBStatus(reply) && shouldWaitForRestart(i)) {
restartDeadline = dfsClient.getConf().datanodeRestartTimeout + Time.monotonicNow();
setRestartingNodeIndex(i);
String message = "A datanode is restarting: " + targets[i];
DFSClient.LOG.info(message);
throw new IOException(message);
}处理错误的数据节点
1
2
3
4if (reply != SUCCESS) {
setErrorIndex(i);
throw new IOException("Bad response " + reply + " for block " + block + " from datanode " + targets[i]);
}
心跳消息的响应,则不用特别处理
数据流管道成功写入数据包的消息
此 ack 回复包推断完毕后,会进行对应的 Packet 移除
异常处理
发生异常的时候,responderClosed 设置为 true,循环退出
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Joker!
评论
ValineTwikoo