一、概述

对于 DataStreamer 发出的每一个数据包,数据流管道中的 Datanode 都会发送 ack 响应给客户端。ResponseProcessor 线程就是处理 ack 响应的线程类。

二、实现

ResponseProcessor 线程的处理逻辑比较简单,它从数据流管道下游节点的输入流中读入响应消息。然后判断响应状态,如果下游数据节点执行写入数据包失败,则通过 ack 消息中的应答码记录错误节点(errorIndex),并设置错误标志位(hashError)。最后会在 DataStreamer.run() 方法中调用 processDatanodeEroor() 处理这个错误信息。如果下游节点写入数据包成功,则把当前数据包信息从ackQucue 中移除。至此,一个数据包就成功地写入了。

  1. 从输入流中读取响应 ack

  2. 处理所有 DataNode 的响应状态

    • 处理重启的 Datanode

      1
      2
      3
      4
      5
      6
      7
      if (PipelineAck.isRestartOOBStatus(reply) && shouldWaitForRestart(i)) {
      restartDeadline = dfsClient.getConf().datanodeRestartTimeout + Time.monotonicNow();
      setRestartingNodeIndex(i);
      String message = "A datanode is restarting: " + targets[i];
      DFSClient.LOG.info(message);
      throw new IOException(message);
      }
    • 处理错误的数据节点

      1
      2
      3
      4
      if (reply != SUCCESS) {
      setErrorIndex(i);
      throw new IOException("Bad response " + reply + " for block " + block + " from datanode " + targets[i]);
      }
  3. 心跳消息的响应,则不用特别处理

  4. 数据流管道成功写入数据包的消息

  5. 此 ack 回复包推断完毕后,会进行对应的 Packet 移除

  6. 异常处理

    发生异常的时候,responderClosed 设置为 true,循环退出