一、概述

DataSourceV2 是 Spark 2.x 新推出的 API,用来和外部数据存储进行集成。原来的 SessionCatalog 也暴露出弊端和不足,缺少对表的元数据进行操作,比如创建、修改、删除表等,为了适应新的数据源特性,Spark 推出了新的接口: CatalogPlugin。

二、设计

2.1. 元数据模型

Spark 的元数据模型定义了任务的元数据结构,如数据库、表、视图、函数等,Spark 定义了 4 类接口分别对应于 4 种元数据类型,元数据类型之间的层次关系如图,最顶层的 CatalogPlugin 是元数据的容器。

随着新数据源 DataSourceV2 的出现,原来的 SessionCatalog 暴露出弊端和不足,为了适应新的数据源特性,Spark 推出了新的接口: CatalogPlugin。

CatalogPlugin

实现自定义 Catalog,既可以直接实现 CatalogPlugin,也可以扩展 TableCatalog 接口,TableCatalog 扩展了 CatalogPlugin 并提供了表操作相关功能的接口。

同理,实现函数相关的 Catalog,也可以直接扩展 FunctionCatalog,因为它提供了函数管理相关的接口。同 SessionCatalog 相对应,CatalogPlugin 接口体系也实现了 V2SessionCatalog,整个 CatalogPlugin 类体系表示为下图所示

2.1.1. Table

  1. TableCatalog

    TableCatalog 定义了 Catalog 和表进行交互的方法,实现了创建、修改、删除表的 api。TableCatalog 的实现有 V2SessionCatalog 和 JDBCCatalog,其中 V2SessionCatalog 是为了和之前的 SparkSession 中的 Catalog 做兼容。
    V2SessionCatalog 不同于 SessionCataolog,主要表现在:

    1. V2SessionCatalog 实现了 CatalogPlugin 接口,CatalogPlugin 是针对新数据源(DataSourceV2) 的元数据管理。

    2. SessionCatalog 只是普通类,封装了外部数据源的元数据管理接口 ExternalCatalog。

    3. SessionCatalog 作为 V2SessionCatalog 的属性,或者说 V2SessionCatalog 是 SessionCatalog 的代理实现。

  1. TableChange

    TableChange 是 SparkSQL 中的一种抽象,它用于描述述表的结构以及表中的数据之间的变化,包括数据表表字段的增加、删除或更改,以及数据表中数据的插入、更新或删除,使 Spark 更好地处理表之间的变化。

  2. Table

2.1.2. Function

  1. FunctionCatalog

2.1.3. View

2.3. 元数据管理

CatalogManager 是 CatalogPlugin 的管理者,并且是线程安全的。

CatalogManager

三、Catalog 初始化

Spark 通过 CatalogManager 可以同时管理内部连接多个 Catalog,通过 spark.sql.catalog.${name} 可以注册多个 Catalog,Spark 默认的Catalog 由 spark.sql.catalog.spark_catalog 参数指定,通常的做法是,自定义 Catalog 类继承 DelegatingCatalogExtension 实现,然后通过 spark.sql.catalog.spark_catalog 参数来指定自定义 Catalog 类。
详细看一下 HiveExternalCatalog, v2SessionCatalog, spark_catalog 等对象的实例化和管理流程:

  1. SparkSession 启用 HiveSupport, $SparkSession.enableHiveSupport(true)$ ,在该方法内会设置参数 CATALOG_IMPLEMENTATION = hive, Spark SQL 的 Catalog 默认支持 hive 和 in-memory 两种,如果没有指定,默认为 in-memory。

  2. session.sharedState.externalCatalog 是 SparkSession 实际负责和外部系统交互的 Catalog, 根据上面设置的参数,分别会实例化出 HiveExternalCatalog 和 InMemoryCatalog 两个实例。

  3. 在 BaseSessionStateBuilder/HiveSessionStateBuilder 中会使用上面的 externalCatalog 创建 Catalog 对象,再根据 Catalog 创建 v2SessionCatalog 对象

  4. 根据 catalog 和 v2SessionCatalog 创建 CatalogManager 实例。CatalogManager 通过 catalogs 对象来管理多个 catalog。

    CatalogManager 的 defaultSessionCatalog 属性就是上面的 v2SessionCatalog 对象。

  5. $CatalogManager.catalog$ 方法通过 catalog 的 name 返回 CatalogPlugin 实例,如果没有该实例,则通过 $Catalogs.load$ 方法进行实例化。

  6. $Catalogs.load$ 方法加载 conf 中配置的 Spark.sql.catalog.$ {name}类,并实例化/初始化
    CatalogPlugin 对象

  7. 如果 spark.sql.catalog.${name} 参数为空(默认为空)时,返回 CatalogManager 中的 defaultSessionCatalog 属性.

  8. 如果 spark.sql.catalog.spark_catalog 参数已经配置,对上面 $Catalogs.load$ 出来的实例进行判断,如果发现上面加载的是 CatalogExtension 子类,自动调用其 $setDelegateCatalog$ 方法,将 CatalogManager 中 defaultSessionCatalog 设置为其内部代理对象。