一、概述

DataStreamer 线程会从 dataQueue 中取出 Packet 对象,然后通过底层 I/O 流将这个 DFSPacket 发送到数据流管道中的第一个 Datanode 上。发送完毕后,将 DFSPacket 从 dataQueue 中移除,放入 ackQueue 中等待下游节点的确认消息。

二、实现

DataStreamer

2.1. 初始化

2.1.1. toNewBlock

1
2
3
4
5
6
private DataStreamer(HdfsFileStatus stat, ExtendedBlock block) {
isAppend = false;
isLazyPersistFile = isLazyPersist(stat);
this.block = block;
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
}

2.1.2. append

2.2. 获取数据包

2.2.1. 阻塞,等待数据包…

1
2
3
4
5
6
try {
// 等待数据包
dataQueue.wait(timeout);
} catch (InterruptedException e) {
DFSClient.LOG.warn("Caught exception ", e);
}

2.2.2. 唤醒线程

启动 DataStreamer 线程后一直阻塞等待获取数据包,一直监听这个dataQueue,DFSOutputStream 会调用 waitAndQueueCurrentPacket() 方法将数据包放入发送队列 dataQueue 后,唤醒 DataStreamer 等待线程,获取数据包。

1
dataQueue.notifyAll();

2.2.3. 获取数据包

1
2
3
4
5
6
7
8
9
10
11
12
13
//如果流关闭 || 有错误 || dfsClient 没有运行,则跳过当前循环
if (streamerClosed || hasError || !dfsClient.clientRunning) {
continue;
}
// 如果队列为空,则发送心跳包
if (dataQueue.isEmpty()) {
one = createHeartbeatPacket();
assert one != null;
} else {
// 队列不为空,获取一个dataPacket数据包
one = dataQueue.getFirst(); // regular data packet
long parents[] = one.getTraceParents();
}

2.3. 建立数据流管道

2.3.1. 上传

如果当前阶段是 PIPELINE_SETUP_CREATE,申请数据块,设置 pipeline,初始化数据;

1
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {...}

DataStreamer 在将数据包发送到 Datanode 之前,首先要在 Namenode 的命名空间(Namespace)中分配数据块,建立写数据块的数据流管道。这些操作都是在 DataStreamer.run() 方法中触发的。

  1. 首先调用 nextBlockOutputStream() 方法向 Namenode 申请分配新的数据块,然后建立到新分配数据块的输出流。

    nextBlockOutputStream() 方法首先会调用 locateFollowingBlock() 在 Namenode 上分配一个新的数据块,这个方法返回存储新数据块的 Datanode 位置信息。然后调用 createBlockOutputStream() 方法创建到数据流管道中第一个 Datanode 的输出流,如果创建成功则返回 true,创建失败则返回 false。当创建输出流失败后,nextBlockOutputStream() 方法会调用 ClientProtocol.abadonBlock() 放弃这个数据块,并将这个 Datanode 加入故障节点队列中,以避免再次访问这个节点。然后重试申请操作,重试超过一定的次数后(由配置项 dfs.client.block.write.retries 配置,默认为3次),将抛出异常。

    • 在 Namenode 上申请新的数据块

      调用 ClientProtocol.addBlock() 方法向 Namenode 申请分配一个新的数据块

      addBlock() 方法除了分配新的数据块之外,还会提交上一个数据块,然后返回存储新数据块的 Datanode 位置信息。这里还要注意异常处理,对于 NotReplicatedYetException,也就是该文件的上一个提交数据块还没有达到 HDFS 最小副本数的情况,需要 Client 线程睡一段时间之后重试,等待上一个提交数据块有足够的副本。

      1
      return dfsClient.namenode.addBlock(src, dfsClient.clientName, block, excludedNodes, fileId, favoredNodes);
    • 建立数据流管道

      1
      2
      3
      4
      5
      6
      success = createBlockOutputStream(nodes, storageTypes, 0L, false);
      if (!success) {
      dfsClient.namenode.abandonBlock(block, fileId, src, dfsClient.clientName);
      block = null;
      excludedNodes.put(nodes[errorIndex], nodes[errorIndex]);
      }
  2. 调用 setPipeline() 方法记录数据流管道信息(包括存储数据的 Datanode,以及它们的 storageIDs)

    1
    2
    3
    4
    5
    private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes, String[] storageIDs) {
    this.nodes = nodes;
    this.storageTypes = storageTypes;
    this.storageIDs = storageIDs;
    }

2.3.2. 追加

当前阶段是 PIPELINE_SETUP_APPEND,append 操作时使用 setupPipelineForAppendOrRecovery() 创建数据流管道

1
else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {}
  1. setupPipelineForAppendOrRecovery

2.4. 发送数据包

  1. 如果当前数据包是数据块中的最后一个,这个包只有在之前发送的所有数据包确认获得 ack 之后,才可以发送。

    最后一个数据包是一个空包,用来标识数据块中的所有数据已经发送完毕

  2. 将数据包从 dataQueue 中移出,移入 ackQueue 队列,等待 ack 响应。

  3. 将数据包写入底层 I/O 流中。

  4. 如果发送的数据包是最后一个空隔离包,则调用 endBLock() 执行清理工作

2.5. 处理响应信息

确认消息是由 DataStreamer 的内部线程类 ResponseProcessor 处理的。调用 initDataStreaming() 启动 ResponseProcessor 线程处理来自 Datanode 的响应信息,并将数据块构建状态(BlockConstructionStage) 设置为 DATA_STREAMING。

1
2
3
4
5
private void initDataStreaming() {
response = new ResponseProcessor(nodes);
response.start();
stage = BlockConstructionStage.DATA_STREAMING;
}

ResponseProcessor 线程等待下游节点的响应 ack,判断 ack 状态码,如果是失败状态,则记录出错 Datanode 的索引(errorIndex &restartIndex),并设置错误状态位(hasError);如果 ack 状态是成功,则将数据包从 ack 队列中移除,整个数据包发送过程完成。

如果在数据块发送过程中出现错误,那所有 ackQueue 队列中等待确认的 DFSPacket 都会被重新放回 dataQueue 队列中重新发送。客户端会执行错误处理流程,将出现错误的 Datanode 从数据流管道中删除,然后向 Namenode 申请新的 Datanode 重建数据流管道。接着 DataStreamer 线程会从 dataQueue 队列中取出 DFSPacket 重新发送。

2.6. 关闭数据流通道

当 DataStreamer 线程完成一个数据块的写入操作后,会调用 endBlock() 方法关闭到这个数据块的数据流管道,包括关闭 responder 线程,关闭数据流管道的输出流 blockStream 以及输入流 blockReplyStream,然后更新数据流管道状态为 PIPELINE_SETUP_CREATE

1
2
3
4
5
6
7
8
9
10
private void endBlock() {
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Closing old block " + block);
}
this.setName("DataStreamer for file " + src);
closeResponder();
closeStream();
setPipeline(null, null, null);
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
}