一、概述

SessionState 是基于一个特定 SparkSession 维护所有单个 session 作用域的所有状态,SessionState 维护了 SparkSQL 中大部分的核心类,如 SqlParser、Analyzer、Optimizer 等,SparkSession 将 SQL Query Planning 功能托管给 SessionState。

二、初始化

SparkSession 构造函数中传入 parentSessionState,对其进行 $clone$ ,若传入不为空,则使用 $clone$ 对象。否则进行创建。

1
2
3
4
5
6
7
8
9
lazy val sessionState: SessionState = {
parentSessionState.map(_.clone(this)).getOrElse {
val state = SparkSession.instantiateSessionState(
SparkSession.sessionStateClassName(sharedState.conf),
self
)
state
}
}

2.1. 获取 SessionStateClassName

初始化 SessionState 时,SQL 里会根据参数 spark.sql.catalogImplementation 的值选择对应的 Builder 类。如果编译 Spark 时启用了 hive support,对应的Builder 类是 org.apache.spark.sql.hive.HiveSessionStateBuilder

1
2
3
4
5
6
private def sessionStateClassName(conf: SparkConf): String = {
conf.get(CATALOG_IMPLEMENTATION) match {
case "hive" => HIVE_SESSION_STATE_BUILDER_CLASS_NAME
case "in-memory" => classOf[SessionStateBuilder].getCanonicalName
}
}

2.2. 初始化 BaseSessionStateBuilder

在 $SparkSession.instantiateSessionState()$ 方法中,通过 Java 反射方式实例化一个 BaseSessionStateBuilder 的子类,然后调用 $build()$ 方法,返回一个 SessionState 实例:

1
2
3
4
5
6
7
8
9
private def instantiateSessionState(className: String, sparkSession: SparkSession): SessionState = {
try {
val clazz = Utils.classForName(className)
val ctor = clazz.getConstructors.head
ctor.newInstance(sparkSession, None).asInstanceOf[BaseSessionStateBuilder].build()
} catch {
//...
}
}

BaseSessionStateBuilder 可以接收一个 parentState 来对其的成员进行集成。parentState 为空则直接新建各组件,不为空,则对各组件进行 $clone$

在 BaseSessionStateBuilder 内部创建 conf、functionRegistry、experimentalMethods、sqlParser、resourceLoader、udfRegistration、analyzer、optimizer、planner、catalog 等组件…