一、概述

SparkContext 是 Spark 的入口,相当于应用程序的 $main$ 函数。目前在一个 JVM 进程中可以创建多个 SparkContext,但是只能有一个 active 级别的。如果需要创建一个新的 SparkContext 实例,必须先调用 $stop$ 方法停掉当前 active 级别的 SparkContext 实例。

Spark 集群组件

SparkContext 处于 DriverProgram 核心位置,所有与 ClusterManager、Worker 交互的操作都需要 SparkContext 来完成。

二、设计

SparkContext 包含了 Spark 程序用到的几乎所有核心对象,创建 SparkContext 时会添加一个钩子到 ShutdownHookManager 中用于在 Spark 程序关闭时对核心对象进行清理,在创建 RDD 等操作也会判断 SparkContext 是否已 $stop$ ;通常情况下一个 Driver 只会有一个 SparkContext 实例,但可通过 spark.driver.allowMultipleContexts 配置来允许 driver 中存在多个SparkContext 实例。

三、初始化

3.1. 基本信息设置

3.1.1. CallSite 信息

保存了当前的 CallSite 信息

1
private val creationSite: CallSite = Utils.getCallSite()

3.1.2. assertOnDriver()

1
2
3
if (!config.get(EXECUTOR_ALLOW_SPARK_CONTEXT)) {
SparkContext.assertOnDriver()
}

3.1.3. allowMultipleContexts 属性

判断是否允许创建多个 SparkContext 实例,使用的是 spark.driver.allowMultipleContexts 属性

1
private val allowMultipleContexts: Boolean = config.getBoolean("spark.driver.allowMultipleContexts", false)

spark.driver.allowMultipleContexts 属性,默认为 false,此时 SparkContext 只有一个 active 实例。$markPartiallyConstructed()$ 方法用来确保 SparkContext 实例的唯一性,并将当前的 SparkContext 标记为正在构建中,以防止多个 SparkContext 实例同时成为 active。

1
SparkContext.markPartiallyConstructed(this, allowMultipleContexts)

如果设置 spark.driver.allowMultipleContexts 为 true,当存在多个 active 级别的 SparkContext 实例时 Spark 会发生警告,而不是抛出异常。

3.1.4. SparkConf 设置

复制 SparkConf,校验配置信息,其中最主要的就是 SparkConf 必须指定 spark.master(用于设置部署模式) 和 spark.app.name(应用程序名称) 属性,否则会抛出异常。

3.2. 执行环境 SparkEnv

SparkEnv 是 Spark 的执行环境对象,其中包括与众多 Executor 相关的对象。

在 local 模式下 Driver 会创建 Executor,local-cluster 模式或者 Standalone 模式下 Worker CoarseGrainedExecutorBackend 进程中会创建 Executor,所以 SparkEnv 存在于 Driver 或者 CoarseGrainedExecutorBackend 进程中。

$SparkEnv.createDriverEnv()$ 方法负责创建 SparkEnv,方法有四个参数: conf、isLocal、listenerBus 以及在本地模式下 driver 运行 executor 所需的 numberCores。

1
2
3
4
5
6
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)

private[spark] def createSparkEnv(conf: SparkConf, isLocal: Boolean, listenerBus: LiveListenerBus): SparkEnv = {
SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf), this)
}

3.2.1. numDriverCores()

获取在本地模式下执行程序需要的 cores 个数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private[spark] def numDriverCores(master: String, conf: SparkConf): Int = {
def convertToInt(threads: String): Int = {
if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
}
master match {
case "local" => 1
case SparkMasterRegex.LOCAL_N_REGEX(threads) => convertToInt(threads)
case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
case "yarn" | SparkMasterRegex.KUBERNETES_REGEX(_) =>
if (conf != null && conf.get(SUBMIT_DEPLOY_MODE) == "cluster") {
conf.getInt(DRIVER_CORES.key, 0)
} else {
0
}
case _ => 0 // Either driver is not being used, or its core count will be interpolated later
}
}

3.2.2. createSparkEnv()

3.3. SparkUI

SparkUI 提供了用浏览器访问具有样式及布局并且提供丰富监控数据的页面。其采用的是时间监听机制。发送的事件会存入缓存,由定时调度器取出后分配给监听此事件的监听器对监控数据进行更新。如果不需要 SparkUI,则可以将 spark.ui.enabled 置为 false。

1
2
3
4
5
6
7
8
private var _ui: Option[SparkUI] = None
_ui =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "", startTime))
} else {
None
}
_ui.foreach(_.bind())

3.4. Hadoop 相关配置

默认情况下,Spark 使用 HDFS 作为分布式文件系统,所以需要获取 Hadoop 相关的配置信息:

1
2
private var _hadoopConfiguration: Configuration = _
_hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)

获取的配置信息包括:

1
2
3
4
5
private[spark] def newConfiguration(conf: SparkConf): Configuration = {
val hadoopConf = new Configuration()
appendS3AndSparkHadoopHiveConfigurations(conf, hadoopConf)
hadoopConf
}
  • 将 Amazon S3 文件系统的 AWS_ACCESS_KEY_ID 和 AWS_SECRET_ACCESS_KEY 加载到 Hadoop 的Configuration:
  • 将 SparkConf 中所有的以 spark.hadoop.* 开头的属性都赋值到 Hadoop 的 Configuration
  • 将 SparkConf 的属性 spark.buffer.size 复制到 Hadoop 的 Configuration 的配置 io.file.buffer.size

3.5. Executor 环境变量

executorEnvs 是一个 HashMap 存储, executorEnvs 包含的环境变量将会注册应用程序的过程中发送给 Master,Master 给 Worker 发送调度后,Worker 最终使用 executorEnvs 提供的信息启动 Executor。

通过配置 spark.executor.memory 指定 Executor 占用的内存的大小,也可以配置系统变量 SPARK_EXECUTOR_MEMORY 或者 SPARK_MEM 设置其大小。

1
2
3
4
5
6
7
private var _executorMemory: Int = _
_executorMemory = _conf.getOption("spark.executor.memory")
.orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
.orElse(Option(System.getenv("SPARK_MEM"))
.map(warnSparkMem))
.map(Utils.memoryStringToMb)
.getOrElse(1024)

3.6. 注册 HeartbeatReceiver 心跳接收器

在 Spark 的实际生产环境中,Executor 是运行在不同的节点上的。在 local 模式下的 Driver 与 Executor 属于同一个进程,Dirver 与 Executor 可以直接使用本地调用交互,当 Executor 运行出现问题时,Driver 可以很方便地知道,例如,通过捕获异常。但是在生产环境下,Driver 与 Executor 可能不在同一个进程内,也许运行在不同的机器上,甚至在不同的机房里,因此 Driver 对 Executor 失去掌握。为了能够掌控 Executor,在 Driver 中创建了这个心跳接收器。

1
_heartbeatReceiver = env.rpcEnv.setupEndpoint(HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

使用 SparkEnv 的子组件通信组件 NettyRpcEnv 的 $setupEndpoint()$ 方法,将 RpcEnv 的 Dispatcher 注册 HeartbeatReceiver,并返回 HeartbeatReceiver 的 NettyRpcEndpointRef 引用。

3.7. 创建和启动 TaskScheduler

TaskScheduler 是 SparkContext 的重要组成部分,负责任务的提交,请求集群管理器对任务调度,并且负责发送的任务到集群,运行它们,任务失败的重试,以及慢任务的在其他节点上重试。

其中给应用程序分配并运行 Executor 为一级调度,而给任务分配 Executor 并运行任务则为二级调度。另外 TaskScheduler 也可以看做任务调度的客户端。

3.7.1. 创建 TaskScheduler

$createTaskScheduler()$ 方法根据 master 的配置匹配部署模式,创建 TaskSchedulerImpl,并生成不同的 SchedulerBackend。

1
2
3
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts

TaskScheduler 负责任务调度资源分配,SchedulerBackend 负责与 Master、Worker 通信收集 Worker 上分配给该应用使用的资源情况。

3.7.2. 启动

TaskScheduler 启动实际是调用了 backend 的 $ start()$ 方法:

1
_taskScheduler.start()

3.8. 创建和启动 DAGScheduler

DAGScheduler 主要用于在任务正式交给 TaskScheduler 提交之前做一些准备工作,包括: 创建Job,将 DAG 中的 RDD 划分到不同的 Stage,提交 Stage 等等。

1
2
@volatile private var _dagScheduler: DAGScheduler = _
_dagScheduler = new DAGScheduler(this)

3.9. 启动测量系统 MetricsSystem

MetricsSystem 中三个概念:

  1. Instance: 指定了谁在使用测量系统

    Spark 按照 Instance 的不同,区分为 Master、Worker、Application、Driver 和 Executor

  2. Source: 指定了从哪里收集测量数据

    Source 有两种来源:

    • Spark internal source: MasterSource/WorkerSource 等
    • Common source: JvmSource
  3. Sink: 指定了往哪里输出测量数据;

    Spark 目前提供的 Sink 有 ConsoleSink、CsvSink、JmxSink、MetricsServlet、GraphiteSink 等;Spark 使用 MetricsServlet 作为默认的 Sink。

MetricsSystem 的启动过程包括:

  1. 注册 Sources
  2. 注册 Sinks
  3. 将 Sinks 增加 Jetty 的 ServletContextHandler

MetricsSystem 启动完毕后,会遍历与 Sinks 有关的 ServletContextHandler,并调用 attachHandler 将它们绑定到 Spark UI 上。

1
_env.metricsSystem.start(_conf.get(METRICS_STATIC_SOURCES_ENABLED))

3.10. 创建 EventLoggingListener

EventLoggingListener 是将事件持久化到存储的监听器,是 SparkContext 中可选组件。当 spark.eventLog.enabled 属性为 true 时启动,默认为 false。

1
2
3
4
5
6
7
8
9
10
11
12
private[spark] def isEventLogEnabled: Boolean = _conf.get(EVENT_LOG_ENABLED)
_eventLogger =
if (isEventLogEnabled) {
val logger =
new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
_conf, _hadoopConfiguration)
logger.start()
listenerBus.addToEventLogQueue(logger)
Some(logger)
} else {
None
}

EventLoggingListenser 最为核心的方法是 $logEvent()$,用于将事件转换为 Json 字符串后写入日志文件。

3.11. 创建和启动 ExecutorAllocationManager

ExecutorAllocationManager 用于对以分配的 Executor 进行管理。 默认情况下不会创建 ExecutorAllocationManager,可以修改属性 spark.dynamicAllocation.enabled 为 true 来创建。ExecutorAllocationManager 可以动态的分配最小 Executor 的数量、动态分配最大 Executor 的数量、每个 Executor 可以运行的 Task 数量等配置信息,并对配置信息进行校验。$start()$ 方法将 ExecutorAllocationListener 加入 listenerBus 中,ExecutorAllocationListener 通过监听 listenerBus 里的事件,动态的添加、删除 Executor。并且通过不断添加 Executor,遍历 Executor,将超时的 Executor 杀死并移除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
_executorAllocationManager =
if (dynamicAllocationEnabled) {
schedulerBackend match {
case b: ExecutorAllocationClient =>
Some(new ExecutorAllocationManager(
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
cleaner = cleaner, resourceProfileManager = resourceProfileManager,
reliableShuffleStorage = _shuffleDriverComponents.supportsReliableStorage()))
case _ =>
None
}
} else {
None
}
_executorAllocationManager.foreach(_.start())

3.12. ContextCleaner 的创建与启动

ContextCleaner 用于清理超出应用范围的 RDD、ShuffleDependency 和 Broadcast 对象。

ContextCleaner 的组成:

  • referenceQueue: 缓存顶级的 AnyRef 引用;
  • referenceBuff: 缓存AnyRef的虚引用;
  • listeners: 缓存清理工作的监听器数组;
  • cleaningThread: 用于具体清理工作的线程。

3.13. SparkListener

SparkContext 支持添加用于自定义 SparkListener

1
setupAndStartListenerBus()
  1. spark.extraListeners 属性中获取用户自定义的 SparkListener 的类名。用户可以通过逗号分割多个自定义 SparkListener。

    1
    2
    3
    4
    5
    6
    7
    conf.get(EXTRA_LISTENERS).foreach { classNames =>
    val listeners = Utils.loadExtensions(classOf[SparkListenerInterface], classNames, conf)
    listeners.foreach { listener =>
    // 通过发射生成每一个自定义 SparkListener 的实例,并添加到事件总线的监听器列表中。
    listenerBus.addToSharedQueue(listener)
    }
    }
  2. 启动事件总线,并将_listenerBusStarted 设置为 true。

    1
    2
    listenerBus.start(this, _env.metricsSystem)
    _listenerBusStarted = true

3.14. Spark 环境更新

在 SparkContext 的初始化过程中,可能对其环境造成影响,所以需要更新环境:

1
2
3
4
5
6
7
8
9
10
11
12
13
private def postEnvironmentUpdate(): Unit = {
if (taskScheduler != null) {
val schedulingMode = getSchedulingMode.toString
val addedJarPaths = addedJars.keys.toSeq
val addedFilePaths = addedFiles.keys.toSeq
val addedArchivePaths = addedArchives.keys.toSeq
val environmentDetails = SparkEnv.environmentDetails(conf, hadoopConfiguration,
schedulingMode, addedJarPaths, addedFilePaths, addedArchivePaths,
env.metricsSystem.metricsProperties.asScala.toMap)
val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)
listenerBus.post(environmentUpdate)
}
}

SparkContext 初始化过程中,如果设置了 spark.jars 属性,spark.jars 指定的 jar 包将由 $addJar()$ 方法加入 httpFileServer 的 jarDir 变量指定的路径下。每加入一个 jar 都会调用 $postEnvironmentUpdate()$ 方法更新环境。

增加文件与增加 jar 相同,也会调用 $postEnvironmentUpdate()$ 方法。

  1. 通过调用 SparkEnv 的方法 $environmentDetails()$ ,将环境的 JVM 参数、Spark 属性、系统属性、classPath 等信息设置为环境明细信息。
  2. 生成事件 SparkListenerEnvironmentUpdate(此事件携带环境明细信息),并投递到事件总线 listenerBus,此事件最终被 EnvironmentListener 监听,并影响 EnvironmentPage 页面中的输出内容。

3.15. 发送 SparkListenerApplicationStart 事件

$postApplicationStart()$ 方法负责向 listenerBus 发送 SparkListenerApplicationStart 事件

1
2
3
4
5
6
7
8
private def postApplicationStart(): Unit = {
// Note: this code assumes that the task scheduler has been initialized and has contacted
// the cluster manager to get an application ID (in case the cluster manager provides one).
listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
startTime, sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls,
schedulerBackend.getDriverAttributes))
_driverLogger.foreach(_.startSync(_hadoopConfiguration))
}

3.16. 将 SparkContext 标记为激活

SparkContext 初始化的最后将当前 SparkContext 的状态从 contextBeingConstructed(正在构建中) 改为 activeContext(已激活)

1
2
3
4
5
6
7
8
SparkContext.setActiveContext(this)
private[spark] def setActiveContext(sc: SparkContext): Unit = {
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
assertNoOtherContextIsRunning(sc)
contextBeingConstructed = None
activeContext.set(sc)
}
}