网站首页 > 博客文章 正文
生产者生产消息的速度大于消费者的消费速度如何解决?kafka有消息保留机制,有些消息在消息被消费之前就有可能被清理掉了,从而导致消息的丢失。
当生产者发送消息的速度大于消费者消费消息的速度时,可以采用多线程的方式实现消息的消费,提高整体的消费能力。多线程的实现方式有多种:
第一种也是最常见的方式:==线程封闭==,即为每个线程实例化一个kafkaconsumer对象,这种线程称为消费线程。每个消费线程可以消费一个或多个分区的消息。这种实现方式的并发度受限于分区的实际个数,当==消费线程的个数==大于分区时,就会有部分消费线程一直处于空闲状态。==优点==是可以顺序地消费各个分区的消息。==缺点==也很明显,每个消费线程都要维护一个独立的tcp连接,如果分区数和消费线程的值都比较大,那么会造成不小的系统开销。
第二种与第一种对应,==多个消费线程同时消费一个分区==,这个通过assign(),seek()方法实现。==优点==是可以打破原有的消费线程个数不能超过分区数的限制,进一步提高消费能力。但这种方式对于==位移提交和顺序控制==的处理就会变得异常复杂,实际使用极少。
第三种:==利用多线程提高消息处理的能力==。因为如果消息处理得非常快,那么poll()拉取的频次也会更高,进而提高整体消费的性能。一般而言,poll()拉取消息的速度是相当快的,而整体消费的瓶颈也正在消息的逻辑处理上。所以将处理消息模块改为多线程的实现方式,能带动整体消费性能的提升。
- 注意kafkaConsumerThread(消费线程)可以横向扩展来提高整体的消费能力。消费线程中线程池(ThreadPoolExecutor)的最后一个参数拒绝策略设置为callerRunsPolicy,可以有效的防止==消费能力不及==poll()拉取能力时导致的异常。
- 缺点就是对于==消息的顺序处理==比较困难。这里引入一个共享的offsets来参与提交(Map\<TopicPatition, OffsetAndMetadata\> offsets) ,每一个消息处理完之后都会把位移保存到共享变量offsets中,kafkaConsumerThread在每一次poll()方法之后都读取Offsets中的内容并进行位称提交。
- 写入offsets时问题:==如何解决并发问题?==注意在实现过程中需要对offsets读写需要进行加锁处理,防止出现并发问题。
for(TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
long lastConsumedOffset = tpRecords.get(tpRecords.size() - 1).offset();
sychronized(offsets) {
if(!offsets.containsKey(tp)) {
offsets.put(tp, new OffsetAndMetadata(lastConsumedOffset + 1));
} else {
long position = offsets.get(tp).offset();
if(position < lastConsumeOffset - 1) {
offsets.put(tp, new OffsetMetaData(lastConsumedOffset + 1)) {
}
}
}
}
- 如何解决位移覆盖的问题?
synchronized(offsets) {
if(!offsets.isEmpty()) {
kafkaConsumer.commitAndSync(offsets);
offsets.clear();
}
}
- 这种移位提交的方式会有数据丢失的风险,什么情况下产生,如何解决?(扩展 滑动窗口解决方案//---todo)
- 上一篇: 认识kafka消费者(从kafka读取数据)
- 下一篇: 学Kafka,就必须了解的再均衡问题
猜你喜欢
- 2024-12-11 Kafka知识点总结 一篇读懂 建议收藏
- 2024-12-11 连 Kafka 的稳定性都不懂,也敢说自己会用Kafka
- 2024-12-11 从架构上详解(SLB,Redis,Mysql,Kafka,Clickhouse)热点问题
- 2024-12-11 Kafka最佳实践 - 合理安排kafka的broker、partition、consumer数量
- 2024-12-11 学Kafka,就必须了解的再均衡问题
- 2024-12-11 认识kafka消费者(从kafka读取数据)
- 2024-12-11 Kafka快速入门
- 2024-12-11 扫盲Kafka?看这一篇就够了!
- 2024-12-11 kafka启动顺序&命令启动
- 2024-12-11 超详细 Kafka 入门(最佳实践)
你 发表评论:
欢迎- 最近发表
-
- 比GoPro 13更强的大疆Action 5 Pro,到底强在哪里?
- 信号和槽(信号和槽的实现原理)
- 在响应式项目中连接设计与开发(请简述实现响应式设计包括哪些技术点)
- 【C#】委托、Action、Func 和 Event 之间的关系
- 如何使用JavaScript实现Prompt弹窗?
- 谷歌Magic Actions功能曝光:AI革新安卓16通知交互
- 基于目标TPS的性能测试,如何通过手动设置场景进行测试?
- IOS基础学习之输出口和动作(io口输入输出实验总结及体会)
- 《Java语言程序设计》期末考试模拟试题——判断题和问答题
- Android学习之Touch事件的处理(android触摸事件实例)
- 标签列表
-
- powershellfor (55)
- messagesource (56)
- aspose.pdf破解版 (56)
- promise.race (63)
- 2019cad序列号和密钥激活码 (62)
- window.performance (66)
- qt删除文件夹 (72)
- mysqlcaching_sha2_password (64)
- ubuntu升级gcc (58)
- nacos启动失败 (64)
- ssh-add (70)
- jwt漏洞 (58)
- macos14下载 (58)
- yarnnode (62)
- abstractqueuedsynchronizer (64)
- source~/.bashrc没有那个文件或目录 (65)
- springboot整合activiti工作流 (70)
- jmeter插件下载 (61)
- 抓包分析 (60)
- idea创建mavenweb项目 (65)
- vue回到顶部 (57)
- qcombobox样式表 (68)
- vue数组concat (56)
- tomcatundertow (58)
- pastemac (61)
本文暂时没有评论,来添加一个吧(●'◡'●)