副本机制是 Kafka 实现数据高可靠性的基础。同一个分区下的多个副本分散在不同的 Broker 机器上,它们保存相同的消息数据以实现高可靠性。对于分布式系统而言,一个必须要解决的问题,就是如何确保所有副本上的数据是一致的。

AbstractFetcherThread 类是副本获取线程 ReplicaFetcherThread 的抽象基类。它里面定义和实现了很多重要的字段和方法

1.类定义及字段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
abstract class AbstractFetcherThread(
name: String, // 线程名称
clientId: String, // Client Id,用于日志输出
val sourceBroker: BrokerEndPoint, // 数据源 Broker 地址, 是指此线程要从哪个 Broker 上读取数据。
failedPartitions: FailedPartitions, // 处理过程中出现失败的分区
fetchBackOffMs: Int = 0, // 获取操作重试间隔
isInterruptible: Boolean = true, // 线程是否允许被中断
val brokerTopicStats: BrokerTopicStats) // Broker端主题监控指标
extends ShutdownableThread(name, isInterruptible) {
// 定义 FetchData 类型表示获取的消息数据
type FetchData = FetchResponse.PartitionData[Records]
// 定义 EpochData 类型表示 Leader Epoch 数据
type EpochData = OffsetsForLeaderEpochRequest.PartitionData
private val partitionStates = new PartitionStates[PartitionFetchState]
......
}
截屏2021-05-10 上午10.31.46

FetchData 定义里的 PartitionData 类型,是客户端 clients 工程中 FetchResponse 类定义的嵌套类。FetchResponse 类封装的是 FETCH 请求的 Response 对象,而里面的 PartitionData 类是一个 POJO 类,保存的是 Response 中单个分区数据拉取的各项数据,包括从该分区的 Leader 副本拉取回来的消息、该分区的高水位值和日志起始位移值等。

1
2
3
4
5
6
7
8
9
10
11
12
13
public static final class PartitionData<T extends BaseRecords> {
public final Errors error; // 错误码
public final long highWatermark; // 高水位值
public final long lastStableOffset; // 最新LSO值
public final long logStartOffset; // 最新Log Start Offset值
// 期望的 Read Replica, 用于指定可对外提供读服务的 Follower 副本
// KAFKA 2.4 之后支持部分 Follower 副本可以对外提供读服务
public final Optional<Integer> preferredReadReplica;
// 该分区对应的已终止事务列表
public final List<AbortedTransaction> abortedTransactions;
// 消息集合,最重要的字段!
public final T records;
}

截屏2021-05-10 上午10.47.49

2.分区读取状态类

AbstractFetcherThread 类构造函数中,封装了一个名为 PartitionStates[PartitionFetchState]类型的字段,表征 分区读取状态的,保存的是分区的已读取位移值 和 **对应的副本状态**。

分区读取状态有 3 个,

  1. 可获取,表明副本获取线程当前能够读取数据。
  2. 截断中,表明分区副本正在执行截断操作(比如该副本刚刚成为 Follower 副本)。
  3. 被推迟,表明副本获取线程获取数据时出现错误,需要等待一段时间后重试。

这里的状态有两个,一个是分区读取状态,一个是副本读取状态。副本读取状态由 ReplicaState 接口表示,

如下所示:

1
2
3
4
5
sealed trait ReplicaState
// 截断中
case object Truncating extends ReplicaState
// 获取中
case object Fetching extends ReplicaState

可见,副本读取状态有截断中和获取中两个:当副本执行截断操作时,副本状态被设置成 Truncating;当副本被读取时,副本状态被设置成 Fetching。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
case class PartitionFetchState(fetchOffset: Long,
lag: Option[Long],
currentLeaderEpoch: Int,
delay: Option[DelayedItem],
state: ReplicaState) {
// 分区可获取的条件是副本处于Fetching且未被推迟执行
def isReadyForFetch: Boolean = state == Fetching && !isDelayed
// 副本处于ISR的条件:没有lag
def isReplicaInSync: Boolean = lag.isDefined && lag.get <= 0
// 分区处于截断中状态的条件:副本处于Truncating状态且未被推迟执行
def isTruncating: Boolean = state == Truncating && !isDelayed
// 分区被推迟获取数据的条件:存在未过期的延迟任务
def isDelayed: Boolean =
delay.exists(_.getDelay(TimeUnit.MILLISECONDS) > 0)
......
}

3. processPartitionData

用于处理读取回来的消息集合。它是一个抽象方法,因此需要子类实现它的逻辑。具体到 Follower 副本而言, 是由 ReplicaFetcherThread 类实现的

1
2
3
4
5
protected def processPartitionData(
topicPartition: TopicPartition, // 读取哪个分区的数据
fetchOffset: Long, // 读取到的最新位移值
partitionData: FetchData // 读取到的分区消息数据
): Option[LogAppendInfo] // 写入已读取消息数据前的元数据

4. truncate

1
2
3
4
protected def truncate(
topicPartition: TopicPartition, // 要对哪个分区下副本执行截断操作
truncationState: OffsetTruncationState // Offset + 截断状态
): Unit

5. buildFetch

buildFetch 的本质就是,为指定分区构建对应的 FetchRequest.Builder 对象,而该对象是构建 FetchRequest 的核心组件。Kafka 中任何类型的消息读取,都是通过给指定 Broker 发送 FetchRequest 请求来完成的。

1
2
3
4
5
6
protected def buildFetch(
// 一组要读取的分区列表
// 分区是否可读取取决于PartitionFetchState中的状态
partitionMap: Map[TopicPartition, PartitionFetchState]):
// 封装FetchRequest.Builder对象
ResultWithPartitions[Option[ReplicaFetch]]

6. doWork

doWork 方法是 AbstractFetcherThread 类的核心方法,是线程的主逻辑运行方法

1
2
3
4
override def doWork(): Unit = {
maybeTruncate() // 执行副本截断操作
maybeFetch() // 执行消息获取操作
}

AbstractFetcherThread 线程只要一直处于运行状态,就是会不断地重复这两个操作

分区的 Leader 可能会发生变化。每当有新 Leader 产生时,Follower 副本就必须主动执行截断操作,将自己的本地日志裁剪成与 Leader 一样的消息序列,甚至,Leader 副本本身也需要执行截断操作,将 LEO 调整到分区高水位处。

maybeTruncate()

1
2
3
4
5
6
7
8
9
10
11
12
private def maybeTruncate(): Unit = {
// 将所有处于截断中状态的分区依据有无Leader Epoch值进行分组
val (partitionsWithEpochs, partitionsWithoutEpochs) = fetchTruncatingPartitions()
// 对于有Leader Epoch值的分区,将日志截断到Leader Epoch值对应的位移值处
if (partitionsWithEpochs.nonEmpty) {
truncateToEpochEndOffsets(partitionsWithEpochs)
}
// 对于没有Leader Epoch值的分区,将日志截断到高水位值处
if (partitionsWithoutEpochs.nonEmpty) {
truncateToHighWatermark(partitionsWithoutEpochs)
}
}

7. 总结

截屏2021-05-10 上午10.26.39