软件系统定制开发如何在中实现一个生产者,软件系统定制开发多个消费者,软件系统定制开发多个消费者都能收到同一条消息
场景:用户登录,软件系统定制开发邀请其它用户进行视频会议,收到邀请的用户进入会议
rabbitmq实现思路:
选型:发布订阅模式(Publish/Subscribe)
一个生产者,多个消费者,每一个消费者都有自己的一个,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。
这种情况下,我们有四种交换机可供选择,分别是:
- Direct
- Fanout
- Topic
- Header
由于消费者的数量不固定,所以要动态生成临时队列,无法指定routingkey因此选fanout模式
FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上,在这种策略中,routingkey 将不起任何作用
代码实现:
1.pom文件引入rabbitmq依赖
<!-- rabbitMQ --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
- 1
- 2
- 3
- 4
- 5
2.配置文件
server: port: 9091spring: application: name: rabbitmq # rabbitmq配置 rabbitmq: host: 192.168.8.142 port: 5672 username: admin password: admin virtual-host: my_vhost
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
3.constant类
package com.anychat.rabbitmqtest.constant;/** * @author Liby * @date 2022-05-05 10:02 * @description: * @version: */public class RabbitmqConstant { public static final String MEETING_FANOUT_EXCHANGE = "meeting_exchange";}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
4.用户实体类
package com.anychat.rabbitmqtest.entity;/** * @author Liby * @date 2022-05-06 09:39 * @description: * @version: */public class User { private Integer userId; private String username; public Integer getUserId() { return userId; } public void setUserId(Integer userId) { this.userId = userId; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public User(Integer userId, String username) { this.userId = userId; this.username = username; }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
5.工具类
package com.anychat.rabbitmqtest.util;import com.rabbitmq.client.Channel;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;/** * @author Liby * @date 2022-04-28 10:27 * @description: * @version: */public class RabbitmqUtil { @Autowired private static RabbitTemplate rabbitTemplate; public static Channel getChannel() { Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true); return channel; }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
6.消费者类
package com.anychat.rabbitmqtest.consumer;import cn.hutool.core.util.StrUtil;import com.anychat.rabbitmqtest.constant.RabbitmqConstant;import com.anychat.rabbitmqtest.entity.User;import com.rabbitmq.client.*;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.io.IOException;import java.util.concurrent.ConcurrentHashMap;/** * @author Liby * @date 2022-04-25 11:18 * @description:消费者,动态创建临时队列 * @version: */@Slf4j@Componentpublic class FanoutConsumer { @Autowired private RabbitTemplate rabbitTemplate; public void createQueue(User user) { //创建信道 Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true); try { //声明一个交换机与生产者相同 channel.exchangeDeclare(RabbitmqConstant.MEETING_FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true); //获取一个随机的队列名称,使用默认方式,产生的队列为临时队列,在没有消费者时将会自动删除 String queueName = channel.queueDeclare().getQueue(); //用户Id与队列名绑定 ConcurrentHashMap<String, Integer> userQueueMap = new ConcurrentHashMap<>(); userQueueMap.putIfAbsent(queueName, user.getUserId()); //关联 exchange 和 queue ,因为是广播无需指定routekey,routingKey设置为空字符串 // channel.queueBind(queue, exchange, routingKey) channel.queueBind(queueName, RabbitmqConstant.MEETING_FANOUT_EXCHANGE, ""); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); //对信息进行操作 String message = new String(body, "UTF-8"); if (StrUtil.isNotBlank(message)) { String[] receiveIds = message.split(","); Integer userId = userQueueMap.get(queueName); for (String id : receiveIds) { if (userId.equals(Integer.valueOf(id))) { log.info("用户{}收到入会邀请", id); } } } } }; //true 自动回复ack channel.basicConsume(queueName, true, consumer); } catch (Exception ex) { } }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
7.controller类
package com.anychat.rabbitmqtest.controller;import com.anychat.rabbitmqtest.constant.RabbitmqConstant;import com.anychat.rabbitmqtest.consumer.FanoutConsumer;import com.anychat.rabbitmqtest.entity.User;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;/** * @author Liby * @date 2022-04-24 16:34 * @description:生产者 * @version: */@RestController@Slf4j@RequestMapping("/producer")public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private FanoutConsumer fanoutConsumer; /** * 模拟用户登录后,创建一个临时队列,与该用户绑定 */ @PostMapping("/login") public String login(){ //模拟三个用户登录 int userNum=3; for (int i = 0; i < userNum; i++) { //用户绑定临时队列,并监听队列 fanoutConsumer.createQueue(new User(i, "用户" + i)); log.info("用户{}登录成功",i); } return "用户登录成功"; } @PostMapping("/meeting") public String meeting(){ String message="1,2"; log.info("邀请用户{}进入会议",message); //发送消息,要求userId为2和3的用户进入会议 rabbitTemplate.convertAndSend(RabbitmqConstant.MEETING_FANOUT_EXCHANGE,"",message); return "发送成功"; }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
postman分别调用login和meeting两个接口
可以看到日志打印