一、概述

聚合操作(Aggregation)指的是在原始数据的基础上按照一定的逻辑进行整合,从而得到新的数据,一般通过聚合函数(如 count、 max 和 sum 等)汇总多行的信息。聚合查询一直以来都是数据分析应用中必不可少的部分,在各种 SQL 算子中占据着重要地位。

在 Catalyst 的 SqlBase.g4 文法文件中,聚合语句 aggregation 定义如下:在常见的聚合查询 中,通常包括分组语句(group by)和聚合函数( aggregate function );聚合函数出现在 Select 语 句中的情形较多,定义在 functionCall 标签的 prim缸yExpression 表达式文法中, qualifiedName 对 应函数名,括号内部是该函数的参数列表。

二、普通聚合

2.1. 聚合缓冲区与聚合模式(AggregateMode)

聚合查询在计算聚合值的过程中,通常都需要保存相关的中间计算结果,例如 $max$ 函数需要保存当前最大值,$count$ 函数需要保存当前的数据总数,求平均值的 $avg$ 函数需要同时保存 count 和 sum 的值,更复杂的函数(如 $pencentil$ 等)甚至需要临时存储全部的数据,聚合查询计算过程中产生的这些中间结果会临时保存在聚合函数缓冲区。

  1. 聚合函数缓冲区

    聚合函数缓冲区是指在同一个分组的数据聚合的过程中,用来保存聚合函数计算中间结果的内存空间。聚合函数缓冲区的定义有一个前提条件,即聚合函数缓冲区针对的是处于同一个分组内(实例中属于同一个id) 的数据。

    需要注意的是,查询中可能包含多个聚合函数,因此聚合函数缓冲区是多个聚合函数所共享的。

  2. 聚合模式

    在 SparkSQL 中,聚合过程有 4 种模式,分别是 Partial 模式、 ParitialMerge 模式、 Final 模式 和 Complete 模式。

    1. Final/Partial

      Final 模式一般和 Partial 模式组合在一起使用。 Partial 模式可以看作是局部数据的聚合,在具体实现中, Partial 模式的聚合函数在执行时会根据读入的原始数据更新对应的聚合缓冲区, 当处理完所有的输入数据后,返回的是聚合缓冲区中的中间数据 。 而 Final 模式所起到的作用 是将聚合缓冲区的数据进行合并,然后返回最终的结果。

      如图所示,在最终分组计算总和之前,可以先进行局部聚合处理,这样能够避免数据传输并减少计算量。 因此,上述聚合过程 中在 map 阶段的 sum 函数处于 Partial 模式,在 reduce 阶段的 sum 函数处于 Final 模式。

    2. Complete

      Complete 模式和 Partial/Final 组合方式不一样,不进行局部聚合计算。 同样的聚合函数采用 Complete 模式,最终阶段直接针对原始输入,中间没有局部聚合过程。

      Complete 模式一般应用在不支持 Partial 模式的聚合函数中

    3. PartialMerge

      相比 Partial、 Final 和 Complete 模式,PartialMerge 模式的聚合函数主要是对聚合缓冲区进行合并,但此时仍然不是最终的结果。ParitialMerge主要应用在 distinct 语句中,如图所示。聚合语句针对同一张表进行 sum 和 count (distinct)查询,最终的执行过程包含了4步聚合操作。

      • 第 1 步按照(A,C)分组,对 sum 函数进行 Partial 模式聚合计算
      • 第2步是 PartialMerge 模式,对上一步计算之后的聚合缓冲区进行合并,但此时仍然不是最终的结果
      • 第3步分组的列发生变化,再一次进行 Partial 模式的 count 计算
      • 第4步完成 Final 模式的最终计算。

2.2. 聚合函数(AggregateFunction)

聚合函数 (AggregateFunction)是聚合查询中非常重要的元素。在实现上,聚合函数是表达式中的一种,和 Catalyst 中定义的聚合表达式(AggregationExpression)紧密关联。无论是在逻辑算子树还是物理算子树中,聚合函数都是以聚合表达式的形式进行封装的,同时聚合函数表达式中也定义了直接生成聚合表达式的方法。
聚合表达式 (AggregationExpression) 的成员变量和函数如图 7.7所示,根据命名,这些变量和函数的含义很好理解。例如,resultAtribute 表示聚合结果,获取子节点的 children 方法并返回聚合函数表达式;dataType 函数直接调用聚合函数中的 dataType 函数获取数据类型。在默认情况下,聚合表达式的 foldable 函数返回的是 false,因为聚合表达式一般无法静态得到最终的结果,需要经过进一步的计算。

2.2.1. DeclarativeAggregate 聚合函数

DeclarativeAggregate 聚合函数是一类直接由 Catalyst 中的表达式(Expressions) 构建的聚合函数,主要逻辑通过调用4个表达式完成,分别是 initialValues(聚合缓冲区初始化表达式)、updateExpressions(聚合缓冲区更新表达式)、mergeExpressions(聚合缓冲区合并表达式)和evaluateExpression(最终结果生成表达式)。

2.2.2. ImperativeAggregate 聚合函数

不同于 DeclarativeAggregate 聚合函数基于 Catalyst 表达式的实现方式,ImperativeAggregate 聚合函数需要显式地实现 initialize、 update 和 merge 方法来操作聚合缓冲区中的数据。一个比较显著的不同是,ImperativeAggregate 聚合函数所处理的聚合缓冲区本质上是基于行 (InternalRow 类型)的。

2.2.3. TypedImperativeAggregate 聚合函数

2.3. 聚合执行

聚合执行本质上是将 RDD 的每个 Partition 中的数据进行处理。对于每个 Partition 中的输入数据即 Input(通过 InputIterator 进行读取),经过聚合执行计算之后,得到相应的结果数据即 Result(通过 AggregationIterator 来访问)。

聚合查询的最终执行有两种方式:基于排序的聚合执行方式(SortAggregateExec) 与基于 Hash 的聚合执行方式(HashAggregateExec)。在后续版本中,又加入了 ObjectHashAggregateExec 的执行方式。

常见的聚合查询语句通常采用 HashAggregate 方式,当存在以下几种情况时,会用 SortAggregate 方式来执行。

  1. 查询中存在不支持 Partial 方式的聚合函数: 此时会调用 $AggUtils#planAggregateWithoutPartial$ 方法,直接生成 SortAggregateExec 聚合算子节点。

  2. 聚合函数结果不支持 Buffer 方式: 如果结果类型不属于(Null Type, Booleanlype, ByteType, ShortType, IntegerType, LongType, FloatType, Double Type, DateType, TimestampType, DecimalType) 集合中的任意一种,则需要执行 SortAggregateExec 方式

    例如 collect_setcollect_list 函数。

  3. 内存不足: 如果在 HashAggregate 执行过程中,内存空间已满,那么聚合执行会切换到 SortAggregateExec 方式

三、窗口聚合

窗口函数(WindowFunction)和普通聚合函数(AggregateFunction)类似,都是对多行数据信息进行整合。不同之处在于,窗口函数中多了一个灵活的“窗口”,支持用户指定更加复杂的聚合行为(如数据划分和范围设置等),因此在某些特殊分析场景下扮演着重要的角色。