一、概述

Spark Core 内部的事件框架实现了基于事件的异步化编程模式。它的最大好处是可以提升应用程序对物理资源的充分利用,能最大限度的压榨物理资源,提升应用程序的处理效率。缺点比较明显,降低了应用程序的可读性。Spark 的基于事件的异步化编程框架由事件框架和异步执行线程池組成,应用程序产生的 Event 发送给 ListenerBus, ListenerBus 再把消息广播给所有的 Listener,每个 Listener 收到 Event 判断是否自己感兴趣的 Event, 若是,会在 Listener 独享的线程池中执行 Event 所对应的逻辑程序。

二、实现

Spark 中的事件总线采用监听器模式设计。

2.1. SparkListenerEvent

在 Spark 任务运行期间会产生大量包含运行信息的 SparkListenerEvent,例如 ApplicationStart/StageCompleted/MetricsUpdate 等等,都有对应的 SparkListenerEvent 实现。所有的 event 会发送到 ListenerBus 中,被注册在 ListenerBus 中的所有 listener 监听

2.2. LiveListenerBus

Spark 事件总线的核心是 LiveListenerBus,其内部维护了多个 AsyncEventQueue 队列用于存储和分发和响应 SparkListenerEvent 事件,负责接受 SparkListenerEvent 并且分发给各个 Listener。

MetricsSystem 是一个为了衡量系统的各种指标的度量系统。Listener可以是 MetricsSystem 的信息来源之一。他们之间总体是一个互相补充的关系

2.2.1. 属性

  1. queuedEvents 属性

    queuedEvents 维护一个 SparkListenerEvent 列表,在 LiveListenerBus 启动成功之前,缓存可能已经收到的事件。在启动之后,这些缓存的事件会首先投递出去。

    1
    @volatile private[scheduler] var queuedEvents = new mutable.ListBuffer[SparkListenerEvent]()
  2. queues

    queues 维护一个 AsyncEventQueue 的列表,LiveListenerBus 中会有多个事件队列。采用 CopyOnWriteArrayList 来保证线程安全性。

    1
    private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()

    AsyncEventQueue 实现了 SparkListenerBus,提供了异步事件队列机制。SparkListenerBus 继承了 ListenerBus,实现了 $doPostEvent()$ 方法,对事件进行匹配,并调用 Listener 的处理方法。如果无法匹配到事件,则调用 $onOtherEvent()$ 方法。

    SparkListenerBus 支持的监听器都是 SparkListenerInterface 的子类,事件则是 SparkListenerEvent 的子类。

    ListenerBus 特征是 Spark 内所有事件总线实现的基类,主要用于接收事件并提交到对应的事件监听器, 添加和移除事件监听器。ListenerBus 特征带有两个泛型参数 L 和E。L 代表监听器的类型,并且它可以是任意类型的。E 则代表事件的类型。

    • 属性

      1. listenersPlusTimers 维护了所有注册在事件总线上的监听器以及它们对应计时器的二元组。计时器是可选的,用来指示监听器处理事件的时间。它采用了并发容器CopyOnWriteArrayList,以保证线程安全和支持并发修改。

        1
        private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Option[Timer])]
      2. listeners 就是将 listenersPlusTimers 中的监听器单独取出来

        1
        private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava
    • 方法

      1. $add/removeListener()$

        ListenerBus 特征中定义了一些基本的与事件总线相关的方法,$addListener()$ 与 $removeListener()$ 方法分别向事件总线中注册监听器与移除监听器。它们都是直接在 CopyOnWriteArrayList 上操作,因此是线程安全的。

      2. $doPostEvent()$

        将事件 event 投递给监听器 listener 进行处理。在 ListenerBus 中只提供了定义,具体逻辑须要由各个实现类来提供。

      3. $postToAll()$

        通过调用 $doPostEvent()$ 方法,将事件 event 投递给所有已注册的监听器

      AsyncEventQueue 使用生产者-消费者模型,$post()$ 事件到阻塞队列 BlockingQueue, 然后线程不断的从 BlockingQueue 拿事件,调用监听者的相应 $post()$ 方法,完成事件的分发~

      • 属性

        • dispatchThread: 将队列中的事件分发到各监听器的守护线程,实际上调用了 $dispatch()$ 方法。
        1
        2
        3
        4
        5
        6
        private val dispatchThread = new Thread(s"spark-listener-group-$name") {
        setDaemon(true)
        override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
        dispatch()
        }
        }
      • 方法

        1. 入队: $post()$

          $post()$ 方法首先检查队列是否已经停止。如果是运行状态,就试图将事件 event 入队。若 $offer()$ 方法返回 false,表示队列已满,将丢弃事件的计数器自增,并标记有事件被丢弃。最后,若当前的时间戳与上一次输出 droppedEventsCounter 值的间隔大于1分钟,就在日志里输出它的值。

        2. 分发消息: $dispatch()$

          循环地从事件队列中取出事件,并调用父类 ListenerBus 特征的 $postToAll()$ 方法。

2.2.2. 方法

LiveListenerBus 作为一个事件总线,也必须提供监听器注册、事件投递等功能,这些都是在 AsyncEventQueue 基础之上实现的。

  • $start()$

  • 注册监听器: $addToQueue()$

    $addToQueue()$ 将监听器 listener 注册到 queue 的队列中。它会在 queues 列表中寻找符合条件的队列,如果该队列已经存在,就调用父类 ListenerBus 的 $addListener()$ 方法直接注册监听器。反之,就先创建一个 AsyncEventQueue,注册监听器到新的队列中。

    LiveListenerBus 还提供了另外 4 种直接注册监听器的方法,分别对应内置的 4 个队列:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    def addToSharedQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, SHARED_QUEUE)
    }
    def addToManagementQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, EXECUTOR_MANAGEMENT_QUEUE)
    }
    def addToStatusQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, APP_STATUS_QUEUE)
    }
    def addToEventLogQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, EVENT_LOG_QUEUE)
    }
  • 事件投递: $post()$

    $post()$ 方法会检查 queuedEvents 中有无缓存的事件,以及事件总线是否还没有启动。投递时会调用 $postToQueues()$ 方法,将事件发送给所有队列,由AsyncEventQueue 来完成投递到监听器的工作。

2.3. SparkListener

Spark 消息 SparkListenerEvent 被异步的发送给已经注册过的 SparkListener