使用 RabbitMQ 传递对象

RabbitMQ 是消息队列,发送和接收的都是字符串/字节数组类型的消息

使用序列化对象

要求:

  1. 传递的对象实现序列化接口
  2. 传递的对象的包名、类名、属性名必须一致
  • 消息提供者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    /**
    * @author GarveyZhong
    * @date 2020/6/10 22:06
    */
    @Service
    public class MQService {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendGoodsToMq(Goods goods) {
    // 消息队列可以发送 字符串、字节数组、序列化对象
    amqpTemplate.convertAndSend("", "queue1", goods);
    }
    }
  • 消息消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    /**
    * @author GarveyZhong
    * @date 2020/6/10 22:06
    */
    @Component
    @RabbitListener(queues = "queue1")
    public class ReceiveService {

    @RabbitHandler
    public void receiveMsg(Goods goods) {
    System.out.println("Goods---" + goods);
    }
    }

使用序列化字节数组

要求:

  1. 传递的对象实现序列化接口
  2. 传递的对象的包名、类名、属性名必须一致
  • 消息提供者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @Service
    public class MQService {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendGoodsToMq(Goods goods) {
    // 消息队列可以发送 字符串、字节数组、序列化对象
    byte[] bytes = SerializationUtils.serialize(goods);
    amqpTemplate.convertAndSend("", "queue1", bytes);
    }
    }
  • 消息消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    @Component
    @RabbitListener(queues = "queue1")
    public class ReceiveService {

    @RabbitHandler
    public void receiveMsg(byte[] bs) {
    Object goods = SerializationUtils.deserialize(bs);
    System.out.println("byte[]---" + goods);
    }
    }

使用JSON字符串传递

要求:对象的属性名一致

  • 消息提供者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @Service
    public class MQService {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendGoodsToMq(Goods goods) throws JsonProcessingException {
    // 消息队列可以发送 字符串、字节数组、序列化对象
    ObjectMapper objectMapper = new ObjectMapper();
    String msg = objectMapper.writeValueAsString(goods);
    amqpTemplate.convertAndSend("", "queue1", msg);
    }
    }
  • 消息消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    @Component
    @RabbitListener(queues = "queue1")
    public class ReceiveService {

    @RabbitHandler
    public void receiveMsg(String msg) throws JsonProcessingException {
    ObjectMapper objectMapper = new ObjectMapper();
    Goods goods = objectMapper.readValue(msg, Goods.class);
    System.out.println("String---" + msg);
    }
    }

基于 Java 的交换机与队列创建

我们使用消息队列,消息队列和交换机可以通过管理系统完成创建,也可以在应用程序中通过 Java 代码来完成创建

Maven 项目交换机及队列创建

  • 使用 Java 代码新建队列

    1
    2
    3
    4
    5
    6
    7
    // 1.定义队列 (使用Java代码在MQ中新建一个队列)
    // 参数1:定义的队列名称
    // 参数2:队列中的数据是否持久化(如果选择了持久化)
    // 参数3: 是否排外(当前队列是否为当前连接私有)
    // 参数4:自动删除(当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据))
    // 参数5:设置当前队列的参数
    channel.queueDeclare("queue7", false, false, false, null);
  • 新建交换机

    1
    2
    3
    4
    // 定义一个“订阅交换机”
    channel.exchangeDeclare("ex3", BuiltinExchangeType.FANOUT);
    // 定义一个“路由交换机”
    channel.exchangeDeclare("ex4", BuiltinExchangeType.DIRECT);
  • 绑定队列到交换机

    1
    2
    3
    4
    5
    6
    // 绑定队列
    // 参数1:队列名称
    // 参数2:目标交换机
    // 参数3:如果绑定订阅交换机参数为"",如果绑定路由交换机则表示设置队列的key
    channel.queueBind("queue7", "ex4", "k1");
    channel.queueBind("queue8", "ex4", "k2");

SpringBoot 通过配置完成队列的创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/**
* @author GarveyZhong
* @date 2020/6/9 22:21
*/
@Configuration
public class RabbitMQConfig {

/**
* SpringBoot 新建队列
*
* @return
*/
@Bean
public Queue queue9() {
return new Queue("queue9");
}

@Bean
public Queue queue10() {
Queue queue10 = new Queue("queue10");
// queue10.设置参数
return queue10;
}

/**
* 声明订阅交换机
*
* @return
*/
@Bean
public FanoutExchange ex5() {
return new FanoutExchange("ex5");
}

/**
* 声明路由交换机
*
* @return
*/
@Bean
public DirectExchange ex6() {
return new DirectExchange("ex6");
}

/**
* 绑定队列
*
* @param queue9
* @param ex6
* @return
*/
@Bean
public Binding bindingQueue9(Queue queue9, DirectExchange ex6) {
return BindingBuilder.bind(queue9).to(ex6).with("k1");
}

/**
* 绑定队列
*
* @param queue10
* @param ex6
* @return
*/
@Bean
public Binding bindingQueue10(Queue queue10, DirectExchange ex6) {
return BindingBuilder.bind(queue10).to(ex6).with("k2");
}
}

消息的可靠性

RabbitMQ 事务

当在消息发送过程中添加了事务,处理效率降低几十倍甚至上百倍

1
2
3
4
5
6
7
8
9
10
11
// 开启事务
channel.txSelect();
try {
channel.basicPublish("ex4", "k1", null, msg.getBytes());
System.out.println("发送:" + msg);
// 提交事务
channel.txCommit();
} catch (Exception e) {
// 事务回滚
channel.txRollback();
}

RabbitMQ 消息确认和 return 机制

img

消息确认机制:确认消息提供者是否成功发送消息到交换机

return机制:确认消息是否成功的从交换机分发到队列

普通 Maven 项目的消息确认

  • 普通 confirm 方式
1
2
3
4
5
6
// 1.发送消息之前开启消息确认
channel.confirmSelect();
channel.basicPublish("ex1", "a", null, msg.getBytes());
// 2.接收消息确认
boolean b = channel.waitForConfirms();
System.out.println("发送:" + (b ? "成功" : "失败"));
  • 批量 confirm 方式
1
2
3
4
5
6
7
8
// 1.发送消息之前开启消息确认
channel.confirmSelect();
// 2.批量发送消息
for (int i = 0; i < 10; i++) {
channel.basicPublish("ex1", "a", null, msg.getBytes());
}
// 3.接收批量消息确认:发送的所有消息中,如果有一条是失败的,则所有消息发送直接失败,抛出IO异常
boolean b = channel.waitForConfirms();
  • 异步 confirm 方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 发送消息之前开启消息确认
channel.confirmSelect();
// 批量发送消息
for (int i = 0; i < 10; i++) {
channel.basicPublish("ex1", "a", null, msg.getBytes());
}
// 假如发送消息需要10s,waitForConfirms会进入阻塞状态
// boolean b = channel.waitForConfirms();

// 使用监听器异步confirm
channel.addConfirmListener(new ConfirmListener() {
//参数1: long l 返回消息的表示
//参数2: boolean b 是否为批量confirm
public void handleAck(long l, boolean b) throws IOException {
System.out.println("~~~~~消息成功发送到交换机");
}

public void handleNack(long l, boolean b) throws IOException {
System.out.println("~~~~~消息发送到交换机失败");
}
});

普通 Maven 项目的 return 机制

  • 添加 return 监听器
  • 发送消息是指定第三个参数为 true
  • 由于监听器监听是异步处理,所以在消息发送之后不能关闭 channel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
String msg = "Hello RabbitMQ!";
// 相当于JDBC操作的数据库连接
Connection connection = ConnectionUtil.getConnection();
// 相当于JDBC操作的statement
Channel channel = connection.createChannel();

// return机制,监控交换机是否将消息分发到队列
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 如果交换机分发消息到队列失败,则会执行此方法(用来处理交换机分发消息到队列失败的情况)
// 标识
System.out.println("唯一标识:" + replyCode);
// 忘了
System.out.println("replyText == " + replyText);
// 交换机名
System.out.println("exchange == " + exchange);
// 交换机对应队列的key
System.out.println("routingKey == " + routingKey);
// 发送的消息
System.out.println("body == " + new String(body));
}
});

//发送消息
// channel.basicPublish("ex4", "g", null, msg.getBytes());
// 不仅开启消息确认,还开启了return机制
channel.basicPublish("ex4", "g", true, null, msg.getBytes());

在 SpringBoot 实现消息确认与 return 监听

配置 application.yml, 开启消息确认和 return 监听

1
2
3
4
spring:
rabbitmq:
publisher-confirm-type: simple # 开启消息确认模式
publisher-returns: true # 使用return监听机制

创建 confirm 和 return 监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Component
public class MsgConfirmAndReturn implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

Logger logger = LoggerFactory.getLogger(MsgConfirmAndReturn.class);

@Autowired
private RabbitTemplate rabbitTemplate;

@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}

@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
// 此方法用于监听消息确认结果(消息是否发送到交换机)
if (b) {
logger.info("-------消息成功发送到交换机");
} else {
logger.warn("-------消息发送到交换机失败");
}
}

@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
//此方法用于return监听(当交换机分发消息到队列失败时执行)
logger.warn("~~~~~~~交换机分发消息到队列失败");
}
}

RabbitMQ 的集群部署

  • 暂略

延迟机制

延迟队列

  • 延迟队列——消息进入到队列之后,延迟指定的时间才能被消费者消费

  • AMQP 协议和 RabbitMQ 队列本身是不支持延迟队列功能的,但是可以通过 TTL(Time To Live)特性模拟延迟队列的功能

  • TTL就是消息的存活时间。RabbitMQ 可以分别对队列和消息设置存活时间

    img

    • 在创建队列的时候可以设置队列的存活时间,当消息进入到队列并且在存活时间内没有消费者消费,则此消息就会从当前队列被移除;
  • 创建消息队列没有设置 TTL,但是消息设置了 TTL,那么当消息的存活时间结束,也会被移除;

    • 当 TTL 结束之后,我们可以指定将当前队列的消息转存到其他指定的队列

使用延迟队列实现订单支付监控

  • 实现流程图

    img

创建交换机和队列

img

img

img

img