网站首页 > 博客文章 正文
故事起因
最近,发现我们的项目中,越来越多需要异步操作的场景,之前都是一直开线程去异步执行的,最近发现会影响整个线上服务器的性能,造成服务器宕机假死[流泪]!所以和我们 Leader 沟通了下,准备把这些异步操作的场景,全部用 MQ 剥离出去。用单独一台服务器去运行这些业务!
接下来就介绍如何在 Spring Boot 中利用 RocketMQ 实现批量消息消费处理,适合那些想提高消息处理效率,并且不会压块服务性能的的小伙伴们。
1. 添加依赖[玫瑰]
先来搞定依赖,打开你的 pom.xml 文件,加入以下内容:
<!-- RocketMQ Spring Boot依赖,用于Spring Boot 3 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 兼容 RocketMQ 集群版本 5.3.0 的依赖 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.3.0</version>
</dependency>
2. 配置文件 bootstrap.yaml [玫瑰]
在 bootstrap.yaml 文件中加入 RocketMQ 的配置:
rocketmq:
name-server: 192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876 # NameServer地址
consumer:
group: consume-group-test
access-key: access # 使用ACL时配置
secret-key: secret
consume-message-batch-max-size: 50 # 每批次消费的最大消息数
pull-batch-size: 100 # 每次从Broker拉取的最大消息数
topics:
project: "group-topic-1"
groups:
project: "consume-group-1" # 不同业务可以使用不同的组
3. 配置类 MqConfigProperties [玫瑰]
接下来,我们新建一个配置类 MqConfigProperties,用来管理 RocketMQ 的配置信息。
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import lombok.Data;
import java.io.Serializable;
@Data
@Component
@ConfigurationProperties(prefix = "rocketmq")
public class MqConfigProperties implements Serializable {
private static final long serialVersionUID = 1L;
@Autowired
private RocketMQProperties rocketMQProperties;
private TopicProperties topics;
private GroupProperties groups;
@Data
public static class TopicProperties implements Serializable {
private static final long serialVersionUID = 1L;
private String project;
}
@Data
public static class GroupProperties implements Serializable {
private static final long serialVersionUID = 1L;
private String project;
}
}
4. 实现消费者类 [玫瑰]
然后我们来写消费者类 UserConsumer,用于批量接收并处理消息。
import com.alibaba.fastjson2.JSONObject;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.context.ApplicationContext;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Resource;
import java.util.List;
@Component
@Slf4j
public class UserConsumer implements SmartLifecycle {
@Resource
private MqConfigProperties mqConfigProperties;
@Resource
private ApplicationContext applicationContext;
private volatile boolean running;
private DefaultMQPushConsumer consumer;
@Override
public void start() {
if (isRunning()) {
throw new IllegalStateException("消费者已经在运行");
}
initConsumer();
setRunning(true);
log.info("UserConsumer 启动成功!");
}
@Override
public void stop() {
if (isRunning() && consumer != null) {
consumer.shutdown();
setRunning(false);
log.info("UserConsumer 停止运行。");
}
}
@Override
public boolean isRunning() {
return running;
}
private void setRunning(boolean running) {
this.running = running;
}
private void initConsumer() {
String topic = mqConfigProperties.getTopics().getProject();
String group = mqConfigProperties.getGroups().getProject();
String nameServer = mqConfigProperties.getRocketMQProperties().getNameServer();
String accessKey = mqConfigProperties.getRocketMQProperties().getConsumer().getAccessKey();
String secretKey = mqConfigProperties.getRocketMQProperties().getConsumer().getSecretKey();
RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(), accessKey, secretKey);
consumer = rpcHook != null
? new DefaultMQPushConsumer(group, rpcHook, new AllocateMessageQueueAveragely())
: new DefaultMQPushConsumer(group);
consumer.setNamesrvAddr(nameServer);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setConsumeMessageBatchMaxSize(100); // 设置每次消费的最大消息数
consumer.subscribe(topic, "*");
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
log.info("收到 {} 条消息", msgs.size());
for (MessageExt message : msgs) {
String body = new String(message.getBody());
log.info("处理消息: {}", body);
User user = JSONObject.parseObject(body, User.class);
processUser(user);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
log.info("UserConsumer 已初始化,监听主题 [{}],消费组 [{}]。", topic, group);
}
private void processUser(User user) {
log.info("处理用户 ID: {}", user.getId());
// 在这里处理与用户相关的业务逻辑
}
}
5. 生产者示例代码 [玫瑰]
要批量发送消息,我们还需要创建一个 UserProducer 类来批量生成消息。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.List;
public class UserProducer {
private DefaultMQProducer producer;
public void sendBatchMessages(List<User> users, String topic) {
List<Message> messages = new ArrayList<>();
for (User user : users) {
messages.add(new Message(topic, JSONObject.toJSONString(user).getBytes()));
}
try {
producer.send(messages);
} catch (Exception e) {
log.error("发送批量消息时出错", e);
}
}
}
6. 优化建议 [玫瑰]
- 性能优化:可以调整消费者线程池大小,默认是 consumeThreadMin=20 和 consumeThreadMax=20。在高并发场景下,增加线程池大小能显著提升性能。
- 错误处理:避免无限重试。使用 RECONSUME_LATER 时要谨慎,以免导致死循环。
- 租户隔离:不同的业务模块最好用不同的消费组,确保数据不会误用。
以上就是如何在 SpringBoot 中用 RocketMQ 实现批量消息消费的基本步骤,小伙们可以跟着一步一步实操起来呀!
猜你喜欢
- 2024-12-12 RocketMQ同一个消费者唯一Topic多个tag踩坑经历
- 2024-12-12 RocketMQ——RocketMQ搭建及问题解决
- 2024-12-12 腾讯云微服务正式发布RocketMQ Serverless版本
- 2024-12-12 3分钟白话RocketMQ系列—— 核心概念
- 2024-12-12 RocketMQ如何避免未来再次发生积压
- 2024-12-12 rocketmq延迟消息实现原理(上)
- 2024-12-12 RocketMQ跨队列的顺序消费
- 2024-12-12 Kafka、RabbitMQ、RocketMQ、ActiveMQ 等多个分布式消息队列比较
- 2024-12-12 RocketMQ 5.0 多语言客户端的设计与实现
- 2024-12-12 从基础到进阶,一文详解RocketMQ事务消息,看完不会跪键盘
你 发表评论:
欢迎- 最近发表
-
- 比GoPro 13更强的大疆Action 5 Pro,到底强在哪里?
- 信号和槽(信号和槽的实现原理)
- 在响应式项目中连接设计与开发(请简述实现响应式设计包括哪些技术点)
- 【C#】委托、Action、Func 和 Event 之间的关系
- 如何使用JavaScript实现Prompt弹窗?
- 谷歌Magic Actions功能曝光:AI革新安卓16通知交互
- 基于目标TPS的性能测试,如何通过手动设置场景进行测试?
- IOS基础学习之输出口和动作(io口输入输出实验总结及体会)
- 《Java语言程序设计》期末考试模拟试题——判断题和问答题
- Android学习之Touch事件的处理(android触摸事件实例)
- 标签列表
-
- powershellfor (55)
- messagesource (56)
- aspose.pdf破解版 (56)
- promise.race (63)
- 2019cad序列号和密钥激活码 (62)
- window.performance (66)
- qt删除文件夹 (72)
- mysqlcaching_sha2_password (64)
- ubuntu升级gcc (58)
- nacos启动失败 (64)
- ssh-add (70)
- jwt漏洞 (58)
- macos14下载 (58)
- yarnnode (62)
- abstractqueuedsynchronizer (64)
- source~/.bashrc没有那个文件或目录 (65)
- springboot整合activiti工作流 (70)
- jmeter插件下载 (61)
- 抓包分析 (60)
- idea创建mavenweb项目 (65)
- vue回到顶部 (57)
- qcombobox样式表 (68)
- vue数组concat (56)
- tomcatundertow (58)
- pastemac (61)
本文暂时没有评论,来添加一个吧(●'◡'●)