虽然多线程也能做异步处理,但是多线程仅限于在同一个进程做异步处理,会占用服务器的资源。
MQ是可以分布式的,可以分担服务器的压力。

同步调用和异步调用

同步调用

优势:时效性强,等待到结果后才返回
问题:拓展性差、性能差、级联失败问题

异步调用

  • 消息发送者:投递消息的人(调用者)
  • 消息接收者:接收和处理消息的人(服务提供者)
  • 消息代理:管理、暂存、转发消息(微信服务器)
    7ac1576587c5453f95aaeaad8a28e8e2.png

扣减数据余额和更新支付状态必须采用同步调用。
更新订单状态、短信通知用户、增加用户积分,这三个业务其实和支付服务的关系不大,有点业务耦合,可以用异步调用,支付服务先发一条消息给消息代理,交易服务、通知服务、积分服务去监听消息代理。如果服务故障了,没有收到消息,只要这个服务重新启动,消息代理还会投递消息给这个服务。

优势:解除耦合,拓展性好;无需等待;故障隔离;缓存消息,流量削峰填谷。
问题:不能立刻得到调用结果,时效性差;不能确定下游业务执行是否成功;业务安全依赖Broker的可靠性。

MQ技术选型

MQ:消息队列,存放消息的队列。也就是异步调用的Broker
7a3c7ee01adf459590df9d0ba9fca1a5.png

RabbitMQ

安装

  1. 创建并运行容器
docker run  -d \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
-v mq-plugins:/plugins \
--name rabbitmq \
--hostname rabbitmq \
--restart=always \
-p 15672:15672 \
-p 5672:5672 \
--network hm-net \
rabbitmq:3.8-management
  1. 创建容器成功后,即可查看rabbitmq控制台:http://192.168.140.101:15672,用户名、密码为admin

基本介绍

  • virtual-host:虚拟主机,起到数据隔离的作用
  • publisher:消息发送者
  • consumer:消息的消费者
  • queue:队列,存储消息
  • exchange:交换机,负责路由消息
    4787873ed4ff4f8e8a866b25113b115e.png

交换机只能路由消息,无法存储消息
交换机只会路由消息给与其绑定的队列,因此队列必须与交换机绑定

Spring AMQP

收发消息

  1. 引入spring-amqp依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 配置RabbitMQ服务端信息,让微服务连接到rabbitmq
spring:
rabbitmq:
host: 192.168.140.101 # 虚拟机IP
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: hmall # 用户名
password: 123 # 密码
  1. 【发送消息】:SpringAMQP提供了RabbitTemplate工具类,方便我们发送消息
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void sendMsg() {
String queueName = "simple.queue"; // 队列名
String msg = "hello world"; // 消息
rabbitTemplate.convertAndSend(queueName, msg); // 发送消息
}
}
  1. 【接收消息】:只需要通过注解(@RabbitListener)在方法上声明要监听的队列名称,将来SpringAMQP就会把消息传递给当前方法
@Component
@Slf4j
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) { // 使用字符串发送,就用字符串接收
log.info("监听到simple.queue的消息:{}", msg);
}
}

Work Queues

work Queues:任务模型,让多个消费者绑定到一个队列,共同消费队列中的消息。

@Component
@Slf4j
public class SpringRabbitListener {
// 多个消费者绑定到同一个队列
@RabbitListener(queues = "work.queue")
public void listenSimpleQueue1(String msg) {
log.error("consumer 1: " + msg);
}
@RabbitListener(queues = "work.queue")
public void listenSimpleQueue2(String msg) {
log.info("consumer 2: " + msg);
}
}

实际开发只会写一个方法,但是在部署的时候会在不同服务器上部署多个实例,形成一个集群。
问题】:默认是轮询投递给绑定在队列上的每一个消费者,但是这并没有考虑到消费者处理完消息,可能出现消息堆积的问题。
解决】:修改application.yml,设置preFetch的值为1,确保同一时刻最多投递给消费者一条消息(性能好的服务器,处理的消息越多,能者多劳)

spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

交换机的类型

接收发送者发送的消息,并将消息路由到与其绑定的队列。
19d6a91aad5646afa6e9fa613898128b.png

Fanout交换机(广播)

会将接收到的消息路由到每一个与其绑定的Queue。
9f362598ed8b4902b945901ead3258e0.png

【案例】:用户消费完毕,更新支付状态后,需要通知交易服务、短信服务、积分服务。如果没有交换机,通过队列发送,但是在队列里,一个消息只能被一个消费者处理。所以需要fanout交换机,把这个消息发送给所有的队列。

发送消息到Fanout交换机的API:

@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testFanoutExchange() {
String exchangeName = "hmall.fanout"; // 队列名
String msg = "hello world";// 消息
// 发送消息,参数分别为:交换机名,RoutingKey(暂时为空),消息
rabbitTemplate.convertAndSend(exchangeName, null, msg);
}
}

【注】:convertAndSend方法中两个参数默认是发给队列,三个参数是发给交换机

Direct交换机(定向)

会将接收到的消息根据规则(RoutingKey)路由到指定的Queue,规则如下:

  • 每一个Queue都与Exchange绑定时都需要设置一个BindingKey
  • 发布者发布消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
    adcd87bd2a454998874466cddbd409e1.png

【注】:当两个队列的BindingKey相同时,Direct交换机就变成了Fanout交换机。
【场景】:用户消费完毕后,如果点了取消,交易服务需要把订单状态变成已取消,但是不需要通知短信服务、积分服务了。此时只需要发给交易服务即可。

在rabbitMq控制台设置交换机和队列的绑定关系,并设置RoutingKey:
229f3d34f2f445afb2be234ee6615af6.png
发送消息到Direct交换机的API:

@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testDirectExchange() {
String exchangeName = "hmall.direct"; // 队列名
String msg = "蓝色";// 消息
// 发送消息,参数分别为:交换机名,RoutingKey(绑定关系),消息
rabbitTemplate.convertAndSend(exchangeName, "blue", msg); // 此时只有direct.queue1收到消息
}
}

Topic交换机(话题)

也是基于RoutingKey做消息路由,但时RoutingKey通常是多个单词的组合,并且以.分割。
Queue与Exchange指定的BindingKey可以使用通配符:

  • #:代表0个或多个单词
  • *:代表一个单词
    18608ac9f3074897853eab96668b1491.png

在rabbitMq控制台设置交换机和队列的绑定关系,并设置RoutingKey:
65f0d098fb8d4ff2be093a4c6759f03a.png
发送消息到Topic交换机的API:

@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testTopicxchange() {
String exchangeName = "hmall.topic"; // 队列名
String msg = "有关中国的新闻";// 消息
// 发送消息,参数分别为:交换机名,RoutingKey(绑定关系),消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", msg); // 此时topic.queue1和topic.queue4都能收到消息
}
}

声明队列和交换机

  • Queue:声明队列,可以用工厂类QueueBuilder创建
  • Exchange:声明交换机,可以用工厂类ExchangeBuilder创建
  • Binding:声明队列和交换机的绑定关系,可以用工厂类BindingBuilder创建

基于JavaBean的方式声明

【案例】:声明一个Direct类型的交换机,并创建两个队列与其绑定

@Configuration
public class DirectConfiguration {
@Bean
public DirectExchange directExchange() { // 交换机
// return new directExchange("hmall.direct");
return ExchangeBuilder.directExchange("hmall.direct").build();
}
@Bean
public Queue directQueue1() { // 队列1
return new Queue("direct.queue1");
}
@Bean
public Queue directQueue2() { // 队列2
return QueueBuilder
.durable("direct.queue2") // 队列持久化到磁盘
.build();
}
@Bean
public Binding directQueue1Binding(Queue directQueue1, DirectExchange directExchange) { // 绑定关系1
// 把队列绑定到交换机
return BindingBuilder
.bind(directQueue1) // 队列
.to(directExchange) // 交换机
.with("blue"); // 绑定关系(但是只能传一个key,如果需要绑定多个routingKey需要再写一个绑定关系)
}
@Bean
public Binding directQueue2Binding(Queue directQueue2, DirectExchange directExchange) { // 绑定关系2
// 把队列绑定到交换机
return BindingBuilder
.bind(directQueue2) // 队列
.to(directExchange) // 交换机
.with("yellow"); // 绑定关系(但是只能传一个key)
}
}

基于注解的方式声明

使用@RabbitListener注解来声明队列和交换机

@Component
@Slf4j
public class SpringRabbitListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1", durable = "true"), // 队列
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT), // 交换机
key = {"red", "blue"} // 绑定关系

))
public void listenDirectQueue1(String msg) {
log.info("consumer 1: " + msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2", durable = "true"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg) {
log.info("consumer 2: " + msg);
}
}

消息转换器

【场景】:向object.queue队列发送一个对象:

// 发送
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendObject() {
Map<String, Object> msg = new HashMap<>();
msg.put("name", "xiaolin");
msg.put("age", 18);
rabbitTemplate.convertAndSend("object.queue", msg);
}
}

在控制台上看到的是:
5aad7ced954b4a119d1de1a78585dfab.png
【原因】:Spring对消息对象的处理是由MessageConverter来处理的,而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。
【这种方式存在的问题】:JDK的序列化有安全风险,JDK序列化的消息太大,JDK序列化的消息可读性太差
【解决】:建议采用JSON序列化代替默认的JDK序列化。

  1. 在publisher和consumer中,引入jackson依赖
<!--jackson消息转换器-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
  1. 在publisher和consumer中,配置MessageConverter
@ConditionalOnClass(RabbitTemplate.class) // 有RabbitTemplate才生效
@Configuration
public class MqConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}

359d3cb0ff0f413f8fdf65a278ba6712.png