一、概述

SparkSession 是 Spark 程序的新入口。在 Spark2.0 之前,使用 Spark 需要先创建 SparkConf 和 SparkContext,Spark 2.0 中引入了 SparkSession,为用户提供了一个统一的切入点来使用 Spark 的各项功能。 SparkConf、SparkContext 和 SQLContext 被封装在 SparkSession 中,使用 Spark 的各项功能只需创建一个 SparkSession。

在 Spark 的早期版本,SparkContext 是进入 Spark 的切入点。RDD 的创建和操作得使用 SparkContext 提供的 API; 对于 RDD 之外的其他东西,需要使用其他的 Context。比如流处理使用 StreamingContext; 对于 SQL 得使用 SQLContext; 而对于 Hive 得使用 HiveContext。

testdemo

二、设计

SparkSession 的设计遵循了工厂设计模式,Spark 程序中首先使用 $SparkSession.builder$ 对象进行一系列配置,包括

1
2
3
4
val spark: SparkSession = SparkSession.builder()
.master("local")
.appName("MySQLSourcePractice")
.getOrCreate()
  • $appName()$ (设置程序名)
  • $master()$ (运行模式: local、yarn、standalone 等)
  • $enableHiveSupport()$ (开启 hive 元数据,即使用 HiveContext)
  • $withExtensions()$ (将自定义扩展注入[SparkSession]。这允许用户添加分析器规则、优化器规则、规划策略或自定义解析器)
  • 还提供 $config()$ 方法,设置自定义参数等。

三、初始化 SparkSession

$getOrCreate()$ 方法则负责初始化具体的 SparkContext

3.1. 初始化 SparkConf

1
2
val sparkConf = new SparkConf()
options.foreach { case (k, v) => sparkConf.set(k, v) }

3.2. assertOnDriver()

在 Spark 中,所有的任务都是由 Executor 执行的,Executor 是分布式的计算节点,运行在集群中的不同机器上。Spark 提供了一些配置属性来控制 Executor 的行为和资源使用,其中 EXECUTOR_ALLOW_SPARK_CONTEXT 允许 Executor 可以创建和维护自己的 SparkSession。这个功能可以在一些特定的场景中非常有用,比如需要在 Executor 中读写分布式文件系统等操作,这些操作需要访问 Spark 上下文

默认情况下,Executor 不允许创建自己的 SparkSession,启用 EXECUTOR_ALLOW_SPARK_CONTEXT 后每个 Executor 都需要创建和维护自己的 SparkSession,会对性能产生很大的影响。因此,仅在必要时启用它。并且需要同时设置 spark.driver.allowMultipleContexts 为 true,否则会出现 SparkContext 冲突的问题。

1
2
3
if (!sparkConf.get(EXECUTOR_ALLOW_SPARK_CONTEXT)) {
assertOnDriver()
}

3.3. (get/create) SparkSession 对象

首先尝试取现有的 SparkSession,如果没有现有的,则基于生成器中设置的选项创建新的一个 SparkSession.

3.3.1. get SparkSession

  • 当前线程的 SparkSession 对象

    首先检查是否存在有效的线程本地 SparkSession, activeThreadSession 是一个 new Inheritable ThreadLocal[SparkSession] 方法,InheritableThreadLocal 继承 ThreadLocal

    1
    2
    3
    4
    5
    var session = activeThreadSession.get()
    if ((session ne null) && !session.sparkContext.isStopped) {
    applyModifiableSettings(session)
    return session
    }
  • 全局默认 SparkSession

    再检查是否存在有效的全局的默认 SparkSession

    1
    2
    3
    4
    5
    session = defaultSession.get()
    if ((session ne null) && !session.sparkContext.isStopped) {
    applyModifiableSettings(session, new java.util.HashMap[String, String](options.asJava))
    return session
    }

    applyModifiableSettings() 方法是 SparkSession 的一个成员方法,它允许在创建 SparkSession 之后,动态地更改 SparkSession 的配置设置,而无需重新创建 SparkSession。它被设计用于例如系统属性或命令行参数之类的可变配置源,以便动态地更改特定的 Spark 模块的行为。

3.3.2. create SparkSession

当前线程没有活动 SparkSession 的或者没有全局默认的 SparkSession,则创建一个新的 SparkSession

  1. 初始化 SparkContext
    调用 SparkContext 构造函数初始化 SparkContext

    1
    2
    3
    4
    5
    6
    val sparkContext = userSuppliedContext.getOrElse {
    if (!sparkConf.contains("spark.app.name")) {
    sparkConf.setAppName(java.util.UUID.randomUUID().toString)
    }
    SparkContext.getOrCreate(sparkConf)
    }
  2. 加载用户自定义扩展

    Spark SQL Extensions 提供了一种灵活的机制,使得 Spark 用户可以在 SQL 解析的 Parser、Analyzer、Optimizer 以及 Planner 等阶段进行自定义扩展,包括自定义 SQL 语法解析、新增数据源等等。

    1
    2
    3
    4
    loadExtensions(extensions)
    applyExtensions(
    sparkContext.getConf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS).getOrElse(Seq.empty),
    extensions)
  3. 创建 SparkSession

    1
    session = new SparkSession(sparkContext, None, None, extensions, options.toMap)

    SparkSession

    • SparkContext

    • 初始化 SharedState

      SharedState 是可以被多个 SparkSession 共享的状态,它让多个 SparkSession 在同一个应用程序中共享相同的状态,包括共享相同的元数据,缓存和其他静态变量,从而减少重复的代码。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      lazy val sessionState: SessionState = {
      parentSessionState
      .map(_.clone(this))
      .getOrElse {
      val state = SparkSession.instantiateSessionState(
      SparkSession.sessionStateClassName(sharedState.conf),
      self)
      state
      }
      }
    • 初始化 SessionState

      SessionState 是基于一个特定 SparkSession 维护的所有状态,SessionState 维护了 SparkSQL 中大部分的核心类,如 SqlParser、 Analyzer、 Optimizer 等。

      这些具体类的实现类型根据当前 Spark Application 的模式会有所不同。

      调用 SparkSession 对象的 cloneSession() 或者 newSession() 创建新的 SparkSession 对象,会重用 SharedState。但是新的 SparkSession 会拥有新的 SessionState 对象,从而拥有新的 conf、resourceLoader、解析器、优化器、物理计划器等,从而将不同的 SparkSession 的操作和配置隔离开。

    • 初始化 SparkSessionExtensions

    • 配置项

3.3.3. 设置 SparkSession

1
2
setDefaultSession(session)
setActiveSession(session)

3.3.4. 注册监听器

设置 SparkListener 当 Application 结束时,重置 default session

1
2
3
4
5
6
7
8
9
10
11
private def registerContextListener(sparkContext: SparkContext): Unit = {
if (!listenerRegistered.get()) {
sparkContext.addSparkListener(new SparkListener {
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
defaultSession.set(null)
listenerRegistered.set(false)
}
})
listenerRegistered.set(true)
}
}

$registerContextListener()$ 允许用户在 SparkSQL 上下文中注册一个回调函数,当上下文关闭 $spark.close()$ 时,指定的回调函数将被调用,在上下文关闭前做一些资源清理工作。

四、执行 Sql 语句(SparkSession#sql())

SparkSession 类的 $sql()$ 方法负责执行 sql 语句,通过调用 SessionState 中的各种对象,包括不同阶段对应的 SparkSqlParser 类、 Analyzer 类、 Optimizer 类和 SparkPlanner 类等~,最后返回 DataFrame。

1
2
3
4
5
6
7
8
9
10
11
12
def sql(sqlText: String, args: Map[String, Any]): DataFrame = withActive {
val tracker = new QueryPlanningTracker
val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
val parsedPlan = sessionState.sqlParser.parsePlan(sqlText)
if (args.nonEmpty) {
ParameterizedQuery(parsedPlan, args.mapValues(lit(_).expr).toMap)
} else {
parsedPlan
}
}
Dataset.ofRows(self, plan, tracker)
}
  1. 初始化 QueryPlanningTracker

  2. 将 SQL 解析为 Unresolved LogicalPlan

    • 初始化 SessionState
    • 获取 Parser
    • $parsePlan()$
  3. ParameterizedQuery

  4. $Dataset.ofRows()$

    plan 就是 sqlParser 解析出来的 logicalPlan ,接着进入到 $Dataset.ofRows()$ ~

    1
    2
    3
    4
    5
    6
    7
    def ofRows(sparkSession: SparkSession, 
    logicalPlan: LogicalPlan,
    tracker: QueryPlanningTracker) = sparkSession.withActive {
    val qe = new QueryExecution(sparkSession, logicalPlan, tracker)
    qe.assertAnalyzed()
    new Dataset[Row](qe, RowEncoder(qe.analyzed.schema))
    }
    • 初始化 QueryExecution

      QueryExecution 是 SQL 查询的主要工作流,包含了一条 SQL 语句需要执行的所有计划

      Spark 会为每一个 SQL 语句创建一个 QueryExecution。

    • $QueryExecution.assertAnalyzed()$

      1
      2
      3
      lazy val analyzed: LogicalPlan = executePhase(QueryPlanningTracker.ANALYSIS) {
      sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
      }
    • 初始化 Dataset

      • RowEncoder

五、SparkSession#executeCommand()