专业的编程技术博客社区

网站首页 > 博客文章 正文

RocketMQ 中MessageExt 详解和开发实战

baijin 2024-08-29 12:11:51 博客文章 3 ℃ 0 评论

消费消息时,除了获取消息Payload外,还想获取RocketMQ消息的其它系统属性,需要怎么做?

欢迎关注《Apache RocketMQ 深入浅出》系列文章,架构师将循序渐进地讲解Apache RocketMQ的开发实践。

1. Apache RocketMQ 入门介绍和整体架构图

2. 介绍新版RocketMQ v4.9.3 下载、安装、配置的完成过程

3. 启动和停止RocketMQ服务进程、测试消息的发送和消费

4. Spring Boot集成RocketMQ :生产者和消费者开发入门实践

5. RocketMQ 可视化管理界面Dashboard的搭建和使用

6. 了解Apache RocketMQ 中的消息类型和消费模式

7. Spring Boot 集成RocketMQ:使用rocketmq-spring-boot-starter 生产和消费消息

8. Spring Boot 集成RocketMQ:使用RocketMQTemplate实现发送同步、异步和单向消息

9. Spring Boot 集成RocketMQ:使用RocketMQTemplate实现发送顺序消息

10. Spring Boot 集成RocketMQ:使用RocketMQTemplate实现发送延时消息

11. Spring Boot 集成RocketMQ:使用RocketMQTemplate实现发送事务消息(1)

12. Spring Boot 集成RocketMQ:使用RocketMQTemplate实现发送事务消息(2)

13. RocketMQ 消费模式:集群消费模式和广播消费模式

14. RocketMQ消费端Push和Pull两种消费方式:拉模式开发示例

15. RocketMQ消费端Push和Pull两种消费方式:推模式开发示例

16. RocketMQ中高级特性-消息过滤和标签Tag开发实战(1)

17. RocketMQ中高级特性-消息过滤和标签Tag开发实战(2)

18. RocketMQ 中MessageExt 详解和开发实战


一、MessageExt 开发实战

消费者在实现RocketMQListener接口时,只需要使用泛型为MessageExt即可,这样在onMessage方法将接收到RocketMQ原生的MessageExt消息。


@Slf4j
@Component
@RocketMQMessageListener(
 consumerGroup = AppConstant.MSG_EXT_CG,
 topic = AppConstant.TAG_TOPIC,
 enableMsgTrace = true,
 selectorExpression = "*"
)
public class MessageExtConsumer implements RocketMQListener<MessageExt> {
 @SneakyThrows
 @Override
 public void onMessage(MessageExt message) {
 // 获取Message Body,转换为字符串
 String msgBody = new String(message.getBody(),"utf-8");
 // JSON 转换成POJO
 Book book = JSONObject.parseObject(msgBody, Book.class);
 log.info("Tag: {}, Book: {}", message.getTags(), book);
 }
}


body:消息的内容,这是一个字节数组。


发送的消息内容体是如何被序列化与反序列化的?

RocketMQ的消息体都是以byte[]方式存储。当业务系统的消息内容体如果是java.lang.String类型时,统一按照utf-8编码转成byte[];如果业务系统的消息内容为非java.lang.String类型,则采用jackson-databind序列化成JSON格式的字符串之后,再统一按照utf-8编码转成byte[]。


二、MessageExt 源码分析

MessageExt 继承Message类,如图所示:


MessageExt 代码如下所示,省略部分get/set代码:

public class MessageExt extends Message {
 private static final long serialVersionUID = 5720810158625748049L;
 //记录MessageQueue编号,消息在Topic下对应的MessageQueue中被拉取
 private int queueId;
 //记录消息在Broker存盘大小
 private int storeSize;
 //记录在ConsumeQueue中的偏移
 private long queueOffset;
 //记录一些系统标志的开关状态,MessageSysFlag中定义了系统标识
 private int sysFlag;
 //消息创建时间,在Producer发送消息时设置
 private long bornTimestamp;
 //记录发送该消息的producer地址
 private SocketAddress bornHost;
 //消息存储时间
 private long storeTimestamp;
 //记录存储该消息的Broker地址
 private SocketAddress storeHost;
 //消息Id
 private String msgId;
 //记录消息在Broker中存储偏移
 private long commitLogOffset;
 //消息内容CRC校验值
 private int bodyCRC;
 //消息重试消费次数
 private int reconsumeTimes;
 //记录消息在半事务消息中存储偏移
 private long preparedTransactionOffset;
 @Override
 public String toString() {
 return "MessageExt [brokerName=" + brokerName + ", queueId=" + queueId + ", storeSize=" + storeSize + ", queueOffset=" + queueOffset
 + ", sysFlag=" + sysFlag + ", bornTimestamp=" + bornTimestamp + ", bornHost=" + bornHost
 + ", storeTimestamp=" + storeTimestamp + ", storeHost=" + storeHost + ", msgId=" + msgId
 + ", commitLogOffset=" + commitLogOffset + ", bodyCRC=" + bodyCRC + ", reconsumeTimes="
 + reconsumeTimes + ", preparedTransactionOffset=" + preparedTransactionOffset
 + ", toString()=" + super.toString() + "]";
 }
}


Message 类代码如下所示,省略部分get/set 代码:

public class Message implements Serializable {
 private static final long serialVersionUID = 8445773977080406428L;
 //主题
 private String topic;
 //网络通信层标记
 private int flag;
 /**
 * MIN_OFFSET:最小偏移
 * MAX_OFFSET:最大偏移
 * CONSUME_START_TIME:消费拉取时间
 * UNIQ_KEY:
 * CLUSTER:集群
 * WAIT:
 * TAGS:消息标签
 * DELAY:延时级别
 **/
 private Map<String, String> properties;
 //Producer发送的实际消息内容,以字节数组(ASCII码)形式进行存储。Message消息有一定大小限制。
 private byte[] body;
 //事务消息相关的事务编号
 private String transactionId;
 //得到延时级别
 public int getDelayTimeLevel() {
 String t = this.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
 if (t != null) {
 return Integer.parseInt(t);
 }
 return 0;
 }
 //设置延时级别
 public void setDelayTimeLevel(int level) {
 this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
 }
 @Override
 public String toString() {
 return "Message{" +
 "topic='" + topic + '\'' +
 ", flag=" + flag +
 ", properties=" + properties +
 ", body=" + Arrays.toString(body) +
 ", transactionId='" + transactionId + '\'' +
 '}';
 }
}

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表