一、概述

CatalogManager 用来管理 Catalog,且可以同时管理多个 Catalog。也就是说,可以通过在一个相同 SQL 中,跨 Catalog 做查询或者关联操作。

在一个 Flink Session 当中,是可以创建多个 Catalog ,每一个 Catalog 对应于一个外部系统。用户可以在 Flink Table API 或者如果使用的是 SQL Client 的话,可以在 Yaml 文件里指定定义哪些 Catalog 。然后在 SQL Client 创建 TableEnvironment 的时候,就会把这些 Catalog 加载起来。TableEnvironment 通过CatalogManager 来管理这些不同的 Catalog 的实例。这样 SQL Client 在后续的提交 SQL 语句的过程中,就可以使用这些 Catalog 去访问外部系统的元数据了。

Flink在创建运行环境时会同时创建一个CatalogManager,用来管理不同的 Catalog 实例,

https://developer.aliyun.com/article/752539?spm=a2c6h.12873581.0.0.3b452634BiGZLo

二、实现

2.1. 架构设计

Flink 通过 CatalogManager 来组织当前系统中可用的 Catalog 和设置、查询当前 Catalog 等的信息,通过 TableEnvironment 暴露给用户常用的操作接口,比如查询当前系统所有的 Catalog,当前 Catalog 的所有表和数据库等。

Catalog 接口能够支持数据库、表、函数、甚至于分区等多种抽象。通过 CatalogManager,可以同时在一个会话中挂载多个 Catalog,从而访问到多个不同的外部系统。

2.2. 实现

2.2.1. 主要属性

2.2.2. 主要方法

  1. $registerCatalog$

  2. $initSchemaResolver$

    初始化 Schema 解析器。Schema 是指数据的结构定义,例如表的列名、数据类型等。在 Flink 中,可以使用 Schema 来定义输入和输出的数据格式。

    1
    2
    3
    4
    5
    6
    public void initSchemaResolver(boolean isStreamingMode, 
    ExpressionResolverBuilder
    expressionResolverBuilder) {
    this.schemaResolver =
    new DefaultSchemaResolver(isStreamingMode, typeFactory, expressionResolverBuilder);
    }

    DefaultSchemaResolver 是 Apache Flink 框架中用于解析数据源 schema 问题的默认实现。当使用 Flink 处理数据时,输入数据往往具有不同的 schema。这可能导致许多问题,例如无法正确解析数据等。此时,就需要使用 SchemaResolver 对数据进行处理。

    $ resolve$ 方法用于解析数据源对象的 schema。首先,它会检查数据源类型是否支持,然后根据数据源类型对其 schema 进行解析。如果有自定义的 schema 解析器,该方法也会使用自定义解析器进行解析。

  3. $resolveCatalogTable$