Flink-源码学习-存储服务-内存管理-数据结构-内存页
Flink通过 MemorySegmentFactory 来创建 MemorySegment,MemorySegment 是 Flink 内存分配的最小单位。对于跨 MemorySegment 的数据访问,Flink 抽象出一个访问视图,数据读取视图 datainputView 以及数据写入视图 dataoutputview。
对于跨 MemorySegment 保存的数据,如果需要上层的使用者,需要考虑所有的细节,非常烦琐,所以 Flink 又抽象了一层内存页。内存页是 MemorySegment 之上的数据访问视图,上层使用者无须关心 MemorySegment 的细节,该层会自动处理跨 MemorySegment 的读取和写入。
Flink-源码学习-存储服务-内存管理-数据结构-内存段
Flink 将本来直接存储在堆内存上的数据对象,通过数据序列化处理,存储在预先分配的内存块 MemorySegment,MemorySegment 是 Flink 的最小内存分配单元。默认一个 MemorySegment 对应着一个 32KB 大小的内存块,这块内存既可以是堆内存,也可以是堆外内存,如果 MemorySegment 底层使用的是 JVM 堆内存,数据通常存储至 Java 字节数组 byte[] 中, 如果 MemorySegment 底层使用的是堆外内存,数据会以序列化的形式存储在一个或多个 MemorySegment 中。MemorySegment 将 JVM 堆内存和堆外内存进行集中管理,形成统一的内存访问视图。MemorySegment 提供了高效的内存读写方法。
Flink-源码学习-存储服务-架构设计-内存管理
一、概述1.1. JVM 内存管理基于 Java 语言构建的应用,借助 JVM 提供的 GC 能力能够实现内存的自动管理,但会遇到一些基于 JVM 的内存管理问题。尤其对于大数据处理场景而言,需要处理非常庞大的数据,JVM 内存管理的问题就更加突出了,主要体现在以下几点:
Java 对象存储密度相对较低
对于常用的数据类型,例如Boolean类型数据占16字节内存空间,其中对象头占字节,Boolean 属性仅占 1 字节,其余7字节做对齐填充。而实际上仅1字节就能够代表Boolean值,这种情况造成了比较严重的内存空间浪费
Full GC 影响系统性能使用 JVM 的垃圾回收机制对內存进行回收,在大数据量的情况下 GC 的性能会比较差,尤其对于大数据处理,有些数据对象处理完希望立即释放內存空间,但如果借助 JVM GC 自动回收,通常情况下会有秒级甚至分钟级别的延迟,这对系统的性能造成了非常大的影响
OutOfMemoryError 影响系统稳定性
系统出现对象大小分配超过 JVM 内存限制时,就会触发 OutofMemoryError。导致 JVM 宕机,影响整个数据处理进 ...
Spark-源码学习-SparkSQL-一条连接 SQL 语句的执行过程~
一、概述在典型的 Spark SQL 应用场景中,数据的读取、数据表的创建和分析都是必不可少的过程。通常来讲,SparkSQL 查询所面对的数据模型以关系表为主。
如图所示的案例显示了使用 SparkSQL 进行数据分析的一般步骤。
二、join 体系先了解下 Spark SQL 的 join 体系设计~
引用本站文章
Spark-源码学习-SparkSQL-join 体系-架构设计
Joker
三、流程可以先看看这个~
引用本站文章
Spark-源码学习-SparkSQL-一条 SQL 语句的执行过程概述~
Joker
...
Spark-源码学习-SparkCore-调度机制-资源调度-Standalone-动态管理 Executor
一、概述ExecutorAllocationManager 用于对已分配的 Executor 进行管理,默认情况下不会创建 ExecutorAllocationManager,可以修改属性spark.dynamicAllocation.enabled 为 true 来创建。
ExecutorAllocationManager 内部会定时根据工作负载计算所需的 Executor 数量,如果对 Executor 需求数量大于之前向集群管理器申请的Executor 数量,那么向集群管理器申请添加 Executor;如果对 Executor 需求数量小于之前向集群管理器申请的 Executor 数量,那么向集群管理器申请取消部分 Executor。此外,ExecutorAllocationManager 内部还会定时向集群管理器申请移除过期的 Executor。
二、实现ExecutorAllocationManager 可以设置动态分配最小 Executor 数量、动态分配最大 Executor 数量、每个 Executor 可以运行的 Task 数量等配置信息,并对配置信息进行校验。
...
Spark-源码学习-SparkCore-调度机制-任务调度-Task 调度-任务执行结果处理 TaskResultGetter
一、概述TaskResultGetter 用于对序列化的 Task 执行结果进行反序列化,以得到 Task 执行结果。TaskResultGetter 也可远程获取 Task 执行结果。
https://weread.qq.com/web/reader/10c326305afa9610cf5f1eckc9f326d018c9f0f895fb5e4?
https://weread.qq.com/web/reader/0c832fb05e12e40c8ca6190kd2d32c50249d2ddea18fb39
二、实现三、处理 Task3.1. 处理成功 Task3.2. 处理失败 Task$TaskResultGetter.enqueueFailedTask()$ 方法用于处理执行失败的 Task 的执行结果。
$enqueueFailedTask()$ 方法向 $getTaskResultExecutor$ 提交 Task 执行结果的任务:
对执行结果反序列化,得到类型为 TaskFailedReason 的失败原因。
调用 $TaskSchedulerImpl.handle ...
Spark-源码学习-SparkCore-调度机制-资源调度-Standalone-分配 Executor
一、概述https://weread.qq.com/web/reader/10c326305afa9610cf5f1ec?
Spark-源码学习-SparkCore-调度机制-资源调度-Standalone-启动 Driver
一、概述Driver 端是 Spark 程序入口, 是运行 main 函数的一端,并且负责创建 SparkContext, 初始化 SparkContext 是为了准备 Spark 程序的运行环境,Spark 中由 SparkContext 负责与集群进行通信、资源申请,以及任务的分配与监控等。
二、实现Master 接受到 RequestSubmitDriver 请求之后创建 DriverInfo 对象并将该对象放到一个 HashSet 对象中,然后开始调度 Driver 的启动
2.1. 构建 DriverInfo构建 DriverInfo, 保存后续 Driver 启动所需的信息
12345private def createDriver(desc: DriverDescription): DriverInfo = { val now = System.currentTimeMillis() val date = new Date(now) new DriverInfo(now, newDriverId(date), desc, date) ...
Spark-源码学习-SparkCore-调度机制-资源调度-Standalone
一、概述Master 的资源调度是 Spark 的一级资源调度,主要包括: 对 Driver 的资源调度以及对 Executor 的资源调度~
二、Driver 调度2.1. Driver 启动Master 在接受到 RequestSubmitDriver 后,将 Driver 信息封装成 DriverInfo,然后添加待调度列表 waitingDrivers 中,然后调用 $Master.schedule()$ 为处于待分配资源的 Application 分配资源。在每次有新的 Application 加入或者新的资源加入时都会调用 $schedule()$ 进行调度。
注意: Master 在接受到 RequestSubmitDriver 后,将 Driver 信息封装成 DriverInfo,然后添加待调度列表 waitingDrivers 中,但此时 waitingApps 为空,所以并不会启动 Executor,当 Master 在接收到 RegisterApplication 消息后完成 $registerApplication(app)$,此时才会启动 Executo ...
Spark-源码学习-SparkCore-调度机制-任务调度-Stage 调度
一、概述DAGScheduler 实现了面向 DAG 的高层次调度,通过计算将 DAG 中的一系列 RDD 划分到不同的 Stage,然后构建 Stage 之间的父子关系,最后将每个 Stage 按照 Partition 切分为多个 Task,并以 Task 集合(TaskSet)的形式提交给底层的 TaskScheduler。
二、实现2.1. DAGSchedulerEvent所有的组件都通过向 DAGScheduler 投递 DAGSchedulerEvent 来使用 DAGScheduler。
DAGScheduler 内部的 DAGSchedulerEventProcessLoop 根据不同的 DAGSchedulerEvent 事件(JobSubmitted、MapStageSubmitted…),并调用 DAGScheduler 的不同方法。
12345678private def doOnReceive(event: DAGSchedulerEvent): Unit = event match { case JobSubmitted(jobId, r ...