所谓的副本写入,是指向副本底层日志写入消息。在 ReplicaManager 类中,实现副本写入的方法叫 appendRecords。

1. 副本写入场景

  1. 生产者向 Leader 副本写入消息
  2. Follower 副本拉取消息后写入副本
  3. 消费者组写入组信息
  4. 事务管理器写入事务信息(包括事务标记、事务元数据等)

除了第二个场景是直接调用 Partition 对象的方法实现之外,其他 3 个都是调用 appendRecords 来完成的。该方法将给定一组分区的消息写入到对应的 Leader 副本中,并且根据 PRODUCE 请求中 acks 设置的不同,有选择地等待其他副本写入完成。然后,调用指定的回调逻辑。

2. appendRecords

2.1. 输入参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def appendRecords(
timeout: Long, // 请求处理超时时间
requiredAcks: Short, // 请求acks设置
internalTopicsAllowed: Boolean, // 是否允许写入内部主题
origin: AppendOrigin, // 写入方来源
entriesPerPartition: Map[TopicPartition, MemoryRecords], // 待写入消息
// 回调逻辑
responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
delayedProduceLock: Option[Lock] = None,
recordConversionStatsCallback:
Map[TopicPartition, RecordConversionStats] => Unit = _ => ())
: Unit = {
......
}
  1. timeout

    请求处理超时时间。对于生产者来说,它就是 request.timeout.ms 参数值。

  2. requiredAcks

    是否需要等待其他副本写入。对于生产者而言,它就是 acks 参数的值。而在其他场景中,Kafka 默认使用 -1,表示等待其他副本全部写入成功再返回。

  3. internalTopicsAllowed

    是否允许向内部主题写入消息。对于普通的生产者而言,该字段是 False,即不允许写入内部主题。对于 Coordinator 组件,特别是消费者组 GroupCoordinator 组件来说,它的职责之一就是向内部位移主题写入消息,因此,此时,该字段值是 True。

  4. origin

    AppendOrigin 是一个接口,表示写入方来源。当前,它定义了 3 类写入方,分别是 Replication、Coordinator 和 Client。

    • Replication 表示写入请求是由 Follower 副本发出的,它要将从 Leader 副本获取到的消息写入到底层的消息日志中。

    • Coordinator 表示这些写入由 Coordinator 发起,它既可以是管理消费者组的 GroupCooridnator,也可以是管理事务的 TransactionCoordinator。

注: Client 表示本次写入由客户端发起。Follower 副本同步过程不调用 appendRecords 方法,因此,这里的 origin 值只可能是 Replication 或 Coordinator。

2.2. 方法体

isValidRequiredAcks(requiredAcks)

判断 requiredAcks 的取值是否在合理范围内,”是否是 -1、0、1 这 3 个数值中的一个”。如果不是合理取值,代码就进入到外层的 else 分支,构造名为 INVALID_REQUIRED_ACKS 的异常,并将其封装进回调函数中执行,然后返回结果。否则的话,代码进入到外层的 if 分支下。

1
2
val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
origin, entriesPerPartition, requiredAcks)

进入到 if 分支后,代码调用 appendToLocalLog 方法,将要写入的消息集合保存到副本的本地日志上。

1
2
3
4
5
6
7
8
9
val produceStatus = localProduceResults.map { case (topicPartition, result) =>
topicPartition ->
ProducePartitionStatus(
result.info.lastOffset + 1, // 设置下一条待写入消息的位移值
// 构建PartitionResponse封装写入结果
new PartitionResponse(result.error, result.info.firstOffset.getOrElse(-1), result.info.logAppendTime,
result.info.logStartOffset, result.info.recordErrors.asJava, result.info.errorMessage))
}

然后构造 PartitionResponse 对象实例,来封装写入结果以及一些重要的元数据信息,比如本次写入有没有错误(errorMessage)、下一条待写入消息的位移值、本次写入消息集合首条消息的位移值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 需要等待其他副本完成写入
if (delayedProduceRequestRequired(
requiredAcks, entriesPerPartition, localProduceResults)) {
val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
// 创建DelayedProduce延时请求对象
val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)
val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
// 再一次尝试完成该延时请求
// 如果暂时无法完成,则将对象放入到相应的Purgatory中等待后续处理
delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
} else { // 无需等待其他副本写入完成,可以立即发送Response
val produceResponseStatus = produceStatus.map { case (k, status) => k -> status.responseStatus }
// 调用回调逻辑然后返回即可
responseCallback(produceResponseStatus)
}

如果还需要等待其他副本同步完成消息写入,那么就不能立即返回,代码要创建 DelayedProduce 延时请求对象,并把该对象交由 Purgatory 来管理。DelayedProduce 是生产者端的延时发送请求,对应的 Purgatory 就是 ReplicaManager 类构造函数中的 delayedProducePurgatory。所谓的 Purgatory 管理,主要是调用 tryCompleteElseWatch 方法尝试完成延时发送请求。如果暂时无法完成,就将对象放入到相应的 Purgatory 中,等待后续处理。如果无需等待其他副本同步完成消息写入,那么,appendRecords 方法会构造响应的 Response,并调用回调逻辑函数,至此,方法结束。

3. appendToLocalLog

实现消息写入

1
2
3
4
// 获取分区对象 
val partition = getPartitionOrException(topicPartition, expectLeader = true)
// 向该分区对象写入消息集合
val info = partition.appendRecordsToLeader(records, origin, requiredAcks)

4. delayedProduceRequestRequired

delayedProduceRequestRequired 方法的源码。它用于判断消息集合被写入到日志之后,是否需要等待其他副本也写入成功。

5. 副本读取: fetchMessages

在 ReplicaManager 类中,负责读取副本数据的方法是 fetchMessages。不论是 Java 消费者 API,还是 Follower 副本,它们拉取消息的主要途径都是向 Broker 发送 FETCH 请求,Broker 端接收到该请求后,调用 fetchMessages 方法从底层的 Leader 副本取出消息。

和 appendRecords 方法类似,fetchMessages 方法也可能会延时处理 FETCH 请求,因为 Broker 端必须要累积足够多的数据之后,才会返回 Response 给请求发送方。

1
2
3
4
5
6
7
8
9
10
11
12
def fetchMessages(timeout: Long,
replicaId: Int,
fetchMinBytes: Int,
fetchMaxBytes: Int,
hardMaxBytesLimit: Boolean,
fetchInfos: Seq[(TopicPartition, PartitionData)],
quota: ReplicaQuota,
responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
isolationLevel: IsolationLevel,
clientMetadata: Option[ClientMetadata]): Unit = {
......
}