MQ系列6:消息的消费( 三 )


  • 检测延时情况,跳过非重要消息Consumer在消费的过程中,如果发现由于某种原因发生严重的消息堆积,短时间无法消除堆积,这个时候可以选择丢弃不重要的消息,使Consumer尽快追上Producer的进度 。
  • 2 消息消费的模式2.1 基本信息消费消费者的基本实现,连接 NameServer的地址,指定Topic和Tag,读取到需要消费的数据,然后轮询并处理 。
    public class SimpleConsumerApplication {public static void main(String[] args) throws MQClientException {// 1.创建消费者Consumer,并指定消费者组名为 testConsumGroupDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testConsumGroup");// 2.指定NameServer的地址,以获取Broker路由地址consumer.setNamesrvAddr("192.168.139.1:9876");// 3.指定Topic和Tag 信息 。* 代表所有consumer.subscribe("testTopic", "*");// 4.设置回调函数,用来处理读取到的消息, MessageListenerOrderly 用单个线程处理处理队列的数据consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList, ConsumeOrderlyContext context) {for (MessageExt msg : msgList) {System.out.println("线程 " + Thread.currentThread().getName() + " : " + msg.getBody().toString());// Todo,具体的业务逻辑}return ConsumeOrderlyStatus.SUCCESS;}});// 5.消费者开始执行消费任务consumer.start();}}2.2 顺序消费相比与基本消费,多了一个ConsumeFromWhere的设置 。代表消费者从哪个位置开始消费,枚举如下:
    • CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,非第一次启动接着上次消费的进度继续消费
    • CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,非第一次启动接着上次消费的进度继续消费
    • CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,非第一次启动接着上次消费的进度继续消费以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,消费者挂了再启动,则从上次消费进度继续执行 。
    public class SimpleOrderApplication {public static void main(String[] args) throws MQClientException {// 1.创建消费者Consumer,并指定消费者组名为 testConsumGroupDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testConsumGroup");// 2.指定NameServer的地址,以获取Broker路由地址consumer.setNamesrvAddr("192.168.139.1:9876");/*** 设置Consumer第一次启动是从队列头部、队列尾部、还是指定时间戳节点开始消费* 非第一次启动接着上次消费的进度继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 3.指定Topic和Tag 信息 。* 代表所有consumer.subscribe("testTopic", "*");// 4.设置回调函数,用来处理读取到的消息, MessageListenerOrderly 用单个线程处理处理队列的数据consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList, ConsumeOrderlyContext context) {for (MessageExt msg : msgList) {System.out.println("线程 " + Thread.currentThread().getName() + " : " + msg.getBody().toString());// Todo,具体的业务逻辑}return ConsumeOrderlyStatus.SUCCESS;}});// 5.消费者开始执行消费任务consumer.start();}}2.3 过滤消息消费可以使用MessageSelector.byTag来进行标签筛选;或者使用MessageSelector.bySql 来进行消息属性筛选;或者混合使用 。参考下面代码,注释说明的比较清楚 。
    public class FilterConsumerApplication {public static void main(String[] args) throws MQClientException {// 1.创建消费者Consumer,并指定消费者组名为 testConsumGroupDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testConsumGroup");// 2.指定NameServer的地址,以获取Broker路由地址consumer.setNamesrvAddr("192.168.139.1:9876");// 3.指定Topic和Tag 信息 。只有订阅的消息有 sex 和 name 属性, 并且年龄为 18 岁以上的男性// consumer.subscribe("testTopic", MessageSelector.byTag("userTag1 || userTag2"));consumer.subscribe("testTopic", MessageSelector.bySql("sex = 'male' AND age > 18"));// 4.设置回调函数,用来处理读取到的消息, MessageListenerOrderly 用单个线程处理处理队列的数据consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList, ConsumeOrderlyContext context) {for (MessageExt msg : msgList) {System.out.println("线程 " + Thread.currentThread().getName() + " : " + msg.getBody().toString());// Todo,具体的业务逻辑}return ConsumeOrderlyStatus.SUCCESS;}});// 5.消费者开始执行消费任务consumer.start();}}

    经验总结扩展阅读