小程序开发定制Spring项目整合 RabbitMQ消息队列,动态创建队列与交换机

文章目录

小程序开发定制是一个被广泛使用的开小程序开发定制源消息队列。小程序开发定制它是轻量级且易于部署的,小程序开发定制它能支持多种消息协议。RabbitMQ小程序开发定制可以部署在分布式和联合配置中,小程序开发定制以满足高规模、小程序开发定制高可用性的需求。

基本概念

RabbitMQ小程序开发定制的内部结构图

Message

  • 消息,由Header和body组成,Header小程序开发定制是由生产者添加的各种小程序开发定制属性的集合,包括Message小程序开发定制是否被持久化、小程序开发定制优先级是多少、由哪个Message Queue接收等;body小程序开发定制是真正需要发送的数据内容;

Publisher

  • 消息的生产者,也是一个向交换器发布消息的客户端应用程序。

Exchange

  • 消息交换机,作用是接收来自生产者的消息,并根据路由键转发消息到所绑定的队列。生产者发送上的消息,就是先通过 Exchnage 按照绑定 (binding) 规则转发到队列的。
  • 交换机类型 (Exchange Type) 有四种:fanout、direct、topic,headers,其中 headers 并不常用。

Binding

  • 绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表,Binding 操作一般用于 RabbitMQ 的路由工作模式和主题工作模式。

Queue

  • 消息队列,内部用于存储消息的对象,是真正用来存储消息的结构。它是消息的容器,也是消息的终点。在生产端,生产者的消息最终发送到指定队列,而消费者也是通过订阅某个队列,达到获取消息的目的。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

Connection

  • 网络连接,是 RabbitMQ 内部对象之一,用于管理每个到 RabbitMQ 的 TCP 网络连接。

Channel

  • 信道,多路复用连接中的一条独立的双向数据流通道,也是我们与 RabbitMQ 打交道的最重要的一个接口,我们大部分的业务操作是在 Channel 这个接口中完成的,包括定义 Queue、定义 Exchange、绑定 Queue 与 Exchange、发布消息等。
  • 信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

Consumer

  • 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

Virtual Host

  • 虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
  • 一个 VirtualHost 下面有一组不同 Exchnage 与 Queue,不同的 Virtual host 的 Exchnage 与 Queue 之间互相不影响。应用隔离与权限划分,Virtual host 是 RabbitMQ 中最小颗粒的权限单位划分。
  • 如果要类比的话,我们可以把 Virtual host 比作 MySQL 中的数据库,通常我们在使用 MySQL 时,会为不同的项目指定不同的数据库,同样的,在使用 RabbitMQ 时,我们可以为不同的应用程序指定不同的 Virtual host。

Broker

  • 表示消息队列服务器实体。

RabbitMQ 的常见模式

  • 简单 (simple) 模式
  • 工作 (work) 模式
  • 发布 / 订阅 (pub/sub) 模式
  • 路由 (routing) 模式
  • 主题 (Topic) 模式

Docker部署RabbitMQ

镜像下载

docker pull rabbitmq
  • 1

启动容器

docker run -d --name rabbitmq --restart always --hostname rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq
  • 1
  • -p 指定宿主机和容器端口映射(5672:服务应用端口,15672:管理控制台端口)

安装管理控制台插件

# 进入容器内部docker exec -it rabbitmq /bin/bash## 安装插件rabbitmq-plugins enable rabbitmq_management
  • 1
  • 2
  • 3
  • 4

启动验证

  • 访问RabbitMQ控制台: http://{host}:15672/,初始默认用户名/密码:guest/guest

  • 至此,RabbitMQ的安装和配置完成。

Spring项目集成RabbitMQ

添加AMQP相关依赖

  • 在pom.xml文件中添加AMQP相关依赖
 <dependency>     <groupId>org.springframework.boot</groupId>     <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
  • 1
  • 2
  • 3
  • 4

添加RabbitMQ的相关配置

  • 在application.yml添加RabbitMQ的相关配置
spring:  rabbitmq:    host: ip地址   # rabbitmq的连接地址    port: 5672     # rabbitmq的连接端口号    username: guest# rabbitmq的用户名    password: guest# rabbitmq的密码
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

动态创建队列、交换机初始化器

  • 创建RabbitMQ的Java配置,主要用于配置交换机、队列和绑定关系;
@Configuration@Slf4jpublic class RabbitConfig {    /**     * 使用json序列化机制,进行消息转换     */    @Bean    public MessageConverter jackson2MessageConverter() {        return new Jackson2JsonMessageConverter();    }    /**     * 动态创建队列、交换机初始化器     */    @Bean    @ConditionalOnMissingBean    public RabbitModuleInitializer rabbitModuleInitializer(AmqpAdmin amqpAdmin, RabbitModuleProperties rabbitModuleProperties) {        return new RabbitModuleInitializer(amqpAdmin, rabbitModuleProperties);    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
/** * RabbitMQ 交换机类型枚举 */public enum RabbitExchangeTypeEnum {    /**     * 直连交换机     * <p>     * 根据routing-key精准匹配队列(最常使用)     */    DIRECT,    /**     * 主题交换机     * <p>     * 根据routing-key模糊匹配队列,*匹配任意一个字符,#匹配0个或多个字符     */    TOPIC,    /**     * 扇形交换机     * <p>     * 直接分发给所有绑定的队列,忽略routing-key,用于广播消息     */    FANOUT,    /**     * 头交换机     * <p>     * 类似直连交换机,不同于直连交换机的路由规则建立在头属性上而不是routing-key(使用较少)     */    HEADERS;}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
@ConfigurationProperties(prefix = "spring.rabbitmq")@Datapublic class RabbitModuleProperties {    private List<RabbitModuleInfo> modules;}
  • 1
  • 2
  • 3
  • 4
  • 5

实现SmartInitializingSingleton的接口后,当所有单例 bean 都初始化完成以后, Spring的IOC容器会回调该接口的 afterSingletonsInstantiated()方法。主要应用场合就是在所有单例 bean 创建完成之后,可以在该回调中做一些事情。

/** * RabbitMQ队列初始化器 */@Slf4jpublic class RabbitModuleInitializer implements SmartInitializingSingleton {    private AmqpAdmin amqpAdmin;    private RabbitModuleProperties rabbitModuleProperties;    public RabbitModuleInitializer(AmqpAdmin amqpAdmin, RabbitModuleProperties rabbitModuleProperties) {        this.amqpAdmin = amqpAdmin;        this.rabbitModuleProperties = rabbitModuleProperties;    }    @Override    public void afterSingletonsInstantiated() {        log.info("RabbitMQ 根据配置动态创建和绑定队列、交换机");        declareRabbitModule();    }    /**     * RabbitMQ 根据配置动态创建和绑定队列、交换机     */    private void declareRabbitModule() {        List<RabbitModuleInfo> rabbitModuleInfos = rabbitModuleProperties.getModules();        if (CollectionUtil.isEmpty(rabbitModuleInfos)) {            return;        }        for (RabbitModuleInfo rabbitModuleInfo : rabbitModuleInfos) {            configParamValidate(rabbitModuleInfo);            // 队列            Queue queue = convertQueue(rabbitModuleInfo.getQueue());            // 交换机            Exchange exchange = convertExchange(rabbitModuleInfo.getExchange());            // 绑定关系            String routingKey = rabbitModuleInfo.getRoutingKey();            String queueName = rabbitModuleInfo.getQueue().getName();            String exchangeName = rabbitModuleInfo.getExchange().getName();            Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null);            // 创建队列            amqpAdmin.declareQueue(queue);            // 创建交换机            amqpAdmin.declareExchange(exchange);            // 队列 绑定 交换机            amqpAdmin.declareBinding(binding);        }    }    /**     * RabbitMQ动态配置参数校验     *     * @param rabbitModuleInfo     */    public void configParamValidate(RabbitModuleInfo rabbitModuleInfo) {        String routingKey = rabbitModuleInfo.getRoutingKey();        Assert.isTrue(StrUtil.isNotBlank(routingKey), "RoutingKey 未配置");        Assert.isTrue(rabbitModuleInfo.getExchange() != null, "routingKey:{}未配置exchange", routingKey);        Assert.isTrue(StrUtil.isNotBlank(rabbitModuleInfo.getExchange().getName()), "routingKey:{}未配置exchange的name属性", routingKey);        Assert.isTrue(rabbitModuleInfo.getQueue() != null, "routingKey:{}未配置queue", routingKey);        Assert.isTrue(StrUtil.isNotBlank(rabbitModuleInfo.getQueue().getName()), "routingKey:{}未配置exchange的name属性", routingKey);    }    /**     * 转换生成RabbitMQ队列     *     * @param queue     * @return     */    public Queue convertQueue(RabbitModuleInfo.Queue queue) {        Map<String, Object> arguments = queue.getArguments();        // 转换ttl的类型为long        if (arguments != null && arguments.containsKey("x-message-ttl")) {            arguments.put("x-message-ttl", Convert.toLong(arguments.get("x-message-ttl")));        }        // 是否需要绑定死信队列        String deadLetterExchange = queue.getDeadLetterExchange();        String deadLetterRoutingKey = queue.getDeadLetterRoutingKey();        if (StrUtil.isNotBlank(deadLetterExchange) && StrUtil.isNotBlank(deadLetterRoutingKey)) {            if (arguments == null) {                arguments = new HashMap<>(4);            }            arguments.put("x-dead-letter-exchange", deadLetterExchange);            arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);        }        return new Queue(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments);    }    /**     * 转换生成RabbitMQ交换机     *     * @param exchangeInfo     * @return     */    public Exchange convertExchange(RabbitModuleInfo.Exchange exchangeInfo) {        AbstractExchange exchange = null;        RabbitExchangeTypeEnum exchangeType = exchangeInfo.getType();        String exchangeName = exchangeInfo.getName();        boolean isDurable = exchangeInfo.isDurable();        boolean isAutoDelete = exchangeInfo.isAutoDelete();        Map<String, Object> arguments = exchangeInfo.getArguments();        switch (exchangeType) {            case DIRECT:// 直连交换机                exchange = new DirectExchange(exchangeName, isDurable, isAutoDelete, arguments);                break;            case TOPIC: // 主题交换机                exchange = new TopicExchange(exchangeName, isDurable, isAutoDelete, arguments);                break;            case FANOUT: //扇形交换机                exchange = new FanoutExchange(exchangeName, isDurable, isAutoDelete, arguments);                break;            case HEADERS: // 头交换机                exchange = new HeadersExchange(exchangeName, isDurable, isAutoDelete, arguments);                break;        }        return exchange;    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
/** * RabbitMQ 队列和交换机机绑定关系实体对象 */@Datapublic class RabbitModuleInfo {    /**     * 路由Key     */    private String routingKey;    /**     * 队列信息     */    private Queue queue;    /**     * 交换机信息     */    private Exchange exchange;    /**     * 交换机信息类     */    @Data    public static class Exchange {        /**         * 交换机类型         */        private RabbitExchangeTypeEnum type = RabbitExchangeTypeEnum.DIRECT; // 默认直连交换机        /**         * 交换机名称         */        private String name;        /**         * 是否持久化         */        private boolean durable = true; // 默认true持久化,重启消息不会丢失        /**         * 当所有队绑定列均不在使用时,是否自动删除交换机         */        private boolean autoDelete = false; // 默认false,不自动删除        /**         * 交换机其他参数         */        private Map<String, Object> arguments;    }    /**     * 队列信息类     */    @Data    public static class Queue {        /**         * 队列名称         */        private String name;        /**         * 是否持久化         */        private boolean durable = true; // 默认true持久化,重启消息不会丢失        /**         * 是否具有排他性         */        private boolean exclusive = false; // 默认false,可多个消费者消费同一个队列        /**         * 当消费者均断开连接,是否自动删除队列         */        private boolean autoDelete = false; // 默认false,不自动删除,避免消费者断开队列丢弃消息        /**         * 绑定死信队列的交换机名称         */        private String deadLetterExchange;        /**         * 绑定死信队列的路由key         */        private String deadLetterRoutingKey;        private Map<String, Object> arguments;    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75

动态创建队列,交换机

添加配置,动态创建队列,交换机

  rabbitmq:    # 动态创建和绑定队列、交换机的配置    modules:      # 延时队列,到了过期的时间会被转发到订单死信队列      - routing-key: log.inbound.operation.queue.key        queue:          name: log.inbound.operation.queue          #          dead-letter-exchange: order.exchange          #          dead-letter-routing-key: order.close.routing.key          arguments:            # 1分钟(测试),单位毫秒            x-message-ttl: 60000        exchange:          name: log.exchange
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

生产者

生产者发送消息

@Autowiredprivate RabbitTemplate rabbitTemplate;rabbitTemplate.convertAndSend("log.exchange", "log.inventory.operation.queue.key", inventoryChangeLogDTO);
  • 1
  • 2
  • 3
  • 4

消费者

消费者接收消息

@RabbitListener(queues = "log.inventory.operation.queue")public void handleInventoryOperation(InventoryChangeLogDTO inventoryDTO) {    System.out.println(inventoryDTO);}
  • 1
  • 2
  • 3
  • 4

你知道的越多,你不知道的越多。

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发