
文章插图
打开RabbitMQ后台 , 名
orders.notification
的队列和交换机已经创建好且相互绑定 , 队列里已经有我们刚刚发送的消息
文章插图
下面我们要做的就是将队列
orders.notification
里Type
为1的消息延迟消费 , 其它则正常消费 。延迟消费使用死信交换机实现原理就是在声明一个队列时 , 给它配置死信交换机(Dead Letter Exchanges , 简称DLX)策略 , 对应参数为
x-dead-letter-exchange
, 这种队列处理带设置了过期时间属性(Properties.expiration
)的消息时 , 在消息到期时 , 会自动将消息投递到事先配置好的死信交换机上 。我们解决方案增加一个控制台类型的消费者项目
DLXConsumer
cd srcdotnet new console -n DLXConsumercd DLXConsumerdotnet add package EasyNetQ-s https://api.nuget.org/v3/index.jsondotnet add package Newtonsoft.Json-s https://api.nuget.org/v3/index.json cd ../..dotnet sln add ./src/DLXConsumer/DLXConsumer.csproj
和生产者类似 , 实现消费者我们也创建一对同名的交换机和队列orders.notification_dlx
, 用于接收转发过来延迟消息 , 同时将该队列的死信交换机设置为orders.notification
;消费消息时 , 为了消息是不是已经延迟过 , 可以在消息头里添加一个自定义参数biz-delayed
, 在将需要延迟处理的消息发送到orders.notification_dlx
交换机之前 , 除了设置过期时间 , 也同时将biz-delayed
设置为1 , 后续再消费该消息时 , 读取该值 , 不至于陷入死循环 。完整代码如下using Core;using EasyNetQ;using EasyNetQ.Topology;var bus = RabbitHutch.CreateBus("host=localhost;port=5672;virtualHost=/;username=guest;password=guest;requestedHeartbeat=10");var sourceExchange = await bus.Advanced.ExchangeDeclareAsync(name: "orders.notification", ExchangeType.Direct);var sourceQueue = await bus.Advanced.QueueDeclareAsync(name: "orders.notification");var dlxExchange = await bus.Advanced.ExchangeDeclareAsync(name: "orders.notification_dlx", ExchangeType.Direct);var dlxQueue = await bus.Advanced.QueueDeclareAsync(name: "orders.notification_dlx", configure => configure.WithDeadLetterExchange(sourceExchange));await bus.Advanced.BindAsync(dlxExchange, dlxQueue, "");bus.Advanced.Consume<OrderNotification>(sourceQueue, OrderNotificationHandler);Console.ReadLine();async Task OrderNotificationHandler(IMessage<OrderNotification> message, MessageReceivedInfo msgInfo){Console.WriteLine($"{DateTime.Now}: 开始消费 OrderId:{message.Body.OrderId} Type:{message.Body.Type}");if (message.Body.Type == 1 && !message.Properties.Headers.ContainsKey("biz-delayed")){message.Properties.Headers.Add("biz-delayed", 1);message.Properties.Expiration = TimeSpan.FromHours(1);await bus.Advanced.PublishAsync(dlxExchange, "", true, message);Console.WriteLine($"{DateTime.Now}:OrderId:{message.Body.OrderId} Type:{message.Body.Type} 已延迟消费");}else{//假装在消费Thread.Sleep(1000);Console.WriteLine($"{DateTime.Now}:OrderId:{message.Body.OrderId} Type:{message.Body.Type} 已成功消费");}}
上述代码中 , EasyNetQ
设置队列死信交换机的API为WithDeadLetterExchange
, 设置消息过期时间的API为Properties.Expiration
。运行
DLXConsumer
项目 , 可以看到Type
为1的消息被延迟 , 其它则被正常消费
经验总结扩展阅读
- 希腊现在是哪个国家
- 2023年12月31日适合装修吗 2023年12月装修吉日一览表
- 酒精能放在车里吗
- 奥运会发源地是哪个国家
- 2023年12月1日适合结婚吗 今天是黄道吉日吗
- 过夜的小龙虾还能吃吗
- 冰箱冷藏多少度合适
- 荷花代表什么生肖
- 独一无二美甲店名字美甲店起名大全
- 金字塔和埃菲尔铁塔谁高