RabbitMQ 用户管理

逻辑结构

  • 用户
  • 虚拟主机
  • 队列

用户管理

命令行用户管理

  • 在 linux 中使用命令行创建用户

    1
    2
    3
    4
    5
    # 进入到rabbit_mq的sbin目录
    cd /usr/local/rabbitmq_server-3.7.0/sbin

    # 新增用户
    ./rabbitmqctl add_user garvey admin123
  • 设置用户级别

    1
    2
    3
    4
    5
    6
    7
    # 用户级别:
    # 1.administrator 可以登录控制台、查看所有信息、可以对RabbitMQ进行管理
    # 2.monitoring 监控者 登录控制台、查看所有信息
    # 3.policymaker 策略制定者 登录控制台、指定策略
    # 4.managment 普通管理员 登录控制台

    ./rabbitmqctl set_user_tags garvey administrator

管理系统进行用户管理

管理系统登录:访问 http://你的主机ip:15672/

img

img

img

img2

img

RabbitMQ 工作模式

RabbitMQ 提供了多种消息的通信方式 官方文档

消息通信是由两个角色完成:消息生产者(producer)和 消息消费者(Consumer)

一个队列只有一个消费者

img

生产者将消息发送到队列,消费者从队列取出数据

多个消费者监听同一个队列

img

多个消费者监听同一个队列,但多个消费者中只有一个消费者会成功的消费消息

一个交换机绑定多个消息队列,每个消息队列有一个消费者监听

img

消息生产者发送的消息可以被每一个消费者接收

一个交换机绑定多个消息队列,每个消息队列都由自己唯一的 key,每个消息队列有一个消费者监听

img

RabbitMQ交换机和队列管理

img

img

img

img

在 Maven 中使用 MQ

RabbitMQ 队列结构

img

消息生产者

  • 创建 Maven 项目

  • 添加 RabbitMQ 连接所需要的依赖 (pom.xml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.10.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
    <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
    <dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.9</version>
    </dependency>
  • 在 resources 目录下创建 log4j.properties

    1
    2
    3
    4
    5
    log4j.rootLogger=DEBUG,A1 log4j.logger.com.taotao = DEBUG 
    log4j.logger.org.mybatis = DEBUG
    log4j.appender.A1=org.apache.log4j.ConsoleAppender
    log4j.appender.A1.layout=org.apache.log4j.PatternLayout
    log4j.appender.A1.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c]-[%p] %m%n
  • 创建 MQ 连接帮助类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    /**
    * @author GarveyZhong
    * @date 2020/6/7 21:17
    */
    public class ConnectionUtil {

    /**
    * 获取连接
    *
    * @return
    * @throws IOException
    * @throws TimeoutException
    */
    public static Connection getConnection() throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 2.在工厂对象中设置MQ的连接信息(ip,port,virtualhost,username,password)
    factory.setHost("修改成你的服务器ip");
    factory.setPort(5672);
    factory.setVirtualHost("host1");
    factory.setUsername("garvey");
    factory.setPassword("admin");
    // 3.通过工厂对象获取与MQ的连接
    return factory.newConnection();
    }

    /**
    * 测试连接
    *
    * @param args
    * @throws IOException
    * @throws TimeoutException
    */
    public static void main(String[] args) throws IOException, TimeoutException {
    System.out.println(getConnection());
    }
    }
  • 消息生产者发送消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    package com.zjw.mq.service;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.zjw.mq.utils.ConnectionUtil;

    public class SendMsg {
    public static void main(String[] args) throws Exception {
    String msg = "Hello Garvey!";
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();

    // 定义队列(使用Java代码在MQ中新建一个队列)
    // 参数1:定义的队列名称
    // 参数2:队列中的数据是否持久化(如果选择了持久化)
    // 参数3: 是否排外(当前队列是否为当前连接私有)
    // 参数4:自动删除(当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据))
    // 参数5:设置当前队列的参数
    // channel.queueDeclare("queue7",false,false,false,null);

    // 参数1:交换机名称,如果直接发送信息到队列,则交换机名称为""
    // 参数2:目标队列名称
    // 参数3:设置当前这条消息的属性(设置过期时间 10)
    // 参数4:消息的内容
    channel.basicPublish("", "queue7", null, msg.getBytes());
    System.out.println("发送:" + msg);

    channel.close();
    connection.close();
    }
    }

消息消费者

  • 创建 Maven 项目

  • 添加依赖

  • log4j.properties

  • ConnetionUtil.java

  • 消费者消费消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    package com.zjw.mq.service;

    import com.rabbitmq.client.*;
    import com.zjw.mq.utils.ConnectionUtil;

    public class ReceiceMsg {
    public static void main(String[] args) throws Exception {
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();
    Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
    // body就是从队列中获取的数据
    System.out.println(new String(body));
    }
    };
    channel.basicConsume("queue7", true, consumer);
    }
    }

一个发送者多个消费者

发送者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class SendMsg {
public static void main(String[] args) throws Exception {
System.out.println("请输入消息:");
Scanner scanner = new Scanner(System.in);
String msg = null;
while (!"quit".equals(msg = scanner.nextLine())) {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

channel.basicPublish("", "queue2", null, msg.getBytes());
System.out.println("发送成功:" + msg);

channel.close();
connection.close();
}
}
}

消费者一

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
// body就是从队列中获取的数据
String msg = new String(body);
System.out.println("Consumer1接收:" + msg);
if ("wait".equals(msg)) {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
channel.basicConsume("queue2", true, consumer);
}

消费者二

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ReceiceMsg2 {

public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
// body就是从队列中获取的数据
System.out.println("Consumer2接收:" + new String(body));
}
};
channel.basicConsume("queue2", true, consumer);
}
}

发送者 发送消息到交换机

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class SendMsg {
public static void main(String[] args) throws Exception {
System.out.println("请输入消息:");
Scanner scanner = new Scanner(System.in);
String msg = null;
while (!"quit".equals(msg = scanner.nextLine())) {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

channel.basicPublish("ex1", "", null, msg.getBytes());
System.out.println("发送:" + msg);

channel.close();
connection.close();
}
}
}

消费者一

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ReceiceMsg1 {

public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
// body就是从队列中获取的数据
String msg = new String(body);
System.out.println("Consumer1接收:" + msg);
if ("wait".equals(msg)) {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
channel.basicConsume("queue3", true, consumer);
}
}

消费者二

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class ReceiceMsg2 {

public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("consumer2 == 收到了消息 == " + new String(body));
}
};
channel.basicConsume("queue4", true, consumer);
}
}

发送者:发送消息到交换机

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class SendMsg {

public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("请输入消息:");
Scanner input = new Scanner(System.in);
String msg = null;
while (!"quit".equals(msg = input.nextLine())) {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

if (msg.startsWith("a")) {
channel.basicPublish("ex2", "a", null, msg.getBytes());
} else if (msg.startsWith("b")) {
channel.basicPublish("ex2", "b", null, msg.getBytes());
}
System.out.println("发送成功:" + msg);
channel.close();
connection.close();
}
}
}

消费者一

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ReceiveMsg1 {

public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
// body就是从队列中获取的数据
String msg = new String(body);
System.out.println("Consumer1接收:" + msg);
if ("wait".equals(msg)) {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
channel.basicConsume("queue5", true, consumer);
}
}

消费者二

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class ReceiceMsg2 {

public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("Consumer2接收:" + new String(body));
}
};
channel.basicConsume("queue6", true, consumer);
}
}

Spring Boot 中使用 MQ

Spring Boot 应用可以完成自动配置及依赖注入— —可以通过 Spring 直接提供与 MQ 的连接对象

创建 Spring Boot 应用,添加依赖

img

配置 application.yml

1
2
3
4
5
6
7
spring:
rabbitmq:
host: 你的主机ip
virtual-host: host1
port: 5672
username: garvey
password: admin123

发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Service
public class TestService {

@Autowired
private AmqpTemplate amqpTemplate;

public void sendMsg(String msg) {
// 1. 发送消息到队列
amqpTemplate.convertAndSend("queue1", msg);

// 2. 发送消息到交换机(订阅交换机)
amqpTemplate.convertAndSend("ex1", "", msg);

// 3. 发送消息到交换机(路由交换机)
amqpTemplate.convertAndSend("ex2", "a", msg);
}
}
  1. 创建项目添加依赖

  2. 配置yml

  3. 接收消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    @Service
    // @RabbitListener(queues = {"queue1","queue2"})
    @RabbitListener(queues = "queue1")
    public class ReceiveMsgService {

    @RabbitHandler
    public void receiveMsg(String msg) {
    System.out.println("接收MSG:" + msg);
    }

    //@RabbitHandler
    //public void receiveMsg(byte[] bs){
    //
    //}
    }