原创

基于RabbitMq的延迟队列

在开发的时候,有时候有这么一种需求:一个订单下单后30分钟后要进行支付,如果没有支付的话要自动取消订单。

我们处理的方式很多,但是我这边采用的是基于RabbitMq的延迟队列。


首先先讲下原理

RabbitMq有这么一个属性 TTL(消息存活时间),如果一条消息在指定的时间没有被消费掉,那么这条消息继而会被转发到死信队列,然后消费者监听死信队列。




首先我们要明确下,TTL属性可以设置在哪里:

1.队列

2.消息


如果我们是设置在队列里,那么该队列里面的所有消息都拥有了该属性,而如果是设置在具体的消息里面,那么只有当前设置了TTL属性的消息,才拥有该属性。

如果是队列跟消息都设置了该属性,那么会取最小的。

但是队列准守先进先出的原则,所以即使你第一条消息设置的过期时间是30分钟,第二条消息设置的过期时间是1秒,死信队列并不会马上接受到第二条信息,必须等第一条处理完才会到第二条消息。


这里讲下几种方法:


手动在RabbitMq Web 创建


先创建一个延迟交换机:


再创建一个死信交换机


创建一个延迟队列



将延迟交换机跟延迟队列进行绑定

  



创建死信队列



将死信交换机跟死信队列进行绑定

好啦,大功告成,生产者只需要将消息推入到deadLetterExchange这个交换机,并且routkey为delay,那么消息就会自动进入到delayQueue这个队列里面,消费者只需要监听deadQueue即可。


代码创建

    @Bean
public TopicExchange deadExchange(){
//创建死信交换机 持久化 非自动删除
return new TopicExchange("deadExchange",true,false);
}
@Bean
public Queue deadQueue(){
//创建死信队列 持久化
return new Queue("deadQueue",true);
}
@Bean
public Binding deadExchangeBindQueue(){
//死信队列 跟 死信交换机 绑定 routkey为order
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("order");
}
@Bean
public TopicExchange delayExchange(){
//创建延迟交换机 持久化,非自动删除
return new TopicExchange("delayExchange",true,false);
}
@Bean
public Queue delayQueue(){
//创建延迟队列
Map<String,Object> args = new HashMap<>();
args.put("x-message-ttl",10000);
args.put("x-dead-letter-exchange","delayExchange");
args.put("x-dead-letter-routing-key","order");
return new Queue("delayQueue",true,false,false,args);
}
@Bean
public Binding delayExchangeBindQueue(){
//延迟交换机绑定延迟队列 routkey为:delay
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("delay");
}



利用RabbitMq官方的延迟插件

以下是我利用docker安装rabbitMq的记录

docker pull rabbitmq

docker run -d --name myrabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password docker.io/rabbitmq:3-management

因为这个安装完并没有延迟插件

rabbitmq安装延迟组件https://www.rabbitmq.com/community-plugins.html
1、官网下载delayed_message插件
2、编辑Dockerfile:#vim Dockerfile

From rabbitmq:3-management
COPY rabbitmq_delayed_message_exchange-20171201-3.7.x.ez /plugins
RUN rabbitmq-plugins enable --offline rabbitmq_delayed_message_exchange

3、构建镜像
#docker build -t rabbitmq:3-management-delayed .

机制解释
安装插件后会生成新的Exchange类型x-delayed-message,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列中,而是存储在mnesia(一个分布式数据系统)表中,检测消息延迟时间,如达到可投递时间时并将其通过x-delayed-type类型标记的交换机类型投递至目标队列。

基于rabbitmq第三方延迟插件dome

正文到此结束