RabbitMQ(一)
RabbitMQ 用户管理
逻辑结构
- 用户
- 虚拟主机
- 队列
用户管理
命令行用户管理
在 linux 中使用命令行创建用户
1
2
3
4
5# 进入到rabbit_mq的sbin目录
cd /usr/local/rabbitmq_server-3.7.0/sbin
# 新增用户
./rabbitmqctl add_user garvey admin123设置用户级别
1
2
3
4
5
6
7# 用户级别:
# 1.administrator 可以登录控制台、查看所有信息、可以对RabbitMQ进行管理
# 2.monitoring 监控者 登录控制台、查看所有信息
# 3.policymaker 策略制定者 登录控制台、指定策略
# 4.managment 普通管理员 登录控制台
./rabbitmqctl set_user_tags garvey administrator
管理系统进行用户管理
管理系统登录:访问 http://你的主机ip:15672/
RabbitMQ 工作模式
RabbitMQ 提供了多种消息的通信方式 官方文档
消息通信是由两个角色完成:消息生产者(producer)和 消息消费者(Consumer)
一个队列只有一个消费者
生产者将消息发送到队列,消费者从队列取出数据
多个消费者监听同一个队列
多个消费者监听同一个队列,但多个消费者中只有一个消费者会成功的消费消息
一个交换机绑定多个消息队列,每个消息队列有一个消费者监听
消息生产者发送的消息可以被每一个消费者接收
一个交换机绑定多个消息队列,每个消息队列都由自己唯一的 key,每个消息队列有一个消费者监听
RabbitMQ交换机和队列管理
在 Maven 中使用 MQ
RabbitMQ 队列结构
消息生产者
创建 Maven 项目
添加 RabbitMQ 连接所需要的依赖 (
pom.xml
)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.10.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>在 resources 目录下创建
log4j.properties
1
2
3
4
5log4j.rootLogger=DEBUG,A1 log4j.logger.com.taotao = DEBUG
log4j.logger.org.mybatis = DEBUG
log4j.appender.A1=org.apache.log4j.ConsoleAppender
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c]-[%p] %m%n创建 MQ 连接帮助类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37/**
* @author GarveyZhong
* @date 2020/6/7 21:17
*/
public class ConnectionUtil {
/**
* 获取连接
*
* @return
* @throws IOException
* @throws TimeoutException
*/
public static Connection getConnection() throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2.在工厂对象中设置MQ的连接信息(ip,port,virtualhost,username,password)
factory.setHost("修改成你的服务器ip");
factory.setPort(5672);
factory.setVirtualHost("host1");
factory.setUsername("garvey");
factory.setPassword("admin");
// 3.通过工厂对象获取与MQ的连接
return factory.newConnection();
}
/**
* 测试连接
*
* @param args
* @throws IOException
* @throws TimeoutException
*/
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println(getConnection());
}
}消息生产者发送消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31package com.zjw.mq.service;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zjw.mq.utils.ConnectionUtil;
public class SendMsg {
public static void main(String[] args) throws Exception {
String msg = "Hello Garvey!";
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 定义队列(使用Java代码在MQ中新建一个队列)
// 参数1:定义的队列名称
// 参数2:队列中的数据是否持久化(如果选择了持久化)
// 参数3: 是否排外(当前队列是否为当前连接私有)
// 参数4:自动删除(当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据))
// 参数5:设置当前队列的参数
// channel.queueDeclare("queue7",false,false,false,null);
// 参数1:交换机名称,如果直接发送信息到队列,则交换机名称为""
// 参数2:目标队列名称
// 参数3:设置当前这条消息的属性(设置过期时间 10)
// 参数4:消息的内容
channel.basicPublish("", "queue7", null, msg.getBytes());
System.out.println("发送:" + msg);
channel.close();
connection.close();
}
}
消息消费者
创建 Maven 项目
添加依赖
log4j.properties
ConnetionUtil.java
消费者消费消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19package com.zjw.mq.service;
import com.rabbitmq.client.*;
import com.zjw.mq.utils.ConnectionUtil;
public class ReceiceMsg {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
// body就是从队列中获取的数据
System.out.println(new String(body));
}
};
channel.basicConsume("queue7", true, consumer);
}
}
一个发送者多个消费者
发送者
1 | public class SendMsg { |
消费者一
1 | public static void main(String[] args) throws Exception { |
消费者二
1 | public class ReceiceMsg2 { |
发送者 发送消息到交换机
1 | public class SendMsg { |
消费者一
1 | public class ReceiceMsg1 { |
消费者二
1 | public class ReceiceMsg2 { |
发送者:发送消息到交换机
1 | public class SendMsg { |
消费者一
1 | public class ReceiveMsg1 { |
消费者二
1 | public class ReceiceMsg2 { |
Spring Boot 中使用 MQ
Spring Boot 应用可以完成自动配置及依赖注入— —可以通过 Spring
直接提供与 MQ 的连接对象
创建 Spring Boot 应用,添加依赖
配置 application.yml
1 | spring: |
发送消息
1 |
|
创建项目添加依赖
配置yml
接收消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// @RabbitListener(queues = {"queue1","queue2"})
public class ReceiveMsgService {
public void receiveMsg(String msg) {
System.out.println("接收MSG:" + msg);
}
//@RabbitHandler
//public void receiveMsg(byte[] bs){
//
//}
}
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 小嘉的部落格!
评论