背景最近遇到一个比较特殊需求 , 需要修改一个的RabbitMQ消费者 , 以实现在消费某种特定的类型消息时 , 延迟1小时再处理 , 几个需要注意的点:
- 延迟是以小时为单位
- 不是所有消息都延迟消费 , 只延迟特定类型的消息
- 只在第一次消费时延迟1小时 , 容错机制产生的重新消费(也即消息消费失败 , 多次进入延迟队列重试) , 则不再延迟1小时
- 消费者消费过程中可能会重启
Thread.Sleep
或者Task.Delay
;下面开始演示在不引入其它框架资源的前提下 , 利用现有的RabbitMQ来实现这个需求 。准备如果没有可用的RabbitMQ测试环境 , 推荐使用docker本地搭建
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.10-management
项目搭建创建解决方案RabbitMQDemo
, 并添加一个.Net6控制台程序Producer
作为生产者 , mkdir RabbitMQDemocd RabbitMQDemodotnet new sln -n RabbitMQDemomkdir srccd srcdotnet new console -n Producercd Producerdotnet add package EasyNetQ-s https://api.nuget.org/v3/index.jsondotnet add package Newtonsoft.Json-s https://api.nuget.org/v3/index.json cd ../..dotnet sln add ./src/Producer/Producer.csproj
我们给Producer
项目添加了两个包 ——EasyNetQ
是用来简便RabbitMQ
操作 , 添加Newtonsoft.Json
则是因为EasyNetQ
从v7版本开始移除了对前者的依赖 , 需要使用者自行添加 。接下来定义消息的数据结构 , 添加一个类库
Core
到解决方案 , cd srcdotnet new classlib --name Corecd ..dotnet sln add ./src/Core/Core.csproj
添加如下OrderNotification
类 , 后面我们根据消息的 Type
的值来确定是正常消费还是延迟消费 。namespace Core{public class OrderNotification{public string OrderId { get; set; }public int Type { get; set; }public DateTime DateCreation { get; set; }}}
生产者在Producer
项目里 , 声明队列orders.notification
, 绑定到同名交换机 , 然后向该交换机发送OrderNotification
类型的数据 , 实际项目中 , 我们很少直接发消息到队列 , 都是发送到交换机 , 这个项目虽然只是demo , 但也遵循这个原则完整代码如下:
using Core;using EasyNetQ;using EasyNetQ.Topology;var bus = RabbitHutch.CreateBus("host=localhost;port=5672;virtualHost=/;username=guest;password=guest;requestedHeartbeat=10");//声明交换机var sourceExchange = await bus.Advanced.ExchangeDeclareAsync(name: "orders.notification", ExchangeType.Direct);//声明队列var sourceQueue = await bus.Advanced.QueueDeclareAsync(name: "orders.notification");//绑定await bus.Advanced.BindAsync(sourceExchange, sourceQueue, "");Console.WriteLine("按Ctrl + C 暂停发送 , 任意键恢复发送");Console.TreatControlCAsInput = true;while (true){Random random = new();var orderId = Guid.NewGuid().ToString();var type = random.Next(1, 3);await bus.Advanced.PublishAsync(sourceExchange, "", true, new Message<OrderNotification>(new OrderNotification { OrderId = orderId, Type = type, DateCreation = DateTime.Now }));Console.WriteLine($"{DateTime.Now}:消息(OrderId:{orderId} , Type:{type}) 已发送");Thread.Sleep(1000);}
运行Producer
项目 , 可以看到消息正在不停的发送经验总结扩展阅读
- 希腊现在是哪个国家
- 2023年12月31日适合装修吗 2023年12月装修吉日一览表
- 酒精能放在车里吗
- 奥运会发源地是哪个国家
- 2023年12月1日适合结婚吗 今天是黄道吉日吗
- 过夜的小龙虾还能吃吗
- 冰箱冷藏多少度合适
- 荷花代表什么生肖
- 独一无二美甲店名字美甲店起名大全
- 金字塔和埃菲尔铁塔谁高