网站首页 > 博客文章 正文
事务消息是RocketMQ提供的非常重要的一个特性,在4.x版本之后开源,可以利用事务消息轻松地实现分布式事务。本文对RocketMQ的事务消息进行详细介绍,并给出了代码示例。
一. 相关概念
RocketMQ在其消息定义的基础上,对事务消息扩展了两个相关的概念:
- Half(Prepare) Message——半消息(预处理消息):半消息是一种特殊的消息类型,该状态的消息暂时不能被Consumer消费。当一条事务消息被成功投递到Broker上,但是Broker并没有接收到Producer发出的二次确认时,该事务消息就处于"暂时不可被消费"状态,该状态的事务消息被称为半消息。
- Message Status Check——消息状态回查:由于网络抖动、Producer重启等原因,可能导致Producer向Broker发送的二次确认消息没有成功送达。如果Broker检测到某条事务消息长时间处于半消息状态,则会主动向Producer端发起回查操作,查询该事务消息在Producer端的事务状态(Commit 或 Rollback)。可以看出,Message Status Check主要用来解决分布式事务中的超时问题。
二. 执行流程
上面是官网提供的事务消息执行流程图,下面对具体流程进行分析:
- Step1:Producer向Broker端发送Half Message;
- Step2:Broker ACK,Half Message发送成功;
- Step3:Producer执行本地事务;
- Step4:本地事务完毕,根据事务的状态,Producer向Broker发送二次确认消息,确认该Half Message的Commit或者Rollback状态。Broker收到二次确认消息后,对于Commit状态,则直接发送到Consumer端执行消费逻辑,而对于Rollback则直接标记为失败,一段时间后清除,并不会发给Consumer。正常情况下,到此分布式事务已经完成,剩下要处理的就是超时问题,即一段时间后Broker仍没有收到Producer的二次确认消息;
- Step5:针对超时状态,Broker主动向Producer发起消息回查;
- Step6:Producer处理回查消息,返回对应的本地事务的执行结果;
- Step7:Broker针对回查消息的结果,执行Commit或Rollback操作,同Step4。
三. 代码实例
本节通过一个简单的场景模拟RocketMQ的事务消息:存在2个微服务,分别是订单服务和商品服务。订单服务进行下单处理,并发送消息给商品服务,对于下单成功的商品进行减库存。
首先是订单服务:
/**
* @Auther: ZhangShenao
* @Date: 2019/3/27 16:44
* @Description:使用RocketMQ事务消息——订单服务发送事务消息,然后进行本地下单,并通知商品服务减库存
*/
public class OrderService {
public static void main(String[] args) throws Exception {
TransactionMQProducer producer = new TransactionMQProducer();
producer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR);
producer.setProducerGroup(RocketMQConstants.TRANSACTION_PRODUCER_GROUP);
//自定义线程池,执行事务操作
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 50, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(20), (Runnable r) -> new Thread("Order Transaction Massage Thread"));
producer.setExecutorService(executor);
//设置事务消息监听器
producer.setTransactionListener(new OrderTransactionListener());
producer.start();
System.err.println("OrderService Start");
for (int i = 0;i < 10;i++){
String orderId = UUID.randomUUID().toString();
String payload = "下单,orderId: " + orderId;
String tags = "Tag";
Message message = new Message(RocketMQConstants.TRANSACTION_TOPIC_NAME, tags, orderId, payload.getBytes(RemotingHelper.DEFAULT_CHARSET));
//发送事务消息
TransactionSendResult result = producer.sendMessageInTransaction(message, orderId);
System.err.println("发送事务消息,发送结果: " + result);
}
}
}
事务消息需要一个TransactionListener,主要进行本地事务的执行和事务回查,代码如下:
/**
* @Auther: ZhangShenao
* @Date: 2019/3/27 16:50
* @Description:订单事务消息监听器
*/
public class OrderTransactionListener implements TransactionListener {
private static final Map<String, Boolean> results = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String orderId = (String) arg;
//记录本地事务执行结果
boolean success = persistTransactionResult(orderId);
System.err.println("订单服务执行本地事务下单,orderId: " + orderId + ", result: " + success);
return success ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String orderId = msg.getKeys();
System.err.println("执行事务消息回查,orderId: " + orderId);
return Boolean.TRUE.equals(results.get(orderId)) ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
}
private boolean persistTransactionResult(String orderId) {
boolean success = Math.abs(Objects.hash(orderId)) % 2 == 0;
results.put(orderId, success);
return success;
}
}
下面是商品服务及监听器:
/**
* @Auther: ZhangShenao
* @Date: 2019/3/27 17:09
* @Description:使用RocketMQ事务消息——商品服务接收下单的事务消息,如果消息成功commit则本地减库存
*/
public class ProductService {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
consumer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR);
consumer.setConsumerGroup(RocketMQConstants.TRANSACTION_CONSUMER_GROUP);
consumer.subscribe(RocketMQConstants.TRANSACTION_TOPIC_NAME, "*");
consumer.registerMessageListener(new ProductListener());
consumer.start();
System.err.println("ProductService Start");
}
}
/**
* @Auther: ZhangShenao
* @Date: 2019/3/27 17:14
* @Description:
*/
public class ProductListener implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
Optional.ofNullable(msgs).orElse(Collections.emptyList()).forEach(m -> {
String orderId = m.getKeys();
System.err.println("监听到下单消息,orderId: " + orderId + ", 商品服务减库存");
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
分别运行OrderService和ProductService,可以看出只有事务执行成功的订单才会通知商品服务进行减库存。
监听到下单消息,orderId: f25a7127-307e-45ce-8f83-6e0a922ebb94, 商品服务减库存
监听到下单消息,orderId: d960171d-97c0-4e13-aa4a-c2b96102de4b, 商品服务减库存
监听到下单消息,orderId: 63aedaa2-ce74-4cb7-bf58-fb6a73082a73, 商品服务减库存
监听到下单消息,orderId: 25764461-70b2-44db-8296-960211179e6e, 商品服务减库存
监听到下单消息,orderId: fb319fe7-c8be-4edf-ae4e-6108898068ca, 商品服务减库存
监听到下单消息,orderId: 4f61a61a-7254-458a-bc10-9d4006a9f581, 商品服务减库存
Java程序员福利:金三银四,我把最近一年经历过的Java岗位面试,和一些刷过的面试题都做成了PDF,PDF都是可以免费分享给大家的,关注私信我:【101】,免费领取!
猜你喜欢
- 2024-12-12 RocketMQ同一个消费者唯一Topic多个tag踩坑经历
- 2024-12-12 RocketMQ——RocketMQ搭建及问题解决
- 2024-12-12 腾讯云微服务正式发布RocketMQ Serverless版本
- 2024-12-12 3分钟白话RocketMQ系列—— 核心概念
- 2024-12-12 RocketMQ如何避免未来再次发生积压
- 2024-12-12 rocketmq延迟消息实现原理(上)
- 2024-12-12 RocketMQ跨队列的顺序消费
- 2024-12-12 Kafka、RabbitMQ、RocketMQ、ActiveMQ 等多个分布式消息队列比较
- 2024-12-12 应如何在 Spring Boot 中使用 RocketMQ 实现批量消息消费?
- 2024-12-12 RocketMQ 5.0 多语言客户端的设计与实现
你 发表评论:
欢迎- 最近发表
-
- 比GoPro 13更强的大疆Action 5 Pro,到底强在哪里?
- 信号和槽(信号和槽的实现原理)
- 在响应式项目中连接设计与开发(请简述实现响应式设计包括哪些技术点)
- 【C#】委托、Action、Func 和 Event 之间的关系
- 如何使用JavaScript实现Prompt弹窗?
- 谷歌Magic Actions功能曝光:AI革新安卓16通知交互
- 基于目标TPS的性能测试,如何通过手动设置场景进行测试?
- IOS基础学习之输出口和动作(io口输入输出实验总结及体会)
- 《Java语言程序设计》期末考试模拟试题——判断题和问答题
- Android学习之Touch事件的处理(android触摸事件实例)
- 标签列表
-
- powershellfor (55)
- messagesource (56)
- aspose.pdf破解版 (56)
- promise.race (63)
- 2019cad序列号和密钥激活码 (62)
- window.performance (66)
- qt删除文件夹 (72)
- mysqlcaching_sha2_password (64)
- ubuntu升级gcc (58)
- nacos启动失败 (64)
- ssh-add (70)
- jwt漏洞 (58)
- macos14下载 (58)
- yarnnode (62)
- abstractqueuedsynchronizer (64)
- source~/.bashrc没有那个文件或目录 (65)
- springboot整合activiti工作流 (70)
- jmeter插件下载 (61)
- 抓包分析 (60)
- idea创建mavenweb项目 (65)
- vue回到顶部 (57)
- qcombobox样式表 (68)
- vue数组concat (56)
- tomcatundertow (58)
- pastemac (61)
本文暂时没有评论,来添加一个吧(●'◡'●)