定制小程序开发RocketMQ——消费者消费模式(集群模式、广播模式)

1、模式(集群模式)

定制小程序开发消费者采用负载均衡方定制小程序开发式消费消息,一个分组(Group)定制小程序开发下的多个消费者共同消定制小程序开发费队列消息,定制小程序开发每个消费者处理的消息不同。一个Consumer Group中的各个Consumer定制小程序开发实例分摊去消费消息,定制小程序开发即一条消息只会投递到一个Consumer Group下面的一个实例。例如某个Topic有3个,其中一个Consumer Group 有 3 个实例,那么每个实例只消费其中的1个队列。集群消费模式是消费者默认的消费方式。

 代码中由这一行代码控制:

  1. // 消费模式 默认是集群模式(负载均衡模式),还有是广播模式
  2. consumer.setMessageModel(MessageModel.CLUSTERING);
  1. package com.zc.rocketmq.mashibing.quickstart;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  4. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  5. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  6. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  7. import org.apache.rocketmq.common.message.MessageExt;
  8. import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
  9. import java.util.List;
  10. /**
  11. * @Classname BalanceConsumer 集群消费模式
  12. * @Description 消费者
  13. * @Version 1.0.0
  14. * @Date 2022/5/22 15:04
  15. * @Created by 海贼王
  16. */
  17. @Slf4j
  18. public class BalanceConsumer {
  19. public static void main(String[] args) throws Exception {
  20. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_consumer");
  21. consumer.setNamesrvAddr("8.136.2.110:9876");
  22. consumer.subscribe("TopicTest", "TagA");
  23. // 消费模式 默认是集群模式(负载均衡模式),还有是广播模式
  24. consumer.setMessageModel(MessageModel.CLUSTERING);
  25. // 注册回调函数,处理消息
  26. consumer.registerMessageListener(new MessageListenerConcurrently() {
  27. @Override
  28. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
  29. try {
  30. for (MessageExt msg : msgs) {
  31. String topic = msg.getTopic();
  32. String tags = msg.getTags();
  33. String msgBody = new String(msg.getBody());
  34. log.info("消息消费成功:{}", "topic:" + topic + " tags:" + tags + " 消息内容:" + msgBody);
  35. }
  36. } catch (Exception e) {
  37. log.info("消息消费失败:{}", e.getMessage());
  38. }
  39. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  40. }
  41. });
  42. // 最后才启动消费者
  43. consumer.start();
  44. log.info("消费者启动成功...");
  45. }
  46. }

2、广播模式

        广播消费模式中消息将对一个Consumer Group下的各个Consumer实例都投递一遍。即使这些 Consumer属于同一个Consumer Group,消息也会被Consumer Group 中的每个Consumer都消费一次。实际上,是一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。

 

 代码中由这一行代码控制:

  1. // 广播模式
  2. consumer.setMessageModel(MessageModel.BROADCASTING);
  1. package com.zc.rocketmq.mashibing.quickstart;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  4. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  5. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  6. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  7. import org.apache.rocketmq.common.message.MessageExt;
  8. import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
  9. import java.util.List;
  10. /**
  11. * @Classname BalanceConsumer 广播模式
  12. * @Description 消费者
  13. * @Version 1.0.0
  14. * @Date 2022/5/22 15:04
  15. * @Created by 海贼王
  16. */
  17. @Slf4j
  18. public class BalanceConsumer {
  19. public static void main(String[] args) throws Exception {
  20. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_consumer");
  21. consumer.setNamesrvAddr("8.136.2.110:9876");
  22. consumer.subscribe("TopicTest", "TagA");
  23. // 广播模式
  24. consumer.setMessageModel(MessageModel.BROADCASTING);
  25. // 注册回调函数,处理消息
  26. consumer.registerMessageListener(new MessageListenerConcurrently() {
  27. @Override
  28. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
  29. try {
  30. for (MessageExt msg : msgs) {
  31. String topic = msg.getTopic();
  32. String tags = msg.getTags();
  33. String msgBody = new String(msg.getBody());
  34. log.info("消息消费成功:{}", "topic:" + topic + " tags:" + tags + " 消息内容:" + msgBody);
  35. }
  36. } catch (Exception e) {
  37. log.info("消息消费失败:{}", e.getMessage());
  38. }
  39. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  40. }
  41. });
  42. // 最后才启动消费者
  43. consumer.start();
  44. log.info("消费者启动成功...");
  45. }
  46. }

3、两种消费模式适用场景以及注意事项

        

负载均衡模式:适用场景&注意事项

  • 消费端集群化部署,每条消息只需要被处理一次。
  • 由于消费进度在服务端维护,可靠性更高。
  • 集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
  • 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。

   广播模式:适用场景&注意事项

  • 每条消息都需要被相同逻辑的多台机器处理。
  • 消费进度在客户端维护,出现重复的概率稍大于集群模式。
  • 广播模式下,消息队列 RocketMQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。
  • 广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。
  • 广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
  • 目前仅 Java 客户端支持广播模式。
  • 广播消费模式下不支持顺序消息。
  • 广播消费模式下不支持重置消费位点。
  • 广播模式下服务端不维护消费进度,所以消息队列 RocketMQ 控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发