一、概述

Flink 为 Table API 和 SQL 设计了 Table Connector 体系,包括 Table Source 和 Table Sink, 满足 Table API 和 SQL 对数据源访问的需求。

二、设计

2.1. MetaData

通过 DDL 或由 Catalog 提供的元数据创建 CatalogTable 对象。

CatalogTable 是 Flink 中的一个表元数据管理模块,提供了一种统一的表元数据管理方式,方便管理 Fink 中的表信息,包括表的结构、存储位置、分区方式、统计信息等。

截屏2023-06-07 21.48.35

2.2. Planning

在 Planning 阶段,DynamicTableSourceFactory 和 DynamicTableSinkFactory 提供特定连接器的逻辑,根据 CatalogTable 生成 DynamicTableSource 和 DynamicTableSink 的实例。

在大多数情况下,工厂的目的是验证选项(例如在示例中 ‘port’ = ‘8080’),配置编码/解码格式,以及创建表连接器(Table connectors) 的参数化实例。

默认情况下,DynamicTableSourceFactory 和 DynamicTableSinkFactory 的实例是使用 Java 的服务提供商接口(SPI) 发现,connector 选项(例如示例中 ‘connector’ = ‘custom’)必须对应于有效的标识符。
Planner 使用 Source 实例和 Sink 实例执行特定于连接器的双向通信,直到找到最佳逻辑计划为止,根据可选地声明的功能接口。

2.2.1. Source

  1. DynamicTableSourceFactory

    1. DynamicTableSource

      DynamicTableSource 负责从外部系统创建出一个动态表。该接口包含有如下两个子接口: ScanTableSource 和 LookupTableSource

      • ScanTableSource

        ScanTableSource 在运行时扫描来自外部存储系统的所有行。扫描的行不仅可以包含插入,还可以包含更新和删除。因此,ScanTableSource 可用于读取(有限或无限) Changelog。

      • LookupTableSource

        LookupTableSource 在运行时通过一个或多个键查找外部存储系统的行。与 ScanTableSource 相比,LookupTableSource 不必读取整个表,并且可以在需要时从(可能不断变化的)外部表中延迟获取各个值。

        与 ScanTableSource 相比,LookupTableSource 目前仅支持发出插入操作所产生的变化。

    2. 功能接口

      Table source 可以实现其他功能接口如 SupportsProjectionPushDown,所有功能接口都可以在 org.apache.flinktable.connector source.abilities 包中找到。

      接口 描述
      SupportsFilterPushDown 支持将过滤条件下推到 DynamicTableSource。为了更高效处理数据,source 端会将过滤条件下推,以便在数据产生时就处理。
      SupportsLimitPushDown 支持将 limit(期望生产的最大数据条数)下推到 DynamicTableSource。
      SupportsPartitionPushDown 支持将可用的分区信息提供给 planner 并且将分区信息下推到 DynamicTableSource。在运行时为了更高效处理数据,source 端会只从提供的分区列表中读取数据。
      SupportsProjectionPushDown 支持将查询列(可嵌套)下推到 DynamicTableSource。为了更高效处理数据,source 端会将查询列下推,以便在数据产生时就处理。如果 source 端同时实现了 SupportsReadingMetadata,那么 source 端也会读取相对应列的元数据信息。
      SupportsReadingMetadata 支持通过 DynamicTableSource 读取列的元数据信息。source 端会在生产数据行时,在最后添加相应的元数据信息,其中包括元数据的格式信息。
      SupportsWatermarkPushDown 支持将水印策略下推到 DynamicTableSource。水印策略可以通过工厂模式或 Builder 模式来构建,用于抽取时间戳以及水印的生成。在运行时,source 端内部的水印生成器会为每个分区生产水印。
      SupportsSourceWatermark 支持使用 ScanTableSource 中提供的水印策略。当使用 CREATE TABLE DDL 时,<可以使用> SOURCE_WATERMARK() 来告诉 planner 调用这个接口中的水印策略方法。

      注意: 上述接口当前只适用于 ScanTableSource,不适用于 LookupTableSource

2.2.2. Sink

  1. DynamicTableSinkFactory

  2. DynamicTableSink

  3. 功能接口

    接口 描述
    SupportsOverwrite 支持 DynamicTableSink 覆盖写入已存在的数据。默认情况下,如果不实现这个接口,在使用 INSERT OVERWRITE SQL 语法时,已存在的表或分区不会被覆盖写入
    SupportsPartitioning 支持 DynamicTableSink 写入元数据列。
    SupportsWritingMetadata 支持 DynamicTableSource 写入元数据列。sink 端会在消费数据行时,在最后接受相应的元数据信息并进行持久化,其中包括元数据的格式信息。

2.3. Runtime

Planning 阶段完成后,Planner 获取运行时实现(ScanRuntimeProvider/SinkRuntimeProvider)。运行时逻辑一般在 Flink Connector Source/Sink 接口中实现,返回一个用于读取/存储数据的运行实例对象。

Table Connector 体系基于 DataSet/Stream Connector API,是对 DataSet/Stream Connector APlI 的封装。

三、Connector 实现

3.1. Kafka

3.2. Hudi

3.3 Iceberg