【.NET 6】RabbitMQ延迟消费指南

背景最近遇到一个比较特殊需求 , 需要修改一个的RabbitMQ消费者 , 以实现在消费某种特定的类型消息时 , 延迟1小时再处理 , 几个需要注意的点:

  • 延迟是以小时为单位
  • 不是所有消息都延迟消费 , 只延迟特定类型的消息
  • 只在第一次消费时延迟1小时 , 容错机制产生的重新消费(也即消息消费失败 , 多次进入延迟队列重试) , 则不再延迟1小时
  • 消费者消费过程中可能会重启
考虑到这几点 , 我们需要一个标识以及持久化 , 不能简单使用Thread.Sleep或者Task.Delay;下面开始演示在不引入其它框架资源的前提下 , 利用现有的RabbitMQ来实现这个需求 。
准备如果没有可用的RabbitMQ测试环境 , 推荐使用docker本地搭建
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.10-management项目搭建创建解决方案RabbitMQDemo ,  并添加一个.Net6控制台程序Producer作为生产者 , 
mkdir RabbitMQDemocd RabbitMQDemodotnet new sln -n RabbitMQDemomkdir srccd srcdotnet new console -n Producercd Producerdotnet add package EasyNetQ-s https://api.nuget.org/v3/index.jsondotnet add package Newtonsoft.Json-s https://api.nuget.org/v3/index.json cd ../..dotnet sln add ./src/Producer/Producer.csproj我们给Producer项目添加了两个包 ——EasyNetQ是用来简便RabbitMQ操作 , 添加Newtonsoft.Json则是因为EasyNetQ从v7版本开始移除了对前者的依赖 , 需要使用者自行添加 。
接下来定义消息的数据结构 , 添加一个类库Core到解决方案 , 
cd srcdotnet new classlib --name Corecd ..dotnet sln add ./src/Core/Core.csproj添加如下OrderNotification类 , 后面我们根据消息的 Type的值来确定是正常消费还是延迟消费 。
namespace Core{public class OrderNotification{public string OrderId { get; set; }public int Type { get; set; }public DateTime DateCreation { get; set; }}}生产者在Producer项目里 , 声明队列orders.notification , 绑定到同名交换机 , 然后向该交换机发送OrderNotification类型的数据 , 
实际项目中 , 我们很少直接发消息到队列 , 都是发送到交换机 , 这个项目虽然只是demo , 但也遵循这个原则
完整代码如下:
using Core;using EasyNetQ;using EasyNetQ.Topology;var bus = RabbitHutch.CreateBus("host=localhost;port=5672;virtualHost=/;username=guest;password=guest;requestedHeartbeat=10");//声明交换机var sourceExchange = await bus.Advanced.ExchangeDeclareAsync(name: "orders.notification", ExchangeType.Direct);//声明队列var sourceQueue = await bus.Advanced.QueueDeclareAsync(name: "orders.notification");//绑定await bus.Advanced.BindAsync(sourceExchange, sourceQueue, "");Console.WriteLine("按Ctrl + C 暂停发送 , 任意键恢复发送");Console.TreatControlCAsInput = true;while (true){Random random = new();var orderId = Guid.NewGuid().ToString();var type = random.Next(1, 3);await bus.Advanced.PublishAsync(sourceExchange, "", true, new Message<OrderNotification>(new OrderNotification { OrderId = orderId, Type = type, DateCreation = DateTime.Now }));Console.WriteLine($"{DateTime.Now}:消息(OrderId:{orderId} , Type:{type}) 已发送");Thread.Sleep(1000);}运行Producer项目 , 可以看到消息正在不停的发送

经验总结扩展阅读