网站首页 > 博客文章 正文
在分布式消息队列中,消息重复消费是一个常见问题(例如网络重试、消费者重启等场景)。RocketMQ 通过多种机制来尽量减少重复消费,但无法完全避免(需业务方配合实现幂等性)。以下是 RocketMQ 的解决方案和最佳实践:
1. RocketMQ 自身的防重复机制
(1) 消息重试机制(Consumer端)
- 默认重试策略:若消费者消费失败(返回 RECONSUME_LATER),消息会被重新投递(最多 16 次)。
- 风险点:重试可能导致同一条消息被多次消费。
(2) 消息幂等性标识(Message ID + Offset)
- 每条消息有唯一的 Message ID 和 Broker Offset,消费者可通过记录已处理的 ID/Offset 去重。
- 局限性:极端情况下(如 Broker 主从切换),Message ID 可能重复(需结合业务键去重)。
2. 业务层必须实现的幂等性方案
RocketMQ 不保证绝对不重复,需业务代码自行处理幂等。常见方案:
(1) 唯一业务键 + 数据库去重
- 适用场景:订单ID、支付流水号等有唯一键的业务。
- 实现方式:
- java
// 伪代码:消费前检查唯一键是否已处理
String orderId = message.getUserProperty("order_id");
if (orderId != null && !isOrderProcessed(orderId)) {
processOrder(orderId);
markOrderAsProcessed(orderId); // 写入DB或缓存
}
- 存储选择:
- 数据库唯一索引:插入前检查 order_id 是否已存在。
- Redis SETNX:利用原子操作记录已处理的消息键。
(2) 乐观锁(适用于更新操作)
- 示例(库存扣减):
- sql
UPDATE inventory SET stock = stock - 1
WHERE product_id = #{productId} AND stock >= 1;
- 即使消息重复,数据库乐观锁会保证只扣减一次。
(3) 分布式锁(强一致性场景)
- 消费前先获取锁(如 Redis 的 SET key value NX EX):
- java
String lockKey = "msg_lock:" + message.getMsgId();
try {
if (redisClient.setIfAbsent(lockKey, "1", 30, TimeUnit.SECONDS)) {
processMessage(message);
}
} finally {
redisClient.delete(lockKey);
}
3. RocketMQ 4.x 后的增强特性
(1) 事务消息
- 二阶段提交机制可减少因生产者重试导致的消息重复(但消费者仍需幂等)。
- 适用场景:跨系统事务(如支付后发消息)。
(2) 消息轨迹(Message Trace)
- 记录消息生命周期(发送、存储、消费),便于排查重复消费问题。
4. 最佳实践总结
环节 | 措施 |
生产者 | 避免重复发送(如网络超时后重试前先检查状态)。 |
Broker | 依赖 Message ID 和 Offset,但不可完全信任。 |
消费者 | 必须实现幂等逻辑 (唯一键、乐观锁、分布式锁等)。 |
监控 | 通过 RocketMQ 控制台或日志监控重复消费告警。 |
代码示例(消费者幂等)
java
// 基于 Redis 的幂等消费示例
public class MyConsumer implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
String msgId = message.getMsgId();
String orderId = message.getUserProperty("order_id");
// 1. 检查 Redis 是否已处理
if (redisTemplate.opsForValue().setIfAbsent("consumed:" + orderId, "1", 24, TimeUnit.HOURS)) {
try {
// 2. 业务处理
processOrder(orderId);
} catch (Exception e) {
redisTemplate.delete("consumed:" + orderId); // 失败时清除标记
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
} else {
System.out.println("消息已处理,跳过: " + orderId);
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
关键结论
- RocketMQ 不保证 100% 不重复:需业务方结合消息唯一标识(如订单ID)实现幂等。
- 轻量级方案:优先用 Redis/Database 去重;高并发场景可用乐观锁或分布式锁。
- 监控:通过日志或 RocketMQ Admin Tool 定期检查重复消息。
猜你喜欢
- 2025-07-21 开源|一款类excel报表设计系统,支持拖拽式和word模板设计
- 2025-07-21 SpringBoot利用ThreadPoolTaskExecutor批量插入百万级数据实测!
- 2025-07-21 云端藏经阁:一款开源、精美、可独立部署的知识管理神器
- 2025-07-21 电商秒杀/库存扣减:基于JUC的并发控制实战案例
- 2025-07-21 简单易用的.NET免费开源RabbitMQ操作组件EasyNetQ
- 2025-07-21 亿级分库分表,如何丝滑扩容、如何双写灰度
- 2025-07-21 使用mq实现分布式事务-补偿事务一致性
- 2025-07-21 【RocketMQ】消息的拉取(rocketmq消息大小)
- 2025-07-21 RocketMQ消息消费-客户端拉取消息前的准备工作
- 2025-07-21 RocketMQ消费限流的几种方式(rocketmq并发消费与顺序消费)
你 发表评论:
欢迎- 最近发表
-
- 别再用雪花算法生成ID了!试试这个吧
- Cacti监控服务器配置教程(基于CentOS+Nginx+MySQL+PHP环境搭建)
- 业务系统性能问题诊断和优化分析(业务系统性能问题诊断和优化分析报告)
- 数据库中如何批量添加指定数据(数据库批量新增数据)
- Instagram架构的分片和ID的设计(ins的分类)
- VBA数据库解决方案第十四讲:如何在数据库中动态删除和建立数据表
- MySQL数据库安装教程(mysql数据库安装方法)
- SOLIDWORKS Electrical卸载与升级安装操作步骤
- 数据库分库分表解决方案汇总(数据库分库分表思路)
- 根据工作表数据生成数据库(根据excel生成数据库表结构)
- 标签列表
-
- ifneq (61)
- 字符串长度在线 (61)
- googlecloud (64)
- flutterrun (59)
- 系统设计图 (58)
- powershellfor (73)
- messagesource (71)
- plsql64位 (73)
- promise.race (63)
- 2019cad序列号和密钥激活码 (62)
- window.performance (66)
- qt删除文件夹 (72)
- mysqlcaching_sha2_password (64)
- ubuntu升级gcc (58)
- nacos启动失败 (64)
- ssh-add (70)
- yarnnode (62)
- abstractqueuedsynchronizer (64)
- source~/.bashrc没有那个文件或目录 (65)
- springboot整合activiti工作流 (70)
- jmeter插件下载 (61)
- 抓包分析 (60)
- idea创建mavenweb项目 (65)
- qcombobox样式表 (68)
- pastemac (61)
本文暂时没有评论,来添加一个吧(●'◡'●)