RabbitMQ(二)
使用 RabbitMQ 传递对象
RabbitMQ 是消息队列,发送和接收的都是字符串/字节数组类型的消息
使用序列化对象
要求:
- 传递的对象实现序列化接口
- 传递的对象的包名、类名、属性名必须一致
消息提供者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15/**
* @author GarveyZhong
* @date 2020/6/10 22:06
*/
public class MQService {
private AmqpTemplate amqpTemplate;
public void sendGoodsToMq(Goods goods) {
// 消息队列可以发送 字符串、字节数组、序列化对象
amqpTemplate.convertAndSend("", "queue1", goods);
}
}消息消费者
1
2
3
4
5
6
7
8
9
10
11
12
13/**
* @author GarveyZhong
* @date 2020/6/10 22:06
*/
public class ReceiveService {
public void receiveMsg(Goods goods) {
System.out.println("Goods---" + goods);
}
}
使用序列化字节数组
要求:
- 传递的对象实现序列化接口
- 传递的对象的包名、类名、属性名必须一致
消息提供者
1
2
3
4
5
6
7
8
9
10
11
12
public class MQService {
private AmqpTemplate amqpTemplate;
public void sendGoodsToMq(Goods goods) {
// 消息队列可以发送 字符串、字节数组、序列化对象
byte[] bytes = SerializationUtils.serialize(goods);
amqpTemplate.convertAndSend("", "queue1", bytes);
}
}消息消费者
1
2
3
4
5
6
7
8
9
10
public class ReceiveService {
public void receiveMsg(byte[] bs) {
Object goods = SerializationUtils.deserialize(bs);
System.out.println("byte[]---" + goods);
}
}
使用JSON字符串传递
要求:对象的属性名一致
消息提供者
1
2
3
4
5
6
7
8
9
10
11
12
13
public class MQService {
private AmqpTemplate amqpTemplate;
public void sendGoodsToMq(Goods goods) throws JsonProcessingException {
// 消息队列可以发送 字符串、字节数组、序列化对象
ObjectMapper objectMapper = new ObjectMapper();
String msg = objectMapper.writeValueAsString(goods);
amqpTemplate.convertAndSend("", "queue1", msg);
}
}消息消费者
1
2
3
4
5
6
7
8
9
10
11
public class ReceiveService {
public void receiveMsg(String msg) throws JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
Goods goods = objectMapper.readValue(msg, Goods.class);
System.out.println("String---" + msg);
}
}
基于 Java 的交换机与队列创建
我们使用消息队列,消息队列和交换机可以通过管理系统完成创建,也可以在应用程序中通过 Java 代码来完成创建
Maven 项目交换机及队列创建
使用 Java 代码新建队列
1
2
3
4
5
6
7// 1.定义队列 (使用Java代码在MQ中新建一个队列)
// 参数1:定义的队列名称
// 参数2:队列中的数据是否持久化(如果选择了持久化)
// 参数3: 是否排外(当前队列是否为当前连接私有)
// 参数4:自动删除(当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据))
// 参数5:设置当前队列的参数
channel.queueDeclare("queue7", false, false, false, null);新建交换机
1
2
3
4// 定义一个“订阅交换机”
channel.exchangeDeclare("ex3", BuiltinExchangeType.FANOUT);
// 定义一个“路由交换机”
channel.exchangeDeclare("ex4", BuiltinExchangeType.DIRECT);绑定队列到交换机
1
2
3
4
5
6// 绑定队列
// 参数1:队列名称
// 参数2:目标交换机
// 参数3:如果绑定订阅交换机参数为"",如果绑定路由交换机则表示设置队列的key
channel.queueBind("queue7", "ex4", "k1");
channel.queueBind("queue8", "ex4", "k2");
SpringBoot 通过配置完成队列的创建
1 | /** |
消息的可靠性
RabbitMQ 事务
当在消息发送过程中添加了事务,处理效率降低几十倍甚至上百倍
1 | // 开启事务 |
RabbitMQ 消息确认和 return 机制
消息确认机制:确认消息提供者是否成功发送消息到交换机
return机制:确认消息是否成功的从交换机分发到队列
普通 Maven 项目的消息确认
- 普通
confirm
方式
1 | // 1.发送消息之前开启消息确认 |
- 批量
confirm
方式
1 | // 1.发送消息之前开启消息确认 |
- 异步
confirm
方式
1 | // 发送消息之前开启消息确认 |
普通 Maven 项目的 return 机制
- 添加 return 监听器
- 发送消息是指定第三个参数为 true
- 由于监听器监听是异步处理,所以在消息发送之后不能关闭 channel
1 | String msg = "Hello RabbitMQ!"; |
在 SpringBoot 实现消息确认与 return 监听
配置 application.yml, 开启消息确认和 return 监听
1 | spring: |
创建 confirm 和 return 监听
1 |
|
RabbitMQ 的集群部署
- 暂略
延迟机制
延迟队列
延迟队列——消息进入到队列之后,延迟指定的时间才能被消费者消费
AMQP 协议和 RabbitMQ 队列本身是不支持延迟队列功能的,但是可以通过 TTL(Time To Live)特性模拟延迟队列的功能
TTL就是消息的存活时间。RabbitMQ 可以分别对队列和消息设置存活时间
- 在创建队列的时候可以设置队列的存活时间,当消息进入到队列并且在存活时间内没有消费者消费,则此消息就会从当前队列被移除;
创建消息队列没有设置 TTL,但是消息设置了 TTL,那么当消息的存活时间结束,也会被移除;
- 当 TTL 结束之后,我们可以指定将当前队列的消息转存到其他指定的队列
使用延迟队列实现订单支付监控
实现流程图
创建交换机和队列
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 小嘉的部落格!
评论