专业的编程技术博客社区

网站首页 > 博客文章 正文

精通Spring Boot 3 : 10. 使用 Spring Boot 进行消息通信 (3)

baijin 2025-01-16 17:49:28 博客文章 11 ℃ 0 评论

启动用户应用程序

请继续运行用户应用程序。它将通过启动 docker-compose.yaml 文件中的服务来开始,您应该能看到事件的日志:

...
User ximena@email.com active status: true
User norma@email.com active status: false
User dummy@email.com active status: false
User dummy@email.com DELETED at 2023-10-12T15:43:10.798841
...

请记住,这一切发生是因为在 UserConfiguration 类中,我们声明了 CommandLineRunner init(UserService userService) bean,该声明将在应用程序准备就绪并且 UserService 具备事件发布者后执行,随后 UserLogs 类将记录相应的事件。

现在,让我们来看看 ActiveMQ Artemis 代理。请打开浏览器,访问 http://localhost:8161 以进入 ActiveMQ Artemis 管理控制台。请参见图 10-2。

如前所述,用户名和密码均为 artemis。输入后点击登录,展开导航面板,选择地址,然后点击队列标签,如图 10-3 所示。您将看到激活用户和已移除用户的队列,以及一些消息(在消息计数列中显示)。

在导航窗格中选择“已激活用户”队列,以查看此时队列中的消息。请参见图 10-4。

现在,您可以选择一条消息并查看其内容。图 10-5 展示了一个打开的消息示例。

消息采用 JSON 格式。请查看 Headers 和 Properties 部分。Properties 部分显示了我们的关键属性_type 及其值 com.apress.users.events.UserActivatedEvent。请记住,这个属性在使用监听器时,对于 MessageConverter 的反序列化(和序列化)提供提示是非常有用的。

接下来,在导航窗格中选择“已移除用户”队列,然后打开一条消息,以查看类似于图 10-6 所示的内容。

请注意图 10-6 中的事件消息使用 JSON 格式表示 LocalDateTime:yyyy-MM-dd HH:mm:ss。这在 UserRemovedEvent 中,以及@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")注解中。

使用 JMS 监听用户的来电事件

接下来,创建或打开 UserEventListeners 类。请参阅清单 10-11。

package com.apress.users.jms;
import com.apress.users.events.UserActivatedEvent;
import jakarta.jms.JMSException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class UserEventListeners {
    @JmsListener(destination = UserJmsConfig.DESTINATION_ACTIVATED)
           public void onActivatedUserEvent(UserActivatedEvent event){
            log.info("JMS User {}",event);
           }
          @JmsListener(destination = UserJmsConfig.DESTINATION_REMOVED)
          public void onRemovedUserEvent(Object event) throws JMSException {
            log.info("JMS User DELETED message: {} ", event);
          }
}

10-11 src/main/java/apress/com/users/jms/UserEventListeners.java

让我们来回顾一下 UserEventListeners 类

  • @Component:这个注解是 Spring 连接所有找到的监听器所必需的。
  • @JmsListener:这个注解需要指定目标名称(队列或主题);虽然它有更多参数,但这个已经足够了。
  • onActivatedUserEvent:此方法接受一个 UserActivatedEvent 作为参数,该参数将在接收到消息并从 JSON 反序列化为对象后传递。
  • onRemovedUserEvent:此方法接受一个对象(这是故意的),因为我想向您展示接收到的对象的类型。

重新启动用户应用程序

如果你再次运行用户应用程序,应用目前的所有新更改,你将看到以下输出:

...
JMS User UserActivatedEvent(email=ximena@email.com, active=true)
JMS User UserActivatedEvent(email=norma@email.com, active=false)
JMS User UserActivatedEvent(email=dummy@email.com, active=false)
JMS User DELETED message: ActiveMQMessage[ID:93288cbb-6937-11ee-8a2a-8e1616fb5830]:PERSISTENT/ClientMessageImpl[messageID=98, durable=true, address=removed-users,userID=93288cbb-6937-11ee-8a2a-8e1616fb5830,properties=TypedProperties[__AMQ_CID=92b75453-6937-11ee-8a2a-8e1616fb5830,_type=com.apress.users.events.UserRemovedEvent,_AMQ_ROUTING_TYPE=1]]
...

onRemovedUserEvent(Object) 返回一个 ActiveMQMessage(该类实现了 jakarta.jms.Message 接口)。请查看此消息,注意它包含了许多对其他业务逻辑有用的信息。你可能在想,我如何能使用实际的 UserRemovedEvent 对象而不是通用的对象,对吗?答案在下面。

如果你按照下面的代码片段更改签名:

 @JmsListener(destination = UserJmsConfig.DESTINATION_REMOVED)
    public void onRemovedUserEvent(UserRemovedEvent event) throws JMSException {
        log.info("JMS User DELETED message: {} ", event);
    }

你会遇到一个 MessageConversionException,提示无法反序列化 LocalDateTime 对象。虽然我们已经配置了 MessageConverter,但我们需要通过创建一个自定义的 MessageConverter 来指定如何进行反序列化。默认情况下,com.fasterxml.jackson.databind.ObjectMapper 使用其默认配置(序列化很简单,但反序列化就不容易),因此我们需要添加一种处理 LocalDateTime 类型的方法。

因此,在 UserJmsConfig 类中,请按照下面的代码片段替换 MessageConverter 的声明:

@Bean
    public MessageConverter messageConverter(){
        ObjectMapper mapper = new ObjectMapper();
        mapper.registerModule(new JavaTimeModule());
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTypeIdPropertyName("_type");
        converter.setTargetType(MessageType.TEXT);
        converter.setObjectMapper(mapper);
        return converter;
    }

就这样!现在你可以使用 onRemovedUserEvent(UserRemovedEvent event) 方法,所有功能都将正常运作。

请记住,通常情况下,JMS 监听器是一个异步消费消息并处理这些消息的应用程序。在这里,我们将生产者和消费者放在同一代码基础中。我们项目的一个用例是,当用户被创建或删除时,通知 My Retro App,并进行服务器推送以刷新网页。

所以,现在你已经了解了如何使用 ActiveMQ Artemis 的 JMS 点对点模型。接下来,我们将探讨如何使用 JMS 发布-订阅模型(请参见图 10-1)。

在我的复古应用中使用 JMS 主题

使用 JMS 与我的复古应用程序监听用户应用程序中发生的事件是一个很好的发布-订阅模型示例。用户要么被激活,要么未被激活;如果用户从系统中被移除,应用程序必须相应地发送事件。JMS 通过主题实现发布/订阅模型。换句话说,我们需要订阅这些事件中的任何一个。

您可以在 10-messaging-jms/myretro 文件夹中找到源代码。或者,如果您是从头开始使用 Spring Initializr (https://start.spring.io),请添加“用户应用程序与 JMS”部分中的相同依赖项。

按照清单 10-12 的示例打开或创建 build.gradle 文件。

plugins {
    id 'java'
    id 'org.springframework.boot' version '3.2.3'
    id 'io.spring.dependency-management' version '1.1.4'
}
group = 'com.apress'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '17'
repositories {
    mavenCentral()
}
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.boot:spring-boot-starter-validation'
    implementation 'org.springframework.boot:spring-boot-starter-artemis'
    implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
    runtimeOnly 'com.h2database:h2'
    runtimeOnly 'org.postgresql:postgresql'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}
tasks.named('test') {
    useJUnitPlatform()
}

10-12 build.gradle

列表 10-12 显示我们使用了 spring-boot-starter-artemis 和 jackson-datatype-jsr310 这两个依赖项。这里不需要使用 spring-boot-docker-compose,因为我们将连接到用户应用程序所使用的服务。

注意:这里使用的代码库(我的复古应用项目)是基于 spring-boot-starter-data-jpa 的(在这种情况下不使用反应式编程)。

接下来,打开或创建 UserEvent 类。请查看列表 10-13。

package com.apress.myretro.jms;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
@AllArgsConstructor
@NoArgsConstructor
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class UserEvent {
    private String email;
    private String action;
    private boolean active;
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @JsonSerialize(using = LocalDateTimeSerializer.class)
    private LocalDateTime removed;
}

10-13 src/main/java/apress/com/myretro/jms/UserEvent.java

您可以将 UserEvent 类视为来自用户应用程序的 UserActivatedEvent 和 UserRemovedEvent 类信息的合并。请注意,我们新增了一个字段 String action,用于保存事件操作描述(已激活或已移除),这对于新功能可能非常有用。此外,我们使用了 @JsonIgnoreProperties(ignoreUnknown = true) 注解,这在处理可能缺少某些字段的 JSON 数据时非常有帮助。

接下来,打开或创建 UserEventListeners 类。请参阅清单 10-14。

package com.apress.myretro.jms;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.annotation.JmsListeners;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class UserEventListeners {
    @JmsListeners({
            @JmsListener(destination = "${jms.user-events.queue.1}"),
            @JmsListener(destination = "${jms.user-events.queue.2}")
    })
    public void onUserEvent(UserEvent userEvent) {
       log.info("UserEventListeners.onUserEvent: {}", userEvent);
    }
}

10-14 src/main/java/apress/com/myretro/jms/UserEventListeners.java

UserEventListeners 类包含以下注解(其中一些您可能已经熟悉)和方法:

  • @Component:这个注解将类标记为 Spring bean,并且它对于其他注解的注册是必需的。
  • @JmsListeners:这个注解可以接受多个 @JmsListener 注解。这在我们的情况下非常有用,因为我们只需要接收一条消息,即 UserEvent 对象。
  • @JmsListener:这个注解用于连接主题(或在点对点模型中的队列)。在这种情况下,我们使用一些外部属性,这些属性将在 application.properties 文件中找到。
  • onUserEvent:这个方法被标记为监听器,将接收 UserEvent 消息。但是,如果我们有一个 JSON 消息,一个是针对激活用户的,另一个是针对被移除用户的,我们该如何处理呢?

接下来,创建或打开 UserEventConfig 类。请参阅清单 10-15。

package com.apress.myretro.jms;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.Session;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;
@Configuration
public class UserEventConfig {
    @Bean
    MessageConverter messageConverter() {
        return new MessageConverter() {
           @Override
                    public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException {
                throw new UnsupportedOperationException("Not supported yet.");
                    }
           @Override
                    public Object fromMessage(Message message) throws JMSException,
MessageConversionException {
ObjectMapper mapper = new ObjectMapper();
                mapper.registerModule(new JavaTimeModule());
                try {
                    String _type = message.getStringProperty("_type");
                    UserEvent event =  mapper.readValue(message.getBody(String.class), UserEvent.class);
                    event.setAction("Activated");
                    if (_type.contains("Removed"))
                        event.setAction("Removed");
                    return event;
                } catch (JsonProcessingException e) {
                    throw new RuntimeException(e);
                }
            }
                       };
        }
}

10-15 src/main/java/apress/com/myretro/jms/UserEventConfig.java

UserEventConfig 类包括以下内容:

  • 正如您所知,这个注解帮助 Spring 查找并配置任何@Bean 定义。
  • 消息转换器:这个 bean 是必需的;我们不需要默认的实现,因为我们处理的是 JSON 对象,有些对象需要进行反序列化。我们返回一个新的消息转换器实现。这个接口提供了两个需要实现的方法:toMessage(我们不需要这个,因为我们不发送任何消息,只是接收)和 fromMessage。我们使用 Jackson 库中的 ObjectMapper,既是因为我们需要使用 JavaTimeModule(因为我们使用了 LocalDateTime 类型),也是因为我们需要根据事件填充 action 字段,值为 Activated 或 Removed。

请花点时间仔细查看清单 10-15 中的代码。

接下来,打开 application.properties 文件,参见列表 10-16。

server.port=${PORT:8181}
## Data
spring.h2.console.enabled=true
spring.datasource.generate-unique-name=false
spring.datasource.name=test-db
# spring.jpa.show-sql=true
## JMS Remote
#spring.artemis.broker-url=tcp://localhost:61616
#spring.artemis.mode=native
spring.artemis.user=artemis
spring.artemis.password=artemis
## JMS Topic
spring.jms.pub-sub-domain=true
## User Event Queues
jms.user-events.queue.1=activated-users
jms.user-events.queue.2=removed-users

10-16 src/main/resources/application.properties

在文件的末尾,请注意列出了两个队列。这些值被注入到 @JmsListener 注解中,代表主题的名称。那么,如何将其轻松更改为主题(发布/订阅)而不是队列(点对点)呢?只需设置 spring.jms.pub-sub-domain=true 属性即可。当然,消息的发送者/发布者也需要设置这个值。这意味着用户应用程序也需要使用这个属性。

运行我的怀旧应用

要运行我的复古应用,您需要按照以下步骤进行:

  1. 请前往用户应用程序,将 spring.jms.pub-sub-domain=true 属性添加到 application.properties 文件中,然后启动应用程序。这将运行 docker-compose.yaml 文件并启动 ActiveMQ Artemis 服务。如果你感兴趣,可以查看 ActiveMQ Artemis 控制台中的队列 (http://localhost:8161,凭据为 artemis/artemis);这些队列是相同的,但你会发现一些重复项,类型为 MULTICAST 的路由器(这个功能与 ActiveMQ Artemis 更相关)。这将允许用户应用程序订阅多个消费者,以便他们能够接收发布到这些队列或主题中的消息。
  2. 启动我的复古应用。
  3. 将用户添加到用户 API。您可以使用 VS Code 或 IntelliJ 中的 HTTP 客户端和 user.http 文件来测试 API,或者在终端中执行相关命令。
	
Add a user to the Users API. You can use the user.http file with the HTTP client from VS Code or IntelliJ to test the API, or in a terminal, you can execute
curl -i -s -d '{"name":"Dummy","email":"dummy@email.com","password":"aw2s0meR!","userRole":["INFO"],"active":true}' \
-H "Content-Type: application/json" \
http://localhost:8080/users
HTTP/1.1 201
Location: http://localhost:8080/users/dummy@email.com
Content-Type: application/json
Transfer-Encoding: chunked
Date: Fri, 13 Oct 2023 19:31:30 GMT
{"email":"dummy@email.com","name":"Dummy","gravatarUrl":null,"password":"aw2s0meR!","userRole":["INFO"],"active":true}

如果你查看这两个应用的日志,你会在我的复古应用中看到以下内容:

UserEventListeners.onUserEvent: UserEvent(email=dummy@email.com, action=Activated, active=true, removed=null)

现在,您可以删除刚刚创建的用户了

curl -i -s -XDELETE http://localhost:8080/users/dummy@email.com
HTTP/1.1 204
Date: Fri, 13 Oct 2023 19:35:05 GMT

然后在日志中,你将看到

UserEventListeners.onUserEvent: UserEvent(email=dummy@email.com, action=Removed, active=false, removed=2023-10-13T15:35:05)

在每个日志中,我们设置了操作,可以是“激活”或“移除”。

现在你已经知道如何通过 JMS 启用主题(发布/订阅)功能。接下来,我们将探讨另一种消息传递实现,即使用 Spring Boot 的 AMQP。

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表