JYM来一篇消息队列-Kafka的干货吧!!! 一:先来了解

image.png

一:先来了解下概念

Kafka 是一种高吞吐量、分布式、基于发布/订阅的消息系统,最初由 LinkedIn 公司开发,使用 Scala 语言编写,目前是 Apache 的开源项目,也是目前较为主流的消息列队。作用吗和其他消息队列相识,主要用于 【异步】,【解耦】,【削峰】。

二:Kafka组成

  • Producer: 消息生产者,向 Kafka Broker 发消息的客户端。
  • Consumer: 消息消费者,从 Kafka Broker 取消息的客户端。
  • Consumer Group: 消费者组(CG),消费者组内每个消费者负责消费不同分区的数据,提高消费能力。一个分区只能由组内一个消费者消费,消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
  • Broker: 一台 Kafka 机器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。
  • Topic: 可以理解为一个队列,topic 将消息分类,生产者和消费者面向的是同一个 topic。
  • Partition: 为了实现扩展性,提高并发能力,一个非常大的 topic 可以分布到多个 broker (即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个 有序的队列。
  • Replica: 副本,为实现备份的功能,保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 Kafka 仍然能够继续工作,Kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。
  • Leader: 每个分区多个副本的“主”副本,生产者发送数据的对象,以及消费者消费数据的对象,都是 leader。
  • Follower: 每个分区多个副本的“从”副本,实时从 leader 中同步数据,保持和 leader 数据的同步。leader 发生故障时,某个 follower 还会成为新的 leader。
  • offset: 消费者消费的位置信息,监控数据消费到什么位置,当消费者挂掉再重新恢复的时候,可以从消费位置继续消费。
  • Zookeeper: Kafka 集群能够正常工作,需要依赖于 zookeeper,zookeeper 帮助 Kafka 存储和管理集群信息

kafka集群1.png

三:Kafka 数据存储设计

3.1. partition 的数据文件(offset,MessageSize,data)

partition 中的每条 Message 包含了以下三个属性:offset,MessageSize,data,其中 offset 表 示 Message 在这个 partition 中的偏移量,offset 不是该 Message 在 partition 数据文件中的实 13/04/2018 Page 176 of 283 际存储位置,而是逻辑上一个值,它唯一确定了 partition 中的一条 Message,可以认为 offset 是 partition 中 Message 的 id;MessageSize 表示消息内容 data 的大小;data 为 Message 的具 体内容。

3.2 数据文件分段 segment(顺序读写、分段命令、二分查找)

partition 物理上由多个 segment 文件组成,每个 segment 大小相等,顺序读写。每个 segment 数据文件以该段中最小的 offset 命名,文件扩展名为.log。这样在查找指定 offset 的 Message 的 时候,用二分查找就可以定位到该 Message 在哪个 segment 数据文件中。

3.3 数据文件索引(分段索引、稀疏存储)

Kafka 为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩 展名为.index。index 文件中并没有为数据文件中的每条 Message 建立索引,而是采用了稀疏存 储的方式,每隔一定字节的数据建立一条索引。这样避免了索引文件占用过多的空间,从而可以 将索引文件保留在内存中。

image.png

四:生产者设计

4.1 负载均衡(partition 会均衡分布到不同 broker 上)

由于消息 topic 由多个 partition 组成,且 partition 会均衡分布到不同 broker 上,因此,为了有 效利用 broker 集群的性能,提高消息的吞吐量,producer 可以通过随机或者 hash 等方式,将消 息平均发送到多个 partition 上,以实现负载均衡。

image.png

4.2 批量发送

是提高消息吞吐量重要的方式,Producer 端可以在内存中合并多条消息后,以一次请求的方式发 送了批量的消息给 broker,从而大大减少 broker 存储消息的 IO 操作次数。但也一定程度上影响 了消息的实时性,相当于以时延代价,换取更好的吞吐量。

4.3 压缩(GZIP 或 Snappy)

Producer 端可以通过 GZIP 或 Snappy 格式对消息集合进行压缩。Producer 端进行压缩之后,在 Consumer 端需进行解压。压缩的好处就是减少传输的数据量,减轻对网络传输的压力,在对大 数据处理上,瓶颈往往体现在网络上而不是 CPU(压缩和解压会耗掉部分 CPU 资源)。

五:消费者设计

image.png

5.1 Consumer Group

同一 Consumer Group 中的多个 Consumer 实例,不同时消费同一个 partition,等效于队列模 式。partition 内消息是有序的,Consumer 通过 pull 方式消费消息。Kafka 不删除已消费的消息 对于 partition,顺序读写磁盘数据,以时间复杂度 O(1)方式提供消息持久化能力。

六:数据可靠性

6.1 ack机制

为保证producer 发送的数据不丢失,broker 接收到数据后都需要对producer发送ack(确认接收) ,如果producer 未收到ack则会重新发送该条消息。producer 的 ack 策略又分为三种:

  • ack=0 producer不等待broker同步完成的确认,继续发送下一条(批)信息
  • ack=1 producer要等待leader成功收到数据并得到确认,才发送下一条message。
  • ack=-1 producer得到follwer确认(全副本同步完成),才发送下一条数据

6.2 ISR (同步副本表)

采用全副本同步完成再ack会有一个问题:

当leader 接收完数据,所有的follower开始同步数据,但一旦有一个follower不能与leader进行同步,那leader会一直等下去,这样会非常的浪费时间。

为此kafka引入了 isr 机制——leader会维护一个动态的 isr(in-sync replica set)列表,这个列表维护了和leader保持同步的集合。当ISR中的follower完成数据的同步之后,leader就会发送ack。如果follower 长时间未向leader同步数据,则该follower将会被踢出 isr,当其他满足条件的follower也会被加入到isr。这个同步最大时间配置项为replica.lag.time.max.ms 参数设置。如果leader故障了,也会从isr的follower中选举新的leader。

6.3 数据一致性解决

因为副本的消息数在各个之间是存在差异的,可能leader10条,而follower只同步了8条;当leader挂掉,数据就有可能会发生丢失,通过一种机制来保证消费者消费数据的一致性就很有必要了。kafka的数据一致性通过 LEO(每个副本的最后一条offset)和HW(所有的LEO中最小的那个)来保证。示意图:

image.png

消费者只能看到offset<=HW 的消息。

七:消费机制

7.1 消费策略

kafka 对消息消费的处理有三种方式:

  • (at least once)至少一次
  • (at most once)至多一次
  • (exactly once) 有且只有一次

因为ack机制的存在,producer 向kafka发送消息时如果 ack=0,由于producer不等确认消息是否投递成功就不管了 ,可能丢失数据,此时消费者最多消费一次消息;如果ack=1,当producer未收到消息确认投递成功时会再次投递,这个时候可能消息被投递了多次,可能会存在重复消费的情况。当kafka开启数据幂等性且ack=1的时候,此时重复的消息会被去重,因此不会产生重复消费的情况。

启用幂等性的方式是将producer中的参数 enable.idompotence 设置为true。

7.2 消费者相关特性

和rabbitMQ一样,可以指定消费者消费消息是推模式还是拉模式。在消费者组中,有多个消费者,一个topic中有多个partition。那么消息的分配是怎么样的呢,首先一个消费者组中的消费者不能同时消费同一个partition,这是基本原则。 然后partiotion的分配机制有两种,一种是range(范围) 一种是 RoundRobin(轮询),range示 意图:

image.png

RoundRobin 示意图:

image.png

由于consumer也可能会宕机挂掉的风险,当consumer恢复的时候必须要能够从上一次消费的地方重新开始消费。所以consumer需要实时记录自己消费到了哪一个offset,以便能够恢复到宕机前状态。

八:一些疑问

8.1 kafka高效读写保证

kafka的producer生产数据,要以追加的形式写入到log文件中,这个写磁盘的过程是顺序写,相对于磁盘的随机写来说,这个效率要高出很多,这个是kafka高效读写的保证之一。而另外的一个保证高效读写的技术是零拷贝,用过netty的同学应该知道这个,中间少了两次用户态的切换。

8.2 Kafka无丢失消息解决方案

实现Kafka无丢失消息的解决方案如下:

  • 必须使用producer.send(msg, callback)接口发送消息。
  • Producer端设置acks参数值为all。acks参数值为all表示ISR中所有Broker副本都接收到消息,消息才算已提交。
  • 设置Producer端retries参数值为一个较大值,表示Producer自动重试次数。当出现网络瞬时抖动时,消息发送可能会失败,此时Producer能够自动重试消息发送,避免消息丢失。
  • 设置Broker端unclean.leader.election.enable = false,unclean.leader.election.enable参数用于控制有资格竞选分区Leader的Broker。如果一个Broker落后原Leader太多,那么成为新Leader必然会造成消息丢失。因此,要将unclean.leader.election.enable参数设置成false。
  • 设置Broker端参数replication.factor >= 3,将消息保存多份副本。
  • 设置Broker参数min.insync.replicas > 1,保证ISR中Broker副本的最少个数,在acks=-1时才生效。设置成大于1可以提升消息持久性,生产环境中不能使用默认值 1。
  • 必须确保replication.factor > min.insync.replicas,如果两者相等,那么只要有一个副本挂机,整个分区无法正常工作。推荐设置成replication.factor = min.insync.replicas + 1。
  • 确保消息消费完成再提交。设置Consumer端参数enable.auto.commit为false,并采用手动提交位移的方式。

但是上边的一些操作很大一部分限制了kafka 本身的吞吐量。

8.3 Kafka消息重复消费问题

8.3.1 消费者消费过程解析

生产者将消息发送到Topic中,消费者即可对其进行消费,其消费过程如下:

  1. Consumer向Broker提交连接请求,其所连接上的Broker都会向其发送Broker Controller的通信URL,即配置文件中的listeners地址;
  2. 当Consumer指定了要消费的Topic后,会向Broker Controller发送消费请求;
  3. Broker Controller会为Consumer分配一个或几个Partition Leader,并将Partition的当前offset发送给Consumer;
  4. Consumer会按照Broker Controller分配的Partition对其中的消息进行消费;
  5. 当Consumer消费完消息后,Consumer会向Broker发送一个消息已经被消费反馈,即消息的offset;
  6. 在Broker接收到Consumer的offset后,会更新相应的consumer_offset中;
  7. Consumer可以重置offset,从而可以灵活消费存储在Broker上的消息。

8.3.2 重复消费解决方案

1.0 同一个Consumer重复消费

当Consumer由于消费能力低而引发了消费超时,则可能会形成重复消费。
在某数据刚好消费完毕,但正准备提交offset时,消费时间超时,则Broker认为消息未消费成功,产生重复消费问题。

2.0 其解决方案:延长offset提交时间。

不同的Consumer重复消费
当Consumer消费了消息,但还没有提交offset时宕机,则已经被消费过的消息会被重复消费。
其解决方案:将自动提交改为手动提交

从架构设计上解决Kafka重复消费的问题

  • 保存并查询
    • 给每个消息都设置一个唯一的UUID,在消费消息时,首先去持久化系统中查询,查看消息是否被消费过,如果没有消费过,再进行消费;如果已经消费过,直接丢弃。
  • 利用幂等性
    • 幂等性操作的特点是任意多次执行所产生的影响均与一次执行的影响相同。
      如果将系统消费消息的业务逻辑设计为幂等性操作,就不用担心Kafka消息的重复消费问题,因此可以将消费的业务逻辑设计成具备幂等性的操作。利用数据库的唯一约束可以实现幂等性,如在数据库中建一张表,将表的两个或多个字段联合起来创建一个唯一约束,因此只能存在一条记录。
  • 设置前提条件
    • 实现幂等性的另一种方式是给数据变更设置一个前置条件。如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。

8.3.3 kafka为什么这么快

  • 利用 Partition 实现并行处理 :我们都知道 Kafka 是一个 Pub-Sub 的消息系统,无论是发布还是订阅,都要指定 Topic。Topic 只是一个逻辑的概念。每个 Topic 都包含一个或多个 Partition,不同 Partition 可位于不同节点。
  • 顺序读写:因为硬盘是机械结构,每次读写都会寻址->写入,其中寻址是一个“机械动作”,它是最耗时的。所以硬盘最“讨厌”随机I/O,最喜欢顺序I/O。为了提高读写硬盘的速度,Kafka就是使用顺序I/O。
  • 充分利用 Page Cache:引入 Cache 层的目的是为了提高 Linux 操作系统对磁盘访问的性能。Cache 层在内存中缓存了磁盘上的部分数据。当数据的请求到达时,如果在 Cache 中存在该数据且是最新的,则直接将数据传递给用户程序,免除了对底层磁盘的操作,提高了性能。Cache 层也正是磁盘 IOPS 为什么能突破 200 的主要原因之一。在 Linux 的实现中,文件 Cache 分为两个层面,一是 Page Cache,另一个 Buffer Cache,每一个 Page Cache 包含若干 Buffer Cache。Page Cache 主要用来作为文件系统上的文件数据的缓存来用,尤其是针对当进程对文件有 read/write 操作的时候。Buffer Cache 则主要是设计用来在系统对块设备进行读写的时候,对块进行数据缓存的系统来使用。
  • 零拷贝技术
    • 零拷贝(Zero-copy)技术指在计算机执行操作时,CPU 不需要先将数据从一个内存区域复制到另一个内存区域,从而可以减少上下文切换以及 CPU 的拷贝时间。它的作用是在数据报从网络设备到用户程序空间传递的过程中,减少数据拷贝次数,减少系统调用,实现 CPU 的零参与,彻底消除 CPU 在这方面的负载。
    • mmap:Memory Mapped Files:简称 mmap,也有叫 MMFile 的,使用 mmap 的目的是将内核中读缓冲区(read buffer)的地址与用户空间的缓冲区(user buffer)进行映射。从而实现内核缓冲区与应用程序内存的共享,省去了将数据从内核读缓冲区(read buffer)拷贝到用户缓冲区(user buffer)的过程。它的工作原理是直接利用操作系统的 Page 来实现文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上。使用这种方式可以获取很大的 I/O 提升,省去了用户空间到内核空间复制的开销。
    • 批处理:在很多情况下,系统的瓶颈不是 CPU 或磁盘,而是网络IO。因此,除了操作系统提供的低级批处理之外,Kafka 的客户端和 broker 还会在通过网络发送数据之前,在一个批处理中累积多条记录 (包括读和写)。记录的批处理分摊了网络往返的开销,使用了更大的数据包从而提高了带宽利用率。
    • 数据压缩:Producer 可将数据压缩后发送给 broker,从而减少网络传输代价,目前支持的压缩算法有:Snappy、Gzip、LZ4。数据压缩一般都是和批处理配套使用来作为优化手段的。

总结

  • 1.0 partition 并行处理
  • 2.0 顺序写磁盘,充分利用磁盘特性
  • 3.0 利用了现代操作系统分页存储 Page Cache 来利用内存提高 I/O 效率
  • 4.0 采用了零拷贝技术
  • 5.0 Producer 生产的数据持久化到 broker,采用 mmap 文件映射,实现顺序的快速写入
  • 6.0 Customer 从 broker 读取数据,采用 sendfile,将磁盘文件读到 OS 内核缓冲区后,转到 NIO buffer进行网络发送,减少 CPU 消耗

8.3.4 Kafka中的HW、LEO、LSO、LW等分别代表什么?

  • HW是High Watermak的缩写,俗称高水位,它表示了一个特定消息的偏移量(offset),消费之只能拉取到这个offset之前的消息。
  • LEO是Log End Offset的缩写,它表示了当前日志文件中下一条待写入消息的offset。
  • LSO特指LastStableOffset。它具体与kafka的事物有关。消费端参数——isolation.level,这个参数用来配置消费者事务的隔离级别。字符串类型,“read_uncommitted”和“read_committed”。
  • LW是Low Watermark的缩写,俗称“低水位”,代表AR集合中最小的logStartOffset值,副本的拉取请求(FetchRequest)和删除请求(DeleteRecordRequest)都可能促使LW的增长。

8.3.5 Kafka中是怎么体现消息顺序性的

  1. 一个 topic,一个 partition,一个 consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个。
  2. 写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性。

8.3.6 Kafka中的ISR、AR又代表什么?ISR的伸缩又指什么?

  1. 分区中的所有副本统称为AR(Assigned Repllicas)。所有与leader副本保持一定程度同步的副本(包括Leader)组成ISR(In-Sync Replicas),ISR集合是AR集合中的一个子集。与leader副本同步滞后过多的副本(不包括leader)副本,组成OSR(Out-Sync Relipcas),由此可见:AR=ISR+OSR。
  2. ISR集合的副本必须满足:
    副本所在节点必须维持着与zookeeper的连接;副本最后一条消息的offset与leader副本最后一条消息的offset之间的差值不能超出指定的阈值
  3. 每个分区的leader副本都会维护此分区的ISR集合,写请求首先由leader副本处理,之后follower副本会从leader副本上拉取写入的消息,这个过程会有一定的延迟,导致follower副本中保存的消息略少于leader副本,只要未超出阈值都是可以容忍的
  4. ISR的伸缩指的是Kafka在启动的时候会开启两个与ISR相关的定时任务,名称分别为“isr-expiration”和”isr-change-propagation”.。isr-expiration任务会周期性的检测每个分区是否需要缩减其ISR集合。

8.3.6 如果我指定了一个offset,Kafka怎么查找到对应的消息?

  1. 通过文件名前缀数字x找到该绝对offset 对应消息所在文件。
  2. offset-x为在文件中的相对偏移。
  3. 通过index文件中记录的索引找到最近的消息的位置。
  4. 从最近位置开始逐条寻找。

8.3.7 Kafka中的延迟队列怎么实现?

Kafka中存在大量的延迟操作,比如延迟生产、延迟拉取以及延迟删除等。Kafka并没有使用JDK自带的Timer或者DelayQueue来实现延迟的功能,而是基于时间轮自定义了一个用于实现延迟功能的定时器(SystemTimer)。JDK的Timer和DelayQueue插入和删除操作的平均时间复杂度为O(nlog(n)),并不能满足Kafka的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为O(1)。Kafka中的时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList)。TimerTaskList是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务TimerTask。时间轮由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度(tickMs)。时间轮的时间格个数是固定的,可用wheelSize来表示,那么整个时间轮的总体时间跨度(interval)可以通过公式 tickMs × wheelSize计算得出。

8.3.8 消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?

offset+1

8.3.9 优先副本是什么?它有什么特殊的作用?

优先副本 会是默认的leader副本 发生leader变化时重选举会优先选择优先副本作为leader

本文正在参加「金石计划」

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%