当编写好 Flink 的应用程序,正常的提交方式为:打成jar包,通过 flink 命令来进行提交。flink 命令脚本的底层,是通过 java 命令启动:CliFrontend 类 来启动 JVM 进程执行任务的构造和提交。

一、概述

当用户构建好 Flink 应用程序后,会在客户端运行 StreamExecutionEnvironment.execute() 方 法生 成 StreamGraph 对象,并将
StreamGraph 通过 PipelineExecutor 提交到远程集群中执行。在这个过程中,首先会通过 DataStream API 生成 Transformation转换集合;然后基于 Transformation 转换集合构建 Pipeline 的实现类,也就是 StreamGraph 数据结构;最后再通过
PipelineExecutor 将 StreamGraph 转换为 JobGraph 数据结构,并将 JobGraph 提交到集群中运行。

二、提交脚本解析

三、源码

当编写好 Flink 的应用程序,正常的提交方式为:打成jar包,通过 flink 命令来进行提交。flink 命令脚本的底层,是通过 java 命令启动:CliFrontend 类 来启动 JVM 进程执行任务的构造和提交。

1
flink run xxx.jar class arg1 arg2

Flink 脚本

1
2
exec $JAVA_RUN "$JVM_ARGS" "$FLINK_ENV_JAVA_OPTS" "${log_setting[@]}" -classpath "$(manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS")" org.apache.flink.client.cli.CliFrontend "$@"

这个脚本执行完了之后,最终就会跳转到 CliFrontend 类的 main() 方法

3.1. 打印输出环境信息

1
EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

3.2.1. 找到 conf 目录

通过 FLINK_CONF_DIR 变量找到 conf 目录

1
final String configurationDirectory = getConfigurationDirectoryFromEnv();

配置有两个来源。

  1. main() 方法的参数。
  2. flink-conf.yaml 配置文件当中的一些配置信息。

main 方法的参数是 args ,一般来说在提交应用程序的时候,flink-conf.yaml 文件当中的一些配置对 job 会有影响,同时在提交应用程序的时候,也会指定各种参数。那么最终我们都会把它解析到一起来

1
final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);

3.3. 加载自定义命令参数

1
final List<CustomCommandLine> customCommandLines = loadCustomCommandLines(configuration, configurationDirectory);

3.4. 初始化 CliFrontend

1
final CliFrontend cli = new CliFrontend(configuration, customCommandLines);

3.5. 运行

解析命令行并并开始请求操作

1
int retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseParameters(args));

3.5.1. 参数处理

  1. 检查参数的长度

  2. 获取 action 参数,如:flink run 命令中的 run 命令

    1
    String action = args[0];

    我们可以通过 flink 这个命令来执行 run 、stop 、list 等等,在提交一个 job 时的 action 是 run,如果通过 flink run 提交一个 job,最终通过 run() 的方法来提交。

  3. 从所有参数中,移除 action 参数

    1
    final String[] params = Arrays.copyOfRange(args, 1, args.length);

3.5.2. run()

  1. 解析参数

    把命令行的参数,方法的参数,还有之前的 configuration 等的那个参数合并

    1
    final CommandLine commandLine = getCommandLine(commandOptions, args, true);
  2. 构建 PackagedProgram

    PackagedProgram 结构包含应用程序能够运行的全部信息,如.jar文件、任务参数、mainClass等。基于 PackagedProgram 可以打包用户提交的作业,构建作业执行的本地环境,主要是将该作业的依赖包加载至 UserClassLoader中。通过
    PackagedProgram 在客户端进程内运行作业代码逻辑,并从作业中将JobGraph信息抽取出来,最后将JobGraph提交到集群中运行。

    截屏2022-03-20 上午11.17.20
    1
    2
    final ProgramOptions programOptions = ProgramOptions.create(commandLine);
    final PackagedProgram program = getPackagedProgram(programOptions);
  3. 依赖 jar 处理

    1
    final List<URL> jobJars = program.getJobJarAndDependencies();
  4. 执行

    调用 executeProgram() 方法执行PackagedProgram中的作业程序。

    1
    executeProgram(effectiveConfiguration, program);

    点点点~,最后来到 PackagedProgram#callMainMethod() 里,在这个方法里通过反射得到运行主类的 main() 方法实例

    1
    mainMethod = entryClass.getMethod("main", String[].class);

    entryClass 其实就是我们自己写的程序的实例,也就是说接下来要去执行用户自己编写的程序的 main() 方法啦~

3.5.3. entryClass.main()

经过 CliFrontend 类的转交。那么最终到了用户自己编写程序类的 main() 方法,接下来就是我们的自己编写的应用程序的一个执行了~

截屏2022-03-09 下午3.32.31
  1. 构建 StreamExecutionEnvironment

    Flink 应用程序的执行,首先就是创建运行环境 StreamExecutionEnvironment

    一般在企业环境中, 都是通过 getExecutionEnvironment() 来获取 ExecutionEnvironment,如果是本地运行的话,则会获 取到:LocalStreamEnvironment,如果是提交到 Flink 集群运行,则获取到: StreamExecutionEnvironment。

    1
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    **SparkContext & ExecutionEnvironment **🤔️

    Spark 当中的 spark context 这个类非常的重量级,它里面包含了各种在执行应用程序过程当中所需要的各种组件 ExecutionEnvironment 这个类呢相对来说要轻量级一点。

    StreamExecutionEnvironment 是 Flink 应用程序的执行入口,提供了一些重要的操作机制:

    1. 提供了 readTextFile(), socketTextStream(), createInput(), addSource() 等方法去对接数据源

      我们要去加载数据源,得到一个数据抽象,最终调用的都是 StreamExecutionEnvironment 里面的这四大类的方法。

      1
      DataStream<String> text = env.socketTextStream(hostname, port, "\n");

    2. 提供了 setParallelism() 设置程序的并行度

    3. StreamExecutionEnvironment 对象内部有两个配置管理对象

      • ExecutionConfig

        管理是当前这个job 在执行过程当中的信息。

      • Configuration

        管理集群的配置信息

    4. List<Transformation<?>> transformations

      StreamExecutionEnvironment 管理了一个 List<Transformation<?>> transformations 成员变量,保存 Job 的各种算子转化得到的 transformation

      在写 Flink 应用程序的时候,针对某一个 data stream 会调用各种算子来执行一些操作,最终所有的算子都会被转化成 transformation,然后添加到 transformations 集合里面。在提交 Job 的时候,从 transformation 集合里面拿出所有的 transformation,把它构建成 StreamGraph

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      DataStream<WordWithCount> windowCounts = text

      .flatMap((FlatMapFunction<String, WordWithCount>) (value, out) -> {
      for (String word : value.split("\\s")) {
      out.collect(new WordWithCount(word, 1L));
      }
      })
      .keyBy(value -> value.word)
      .timeWindow(Time.seconds(5))
      .reduce(new ReduceFunction<WordWithCount>() {
      @Override
      public WordWithCount reduce(WordWithCount a, WordWithCount b) {
      return new WordWithCount(a.word, a.count + b.count);
      }
      });

      flatMap 为例

      截屏2022-03-20 下午1.10.12

      如图,程序会调用一个 DataStream#addOperator() 的方法,将 flatMap 算子成的 transformation,添加到 transformations 集合里面

      1
      2
      3
      public void addOperator(Transformation<?> transformation) {
      this.transformations.add(transformation);
      }

      截屏2022-03-21 上午9.48.50

    5. StreamExecutionEnvironment 提供了 execute() 方法主要用于提交 Job 执行。该方法接收的 参数就是:StreamGraph

      通过 execute 方法来提交 job 的时候,那我们先把所有的 transformation,构建成一个stream graph。然后再提交

      1
      env.execute("Socket Window WordCount");

​ 返回一个可用的执行环境。执行环境是整个 Flink 程序执行的上下文,记录了相关配置 (如并行度),并提供了一系列方法,如读 取输入流,以及真正开始运行整个代码的 execute() 方法等

  1. execute()

    当客户端调用 StreamExecutionEnvironment.execute() 方法执行应用程序代码时,就会通过StreamExecutionEnvironment中 提供的方法生成 StreamGraph 对象。StreamGraph 结构描述了作业的逻辑拓扑结构,并以有向无环图的形式描述作业中算子之间的上下游连接关系。

    1
    2
    3
    public JobExecutionResult execute(String jobName) throws Exception {
    return execute(getStreamGraph(jobName));
    }

    来看 StreamGraph 的底层实现及构建过程👀~

    我们去提交这一个job 执行。那么到底提交的时候会做哪些事情呢?那么记住当你点进来 execute() 的时候,你会发现这个代码是这个样子的。

    • getStreamGraph()

      • generate()