专业的编程技术博客社区

网站首页 > 博客文章 正文

深入理解Apache Kafka的高可靠性原理

baijin 2024-08-23 10:36:23 博客文章 4 ℃ 0 评论

深入理解Apache Kafka的高可靠性原理

目前,许多开源分布式处理系统如Cloudera、Apache Storm、Spark等都支持与Kafka的集成。Kafka越来越受到许多互联网公司的青睐,它们将Kafka作为其核心消息引擎之一。

在本文中,我们将了解Kakfa存储机制、复制原理、同步原理和持久性保证,以分析其可靠性。

如图所示,一个典型的Kafka架构包括几个生产者(可以是服务器日志、业务数据、由页面前端生成的页面视图等)、几个broker(Kafka支持水平扩展,更通用的代理)、几个消费者(组)和一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选择领导者,并在消费者组发生更改时重新平衡。生产者使用推模式向代理发布消息,消费者使用拉模式订阅和消费来自代理的消息。

Topics and Partitions

Partition是一个物理概念,Topic是一个逻辑概念。每个提交到Kafka集群的消息都有一个类别,这个类别称为Topic,每个主题将被划分为多个分区,每个分区是存储级别的附加日志文件。文件中每个消息的位置称为偏移量,偏移量是一个long数字,是唯一标识符。

众所周知,顺序写磁盘比随机写内存更有效。对于Kafka的高吞吐量,每个消息都附加到分区,这是对磁盘的顺序写入,因此非常有效。

分区解决了性能瓶颈问题。通过设置分区规则,可以将所有消息均匀地分布到不同的分区,从而实现一定程度的扩展。在创建topic时,可以在$KAFKA_HOME /config/ server中指定分区的数量。属性,当然,您还可以在创建topic之后更改分区的数量。在发送消息时,可以指定此消息的密钥,根据密钥和分区机制确定发送到哪个分区的此消息的生成器。

Kafka健壮的复制策略保证了它的高可靠性。

通过解释Kafka的复制原理和同步方法,我们已经达到了可以开始探索宏观层次Kafka概念的阶段。现在,让我们从不同的维度开始探索Kafka,比如ISR(同步副本)、HW(高水位)、leader election以及数据可靠性和持久性保证。

Topics and Partitions是如何存储的?

Kafka中的消息按topic分类。生产者通过topic向Kafka的broker发送消息,消费者通过topic读取数据。一个topic可以划分为多个分区,而分区又可以细分为多个段,因此一个分区在物理上由多个段组成。

为了便于说明,我将在这里显示在单个节点集群上的文件只有一个Kafka broker。Kafka消息文件存储目录的位置如下:

[root@tools ~]# ls -ltr /kafka-logs/
total 20
-rw-r--r--. 1 kafka hadoop 57 Aug 10 15:22 meta.properties
drwxr-xr-x. 2 kafka hadoop 70 Aug 11 02:46 __consumer_offsets-0
drwxr-xr-x. 2 kafka hadoop 70 Aug 11 02:46 __consumer_offsets-29
drwxr-xr-x. 2 kafka hadoop 70 Aug 11 02:46 __consumer_offsets-10....
drwxr-xr-x. 2 kafka hadoop 70 Aug 11 02:46 __consumer_offsets-3
drwxr-xr-x. 2 kafka hadoop 70 Aug 11 02:46 __consumer_offsets-13
drwxr-xr-x. 2 kafka hadoop 70 Aug 14 10:16 vidtest-0
drwxr-xr-x. 2 kafka hadoop 70 Aug 21 09:51 interns_test-0
drwxr-xr-x. 2 kafka hadoop 70 Aug 28 06:10 medicalschema-0
-rw-r--r--. 1 kafka hadoop 34 Aug 30 15:28 cleaner-offset-checkpoint
drwxr-xr-x. 2 kafka hadoop 4096 Aug 30 15:29 __consumer_offsets-48
drwxr-xr-x. 2 kafka hadoop 70 Sep 18 23:31 imagetext-0
drwxr-xr-x. 2 kafka hadoop 70 Nov 17 12:29 imageobject-0
drwxr-xr-x. 2 kafka hadoop 70 Nov 19 02:05 my-topic-0
-rw-r--r--. 1 kafka hadoop 1392 Nov 22 02:31 recovery-point-offset-checkpoint
-rw-r--r--. 1 kafka hadoop 1392 Nov 22 02:32 replication-offset-checkpoint

您可以通过修改"server.properties"来更改kafka-logs目录的位置。server.properties文件,位于$KAFKA_HOME/config下。让我们假设分区是最小的存储单元,我们可以想象当Kafka producer不断发送消息时,不可避免地会导致分区文件的无限膨胀,这将严重影响消息文件的维护和所消费消息的清除。因此,可以把分区分隔为段 。每个分区相当于一个巨大的文件被等分成多个大小相等的段(段)数据文件(每个段文件中的消息数不一定相等)。段文件的生命周期可以通过修改服务器配置参数(log.segment.bytes, log.roll)。

段文件由两部分组成,即". Index" 文件and ".log"文件。 分别作为段索引文件和数据文件。这两个文件一一对应。

[root@tools~]# ls -ltr /kafka-logs/__consumer_offsets-5/
total 0
-rw-r--r--. 1 kafka hadoop 0 Aug 11 02:46 00000000000000000000.log
-rw-r--r--. 1 kafka hadoop 10485760 Nov 21 23:28 00000000000000000000.index

".index"索引文件存储了大量的元数据。".log"数据文件存储大量的消息,索引文件中的元数据指向相应数据文件中消息的物理偏移地址。这两个文件的命名约定如下:分区第一部分从0开始,每个后续部分文件名的偏移值是最后一段的最后消息文件,该值为64位大小,20位字符的长度,空位用0填充。 segment中index和data file对应关系物理结构如下:

如上所示,我们有170410段,其中包括0000000000000170410.inedx索引和0000000000000170410.log文件。以".index"中的元数据[3,348]为例。 第三条消息表示在".log"数据文件,即170410 + 3 = 170413条消息在全局分区中,物理偏移量为348在局部段文件中。

如何从分区偏移量中查找消息?

假设我们有下面特定段的文件,我们想读取offset = 170418的消息。首先找到段文件,其中00000000000000000000.index的 文件的开头,第二个文件是00000000000000170410.index (起始偏移量170410 +1 = 170411),第三个文件是00000000000000239430.index(起始偏移量为239430 + 1 = 239431),因此这个偏移量= 170418属于第二个文件。

00000000000000000000.index 00000000000000000000.log 00000000000000170410.index 00000000000000170410.log 00000000000000239430.index00000000000000239430.log

复制和同步

为了提高消息的可靠性,Kafka为每个主题分区提供了N个副本,其中N(大于或等于1)是主题副本因子的数量。Kafka使用多复制机制自动进行故障转移,并确保当Kafka集群中的代理发生故障时,服务是可用的。在Kafka N副本中,一个是leaderr副本,另一个是follower, leader负责处理所有的分区读和写请求,同时follower被动地、有规律地从leader那里复制数据。

Kafka提供了一个数据复制拷贝算法,以确保如果leader失败或挂起,将选出一个新的leader,并且client端的消息会被成功写入。leader负责维护和跟踪ISR (in - sync Replicas,kafka不是完全同步,也不是完全异步,是一种ISR机制)中所有follower lags的状态,这些滞后状态指示一个复制同步队列。当生产者向broker发送消息时,leader写消息并将其复制给所有follower。消息提交后成功地复制到所有同步副本。消息复制延迟受最慢的follower的限制,快速检测slow copies非常重要,如果follower太过"滞后"或失败,则leader会将其从ISR中删除。

这里的核心问题是,在海量的 topic 情况下,或者经常性的流量抖动情况下,我们不能对 topic 的producer 每次打过来的消息数目做任何假设,所以就不太好定出来一个 合适的 replica.lag.max.messages 值

replica.lag.max.messages参数在0.10版本之后被删除。仅仅留下了replica.lag.time.max.ms (delay)作为ISR中用于副本管理的参数。设置太大,将影响真正延迟的follower被删除;设置太小,导致follower被频繁的访问(performance issue)。让我们看看问题所在,对于replica.lag.max.messages,如果当前leader消息的副本数量超过该参数的follower messages的值,则leader将从ISR中删除follower。假设你设置replica.lag.max.messages = 4,如果producer发送到broker的消息数量小于4时, follower从leader那里接收到消息后,如果follower落后了,follower开始拉这些消息,但是消息数量不会超过4条,所以没有follower从ISR中删除,所以这次replica.lag.max.message似乎是合理的。但是,producer发起一个瞬时峰值流,producer一次发送4条以上的消息时,这就超过replica.lag.max.messages,follower被认为与leader不同步,因此follower被踢出ISR。但事实上,这些follower是活着的,没有任何性能问题。不久,他们追上了leader,重新加入了ISR。因此,它们不断地勾选ISRs并再次返回ISRs,这无疑增加了不必要的性能开销。

这段可能不太好理解,简而言之:

在 follower 落后 leader 超过 replica.lag.max.messages = 4 条消息的时候,不会立马踢出ISR 集合,而是持续落后超过 replica.lag.time.max.ms 时间,才会被踢出 ,这样就能避免流量抖动造成的运维问题,因为follower 在下一次fetch的时候就会跟上leader.

上面的部分也提到了HW这个概念。HW一般称为高水位,取一个分区对应的最小ISR LEO(Log End Offset)作为HW,消费者只能消费HW所在的位置。此外,每个副本都有HW, leader和follower负责更新它们的HW状态。对于leader新写的信息,消费者不能立即消费。当ISR中的所有副本都已同步后,leader将等待更新消息。

request.required.acks,设置数据可靠性等级的acks参数:

1(默认值):这意味着生产者在领导者成功接收到ISR中的数据并得到确认后发送下一条消息。如果leader关闭,它将丢失数据。

0:这意味着生产者不需要等待代理的确认就可以继续发送下一批消息。在这种情况下,数据传输效率最高,但数据的可靠性确实最低。

-1:生产者需要等待ISR中所有的follower确认数据收到后发送一次,这是可靠性最高的。但是,这并不保证数据不会丢失。

如果想提高数据的可靠性,请设置request.required.acks= -1,同时设置min.insync.replicas此参数(可以在代理或主题级别设置该参数)以实现最大效率。该参数设置ISR中副本的最小数量。默认值是1。当且仅request.required.acks参数设置为-1时,此参数才生效。如果ISR中的拷贝数小于min.insync.replicas,客户端将返回一个异常。replicas配置:org.apache.kafka.common.errors . notenoughreplicasexceptoin:消息被拒绝,因为同步的副本比所需的少。

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表