Spark-源码学习-集群启动-standalone-worker
一、概述Worker 在 Local/Standalone 部署模式中对工作节点的资源和 Executor 进行管理。Worker 一方面向 Master 汇报自身所管理的资源信息,一方面接收 Master 的命令运行 Driver 或者为 Application 运行 Executor。
二、实现2.1. 属性
cores: 内核数
memory: 内存大小
masterRpcAddresses: Master 的 RpcEnv 地址(即 RpcAddress)的数组。
由于一个集群为了可靠性和容错,需要多个 Master 节点,因此用数组来存储它们的 RpcEnv 地址
workDirPath: Worker 工作目录
host/port: Worker 的 RpcEnv 的 host/端口
forwordMessageScheduler: 用于发送消息的调度执行器(ScheduledThreadPool-Executor)
forwordMessageScheduler 只能调度执行一个线程,执行的线程以 worker-forward-message-schedu ...
Spark-源码学习-SparkCore-存储服务-架构设计
一、概述Spark 的存储体系在整个 Spark 架构中与大地在生态圈中的作用非常类似。Spark 的存储体系贯穿了集群中的每个实例。从单个节点来看,Spark 的存储体系隶属于 SparkEnv。Spark 存储系统用于存储 3 个方面的数据,分别是 RDD 缓存、Shuffle 中间文件、广播变量。
RDD 缓存
RDD 缓存指的是将 RDD 以缓存的形式物化到内存或磁盘的过程。对于一些计算成本和访问频率都比较高的 RDD 来说,缓存有两个好处:
降低失败重试的计算开销
通过对缓存内容的访问,可以有效减少从头计算的次数,从整体上提升作业端到端的执行性能。
Shuffle 中间文件
Shuffle 中间文件指的是 Shuffle Map 阶段的输出结果,这些结果以文件的形式暂存于本地磁盘。在Shuffle Reduce 阶段通过网络拉取中间文件用于聚合计算,如求和、计数等。在集群范围内,Reducer 想要拉取属于自己的那部分中间数据,就必须要知道这些数据都存储在哪些节点,以及什么位置。而这些关键的元信息,也是由 Spark 存储系统保存并维护的。
广播变量
广播变量往 ...
Spark-源码学习-容错-选举机制
一、概述领导选举机制(Leader Election)可以保证集群虽然存在多个 Master,但是只有一个 Master 处于激活(Active)状态,其他 Master 处于支持(Standby)状态。当 Active 状态的 Master 出现故障时,会选举出一个 Standby 状态的 Master 作为新的 Active 状态的 Master。
在 Master 的切换过程中影响:
不能够提交新的应用程序给集群, 因为只有 Active Master 才能接收新的程序提交请求
已经运行的程序中也不能因为 Action 操作触发新的 Job 提交请求。
集群 Worker, Driver 和 Application 的信息都已经通过持久化引擎持久化,因此在 Master 切换的过程中, 所有已经在运行的程序皆正常运行。因为 Spark Application 在运行前就已经通过 Cluster Manager 获得了计算资源, 所以在运行时,Job 本身的调度和处理和 Master 是没有任何关系的。
二、LeaderElectionAgent特质 LeaderEl ...
Spark-源码学习-集群启动-standalone-clustermanager
一、概述Cluster Manager 在 Standalone 部署模式下为 Master。Master 的设计将直接决定整个集群的可扩展性、可用性和容错性。
二、master 实现2.1. 属性
address: RpcEnv 的地址(即 RpcAddress)。RpcAddress 只包含 host 和 port 两个属性,用来记录 Master URL 的 host 和 port
webUiPort: 参数要指定的 WebUI 的端口
securityMgr: 即 SecurityManager
conf: 即 SparkConf
forwardMessageThread: 包含一个线程的 ScheduledThreadPoolExecutor,启动的线程以 master-forward-message-thread 作为名称。forwardMessageThread主要用于运行 checkForWorkerTimeOutTask 和 recoveryCompletionTask。
checkForWorkerTimeOutTask: 检查 Worker 超时的任 ...
Spark-源码学习-集群启动-standalone-选举机制
一、概述领导选举机制(Leader Election)可以保证集群虽然存在多个 Master,但是只有一个 Master 处于激活(Active)状态,其他的 Master 处于支持(Standby)状态。当 Active 状态的 Master 出现故障时,会选举出一个 Standby 状态的 Master 作为新的 Active 状态的 Master。由于整个集群的 Worker, Driver 和 Application 的信息都已经通过持久化引擎持久化,因此切换 Master 时只会影响新任务的提交,对于正在运行中的任务没有任何影响。
二、LeaderElectionAgent特质 LeaderElectionAgent 定义了对当前的 Master 进行跟踪和领导选举代理的通用接口。
1234trait LeaderElectionAgent { val masterInstance: LeaderElectable def stop(): Unit = {} // to avoid noops in implementations. ...
Spark-源码学习-集群启动-standalone 设计
启动 Master
Standalone 部署模式中首先会启动一到多个 Master.main 函数的进程。每个 Master.main 进程中都会创建 Master 实例并注册到 Master 自己的 RpcEnv 中
引用本站文章
Spark-源码学习-集群启动-standalone-Master 启动
Joker
Master 选举
Standalone 模式下可以存在多个Master,这些 Master 之间通过持久化引擎 (PersistenceEngine) 和领导选举机制解决生成环境下 Master 的单点问题,使 Master 在异常退出后,能够重新选举激活状态的 Master,并从故障中恢复集群。
在启动 Master 实例的过程中创建 ZooKeeperLeaderElectionAgent 实例,每个 Master 实例对 ...
Spark-源码学习-集群启动系列
一、Spark 集群架构
1.1. ClusterManagerCluster Manager 负责集群的资源管理和调度。目前 Spark 支持4种 Cluster Manager
Standalone
Spark 自带的集群管理模式, Master
Apaehe Mesos
Mesos 是一种通用的集群资源管理服务,用于管理 MapReduce 应用或者其他类型的应用。
Hadoop YARN
YARN 是由 Hadoop 2.0 引入的集群资源管理服务。
Kubernetes
Kubernetes 是一种管理 containerized 的应用的服务。Spark 2.3以后引入了对Kubernetes的支持
1.2. WorkerWorker Node 是集群中可以执行计算任务的节点。在 YARN 部署模式下实际由 NodeManager 替代。Worker 节点主要负责以下工作
将自己的内存、CPU 等资源通过注册机制告知 Cluster Manager
创建 Executor,将资源和任务进一步分配给 Executor
二、部署模式Spark 应用程序在 ...
Spark-源码学习-SparkSQL-架构设计-SQL 引擎-Planner 模块-Rule-Preparations-CollapseCodegenStage
一、概述表达式一般指的是不需要触发执行引擎而能够直接进行计算的单元,例如加减乘除四则运算、逻辑操作、转换操作、过滤操作等。
二、实现Catalyst 实现了完善的表达式体系,与各种算子(QueryPlan)占据同样的地位。算子执行前通常都会进行”绑定”操作,将表达式与输入的属性对应起来,同时算子也能够调用各种表达式处理相应的逻辑。
在 Expression 类中,主要定义了5个方面的操作,包括基本属性、核心操作、输入输出、字符串表示和等价性判断
核心操作 eval 函数实现了表达式对应的处理逻辑,也是其他模块调用该表达式的主要接口,而 genCode 和 doGenCode 用于生成表达式对应的 Java 代码。字符串表示用于查看该 Expression 的具体内容,如表达式名和输入参数等。
Spark-源码学习-SparkSQL-Join 体系-BroadcastHashJoinExec
一、概述BroadcastJoinExec 主要思想是对小表进行广播操作,避免大量 Shuffle 的产生。这也是一种常见的思路,在 Spark SQL 中,对两个表做 Join 操作最直接的方式是先根据 key 分区,然后在每个分区中把 key 值相同的记录提取出来进行连接操作。这种方式不可避免地涉及数据的 Shuffle,而 Shufle 是比较耗时的操作。因此,当一个大表和一个小表进行 Join 操作时,为了避免数据的 Shufle,可以将小表的全部数据分发到每个节点上,供大表直接使用。
二、实现三、执行机制BroadcastJoinExec 主要思想是对小表进行广播操作,避免大量 Shuffle 的产生,在 Spark SQL 中,对两个表做 Join 操作最直接的方式是先根据 key 分区,然后在每个分区中把 key 值相同的记录提取出来进行连接操作。这种方式不可避免地涉及数据的 Shuffle,而 Shufle 是比较耗时的操作。因此,当一个大表和一个小表进行 Join 操作时,为了避免数据的 Shufle,可以将小表的全部数据分发到每个节点上,供大表直接使用。
Spark-源码学习-SparkSQL-Join 体系-ShuffledHashJoinExec
一、概述Spark SQL 优先级次之的是 ShuffledHashJoinExec,ShuffledHashJoinExec 执行需要满足多种条件。 ShuffledHashJoinExec 的构造同样分为 BuildLeft 和 BuildRight 两种情况,以 BuildRight 为例:
配置中优先开启 SortMergeJoin 的参数 spark.sql.join.preferSortMerge 设置为 false
右表需要满足能够”构建” ($canBuildRight$) 和能够建立 HashMap ($canBuildLocalHashMap$)
右表的数据量要比左表的数据量小很多(3 倍以上)
此外,还有一种生成 ShuffledHashJoinExec 的情况是参与连接的 key 不具有排序的特性。
二、执行