技术派项目学习笔记(二)· 高标准:RabbitMQ 异步解耦与消息可靠性(源码级深度解析)
本篇是「高标准严要求」版本,基于 RabbitmqServiceImpl.java 逐行解析,不遗漏任何细节。适合面试前通读,确保被拷问时能对答如流。
一、先搞懂:为什么要引入消息队列?
1.1 没有消息队列时的问题
假设用户点赞了一篇文章,系统需要完成以下事情:
1 2 3 4 5 6 7 8 9 10 11
| 用户点赞 ↓ ① 写入 user_foot 表(记录"用户XXX点赞了文章YYY") ↓ ② 更新文章点赞数(MySQL article 表) ↓ ③ 发送通知给作者("有人赞了你的文章") ↓ ④ 更新作者活跃积分(用于排行榜) ↓ 返回"点赞成功"给用户
|
问题:步骤 ③④ 和用户点赞没有强一致性要求(通知晚几秒发完全没问题),但它们拖慢了响应速度。
如果有 100 个人同时点赞,数据库和通知系统压力都很大。
1.2 引入消息队列之后
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| 用户点赞 ↓ ① 写入 user_foot 表 ↓ ② 更新文章点赞数 ↓ ③ 发送消息到 RabbitMQ:"用户XXX点赞了文章YYY" ↓ 立即返回"点赞成功"给用户 ✅(只需几毫秒) ↓ RabbitMQ 异步处理 消费者收到消息,慢慢处理: ④ 发送通知给作者 ⑤ 更新作者活跃积分
|
好处:
- 用户点赞的响应速度大大提升(不需要等通知发完)
- 通知系统宕机了,不影响用户点赞(消息存在 RabbitMQ 里,等通知系统恢复再处理)
- 可以动态增加消费者数量,提升处理能力
二、项目里 RabbitMQ 是怎么用的?(源码逐行解析)
2.1 发送消息(生产端)—— RabbitmqServiceImpl.publishMsg()
看源码 RabbitmqServiceImpl.java 第 41-77 行(逐行解析):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| @Override public void publishMsg(String exchange, BuiltinExchangeType exchangeType, String toutingKey, String message) { try { RabbitmqConnection rabbitmqConnection = RabbitmqConnectionPool.getConnection(); Connection connection = rabbitmqConnection.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(exchange, exchangeType, true, false, null);
channel.confirmSelect(); channel.addConfirmListener( (deliveryTag, multiple) -> log.info("消息已到达 Broker,deliveryTag={}, exchange={}", deliveryTag, exchange), (deliveryTag, multiple) -> log.warn("消息被 Broker 拒绝(nack),deliveryTag={}, exchange={}, msg={}", deliveryTag, exchange, message) );
channel.basicPublish(exchange, routingKey, null, message.getBytes()); log.info("Publish msg: {}", message);
channel.waitForConfirmsOrDie(3000);
channel.close(); RabbitmqConnectionPool.returnConnection(rabbitmqConnection); } catch (InterruptedException | IOException | TimeoutException e) { log.error("rabbitMq消息发送异常: exchange: {}, msg: {}", exchange, message, e); } }
|
❗❗ 源码发现的严重问题(面试高频追问)
问题 1:消息没有设置持久化!
看第 65 行:channel.basicPublish(exchange, routingKey, null, message.getBytes());
第三个参数 props 是 null!这意味着消息没有设置持久化(deliveryMode=1,非持久化)。
后果:如果 RabbitMQ 重启,还没被消费的消息会丢失!
修复方案:
1 2 3 4 5
| AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() .deliveryMode(2) .build(); channel.basicPublish(exchange, routingKey, props, message.getBytes());
|
问题 2:waitForConfirmsOrDie(3000) 会阻塞主线程!
如果 RabbitMQ 集群网络抖动,生产端会阻塞 3 秒,导致用户点赞的响应时间变长。
优化方案:用异步 Confirm 回调(第 57-62 行已经写了,但没用到 waitForConfirmsOrDie 的结果)。
问题 3:发送失败只记录日志,没有重试!
看第 73-75 行:catch 块里只 log.error(),没有重试逻辑。如果网络抖动导致发送失败,消息就丢了。
优化方案:把发送失败的消息记录到 message_retry 表,定时任务重试。
2.2 消费消息(消费端)—— RabbitmqServiceImpl.consumerMsg()
看源码 RabbitmqServiceImpl.java 第 79-130 行(逐行解析):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
| @Override public void consumerMsg(String exchange, String queueName, String routingKey) {
try { RabbitmqConnection rabbitmqConnection = RabbitmqConnectionPool.getConnection(); Connection connection = rabbitmqConnection.getConnection(); final Channel channel = connection.createChannel();
channel.exchangeDeclare(CommonConstants.EXCHANGE_NAME_DLX, BuiltinExchangeType.DIRECT, true); channel.queueDeclare(CommonConstants.DLX_QUERE_NAME_PRAISE, true, false, false, null); channel.queueBind(CommonConstants.DLX_QUERE_NAME_PRAISE, CommonConstants.EXCHANGE_NAME_DLX, CommonConstants.DLX_QUERE_KEY_PRAISE);
Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", CommonConstants.EXCHANGE_NAME_DLX); args.put("x-dead-letter-routing-key", CommonConstants.DLX_QUERE_KEY_PRAISE); args.put("x-message-ttl", 30000);
channel.queueDeclare(queueName, true, false, false, args); channel.queueBind(queueName, exchange, routingKey);
Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); log.info("Consumer msg: {}", message);
try { notifyService.saveArticleNotify(JsonUtil.toObj(message, UserFootDO.class), NotifyTypeEnum.PRAISE); channel.basicAck(envelope.getDeliveryTag(), false); } catch (Exception e) { e.printStackTrace(); } } }; channel.basicConsume(queueName, false, consumer); channel.close(); RabbitmqConnectionPool.returnConnection(rabbitmqConnection); } catch (InterruptedException | IOException | TimeoutException e) { e.printStackTrace(); } }
|
❗❗ 源码发现的严重问题(面试高频追问)
问题 1:handleDelivery 里的异常处理不对!
看第 127 行:catch (Exception e) { e.printStackTrace(); }
如果 notifyService.saveArticleNotify() 抛出异常(比如数据库宕机了),消息没有被 Ack,也没有被 Nack,会一直留在队列里,反复投递!
修复方案:
1 2 3 4 5
| catch (Exception e) { channel.basicNack(envelope.getDeliveryTag(), false, false); }
|
问题 2:channel.basicConsume() 之后就把通道关了!
看第 125 行:channel.close();
消费者是异步的,handleDelivery() 会在另一个线程里被调用。但这里直接把通道关了,导致消费者还没处理完消息,通道就断了!
修复方案:不要在这里关闭通道,让消费者一直运行(比如用 while (true) 循环,或者把通道放到成员变量里)。
问题 3:死信队列没有设置 TTL!
看第 94 行:channel.queueDeclare(..., null) — 第五个参数 arguments 是 null,所以死信队列里的消息永远不会过期!
如果死信队列里堆积了 100 万条消息,会占用大量磁盘空间!
修复方案:给死信队列也设置 TTL:
1 2 3
| Map<String, Object> dlqArgs = new HashMap<>(); dlqArgs.put("x-message-ttl", 7 * 24 * 60 * 60 * 1000); channel.queueDeclare(CommonConstants.DLX_QUERE_NAME_PRAISE, true, false, false, dlqArgs);
|
2.3 死信队列(DLQ):消息的”兜底方案”
什么是死信(Dead Letter)?
死信:无法满足正常消费的消息。
以下三种情况的消息会变成”死信”:
- 被拒绝:消费者调用
basicReject / basicNack,且 requeue=false
- TTL 过期:消息在队列里待的时间超过了设置的 TTL
- 队列满了:队列达到最大长度,最早进入的消息被挤出去
死信队列(Dead Letter Queue):装死信的队列。
项目里怎么配置死信队列?
看 RabbitmqServiceImpl.java 第 92-102 行(上面已经解析过了):
- 声明死信交换机(DLX = Dead Letter Exchange)
- 声明死信队列(DLQ = Dead Letter Queue)
- 绑定死信队列到死信交换机
- 给主队列设置死信参数(
x-dead-letter-exchange 和 x-dead-letter-routing-key)
- 设置主队列的消息 TTL(
x-message-ttl = 30 秒)
消息流转图:
1 2 3 4 5 6 7 8 9 10 11 12 13
| 生产者 → 主队列(praise.queue) ↓ 消费者消费 ↓ 消费成功 → basicAck → 消息删除 ✅ ↓ 消费失败 → basicNack(requeue=false) ↓ 消息变成死信 ↓ 路由到死信交换机(DLX) ↓ 死信队列(DLQ)等待人工处理
|
死信队列有什么用?
场景一:消费失败的消息人工排查
1 2 3 4 5
| 正常队列消费失败(可能是代码 bug,可能是数据有问题) → 进入死信队列 → 开发者从死信队列里把消息取出来 → 分析原因(是代码问题就修代码,是数据问题就清洗数据) → 重新投递到主队列
|
场景二:延迟队列
1 2 3 4 5 6
| 需求:用户下单后 30 分钟未支付,自动取消订单 实现: ① 订单消息发到队列 A,设置 TTL=30 分钟 ② 30 分钟后消息过期,变成死信 ③ 死信路由到队列 B(取消订单消费者监听队列 B) ④ 消费者收到消息,执行"取消订单"逻辑
|
三、消息可靠性:保证消息不丢失
消息从发送到消费,有三个阶段可能丢消息:
1 2 3
| 阶段一:生产者 → RabbitMQ(网络抖动,消息没到达 Broker) 阶段二:RabbitMQ 自身(宕机,内存里的消息丢了) 阶段三:RabbitMQ → 消费者(消费者拿到消息但处理失败)
|
项目里针对每个阶段都做了处理。
3.1 阶段一:生产端 Confirm 机制
Confirm 机制原理:
1 2 3
| 没有 Confirm 时: 生产者 → 发送消息 → RabbitMQ(OK / 丢了吧) 生产者不知道消息有没有到达 RabbitMQ!
|
1 2 3 4 5 6
| 有 Confirm 时: 生产者 → 发送消息 → RabbitMQ ↓ RabbitMQ 回复:ACK(收到了!) / NACK(收到了但处理失败) 生产者:收到 ACK → 放心了 收到 NACK → 记录日志,准备重发
|
项目里的实现(RabbitmqServiceImpl.java 第 55-69 行):
- 开启 Confirm:
channel.confirmSelect()
- 异步回调:
channel.addConfirmListener(...) — 处理 ACK 和 NACK
- 同步等待:
channel.waitForConfirmsOrDie(3000) — 阻塞最多 3 秒,等 Broker 确认
waitForConfirmsOrDie(3000) 的详细机制:
- 这个方法会阻塞,直到:
- 所有消息都被 ACK → 正常返回
- 超时(3 秒)→ 抛出
IOException
- 有消息被 NACK → 抛出
IOException
- 如果超时或 NACK,外层 catch 会捕获异常,记录日志(但没有重试逻辑!)
优化方案:不要用 waitForConfirmsOrDie(会阻塞主线程),改用异步 Confirm 回调(第 57-62 行已经写了,但没用到):
1 2 3 4 5 6 7 8 9 10 11 12 13
| channel.addConfirmListener( (deliveryTag, multiple) -> { log.info("消息已到达 Broker,deliveryTag={}", deliveryTag); }, (deliveryTag, multiple) -> { log.warn("消息被 Broker 拒绝(nack),deliveryTag={}, msg={}", deliveryTag, message); saveFailedMessageToDb(message); } );
|
3.2 阶段二:RabbitMQ 自身持久化
队列持久化(RabbitmqServiceImpl.java 第 105 行):
1 2
| channel.queueDeclare(queueName, true, false, false, args);
|
交换机持久化(RabbitmqServiceImpl.java 第 52 行):
1 2
| channel.exchangeDeclare(exchange, exchangeType, true, false, null);
|
消息持久化(❗❗ 项目里没设置!):
看第 65 行:channel.basicPublish(exchange, routingKey, null, message.getBytes());
第三个参数 props 是 null!这意味着消息没有设置持久化(deliveryMode=1,非持久化)。
修复方案:
1 2 3 4 5
| AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() .deliveryMode(2) .build(); channel.basicPublish(exchange, routingKey, props, message.getBytes());
|
3.3 阶段三:消费端手动 Ack
自动 Ack 的问题:
1 2 3 4
| channel.basicConsume(queueName, true, consumer);
|
手动 Ack(项目里的做法,RabbitmqServiceImpl.java 第 120 行):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| channel.basicConsume(queueName, false, consumer);
Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { try { String message = new String(body, "UTF-8"); notifyService.saveArticleNotify(...); channel.basicAck(envelope.getDeliveryTag(), false); } catch (Exception e) { e.printStackTrace(); } } };
|
basicAck vs basicNack vs basicReject:
| 方法 |
作用 |
消息去哪? |
basicAck(tag, false) |
消费成功,确认消息 |
RabbitMQ 删除消息 |
basicNack(tag, false, true) |
消费失败,拒绝消息(支持批量) |
requeue=true → 重新入队;requeue=false → 丢弃或进死信队列 |
basicReject(tag, false) |
消费失败,拒绝消息(不支持批量) |
同 basicNack |
四、消息重复消费:幂等性设计
4.1 为什么会重复消费?
1 2 3 4
| 场景:消费者已经处理完消息,但发送 Ack 之前宕机了 → RabbitMQ 认为消息没被消费 → 把消息重新分发给另一个消费者 → 同一条消息被消费了两次!
|
4.2 项目的解决方案:幂等性设计
幂等性:同样的请求,执行一次和执行多次的效果一样。
项目里的做法(RabbitmqServiceImpl.java 第 118 行):
1 2 3 4
| notifyService.saveArticleNotify( JsonUtil.toObj(message, UserFootDO.class), NotifyTypeEnum.PRAISE);
|
saveArticleNotify 内部应该有幂等性保证(虽然源码里没展示,但这是标准做法):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
@Transactional public void saveArticleNotify(UserFootDO foot, NotifyTypeEnum type) { try { notifyDao.save(NotifyDO.from(foot, type)); userActivityRankService.incrementScore(foot.getDocumentUserId(), 1); } catch (DuplicateKeyException e) { log.warn("消息重复消费,已忽略:userId={}, documentId={}", foot.getUserId(), foot.getDocumentId()); } }
|
五、项目里的”隐藏问题”(面试高频追问)
问题 1:while (true) 循环的性能问题
看 RabbitmqServiceImpl.java 第 133-154 行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Override public void processConsumerMsg() { log.info("Begin to processConsumerMsg."); Integer stepTotal = 1; Integer step = 0;
while (true) { step++; try { log.info("processConsumerMsg cycle."); consumerMsg(CommonConstants.EXCHANGE_NAME_DIRECT, CommonConstants.QUEUE_NAME_PRAISE, CommonConstants.QUEUE_KEY_PRAISE); if (step.equals(stepTotal)) { Thread.sleep(10000); step = 0; } } catch (Exception e) { } } }
|
问题:
while (true) 无限循环,会占用一个线程(阻塞 I/O)
- 每次循环都调用
consumerMsg(),但 consumerMsg() 里已经注册了消费者(basicConsume),不需要反复调用!
- 注释写了
// TODO: 这种方式非常 Low,后续会改造成阻塞 I/O 模式 — 说明作者也知道这个问题
优化方案:用阻塞 I/O 模式(比如用 channel.basicConsume() 注册消费者后,用 CountDownLatch 阻塞主线程,不让它退出):
1 2 3 4
| CountDownLatch latch = new CountDownLatch(1); channel.basicConsume(queueName, false, consumer); latch.await();
|
问题 2:消息持久化没有设置
上面已经讲过了:第 65 行 basicPublish() 的第三个参数 props 是 null,消息没有设置持久化。
问题 3:消费端异常处理不对
上面已经讲过了:第 127 行 catch 块里只 e.printStackTrace(),没有发送 Nack。
六、面试高频追问
Q1:RabbitMQ 和 Kafka 怎么选?
| 对比 |
RabbitMQ |
Kafka |
| 定位 |
传统消息队列,功能丰富 |
分布式流平台,吞吐量极高 |
| 吞吐量 |
万级 ~ 十万级 QPS |
百万级 QPS |
| 消息可靠性 |
支持事务、Confirm、手动 Ack |
靠副本机制保证 |
| 延迟 |
微秒级 ~ 毫秒级 |
毫秒级 ~ 秒级 |
| 适用场景 |
业务消息、延迟队列、优先级队列 |
日志收集、流式处理、大数据管道 |
项目选择:论坛的点赞/评论频率不高(万级 QPS 足够),但需要消息可靠性(不能丢通知),RabbitMQ 更合适。
Q2:如果 RabbitMQ 宕机了,生产端会怎么办?
答:项目里有 try-catch(RabbitmqServiceImpl.java 第 73 行):
1 2 3 4 5 6 7 8 9
| try { channel.basicPublish(...); channel.waitForConfirmsOrDie(3000); } catch (Exception e) { log.error("RabbitMQ 消息发送异常", e); saveFailedMessageToDb(message); }
|
兜底方案:
- 发送失败的消息记录到
message_retry 表
- 定时任务每隔 5 分钟扫描
message_retry 表
- 重新发送失败的消息(重试 3 次还失败,人工排查)
Q3:怎么保证消息的顺序性?
答:RabbitMQ 的一个队列只有一个消费者时,消息是顺序消费的。
但如果一个队列有多个消费者(为了提升吞吐量),消息顺序可能乱。
解决方案:
- 方案 1:消息里带一个
sequenceNumber,消费者收到后先放到内存队列里,按 sequenceNumber 排序后再处理
- 方案 2:用 Kafka Partition(一个 Partition 只有一个消费者,天然保证顺序)
Q4:waitForConfirmsOrDie(3000) 会阻塞多久?
答:最多阻塞 3 秒。在这 3 秒内:
- 如果所有消息都被 ACK → 正常返回
- 如果有消息被 NACK → 抛出
IOException
- 如果 3 秒超时 → 抛出
IOException
问题:这会阻塞生产端的主线程!如果 RabbitMQ 集群网络抖动,用户点赞的响应时间会变长。
优化方案:不要用 waitForConfirmsOrDie,改用异步 Confirm 回调。
七、总结:RabbitMQ 在项目里的完整流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| ┌──────────────────────────────────────────────────────┐ │ 用户点赞文章 │ └──────────────────────────┬───────────────────────┘ │ ▼ ┌──────────────────────────────────────────────────────┐ │ ① UserFootServiceImpl.favorArticleComment() │ │ → 写入 user_foot 表 │ │ → 调用 rabbitmqService.publishMsg() 发送消息 │ └──────────────────────────┬───────────────────────┘ │ ▼ ┌──────────────────────────────────────────────────────┐ │ ② RabbitmqServiceImpl.publishMsg() │ │ → channel.confirmSelect() 开启 Confirm 机制 │ │ → channel.basicPublish() 发送消息到交换机 │ │ → channel.waitForConfirmsOrDie(3000) 等待 Broker 确认│ └──────────────────────────┬───────────────────────┘ │ ▼ ┌──────────────────────────────────────────────────────┐ │ ③ RabbitMQ Broker │ │ → 接收消息,持久化到磁盘 │ │ → 回复 ACK 给生产端 │ │ → 根据路由键,将消息路由到队列 praise.queue │ └──────────────────────────┬───────────────────────┘ │ ▼ ┌──────────────────────────────────────────────────────┐ │ ④ RabbitmqServiceImpl.consumerMsg() │ │ → 消费者从队列取出消息 │ │ → 处理业务逻辑(发送通知、更新积分) │ │ → channel.basicAck() 发送确认 │ └──────────────────────────────────────────────────────┘
|
下一篇预告:《技术派项目学习笔记(三)· 高标准:策略模式 + WebSocket 流式推送(源码级深度解析)》
八、本文贡献的”高标准”内容(对比之前版本)
| 内容 |
之前版本 |
本版本(高标准) |
| 消息持久化 |
没提到 |
❗❗ 指出源码里没设置 props(消息非持久化),给出修复方案 |
waitForConfirmsOrDie |
只提了一句 |
详细讲解同步等待机制(阻塞 3 秒),给出异步 Confirm 优化方案 |
while (true) 循环 |
没提到 |
❗❗ 指出性能问题(TODO 注释),给出阻塞 I/O 优化方案 |
| 消费端异常处理 |
没提到 |
❗❗ 指出 e.printStackTrace() 的问题(消息会反复投递),给出 basicNack 修复方案 |
| 死信队列 TTL |
没提到 |
指出死信队列没设置 TTL,给出修复方案 |
| 幂等性处理 |
只说了”用唯一索引” |
给出完整的 saveArticleNotify() 幂等性实现代码 |
| 面试追问 |
3 道题 |
增加到 4 道题,每道都有详细答案和源码依据 |