一、概述

RpcService 是 RpcEndpoint 的运行时环境,是 Akka 中 ActorSystem 的封装。Flink 中 RpcService 也有多套,

JobManager 和 TaskManager 进程中都有两套 RpcService。

Jobmanager rpcservice

二、结构设计

2.1. 属性

AkkaRpcService 是 RpcService 的唯一实现类。它除了持有 AkkaSystem (akkaSystem) 的引用外,还维护所有注册了的 RpcEndpoint的引用,为每个 RpcEndpoint 分配一个 ActerRef 并保存他们的对应关系 (actors)。

image-20221225113654265

2.2. 方法

RpcService 主要包含如下两个重要方法:

2.2.1. startServer()

startServer() 方法用于启动 RpcEndpoint 中的 RpcServer。启动完成后,RpcEndpoint 中的 RpcServer 就能够对外提供服务了。

2.2.2. connect()

connect()方法用于连接远端 RpcEndpoint 并返回给调用方实现了 RpcGateway 接口的代理类,RPC 客户端就能像调用本地方法一样调用由远端 RpcServer 提供的 RpcGateway 接口。

在 JobMaster 组件中创建与 ResourceManager 组件之间的 RPC 连接时,会在 JobMaster 中创建 ResourceManagerGateway 的动态代理类,最终转换成 RpcInvocationMessage 通过 Akka 系统发送到 ResourceManager 节点对应的 RpcServer 中执行,使 JobMaster 像调用本地方法一样在 ResourceManager 中执行请求任务。

三、AkkaRpcService 初始化

在创建和启动 ClusterEntrypoint 及 TaskManagerRunner 的过程中,会调用 AkkaRpcServiceUtils.createRpcService()方法创建默认的 AkkaRpcService,然后使用 AkkaRpcService 启动集群运行时中 RpcEndpoint 对应的 RpcServer。

JobManager 节点中会使用 AkkaRpcService 实例创建并启动 ResourceManager、 Dispatcher 以及 JobMaster 等 RPC 服务。

创建 AkkaRpcService 主要包括如下步骤:

  1. 在 ClusterEntrypoint 中调用 AkkaRpeSeviceUtis.createRpcService() 方法创建 RpcService。

  2. AkkaRpcServiceUtils 调用 BootstrapTools.startActorSystem() 方法启动 ActorSystem 服务。

  3. 在 BootstrapTools 中调用 AkkaUtils 创建 RobustActorSystem。

  4. RobustActorSystem 实际上是对 Akka 的 ActorSystem 进行了封装和拓展,相比于原生 Akka ActorSystem,RobustActorsystem 包合了 UncaughtExceptionHandler组件,能够对 Actorsystem 抛出的异常进行处理。

  5. 返回创建的 RobustActorSystem 给 AkkaRpcServiceUtils,调用 AkkaRpcServiceUtils.instantiateAkkaRpcService() 方法,使用 RobustActorSystem 创建 AkkaRpcService 实例。

四、初始化&启动 RpcServer

在 Flink 集群运行时中创建了共用的 AkkaRpcService 服务,相当于创建了 Akka 系统中的 ActorSystem,接下来使用 AkkaRpcService 启动各个 RpcEndpoint 中的 RpcServer 实例。
AkkaRpcService.startServer() 方法主要涉及如下过程:

4.1. RpcServer 初始化

4.1.1. 创建 akkaRpcActorProps 对象

根据 RpcEndpoint 是否为 FencedRpcEndpoint 创建 akkaRpcActorProps 对象,用于通过 ActorSystem 创建相应Actor 的 ActorRef 引用类。

例如 FencedRpcEndpoint 会使用 FencedAkkaRpcActor 创建 akkaRpcActorProps 配置。

4.1.2. 创建 ActorRef 实例

根据 AkkaRpcActorProps 的配置信息创建 ActorRef 实例,调用 actorSystem.actorOfakkaRpcActorProps,rpcEndpoint.getEndpointId() 方法创建指定 AkkaRpcActor 的 ActorRef 对象,创建完毕后会将 RpcEndpoint 和 ActorRef 信
息存储在 Actor 键值对集合中。

4.1.3.启动 RpcEndpoint 对应的 RPC 服务

首先获取当前 RpcEndpoint 实现的 RpcGateway 接口。其中包括默认的 RpcGateway 接口,如 RpcServer、AkkaBasedEndpoint,还有RpcEndpoint 各个实现类自身的 RpcGateway 接口。
RpcGateway 接口最终通过 RpcUtils.extractImplementedRpcGateways() 方法从类定义抽取出来

例如 JobMaster 组件会抽取JobMasterGateway 接口定义。

4.1.4. 创建 InvocationHandler

创建 InvocationHandler 代理类,事先定义动态代理类 InvocationHandler,根据 InvocationHandler 代理类提供的 invoke() 方
法实现被代理类的具体方法,处理本地 Runnable 线程和远程由 Akka 系统创建的 RpcInvocationMessage 消息类对应的方法。根据 RpcEndpoint 是否为FencedRpcEndpoint, InvocationHandler 分为 FencedAkkaInvocationHandler 和 AkkaInvocationHandler 两种类型。

FencedMainThreadExecutable 代理的接口主要有 FencedMainThreadExecutable 和 FencedRpcGateway 两种。
AkkaInvocationHandler 主要代理实现 AkkaBasedEndpoint、 RpcGateway、StartStoppable、MainThreadExecutable、RpcServer等接口。

4.1.5.创建代理类

创建好 InvocationHandler 代理类后,将当前类的 ClassLoader、InvocationHandler 实例以及 implementedRpcGateways 等参数传递到 Proxy.newProxyInstance() 方法中,通过反射的方式创建代理类。创建的代理类会被转换为 RpcServer 实例,再返回给 RpcEndpoint 使用。

4.2. RpcServer 启动

RpcServer在 RpcEndpoint 的构造器中完成初始化后,接下来就是启动 RpcEndpoint 和 RpcServer。

以 ResourceManager 为例: 首先在 DefaultDispatcherResourceManagerComponentFactory 中调用 ResourceManager.start() 方法启动 ResourceManager 实例,此时在 ResourceManager.start() 方法中会同步调用 RpcServer.start()方法,启动 ResourceManager 所在 RpcEndpoint 中的 RpcServer。

五、连接 RpcServer 并创建 RpcGateway

当 AkkaRpcService 启动 RpcEndpoint 中的 RpcServer 后,RpcEndpoint 组件仅能对外提供处理 RPC 请求的能力,RpcEndpoint 组件需要在启动后向其他组件注册自己的 RpcEndpoint 信息,并完成组件之间的 RpcConnection 注册,才能相互访问和通信。

创建 RPC 连接需要调用 RpcService connect() 方法。在 AkRaRpcService.connect()方法中,实际上调用了connectInternal() 方法完成 RpcConnection 对象的创建。同时在方法中会使用 Lambda 表达式定义函数接口的实现,通过给定的 ActorRef 引用类创建FencedAkkaInvocationHandler实例。最后方法会返回 FencedRpcGateway 接口代理类,此时调用方就能像本地一样调用远端 RpcEndpoint 实现的 RpcGateway 接口。
AkkaRpcService.connectInternal()方法主要包含如下逻辑。

  1. 根据指定的 Address 调用 actorSystem.actorSelection(address) 方法创建 ActorSelection 实例,使用 ActorSelection 对象向该路径指向的 Actor 对象发送消息。
  2. 调用 Patterns.ask() 方法, 向 ActorSelection 指定的路径发送 Identify 消息。
  3. 调用 FutureUtils.toJava() 方法,将 Scala 类型的 Future 对象转换成 Java CompletableFuture对象。
  4. 通过 identifyFuture 获取 actorRefFuture 对象,并从中获取 ActorRef 引用对象。
  5. 调用Patterns.ask()方法,向 actorRef 对应的 RpcEndpoint 节点发送 RemoteHandshakeMessage 消息,确保连接的 RpcEndpoint 节点正常,如果成功,则 RpcEndpoint 会返回 HandshakeSuccessMessage 消息。

六、总结

经过以上步骤,实现了创建 RpcEndpoint 组件之间的 RPC 连接,此时集群 RPC 组件之间可以进行相互访问,例如 JobMaster 可以向 ResourceManager 发送 Slot 资源请求。