一个 Flink 流式作业,从 Client 提交到 Flink 集群,到最后执行,总共会经历四种不同的状态。总的来说:

  1. Client 首先根据用户编写的代码生成 StreamGraph,然后把 StreamGraph 构建成 JobGraph 提 交给 Flink 集群主节点
  2. 然后启动的 JobMaster 在接收到 JobGraph 后,会对其进行并行化生成 ExecutionGraph 后调度 启动 StreamTask 执行。
  3. StreamTask 并行化的运行在 Flink 集群,就是最终的物理执行图状态结构。

一、概述

Flink 中的执行图可以分成四层:StreamGraph ==> JobGraph ==> ExecutionGraph ==> 物理执行图。

  • StreamGraph

    根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。

  • JobGraph

    StreamGraph 经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的 序列化反序列化传输消耗。 ExecutionGraph:JobManager 根据 JobGraph 生成 ExecutionGraph。

  • ExecutionGraph 是 JobGraph 的并行化版本,是调度层最核心的数据结构。

  • 物理执行图

    JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个 TaskManager 上部署 Task 后形成的图,并不是一个具体的数据结构。

截屏2022-03-07 下午9.07.10

二、源码

Flink 应用程序通过 DataStream API 生成 Transformation 转换集合,基于Transformation 转换集合构建 Pipeline 实现类,也就是StreamGraph数据结构,最后通过 PipelineExecutor 将 StreamGraph 转换为 JobGraph 数据结构,并将 JobGraph 提交到集群中运行。

截屏2022-03-20 下午9.03.53

  1. 用户通过 API 编写应用程序,将可执行 JAR 包通过客户端提交到集群中运行,此时在客户端将 DataStream 转换操作集合保存至 StreamExecutionEnvironment 的 Transformation 集合。
  2. 通过 StreamGraphGenerator 对象 将 Transformation 集合转换为StreamGraph。
  3. 在 PipelineExector 中将 StreamGraph对象转换成 JobGraph 数据结构。JobGraph结构是所有类型客户端和集群之间的任务提交协议,不管是哪种类型的 Flink应用程序,最终都会转换成 JobGraph 提交到集群运行时中运行。
  4. 集群运行时接收到JobGraph之后,会通过JobGraph创建和启动相应的 JobManager服务,并在JobManager服务中将JobGraph转换为ExecutionGraph。
  5. JobManager会根据ExecutionGraph中的节点进行调度,实际上就是将具
    体的Task部署到TaskManager中进行调度和执行。

2.1. StreamGraph 构建

当客户端调用 StreamExecutionEnvironment.execute() 方法执行应用程序代码时,就会通过StreamExecutionEnvironment中提供的方法生成 StreamGraph 对象。StreamGraph 结构描述了作业的逻辑拓扑结构,并以有向无环图的形式描述作业中算子之间的上下游连接关系。

1
env.execute("Socket Window WordCount");

点进 execute()

1
2
3
4
public JobExecutionResult execute(String jobName) throws Exception {
...
return execute(getStreamGraph(jobName));
}

所以可以分为 getStreamGraph(jobName) 构建 StreamGraph 和 execute(Graph) 执行 StreamGraph 两部分来看~

2.1.1. StreamGraph 数据结构

StreamGraph 结构是由 StreamGraphGenerator 通过 Transformation 集合转换而来的,StreamGraph 实现了Pipeline的接口,且通过有向无环图的结构描述了 DataStream 作业的拓扑关系。StreamGraph结构包含 StreamEdge 和 StreamNode 等结构,此外,StreamGraph结构还包含任务调度模式 ScheduleMode 及 TimeCharacteristic 时间概念类型等与作业相关的参数。

截屏2022-03-23 下午1.46.13

StreamGraph 中存储了这个 StreamGraph 中的所有 StreamNode, 在 StreamNode 节点中,会存储 StreamNode,和 StreamNode 之间的边 StreamEdge 之间的关系

截屏2022-03-23 下午1.56.21

2.1.2. 构建 StreamGraph

创建 Stream Graph 的整个过程中借助 DataStream API 构建 Transformation 转换操作集合,然后通过Transformation集
合构建 StreamGraph,最终 PipelineExecutor 将生成的 StreamGraph 转换成 JobGraph并提交到集群中运行。

详情见这个~ [Flink-源码-Graph演变-StreamGraph 构建.md](Flink-源码-Graph演变-StreamGraph 构建.md)

2.1.3. 执行 StreamGraph

在构建完 StreamGraph 之后,我们去执行~~~

1
2
3
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
final JobClient jobClient = executeAsync(streamGraph);
}

截屏2022-03-20 下午10.04.57

点点点~,来到核心代码 AbstractSessionClusterExecutor#execute()

1
final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);

注:这儿的 pipeline 可以理解为 StreamGraph

这个方法 将 pipeline 转换成 JobGraph 提交到 Flink 集群,所以,我们下面看看 JobGraph 构建和提交~

2.2. JobGraph 构建和提交

当作业程序通过客户端提交到集群环境运行时,需要将 StreamGraph 转换成 JobGraph 结构,然后通过客户端的 ClusterClient 将JobGraph 提交到集群运行时中运行。JobGraph 是所有类型作业与集群运行时之间的通信协议,相比于 StreamGraph 结构,JobGraph 主要增加了系统执行参数及依赖等信息,如作业依赖的 JAR 包等。

2.2.1. JobGraph 整体结构

StreamGraph 经过优化后生成了 JobGraph,提交给 JobManager 的数据结构

JobGraph 的三个重要的概念

  • JobVertex

    JobVertex 相当于是 JobGraph 的顶点,跟 StreamNode 的区别是,它是 Operator Chain 之后的顶点,会包含多个 StreamNode

  • IntermediateDataSet

    IntermediateDataSet: 它是由一个 Operator(可能是 source,也可能是某个中间算子)产生的一个中间数据集

  • JobEdge

    相当于是 JobGraph 中的边(连接通道),这个边连接的是一个 IntermediateDataSet 跟一个要消费的 JobVertex

当作业程序通过客户端提交到集群环境运行时,需要将 StreamGraph 转换成 JobGraph 结构,然后通过客户端的 ClusterClient 将 JobGraph 提交到集群运行时中运行。JobGraph 是所有类型作业与集群运行时之间的通信协议,相比于 StreamGraph 结构,JobGraph 主要增加了系统执行参数及依赖等信息,如作业依赖的JAR包等。

截屏2022-03-23 下午2.10.07

2.2.2. JobGraph 构建

Flink会生成 JobGraph 去优化StreamGraph, 其中包括设置Checkpoint,Slot分组策略,内存占比外,最主要的还是将符合条件的Operator 组合成 ChainableOperator,生成对应的 JobVertex,IntermediateDateSet,JobEdge 等核心组件并通过JobEdge连接上IntermediateDateSet和JobVertex,但 JobGraph 生成的 JobVertex 等还是只是粗粒度的用户代码的逻辑结构,包括像IntermediateDateSet 也并不存储数据而仅仅是维护了上下游的生产消费者的数据结构而已,而真正包含物理执行的需要等后面生成最细粒度的 Task 实例时所构造的 ResultSubPartition,InpuGate等才会交互用户的物理数据

详情见这个~ [Flink-源码-JobGraph 构建.md](Flink-源码-JobGraph 构建.md)

2.3. ExecutionGraph 构建

JobManager 根据 JobGraph 生成 ExecutionGraph。ExecutionGraph 是 JobGraph 的并行化版本,是调度层最核心的数据结构,

2.3.1. ExecutionGraph 结构

每个 ExecutionGraph 都有一个与其相关联的作业状态。此作业状态指示作业执行的当前状态

  1. ExecutionJobVertex

    和 JobGraph 中的 JobVertex一一对应。每一个 ExecutionJobVertex 都有和并发度一样多的 ExecutionVertex。

  2. ExecutionVertex

    表示 ExecutionJobVertex 的其中一个并发子任务,输入是ExecutionEdge,输出是 IntermediateResultPartition。

  3. IntermediateResult

    和JobGraph中的IntermediateDataSet一一对应。一个IntermediateResult包含多个 IntermediateResultPartition,其个数等于该operator的并发度。

  4. IntermediateResultPartition

    表示ExecutionVertex的一个输出分区,producer是ExecutionVertex,consumer是若干个ExecutionEdge。

  5. ExecutionEdge

    表示 ExecutionVertex 的输入,source 是 IntermediateResultPartition,target是ExecutionVertex。source和target都只能是一个。

  6. Execution

    是执行一个 ExecutionVertex 的一次尝试。当发生故障或者数据需要重算的情况下 ExecutionVertex 可能会有多个ExecutionAttemptID。一个 Execution 通过 ExecutionAttemptID 来唯一标识。JM和TM之间关于 task 的部署和 task status 的更新都是通过ExecutionAttemptID 来确定消息接受者。

2.3.2. ExecutionGraph 构建

在 ExecutionGraph 转换过程中,JobGraph 中的 JobVertex 节点转换为 Execution Job Vertex 节点,且 Execution Vertex 为Execution Job Vertex中的子节点,其中ExecutionVertex的数量取决于JobVertex的并行度。从图中可以看出,JobGraph共有4个JobVertex,每个 JobVertex 的并行度都为2,因此每个 Execution JobVertex 中的 ExecutionVertex 数量都为2。

详情见这个~ Flink-源码-Graph演变-ExecutionGraph构建.md

截屏2022-03-23 下午2.46.41

2.4. 物理执行图

JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个 TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。

  1. Task

    Execution 被调度后在分配的 TaskManager 中启动对应的 Task。Task 包裹了具有用户执行逻辑的 operator。

  2. ResultPartition

    代表由一个Task的生成的数据,和 ExecutionGraph 中的 IntermediateResultPartition一一对应。

  3. ResultSubpartition

    是ResultPartition的一个子分区。每个ResultPartition包含多个ResultSubpartition,其数目要由下游消费 Task 数和DistributionPattern 来决定。

  4. InputGate

    代表Task的输入封装,和JobGraph中JobEdge一一对应。每个InputGate消费了一个或多个的ResultPartition。

  5. InputChannel

    每个InputGate会包含一个以上的InputChannel,和ExecutionGraph中的ExecutionEdge一一对应,也和ResultSubpartition一对一地相连,即一个InputChannel接收一个ResultSubpartition的输出。