专业的编程技术博客社区

网站首页 > 博客文章 正文

认识kafka消费者(从kafka读取数据)

baijin 2024-12-11 10:29:23 博客文章 5 ℃ 0 评论


问题一:从kafka上消费数据方式

提供两种消费方式

pull(拉) 模式:消费者主动从 broker 中读取数据

push(推)模式:broker推送数据给消费者。

pull 模式不足如果 kafka 没有数据,消费者可能会陷入循环中, 一直返回空数据。 针对这一点, Kafka 的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有数据可供消费, consumer 会等待一段时间之后再返回,这段时长即为 timeout。

push模式不足:很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。


轮询是一个无限的循环。

consumer.poll()消息轮询是消费者API的核心。一旦消费者订阅了某个主题,轮询就会处理所有的细节,包括群组协调 ,分区再均衡发送心跳获取数据

轮询不只是获取数据那么简单。在第一次调用新消费者的 poll() 方挂时,它会负责查找 GroupCoordinator, 然后加入群组,接受分配 的分区。 如果发生了再均衡,整个过程也是 在轮询期间进行的。当然 ,心跳也是从轮询里发迭出去的。所以,我们要确保在轮询期间 所做的任何处理工作都应该尽快完成。


问题二:消费者和消费者群组关系

消费者 属于 消费者群组。一个群组里的消费者订阅的是 同一个主题

  • 不要让消费者群组里的消费者超过主题的分区数量,超过的话,会有一部分消费者闲置
  • 每个消费者只处理部分分区的消息
  • 一个主题可以有多个不同的消费者群组,消费群组之间接收的消息互不影响(以群组划分。每个群组都能接收到主题中的所有消息)

问题三:消费者群组的消费者消费哪个分区

主题有多个分区,多个消费者去消费的时候,该消费哪个分区?

消费者以组的名义订阅主题,主题有多个分区,消费者组中有多个消费者实例,同一时刻,一条消息只能被组中的一个消费者实例消费

  • 如果分区数大于组中的消费者实例数,一个消费者会负责多个分区
  • 如果分区数小于组中的消费者实例数,有些消费者将处于空闲状态并且无法接收消息

分区分配策略

Kafka 读取分区消息 有两种分配策略:

  • round-robin循环 (轮询分配)
  • range(排序分配,默认的分配策略)


round-robin循环

Roudn Robin分配策略,其主要采用的是一种轮询的方式分配所有的分区

RoundRobin基于轮询算法,对应的实现类是 org.apache.kafka.clients.consumer.RoundRobinAssignor

  • 首先,将所有主题的分区组成TopicAndPartition列表。
  • 然后对TopicAndPartition列表按照hashCode进行排序某个 topic。

假设,有两个消费者C0和C1,两个主题T0和T1,每个主题有3个分区,分配结果是:

  • C0将消费T0主题的0、2分区,以及T1主题的1分区。
  • C1将消费T0主题的1分区,以及T1主题的0、2分区。

即:t0_p0->c0、t0_p1->c1、t0_p2->c0、t1_p0->c1、t1_p1->c0、t1_p2->c1

range (默认的策略)

Range分配策略,对应的实现类是org.apache.kafka.clients.consumer.RangeAssignor

  • 首先,将分区按数字顺序排行序,消费者按名称的字典序排序。
  • 然后,用分区总数除以消费者总数。如果能够除尽,平均分配;若除不尽,则位于排序前面的消费者将多负责一个分区

假如,有2个主题(T0和T1),分别有3个分区,两个消费者,分配结果是:

  • C0将消费T1主题的 0、1 分区,以及T1主题的 0、1 分区。
  • C1将消费T1主题的 2 分区,以及T2主题的 2分区。

结合轮询的分配思考下

案例1


一个主题,4个分区,同一个消费组,一个消费者的消费情况。

将主题分为T1-0,T1-1,T1-2,T1-4,然后分配给消费者群组中的消费者1,因为只有一个,所以所有的分区的信息,都分配到了消费者1

案例2


一个主题,4个分区,同一个消费组,2个消费者的消费情况。

这时消费者只是消费主题的某个分区部分,消息落到了不同的消费者上。

T0-0 分配给c1 T0-1分配给c2 T0-2 分配给c1 T0-3 分配给c2

案例3


一个主题,4个分区,同一个消费组,4个消费者的消费情况。

最终的分配是每个消费者都负责消费一个分区

案例4


一个主题,4个分区,两个消费组,6个消费者的消费情况。

群组1,每个消费者都负责消费一个分区。群组2,根据轮询分配,将分区分配给群组2的消费者。

以群组划分。每个群组都能接收到主题中的所有消息

问题四:消费者消费数据的过程

Kafka消费数据的过程是通过消费者组(Consumer Group)来实现的。

消费者组由一组消费者实例组成,每个消费者实例负责从一个或多个分区中消费数据。


消费者实例从分区中消费数据的过程包括制定消费方案拉取数据处理数据提交偏移量

  1. 消费者consumerA,consumerB, consumerC向kafka集群中的协调器coordinator发送JoinGroup的请求。coordinator主要是用来辅助实现消费者组的初始化和分区的分配。
  • coordinator老大节点选择 = groupidhashcode值 % 50( __consumer_offsets内置主题位移的分区数量)例如: groupid的hashcode值 为1,1% 50 = 1,那么__consumer_offsets 主题的1号分区,在哪个broker上,就选择这个节点的coordinator作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset
  1. 选出一个 consumer作为消费中的leader,比如上图中的ConsumerB
  2. 消费者leader制定出消费方案,比如谁来消费哪个分区等
  3. 把消费方案发给coordinator
  4. 最后coordinator就把消费方案下发给各个consumer, 图中只画了一条线,实际上是有下发各个consumer

每个消费者都会和coordinator保持心跳(默认3s),一旦超时(session.timeout.ms=45s),该消费者会被移除,并触发再平衡;或者消费者处理消息的时间过长(max.poll.interval.ms=5分钟),也会触发再平衡,也就是重新进行上面的流程。

  1. 消费者创建一个网络连接客户端ConsumerNetworkClient, 发送消费请求,可以进行如下配置:
  • fetch.min.bytes: 每批次最小抓取大小,默认1字节
  • fetch.max.bytes: 每批次最大抓取大小,默认50M
  • fetch.max.wait.ms:最大超时时间,默认500ms
  1. 发送请求到kafka集群
  2. 成功的回调,会将数据保存到completedFetches队列中
  3. 消费者从队列中抓取数据,根据配置max.poll.records一次拉取数据返回消息的最大条数,默认500条。
  4. 获取到数据后,需要经过反序列化器、拦截器等。

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表