网站首页 > 博客文章 正文
问题一:从kafka上消费数据方式
提供两种消费方式
pull(拉) 模式:消费者主动从 broker 中读取数据。
push(推)模式:broker推送数据给消费者。
pull 模式不足:如果 kafka 没有数据,消费者可能会陷入循环中, 一直返回空数据。 针对这一点, Kafka 的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有数据可供消费, consumer 会等待一段时间之后再返回,这段时长即为 timeout。
push模式不足:很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。
轮询是一个无限的循环。
consumer.poll()消息轮询是消费者API的核心。一旦消费者订阅了某个主题,轮询就会处理所有的细节,包括群组协调 ,分区再均衡 ,发送心跳 、获取数据
轮询不只是获取数据那么简单。在第一次调用新消费者的 poll() 方挂时,它会负责查找 GroupCoordinator, 然后加入群组,接受分配 的分区。 如果发生了再均衡,整个过程也是 在轮询期间进行的。当然 ,心跳也是从轮询里发迭出去的。所以,我们要确保在轮询期间 所做的任何处理工作都应该尽快完成。
问题二:消费者和消费者群组关系
消费者 属于 消费者群组。一个群组里的消费者订阅的是 同一个主题
- 不要让消费者群组里的消费者超过主题的分区数量,超过的话,会有一部分消费者闲置
- 每个消费者只处理部分分区的消息
- 一个主题可以有多个不同的消费者群组,消费群组之间接收的消息互不影响(以群组划分。每个群组都能接收到主题中的所有消息)
问题三:消费者群组的消费者消费哪个分区
主题有多个分区,多个消费者去消费的时候,该消费哪个分区?
消费者以组的名义订阅主题,主题有多个分区,消费者组中有多个消费者实例,同一时刻,一条消息只能被组中的一个消费者实例消费。
- 如果分区数大于组中的消费者实例数,一个消费者会负责多个分区。
- 如果分区数小于组中的消费者实例数,有些消费者将处于空闲状态并且无法接收消息。
分区分配策略
Kafka 读取分区消息 有两种分配策略:
- round-robin循环 (轮询分配)
- range(排序分配,默认的分配策略)
round-robin循环
Roudn Robin分配策略,其主要采用的是一种轮询的方式分配所有的分区
RoundRobin基于轮询算法,对应的实现类是 org.apache.kafka.clients.consumer.RoundRobinAssignor
- 首先,将所有主题的分区组成TopicAndPartition列表。
- 然后对TopicAndPartition列表按照hashCode进行排序某个 topic。
假设,有两个消费者C0和C1,两个主题T0和T1,每个主题有3个分区,分配结果是:
- C0将消费T0主题的0、2分区,以及T1主题的1分区。
- C1将消费T0主题的1分区,以及T1主题的0、2分区。
即:t0_p0->c0、t0_p1->c1、t0_p2->c0、t1_p0->c1、t1_p1->c0、t1_p2->c1
range (默认的策略)
Range分配策略,对应的实现类是org.apache.kafka.clients.consumer.RangeAssignor
- 首先,将分区按数字顺序排行序,消费者按名称的字典序排序。
- 然后,用分区总数除以消费者总数。如果能够除尽,平均分配;若除不尽,则位于排序前面的消费者将多负责一个分区。
假如,有2个主题(T0和T1),分别有3个分区,两个消费者,分配结果是:
- C0将消费T1主题的 0、1 分区,以及T1主题的 0、1 分区。
- C1将消费T1主题的 2 分区,以及T2主题的 2分区。
结合轮询的分配思考下
案例1
一个主题,4个分区,同一个消费组,一个消费者的消费情况。
将主题分为T1-0,T1-1,T1-2,T1-4,然后分配给消费者群组中的消费者1,因为只有一个,所以所有的分区的信息,都分配到了消费者1
案例2
一个主题,4个分区,同一个消费组,2个消费者的消费情况。
这时消费者只是消费主题的某个分区部分,消息落到了不同的消费者上。
T0-0 分配给c1 T0-1分配给c2 T0-2 分配给c1 T0-3 分配给c2
案例3
一个主题,4个分区,同一个消费组,4个消费者的消费情况。
最终的分配是每个消费者都负责消费一个分区
案例4
一个主题,4个分区,两个消费组,6个消费者的消费情况。
群组1,每个消费者都负责消费一个分区。群组2,根据轮询分配,将分区分配给群组2的消费者。
以群组划分。每个群组都能接收到主题中的所有消息
问题四:消费者消费数据的过程
Kafka消费数据的过程是通过消费者组(Consumer Group)来实现的。
消费者组由一组消费者实例组成,每个消费者实例负责从一个或多个分区中消费数据。
消费者实例从分区中消费数据的过程包括制定消费方案、拉取数据、处理数据和提交偏移量。
- 消费者consumerA,consumerB, consumerC向kafka集群中的协调器coordinator发送JoinGroup的请求。coordinator主要是用来辅助实现消费者组的初始化和分区的分配。
- coordinator老大节点选择 = groupid的hashcode值 % 50( __consumer_offsets内置主题位移的分区数量)例如: groupid的hashcode值 为1,1% 50 = 1,那么__consumer_offsets 主题的1号分区,在哪个broker上,就选择这个节点的coordinator作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset。
- 选出一个 consumer作为消费中的leader,比如上图中的ConsumerB。
- 消费者leader制定出消费方案,比如谁来消费哪个分区等
- 把消费方案发给coordinator
- 最后coordinator就把消费方案下发给各个consumer, 图中只画了一条线,实际上是有下发各个consumer。
每个消费者都会和coordinator保持心跳(默认3s),一旦超时(session.timeout.ms=45s),该消费者会被移除,并触发再平衡;或者消费者处理消息的时间过长(max.poll.interval.ms=5分钟),也会触发再平衡,也就是重新进行上面的流程。
- 消费者创建一个网络连接客户端ConsumerNetworkClient, 发送消费请求,可以进行如下配置:
- fetch.min.bytes: 每批次最小抓取大小,默认1字节
- fetch.max.bytes: 每批次最大抓取大小,默认50M
- fetch.max.wait.ms:最大超时时间,默认500ms
- 发送请求到kafka集群
- 成功的回调,会将数据保存到completedFetches队列中
- 消费者从队列中抓取数据,根据配置max.poll.records一次拉取数据返回消息的最大条数,默认500条。
- 获取到数据后,需要经过反序列化器、拦截器等。
- 上一篇: Kafka快速入门
- 下一篇: 面试题之kafka:如何提高整体的消费能力
猜你喜欢
- 2024-12-11 Kafka知识点总结 一篇读懂 建议收藏
- 2024-12-11 连 Kafka 的稳定性都不懂,也敢说自己会用Kafka
- 2024-12-11 从架构上详解(SLB,Redis,Mysql,Kafka,Clickhouse)热点问题
- 2024-12-11 Kafka最佳实践 - 合理安排kafka的broker、partition、consumer数量
- 2024-12-11 学Kafka,就必须了解的再均衡问题
- 2024-12-11 面试题之kafka:如何提高整体的消费能力
- 2024-12-11 Kafka快速入门
- 2024-12-11 扫盲Kafka?看这一篇就够了!
- 2024-12-11 kafka启动顺序&命令启动
- 2024-12-11 超详细 Kafka 入门(最佳实践)
你 发表评论:
欢迎- 最近发表
-
- 比GoPro 13更强的大疆Action 5 Pro,到底强在哪里?
- 信号和槽(信号和槽的实现原理)
- 在响应式项目中连接设计与开发(请简述实现响应式设计包括哪些技术点)
- 【C#】委托、Action、Func 和 Event 之间的关系
- 如何使用JavaScript实现Prompt弹窗?
- 谷歌Magic Actions功能曝光:AI革新安卓16通知交互
- 基于目标TPS的性能测试,如何通过手动设置场景进行测试?
- IOS基础学习之输出口和动作(io口输入输出实验总结及体会)
- 《Java语言程序设计》期末考试模拟试题——判断题和问答题
- Android学习之Touch事件的处理(android触摸事件实例)
- 标签列表
-
- powershellfor (55)
- messagesource (56)
- aspose.pdf破解版 (56)
- promise.race (63)
- 2019cad序列号和密钥激活码 (62)
- window.performance (66)
- qt删除文件夹 (72)
- mysqlcaching_sha2_password (64)
- ubuntu升级gcc (58)
- nacos启动失败 (64)
- ssh-add (70)
- jwt漏洞 (58)
- macos14下载 (58)
- yarnnode (62)
- abstractqueuedsynchronizer (64)
- source~/.bashrc没有那个文件或目录 (65)
- springboot整合activiti工作流 (70)
- jmeter插件下载 (61)
- 抓包分析 (60)
- idea创建mavenweb项目 (65)
- vue回到顶部 (57)
- qcombobox样式表 (68)
- vue数组concat (56)
- tomcatundertow (58)
- pastemac (61)
本文暂时没有评论,来添加一个吧(●'◡'●)