所谓的副本写入,是指向副本底层日志写入消息。在 ReplicaManager 类中,实现副本写入的方法叫 appendRecords。

1. 概述

副本(Replica)是分布式系统中常见的概念之一,指的是分布式系统对数据和提供的一种冗余方式。在常见的分布式系统
中,为了对外提供可用的服务,对数据和服务进行副本处理。

数据副本是指在不同的节点上持久化同一份数据,当某一点上存储的数据丢失时,可以从副本上读取该数据

服务副本,指多个节点提供同样的服务,每个节点都有能力接收来自外部的请求并进行相应的处理。
Kafka 从 0.8 版本开始为分区引入了多副本机制,通过增加副本数量来提升数据容灾能力。同时,Kafka 通过多副本机制实现故障自动转移,在 Kafka 集群中某个 broker 节点失效的情况下仍然保证服务可用。
副本是相对于分区而言的,即副本是特定分区的副本。

  1. 一个分区中包含一个或多个副本,其中一个为 leader 副本,其余为 follower 副本,各个副本位于不同的 broker 节点中。只有 leader 副本对外提供服务,follower 副本只负责数据同步。
  2. 分区中的所有副本统称为 AR,而 ISR 是指与 leoder 副本保持同步状态的副本集合,当然leader副本本身也是这个集合中的一员。
  3. LEO 标识每个分区中最后一条消息的下一个位置,分区的每个副本都有自己的 LEO,ISR 中最小的 LEO 即为 HW,俗称高水位,消费者只能拉取到 HW 之前的消息。
  4. 从生产者发出的一条消息首先会被写入分区的leader副本,不过还需要等待 ISR 集合中的所有 follower 副本都同步完之后才能被认为已经提交,之后才会更新分区的 HW,进而消费者可以消费到这条消息。

2. 副本写入场景

  1. 生产者向 Leader 副本写入消息
  2. Follower 副本拉取消息后写入副本
  3. 消费者组写入组信息
  4. 事务管理器写入事务信息(包括事务标记、事务元数据等)

除了第二个场景是直接调用 Partition 对象的方法实现之外,其他 3 个都是调用 appendRecords 来完成的。该方法将给定一组分区的消息写入到对应的 Leader 副本中,并且根据 PRODUCE 请求中 acks 设置的不同,有选择地等待其他副本写入完成。然后,调用指定的回调逻辑。

3. appendRecords

3.1. 输入参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def appendRecords(
timeout: Long, // 请求处理超时时间
requiredAcks: Short, // 请求acks设置
internalTopicsAllowed: Boolean, // 是否允许写入内部主题
origin: AppendOrigin, // 写入方来源
entriesPerPartition: Map[TopicPartition, MemoryRecords], // 待写入消息
// 回调逻辑
responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
delayedProduceLock: Option[Lock] = None,
recordConversionStatsCallback:
Map[TopicPartition, RecordConversionStats] => Unit = _ => ())
: Unit = {
......
}
  1. timeout

    请求处理超时时间。对于生产者来说,它就是 request.timeout.ms 参数值。

  2. requiredAcks

    是否需要等待其他副本写入。对于生产者而言,它就是 acks 参数的值。而在其他场景中,Kafka 默认使用 -1,表示等待其他副本全部写入成功再返回。

  3. internalTopicsAllowed

    是否允许向内部主题写入消息。对于普通的生产者而言,该字段是 False,即不允许写入内部主题。对于 Coordinator 组件,特别是消费者组 GroupCoordinator 组件来说,它的职责之一就是向内部位移主题写入消息,因此,此时,该字段值是 True。

  4. origin

    AppendOrigin 是一个接口,表示写入方来源。当前,它定义了 3 类写入方,分别是 Replication、Coordinator 和 Client。

    • Replication 表示写入请求是由 Follower 副本发出的,它要将从 Leader 副本获取到的消息写入到底层的消息日志中。

    • Coordinator 表示这些写入由 Coordinator 发起,它既可以是管理消费者组的 GroupCooridnator,也可以是管理事务的 TransactionCoordinator。

注: Client 表示本次写入由客户端发起。Follower 副本同步过程不调用 appendRecords 方法,因此,这里的 origin 值只可能是 Replication 或 Coordinator。

3.2. 方法体

isValidRequiredAcks(requiredAcks)

判断 requiredAcks 的取值是否在合理范围内,”是否是 -1、0、1 这 3 个数值中的一个”。如果不是合理取值,代码就进入到外层的 else 分支,构造名为 INVALID_REQUIRED_ACKS 的异常,并将其封装进回调函数中执行,然后返回结果。否则的话,代码进入到外层的 if 分支下。

1
2
val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
origin, entriesPerPartition, requiredAcks)

进入到 if 分支后,代码调用 appendToLocalLog 方法,将要写入的消息集合保存到副本的本地日志上。

1
2
3
4
5
6
7
8
9
val produceStatus = localProduceResults.map { case (topicPartition, result) =>
topicPartition ->
ProducePartitionStatus(
result.info.lastOffset + 1, // 设置下一条待写入消息的位移值
// 构建PartitionResponse封装写入结果
new PartitionResponse(result.error, result.info.firstOffset.getOrElse(-1), result.info.logAppendTime,
result.info.logStartOffset, result.info.recordErrors.asJava, result.info.errorMessage))
}

然后构造 PartitionResponse 对象实例,来封装写入结果以及一些重要的元数据信息,比如本次写入有没有错误(errorMessage)、下一条待写入消息的位移值、本次写入消息集合首条消息的位移值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 需要等待其他副本完成写入
if (delayedProduceRequestRequired(
requiredAcks, entriesPerPartition, localProduceResults)) {
val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
// 创建DelayedProduce延时请求对象
val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)
val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
// 再一次尝试完成该延时请求
// 如果暂时无法完成,则将对象放入到相应的Purgatory中等待后续处理
delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
} else { // 无需等待其他副本写入完成,可以立即发送Response
val produceResponseStatus = produceStatus.map { case (k, status) => k -> status.responseStatus }
// 调用回调逻辑然后返回即可
responseCallback(produceResponseStatus)
}

如果还需要等待其他副本同步完成消息写入,那么就不能立即返回,代码要创建 DelayedProduce 延时请求对象,并把该对象交由 Purgatory 来管理。DelayedProduce 是生产者端的延时发送请求,对应的 Purgatory 就是 ReplicaManager 类构造函数中的 delayedProducePurgatory。所谓的 Purgatory 管理,主要是调用 tryCompleteElseWatch 方法尝试完成延时发送请求。如果暂时无法完成,就将对象放入到相应的 Purgatory 中,等待后续处理。如果无需等待其他副本同步完成消息写入,那么,appendRecords 方法会构造响应的 Response,并调用回调逻辑函数,至此,方法结束。

4. appendToLocalLog

实现消息写入

1
2
3
4
// 获取分区对象 
val partition = getPartitionOrException(topicPartition, expectLeader = true)
// 向该分区对象写入消息集合
val info = partition.appendRecordsToLeader(records, origin, requiredAcks)

5. delayedProduceRequestRequired

delayedProduceRequestRequired 方法的源码。它用于判断消息集合被写入到日志之后,是否需要等待其他副本也写入成功。

6. 副本读取: fetchMessages

在 ReplicaManager 类中,负责读取副本数据的方法是 fetchMessages。不论是 Java 消费者 API,还是 Follower 副本,它们拉取消息的主要途径都是向 Broker 发送 FETCH 请求,Broker 端接收到该请求后,调用 fetchMessages 方法从底层的 Leader 副本取出消息。

和 appendRecords 方法类似,fetchMessages 方法也可能会延时处理 FETCH 请求,因为 Broker 端必须要累积足够多的数据之后,才会返回 Response 给请求发送方。

1
2
3
4
5
6
7
8
9
10
11
12
def fetchMessages(timeout: Long,
replicaId: Int,
fetchMinBytes: Int,
fetchMaxBytes: Int,
hardMaxBytesLimit: Boolean,
fetchInfos: Seq[(TopicPartition, PartitionData)],
quota: ReplicaQuota,
responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
isolationLevel: IsolationLevel,
clientMetadata: Option[ClientMetadata]): Unit = {
......
}