虽然多线程也能做异步处理,但是多线程仅限于在同一个进程做异步处理,会占用服务器的资源。
MQ是可以分布式的,可以分担服务器的压力。
同步调用和异步调用
同步调用
优势:时效性强,等待到结果后才返回
问题:拓展性差、性能差、级联失败问题
异步调用
- 消息发送者:投递消息的人(调用者)
- 消息接收者:接收和处理消息的人(服务提供者)
- 消息代理:管理、暂存、转发消息(微信服务器)

扣减数据余额和更新支付状态必须采用同步调用。
更新订单状态、短信通知用户、增加用户积分,这三个业务其实和支付服务的关系不大,有点业务耦合,可以用异步调用,支付服务先发一条消息给消息代理,交易服务、通知服务、积分服务去监听消息代理。如果服务故障了,没有收到消息,只要这个服务重新启动,消息代理还会投递消息给这个服务。
优势:解除耦合,拓展性好;无需等待;故障隔离;缓存消息,流量削峰填谷。
问题:不能立刻得到调用结果,时效性差;不能确定下游业务执行是否成功;业务安全依赖Broker的可靠性。
MQ技术选型
MQ:消息队列,存放消息的队列。也就是异步调用的Broker

RabbitMQ
安装
- 创建并运行容器
docker run -d \ -e RABBITMQ_DEFAULT_USER=admin \ -e RABBITMQ_DEFAULT_PASS=admin \ -v mq-plugins:/plugins \ --name rabbitmq \ --hostname rabbitmq \ --restart=always \ -p 15672:15672 \ -p 5672:5672 \ --network hm-net \ rabbitmq:3.8-management
|
- 创建容器成功后,即可查看rabbitmq控制台:
http://192.168.140.101:15672
,用户名、密码为admin
基本介绍
- virtual-host:虚拟主机,起到数据隔离的作用
- publisher:消息发送者
- consumer:消息的消费者
- queue:队列,存储消息
- exchange:交换机,负责路由消息

交换机只能路由消息,无法存储消息
交换机只会路由消息给与其绑定的队列,因此队列必须与交换机绑定
Spring AMQP
收发消息
- 引入spring-amqp依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
- 配置RabbitMQ服务端信息,让微服务连接到rabbitmq
spring: rabbitmq: host: 192.168.140.101 port: 5672 virtual-host: /hmall username: hmall password: 123
|
- 【发送消息】:SpringAMQP提供了RabbitTemplate工具类,方便我们发送消息
@SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate;
@Test public void sendMsg() { String queueName = "simple.queue"; String msg = "hello world"; rabbitTemplate.convertAndSend(queueName, msg); } }
|
- 【接收消息】:只需要通过注解(
@RabbitListener
)在方法上声明要监听的队列名称,将来SpringAMQP就会把消息传递给当前方法
@Component @Slf4j public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void listenSimpleQueue(String msg) { log.info("监听到simple.queue的消息:{}", msg); } }
|
Work Queues
work Queues:任务模型,让多个消费者绑定到一个队列
,共同消费队列中的消息。
@Component @Slf4j public class SpringRabbitListener { @RabbitListener(queues = "work.queue") public void listenSimpleQueue1(String msg) { log.error("consumer 1: " + msg); } @RabbitListener(queues = "work.queue") public void listenSimpleQueue2(String msg) { log.info("consumer 2: " + msg); } }
|
实际开发只会写一个方法,但是在部署的时候会在不同服务器上部署多个实例,形成一个集群。
【问题
】:默认是轮询投递给绑定在队列上的每一个消费者,但是这并没有考虑到消费者处理完消息,可能出现消息堆积的问题。
【解决
】:修改application.yml,设置preFetch的值为1,确保同一时刻最多投递给消费者一条消息(性能好的服务器,处理的消息越多,能者多劳)
spring: rabbitmq: listener: simple: prefetch: 1
|
交换机的类型
接收发送者发送的消息,并将消息路由到与其绑定的队列。

Fanout交换机(广播)
会将接收到的消息路由到每一个与其绑定的Queue。

【案例】:用户消费完毕,更新支付状态后,需要通知交易服务、短信服务、积分服务。如果没有交换机,通过队列发送,但是在队列里,一个消息只能被一个消费者处理。所以需要fanout交换机,把这个消息发送给所有的队列。
发送消息到Fanout交换机的API:
@SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testFanoutExchange() { String exchangeName = "hmall.fanout"; String msg = "hello world"; rabbitTemplate.convertAndSend(exchangeName, null, msg); } }
|
【注】:convertAndSend方法中两个参数默认是发给队列
,三个参数是发给交换机
Direct交换机(定向)
会将接收到的消息根据规则
(RoutingKey)路由到指定的Queue,规则如下:
- 每一个Queue都与Exchange绑定时都需要设置一个BindingKey
- 发布者发布消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

【注】:当两个队列的BindingKey相同时,Direct交换机就变成了Fanout交换机。
【场景】:用户消费完毕后,如果点了取消,交易服务需要把订单状态变成已取消,但是不需要通知短信服务、积分服务了。此时只需要发给交易服务即可。
在rabbitMq控制台设置交换机和队列的绑定关系,并设置RoutingKey:

发送消息到Direct交换机的API:
@SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testDirectExchange() { String exchangeName = "hmall.direct"; String msg = "蓝色"; rabbitTemplate.convertAndSend(exchangeName, "blue", msg); } }
|
Topic交换机(话题)
也是基于RoutingKey做消息路由,但时RoutingKey通常是多个单词的组合,并且以.
分割。
Queue与Exchange指定的BindingKey可以使用通配符:
- #:代表0个或多个单词
- *:代表一个单词

在rabbitMq控制台设置交换机和队列的绑定关系,并设置RoutingKey:

发送消息到Topic交换机的API:
@SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testTopicxchange() { String exchangeName = "hmall.topic"; String msg = "有关中国的新闻"; rabbitTemplate.convertAndSend(exchangeName, "china.news", msg); } }
|
声明队列和交换机
- Queue:声明队列,可以用工厂类QueueBuilder创建
- Exchange:声明交换机,可以用工厂类ExchangeBuilder创建
- Binding:声明队列和交换机的绑定关系,可以用工厂类BindingBuilder创建
基于JavaBean的方式声明
【案例】:声明一个Direct类型的交换机,并创建两个队列与其绑定
@Configuration public class DirectConfiguration { @Bean public DirectExchange directExchange() { return ExchangeBuilder.directExchange("hmall.direct").build(); } @Bean public Queue directQueue1() { return new Queue("direct.queue1"); } @Bean public Queue directQueue2() { return QueueBuilder .durable("direct.queue2") .build(); } @Bean public Binding directQueue1Binding(Queue directQueue1, DirectExchange directExchange) { return BindingBuilder .bind(directQueue1) .to(directExchange) .with("blue"); } @Bean public Binding directQueue2Binding(Queue directQueue2, DirectExchange directExchange) { return BindingBuilder .bind(directQueue2) .to(directExchange) .with("yellow"); } }
|
基于注解的方式声明
使用@RabbitListener
注解来声明队列和交换机
@Component @Slf4j public class SpringRabbitListener { @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1", durable = "true"), // 队列 exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT), // 交换机 key = {"red", "blue"} // 绑定关系
)) public void listenDirectQueue1(String msg) { log.info("consumer 1: " + msg); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2", durable = "true"), exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT), key = {"red", "yellow"} )) public void listenDirectQueue2(String msg) { log.info("consumer 2: " + msg); } }
|
消息转换器
【场景】:向object.queue队列发送一个对象:
@SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSendObject() { Map<String, Object> msg = new HashMap<>(); msg.put("name", "xiaolin"); msg.put("age", 18); rabbitTemplate.convertAndSend("object.queue", msg); } }
|
在控制台上看到的是:

【原因】:Spring对消息对象的处理是由MessageConverter来处理的,而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。
【这种方式存在的问题】:JDK的序列化有安全风险,JDK序列化的消息太大,JDK序列化的消息可读性太差
【解决】:建议采用JSON序列化代替默认的JDK序列化。
- 在publisher和consumer中,引入jackson依赖
<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>
|
- 在publisher和consumer中,配置MessageConverter
@ConditionalOnClass(RabbitTemplate.class) @Configuration public class MqConfig { @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } }
|
