1、模式(集群模式)
定制小程序开发消费者采用负载均衡方定制小程序开发式消费消息,一个分组(Group)定制小程序开发下的多个消费者共同消定制小程序开发费队列消息,定制小程序开发每个消费者处理的消息不同。一个Consumer Group中的各个Consumer定制小程序开发实例分摊去消费消息,定制小程序开发即一条消息只会投递到一个Consumer Group下面的一个实例。例如某个Topic有3个,其中一个Consumer Group 有 3 个实例,那么每个实例只消费其中的1个队列。集群消费模式是消费者默认的消费方式。
代码中由这一行代码控制:
- // 消费模式 默认是集群模式(负载均衡模式),还有是广播模式
- consumer.setMessageModel(MessageModel.CLUSTERING);
- package com.zc.rocketmq.mashibing.quickstart;
-
- import lombok.extern.slf4j.Slf4j;
- import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
- import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
- import org.apache.rocketmq.common.message.MessageExt;
- import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
-
- import java.util.List;
-
- /**
- * @Classname BalanceConsumer 集群消费模式
- * @Description 消费者
- * @Version 1.0.0
- * @Date 2022/5/22 15:04
- * @Created by 海贼王
- */
-
- @Slf4j
- public class BalanceConsumer {
- public static void main(String[] args) throws Exception {
-
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_consumer");
- consumer.setNamesrvAddr("8.136.2.110:9876");
- consumer.subscribe("TopicTest", "TagA");
-
- // 消费模式 默认是集群模式(负载均衡模式),还有是广播模式
- consumer.setMessageModel(MessageModel.CLUSTERING);
-
- // 注册回调函数,处理消息
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
- try {
- for (MessageExt msg : msgs) {
- String topic = msg.getTopic();
- String tags = msg.getTags();
- String msgBody = new String(msg.getBody());
- log.info("消息消费成功:{}", "topic:" + topic + " tags:" + tags + " 消息内容:" + msgBody);
- }
- } catch (Exception e) {
- log.info("消息消费失败:{}", e.getMessage());
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
-
- // 最后才启动消费者
- consumer.start();
- log.info("消费者启动成功...");
-
- }
- }
2、广播模式
广播消费模式中消息将对一个Consumer Group下的各个Consumer实例都投递一遍。即使这些 Consumer属于同一个Consumer Group,消息也会被Consumer Group 中的每个Consumer都消费一次。实际上,是一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。
代码中由这一行代码控制:
- // 广播模式
- consumer.setMessageModel(MessageModel.BROADCASTING);
- package com.zc.rocketmq.mashibing.quickstart;
-
- import lombok.extern.slf4j.Slf4j;
- import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
- import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
- import org.apache.rocketmq.common.message.MessageExt;
- import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
-
- import java.util.List;
-
- /**
- * @Classname BalanceConsumer 广播模式
- * @Description 消费者
- * @Version 1.0.0
- * @Date 2022/5/22 15:04
- * @Created by 海贼王
- */
-
- @Slf4j
- public class BalanceConsumer {
- public static void main(String[] args) throws Exception {
-
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_consumer");
- consumer.setNamesrvAddr("8.136.2.110:9876");
- consumer.subscribe("TopicTest", "TagA");
-
- // 广播模式
- consumer.setMessageModel(MessageModel.BROADCASTING);
-
- // 注册回调函数,处理消息
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
- try {
- for (MessageExt msg : msgs) {
- String topic = msg.getTopic();
- String tags = msg.getTags();
- String msgBody = new String(msg.getBody());
- log.info("消息消费成功:{}", "topic:" + topic + " tags:" + tags + " 消息内容:" + msgBody);
- }
- } catch (Exception e) {
- log.info("消息消费失败:{}", e.getMessage());
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
-
- // 最后才启动消费者
- consumer.start();
- log.info("消费者启动成功...");
-
- }
- }
3、两种消费模式适用场景以及注意事项
负载均衡模式:适用场景&注意事项
- 消费端集群化部署,每条消息只需要被处理一次。
- 由于消费进度在服务端维护,可靠性更高。
- 集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
- 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。
广播模式:适用场景&注意事项
- 每条消息都需要被相同逻辑的多台机器处理。
- 消费进度在客户端维护,出现重复的概率稍大于集群模式。
- 广播模式下,消息队列 RocketMQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。
- 广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。
- 广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
- 目前仅 Java 客户端支持广播模式。
- 广播消费模式下不支持顺序消息。
- 广播消费模式下不支持重置消费位点。
- 广播模式下服务端不维护消费进度,所以消息队列 RocketMQ 控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。