1. 编写完了公共发邮件组件并测试正常;此组件不能单独工作,必须被业务组件引入后才能一起工作;公共组件都是单向依赖,即业务组件依赖公共组件,公共组件不能依赖业务组件;
2. 微服务中使用消息队列的好处:解耦、异步、削峰。下面构建基于 spring-cloud-stream 消息驱动的微服务应用。
step1:引包
step2:在业务类上进行配置,公共配置
kafka:
bindings:
finance_detail_topic:
producer:
batch-timeout: 200 # 批处理延迟时间上限。
buffer-size: 33554432 # 每次批量发送消息的最大内存
sync: false # 是否同步发送消息,默认为 false 异步。
binder:
brokers: kafka:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔
configuration:
fetch.max.wait.ms: 10000 # poll 一次拉取时阻塞的最大时长,单位:毫秒。这里指的是阻塞拉取,直到累积至少 fetch.min.bytes 大小的消息或达到该超时时间
fetch.min.bytes: 1024 # poll 一次消息拉取的最小数据量,单位:字节
max.poll.records: 100 # poll 一次消息拉取的最大数量
retries: 3 # 发送失败时,重试发送的次数
acks: all # 0-不应答。1-leader 应答。all-leader 及所有同步副本(ISR)都应答。
key.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
batch.size: 16384
buffer.memory: 33554432
cloud:
stream:
bindings:
sms_notice_topic:
destination: sms_notice_topic
content-type: application/json
group: sms-notice-group
email_notice_topic:
destination: email_notice_topic
content-type: application/json
group: email-notice-group
# Spring Cloud Stream Kafka 配置项
step3:定义output和input通道
package com.batsoft.sms.config;
import com.batsoft.common.mail.constant.EmailNoticeConstant;
import com.batsoft.sms.api.constant.SmsQueueConsts;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.MessageChannel;
/**
 * Binding interface that declares the inbound message channels of the SMS service.
 *
 * <p>Each method is annotated with {@code @Input}, naming the channel through a
 * constant: {@code SmsQueueConsts.SMS_NOTICE_TOPIC} for SMS notices and
 * {@code EmailNoticeConstant.EMAIL_NOTICE_TOPIC} for e-mail notices.
 *
 * <p>NOTE(review): {@code @Input} channels are conventionally declared as
 * {@code SubscribableChannel}; confirm that the broader {@code MessageChannel}
 * return type is intended here.
 *
 * <p>Date: 2021/1/19 06:23, Version: 1.0
 *
 * @author javal
 */
public interface SmsMessageSource {
/**
 * Inbound channel bound under the name {@code SmsQueueConsts.SMS_NOTICE_TOPIC}.
 *
 * @return the channel carrying SMS notice messages
 */
@Input(SmsQueueConsts.SMS_NOTICE_TOPIC)
MessageChannel notice();
/**
 * Inbound channel bound under the name {@code EmailNoticeConstant.EMAIL_NOTICE_TOPIC}.
 *
 * @return the channel carrying e-mail notice messages
 */
@Input(EmailNoticeConstant.EMAIL_NOTICE_TOPIC)
MessageChannel emailSource();
}
step4:开启output和input通道绑定
package com.batsoft.sms;
import com.batsoft.common.feign.annotation.EnableBatFeignClients;
import com.batsoft.common.security.annotation.EnableBatResourceServer;
import com.batsoft.common.swagger.annotation.EnableBatSwagger2;
import com.batsoft.sms.config.SmsMessageSource;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.SpringCloudApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
/**
 * Bootstrap class of the SMS service.
 *
 * <p>Besides the Spring Boot / Spring Cloud application annotations, it enables
 * the project's Swagger, Feign-client and resource-server support and binds the
 * message channels declared by {@link SmsMessageSource}.
 *
 * @author leo
 */
@SpringBootApplication
@EnableBatSwagger2
@SpringCloudApplication
@EnableBatFeignClients
@EnableBatResourceServer
@EnableBinding(SmsMessageSource.class)
public class BatSmsApplication {

    /**
     * Starts the application using this class as the primary configuration source.
     *
     * @param args command-line arguments, handed to Spring Boot unchanged
     */
    public static void main(String[] args) {
        // Same as the static SpringApplication.run(Class, String...) shortcut,
        // written out as its explicit two-step form.
        SpringApplication application = new SpringApplication(BatSmsApplication.class);
        application.run(args);
    }
}
step5:创建生产者
package com.batsoft.common.mail.handler;
import com.batsoft.common.mail.annotation.EmailNotice;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.servlet.HandlerInterceptor;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
 * MVC interceptor that publishes an e-mail notice message once a request has
 * completed, for controller methods annotated with {@link EmailNotice}.
 *
 * <p>Date: 2022/3/20
 *
 * @author leo
 */
public class EmailNoticeHandler implements HandlerInterceptor {

    /** Source exposing the output channel the notice messages are sent to. */
    private final EmailNoticeSource emailNoticeSource;

    /**
     * Creates the interceptor.
     *
     * @param emailNoticeSource source whose {@code emailNoticeOutput()} channel receives the messages
     */
    public EmailNoticeHandler(EmailNoticeSource emailNoticeSource) {
        this.emailNoticeSource = emailNoticeSource;
    }

    /**
     * Never blocks a request; all work happens in {@link #afterCompletion}.
     *
     * @return always {@code true}
     */
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        return true;
    }

    /**
     * Sends an {@link EmailNoticeMessage} when the handled controller method carries
     * an {@link EmailNotice} annotation. The order number and subject are copied from
     * the annotation's {@code value()} and {@code subject()} attributes.
     *
     * <p>The message is sent whether or not {@code ex} is set; the exception is not inspected.
     *
     * @param request  current HTTP request (unused)
     * @param response current HTTP response (unused)
     * @param handler  the handler that processed the request; ignored unless it is a {@link HandlerMethod}
     * @param ex       exception raised during handling, if any (unused)
     */
    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        // Interceptors are also invoked for non-controller handlers (e.g. static
        // resources); casting those blindly would throw ClassCastException.
        if (!(handler instanceof HandlerMethod)) {
            return;
        }
        HandlerMethod handlerMethod = (HandlerMethod) handler;
        EmailNotice emailNotice = handlerMethod.getMethodAnnotation(EmailNotice.class);
        if (emailNotice == null) {
            return;
        }

        EmailNoticeMessage emailNoticeMessage = new EmailNoticeMessage();
        emailNoticeMessage.setOrderNo(emailNotice.value());
        emailNoticeMessage.setSubject(emailNotice.subject());

        Message<EmailNoticeMessage> message = MessageBuilder.withPayload(emailNoticeMessage).build();
        emailNoticeSource.emailNoticeOutput().send(message);
    }
}
step6:创建消费者
package com.batsoft.sms.mq.consumer;
import com.batsoft.common.mail.constant.EmailNoticeConstant;
import com.batsoft.common.mail.handler.EmailNoticeMessage;
import com.batsoft.sms.api.constant.SmsQueueConsts;
import com.batsoft.sms.api.dto.NoticeMsg;
import com.batsoft.sms.service.EmailService;
import com.batsoft.sms.service.MobileService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * Message consumer that hands incoming notice messages over to the matching
 * delivery service: SMS notices to {@link MobileService}, e-mail notices to
 * {@link EmailService}.
 *
 * @author leo
 */
@Component
public class NoticeQueueHandler {

    @Autowired
    MobileService mobileService;

    @Resource
    private EmailService emailService;

    /**
     * Listener for the channel named by {@code SmsQueueConsts.SMS_NOTICE_TOPIC}.
     *
     * @param noticeMsg payload of the received message, passed to {@code MobileService.sendNotice}
     */
    @StreamListener(SmsQueueConsts.SMS_NOTICE_TOPIC)
    public void onMessage(@Payload NoticeMsg noticeMsg) {
        this.mobileService.sendNotice(noticeMsg);
    }

    /**
     * Listener for the channel named by {@code EmailNoticeConstant.EMAIL_NOTICE_TOPIC}.
     *
     * @param emailNoticeMessage payload of the received message, passed to {@code EmailService.sendEmailNotice}
     */
    @StreamListener(EmailNoticeConstant.EMAIL_NOTICE_TOPIC)
    public void onEmailMessage(@Payload EmailNoticeMessage emailNoticeMessage) {
        this.emailService.sendEmailNotice(emailNoticeMessage);
    }
}
step7:启动进行测试
本文暂时没有评论,来添加一个吧(●'◡'●)