Hadoop-组件-HDFS-源码学习-集群启动-DataNode 启动设计
DataNode 是 Hadoop HDFS 中的从角色,负责具体的数据块存储。DataNode 的数量决定了 HDFS 集群的整体数据存储能力。通过和 NameNode 配合维护着数据块。
让我们看看 HDFS DataNode 启动流程~~~
一、概述
DataNode 类封装了整个数据节点逻辑的实现。它通过 DataStorage 以及 FsDatasetImpl 管理着数据节点存储上的所有数据块,DataNode 类还会通过流式接口对客户端和其他数据节点提供读数据块、写数据块、复制数据块等功能。DataNode 类还会通过 BlockPoolManager 对象周期性地向 Namenode 发送心跳、块汇报、增量块汇报以及缓存汇报,同时执行 Namenode 发回的名字节点指令。DataNode 持有 DataBlockScanner 对象周期性地检查存储上的所有数据块,以及 DirectoryScanner 对象验证存储上数据块和内存中数据块的一致性。
二、源码
DataNode 同 NameNode都一样,是一个 java进程,从 DataNode#main() 方法开始~
其时序图如图所示:
2.1. createDataNode()
跟着时序图来到 NameNode#secureMain()~~~,首先打印一些启动的基本信息日志
1 | StringUtils.startupShutdownMessage(DataNode.class, args, LOG); |
接着初始化DataNode,由时序图知道,实例化 DataNode 代码最后定位于 DataNode#instantiateDataNode()~
1 | DataNode dn = instantiateDataNode(args, conf, resources); |
2.1.1. 解析配置文件
1 | if (!parseArguments(args, conf)) { |
2.1.2. 获取 Hdfs 数据存储目录
1 | Collection<StorageLocation> dataLocations = getStorageLocations(conf); |
本地调试环境中配了 6 个数据存储目录
1 | <property> |
最后 dataLocations 返回结果:
2.1.3. 实例化 DataNode
1 | return makeInstance(dataLocations, conf, resources); |
本方法主要做两件事:
- 对磁盘进行校验,并返回可用的路径列表
- 创建 Datanode 对象
返回可用的路径列表
checkStorageLocations() 传入磁盘检测对象进行磁盘检测,并返回可用的目录列表
1
2DataNodeDiskChecker dataNodeDiskChecker = new DataNodeDiskChecker(permission);
List<StorageLocation> locations = checkStorageLocations(dataDirs, localFS, dataNodeDiskChecker);检测磁盘目录
利用磁盘检测对象进行磁盘目录的检测:校验目录的读写执行权限 ,如果目录不存在,则创建目录并给予700权限
1
dataNodeDiskChecker.checkDir(localFS, new Path(uri));
检测完毕没有抛出异常,则说明目录可用,加入到可用列表
1
locations.add(location);
拼接不可用目录:如果出现 IO 异常,说明此磁盘目录不可用,加入到目录中
1
invalidDirs.append("\"").append(uri.getPath()).append("\" ");
在本地调试环境中,有 6 个数据存储目录,其中
/dfs2
和/dfs4
权限不满足,不可用则如图所示,最终得到的 location 和 invaild 如图所示
创建 Datanode 对象
1
return new DataNode(conf, locations, resources)
初始化成员变量
这儿有个重要的成员变量 BlockScanner~
1
2
3
4
5
6...
this.blockScanner = new BlockScanner(this, conf);
this.lastDiskErrorCheck = 0;
this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
...Datanode 启动时会初始化一个数据块扫描器周期性地验证 Datanode 上存储的所有数据块的正确性,并把发现的损坏数据块报告给 Namenode。 BlockScanner 类就是 Datanode 上数据块扫描器的实现。由于 Datanode 会保存多个块池的数据块,所以 BlockScanner 会持有多个 BlockPoolSliceScanner 对象,每个 BlockPoolSliceScanner 对象都负责验证一个指定块池下数据块的正确性
启动 Datanode
内容好多,2.2.小结: startDatanode() 说~~~
1
2
3
4
5try {
hostName = getHostName(conf);
LOG.info("Configured hostname is " + hostName);
startDataNode(conf, dataDirs, resources);
}
2.2. startDatanode()
首先初始化成员一些用到的成员变量~~~
1 | // ... |
2.2.1. 缓存设置
判断是否基于内存缓存数据块
1 | if (dnConf.maxLockedMemory > 0) { |
2.2.2. 实例化 DataStorage
1 | storage = new DataStorage(); |
DataNode最重要的功能就是管理磁盘上存储的数据块。DataNode将这个管理功能切分为2个部分:DataStorage 和 FsDatasetImpl
DataStorage
DataStorage 继承自 Storage 抽象类,提供了管理与组织磁盘存储目录,如 current,previous,detach,tmp等,在DataNode数据目录,你可以看到一些current tmp,rbw或者finalized文件夹
FsDatasetImpl
管理组织数据块和元数据文件
BlockPoolSlice 负责管理单个存储目录下单个块池的所有数据块;FsVolumeImpl 则负责管理一个完整的存储目录下所有的数据块,也就包括了这个存储目录下多个 BlockPoolSlice 对象的引用。Datanode 可以定义多个存储目录,也就是定义多个 FsVolumeImpl 对象,在 HDFS 中使用 FsVolumeList 对象统一管理 Datanode 上定义的多个 FsVolumelmpl 对象。FsDatasetImpl 负责管理 Datanode 上所有数据块功能的类
2.2.3. 初始化 DataXceiver
在 DataNode 中,有一个后台工作的线程 DataXceiverServer。它被用于接收来自客户端或其他数据节点的数据读写请求,为每个数据读写请求创建一个单独的线程去处理。而处理每次读写请求时所创建的线程

1 | initDataXceiver(conf); |
2.2.4. 启动 Datanode的 web 服务器
NameNode启动流程中,也存在相似的一个startHttpServer,startHttpServer 通过绑定 servlet,来加强自身的功能。DataNode它也是同样,会绑定servlet加强自身
1 | startInfoServer(conf); |

构建 HttpServer2
1
2
3
4
5HttpServer2.Builder builder = new HttpServer2.Builder()
.setName("datanode")
.setConf(conf).setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")))
.addEndpoint(URI.create("http://localhost:0"))
.setFindPort(true);绑定 servlet
获取校验文件
看当前 Datanode 的 DataBlockScanner 的信息
启动 http 服务
2.2.5. 初始化 IpcServer
启动 Datanode上的 RPC 服务,主要包括两个服务:ClientDatanodeProtocolPB 和 InterDatanodeProtocolPB。
初始化 IpcServer (RPC通信)
DataNode#runDatanodeDaemon() 中启动
1 | initIpcServer(conf); |
指定 DataNode 的 RPC 地址
默认是 50020
1
InetSocketAddress ipcAddr = NetUtils.createSocketAddr(conf.getTrimmed(DFS_DATANODE_IPC_ADDRESS_KEY));
将来自 Client 的 请求翻译成 PB
将来自 Client 的 请求翻译成 PB,并执行请求,以PB形式返回请求结果.
1
ClientDatanodeProtocolServerSideTranslatorPB clientDatanodeProtocolXlator = new ClientDatanodeProtocolServerSideTranslatorPB(this);
注册 ClientDatanodeProtocolService
Datanode 刚刚启动就注册了ClientDatanodeProtocolService,ipcServer 相当于一个server 监听所有来自ClientDatanodeProtocol 代理的关于 Datanode 的方法请求
1
2BlockingService service =
ClientDatanodeProtocolService.newReflectiveBlockingService(clientDatanodeProtocolXlator);创建 RPC 服务端
1
2
3
4
5
6
7
8ipcServer = new RPC.Builder(conf)
.setProtocol(ClientDatanodeProtocolPB.class)
.setInstance(service)
.setBindAddress(ipcAddr.getHostName())
.setPort(ipcAddr.getPort())
.setNumHandlers(
conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY, DFS_DATANODE_HANDLER_COUNT_DEFAULT)).setVerbose(false)
.setSecretManager(blockPoolTokenSecretManager).build();Datanode与 Datanode 之间的通信协议
1
service = InterDatanodeProtocolService.newReflectiveBlockingService(interDatanodeProtocolXlator);
2.2.6. 实例化 BlockPoolManager
在DataNode 启动时构造一个BlockPoolManager实例,并调用其refreshNamenodes()方法,refreshNamenodes() 方法用于根据 HDFS 配置添加、删除以及更新命名空间。在BlockPoolManager 的实现中,就是对指定命名空间的 BPOfferSerivce 引用进行更新。
1 | blockPoolManager = new BlockPoolManager(this); |
在 HDFS Federation架构 中, 一个 HDFS 集群可以创建多个命名空间,每一个 DataNode 都可以存储多个 BlockPool 的的数据块
DataNode定义了一个 BlockPoolManager 用于管理 DataNode上所有的块池。 BlockPoolManager 会持有多个 BPOfferService 对象,
每个 BPOfferService 对象都封装了对单个块池操作的 API。同时,由于在 HDFS HA 部署中,每个命名空间又会同时拥有两个 Namenode,一个作为活动的(Active) Namenode, 另一个作为热备的(Standby) Namenode,所以每个 BPOfferService 都会包含两个 BPServiceActor 对象,每个 BPServiceActor 对象都封装了与该命名空间中单个 Namenode 的操作,包括定时向这个Namenode 发送心跳(heartbeat)、增量块汇报(blockReceivedAndDeleted )、全量块汇报(blockreport)、缓存块汇报(cacheReport),以及执行 Namenode 通过心跳/块汇报响应传回的名字节点指令等操作。
三、总结
DataNode 启动流程的入口方法是 main()方法,这个方法调用了 seeureMain() 方法,secureMain() 方法通过调用 createDataNode()创建并启动一个 DataNode 实例,然后在这个DataNode 实例上调用 join() 方法等待 DataNode 停止运行。
createDataNode()方法首先调用静态方法 instantiateDataNode() 创建 DataNode 实例,然后调用 runDatanodeDaemon() 方法启动 DataNode 上的各个服务。
静态方法 instantiateDataNode() 首先解析 DataNode 的启动参数,获取 DataNode 配置文件中定义的所有存储目录,然后调用静态方法 makelnstance()。makelnstance()方法确保定义的存储目录至少有一个可用,然后调用 DataNode 的构造方法创建 DataNode 实例。
DataNode 的构造函数在初始化了若干配置文件中定义的参数后,调用 startDataNode() 法完成 DataNode 的初始化操作,startDataNode() 方法初始化了 DataStorage 对象、DataXceiverServer 对象、shortCircuitRegistry 对象,启动了 HttpInfoServer,初始化了 DataNode 的 IPC Server,然后 创建 BlockPoolManager 并加载每个块池定义的 Namenode 列表。
成功初始化 DataNode 对象之后,就需要调用 runDatanodeDaemon() 方法启动 DataNode 的服务了。runDatanodeDaemon() 方法启动了 blockPoolManager 管理的所有线程,启动了DataXceiverServer 线程,最后启动了 DataNode 的 IPC Server。