文章目录
- 环境准备:RabbitAdmin:SpringAMQP 声明:RabbitTemplate: 消息模板SimpleMessageListenerContainer : 消息监听容器MessageListenerAdapter 消息监听适配器:MessageConverter 消息转换器,序列化,反序列化:TextMessageConverter 文本消息转换器JSON格式转换器 Jackson2JsonMessageConverterjava对象映射 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter支持java对象多映射转换 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter全局转换器 ContentTypeDelegatingMessageConverter
RabbitMQ和SpringBoot整合使用的组件列表:
RabbitAdmin:
RabbitAdmin 可以用来声明exchange,queue,binding。发送消息等操作
SpringAMQP 声明
通过@Bean注解方式声明交换机和队列。
RabbitTemplate 消息模板
我们在与springAMQP整合的时候进行发送消息的关键类
SimpleMessageListenerContainer
消息监听容器
MessageListenerAdapter
消息监听适配器
MessageConverter
转换器,序列化,反序列化
环境准备:
- 新建一个SpringBoot项目
- pom文件中添加依赖
<!-- junit测试 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- mq依赖 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.4.3</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</depedency>
- 创建配置类, 用来RabbitMQ加载配置。
package com.example.demo;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
@Configuration
@ComponentScan("com.example.demo.*")
public class RabbitMQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses("127.0.0.1:5672");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
}
RabbitAdmin:
rabbitAdmin类可以很好地操作RabbitMQ, 在Spring中直接注入即可。
// 在RabbitMQConfig类中加入如下代码
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
//AutoStartup 必须设置为true, 否则spring容器不会加载
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
RabbitAdmin 底层实现就是从Sping容器中获取Exchange、Bingding、RoutingKey、以及Queue的@Bean声明。
然后使用RabbitTemplate的execute方法执行对应的声明,修改,删除等一系列RabbitMQ基础功能操作。
添加测试类:RabbitAdminTest
package com.example.demo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.HashMap;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitAdminTest {
@Test
public void contextLoads() {
}
@Autowired
private RabbitAdmin rabbitAdmin;
@Test
public void testAdmin() throws Exception{
//声明交换机
rabbitAdmin.declareExchange(new DirectExchange("wx.direct", false, false));
rabbitAdmin.declareExchange(new TopicExchange("wx.topic", false, false));
rabbitAdmin.declareExchange(new FanoutExchange("wx.fanout", false, false));
//声明队列
rabbitAdmin.declareQueue(new Queue("wx.direct.queue", false));
rabbitAdmin.declareQueue(new Queue("wx.topic.queue", false));
rabbitAdmin.declareQueue(new Queue("wx.fanout.queue", false));
//bingding 交换机和队列, new HashMap<>()为其他arguments
rabbitAdmin.declareBinding(new Binding("wx.direct.queue",
Binding.DestinationType.QUEUE,
"wx.direct", //exchange
"direct", //routingKey
new HashMap<>()));
//bingding 交换机和队列的另一种方式
rabbitAdmin.declareBinding(
BindingBuilder
.bind(new Queue("wx.topic.queue", false)) //直接创建队列
.to(new TopicExchange("wx.topic", false, false)) //建立关联关系(没有该交换机会报错)
.with("wx.#")); //routingKey
//bingding fanout类型交换机不需要 routingKey
rabbitAdmin.declareBinding(
BindingBuilder
.bind(new Queue("wx.fanout.queue", false))
.to(new FanoutExchange("wx.fanout", false, false)));
//清空队列
rabbitAdmin.purgeQueue("wx.topic.queue", false);
}
}
在RabbitMQ控制台 http://localhost:15672/ 可以看到对应的exchange和queue,以及绑定关系。
SpringAMQP 声明:
在RabbitMQ基础API里面声明一个Exchange, 声明一个绑定,一个队列。
通过注解去声明,跟用RabbitMQ写法类似。
新建一个 RabbitMQBinding 配置类。 类上加注解 @Configuration。
package com.example.demo;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQBinding {
/**
* 针对消费者配置
* 1. 设置交换机类型
* 2. 将队列绑定到交换机
FanoutExchange: 将消息分发到所有绑定队列,无routingkey的概念
HeaderExchange: 通过添加属性key-value匹配
DirectExchange: 按照routingkey分发到指定队列
TopicExchange: 多关键字匹配
*/
@Bean
public TopicExchange topic001(){
return new TopicExchange("wx.topic001", false, false);
}
/**
* 声明队列
*/
@Bean
public Queue queue001(){
return new Queue("wx.topic.queue001", true);
}
@Bean
public Queue queue002(){
return new Queue("wx.topic.queue002", true);
}
@Bean
public Queue queue003(){
return new Queue("wx.topic.queue003", true);
}
/**
* 建立绑定关系
*/
@Bean
public Binding binding001(){
return BindingBuilder.bind(queue001()).to(topic001()).with("rabbit.*");
}
@Bean
public Binding binding002(){
return BindingBuilder.bind(queue002()).to(topic001()).with("mq.*");
}
}
在RabbitMQ控制台 http://localhost:15672/ 可以看到对应的exchange和queue,以及绑定关系。
RabbitTemplate: 消息模板
我们在与springAMQP整合的时候进行发送消息的关键类
该类丰富了发送消息方法,包括可靠性投递消息方法,回调监听消息接口 ConfirmCallback, 返回值确认接口ReturnCallback等等。同样我们需要进行注入到Spring容器中,然后直接使用。
在 RabbitMQBinding 类中增加如下代码
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
增加 RabbitTemplateTest 类:
package com.example.demo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitTemplateTest {
@Test
public void contextLoads() {
}
/**
* rabbitTemplate.convertAndSend 发送消息
* 发送object格式的消息时, 接收端使用string接收, 发送message格式时,用byte[]接收
*/
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage1() throws Exception{
//直接发送一个object
//rabbitTemplate.convertAndSend("wx.topic001", "mq.info", "hello mq");
rabbitTemplate.convertAndSend("wx.topic001", "rabbit.info", "hello rabbit");
rabbitTemplate.convertAndSend("wx.topic001", "mq.info", "hello mq");
}
@Test
public void testSendMessage2() throws Exception{
//通过message发送消息
//消息属性, 可以使用已经存在的属性比如setContentType, 也可以自定义属性
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
//消息内容
Message message = new Message("hello rabbitMQ - text/plain".getBytes(StandardCharsets.UTF_8), messageProperties);
rabbitTemplate.convertAndSend("wx.topic001", "rabbit.info", message);
}
@Test
public void testSendMessage3() throws Exception{
//发送message, 并在发送时修改和增加内容
//消息属性, 可以使用已经存在的属性比如setContentType, 也可以自定义属性
MessageProperties messageProperties = new MessageProperties();
messageProperties.getHeaders().put("desc", "自定义信息描述");
messageProperties.getHeaders().put("type", "自定义消息类型");
//消息内容
Message message = new Message("hello rabbitMQ".getBytes(StandardCharsets.UTF_8), messageProperties);
//通过 MessagePostProcessor 增加额外的属性
rabbitTemplate.convertAndSend("wx.topic001", "rabbit.info", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
System.out.println("添加额外的设置");
message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
message.getMessageProperties().getHeaders().put("attr", "额外增加的属性");
return message;
}
});
}
}
在RabbitMQ控制台 http://localhost:15672/ 可以看到对应的queue下的消息。
SimpleMessageListenerContainer : 消息监听容器
这个类非常强大,我们可以对他进行很多设置,对于消费者的配置项,这个类都可以满足
- 监听队列(多个队列),自动启动,自动声明功能
- 设置事务特性,事务管理器,事务属性,事务容量(并发),是否开启事务,回滚消息
- 设置消费者数量,最小最大数量,批量消费
- 设置消息确认和自动确认模式,是否重回队列,异常捕获handler函数。
- 设置消费者标签生成策略,是否独占模式,消费者属性等
- 设置具体的监听器,消息转换器等等。
simpleMessageListenerContainer可以进行动态设置,比如在运行过程中的应用可以动态的修改其消费者数量的大小,接收消息的模式等。
很多基于RabbitMQ的自定制的一些后端管控台在进行动态设置的时候,也是根据这一特性去实现的。所以可以看出SpringAMQP非常的强大。
在 RabbitMQBinding 类中增加如下代码:
/**
* 声明一个 SimpleMessageListenerContainer 用于监听queue, 并处理消息
*/
@Bean
public SimpleMessageListenerContainer messageListenerContainer1(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
//设置监听队列
container.setQueues(queue001(), queue002(), queue003());
//设置消费者数量
container.setConcurrentConsumers(1);
//设置最大消费者数量
container.setMaxConcurrentConsumers(2);
//是否重回队列
container.setDefaultRequeueRejected(false);
//签收策略
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
//消费端的标签策略, 用于区分不同的消费者
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//处理消息
String msg = new String(message.getBody());
System.out.println("----- consumer : " + msg);
}
});
return container;
}
– 消费者增加标签后的效果
MessageListenerAdapter 消息监听适配器:
也可以通过适配器的方式处理消息,上边代码改成如下:
/**
* 声明一个 SimpleMessageListenerContainer 用于监听queue, 并处理消息
* 通过适配器的方式处理消息。 处理消息的模式方法为是 handleMessage, 可以进行修改
*/
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
//设置监听队列
container.setQueues(queue001(), queue002(), queue003());
//设置消费者数量
container.setConcurrentConsumers(1);
//设置最大消费者数量
container.setMaxConcurrentConsumers(1);
//是否重回队列
container.setDefaultRequeueRejected(false);
//签收策略
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
//消费端的标签策略
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
//适配器方式
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(new MessageDelegate());
//修改执行的方法名, 默认方法名为 handleMessage
messageListenerAdapter.setDefaultListenerMethod("consumeMessage");
container.setMessageListener(messageListenerAdapter);
return container;
}
增加一个 MessageDelegate 类:
package com.example.adapter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MessageDelegate {
@Autowired
private MessageDelegate messageDelegate;
public void handleMessage(byte[] messageBody){
System.err.println("handleMessage : " + new String(messageBody));
}
public void handleMessage(String messageBody){
System.err.println("handleMessage : " + messageBody);
}
public void consumeMessage(String messageBody){
System.err.println("consumeMessage string : " + messageBody);
}
public void consumeMessage(byte[] messageBody){
System.err.println("consumeMessage byte[] : ");
consumeMessage(new String(messageBody));
}
public void method1(String messageBody){
System.err.println("method1 : " + messageBody);
}
public void method2(String messageBody){
System.err.println("method2 : " + messageBody);
}
}
MessageConverter 消息转换器,序列化,反序列化:
TextMessageConverter 文本消息转换器
rabbitTemplate.convertAndSend 发送消息时,有一个问题
- 发送object格式的消息时, 接收端使用string接收
- 发送message格式时,用byte[]接收
这样我们每个处理消息的方法都需要重载,因为不知道具体应该用什么格式接收消息。
这时可以使用 MessageConverter 来处理消息。
上边方法改成:
/**
* 声明一个 SimpleMessageListenerContainer 用于监听queue, 并处理消息
* 通过适配器的方式处理消息
*/
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
//设置监听队列
container.setQueues(queue001(), queue002(), queue003());
//设置消费者数量
container.setConcurrentConsumers(1);
//设置最大消费者数量
container.setMaxConcurrentConsumers(1);
//是否重回队列
container.setDefaultRequeueRejected(false);
//签收策略
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
//消费端的标签策略
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
// //1,适配器方式1
// MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(new MessageDelegate());
// //修改执行的方法名, 默认方法名为 handleMessage
// messageListenerAdapter.setDefaultListenerMethod("consumeMessage");
// //指定一个转换器, 将message的字节数组转换为字符串,转换器可以不加(用方法重载解决)
// messageListenerAdapter.setMessageConverter(new TextMessageConverter());
// container.setMessageListener(messageListenerAdapter);
//2, 适配器方式2, 不同的队列可以使用不同的方法处理数据
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(new MessageDelegate());
Map<String, String> queueOrTagToMethodName = new HashMap<>();
queueOrTagToMethodName.put("wx.topic.queue001", "method1");
queueOrTagToMethodName.put("wx.topic.queue002", "method2");
//指定一个转换器, 将message的字节数组转换为字符串,转换器可以不加(用方法重载解决)
messageListenerAdapter.setMessageConverter(new TextMessageConverter());
messageListenerAdapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
container.setMessageListener(messageListenerAdapter);
return container;
}
1234567891011121314151617181920212223242526272829303132333435363738394041424344
增加一个 TextMessageConverter 转换器类:
自定义转换器:实现MessageConverter 接口, 并且需要重写下面两个方法。
- toMessage:java对象转换为Message
- fromMessage:Message对象转换为java对象。
package com.example.convert;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
public class TextMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
return new Message(o.toString().getBytes(), messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
// String contentType = message.getMessageProperties().getContentType();
// if(null != contentType && contentType.contains("text")){
// return new String(message.getBody());
// }
Object body = message.getBody();
if(body == null) return body;
/**
* rabbitTemplate.convertAndSend 发送消息时
* 发送object格式的消息时, 接收端使用string接收
* 发送message格式时,用byte[]接收
* 所以在接收到消息时转换一下格式, (判断是否是byte[], 也可以使用其他的属性字段判断, 如上通过contentType判断)
**/
return body instanceof byte[] ? (new String((byte[]) body)) : body;
}
}
JSON格式转换器 Jackson2JsonMessageConverter
修改 messageListenerContainer 代码如下:
/**
* 声明一个 SimpleMessageListenerContainer 用于监听queue, 并处理消息
* 通过适配器的方式处理消息
*/
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
//设置监听队列
container.setQueues(queue001(), queue002(), queue003(), queueImage(), queuePdf());
//设置消费者数量
container.setConcurrentConsumers(1);
//设置最大消费者数量
container.setMaxConcurrentConsumers(1);
//是否重回队列
container.setDefaultRequeueRejected(false);
//签收策略
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
//消费端的标签策略
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
//1.1 适配器方式, json格式转换器 Jackson2JsonMessageConverter
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(new MessageDelegate());
//修改执行的方法名, 默认方法名为 handleMessage
messageListenerAdapter.setDefaultListenerMethod("consumeMapMessage");
//JSON格式转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
messageListenerAdapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(messageListenerAdapter);
return container;
}
在 MessageDelegate 类中增加方法:
public void consumeMapMessage(Map messageBody){
System.err.println("consumeMapMessage map : " + messageBody);
}
增加junit测试方法:
@Test
public void testSendJsonMessage() throws Exception{
//实体类
User user = new User("001", "张三", "130000000000X", "宇宙中心");
String json = new ObjectMapper().writeValueAsString(user);
System.err.println("user json : " + json);
MessageProperties messageProperties = new MessageProperties();
//必须修改ContentType 为application/json
messageProperties.setContentType("application/json");
Message message = new Message(json.getBytes(), messageProperties);
rabbitTemplate.send("wx.topic001", "rabbit.info", message);
}
User 实体类代码:
public class User {
private String id;
private String name;
private String IDCard;
private String address;
public User() {}
public User(String id, String name, String IDCard, String address) {
this.id = id;
this.name = name;
this.IDCard = IDCard;
this.address = address;
}
public String toString(){
return "id = " + id + ", name = " + name + ", IDCard = " + IDCard + ", address = " + address;
}
//getter() and setter()
}
java对象映射 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter
修改 messageListenerContainer 代码如下:
/**
* 声明一个 SimpleMessageListenerContainer 用于监听queue, 并处理消息
* 通过适配器的方式处理消息
*/
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
//设置监听队列
container.setQueues(queue001(), queue002(), queue003(), queueImage(), queuePdf());
//设置消费者数量
container.setConcurrentConsumers(1);
//设置最大消费者数量
container.setMaxConcurrentConsumers(1);
//是否重回队列
container.setDefaultRequeueRejected(false);
//签收策略
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
//消费端的标签策略
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
//1.2 适配器方式, json格式 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter java对象转换
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(new MessageDelegate());
//修改执行的方法名, 默认方法名为 handleMessage
messageListenerAdapter.setDefaultListenerMethod("consumeMappingMessage");
//JSON格式转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
//java对象转换
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
//springBoot2.0新特性, RabbitMQ信任package
javaTypeMapper.setTrustedPackages("*");
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
messageListenerAdapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(messageListenerAdapter);
return container;
}
在 MessageDelegate 类中增加消费者方法(接收User):
public void consumeMappingMessage(User user){
System.err.println("user info : " + user.toString());
}
增加测试方法
@Test
public void testSendUser() throws Exception{
User user = new User("001", "张三", "130000000000X", "宇宙中心");
String json = new ObjectMapper().writeValueAsString(user);
MessageProperties messageProperties = new MessageProperties();
//修改ContentType 为application/json
messageProperties.setContentType("application/json");
//指定解析的实体类的类型
messageProperties.getHeaders().put("__TypeId__", "com.example.entity.User");
Message message = new Message(json.getBytes(), messageProperties);
rabbitTemplate.send("wx.topic001", "rabbit.info", message);
}
支持java对象多映射转换 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter
修改 messageListenerContainer 代码如下:
/**
* 声明一个 SimpleMessageListenerContainer 用于监听queue, 并处理消息
* 通过适配器的方式处理消息
*/
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
//设置监听队列
container.setQueues(queue001(), queue002(), queue003(), queueImage(), queuePdf());
//设置消费者数量
container.setConcurrentConsumers(1);
//设置最大消费者数量
container.setMaxConcurrentConsumers(1);
//是否重回队列
container.setDefaultRequeueRejected(false);
//签收策略
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
//消费端的标签策略
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
//支持java对象多映射转换 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(new MessageDelegate());
//修改执行的方法名, 默认方法名为 handleMessage
messageListenerAdapter.setDefaultListenerMethod("consumeMappingMessage");
//JSON格式转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
//java对象转换
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
//springBoot2.0新特性, RabbitMQ信任package
javaTypeMapper.setTrustedPackages("*");
//支持java对象多映射转换
Map<String, Class<?>> idClassMapping = new HashMap<>();
idClassMapping.put("user", User.class);
idClassMapping.put("school", School.class);
javaTypeMapper.setIdClassMapping(idClassMapping);
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
//设置转换器
messageListenerAdapter.setMessageConverter(jackson2JsonMessageConverter);
//设置适配器
container.setMessageListener(messageListenerAdapter);
return container;
}
}
全局转换器 ContentTypeDelegatingMessageConverter
修改 messageListenerContainer 代码如下:
/**
* 声明一个 SimpleMessageListenerContainer 用于监听queue, 并处理消息
* 通过适配器的方式处理消息
*/
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
//设置监听队列
container.setQueues(queue001(), queue002(), queue003(), queueFile());
//设置消费者数量
container.setConcurrentConsumers(1);
//设置最大消费者数量
container.setMaxConcurrentConsumers(1);
//是否重回队列
container.setDefaultRequeueRejected(false);
//签收策略
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
//消费端的标签策略
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
//ext 全局转换器
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(new MessageDelegate());
//修改执行的方法名, 默认方法名为 handleMessage
messageListenerAdapter.setDefaultListenerMethod("consumeMappingMessage");
//全局转换器: 可以支持放入很多的小的Converter
ContentTypeDelegatingMessageConverter converter = new ContentTypeDelegatingMessageConverter();
//文本转换器
TextMessageConverter textConvert = new TextMessageConverter();
converter.addDelegate("text", textConvert);
converter.addDelegate("html/text", textConvert);
converter.addDelegate("xml/text", textConvert);
converter.addDelegate("text/plain", textConvert);
//json转换器
Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
converter.addDelegate("json", jsonConvert);
converter.addDelegate("application/json", jsonConvert);
//文件流转换器, 也可以根据不同的文件类型(根据contentType判断), 用不用的转换器去做处理
FileMessageConverter fileMessageConverter = new FileMessageConverter();
converter.addDelegate("file", fileMessageConverter);
//设置转换器
messageListenerAdapter.setMessageConverter(converter);
//设置适配器
container.setMessageListener(messageListenerAdapter);
return container;
}
增加文件转化器类 FileMessageConverter :
package com.example.convert;
import org.springframework.amqp.support.converter.MessageConverter;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.UUID;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
public class FileMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
throw new MessageConversionException(" convert error ! ");
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
System.err.println("-----------File MessageConverter----------");
//获取文件扩展名
Object _extName = message.getMessageProperties().getHeaders().get("extName");
String extName = _extName == null ? "png" : _extName.toString();
byte[] body = message.getBody();
String fileName = UUID.randomUUID().toString();
String path = "/Users/wx/Desktop/test/" + fileName + "." + extName;
File file = new File(path);
try {
Files.copy(new ByteArrayInputStream(body), file.toPath());
} catch (IOException e) {
e.printStackTrace();
}
return file;
}
}
增加消费者方法:
public void consumeMappingMessage(File file){
System.out.println("file : " + file.getPath());
}
增加测试方法:
@Test
public void testSendExtConvertMessage() throws Exception{
byte[] body = Files.readAllBytes(Paths.get("/Users/wx/Desktop/", "aaa.png"));
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("file");
messageProperties.getHeaders().put("extName", "png");
Message message = new Message(body, messageProperties);
rabbitTemplate.send("", "file_queue", message);
}
新增声明 file_queue:
@Bean
public Queue queueFile(){
return new Queue("file_queue", true);
}
本文暂时没有评论,来添加一个吧(●'◡'●)