.Net Core&RabbitMQ限制循环消费

前言当消费者端接收消息处理业务时,如果出现异常或是拒收消息将消息又变更为等待投递再次推送给消费者,这样一来,则形成循环的条件 。

.Net Core&RabbitMQ限制循环消费

文章插图
循环场景生产者发送100条消息到RabbitMQ中,消费者设定读取到第50条消息时,设置拒收,同时设定是否还留存在当前队列中(当requeue为false时,设置了死信队列则进入死信队列,否则移除消息) 。
consumer.Received += (model, ea) =>{    var message = ea.Body;    Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));    if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))    {        Console.WriteLine("拒收");        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);        return;    }    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);};当第50条消息拒收,则仍在队列中且处在队列头部,重新推送给消费者,再次拒收,再次推送,反反复复 。
.Net Core&RabbitMQ限制循环消费

文章插图
最终其他消息全部消费完毕,仅剩第50条消息往复间不断消费,拒收,消费,这将可能导致RabbitMQ出现内存泄漏问题 。
.Net Core&RabbitMQ限制循环消费

文章插图
解决方案RabbitMQ及AMQP协议本身没有提供这类重试功能,但可以利用一些已有的功能来间接实现重试限定(以下只考虑基于手动确认模式情况) 。此处只想到或是只查到了如下几种方案解决消息循环消费问题 。
  • 一次消费
    • 无论成功与否,消费者都对外返回ack,将拒收原因或是异常信息catch存入本地或是新队列中另作重试 。
    • 消费者拒绝消息或是出现异常,返回Nack或Reject,消息进入死信队列或丢弃(requeue设定为false) 。
  • 限定重试次数
    • 在消息的头中添加重试次数,并将消息重新发送出去,再每次重新消费时从头中判断重试次数,递增或递减该值,直到达到限制,requeue改为false,最终进入死信队列或丢弃 。
    • 可以在Redis、Memcache或其他存储中存储消息唯一键(例如Guid、雪花Id等,但必须在发布消息时手动设置它),甚至在mysql中连同重试次数一起存储,然后在每次重新消费时递增/递减该值,直到达到限制,requeue改为false,最终进入死信队列或丢弃 。
    • 队列使用Quorum类型,限制投递次数,超过次数消息被删除 。
  • 队列消息过期
    • 设置过期时间,给队列或是消息设置TTL,重试一定次数消息达到过期时间后进入死信队列或丢弃(requeue设定为true) 。
  • 也许还有更多好的方案...
一次消费对外总是Ack消息到达了消费端,可因某些原因消费失败了,对外可以发送Ack,而在内部走额外的方式去执行补偿操作,比如将消息转发到内部的RabbitMQ或是其他处理方式,终归是只消费一次 。
var queueName = "alwaysack_queue";channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.BasicQos(0, 5, false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{    try    {        var message = ea.Body;        Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));        if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))        {            throw new Exception("模拟异常");        }    }    catch (Exception ex)    {        Console.WriteLine(ex.Message);    }    finally    {        ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);    }};channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

经验总结扩展阅读