Flink-源码学习-FlinkSQL&Table-Planner-Parser 模块-CalciteParser
一、概述对于标准的 SQL 语句, ExtendedParser 不会去解析它。标准 SQL 的解析过程由 CalciteParser 负责。
二、实现
Flink-源码学习-FlinkSQL&Table-Planner-Parser 模块-ExtendedParser
一、概述ExtendedParser 用于在不增加 CalciteParser 复杂性的前提下(不用修改Calcite,增加新的关键字),让 Flink SQL 支持更多专用的语法。
二、实现2.1. 策略https://www.jianshu.com/p/e4956652cfcb
ExtendedParser 包含如下解析策略:
1234567private static final List<ExtendedParseStrategy> PARSE_STRATEGIES = Arrays.asList( ClearOperationParseStrategy.INSTANCE, HelpOperationParseStrategy.INSTANCE, QuitOperationParseStrategy.INSTANCE, ResetOperationParseStrategy.INSTANCE, Se ...
Flink-源码学习-FlinkSQL&Table-Planner-Parser 模块
一、概述二、实现2.1. 架构设计2.2. 解析2.2.1. $ParserImpl#parse$
获取 Calcite 的解析器
1CalciteParser parser = calciteParserSupplier.get();
使用 FlinkPlannerImpl 作为 validator
1FlinkPlannerImpl planner = validatorSupplier.get();
EXTENDED_PARSER
对于一些特殊的写法,例如 SET key=value。CalciteParser 是不支持这种写法的, 为了避免在 Calcite 引入过多的关键字,这里定义了一组 extended parser,专门用于在CalciteParser之前,解析这些特殊的语句
1234Optional<Operation> command = EXTENDED_PARSER.parse(statement);if (command.isPresent()) { return Collections.singletonList(command. ...
Flink-源码学习-FlinkSQL&Table-Planner-Validator 模块
一、概述二、实现2.1. 架构设计
2.2. 验证2.2.1. $SqlValidatorImpl#validate$
Flink-源码学习-FlinkSQL&Table-Table 体系-Connector-TableSource
一、概述Flink SQL 可以将多种数据源或数据落地端映射为 table
二、实现2.1. 架构设计Flink 使用 SPI 机制加载Factory(DynamicTableSourceFactory 和 DynamicTableSinkFactory同属 Factory)。在 flink-table-api-java-bridge 项目的 resources/META-INF/services 目录可以找到org.apache.flink.table.factories.Factory 文件,内容为:
123org.apache.flink.table.factories.DataGenTableSourceFactoryorg.apache.flink.table.factories.BlackHoleTableSinkFactoryorg.apache.flink.table.factories.PrintTableSinkFactory
以 DataGenTableSourceFactory 为例:
2.1.1. DynamicTableSourceFactory2. ...
Flink-源码学习-FlinkSQL&Table-一条简单 SQL 的执行~
一、概述123456789101112131415161718192021222324252627282930private val AVG_TEMPLATE_SINK_TABLE_SQL: String = """ |CREATE TABLE avg_template_sink_table ( | `avg_temp_val` DOUBLE, | `address` STRING, | `ts` TIMESTAMP(3) |) WITH ( | 'connector' = 'print' |) |""".stripMargin def main(args: Array[String]): Unit = { val settings: EnvironmentSettings = EnvironmentSettings.newInstance().inStreamingMode ...
Flink-源码学习-FlinkSQL&Table-元数据体系
一、概述为了使用 SQL,首先需要解决的是元数据管理的问题。包括 Table/UDF/View 元数据的注册、查询以及验证。
二、架构设计2.1. 元数据注册FinkSQL 原生支持了 DDL 语法,比如 CREATE 语句,通过 Catalog API 将表/视图/函数 注册到当前或指定的 Catalog 中,默认使用 InMemoryCatalog 将信息临时保存在内存中,有一个唯一的 ID,由三部分组成: 目录 (catalog)、数据库 (database) 名、表名。
在默认情况下,目录名为 defaultcatalog、数据库名为 default database。故直接创建一个 test 表,它的 ID 是: defaultcatalog.default database.MyTable
同时也提供了 HiveCatalog 与 HiveMetastore 进行集成。FlinkSQL 目前支持以下 CREATE 语句:
CREATE TABLE
CREATE CATALOG
CREATE DATABASE
CREATE VIEW
CREATE FUNCTION
...
Flink-源码学习-FlinkSQL&Table-规则体系 Rules-logical
一、概述SQL 的执行流程一般分为四个主要的阶段,Flink 主要依赖于 Calicte 来完成这一流程:
Parse:语法解析,把 SQL 语句转换成为一个抽象语法树(AST),在 Calcite 中用 SqlNode 来表示;
Validate:语法校验,根据元数据信息进行验证,例如查询的表、使用的函数是否存在等,校验之后仍然是 SqlNode 构成的语法树;
Optimize:查询计划优化,这里其实包含了两部分,1)首先将 SqlNode 语法树转换成关系表达式 RelNode 构成的逻辑树,2)然后使用优化器基于规则进行等价变换,例如我们比较熟悉的谓词下推、列裁剪等,经过优化器优化后得到最优的查询计划;
Execute:将逻辑查询计划翻译成物理执行计划,生成对应的可执行代码,提交运行。
Flink SQL 的处理也大体遵循上述处理流程。Calcite 自身的概念较为庞杂,尤其是其内部使用的 HepPlanner 和 VolcanoPlanner 优化器更是非常复杂,但好在 Calcite 的可扩展性很强,优化器的优化规则也可以很容易地进行扩展,因此如果只是了解 Flin ...
Flink-源码学习-FlinkSQL&Table-规则体系 Rules
一、概述https://mp.weixin.qq.com/s/RKYzkYCfZ68l3EfUXS_RDA










































