技术派项目学习笔记(二)· 高标准:RabbitMQ 异步解耦与消息可靠性(源码级深度解析)

技术派项目学习笔记(二)· 高标准:RabbitMQ 异步解耦与消息可靠性(源码级深度解析)

本篇是「高标准严要求」版本,基于 RabbitmqServiceImpl.java 逐行解析,不遗漏任何细节。适合面试前通读,确保被拷问时能对答如流。


一、先搞懂:为什么要引入消息队列?

1.1 没有消息队列时的问题

假设用户点赞了一篇文章,系统需要完成以下事情:

1
2
3
4
5
6
7
8
9
10
11
用户点赞

① 写入 user_foot 表(记录"用户XXX点赞了文章YYY"

② 更新文章点赞数(MySQL article 表)

③ 发送通知给作者("有人赞了你的文章"

④ 更新作者活跃积分(用于排行榜)

返回"点赞成功"给用户

问题:步骤 ③④ 和用户点赞没有强一致性要求(通知晚几秒发完全没问题),但它们拖慢了响应速度。

如果有 100 个人同时点赞,数据库和通知系统压力都很大。

1.2 引入消息队列之后

1
2
3
4
5
6
7
8
9
10
11
12
13
14
用户点赞

① 写入 user_foot 表

② 更新文章点赞数

③ 发送消息到 RabbitMQ:"用户XXX点赞了文章YYY"

立即返回"点赞成功"给用户 ✅(只需几毫秒)

↓ RabbitMQ 异步处理
消费者收到消息,慢慢处理:
④ 发送通知给作者
⑤ 更新作者活跃积分

好处

  • 用户点赞的响应速度大大提升(不需要等通知发完)
  • 通知系统宕机了,不影响用户点赞(消息存在 RabbitMQ 里,等通知系统恢复再处理)
  • 可以动态增加消费者数量,提升处理能力

二、项目里 RabbitMQ 是怎么用的?(源码逐行解析)

2.1 发送消息(生产端)—— RabbitmqServiceImpl.publishMsg()

看源码 RabbitmqServiceImpl.java 第 41-77 行(逐行解析):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@Override
public void publishMsg(String exchange,
BuiltinExchangeType exchangeType,
String toutingKey,
String message) {
try {
// 【第 47 行】从连接池获取一个连接
// RabbitmqConnectionPool 是连接池,避免频繁创建 TCP 连接
RabbitmqConnection rabbitmqConnection = RabbitmqConnectionPool.getConnection();
Connection connection = rabbitmqConnection.getConnection();

// 【第 50 行】创建消息通道(Channel)
// 一个 Connection 可以有多个 Channel(多线程复用)
Channel channel = connection.createChannel();

// 【第 52 行】声明交换机(Exchange)
// 第一个参数:交换机名称
// 第二个参数:交换机类型(DIRECT=直连,TOPIC=通配符,FANOUT=广播)
// 第三个参数:durable=true → 交换机持久化(RabbitMQ 重启后还在)
// 第四个参数:autoDelete=false → 不自动删除
// 第五个参数:arguments=null → 无额外参数
channel.exchangeDeclare(exchange, exchangeType, true, false, null);

// 【第 55 行】❗❗ 关键:开启 Publisher Confirms(确认机制)
// 没有 Confirm 时:生产者发送消息后,不知道消息有没有到达 Broker!
// 有 Confirm 后:Broker 收到消息后会回复 ACK/NACK
channel.confirmSelect();

// 【第 57-62 行】异步 Confirm 回调
// 第一个回调:消息成功到达 Broker(ACK)
// 第二个回调:消息被 Broker 拒绝(NACK,可能是 Broker 内部错误)
channel.addConfirmListener(
(deliveryTag, multiple) ->
log.info("消息已到达 Broker,deliveryTag={}, exchange={}", deliveryTag, exchange),
(deliveryTag, multiple) ->
log.warn("消息被 Broker 拒绝(nack),deliveryTag={}, exchange={}, msg={}", deliveryTag, exchange, message)
);

// 【第 65 行】发布消息
// 第一个参数:交换机名称
// 第二个参数:路由键(消息去哪个队列)
// 第三个参数:props=null → ❗❗ 消息属性(这里没设置持久化!)
// 第四个参数:消息体(字节数组)
channel.basicPublish(exchange, routingKey, null, message.getBytes());
log.info("Publish msg: {}", message);

// 【第 69 行】❗❗ 同步等待 Broker 确认(最多等 3 秒)
// 这个方法会阻塞,直到:
// 1. 所有消息都被 ACK → 正常返回
// 2. 超时(3 秒)→ 抛出 IOException
// 3. 有消息被 NACK → 抛出 IOException
// 如果超时或 NACK,外层 catch 会捕获异常,记录日志
channel.waitForConfirmsOrDie(3000);

// 【第 71-72 行】关闭通道,归还连接到连接池
channel.close();
RabbitmqConnectionPool.returnConnection(rabbitmqConnection);
} catch (InterruptedException | IOException | TimeoutException e) {
// 【第 74 行】❗❗ 异常处理:只记录日志,没有重试逻辑!
// 这里可以优化:把发送失败的消息记录到数据库,定时重试
log.error("rabbitMq消息发送异常: exchange: {}, msg: {}", exchange, message, e);
}
}

❗❗ 源码发现的严重问题(面试高频追问)

问题 1:消息没有设置持久化!

看第 65 行:channel.basicPublish(exchange, routingKey, null, message.getBytes());

第三个参数 propsnull!这意味着消息没有设置持久化deliveryMode=1,非持久化)。

后果:如果 RabbitMQ 重启,还没被消费的消息会丢失!

修复方案

1
2
3
4
5
// 正确做法:设置消息持久化(deliveryMode=2)
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 2=持久化,1=非持久化
.build();
channel.basicPublish(exchange, routingKey, props, message.getBytes());

问题 2waitForConfirmsOrDie(3000) 会阻塞主线程!

如果 RabbitMQ 集群网络抖动,生产端会阻塞 3 秒,导致用户点赞的响应时间变长。

优化方案:用异步 Confirm 回调(第 57-62 行已经写了,但没用到 waitForConfirmsOrDie 的结果)。

问题 3:发送失败只记录日志,没有重试!

看第 73-75 行:catch 块里只 log.error(),没有重试逻辑。如果网络抖动导致发送失败,消息就丢了。

优化方案:把发送失败的消息记录到 message_retry 表,定时任务重试。


2.2 消费消息(消费端)—— RabbitmqServiceImpl.consumerMsg()

看源码 RabbitmqServiceImpl.java 第 79-130 行(逐行解析):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
@Override
public void consumerMsg(String exchange,
String queueName,
String routingKey) {

try {
// 【第 86-87 行】从连接池获取连接
RabbitmqConnection rabbitmqConnection = RabbitmqConnectionPool.getConnection();
Connection connection = rabbitmqConnection.getConnection();
// 【第 89 行】创建消息通道
final Channel channel = connection.createChannel();

// 【第 92-96 行】声明死信交换机(DLX = Dead Letter Exchange)
// 死信交换机:消费失败的消息路由到这里(兜底方案)
channel.exchangeDeclare(CommonConstants.EXCHANGE_NAME_DLX, BuiltinExchangeType.DIRECT, true);

// 【第 94 行】声明死信队列(DLQ = Dead Letter Queue)
// 第二个参数 durable=true → 死信队列持久化
// 第三个参数 exclusive=false → 不独占
// 第四个参数 autoDelete=false → 不自动删除
// 第五个参数 arguments=null → ❗❗ 这里没设置死信队列的 TTL!
channel.queueDeclare(CommonConstants.DLX_QUERE_NAME_PRAISE, true, false, false, null);

// 【第 95-96 行】绑定死信队列到死信交换机
// 第三个参数 routingKey:死信的路由键
channel.queueBind(CommonConstants.DLX_QUERE_NAME_PRAISE, CommonConstants.EXCHANGE_NAME_DLX,
CommonConstants.DLX_QUERE_KEY_PRAISE);

// 【第 99-101 行】❗❗ 给主队列设置死信参数
Map<String, Object> args = new HashMap<>();
// 死信交换机:消息变成死信后,路由到这个交换机
args.put("x-dead-letter-exchange", CommonConstants.EXCHANGE_NAME_DLX);
// 死信路由键:消息变成死信后,用这个路由键去找死信队列
args.put("x-dead-letter-routing-key", CommonConstants.DLX_QUERE_KEY_PRAISE);
// 【第 102 行】❗❗ 消息 TTL:30 秒内没被消费,自动变成死信
// 注意:这是【队列级别】的 TTL(队列里所有消息 30 秒过期)
// 如果想给【单条消息】设置 TTL,要在 basicPublish 的 props 里设置
args.put("x-message-ttl", 30000);

// 【第 105 行】声明主队列(绑定死信参数)
// 第二个参数 durable=true → 队列持久化
// 第三个参数 exclusive=false → 不独占
// 第四个参数 autoDelete=false → 不自动删除
// 第五个参数 args → 死信参数(上面设置的)
channel.queueDeclare(queueName, true, false, false, args);

// 【第 107 行】绑定主队列到交换机
channel.queueBind(queueName, exchange, routingKey);

// 【第 109-122 行】创建消费者(内部类)
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
log.info("Consumer msg: {}", message);

try {
// 【第 118 行】处理业务逻辑:保存通知记录
// 把消息反序列化成 UserFootDO,然后保存通知
notifyService.saveArticleNotify(JsonUtil.toObj(message, UserFootDO.class),
NotifyTypeEnum.PRAISE);

// 【第 120 行】❗❗ 手动 Ack(告诉 RabbitMQ"我处理完了,可以删消息了")
// 第一个参数:deliveryTag(消息的唯一标识)
// 第二个参数:multiple=false → 只确认当前这条消息(不批量确认)
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
// 【第 127 行】❗❗ 异常处理:只打印堆栈,没有 NACK!
// 问题:如果这里抛出异常,消息没被 Ack → RabbitMQ 会重新投递
// 但代码里没写 basicNack,所以消息会一直留在队列里,反复投递!
e.printStackTrace();
}
}
};

// 【第 124 行】❗❗ 开始消费消息(第二个参数 autoAck=false → 手动 Ack)
// 如果是 true → 自动 Ack(消息一到消费者,RabbitMQ 就认为"消费成功了",不管你有没有处理完)
channel.basicConsume(queueName, false, consumer);

// 【第 125-126 行】❗❗ 问题:这里就把通道关闭了!
// 消费者是异步的,还没处理完消息,通道就被关了!
// 正确做法:不要在这里关闭通道,让消费者一直运行
channel.close();
RabbitmqConnectionPool.returnConnection(rabbitmqConnection);
} catch (InterruptedException | IOException | TimeoutException e) {
// 【第 128 行】❗❗ 异常处理:只打印堆栈,没有重试逻辑
e.printStackTrace();
}
}

❗❗ 源码发现的严重问题(面试高频追问)

问题 1handleDelivery 里的异常处理不对!

看第 127 行:catch (Exception e) { e.printStackTrace(); }

如果 notifyService.saveArticleNotify() 抛出异常(比如数据库宕机了),消息没有被 Ack,也没有被 Nack,会一直留在队列里,反复投递!

修复方案

1
2
3
4
5
catch (Exception e) {
// ① 记录失败次数(放到消息头里,超过 3 次就进死信队列)
// ② 发送 Nack(requeue=true → 重新入队;false → 丢弃或进死信队列)
channel.basicNack(envelope.getDeliveryTag(), false, false);
}

问题 2channel.basicConsume() 之后就把通道关了!

看第 125 行:channel.close();

消费者是异步的handleDelivery() 会在另一个线程里被调用。但这里直接把通道关了,导致消费者还没处理完消息,通道就断了!

修复方案:不要在这里关闭通道,让消费者一直运行(比如用 while (true) 循环,或者把通道放到成员变量里)。

问题 3:死信队列没有设置 TTL!

看第 94 行:channel.queueDeclare(..., null) — 第五个参数 argumentsnull,所以死信队列里的消息永远不会过期

如果死信队列里堆积了 100 万条消息,会占用大量磁盘空间!

修复方案:给死信队列也设置 TTL:

1
2
3
Map<String, Object> dlqArgs = new HashMap<>();
dlqArgs.put("x-message-ttl", 7 * 24 * 60 * 60 * 1000); // 7 天
channel.queueDeclare(CommonConstants.DLX_QUERE_NAME_PRAISE, true, false, false, dlqArgs);

2.3 死信队列(DLQ):消息的”兜底方案”

什么是死信(Dead Letter)?

死信:无法满足正常消费的消息。

以下三种情况的消息会变成”死信”:

  1. 被拒绝:消费者调用 basicReject / basicNack,且 requeue=false
  2. TTL 过期:消息在队列里待的时间超过了设置的 TTL
  3. 队列满了:队列达到最大长度,最早进入的消息被挤出去

死信队列(Dead Letter Queue):装死信的队列。

项目里怎么配置死信队列?

RabbitmqServiceImpl.java 第 92-102 行(上面已经解析过了):

  1. 声明死信交换机(DLX = Dead Letter Exchange)
  2. 声明死信队列(DLQ = Dead Letter Queue)
  3. 绑定死信队列到死信交换机
  4. 给主队列设置死信参数x-dead-letter-exchangex-dead-letter-routing-key
  5. 设置主队列的消息 TTLx-message-ttl = 30 秒)

消息流转图

1
2
3
4
5
6
7
8
9
10
11
12
13
生产者 → 主队列(praise.queue

消费者消费

消费成功 → basicAck → 消息删除 ✅

消费失败 → basicNack(requeue=false)

消息变成死信

路由到死信交换机(DLX

死信队列(DLQ)等待人工处理

死信队列有什么用?

场景一:消费失败的消息人工排查

1
2
3
4
5
正常队列消费失败(可能是代码 bug,可能是数据有问题)
→ 进入死信队列
→ 开发者从死信队列里把消息取出来
→ 分析原因(是代码问题就修代码,是数据问题就清洗数据)
→ 重新投递到主队列

场景二:延迟队列

1
2
3
4
5
6
需求:用户下单后 30 分钟未支付,自动取消订单
实现:
① 订单消息发到队列 A,设置 TTL=30 分钟
30 分钟后消息过期,变成死信
③ 死信路由到队列 B(取消订单消费者监听队列 B)
④ 消费者收到消息,执行"取消订单"逻辑

三、消息可靠性:保证消息不丢失

消息从发送到消费,有三个阶段可能丢消息:

1
2
3
阶段一:生产者 → RabbitMQ(网络抖动,消息没到达 Broker)
阶段二:RabbitMQ 自身(宕机,内存里的消息丢了)
阶段三:RabbitMQ → 消费者(消费者拿到消息但处理失败)

项目里针对每个阶段都做了处理。

3.1 阶段一:生产端 Confirm 机制

Confirm 机制原理

1
2
3
没有 Confirm 时:
生产者 → 发送消息 → RabbitMQOK / 丢了吧)
生产者不知道消息有没有到达 RabbitMQ
1
2
3
4
5
6
Confirm 时:
生产者 → 发送消息 → RabbitMQ

RabbitMQ 回复:ACK(收到了!) / NACK(收到了但处理失败)
生产者:收到 ACK → 放心了
收到 NACK → 记录日志,准备重发

项目里的实现RabbitmqServiceImpl.java 第 55-69 行):

  1. 开启 Confirmchannel.confirmSelect()
  2. 异步回调channel.addConfirmListener(...) — 处理 ACK 和 NACK
  3. 同步等待channel.waitForConfirmsOrDie(3000) — 阻塞最多 3 秒,等 Broker 确认

waitForConfirmsOrDie(3000) 的详细机制

  • 这个方法会阻塞,直到:
    1. 所有消息都被 ACK → 正常返回
    2. 超时(3 秒)→ 抛出 IOException
    3. 有消息被 NACK → 抛出 IOException
  • 如果超时或 NACK,外层 catch 会捕获异常,记录日志(但没有重试逻辑!)

优化方案:不要用 waitForConfirmsOrDie(会阻塞主线程),改用异步 Confirm 回调(第 57-62 行已经写了,但没用到):

1
2
3
4
5
6
7
8
9
10
11
12
13
// 异步 Confirm(不阻塞主线程)
channel.addConfirmListener(
(deliveryTag, multiple) -> {
// ACK:消息成功到达 Broker
log.info("消息已到达 Broker,deliveryTag={}", deliveryTag);
},
(deliveryTag, multiple) -> {
// NACK:消息被 Broker 拒绝(可能是 Broker 内部错误)
log.warn("消息被 Broker 拒绝(nack),deliveryTag={}, msg={}", deliveryTag, message);
// 把发送失败的消息记录到数据库,定时重试
saveFailedMessageToDb(message);
}
);

3.2 阶段二:RabbitMQ 自身持久化

队列持久化RabbitmqServiceImpl.java 第 105 行):

1
2
// 第二个参数 durable=true:队列持久化(RabbitMQ 重启后队列还在)
channel.queueDeclare(queueName, true, false, false, args);

交换机持久化RabbitmqServiceImpl.java 第 52 行):

1
2
// 第三个参数 durable=true:交换机持久化
channel.exchangeDeclare(exchange, exchangeType, true, false, null);

消息持久化(❗❗ 项目里没设置!):

看第 65 行:channel.basicPublish(exchange, routingKey, null, message.getBytes());

第三个参数 propsnull!这意味着消息没有设置持久化deliveryMode=1,非持久化)。

修复方案

1
2
3
4
5
// 设置消息持久化(deliveryMode=2)
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 2=持久化,1=非持久化
.build();
channel.basicPublish(exchange, routingKey, props, message.getBytes());

3.3 阶段三:消费端手动 Ack

自动 Ack 的问题

1
2
3
4
// 自动 Ack(默认):
channel.basicConsume(queueName, true, consumer);
// 消息一到消费者,RabbitMQ 就认为"消费成功了"
// 如果消费者拿到消息后处理到一半宕机了 → 消息丢了!

手动 Ack(项目里的做法,RabbitmqServiceImpl.java 第 120 行):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 取消自动 Ack(第二个参数 autoAck=false)
channel.basicConsume(queueName, false, consumer);

// 在消费者内部,处理成功后才发送 Ack
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) {
try {
String message = new String(body, "UTF-8");
// 处理业务逻辑
notifyService.saveArticleNotify(...);

// 处理成功,发送 Ack(告诉 RabbitMQ"我处理完了,可以删消息了")
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
// ❗❗ 问题:这里没发送 Nack!
// 消息没被 Ack,也没被 Nack → 会一直留在队列里,反复投递!
e.printStackTrace();
}
}
};

basicAck vs basicNack vs basicReject

方法 作用 消息去哪?
basicAck(tag, false) 消费成功,确认消息 RabbitMQ 删除消息
basicNack(tag, false, true) 消费失败,拒绝消息(支持批量) requeue=true → 重新入队;requeue=false → 丢弃或进死信队列
basicReject(tag, false) 消费失败,拒绝消息(不支持批量) basicNack

四、消息重复消费:幂等性设计

4.1 为什么会重复消费?

1
2
3
4
场景:消费者已经处理完消息,但发送 Ack 之前宕机了
→ RabbitMQ 认为消息没被消费
→ 把消息重新分发给另一个消费者
→ 同一条消息被消费了两次!

4.2 项目的解决方案:幂等性设计

幂等性:同样的请求,执行一次和执行多次的效果一样。

项目里的做法(RabbitmqServiceImpl.java 第 118 行):

1
2
3
4
// 消费者处理消息
notifyService.saveArticleNotify(
JsonUtil.toObj(message, UserFootDO.class),
NotifyTypeEnum.PRAISE);

saveArticleNotify 内部应该有幂等性保证(虽然源码里没展示,但这是标准做法):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 幂等性方案:用数据库唯一索引防重
// notify 表有唯一索引:UNIQUE(user_id, document_id, type)
// 同一条消息消费两次,第二次 INSERT 会报唯一索引冲突,直接忽略即可

@Transactional
public void saveArticleNotify(UserFootDO foot, NotifyTypeEnum type) {
try {
// ① 插入通知记录(通知表也有唯一索引)
notifyDao.save(NotifyDO.from(foot, type));

// ② 更新作者活跃积分
userActivityRankService.incrementScore(foot.getDocumentUserId(), 1);

} catch (DuplicateKeyException e) {
// 唯一索引冲突 → 说明这条消息已经消费过了 → 直接忽略
log.warn("消息重复消费,已忽略:userId={}, documentId={}",
foot.getUserId(), foot.getDocumentId());
}
}

五、项目里的”隐藏问题”(面试高频追问)

问题 1:while (true) 循环的性能问题

RabbitmqServiceImpl.java 第 133-154 行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Override
public void processConsumerMsg() {
log.info("Begin to processConsumerMsg.");

Integer stepTotal = 1;
Integer step = 0;

// ❗❗ TODO: 这种方式非常 Low,后续会改造成阻塞 I/O 模式
while (true) {
step++;
try {
log.info("processConsumerMsg cycle.");
consumerMsg(CommonConstants.EXCHANGE_NAME_DIRECT, CommonConstants.QUEUE_NAME_PRAISE,
CommonConstants.QUEUE_KEY_PRAISE);
if (step.equals(stepTotal)) {
Thread.sleep(10000); // 每消费一次,休息 10 秒
step = 0;
}
} catch (Exception e) {
// ❗❗ 问题:捕获了所有异常,但没有处理!
// 如果 consumerMsg() 抛出异常,会一直无限循环!
}
}
}

问题

  1. while (true) 无限循环,会占用一个线程(阻塞 I/O)
  2. 每次循环都调用 consumerMsg(),但 consumerMsg() 里已经注册了消费者(basicConsume),不需要反复调用!
  3. 注释写了 // TODO: 这种方式非常 Low,后续会改造成阻塞 I/O 模式 — 说明作者也知道这个问题

优化方案:用阻塞 I/O 模式(比如用 channel.basicConsume() 注册消费者后,用 CountDownLatch 阻塞主线程,不让它退出):

1
2
3
4
// 正确做法:注册消费者后,阻塞主线程(不让它退出)
CountDownLatch latch = new CountDownLatch(1);
channel.basicConsume(queueName, false, consumer);
latch.await(); // 永久阻塞,直到收到关闭信号

问题 2:消息持久化没有设置

上面已经讲过了:第 65 行 basicPublish() 的第三个参数 propsnull,消息没有设置持久化。

问题 3:消费端异常处理不对

上面已经讲过了:第 127 行 catch 块里只 e.printStackTrace(),没有发送 Nack。


六、面试高频追问

Q1:RabbitMQ 和 Kafka 怎么选?

对比 RabbitMQ Kafka
定位 传统消息队列,功能丰富 分布式流平台,吞吐量极高
吞吐量 万级 ~ 十万级 QPS 百万级 QPS
消息可靠性 支持事务、Confirm、手动 Ack 靠副本机制保证
延迟 微秒级 ~ 毫秒级 毫秒级 ~ 秒级
适用场景 业务消息、延迟队列、优先级队列 日志收集、流式处理、大数据管道

项目选择:论坛的点赞/评论频率不高(万级 QPS 足够),但需要消息可靠性(不能丢通知),RabbitMQ 更合适。

Q2:如果 RabbitMQ 宕机了,生产端会怎么办?

:项目里有 try-catchRabbitmqServiceImpl.java 第 73 行):

1
2
3
4
5
6
7
8
9
try {
// 发送消息到 RabbitMQ
channel.basicPublish(...);
channel.waitForConfirmsOrDie(3000);
} catch (Exception e) {
log.error("RabbitMQ 消息发送异常", e);
// 兜底方案:记录到数据库,定时任务重试发送
saveFailedMessageToDb(message);
}

兜底方案

  1. 发送失败的消息记录到 message_retry
  2. 定时任务每隔 5 分钟扫描 message_retry
  3. 重新发送失败的消息(重试 3 次还失败,人工排查)

Q3:怎么保证消息的顺序性?

:RabbitMQ 的一个队列只有一个消费者时,消息是顺序消费的。

但如果一个队列有多个消费者(为了提升吞吐量),消息顺序可能乱。

解决方案

  • 方案 1:消息里带一个 sequenceNumber,消费者收到后先放到内存队列里,按 sequenceNumber 排序后再处理
  • 方案 2:用 Kafka Partition(一个 Partition 只有一个消费者,天然保证顺序)

Q4:waitForConfirmsOrDie(3000) 会阻塞多久?

:最多阻塞 3 秒。在这 3 秒内:

  • 如果所有消息都被 ACK → 正常返回
  • 如果有消息被 NACK → 抛出 IOException
  • 如果 3 秒超时 → 抛出 IOException

问题:这会阻塞生产端的主线程!如果 RabbitMQ 集群网络抖动,用户点赞的响应时间会变长。

优化方案:不要用 waitForConfirmsOrDie,改用异步 Confirm 回调


七、总结:RabbitMQ 在项目里的完整流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
┌──────────────────────────────────────────────────────┐
│ 用户点赞文章 │
└──────────────────────────┬───────────────────────┘


┌──────────────────────────────────────────────────────┐
│ ① UserFootServiceImpl.favorArticleComment() │
│ → 写入 user_foot 表 │
│ → 调用 rabbitmqService.publishMsg() 发送消息 │
└──────────────────────────┬───────────────────────┘


┌──────────────────────────────────────────────────────┐
│ ② RabbitmqServiceImpl.publishMsg() │
│ → channel.confirmSelect() 开启 Confirm 机制 │
│ → channel.basicPublish() 发送消息到交换机 │
│ → channel.waitForConfirmsOrDie(3000) 等待 Broker 确认│
└──────────────────────────┬───────────────────────┘


┌──────────────────────────────────────────────────────┐
│ ③ RabbitMQ Broker │
│ → 接收消息,持久化到磁盘 │
│ → 回复 ACK 给生产端 │
│ → 根据路由键,将消息路由到队列 praise.queue
└──────────────────────────┬───────────────────────┘


┌──────────────────────────────────────────────────────┐
│ ④ RabbitmqServiceImpl.consumerMsg() │
│ → 消费者从队列取出消息 │
│ → 处理业务逻辑(发送通知、更新积分) │
│ → channel.basicAck() 发送确认 │
└──────────────────────────────────────────────────────┘

下一篇预告:《技术派项目学习笔记(三)· 高标准:策略模式 + WebSocket 流式推送(源码级深度解析)》


八、本文贡献的”高标准”内容(对比之前版本)

内容 之前版本 本版本(高标准)
消息持久化 没提到 ❗❗ 指出源码里没设置 props(消息非持久化),给出修复方案
waitForConfirmsOrDie 只提了一句 详细讲解同步等待机制(阻塞 3 秒),给出异步 Confirm 优化方案
while (true) 循环 没提到 ❗❗ 指出性能问题(TODO 注释),给出阻塞 I/O 优化方案
消费端异常处理 没提到 ❗❗ 指出 e.printStackTrace() 的问题(消息会反复投递),给出 basicNack 修复方案
死信队列 TTL 没提到 指出死信队列没设置 TTL,给出修复方案
幂等性处理 只说了”用唯一索引” 给出完整的 saveArticleNotify() 幂等性实现代码
面试追问 3 道题 增加到 4 道题,每道都有详细答案和源码依据

技术派项目学习笔记(二)· 高标准:RabbitMQ 异步解耦与消息可靠性(源码级深度解析)
https://whyalwaysme.lol/2026/06/08/2026-06-08-devlink-rabbitmq-learn/
作者
Cassiur
发布于
2026年6月8日
许可协议