【.NET 6】RabbitMQ延迟消费指南( 三 )


文章插图
打开RabbitMQ后台确认 , 原本orders.notification里的消息已经被消费掉了 , 同时多了一个orders.notification_dlx队列 , 并且orders.notification_delay队列相比orders.notification多了一个DLX标签 , Type为1的消息就是被转移该队列 。

【.NET 6】RabbitMQ延迟消费指南

文章插图
进入orders.notification_delay队列 , 交换机与队列正常绑定 , x-dead-letter-exchange也已被设置
【.NET 6】RabbitMQ延迟消费指南

文章插图
检查队列中的消息 , 可以看到Properties里的expiration: 3600000headers:biz-delayed: 1
【.NET 6】RabbitMQ延迟消费指南

文章插图
再过3600000毫秒 , orders.notification_dlx队列就会被投递到orders.notification交换机 , 队列orders.notification也就会收到这些信息 , 这时因为消息头里有biz-delayed , 消费者会正常将其消费 。
使用延迟交换机实现使用延迟交换机 , 需要RabbitMQ服务器安装rabbitmq_delayed_message_exchange插件 , 原理是投递到延迟交换机的消息 , 会延迟指定时间(x-delay参数设置)后 , 自动投递到该交换机绑定的另一交换机上 。直接看代码 。
docker环境安装rabbitmq_delayed_message_exchange插件这里介绍下docker环境如何安装rabbitmq_delayed_message_exchange插件 , 首先在github https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases 下载与你RabbitMQ服务器大版本匹配的Release , 将文件复制到RabbitMQ的/plugins目录下 , 命令如下
docker cp {rabbitmq_delayed_message_exchange文件路径} {rabbitmq容器id}:/pluginsdocker exec -it {rabbitmq容器id} rabbitmq-plugins enable rabbitmq_delayed_message_exchange以我本机为例 , 插件启用成功 。
【.NET 6】RabbitMQ延迟消费指南

文章插图
下面给解决方法添加一个DMConsumer项目 。
cd srcdotnet new console -n DMConsumercd DMConsumerdotnet add package EasyNetQ-s https://api.nuget.org/v3/index.jsondotnet add package Newtonsoft.Json-s https://api.nuget.org/v3/index.json cd ../..dotnet sln add ./src/DMConsumer/DMConsumer.csprojDMConsumer完整实现如下
using Core;using EasyNetQ;using EasyNetQ.Topology;var bus = RabbitHutch.CreateBus("host=localhost;port=5672;virtualHost=/;username=guest;password=guest;requestedHeartbeat=10");var sourceExchange = await bus.Advanced.ExchangeDeclareAsync(name: "orders.notification", ExchangeType.Direct, durable: true, autoDelete: false);var sourceQueue = await bus.Advanced.QueueDeclareAsync(name: "orders.notification");var dmExchange = await bus.Advanced.ExchangeDeclareAsync(name: "orders.notification_dm", configure => configure.AsDelayedExchange(ExchangeType.Direct));//两个交换机绑定await bus.Advanced.BindAsync(dmExchange, sourceExchange, "");bus.Advanced.Consume<OrderNotification>(sourceQueue, OrderNotificationHandler);Console.ReadLine();async Task OrderNotificationHandler(IMessage<OrderNotification> message, MessageReceivedInfo msgInfo){Console.WriteLine($"{DateTime.Now}: 开始消费 OrderId:{message.Body.OrderId} Type:{message.Body.Type}");if (message.Body.Type == 1 && !message.Properties.Headers.ContainsKey("biz-delayed")){message.Properties.Headers["biz-delayed"] = 1;message.WithDelay(TimeSpan.FromHours(1));await bus.Advanced.PublishAsync(dmExchange, "", true, message);Console.WriteLine($"{DateTime.Now}:OrderId:{message.Body.OrderId} Type:{message.Body.Type} 已延迟消费");}else{//假装在消费//Thread.Sleep(1000);Console.WriteLine($"{DateTime.Now}:OrderId:{message.Body.OrderId} Type:{message.Body.Type} 已成功消费");}}

经验总结扩展阅读