什么是 Partition
kafka 将 一个Topic 中的消息分成”多段”,分别存储在不同的 Broker 里,这每一段消息被 kafka 称为 Partition
有人可能会问 kafka 为什么要将 Topic 中的消息分段储存在不同的Broker 里?因为一个 Topic 中的消息可能非常多,一台机器可能存不下,因此需要拆分成多段存储在不同的机器里,而且这样做还能提高读写性能
什么是 Replication
Partition 只存一份肯定不安全,为了数据的存储安全,以及实现高可用,可以让 kafka 将 Partition 存储多份存到不同的 Broker 中,这每一份 Partition 被 kafka 称为 Replication
Replication 逻辑上是一个 Partition 的副本,本质上跟 Partition 一样,还是一段 Topic 消息。
我们说一个文件有三个副本,那么一共应该有4个文件,但是在 kafka 中,我们说一个 Partition 下面有3个 Replication 时,那么总共只有3分数据。这点和我们正常理解的有些不一样,因为在 kafka 中 Replication 才是真正对外提供服务的实例!
因此,当一个 Partition 中只有一个 Replication 时,它俩可以理解成一个东西,当一个 Partition 中有多个 Replication 时,他两的关系更像是类和实例之间的关系,Partition 是分区这个类,而 Replication 是分区这个类的实例
出于节省存储空间的考虑,kafka 默认策略是只有一个Replication
Replication 的两种角色
虽然 Partition 有多个副本,但 kafka 规定只有一个能对外提供服务,这个副本被 kafka 称为 Leader Replication,而其他的副本被称为 Follower Replication
Replication 之间的数据同步
Producer 和 Consumer 只会与 Leader Replication 交互,而其它 Follower Replication 则从 Leader Replication 中同步数据。
Follower 与 Leader 的同步是有延时的,会受到下面两个参数的影响:
replica.lag.time.max.ms
:同步副本滞后与 leader 副本的时间zookeeper.session.timeout.ms
:borker 与 zookeeper 会话超时时间
in-sync Replica
Follower Replication 必须及时从 Leader 中同步消息,不能 “落后太多”。这里的 “落后太多” 指 Follower Replication 落后于 Leader 消息的条数超过阈值或者 Follower 超过一定时间未向 Leader 发送 fetch 请求。
Leader 会跟踪与其保持一定程度同步的 Replication 列表,该列表称为 in-sync Replica(ISR)。如果一个 Follower 宕机,或者落后太多,Leader 将把它从 ISR 中移除。
Leader 中的一条消息只有被 ISR 里的所有 Follower 都接受到才会被认为已提交, 只有已提交消息,才会被 consumer 消费到。但是,为了写入性能, Follower 在接收到数据后就立马向 Leader 返回 ACK,而非等到数据写入 Log 中。因此,对于已经 commit 的消息,Kafka 只能保证它被保存在 ISR 里的所有 Follower 的内存中,而不能保证它们被持久化到磁盘中
in-sync Replica 的伸缩
Kafka 中有个名为 isr-expiration 任务会周期性的检测每个分区是否需要缩减其 ISR 集合。这个周期为 replica.lag.time.max.ms
参数值的一半。当 Follower 超过 replica.lag.time.max.ms
毫秒没有发起 fetch 请求或者,消息落后Leader rerplica.lag.max.messages
条时,就会被移出 ISR 列表
有缩减就会有补充,随着同步的进行,当 out-sync Replica(OSR) 集合中的某个 Follower Replication 的 LEO 大于等于该分区的 High Watermark(LW) 时,会被加入到 in-sync Replica(ISR) 集合中
in-sync Replica 相关的配置
1 | ini复制代码# Follower 最大请求间隔 |
如果 Leader 发现 Follower 超过10秒没有向它发起 fetch 请求, 那么 Leader 会将其移出 ISR 列表
1 | ini复制代码# Follower 消息落后的最大条数,超出该条数 Follower 会被移出 ISR 列表 |
1 | ini复制代码# ISR 中至少要有多少个 Replication |
Out-Sync Relipca
与 in-sync Replica(ISR) 对应的还有 Out-Sync Relipcas(OSR),即非同步副本集、也可以称作落后的副本列表
Assigned Repllica
Assigned Relipcas(AR) 指的是分区中的所有副本,AR = ISR + OSR
Leader 的几种选举
Leader Replication 如果挂掉就需要从 Follower Replication 中选出一个新的 Leader
同步列表选举
kafka 会优先从 ISR 中选取,因为 ISR 中的 Follower Replication 和 Leader 是保持同步的,因此数据一致性最高。
注意,是数据一致性最高,而不是数据一致,因为首先 Follower 与 Leader 的同步是有延时的,其次数据的复制是异步完成的。所以数据可能会丢失,但是概率和数量都很小
始终保证拥有足够数量的同步副本是非常重要的。要将 follower 提升为 Leader,它必须存在于同步副本列表中。每个分区都有一个同步副本列表,该列表由 Leader 分区和 Controller 进行更新。
从 ISR 中选取 leader 的过程称为 clean leader election
非同步列表选举
由于 ISR 是动态调整的,所以会存在 ISR 列表为空的情况,这时候可以从 ISR 之外选取。
通常来说,非同步副本落后 Leader 太多,因此,如果选择这些副本作为新 Leader,极有可能会出现数据的丢失。
在非同步副本中选取 leader 的过程称之为 unclean leader election
因此是否开启 unclean leader election,可以通过 Broker 端参数 unclean.leader.election.enable
来控制,当然也要根据你的具体业务去权衡!
开启 unclean leader election 很可能会造成数据丢失,但好处是,它使得 Leader 一直存在,不至于对外停止服务,因此提升了高可用性。反之,禁用 unclean leader election 的好处在于维护了数据的一致性,避免了消息丢失,但代价是牺牲了高可用性。分布式系统的 CAP 理论说的就是这种情况。
Leader 、follower 角色互换
除了上面提到的两种选举会导致 Leader 角色发生移动之外,还有一种情况也会造成 Leader 移动。
当 Controller Broker 从 Zookeeper 上感知到有新的 Broker 已加入集群时,它将使用该 Broker ID 来检查该 Broker 上是否存在Replication,如果存在,则 Controller 通知新加入的 Broker 同步现有对应 leader 的消息到自己的 follower 分区。之后,为了保证负载均衡,Controller 会下线当前 leader 分区,然后将新加入的 Broker 上的 follower 分区选举为 新leader 分区。
这就是 Leader 、follower 的角色互换,一般发生在有新的 Broker 加入集群时,目的是为了实现 Partition 的负载均衡
换一次 Leader 是有代价的,原 Leader 分区请求都会打到新的 Leader 分区。如果你不想发生 Leader 、follower 角色互换,可以在配置文件中将这个参数设置成 false
Leader Replication 负载均衡
由于 KafKa 客户端读写 topic 的分区时,只会跟 Leader Replication 交互,如果一个 Broker 中存在过多的 Leader Replication,就会导致该 Broker 压力过大。
Partition 配置
配置 topic 默认的副本数
1 | ini复制代码default.replication.factor=3 |
如果创建 topic 时,没有显示指定副本数,则使用默认的副本数
创建 topic 时,可以通过 --replication-factor
显示指定副本数
1 | arduino复制代码bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --partitions 1 --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1 |
本文转载自: 掘金