.Net Core&RabbitMQ限制循环消费( 三 )

消息被拒收后,再重新发送消息到原有交换机或是队列下中,以使得消息像是消费失败回到了队列中,如此来控制消费次数,但是这种场景下,新消息排在了队列的尾部,而不是原消息排在队列头部 。

.Net Core&RabbitMQ限制循环消费

文章插图
存储重试次数在存储服务中存储消息的唯一标识与对应重试次数,消费消息前对消息进行判断是否存在 。
.Net Core&RabbitMQ限制循环消费

文章插图
与消息头判断一致,只是消息重试次数的存储从消息本身挪入存储服务中了 。需要注意的是,消息发送端需要设置消息的唯一标识(MessageId属性)
//模拟外部存储服务var MessageRetryCounts = new Dictionary<ulong, int>();var queueName = "storageretrycount_queue";channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.BasicQos(0, 5, false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{    var message = ea.Body;    Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));if (Encoding.UTF8.GetString(message.ToArray()).Contains("50")){    var maxRetryCount = 3;    Console.WriteLine("拒收");    //重试次数判断    var existRetryRecord = MessageRetryCounts.ContainsKey(ea.BasicProperties.MessageId);    if (!existRetryRecord)    {        //重入队列,继续重试        MessageRetryCounts.Add(ea.BasicProperties.MessageId, 1);        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);        return;    }    if (MessageRetryCounts[ea.BasicProperties.MessageId] < maxRetryCount)    {        //重入队列,继续重试        MessageRetryCounts[ea.BasicProperties.MessageId] = MessageRetryCounts[ea.BasicProperties.MessageId] + 1;        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);        return;    }    //到达最大次数,不再重试消息    ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);    return;}    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);};channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);除第一次拒收外,允许三次重试机会,三次重试完毕后,设置requeue为false,消息丢失或进入死信队列(如有设置的话) 。
.Net Core&amp;RabbitMQ限制循环消费

文章插图
队列使用Quorum类型第一种和第二种分别是消息自身、外部存储服务来管理消息重试次数,使用Quorum,由MQ来限定消息的投递次数,也就控制了重试次数 。
.Net Core&amp;RabbitMQ限制循环消费

文章插图
设置队列类型为quorum,设置投递最大次数,当超过投递次数后,消息被丢弃 。
var queueName = "quorumtype_queue";var arguments = new Dictionary<string, object>(){    { "x-queue-type", "quorum"},    { "x-delivery-limit", 3 }};channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments);channel.BasicQos(0, 5, false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{    var message = ea.Body;    Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));    if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))    {        Console.WriteLine($"拒收 {DateTime.Now}");        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);        return;    }    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);};channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

经验总结扩展阅读