文章插图
打开RabbitMQ后台确认 , 原本orders.notification
里的消息已经被消费掉了 , 同时多了一个orders.notification_dlx
队列 , 并且orders.notification_delay
队列相比orders.notification
多了一个DLX
标签 , Type
为1的消息就是被转移该队列 。

文章插图
进入
orders.notification_delay
队列 , 交换机与队列正常绑定 , x-dead-letter-exchange
也已被设置
文章插图
检查队列中的消息 , 可以看到
Properties
里的expiration: 3600000
和headers:biz-delayed: 1

文章插图
再过3600000毫秒 ,
orders.notification_dlx
队列就会被投递到orders.notification
交换机 , 队列orders.notification
也就会收到这些信息 , 这时因为消息头里有biz-delayed
, 消费者会正常将其消费 。使用延迟交换机实现使用延迟交换机 , 需要RabbitMQ服务器安装
rabbitmq_delayed_message_exchange
插件 , 原理是投递到延迟交换机的消息 , 会延迟指定时间(x-delay
参数设置)后 , 自动投递到该交换机绑定的另一交换机上 。直接看代码 。docker环境安装
rabbitmq_delayed_message_exchange
插件这里介绍下docker环境如何安装rabbitmq_delayed_message_exchange
插件 , 首先在github https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases 下载与你RabbitMQ服务器大版本匹配的Release , 将文件复制到RabbitMQ的/plugins
目录下 , 命令如下docker cp {rabbitmq_delayed_message_exchange文件路径} {rabbitmq容器id}:/pluginsdocker exec -it {rabbitmq容器id} rabbitmq-plugins enable rabbitmq_delayed_message_exchange
以我本机为例 , 插件启用成功 。
文章插图
下面给解决方法添加一个
DMConsumer
项目 。cd srcdotnet new console -n DMConsumercd DMConsumerdotnet add package EasyNetQ-s https://api.nuget.org/v3/index.jsondotnet add package Newtonsoft.Json-s https://api.nuget.org/v3/index.json cd ../..dotnet sln add ./src/DMConsumer/DMConsumer.csproj
DMConsumer
完整实现如下using Core;using EasyNetQ;using EasyNetQ.Topology;var bus = RabbitHutch.CreateBus("host=localhost;port=5672;virtualHost=/;username=guest;password=guest;requestedHeartbeat=10");var sourceExchange = await bus.Advanced.ExchangeDeclareAsync(name: "orders.notification", ExchangeType.Direct, durable: true, autoDelete: false);var sourceQueue = await bus.Advanced.QueueDeclareAsync(name: "orders.notification");var dmExchange = await bus.Advanced.ExchangeDeclareAsync(name: "orders.notification_dm", configure => configure.AsDelayedExchange(ExchangeType.Direct));//两个交换机绑定await bus.Advanced.BindAsync(dmExchange, sourceExchange, "");bus.Advanced.Consume<OrderNotification>(sourceQueue, OrderNotificationHandler);Console.ReadLine();async Task OrderNotificationHandler(IMessage<OrderNotification> message, MessageReceivedInfo msgInfo){Console.WriteLine($"{DateTime.Now}: 开始消费 OrderId:{message.Body.OrderId} Type:{message.Body.Type}");if (message.Body.Type == 1 && !message.Properties.Headers.ContainsKey("biz-delayed")){message.Properties.Headers["biz-delayed"] = 1;message.WithDelay(TimeSpan.FromHours(1));await bus.Advanced.PublishAsync(dmExchange, "", true, message);Console.WriteLine($"{DateTime.Now}:OrderId:{message.Body.OrderId} Type:{message.Body.Type} 已延迟消费");}else{//假装在消费//Thread.Sleep(1000);Console.WriteLine($"{DateTime.Now}:OrderId:{message.Body.OrderId} Type:{message.Body.Type} 已成功消费");}}
经验总结扩展阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 希腊现在是哪个国家
- 2023年12月31日适合装修吗 2023年12月装修吉日一览表
- 酒精能放在车里吗
- 奥运会发源地是哪个国家
- 2023年12月1日适合结婚吗 今天是黄道吉日吗
- 过夜的小龙虾还能吃吗
- 冰箱冷藏多少度合适
- 荷花代表什么生肖
- 独一无二美甲店名字美甲店起名大全
- 金字塔和埃菲尔铁塔谁高