Spark-源码学习-SparkSession-ShareState-CacheManager
一、概述在Spark中,SQL缓存是重复使用某些计算的常用技术。它有可能加快使用相同数据的其他查询的速度,但如果我们想获得良好的性能,有一些注意事项需要牢记。
CacheManager 是 Spark SQL 中内存缓存的管理者,在 Spark SQL 中提供对缓存查询结果的支持,并在执行后续查询时自动使用这些缓存结果。数据使用 InMemoryRelation 中存储的字节缓冲区进行缓存。CacheManager 通过 SharedState 在 SparkSessions 之间共享。
二、设计CacheManager 负责跟踪查询计划中已经缓存的计算。当 $cache()$ 被调用时,CacheManager 会在引擎盖下被直接调用,它会调出缓存函数被调用的DataFrame的分析逻辑计划,并将该计划存储在名为 cachedData 的索引序列中。
缓存管理器阶段是逻辑规划的一部分,它发生在分析器之后,优化器之前:
https://towardsdatascience.com/best-practices-for-caching-in-spark-sql-b22fb0f02d ...
Flink-源码学习-Job 提交-Slot 申请
正在总结中,等我😭~~~
Flink-源码学习-Job 提交-Slot 释放
正在总结中,等我😭~~~
Flink-源码学习-Job 提交-提交脚本解析
Flink Job 提交脚本解析当编写好 Flink 的应用程序,正常的提交方式为:打成jar包,通过 flink 命令来进行提交。
flink 命令脚本的底层,是通过 java 命令启动:CliFrontend 类 来启动 JVM 进程执行任务的构造和提 交。
1flink run xxx.jar class arg1 arg2
CliFrontend 提交分析当用户把 Flink 应用程序打成 jar 使用 flink run … 的 shell 命令提交的时候,底层是通过 CliFrontend 来处理。底层的逻辑,**就是通过反射来调用用户程序的 main() 方法执行。**
在刚组建内部,主要有以下几件事要做:
根据 flink 后面的执行命令来确定执行方法(run ==> run(params))
解析 main 参数,构建 PackagedProgram,然后执行 PackagedProgram
通过反射获取应用程序的 main 方法的实例,通过反射调用执行起来
总的来说,就是准备执行 Program 所需要的配置,jar包,运行主类等的必要的信 ...
Flink-源码学习-Job 提交-架构设计
当编写好 Flink 的应用程序,正常的提交方式为:打成jar包,通过 flink 命令来进行提交。flink 命令脚本的底层,是通过 java 命令启动:CliFrontend 类 来启动 JVM 进程执行任务的构造和提交。
一、概述当用户构建好 Flink 应用程序后,会在客户端运行 StreamExecutionEnvironment.execute() 方 法生 成 StreamGraph 对象,并将StreamGraph 通过 PipelineExecutor 提交到远程集群中执行。在这个过程中,首先会通过 DataStream API 生成 Transformation转换集合;然后基于 Transformation 转换集合构建 Pipeline 的实现类,也就是 StreamGraph 数据结构;最后再通过PipelineExecutor 将 StreamGraph 转换为 JobGraph 数据结构,并将 JobGraph 提交到集群中运行。
二、提交脚本解析三、源码当编写好 Flink 的应用程序,正常的提交方式为:打成jar包,通过 flink 命令来进行提交 ...
Spark-源码学习-SparkSession-ShareState
一、概述ShareState 负责保存多个有效 session 之间的共享状态。SharedState 中包含变量: warehousePath、cacheManager、statusStore、externalCatalog、globalTempViewManager、jarClassLoader。这些变量对于所有的 SparkSession 都是公用的。
二、初始化123lazy val sharedState: SharedState = { existingSharedState.getOrElse(new SharedState(sparkContext, initialSessionOptions))}
综上可以看出,SparkSession 重用 sharedState,但是会新 $clone$ 或者创建新的 sessionState,但是如果在创建多个 SparkSession 时,传入的 existingSharedState 都为空,则多个 SparkSession 也会创建多个 sharedState。
2.1. 解析和处理 S ...
Flink-源码学习-FlinkCore-公共基础服务-HA 服务
一、概述对于 7x24 小时不间断运行的流式系统来讲,保证集群核心组件的高可用是非常重要的。Flink 集群在运行时中主要通过 HighAvailabilityServices 公共基础服务来创建和初始化核心组件的高可用服务
二、 HighAvailabilityServices当启动集群运行时的时候,会初始化 HighAvailabilityServices 高可用基础服务实现。ClusterEntrypoint 会在 initializeServices() 方法中调用 createHaServices() 方法创建 HighAvailabilityServices。
1haServices = createHaServices(configuration, ioExecutor);
HighAvailabilityServices 主要分为 AbstractNonHaServices 和 ZooKeeperHaServices 两种类型,
AbstractNonHaServices 实际上不提供高可用能力
ZooKeeperHaServices 通过 ZooKeeper ...
Flink-源码学习-HA 服务
一、概述对于 7x24 小时不间断运行的流式系统来讲,保证集群核心组件的高可用是非常重要的。Flink 集群在运行时中主要通过 HighAvailabilityServices 公共基础服务来创建和初始化核心组件的高可用服务
二、 HighAvailabilityServices当启动集群运行时的时候,会初始化 HighAvailabilityServices 高可用基础服务实现。ClusterEntrypoint 会在 initializeServices() 方法中调用 createHaServices() 方法创建 HighAvailabilityServices。
1haServices = createHaServices(configuration, ioExecutor);
HighAvailabilityServices 主要分为 AbstractNonHaServices 和 ZooKeeperHaServices 两种类型,
AbstractNonHaServices 实际上不提供高可用能力
ZooKeeperHaServices 通过 ZooKeeper ...
Spark-源码学习-SparkSession 设计
一、概述SparkSession 是 Spark 程序的新入口。在 Spark2.0 之前,使用 Spark 需要先创建 SparkConf 和 SparkContext,Spark 2.0 中引入了 SparkSession,为用户提供了一个统一的切入点来使用 Spark 的各项功能。 SparkConf、SparkContext 和 SQLContext 被封装在 SparkSession 中,使用 Spark 的各项功能只需创建一个 SparkSession。
在 Spark 的早期版本,SparkContext 是进入 Spark 的切入点。RDD 的创建和操作得使用 SparkContext 提供的 API; 对于 RDD 之外的其他东西,需要使用其他的 Context。比如流处理使用 StreamingContext; 对于 SQL 得使用 SQLContext; 而对于 Hive 得使用 HiveContext。
二、设计SparkSession 的设计遵循了工厂设计模式,Spark 程序中首先使用 $SparkSession.builder$ 对象进行一系列 ...
Spark-源码学习-SparkSession-SessionState
一、概述SessionState 是基于一个特定 SparkSession 维护所有单个 session 作用域的所有状态,SessionState 维护了 SparkSQL 中大部分的核心类,如 SqlParser、Analyzer、Optimizer 等,SparkSession 将 SQL Query Planning 功能托管给 SessionState。
二、初始化SparkSession 构造函数中传入 parentSessionState,对其进行 $clone$ ,若传入不为空,则使用 $clone$ 对象。否则进行创建。
123456789lazy val sessionState: SessionState = { parentSessionState.map(_.clone(this)).getOrElse { val state = SparkSession.instantiateSessionState( SparkSession.sessionStateClassName(sharedState.conf), ...