一、概述

在 NettyRpcEnv 中,要向远端 RpcEndpoint 发送请求,首先要持有 RpcEndpoint 的引用用对象 RpcEndpointRef(类似于 Akka 中 Actor 的 ActorRef)。

二、实现

一般而言,消息投递有下面 3 种情况:

  1. at-most-once

    应用了这种机制的消息会被投递 0 次或 1 次, 这条消息可能会丢失。

  2. at-least-once

    应用了这种机制的消息潜在地存在多次投递尝试并保证至少会成功一次, 这条消息可能会重复消费, 但不会丢失。

  3. exactly-once

    应用了这种机制的消息只会向接收者准确地发送一次, 这种消息既不会丢失, 也不会重复。

2.1. 结构

2.1.1. 属性

  1. maxRetries: RPC 最大重新连接次数。

可以使用 spark.rpc.numRetries 属性进行配置,默认为 3 次

  1. retryWaitMs:RPC 每次重新连接需要等待的毫秒数。

可以使用 spark.rpc.retrywait 属性进行配置, 默认值为 3 秒

  1. defaultAskTimeout: RPC 的 ask 操作的默认超时时间。

可以使用 spark.rpc.askTimeout 或者 spark.network.timeout 属性进行配置, 默认值为 120 秒。

spark.rpc.askTimeout 属性的优先级更高。

三、投递消息

RpcEndpointRef 是对远程 RpcEndpoint 的一个引用,是 RpcEndpointAddress 实例的简单包装。当向一个具体的 RpcEndpoint 发送消息时,一般需要获取到该 RpcEndpoint 的引用,然后通过该应用发送消息,包含请求回调(ask)、异步请求回调(askSync)、通知(send)等方法。

NettyRpcEndpointRef 的 $ask()$ 方法和 $send()$ 方法分别调用了 NettyRpcEnv 的 $ask()$ 方法和 $send()$ 方法。

3.1. ask

$ask()$ 方法异步发送一条消息,并在规定的超时时间内等待 RpcEndpoint 的回复。RpcEndpoint 会调用 $receiveAndReply()$ 方法来处理。

3.1.1. 本地

如果请求消息的接收者的地址与当前 NettyRpcEnv 的地址相同(则说明处理请求的 RpcEndpoint 位于本地的 NettyRpcEnv 中) 新建 Promise 对象,并且给 Promise 的 future 设置完成时的回调函数。

成功时调用 $onSuccess$ 方法,失败时调用 $onFailure$ 方法

发送消息到本地 Outbox,最终调用本地 Dispatcher 的 $postLocalMessage()$ 方法。

3.1.2. 远程

如果请求消息的接收者的地址与当前 NettyRpcEnv 的地址不同(则说明处理请求的 RpcEndpoint 位于其他节点的 NettyRpcEnv中),那么将 message 序列化,并与 $onFailure$ 、$onSuccess$ 方法一道封装为 RpcOutboxMessage 类型的消息。

发送消息最后调用 rpcEnv 的 $postToOutbox()$ 方法将消息投递到远程 Outbox,最终调用 Dispatcher 的 $postRemoteMessage()$ 方法。

1
2
3
4
5
6
7
8
val rpcMessage = RpcOutboxMessage(message.serialize(this), onFailure,
(client, response) => onSuccess(deserialize[Any](client, response)))
rpcMsg = Option(rpcMessage)
postToOutbox(message.receiver, rpcMessage)
promise.future.failed.foreach {
case _: TimeoutException => rpcMessage.onTimeout()
case _ =>
}(ThreadUtils.sameThread)

3.2. send

NettyRpcEnv 重写了抽象类 RpcEnv 的 $send()$ 方法发送单向异步的消息。

“单向” 就是发送完后就会忘记此次发送,不会有任何状态要记录,也不会期望得到服务端的回复。send 采用了 at-most-once 的投递规则。RpcEndpointRef 的 $send()$ 方法类似于 Akka 中 Actor 的 $tell()$ 方法。

3.1.1. 本地

如果请求消息的接收者的地址与当前 NettyRpcEnv 的地址相同(则说明处理请求的 RpcEndpoint 位于本地的 NettyRpcEnv 中) 发送消息到本地 Outbox,最终通过调用本地 Dispatcher 的 $postOneWayMessage()$ 方法

3.1.2. 远程

如果请求消息的接收者的地址与当前 NettyRpcEnv 的地址不同(则说明处理请求的 RpcEndpoint 位于其他节点的 NettyRpcEnv中),那么将 message 序列化,封装为 OneWayOutboxMessage 类型的消息。最后调用 $postToOutbox()$ 方法将消息投递出去。

  1. 如果 NettyRpcEndpointRef 中的 TransportClient 不为空,则直接调用 OutboxMessage 的 $sendWith()$ 方法,否则进入第 [2] 步。
  2. 获取 NettyRpcEndpointRef 的远端 RpcEndpoint 地址所对应的 Outbox。首先从 outboxes 缓存中获取 Outbox。如果outboxes 中没有相应的 Outbox,则需要新建 Outbox 并放入 outboxes 缓存中。
  3. 如果当前 NettyRpcEnv 已经处于停止状态,则将第 [2] 步得到的 Outbox 从 outboxes 中移除,并且调用 Outbox 的 stop方法停止 Outbox。如果当前 NettyRpcEnv 还未停止,则调用第 [2] 步得到的 Outbox 的 send 方法发送消息。