一、概述

ShareState 负责保存多个有效 session 之间的共享状态。SharedState 中包含变量: warehousePath、cacheManager、statusStore、externalCatalog、globalTempViewManager、jarClassLoader。这些变量对于所有的 SparkSession 都是公用的。

二、初始化

1
2
3
lazy val sharedState: SharedState = {
existingSharedState.getOrElse(new SharedState(sparkContext, initialSessionOptions))
}

综上可以看出,SparkSession 重用 sharedState,但是会新 $clone$ 或者创建新的 sessionState,但是如果在创建多个 SparkSession 时,传入的 existingSharedState 都为空,则多个 SparkSession 也会创建多个 sharedState。

2.1. 解析和处理 SparkSQL 支持的文件系统协议

$setFsUrlStreamHandlerFactory()$ 方法用于将文件系统协议添加到 Spark SQL 中,例如 Amazon S3、HDFS 或其他自定义文件系统,以便读取数据。

2.2. 配置文件

(conf, hadoopConf)

  1. resolveWarehousePath

2.3. 初始化 CacheManager

CacheManager 是 Spark SQL 中内存缓存的管理者,在 Spark SQL 中提供对缓存查询结果的支持,并在执行后续查询时自动使用这些缓存结果。数据使用 InMemoryRelation 中存储的字节缓冲区进行缓存。CacheManager 通过 SharedState 在 SparkSessions 之间共享。

2.4. SQLAppStatusStore

SQLAppStatusStore 是一个 SparkSQL 应用程序状态存储库,用于 SparkSQL 内部应用程序的状态管理和检索。它可以用于将 SparkSQL 应用程序在不同步骤之间的状态持久化存储,以便在失败或重新启动时恢复这些状态。

SQLAppStatusStore 提供了一个统一的 API,用于访问和管理应用程序的状态数据,而不管它们是存储在内存、文件系统或数据库中。

1
2
3
4
5
6
7
8
val statusStore: SQLAppStatusStore = {
val kvStore = sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore]
val listener = new SQLAppStatusListener(conf, kvStore, live = true)
sparkContext.listenerBus.addToStatusQueue(listener)
val statusStore = new SQLAppStatusStore(kvStore, Some(listener))
sparkContext.ui.foreach(new SQLTab(statusStore, _))
statusStore
}

2.5. StreamingQueryStatusListener

还没开始哦~

2.6. ExternalCatalogWithListener

$externalCatalog$ 初始化 ExternalCatalogWithListener,ExternalCatalogWithListener 封装了 ExternalCatalog 类,同时提供了事件监听的功能。

  1. 通过 externalCatalogClassName 函数获取到要反射的类名,然后通过 $reflect$ 函数反射获取到 externalCatalog

    1
    2
    3
    4
    5
    6
    7
    8
    9
    val externalCatalog = SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
    SharedState.externalCatalogClassName(conf), conf, hadoopConf
    )
    private def externalCatalogClassName(conf: SparkConf): String = {
    conf.get(CATALOG_IMPLEMENTATION) match {
    case "hive" => HIVE_EXTERNAL_CATALOG_CLASS_NAME
    case "in-memory" => classOf[InMemoryCatalog].getCanonicalName
    }
    }
  2. createDatabase

    1
    2
    3
    4
    5
    6
    7
    8
    if (!externalCatalog.databaseExists(SQLConf.get.defaultDatabase)) {
    if (!SessionCatalog.DEFAULT_DATABASE.equalsIgnoreCase(SQLConf.get.defaultDatabase)) {
    throw QueryExecutionErrors.defaultDatabaseNotExistsError(SQLConf.get.defaultDatabase)
    }
    val defaultDbDefinition = CatalogDatabase(SQLConf.get.defaultDatabase, "default database",
    CatalogUtils.stringToURI(conf.get(WAREHOUSE_PATH)), Map())
    externalCatalog.createDatabase(defaultDbDefinition, ignoreIfExists = true)
    }
  3. 事件监听

    1
    2
    3
    val wrapped = new ExternalCatalogWithListener(externalCatalog)
    wrapped.addListener((event: ExternalCatalogEvent) => sparkContext.listenerBus.post(event))
    wrapped

2.7. GlobalTempViewManager

GlobalTempViewManager 全局的临时视图管理器,被 $DataFrame.createGlobalTempView()$ 方法调用,进行跨 session 的视图管理。GlobalTempViewManager 通过 synchronized 关键字保证了线程安全,提供了对全局视图(大小写敏感)的原子操作, 包括创建、更新、删除和重命名等。 内部通过 HashMap 维护视图名和其对应逻辑计划的映射关系

2.8. jarClassLoader

加载用户添加的 jar 包