【.NET 6】RabbitMQ延迟消费指南( 二 )

【.NET 6】RabbitMQ延迟消费指南

文章插图
打开RabbitMQ后台 , 名orders.notification的队列和交换机已经创建好且相互绑定 , 队列里已经有我们刚刚发送的消息
【.NET 6】RabbitMQ延迟消费指南

文章插图
下面我们要做的就是将队列orders.notificationType为1的消息延迟消费 , 其它则正常消费 。
延迟消费使用死信交换机实现原理就是在声明一个队列时 , 给它配置死信交换机(Dead Letter Exchanges , 简称DLX)策略 , 对应参数为x-dead-letter-exchange , 这种队列处理带设置了过期时间属性(Properties.expiration)的消息时 , 在消息到期时 , 会自动将消息投递到事先配置好的死信交换机上 。
我们解决方案增加一个控制台类型的消费者项目DLXConsumer
cd srcdotnet new console -n DLXConsumercd DLXConsumerdotnet add package EasyNetQ-s https://api.nuget.org/v3/index.jsondotnet add package Newtonsoft.Json-s https://api.nuget.org/v3/index.json cd ../..dotnet sln add ./src/DLXConsumer/DLXConsumer.csproj和生产者类似 , 实现消费者我们也创建一对同名的交换机和队列orders.notification_dlx , 用于接收转发过来延迟消息 , 同时将该队列的死信交换机设置为orders.notification;消费消息时 , 为了消息是不是已经延迟过 , 可以在消息头里添加一个自定义参数biz-delayed , 在将需要延迟处理的消息发送到orders.notification_dlx交换机之前 , 除了设置过期时间 , 也同时将biz-delayed设置为1 , 后续再消费该消息时 , 读取该值 , 不至于陷入死循环 。完整代码如下
using Core;using EasyNetQ;using EasyNetQ.Topology;var bus = RabbitHutch.CreateBus("host=localhost;port=5672;virtualHost=/;username=guest;password=guest;requestedHeartbeat=10");var sourceExchange = await bus.Advanced.ExchangeDeclareAsync(name: "orders.notification", ExchangeType.Direct);var sourceQueue = await bus.Advanced.QueueDeclareAsync(name: "orders.notification");var dlxExchange = await bus.Advanced.ExchangeDeclareAsync(name: "orders.notification_dlx", ExchangeType.Direct);var dlxQueue = await bus.Advanced.QueueDeclareAsync(name: "orders.notification_dlx", configure => configure.WithDeadLetterExchange(sourceExchange));await bus.Advanced.BindAsync(dlxExchange, dlxQueue, "");bus.Advanced.Consume<OrderNotification>(sourceQueue, OrderNotificationHandler);Console.ReadLine();async Task OrderNotificationHandler(IMessage<OrderNotification> message, MessageReceivedInfo msgInfo){Console.WriteLine($"{DateTime.Now}: 开始消费 OrderId:{message.Body.OrderId} Type:{message.Body.Type}");if (message.Body.Type == 1 && !message.Properties.Headers.ContainsKey("biz-delayed")){message.Properties.Headers.Add("biz-delayed", 1);message.Properties.Expiration = TimeSpan.FromHours(1);await bus.Advanced.PublishAsync(dlxExchange, "", true, message);Console.WriteLine($"{DateTime.Now}:OrderId:{message.Body.OrderId} Type:{message.Body.Type} 已延迟消费");}else{//假装在消费Thread.Sleep(1000);Console.WriteLine($"{DateTime.Now}:OrderId:{message.Body.OrderId} Type:{message.Body.Type} 已成功消费");}}上述代码中 , EasyNetQ设置队列死信交换机的API为WithDeadLetterExchange , 设置消息过期时间的API为Properties.Expiration
运行DLXConsumer项目 , 可以看到Type为1的消息被延迟 , 其它则被正常消费
【.NET 6】RabbitMQ延迟消费指南

经验总结扩展阅读