Flink Job 提交脚本解析

当编写好 Flink 的应用程序,正常的提交方式为:打成jar包,通过 flink 命令来进行提交。

flink 命令脚本的底层,是通过 java 命令启动:CliFrontend 类 来启动 JVM 进程执行任务的构造和提 交。

1
flink run xxx.jar class arg1 arg2

CliFrontend 提交分析

当用户把 Flink 应用程序打成 jar 使用 flink run … 的 shell 命令提交的时候,底层是通过 CliFrontend 来处理。底层的逻辑,**就是通过反射来调用用户程序的 main() 方法执行。**

在刚组建内部,主要有以下几件事要做:

  1. 根据 flink 后面的执行命令来确定执行方法(run ==> run(params))

  2. 解析 main 参数,构建 PackagedProgram,然后执行 PackagedProgram

  3. 通过反射获取应用程序的 main 方法的实例,通过反射调用执行起来

总的来说,就是准备执行 Program 所需要的配置,jar包,运行主类等的必要的信息,然后提交执行。

ExecutionEnvironment 源码解析

经过 CliFrontend 类的转交。那么最终到了用户自己编写程序类的 main() 方法了。main 方法就是我们正儿八经提交作业 jar 包当中的一个执行入口。

Flink 应用程序的执行,首先就是创建运行环境 StreamExecutionEnvironment,一般在企业环境中, 都是通过 getExecutionEnvironment() 来获取 ExecutionEnvironment,如果是本地运行的话,则会获 取到:LocalStreamEnvironment,如果是提交到 Flink 集群运行,则获取到: StreamExecutionEnvironment。

1
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StreamExecutionEnvironment 是 Flink 应用程序的执行入口,提供了一些重要的操作机制:

  1. 提供了 readTextFile(), socketTextStream(), createInput(), addSource() 等方法去对 接数据源
  2. 提供了 setParallelism() 设置程序的并行度
  3. StreamExecutionEnvironment 管理了 ExecutionConfig 对象,该对象负责Job执行的一些行为 配置管理。还管理了 Configuration 管理一些其他的配置
  4. StreamExecutionEnvironment 管理了一个 List<Transformation<?>> transformations 成员变量,该成员变量,主要用于保存 Job 的各种算子转化得到的 Transformation,把这些 Transformation 按照逻辑拼接起来,就能得到 StreamGragh(Transformation -> StreamOperator -> StreamNode)
  5. StreamExecutionEnvironment 提供了 execute() 方法主要用于提交 Job 执行。该方法接收的 参数就是:StreamGraph

StreamExecutionEnvironment 是 Flink 应用程序执行的上下文,提供了很多功能,不过重点关注以上 五点即可。

Job 提交流程源码分析

1
2
3
4
5
// 核心入口
env.execute("Streaming WordCount");
// 负责生成 StreamGraph
// 负责执行 StreamGraph
execute(getStreamGraph(jobName));

StreamGraph