企业管理系统定制开发docker启动rabbitmq及使用

搜索镜像

search rabbitmq:management

下载镜像

docker pull rabbitmq:management

启动容器

docker run -d --hostname localhost --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management


打印容器

docker logs rabbitmq


访问RabbitMQ Management


企业管理系统定制开发账户密码默认:guest

企业管理系统定制开发编写生产者类

package com.xun.rabbitmqdemo.example;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Producer {    private final static String QUEUE_NAME = "hello";    public static void main(String[] args) throws IOException, TimeoutException {        ConnectionFactory factory = new ConnectionFactory();        factory.setUsername("guest");        factory.setPassword("guest");        factory.setHost("localhost");        factory.setPort(5672);        factory.setVirtualHost("/");        Connection connection = factory.newConnection();        Channel channel = connection.createChannel();        /**         * 生成一个queue队列         * 1、队列名称 QUEUE_NAME         * 2、企业管理系统定制开发队列里面的消息是否持久化(企业管理系统定制开发默认消息存储在内存中)         * 3、该队列是否只供一个Consumer消费 是否共享 设置为true可以多个消费者消费         * 4、是否自动删除 最后一个消费者断开连接后 该队列是否自动删除         * 5、其他参数         */        channel.queueDeclare(QUEUE_NAME,false,false,false,null);        String message = "Hello world!";        /**         * 发送一个消息         * 1、发送到哪个exchange交换机         * 2、路由的key         * 3、其他的参数信息         * 4、消息体         */        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());        System.out.println(" [x] Sent '"+message+"'");        channel.close();        connection.close();    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

运行该方法,可以看到控制台的打印

name=hello的队列收到Message

消费者

package com.xun.rabbitmqdemo.example;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Receiver {    private final static String QUEUE_NAME = "hello";    public static void main(String[] args) throws IOException, TimeoutException {        ConnectionFactory factory = new ConnectionFactory();        factory.setUsername("guest");        factory.setPassword("guest");        factory.setHost("localhost");        factory.setPort(5672);        factory.setVirtualHost("/");        factory.setConnectionTimeout(600000);//milliseconds        factory.setRequestedHeartbeat(60);//seconds        factory.setHandshakeTimeout(6000);//milliseconds        factory.setRequestedChannelMax(5);        factory.setNetworkRecoveryInterval(500);        Connection connection = factory.newConnection();        Channel channel = connection.createChannel();        channel.queueDeclare(QUEUE_NAME,false,false,false,null);        System.out.println("Waiting for messages. ");        Consumer consumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {                String message = new String(body, "UTF-8");                System.out.println(" [x] Received '" + message + "'");            }        };        channel.basicConsume(QUEUE_NAME,true,consumer);    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38


工作队列

RabbitMqUtils工具类

package com.xun.rabbitmqdemo.utils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class RabbitMqUtils {    public static Channel getChannel() throws Exception{        ConnectionFactory factory = new ConnectionFactory();        factory.setHost("localhost");        factory.setUsername("guest");        factory.setPassword("guest");        Connection connection = factory.newConnection();        Channel channel = connection.createChannel();        return channel;    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

启动2个工作线程

package com.xun.rabbitmqdemo.workQueue;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.xun.rabbitmqdemo.utils.RabbitMqUtils;public class Work01 {    private static final String QUEUE_NAME = "hello";    public static void main(String[] args) throws Exception{        Channel channel = RabbitMqUtils.getChannel();        DeliverCallback deliverCallback = (consumerTag,delivery)->{            String receivedMessage = new String(delivery.getBody());            System.out.println("接收消息:"+receivedMessage);        };        CancelCallback cancelCallback = (consumerTag)->{            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");        };        System.out.println("C1 消费者启动等待消费....");        /**         * 消费者消费消息         * 1、消费哪个队列         * 2、消费成功后是否自动应答         * 3、消费的接口回调         * 4、消费未成功的接口回调         */        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
package com.xun.rabbitmqdemo.workQueue;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.xun.rabbitmqdemo.utils.RabbitMqUtils;public class Work02 {    private static final String QUEUE_NAME = "hello";    public static void main(String[] args) throws Exception{        Channel channel = RabbitMqUtils.getChannel();        DeliverCallback deliverCallback = (consumerTag,delivery)->{            String receivedMessage = new String(delivery.getBody());            System.out.println("接收消息:"+receivedMessage);        };        CancelCallback cancelCallback = (consumerTag)->{            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");        };        System.out.println("C2 消费者启动等待消费....");        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

启动工作线程

启动发送线程

package com.xun.rabbitmqdemo.workQueue;import com.rabbitmq.client.Channel;import com.xun.rabbitmqdemo.utils.RabbitMqUtils;import java.util.Scanner;public class Task01 {    private static final String QUEUE_NAME = "hello";    public static void main(String[] args) throws Exception{        try(Channel channel= RabbitMqUtils.getChannel();){            channel.queueDeclare(QUEUE_NAME,false,false,false,null);            //从控制台接收消息            Scanner scanner = new Scanner(System.in);            while(scanner.hasNext()){                String message = scanner.next();                channel.basicPublish("",QUEUE_NAME,null,message.getBytes());                System.out.println("发送消息完成:"+message);            }        }    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

启动发送线程,此时发送线程等待键盘输入

发送4个消息



可以看到2个工作线程按照顺序分别接收message。

消息应答机制

rabbitmq将message发送给消费者后,就会将该消息标记为删除。
但消费者在处理message过程中宕机,会导致消息的丢失。
因此需要设置手动应答。

生产者

import com.xun.rabbitmqdemo.utils.RabbitMqUtils;import java.util.Scanner;public class Task02 {    private static final String TASK_QUEUE_NAME = "ack_queue";    public static void main(String[] args) throws Exception{        try(Channel channel = RabbitMqUtils.getChannel()){            channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);            Scanner scanner = new Scanner(System.in);            System.out.println("请输入信息");            while(scanner.hasNext()){                String message = scanner.nextLine();                channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes());                System.out.println("生产者task02发出消息"+ message);            }        }    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

消费者

package com.xun.rabbitmqdemo.workQueue;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.xun.rabbitmqdemo.utils.RabbitMqUtils;import com.xun.rabbitmqdemo.utils.SleepUtils;public class Work03 {    private static final String ACK_QUEUE_NAME = "ack_queue";    public static void main(String[] args) throws Exception{        Channel channel = RabbitMqUtils.getChannel();        System.out.println("Work03 等待接收消息处理时间较短");        DeliverCallback deliverCallback = (consumerTag,delivery)->{            String message = new String(delivery.getBody());            SleepUtils.sleep(1);            System.out.println("接收到消息:"+message);            /**             * 1、消息的标记tag             * 2、是否批量应答             */            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);        };        CancelCallback cancelCallback = (consumerTag)->{            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");        };        //采用手动应答        boolean autoAck = false;        channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
package com.xun.rabbitmqdemo.workQueue;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.xun.rabbitmqdemo.utils.RabbitMqUtils;import com.xun.rabbitmqdemo.utils.SleepUtils;public class Work04 {    private static final String ACK_QUEUE_NAME = "ack_queue";    public static void main(String[] args) throws Exception{        Channel channel = RabbitMqUtils.getChannel();        System.out.println("Work04 等待接收消息处理时间较长");        DeliverCallback deliverCallback = (consumerTag,delivery)->{            String message = new String(delivery.getBody());            SleepUtils.sleep(30);            System.out.println("接收到消息:"+message);            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);        };        CancelCallback cancelCallback = (consumerTag)->{            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");        };        //采用手动应答        boolean autoAck = false;        channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

工具类SleepUtils

package com.xun.rabbitmqdemo.utils;public class SleepUtils {    public static void sleep(int second){        try{            Thread.sleep(1000*second);        }catch (InterruptedException _ignored){            Thread.currentThread().interrupt();        }    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

模拟



work04等待30s后发出ack

在work04处理message时手动停止线程,可以看到message:dd被rabbitmq交给了work03


不公平分发

上面的轮询分发,生产者依次向消费者按顺序发送消息,但当消费者A处理速度很快,而消费者B处理速度很慢时,这种分发策略显然是不合理的。
不公平分发:

int prefetchCount = 1;channel.basicQos(prefetchCount);
  • 1
  • 2

通过此配置,当消费者未处理完当前消息,rabbitmq会优先将该message分发给空闲消费者。

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发