来源:https://blog.csdn.net/weixin_43944305/article/details/120827661
延时原理
mq基本的消息模型
mq死信队列的消息模型
简单的说就是先弄一个正常队列,然后不要设置消费者,接着给这个正常队列绑定一个死信队列,这个死信队列设置方式和正常队列没啥区别。然后监听这个死信队列的消费.
一般死信队列由三大核心组件组成:死信交换机+死信路由+TTL(消息过期时间)
通常死信队列由“面向生产者的基本交换机+基本路由”绑定而成,所以生产者首先是将消息发送至“基本交换机+基本路由”所绑定而成的消息模型中,即间接性地进入到死信队列中,当过了TTL,消息将“挂掉”,从而进入下一个中转站,即“面下那个消费者的死信交换机+死信路由”所绑定而成的消息模型中.
maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置文件
spring:
application:
name: ttl-queue
rabbitmq:
host: 110.40.181.73
port: 35672
username: root
password: 10086
virtual-host: /fchan
connection-timeout: 15000
# 发送确认
#publisher-confirms: true
# 路由失败回调
publisher-returns: true
template:
# 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃
mandatory: true
listener:
simple:
# 每次从RabbitMQ获取的消息数量
prefetch: 1
default-requeue-rejected: false
# 每个队列启动的消费者数量
concurrency: 1
# 每个队列最大的消费者数量
max-concurrency: 1
# 签收模式为手动签收-那么需要在代码中手动ACK
acknowledge-mode: manual
配置类
package com.fchan.mq.mqDelay.dlx;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MyRabConfig {
//定义普通任务队列(其实就是一个普通队列,只是没有消费者)
@Bean("delayQue")
public Queue delayQue(){
//普通队列绑定死信交换机及路由key
//delayQueue延时队列
return QueueBuilder.durable("delayQueue")
//指定这个延时队列下的死信交换机
//如果消息过时则会被投递到当前对应的my-dlx-exchange
.withArgument("x-dead-letter-exchange", "my-dlx-exchange")
//死信交换机绑定的路由key
//如果消息过时,my-dlx-exchange会根据routing-key-delay投递消息到对应的队列
//mq发送消息的时候一般指定的是exchange交换机+这个routingKey来找到队列,这个应该是出于有时候需要让mq的交换机将一个消息发送到多个队列所采用的方式
.withArgument("x-dead-letter-routing-key", "routing-key-delay")
.build();
}
//定义普通队列的交换机
@Bean("delayExchange")
public Exchange delayExchange(){
return ExchangeBuilder.directExchange("delayExchange").build();
}
//绑定普通队列的交换机
@Bean("delayBinding")
public Binding delayBinding(@Qualifier("delayExchange") Exchange exchange, @Qualifier("delayQue") Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("delayBindingRoutingKey").noargs();
}
//定义死信队列
@Bean("dlxQueue")
public Queue dlxQueue(){
return QueueBuilder.durable("my-dlx-queue").build();
}
//定义死信交换机,这里的死信交换机要和上面普通队列所绑定的交换机名称一致
@Bean("dlxExchange")
public Exchange dlxExchange(){
return ExchangeBuilder.directExchange("my-dlx-exchange").build();
}
//绑定交换机与死信队列
@Bean("dlxBinding")
public Binding dlxBinding(@Qualifier("dlxExchange") Exchange exchange, @Qualifier("dlxQueue") Queue queue){
//这儿这个routingKey要和上面正常队列中所绑定的死信routingKey一致
return BindingBuilder.bind(queue).to(exchange).with("routing-key-delay").noargs();
}
}
生产者
/**
* 发送消息到延时队列 delayQueue
* 消息5秒后会被投递到死信队列
* @return
*/
@GetMapping("sendMsgToDelay")
public String sendMsgToDelay(String msg){
Map<String,Object> map = new HashMap<>();
map.put("msg", msg);
rabbitTemplate.convertAndSend("delayExchange","delayBindingRoutingKey", map, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//设置消息过期时间,5秒过期
message.getMessageProperties().setExpiration("5000");
return message;
}
});
return msg;
}
消费者
package com.fchan.mq.mqDelay;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class MyRabbitConsume {
Logger log = LoggerFactory.getLogger(MyRabbitConsume.class);
@RabbitListener(queues = {"my-dlx-queue"})
public void receiveDeadMessage(Map<String,Object> map, Message message, Channel channel) throws Exception {
log.info("收到死信信息:{}",map);
log.info("然后进行一系列逻辑处理 Thanks♪(・ω・)ノ");
//手动确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}