网站首页 > 博客文章 正文
前言
本文针对解决Kafka不同Topic之间存在一定的数据关联时的顺序消费问题。
如存在Topic-insert和Topic-update分别是对数据的插入和更新,当insert和update操作为同一数据时,应保证先insert再update。
1、问题引入
kafka的顺序消费一直是一个难以解决的问题,kafka的消费策略是对于同Topic同Partition的消息可保证顺序消费,其余无法保证。
如果一个Topic只有一个Partition,那么这个Topic对应consumer的消费必然是有序的。不同的Topic的任何情况下都无法保证consumer的消费顺序和producer的发送顺序一致。
如果不同Topic之间存在数据关联且对消费顺序有要求,该如何处理?本文主要解决此问题。
另外,Kafka 系列面试题和答案全部整理好了,微信搜索Java技术栈,在后台发送:面试,可以在线阅读。
2、解决思路
现有Topic-insert和Topic-update,数据唯一标识为id,对于id=1的数据而言,要保证Topic-insert消费在前,Topic-update消费在后。
两个Topic的消费为不同线程处理,所以为了保证在同一时间内的同一数据标识的消息仅有一个业务逻辑在处理,需要对业务添加锁操作。
使用synchronized进行加锁的话,会影响无关联的insert和update的数据消费能力,如id=1的insert和id=2的update,在synchronized的情况下,无法并发处理,这是没有必要的,我们需要的是对于id=1的insert和id=1的update在同一时间只有一个在处理,所以使用细粒度锁来完成加锁的操作。
细粒度锁实现:https://blog.csdn.net/qq_38245668/article/details/105891161
PS:如果为分布式系统,细粒度锁需要使用分布式锁的对应实现。
在对insert和update加锁之后,其实还是没有解决消费顺序的问题,只是确保了同一时间只有一个业务在处理。 对于消费顺序异常的问题,也就是先消费了update再消费insert的情况。
处理方式:消费到update数据,校验库中是否存在当前数据(也就是是否执行insert),如果没有,就将当前update数据存入缓存,key为数据标识id,在insert消费时检查是否存在id对应的update缓存,如果有,就证明当前数据的消费顺序异常,需执行update操作,再将缓存数据移除。
3、实现方案
消息发送:
kafkaTemplate.send("TOPIC_INSERT", "1");
kafkaTemplate.send("TOPIC_UPDATE", "1");
监听代码示例:
KafkaListenerDemo.java
@Componen
@Slf4
public class KafkaListenerDemo {
// 消费到的数据缓存
private Map<String, String> UPDATE_DATA_MAP = new ConcurrentHashMap<>();
// 数据存储
private Map<String, String> DATA_MAP = new ConcurrentHashMap<>();
private WeakRefHashLock weakRefHashLock;
public KafkaListenerDemo(WeakRefHashLock weakRefHashLock) {
this.weakRefHashLock = weakRefHashLock;
}
@KafkaListener(topics = "TOPIC_INSERT")
public void insert(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws InterruptedException{
// 模拟顺序异常,也就是insert后消费,这里线程sleep
Thread.sleep(1000);
String id = record.value();
log.info("接收到insert :: {}", id);
Lock lock = weakRefHashLock.lock(id);
lock.lock();
try {
log.info("开始处理 {} 的insert", id);
// 模拟 insert 业务处理
Thread.sleep(1000);
// 从缓存中获取 是否存在有update数据
if (UPDATE_DATA_MAP.containsKey(id)){
// 缓存数据存在,执行update
doUpdate(id);
}
log.info("处理 {} 的insert 结束", id);
}finally {
lock.unlock();
}
acknowledgment.acknowledge();
}
@KafkaListener(topics = "TOPIC_UPDATE")
public void update(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws InterruptedException{
String id = record.value();
log.info("接收到update :: {}", id);
Lock lock = weakRefHashLock.lock(id);
lock.lock();
try {
// 测试使用,不做数据库的校验
if (!DATA_MAP.containsKey(id)){
// 未找到对应数据,证明消费顺序异常,将当前数据加入缓存
log.info("消费顺序异常,将update数据 {} 加入缓存", id);
UPDATE_DATA_MAP.put(id, id);
}else {
doUpdate(id);
}
}finally {
lock.unlock();
}
acknowledgment.acknowledge();
}
void doUpdate(String id) throws InterruptedException{
// 模拟 update
log.info("开始处理update::{}", id);
Thread.sleep(1000);
log.info("处理update::{} 结束", id);
}
}
日志(代码中已模拟必现消费顺序异常的场景):
接收到update ::1
消费顺序异常,将update数据 1 加入缓存
接收到insert ::1
开始处理 1 的insert
开始处理update::1
处理update::1 结束
处理 1 的insert 结束
观察日志,此方案可正常处理不同Topic再存在数据关联的消费顺序问题。
猜你喜欢
- 2024-12-11 Kafka知识点总结 一篇读懂 建议收藏
- 2024-12-11 连 Kafka 的稳定性都不懂,也敢说自己会用Kafka
- 2024-12-11 从架构上详解(SLB,Redis,Mysql,Kafka,Clickhouse)热点问题
- 2024-12-11 Kafka最佳实践 - 合理安排kafka的broker、partition、consumer数量
- 2024-12-11 学Kafka,就必须了解的再均衡问题
- 2024-12-11 面试题之kafka:如何提高整体的消费能力
- 2024-12-11 认识kafka消费者(从kafka读取数据)
- 2024-12-11 Kafka快速入门
- 2024-12-11 扫盲Kafka?看这一篇就够了!
- 2024-12-11 kafka启动顺序&命令启动
你 发表评论:
欢迎- 最近发表
-
- 比GoPro 13更强的大疆Action 5 Pro,到底强在哪里?
- 信号和槽(信号和槽的实现原理)
- 在响应式项目中连接设计与开发(请简述实现响应式设计包括哪些技术点)
- 【C#】委托、Action、Func 和 Event 之间的关系
- 如何使用JavaScript实现Prompt弹窗?
- 谷歌Magic Actions功能曝光:AI革新安卓16通知交互
- 基于目标TPS的性能测试,如何通过手动设置场景进行测试?
- IOS基础学习之输出口和动作(io口输入输出实验总结及体会)
- 《Java语言程序设计》期末考试模拟试题——判断题和问答题
- Android学习之Touch事件的处理(android触摸事件实例)
- 标签列表
-
- powershellfor (55)
- messagesource (56)
- aspose.pdf破解版 (56)
- promise.race (63)
- 2019cad序列号和密钥激活码 (62)
- window.performance (66)
- qt删除文件夹 (72)
- mysqlcaching_sha2_password (64)
- ubuntu升级gcc (58)
- nacos启动失败 (64)
- ssh-add (70)
- jwt漏洞 (58)
- macos14下载 (58)
- yarnnode (62)
- abstractqueuedsynchronizer (64)
- source~/.bashrc没有那个文件或目录 (65)
- springboot整合activiti工作流 (70)
- jmeter插件下载 (61)
- 抓包分析 (60)
- idea创建mavenweb项目 (65)
- vue回到顶部 (57)
- qcombobox样式表 (68)
- vue数组concat (56)
- tomcatundertow (58)
- pastemac (61)
本文暂时没有评论,来添加一个吧(●'◡'●)