Spark-源码学习-SparkCore-通信服务-架构设计-路由层-Outbox
一、概述
Outbox 是一个用于发送消息的组件,它负责将消息发送到远程进程的 Dispatcher。
二、实现
2.1. 结构
2.1.1. 属性
nettyEnv: 当前 Outbox 所在节点的 NettyRpcEnv
address: Outbox 所对应的远端 NettyRpcEnv 的地址
messages: 向其他远端 NettyRpcEnv 上的所有 RpcEndpoint 发送的消息列表
消息列表 messages 中的消息类型为 OutboxMessage, 所有将要向远端发送的消息都会被封装成 OutboxMessage 类型。
OutboxMessage 在客户端使用, 是对外发送消息的封装。InboxMessage 在服务端使用, 是对所接收消息的封装。
TransportClient 的 $sendRpc()$ 方法的第二个参数是 RpcResponseCallback 类型, RpcOutboxMessage 本身也实现了 RpcResponseCallback, 所以调用的时候传递了 RpcOutboxMessage 的 this 引用。
client: 当前 Outbox 内的 TransportClient 消息的发送都依赖于此传输客户端
connectFuture: 指向当前 Outbox 内连接任务的 java.util.concurrent.Future 引用。如果当前没有连接任务,则connectFuture
为null。stopped: 当前 Outbox 是否停止的状态
draining: 表示当前 Outbox 内正有线程在处理 messages 列表中消息的状态。消息列表 messages 中的消息类型为 OutboxMessage, 所有将要向远端发送的消息都会被封装成 OutboxMessage 类型。
三、向 Outbox 投递消息
向 Outbox 投递消息的逻辑位于 $NettyRpcEnv.postToOutbox()$ 方法中。
如果已经持有了远端 RpcEndpoint 引用对应的 TransportClient,就直接调用 $OutboxMessage.sendWith()$ 方法来发送。但如果没有持有TransportClient 的话,就先从 outboxes 缓存中获取 RPC 地址对应的发件箱,如果也没有发件箱,就要创建一个出来。最后,在当前 NettyRpcEnv 和Outbox 本身都未停止的前提下,调用 $send()$ 方法发送消息。

四、Outbox 发送消息
Outbox作用于 client 端,当 RpcEndpointRef 请求 RpcEndpoint 时,若 RpcEndpointRef 和 RpcEndpoint 位于同一机器时,走的是 Inbox 的逻辑,否则,即 RpcEndpointRef 和 RpcEndpoint 不在一台机器,则 RpcEndpointRef 将信息发送到 Outbox。
Outbox 内部也维护了 messages 用于存储消息,此外还有 TransportClient,用于和相应的 netty server 通信。方法 $send()$ 用于将消息添加到 messages 并调用 $drainOutbox()$ 方法消费消息。
Outbox 对应的消息类型为 OutboxMessage
和 Inbox 不同,OutBox 并没有启动单独线程 MessageLoop,仅是在方法中消费,最终调用 TransportClient 的 $send$ 或 $sendRpc$ 方法发送消息。
endpointRef、OutBox、TransportClient 根据 RpcAddress 一一对应的,当一个节点需要和多个节点通信时,会为每个节点创建对应的 OutBox,由outboxes 集合维护,同时每个 OutBox 对象内创建对应的要访问节点的 TransportClient。
当 Outbox 遇到以下情况之一,则不处理消息,直接返回:
Outbox 已经停止
1
2
3if (stopped) {
return
}正在在连接远端的 RPC 端点
1
2
3if (connectFuture != null) {
return
}TransportClient 本身为空,说明还没有创建 RPC 客户端,此时先创建 TransportClient
1
2
3
4
5if (client == null) {
// There is no connect task but client is null, so we need to launch the connect task.
launchConnectTask()
return
}$launchConnectTask()$ 使用 clientConnectionExecutor 线程池来提交一个 Callable,其内部会最终调用 $clientFactory.createClient()$ 方法来创建 RPC 客户端。创建成功之后,再次调用 $drainOutbox()$ 方法试图处理消息。
有其他线程已经在处理消息了
1
2
3
4if (draining) {
// There is some thread draining, so just exit
return
}
如果没有异常情况的话,就从 messages 表中取出消息,将标志 draining 设为 true,并调用 $OutboxMessage.sendWith()$ 方法发送。