专业的编程技术博客社区

网站首页 > 博客文章 正文

应如何在 Spring Boot 中使用 RocketMQ 实现批量消息消费?

baijin 2024-12-12 11:01:05 博客文章 9 ℃ 0 评论

故事起因

最近,发现我们的项目中,越来越多需要异步操作的场景,之前都是一直开线程去异步执行的,最近发现会影响整个线上服务器的性能,造成服务器宕机假死[流泪]!所以和我们 Leader 沟通了下,准备把这些异步操作的场景,全部用 MQ 剥离出去。用单独一台服务器去运行这些业务!

接下来就介绍如何在 Spring Boot 中利用 RocketMQ 实现批量消息消费处理,适合那些想提高消息处理效率,并且不会压块服务性能的的小伙伴们。

1. 添加依赖[玫瑰]

先来搞定依赖,打开你的 pom.xml 文件,加入以下内容:

<!-- RocketMQ Spring Boot依赖,用于Spring Boot 3 -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.3.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<!-- 兼容 RocketMQ 集群版本 5.3.0 的依赖 -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>5.3.0</version>
</dependency>

2. 配置文件 bootstrap.yaml [玫瑰]

bootstrap.yaml 文件中加入 RocketMQ 的配置:

rocketmq:
  name-server: 192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876 # NameServer地址
  consumer:
    group: consume-group-test
    access-key: access # 使用ACL时配置
    secret-key: secret
    consume-message-batch-max-size: 50  # 每批次消费的最大消息数
    pull-batch-size: 100  # 每次从Broker拉取的最大消息数
  topics:
    project: "group-topic-1"
  groups:
    project: "consume-group-1"  # 不同业务可以使用不同的组

3. 配置类 MqConfigProperties [玫瑰]

接下来,我们新建一个配置类 MqConfigProperties,用来管理 RocketMQ 的配置信息。

import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import lombok.Data;

import java.io.Serializable;

@Data
@Component
@ConfigurationProperties(prefix = "rocketmq")
public class MqConfigProperties implements Serializable {

    private static final long serialVersionUID = 1L;

    @Autowired
    private RocketMQProperties rocketMQProperties;

    private TopicProperties topics;
    private GroupProperties groups;

    @Data
    public static class TopicProperties implements Serializable {
        private static final long serialVersionUID = 1L;
        private String project;
    }

    @Data
    public static class GroupProperties implements Serializable {
        private static final long serialVersionUID = 1L;
        private String project;
    }
}

4. 实现消费者类 [玫瑰]

然后我们来写消费者类 UserConsumer,用于批量接收并处理消息。

import com.alibaba.fastjson2.JSONObject;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.context.ApplicationContext;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Resource;
import java.util.List;

@Component
@Slf4j
public class UserConsumer implements SmartLifecycle {

    @Resource
    private MqConfigProperties mqConfigProperties;

    @Resource
    private ApplicationContext applicationContext;

    private volatile boolean running;
    private DefaultMQPushConsumer consumer;

    @Override
    public void start() {
        if (isRunning()) {
            throw new IllegalStateException("消费者已经在运行");
        }
        initConsumer();
        setRunning(true);
        log.info("UserConsumer 启动成功!");
    }

    @Override
    public void stop() {
        if (isRunning() && consumer != null) {
            consumer.shutdown();
            setRunning(false);
            log.info("UserConsumer 停止运行。");
        }
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    private void setRunning(boolean running) {
        this.running = running;
    }

    private void initConsumer() {
        String topic = mqConfigProperties.getTopics().getProject();
        String group = mqConfigProperties.getGroups().getProject();
        String nameServer = mqConfigProperties.getRocketMQProperties().getNameServer();
        String accessKey = mqConfigProperties.getRocketMQProperties().getConsumer().getAccessKey();
        String secretKey = mqConfigProperties.getRocketMQProperties().getConsumer().getSecretKey();

        RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(), accessKey, secretKey);
        consumer = rpcHook != null
                ? new DefaultMQPushConsumer(group, rpcHook, new AllocateMessageQueueAveragely())
                : new DefaultMQPushConsumer(group);

        consumer.setNamesrvAddr(nameServer);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setConsumeMessageBatchMaxSize(100);  // 设置每次消费的最大消息数
        consumer.subscribe(topic, "*");
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                log.info("收到 {} 条消息", msgs.size());
                for (MessageExt message : msgs) {
                    String body = new String(message.getBody());
                    log.info("处理消息: {}", body);
                    User user = JSONObject.parseObject(body, User.class);
                    processUser(user);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        log.info("UserConsumer 已初始化,监听主题 [{}],消费组 [{}]。", topic, group);
    }

    private void processUser(User user) {
        log.info("处理用户 ID: {}", user.getId());
        // 在这里处理与用户相关的业务逻辑
    }
}

5. 生产者示例代码 [玫瑰]

要批量发送消息,我们还需要创建一个 UserProducer 类来批量生成消息。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.List;

public class UserProducer {

    private DefaultMQProducer producer;

    public void sendBatchMessages(List<User> users, String topic) {
        List<Message> messages = new ArrayList<>();
        for (User user : users) {
            messages.add(new Message(topic, JSONObject.toJSONString(user).getBytes()));
        }
        try {
            producer.send(messages);
        } catch (Exception e) {
            log.error("发送批量消息时出错", e);
        }
    }
}

6. 优化建议 [玫瑰]

  • 性能优化:可以调整消费者线程池大小,默认是 consumeThreadMin=20consumeThreadMax=20。在高并发场景下,增加线程池大小能显著提升性能。
  • 错误处理:避免无限重试。使用 RECONSUME_LATER 时要谨慎,以免导致死循环。
  • 租户隔离:不同的业务模块最好用不同的消费组,确保数据不会误用。

以上就是如何在 SpringBoot 中用 RocketMQ 实现批量消息消费的基本步骤,小伙们可以跟着一步一步实操起来呀!

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表