消费消息时,除了获取消息Payload外,还想获取RocketMQ消息的其它系统属性,需要怎么做?
欢迎关注《Apache RocketMQ 深入浅出》系列文章,架构师将循序渐进地讲解Apache RocketMQ的开发实践。
1. Apache RocketMQ 入门介绍和整体架构图
2. 介绍新版RocketMQ v4.9.3 下载、安装、配置的完成过程
3. 启动和停止RocketMQ服务进程、测试消息的发送和消费
4. Spring Boot集成RocketMQ :生产者和消费者开发入门实践
5. RocketMQ 可视化管理界面Dashboard的搭建和使用
6. 了解Apache RocketMQ 中的消息类型和消费模式
7. Spring Boot 集成RocketMQ:使用rocketmq-spring-boot-starter 生产和消费消息
8. Spring Boot 集成RocketMQ:使用RocketMQTemplate实现发送同步、异步和单向消息
9. Spring Boot 集成RocketMQ:使用RocketMQTemplate实现发送顺序消息
10. Spring Boot 集成RocketMQ:使用RocketMQTemplate实现发送延时消息
11. Spring Boot 集成RocketMQ:使用RocketMQTemplate实现发送事务消息(1)
12. Spring Boot 集成RocketMQ:使用RocketMQTemplate实现发送事务消息(2)
13. RocketMQ 消费模式:集群消费模式和广播消费模式
14. RocketMQ消费端Push和Pull两种消费方式:拉模式开发示例
15. RocketMQ消费端Push和Pull两种消费方式:推模式开发示例
16. RocketMQ中高级特性-消息过滤和标签Tag开发实战(1)
17. RocketMQ中高级特性-消息过滤和标签Tag开发实战(2)
18. RocketMQ 中MessageExt 详解和开发实战
一、MessageExt 开发实战
消费者在实现RocketMQListener接口时,只需要使用泛型为MessageExt即可,这样在onMessage方法将接收到RocketMQ原生的MessageExt消息。
@Slf4j
@Component
@RocketMQMessageListener(
consumerGroup = AppConstant.MSG_EXT_CG,
topic = AppConstant.TAG_TOPIC,
enableMsgTrace = true,
selectorExpression = "*"
)
public class MessageExtConsumer implements RocketMQListener<MessageExt> {
@SneakyThrows
@Override
public void onMessage(MessageExt message) {
// 获取Message Body,转换为字符串
String msgBody = new String(message.getBody(),"utf-8");
// JSON 转换成POJO
Book book = JSONObject.parseObject(msgBody, Book.class);
log.info("Tag: {}, Book: {}", message.getTags(), book);
}
}
body:消息的内容,这是一个字节数组。
发送的消息内容体是如何被序列化与反序列化的?
RocketMQ的消息体都是以byte[]方式存储。当业务系统的消息内容体如果是java.lang.String类型时,统一按照utf-8编码转成byte[];如果业务系统的消息内容为非java.lang.String类型,则采用jackson-databind序列化成JSON格式的字符串之后,再统一按照utf-8编码转成byte[]。
二、MessageExt 源码分析
MessageExt 继承Message类,如图所示:
MessageExt 代码如下所示,省略部分get/set代码:
public class MessageExt extends Message {
private static final long serialVersionUID = 5720810158625748049L;
//记录MessageQueue编号,消息在Topic下对应的MessageQueue中被拉取
private int queueId;
//记录消息在Broker存盘大小
private int storeSize;
//记录在ConsumeQueue中的偏移
private long queueOffset;
//记录一些系统标志的开关状态,MessageSysFlag中定义了系统标识
private int sysFlag;
//消息创建时间,在Producer发送消息时设置
private long bornTimestamp;
//记录发送该消息的producer地址
private SocketAddress bornHost;
//消息存储时间
private long storeTimestamp;
//记录存储该消息的Broker地址
private SocketAddress storeHost;
//消息Id
private String msgId;
//记录消息在Broker中存储偏移
private long commitLogOffset;
//消息内容CRC校验值
private int bodyCRC;
//消息重试消费次数
private int reconsumeTimes;
//记录消息在半事务消息中存储偏移
private long preparedTransactionOffset;
@Override
public String toString() {
return "MessageExt [brokerName=" + brokerName + ", queueId=" + queueId + ", storeSize=" + storeSize + ", queueOffset=" + queueOffset
+ ", sysFlag=" + sysFlag + ", bornTimestamp=" + bornTimestamp + ", bornHost=" + bornHost
+ ", storeTimestamp=" + storeTimestamp + ", storeHost=" + storeHost + ", msgId=" + msgId
+ ", commitLogOffset=" + commitLogOffset + ", bodyCRC=" + bodyCRC + ", reconsumeTimes="
+ reconsumeTimes + ", preparedTransactionOffset=" + preparedTransactionOffset
+ ", toString()=" + super.toString() + "]";
}
}
Message 类代码如下所示,省略部分get/set 代码:
public class Message implements Serializable {
private static final long serialVersionUID = 8445773977080406428L;
//主题
private String topic;
//网络通信层标记
private int flag;
/**
* MIN_OFFSET:最小偏移
* MAX_OFFSET:最大偏移
* CONSUME_START_TIME:消费拉取时间
* UNIQ_KEY:
* CLUSTER:集群
* WAIT:
* TAGS:消息标签
* DELAY:延时级别
**/
private Map<String, String> properties;
//Producer发送的实际消息内容,以字节数组(ASCII码)形式进行存储。Message消息有一定大小限制。
private byte[] body;
//事务消息相关的事务编号
private String transactionId;
//得到延时级别
public int getDelayTimeLevel() {
String t = this.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
if (t != null) {
return Integer.parseInt(t);
}
return 0;
}
//设置延时级别
public void setDelayTimeLevel(int level) {
this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
}
@Override
public String toString() {
return "Message{" +
"topic='" + topic + '\'' +
", flag=" + flag +
", properties=" + properties +
", body=" + Arrays.toString(body) +
", transactionId='" + transactionId + '\'' +
'}';
}
}
本文暂时没有评论,来添加一个吧(●'◡'●)