软件系统定制开发如何在rabbitmq中实现一个生产者,多个消费者,多个消费者都能收到同一条消息

软件系统定制开发如何在中实现一个生产者,软件系统定制开发多个消费者,软件系统定制开发多个消费者都能收到同一条消息

场景:用户登录,软件系统定制开发邀请其它用户进行视频会议,收到邀请的用户进入会议

rabbitmq实现思路:

选型:发布订阅模式(Publish/Subscribe)

一个生产者,多个消费者,每一个消费者都有自己的一个,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。

这种情况下,我们有四种交换机可供选择,分别是:

  • Direct
  • Fanout
  • Topic
  • Header

由于消费者的数量不固定,所以要动态生成临时队列,无法指定routingkey因此选fanout模式

FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上,在这种策略中,routingkey 将不起任何作用

代码实现:
1.pom文件引入rabbitmq依赖

 <!-- rabbitMQ -->        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-amqp</artifactId>        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

2.配置文件

server:  port: 9091spring:  application:    name: rabbitmq  # rabbitmq配置  rabbitmq:    host: 192.168.8.142    port: 5672    username: admin    password: admin    virtual-host: my_vhost
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

3.constant类

package com.anychat.rabbitmqtest.constant;/** * @author Liby * @date 2022-05-05 10:02 * @description: * @version: */public class RabbitmqConstant {    public  static final String  MEETING_FANOUT_EXCHANGE = "meeting_exchange";}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

4.用户实体类

package com.anychat.rabbitmqtest.entity;/** * @author Liby * @date 2022-05-06 09:39 * @description: * @version: */public class User {    private Integer userId;    private String username;    public Integer getUserId() {        return userId;    }    public void setUserId(Integer userId) {        this.userId = userId;    }    public String getUsername() {        return username;    }    public void setUsername(String username) {        this.username = username;    }    public User(Integer userId, String username) {        this.userId = userId;        this.username = username;    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

5.工具类

package com.anychat.rabbitmqtest.util;import com.rabbitmq.client.Channel;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;/** * @author Liby * @date 2022-04-28 10:27 * @description: * @version: */public class RabbitmqUtil {    @Autowired    private static RabbitTemplate rabbitTemplate;    public static Channel getChannel() {        Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true);        return channel;    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

6.消费者类

package com.anychat.rabbitmqtest.consumer;import cn.hutool.core.util.StrUtil;import com.anychat.rabbitmqtest.constant.RabbitmqConstant;import com.anychat.rabbitmqtest.entity.User;import com.rabbitmq.client.*;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.io.IOException;import java.util.concurrent.ConcurrentHashMap;/** * @author Liby * @date 2022-04-25 11:18 * @description:消费者,动态创建临时队列 * @version: */@Slf4j@Componentpublic class FanoutConsumer {    @Autowired    private RabbitTemplate rabbitTemplate;    public void createQueue(User user) {        //创建信道        Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true);        try {            //声明一个交换机与生产者相同            channel.exchangeDeclare(RabbitmqConstant.MEETING_FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);            //获取一个随机的队列名称,使用默认方式,产生的队列为临时队列,在没有消费者时将会自动删除            String queueName = channel.queueDeclare().getQueue();            //用户Id与队列名绑定            ConcurrentHashMap<String, Integer> userQueueMap = new ConcurrentHashMap<>();            userQueueMap.putIfAbsent(queueName, user.getUserId());            //关联 exchange 和 queue ,因为是广播无需指定routekey,routingKey设置为空字符串            // channel.queueBind(queue, exchange, routingKey)            channel.queueBind(queueName, RabbitmqConstant.MEETING_FANOUT_EXCHANGE, "");            DefaultConsumer consumer = new DefaultConsumer(channel) {                @Override                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                    super.handleDelivery(consumerTag, envelope, properties, body);                    //对信息进行操作                    String message = new String(body, "UTF-8");                    if (StrUtil.isNotBlank(message)) {                        String[] receiveIds = message.split(",");                        Integer userId = userQueueMap.get(queueName);                        for (String id : receiveIds) {                            if (userId.equals(Integer.valueOf(id))) {                                log.info("用户{}收到入会邀请", id);                            }                        }                    }                }            };            //true 自动回复ack            channel.basicConsume(queueName, true, consumer);        } catch (Exception ex) {        }    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71

7.controller类

package com.anychat.rabbitmqtest.controller;import com.anychat.rabbitmqtest.constant.RabbitmqConstant;import com.anychat.rabbitmqtest.consumer.FanoutConsumer;import com.anychat.rabbitmqtest.entity.User;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;/** * @author Liby * @date 2022-04-24 16:34 * @description:生产者 * @version: */@RestController@Slf4j@RequestMapping("/producer")public class ProducerController {    @Autowired    private RabbitTemplate rabbitTemplate;    @Autowired    private FanoutConsumer fanoutConsumer;    /**    * 模拟用户登录后,创建一个临时队列,与该用户绑定    */    @PostMapping("/login")    public String login(){        //模拟三个用户登录        int userNum=3;        for (int i = 0; i < userNum; i++) {            //用户绑定临时队列,并监听队列            fanoutConsumer.createQueue(new User(i, "用户" + i));            log.info("用户{}登录成功",i);        }        return "用户登录成功";    }    @PostMapping("/meeting")    public String meeting(){        String message="1,2";        log.info("邀请用户{}进入会议",message);        //发送消息,要求userId为2和3的用户进入会议        rabbitTemplate.convertAndSend(RabbitmqConstant.MEETING_FANOUT_EXCHANGE,"",message);        return "发送成功";    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

postman分别调用login和meeting两个接口
可以看到日志打印

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发