MQ系列6:消息的消费( 二 )

  • Consumer Offset标记消费组在消息队列(Queue)上的消费进度 。
  • 消费成功后,消费进度暂时更新到本地缓存,调度任务会定时(默认5s)将进度同步到broker(需注意如果宕机,消费进度未提交则可能导致被重复消费),Broker最终将消费进度持久化到磁盘 。
  • RocketMQ支持并发消费,所以是多个线程并行处理,每次记录消费进度的时候,把线程中最小的offset值作为消费进度值,这样避免了消息丢失,但有重复消费的风险,业务中需保证操作幂等性 。
  • offset存储模式:集群模式,消息进度存储于Broker上;广播模式,消息消费进度在消费端即可 。

  • MQ系列6:消息的消费

    文章插图
    1.4 消费端流量控制可以在DefaultMQPushConsumer 对象中配置各种属性来对消费流量进行控制:
    • PullInterval: 配置消费端拉取MQ消息的间隔时间 。间隔时间是按照上次消费完成之后(比如rocketMQ收到Ack回复消息之后) 。PullInterval=20s,比如上次rocketMq服务收到Ack消息是12:15:15,则 12:15:35再去拉消息 。
    • PullBatchSize: 消费端每个队列一次拉取多少个消息,若该消费端分赔了N个监控队列,每次拉取M个,那么消费端每次去rocketMq拉取的消息为N * M 。消费端每次pull到消息总数=PullBatchSize * 监听队列数,如 PullBatchSize = 2,监听队列=5,则 消息总数量 = 2 * 5 = 10 。
    • ThreadMin和ThreadMax: 消费端消费pull到的消息需要的线程数量 。
      • ThreadMin:消费端拉取到消息后分配消费的线程数
      • ThreadMax:最大消费线程,如果默认队列满了,则启用新的线程
    • RocketMq 逻辑消费队列数量的配置rocketMq 可以配置消费队列,如 queue Read1,queue Read2,配置数量决定每次pull到的消息总数 。Rocket MQ 提供了读写队列数量的配置 。
    • 消费端节点部署数量多节点消费端线程数量要比单节点消费线程数量多,理论上消费速度大于单节点,分治思维 。
    1.5 消息的过滤在过滤消息的时候,标签模式简单而是用,可以筛选出你需要的数据 。如下:
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("groupTest");consumer.subscribe("testTopic", MessageSelector.byTag("Tag1|| Tag2 || Tag3").bySql("sex = 'male' AND name = 'brand'));这种情况下,消息中带有 Tag1 、Tag2、Tag3 标签就会被过滤出来,但是单个消限制息只能有一个标签,这就远远满足不了各种复杂的并交集场景的需要了 。这时候Rocket MQ可以在消息中设置一些属性,再使用SQL表达式筛选属性来过滤出需要的数据 。如下
    ------------| message||----------|sex = male AND name = 'brand' , Gotten| name = 'brand' || sex = 'male'|| age = 21|------------------------| message||----------|sex = male AND name = 'brand', Gotten , Missed| name = 'Anny'|| sex = 'female'|| age = 20 |------------1.8 提高Consumer的处理能力 :看情况
    1. 提高消费并行度在同一个ConsumerGroup下(Clustering方式),可以通过增加Consumer实例的数量来提高并行度 。通过加机器,或者在已有机器中启动多个Consumer进程都可以增加Consumer实例数 。注意:总的Consumer数量不要超过Topic下Read Queue数量,超过的Consumer实例接收不到消息 。此外,通过提高单个Consumer实例中的并行处理的线程数,可以在同一个Consumer内增加并行度来提高吞吐量(设置方法是修改consumeThreadMin和consumeThreadMax) 。
    2. 以批量方式进行消费某些业务场景下,多条消息同时处理的时间会大大小于逐个处理的时间总和,比如消费消息中涉及update某个数据库,一次update10条的时间会大大小于十次update1条数据的时间 。可以通过批量方式消费来提高消费的吞吐量 。实现方法是设置Consumer的consumeMessageBatchMaxSize这个参数,默认是1,如果设置为N,在消息多的时候每次收到的是个长度为N的消息链表 。

      经验总结扩展阅读