public class SimpleConsumerApplication {public static void main(String[] args) throws MQClientException {// 1.创建消费者Consumer,并指定消费者组名为 testConsumGroupDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testConsumGroup");// 2.指定NameServer的地址,以获取Broker路由地址consumer.setNamesrvAddr("192.168.139.1:9876");// 3.指定Topic和Tag 信息 。* 代表所有consumer.subscribe("testTopic", "*");// 4.设置回调函数,用来处理读取到的消息, MessageListenerOrderly 用单个线程处理处理队列的数据consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList, ConsumeOrderlyContext context) {for (MessageExt msg : msgList) {System.out.println("线程 " + Thread.currentThread().getName() + " : " + msg.getBody().toString());// Todo,具体的业务逻辑}return ConsumeOrderlyStatus.SUCCESS;}});// 5.消费者开始执行消费任务consumer.start();}}
2.2 顺序消费相比与基本消费,多了一个ConsumeFromWhere的设置 。代表消费者从哪个位置开始消费,枚举如下:- CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,非第一次启动接着上次消费的进度继续消费
- CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,非第一次启动接着上次消费的进度继续消费
- CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,非第一次启动接着上次消费的进度继续消费以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,消费者挂了再启动,则从上次消费进度继续执行 。
public class SimpleOrderApplication {public static void main(String[] args) throws MQClientException {// 1.创建消费者Consumer,并指定消费者组名为 testConsumGroupDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testConsumGroup");// 2.指定NameServer的地址,以获取Broker路由地址consumer.setNamesrvAddr("192.168.139.1:9876");/*** 设置Consumer第一次启动是从队列头部、队列尾部、还是指定时间戳节点开始消费* 非第一次启动接着上次消费的进度继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 3.指定Topic和Tag 信息 。* 代表所有consumer.subscribe("testTopic", "*");// 4.设置回调函数,用来处理读取到的消息, MessageListenerOrderly 用单个线程处理处理队列的数据consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList, ConsumeOrderlyContext context) {for (MessageExt msg : msgList) {System.out.println("线程 " + Thread.currentThread().getName() + " : " + msg.getBody().toString());// Todo,具体的业务逻辑}return ConsumeOrderlyStatus.SUCCESS;}});// 5.消费者开始执行消费任务consumer.start();}}
2.3 过滤消息消费可以使用MessageSelector.byTag来进行标签筛选;或者使用MessageSelector.bySql 来进行消息属性筛选;或者混合使用 。参考下面代码,注释说明的比较清楚 。public class FilterConsumerApplication {public static void main(String[] args) throws MQClientException {// 1.创建消费者Consumer,并指定消费者组名为 testConsumGroupDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testConsumGroup");// 2.指定NameServer的地址,以获取Broker路由地址consumer.setNamesrvAddr("192.168.139.1:9876");// 3.指定Topic和Tag 信息 。只有订阅的消息有 sex 和 name 属性, 并且年龄为 18 岁以上的男性// consumer.subscribe("testTopic", MessageSelector.byTag("userTag1 || userTag2"));consumer.subscribe("testTopic", MessageSelector.bySql("sex = 'male' AND age > 18"));// 4.设置回调函数,用来处理读取到的消息, MessageListenerOrderly 用单个线程处理处理队列的数据consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList, ConsumeOrderlyContext context) {for (MessageExt msg : msgList) {System.out.println("线程 " + Thread.currentThread().getName() + " : " + msg.getBody().toString());// Todo,具体的业务逻辑}return ConsumeOrderlyStatus.SUCCESS;}});// 5.消费者开始执行消费任务consumer.start();}}
经验总结扩展阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 知错不改,直到分手才想要改的星座男
- 解渴的饮料有哪些
- 男士发型 甜到男生走不动路的短发女生
- 深墨绿色的调配方法
- 爱太卑微,做备胎也甘之如饴的星座
- 2到6度冷藏牛奶可以加热吗
- 明知会万劫不复,还是会爱ta的星座
- 痴情人,痛过就很难再爱的四大星座女
- 定义和物理意义的区别
- 洒脱的近义词与反义词