消息被拒收后,再重新发送消息到原有交换机或是队列下中,以使得消息像是消费失败回到了队列中,如此来控制消费次数,但是这种场景下,新消息排在了队列的尾部,而不是原消息排在队列头部 。
文章插图
存储重试次数在存储服务中存储消息的唯一标识与对应重试次数,消费消息前对消息进行判断是否存在 。
文章插图
与消息头判断一致,只是消息重试次数的存储从消息本身挪入存储服务中了 。需要注意的是,消息发送端需要设置消息的唯一标识(MessageId属性)
//模拟外部存储服务var MessageRetryCounts = new Dictionary<ulong, int>();var queueName = "storageretrycount_queue";channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.BasicQos(0, 5, false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{ var message = ea.Body; Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));if (Encoding.UTF8.GetString(message.ToArray()).Contains("50")){ var maxRetryCount = 3; Console.WriteLine("拒收"); //重试次数判断 var existRetryRecord = MessageRetryCounts.ContainsKey(ea.BasicProperties.MessageId); if (!existRetryRecord) { //重入队列,继续重试 MessageRetryCounts.Add(ea.BasicProperties.MessageId, 1); ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true); return; } if (MessageRetryCounts[ea.BasicProperties.MessageId] < maxRetryCount) { //重入队列,继续重试 MessageRetryCounts[ea.BasicProperties.MessageId] = MessageRetryCounts[ea.BasicProperties.MessageId] + 1; ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true); return; } //到达最大次数,不再重试消息 ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false); return;} ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);};channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
除第一次拒收外,允许三次重试机会,三次重试完毕后,设置requeue为false,消息丢失或进入死信队列(如有设置的话) 。文章插图
队列使用Quorum类型第一种和第二种分别是消息自身、外部存储服务来管理消息重试次数,使用Quorum,由MQ来限定消息的投递次数,也就控制了重试次数 。
文章插图
设置队列类型为quorum,设置投递最大次数,当超过投递次数后,消息被丢弃 。
var queueName = "quorumtype_queue";var arguments = new Dictionary<string, object>(){ { "x-queue-type", "quorum"}, { "x-delivery-limit", 3 }};channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments);channel.BasicQos(0, 5, false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{ var message = ea.Body; Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray())); if (Encoding.UTF8.GetString(message.ToArray()).Contains("50")) { Console.WriteLine($"拒收 {DateTime.Now}"); ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true); return; } ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);};channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
经验总结扩展阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- IQueryable和IEnumerable 快读《ASP.NET Core技术内幕与项目实战》EFCore2.5:集合查询原理揭秘
- 「MySQL高级篇」MySQL锁机制 && 事务
- <五>掌握左值引用和初识右值引用
- .NET6打包部署到Windows Service
- Linux软件安装方式 - Tarball&RPM&YUM
- <四>1:全面掌握Const的用法
- Dapr实现.Net Grpc服务之间的发布和订阅,并采用WebApi类似的事件订阅方式
- 是什么让.NET7的Min和Max方法性能暴增了45倍?
- 前端性能优化——首屏时间&&白屏时间
- 19 基于.NetCore开发博客项目 StarBlog - Markdown渲染方案探索