
文章插图
1.4 消费端流量控制可以在DefaultMQPushConsumer 对象中配置各种属性来对消费流量进行控制:
- PullInterval: 配置消费端拉取MQ消息的间隔时间 。间隔时间是按照上次消费完成之后(比如rocketMQ收到Ack回复消息之后) 。PullInterval=20s,比如上次rocketMq服务收到Ack消息是12:15:15,则 12:15:35再去拉消息 。
- PullBatchSize: 消费端每个队列一次拉取多少个消息,若该消费端分赔了N个监控队列,每次拉取M个,那么消费端每次去rocketMq拉取的消息为N * M 。消费端每次pull到消息总数=PullBatchSize * 监听队列数,如 PullBatchSize = 2,监听队列=5,则 消息总数量 = 2 * 5 = 10 。
- ThreadMin和ThreadMax: 消费端消费pull到的消息需要的线程数量 。
- ThreadMin:消费端拉取到消息后分配消费的线程数
- ThreadMax:最大消费线程,如果默认队列满了,则启用新的线程
- RocketMq 逻辑消费队列数量的配置rocketMq 可以配置消费队列,如 queue Read1,queue Read2,配置数量决定每次pull到的消息总数 。Rocket MQ 提供了读写队列数量的配置 。
- 消费端节点部署数量多节点消费端线程数量要比单节点消费线程数量多,理论上消费速度大于单节点,分治思维 。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("groupTest");consumer.subscribe("testTopic", MessageSelector.byTag("Tag1|| Tag2 || Tag3").bySql("sex = 'male' AND name = 'brand'));
这种情况下,消息中带有 Tag1 、Tag2、Tag3 标签就会被过滤出来,但是单个消限制息只能有一个标签,这就远远满足不了各种复杂的并交集场景的需要了 。这时候Rocket MQ可以在消息中设置一些属性,再使用SQL表达式筛选属性来过滤出需要的数据 。如下------------| message||----------|sex = male AND name = 'brand' , Gotten| name = 'brand' || sex = 'male'|| age = 21|------------------------| message||----------|sex = male AND name = 'brand', Gotten , Missed| name = 'Anny'|| sex = 'female'|| age = 20 |------------
1.8 提高Consumer的处理能力 :看情况- 提高消费并行度在同一个ConsumerGroup下(Clustering方式),可以通过增加Consumer实例的数量来提高并行度 。通过加机器,或者在已有机器中启动多个Consumer进程都可以增加Consumer实例数 。注意:总的Consumer数量不要超过Topic下Read Queue数量,超过的Consumer实例接收不到消息 。此外,通过提高单个Consumer实例中的并行处理的线程数,可以在同一个Consumer内增加并行度来提高吞吐量(设置方法是修改consumeThreadMin和consumeThreadMax) 。
- 以批量方式进行消费某些业务场景下,多条消息同时处理的时间会大大小于逐个处理的时间总和,比如消费消息中涉及update某个数据库,一次update10条的时间会大大小于十次update1条数据的时间 。可以通过批量方式消费来提高消费的吞吐量 。实现方法是设置Consumer的consumeMessageBatchMaxSize这个参数,默认是1,如果设置为N,在消息多的时候每次收到的是个长度为N的消息链表 。
经验总结扩展阅读
- 知错不改,直到分手才想要改的星座男
- 解渴的饮料有哪些
- 男士发型 甜到男生走不动路的短发女生
- 深墨绿色的调配方法
- 爱太卑微,做备胎也甘之如饴的星座
- 2到6度冷藏牛奶可以加热吗
- 明知会万劫不复,还是会爱ta的星座
- 痴情人,痛过就很难再爱的四大星座女
- 定义和物理意义的区别
- 洒脱的近义词与反义词