一、概述

Outbox 是一个用于发送消息的组件,它负责将消息发送到远程进程的 Dispatcher。

二、实现

Outbox总结

2.1. 结构

2.1.1. 属性

  1. nettyEnv: 当前 Outbox 所在节点的 NettyRpcEnv

  2. address: Outbox 所对应的远端 NettyRpcEnv 的地址

  3. messages: 向其他远端 NettyRpcEnv 上的所有 RpcEndpoint 发送的消息列表

    消息列表 messages 中的消息类型为 OutboxMessage, 所有将要向远端发送的消息都会被封装成 OutboxMessage 类型。

    OutboxMessage 在客户端使用, 是对外发送消息的封装。InboxMessage 在服务端使用, 是对所接收消息的封装。

    Outbox

    TransportClient 的 $sendRpc()$ 方法的第二个参数是 RpcResponseCallback 类型, RpcOutboxMessage 本身也实现了 RpcResponseCallback, 所以调用的时候传递了 RpcOutboxMessage 的 this 引用。

  4. client: 当前 Outbox 内的 TransportClient 消息的发送都依赖于此传输客户端

  5. connectFuture: 指向当前 Outbox 内连接任务的 java.util.concurrent.Future 引用。如果当前没有连接任务,则connectFuture
    为null。

  6. stopped: 当前 Outbox 是否停止的状态

  7. draining: 表示当前 Outbox 内正有线程在处理 messages 列表中消息的状态。消息列表 messages 中的消息类型为 OutboxMessage, 所有将要向远端发送的消息都会被封装成 OutboxMessage 类型。

三、向 Outbox 投递消息

向 Outbox 投递消息的逻辑位于 $NettyRpcEnv.postToOutbox()$ 方法中。

如果已经持有了远端 RpcEndpoint 引用对应的 TransportClient,就直接调用 $OutboxMessage.sendWith()$ 方法来发送。但如果没有持有TransportClient 的话,就先从 outboxes 缓存中获取 RPC 地址对应的发件箱,如果也没有发件箱,就要创建一个出来。最后,在当前 NettyRpcEnv 和Outbox 本身都未停止的前提下,调用 $send()$ 方法发送消息。

四、Outbox 发送消息

Outbox作用于 client 端,当 RpcEndpointRef 请求 RpcEndpoint 时,若 RpcEndpointRef 和 RpcEndpoint 位于同一机器时,走的是 Inbox 的逻辑,否则,即 RpcEndpointRef 和 RpcEndpoint 不在一台机器,则 RpcEndpointRef 将信息发送到 Outbox。

Outbox 内部也维护了 messages 用于存储消息,此外还有 TransportClient,用于和相应的 netty server 通信。方法 $send()$ 用于将消息添加到 messages 并调用 $drainOutbox()$ 方法消费消息。

Outbox 对应的消息类型为 OutboxMessage

和 Inbox 不同,OutBox 并没有启动单独线程 MessageLoop,仅是在方法中消费,最终调用 TransportClient 的 $send$ 或 $sendRpc$ 方法发送消息。

endpointRef、OutBox、TransportClient 根据 RpcAddress 一一对应的,当一个节点需要和多个节点通信时,会为每个节点创建对应的 OutBox,由outboxes 集合维护,同时每个 OutBox 对象内创建对应的要访问节点的 TransportClient。

当 Outbox 遇到以下情况之一,则不处理消息,直接返回:

  • Outbox 已经停止

    1
    2
    3
    if (stopped) {
    return
    }
  • 正在在连接远端的 RPC 端点

    1
    2
    3
    if (connectFuture != null) {
    return
    }
  • TransportClient 本身为空,说明还没有创建 RPC 客户端,此时先创建 TransportClient

    1
    2
    3
    4
    5
    if (client == null) {
    // There is no connect task but client is null, so we need to launch the connect task.
    launchConnectTask()
    return
    }

    $launchConnectTask()$ 使用 clientConnectionExecutor 线程池来提交一个 Callable,其内部会最终调用 $clientFactory.createClient()$ 方法来创建 RPC 客户端。创建成功之后,再次调用 $drainOutbox()$ 方法试图处理消息。

  • 有其他线程已经在处理消息了

    1
    2
    3
    4
    if (draining) {
    // There is some thread draining, so just exit
    return
    }

如果没有异常情况的话,就从 messages 表中取出消息,将标志 draining 设为 true,并调用 $OutboxMessage.sendWith()$ 方法发送。

五、Outbox 关闭