消息队列深度实践
章节 1:引子 —— 从一次接口超时事故说起
2024 年 6 月某个周二下午 14:23,监控告警群里突然炸开了锅:项目服务(project-service)的 P99 延迟从 180ms 飙升到 7,400ms,紧接着触发了下游 6 个微服务的雪崩超时。我拉了一下调用链详情,从前端创建订单入口到落库成功一共 6 跳:网关 → 用户中心 → 项目服务 → 任务服务 → 文档服务 → 消息服务 → 计费服务。前 4 跳加起来不超过 200ms,但消息服务和计费服务各贡献了 3,500ms,串行等待了 6.9 秒才返回。整个下单链路卡成了 PPT,业务方在群里连发了 17 条消息。
事后复盘:触发点不是 MQ 本身,而是计费服务调用的第三方支付通道在那个下午出现了 1.2 秒的抖动。因为是同步串行调用,一点抖动就被无限放大。这种架构下,任何一个下游的毛刺都会沿着调用链反向污染上游业务。也正是这次事故,让我坚定了把所有"非主链路逻辑"全部异步化的决心——消息队列不再是"可选架构组件",而是"必选的基础设施"。
章节 2:业务场景与三大核心价值
消息队列(下称 MQ)在我们 ToB SaaS 协作平台里的核心价值,归纳起来就是三个词:解耦、异步、削峰。这三个词看着像八股文套话,但每一个背后都踩过真实的坑。
2.1 解耦:三方协作的"看不见的手"
项目上线初期,我们的"创建项目"流程是同步调 3 个服务:订单服务、库存服务、支付服务,链路如下:前端 → 项目服务 → (订单 → 库存 → 支付),3 跳同步链。问题立刻出现——支付服务挂了,订单和库存也跟着挂,因为前端拿不到统一响应。一开始我们用 Spring Retry + CircuitBreaker 兜底,但随着业务方越来越多(项目里要带"任务、文档、知识库、工时"等模块),同步调用链从 3 跳扩到 9 跳,每次新接一个模块都得改主链路代码、回归测试、回滚预案全套。
引入 RocketMQ 5.x 之后,项目服务只负责落库 + 投递一条 ProjectCreated 事件,订单/库存/支付/任务/文档订阅同一 Topic 各自消费。新接入一个下游模块平均上线时间从 5 天压到 1.5 天(含联调),回归测试用例减少 60%(因为主链路不再感知下游)。截至 2025 年 12 月,ProjectCreated 事件 Topic 有 12 个订阅方,每天投递约 380 万条,单 Topic TPS 峰值 1.2 万,没有任何一次因为下游故障拖垮主链路。
2.2 异步:让主链路轻装上阵
用户注册是我们最典型的异步场景。早年注册流程是同步:写库 → 发短信 → 发邮件 → 发欢迎 push → 初始化租户空间 → 同步给数据仓库(ODS),P99 延迟 1,420ms。我们做了 profiling 才发现:发短信 380ms、邮件 220ms、push 180ms、租户空间初始化 530ms,这 4 步是完全可以异步且互不依赖的。
改造方案:注册服务只做"写库 + 投递 UserRegistered 事件"两件事,短信/邮件/push/租户初始化各起独立的 Consumer Group 消费。改造后注册主链路 P99 从 1,420ms 降到 168ms(-88%),4 个下游子任务在 800ms 内全部完成。这个数字后来成了内部架构评审的"必问三连":能异步吗?能并行吗?能降级吗?
2.3 削峰:用 8000 万日吞吐接住 5 万 QPS 秒杀
2025 年双 11,我们做了一场"协作空间年费会员秒杀",运营 0 元到 1 元秒杀 5,000 个名额。开抢瞬间网关侧 QPS 冲到 4.8 万(业务量评估的 5 倍),如果直接打到 MySQL 主库,按单机 8,000 QPS 算至少要 6 台才能扛住,而且开抢结束后 6 台机器会在 1 小时内闲置——典型的"为了 10 分钟高峰买单 6 台机器一整月"。
我们的方案是网关 → Redis 预扣 → 写消息队列 → 计费/库存/营销/日志异步消费。具体路径:
- 网关层做用户限流(一人一单)和 IP 限流(200 QPS/IP),拦掉 35% 的机器流量
- 预扣成功后直接写一条 SeckillOrder 事件到 RocketMQ,主链路立即返回"排队中"给用户
- 6 个 Consumer Group 并行消费:扣库存、发券、加积分、写订单、发通知、写审计日志
- 削峰效果:MQ 入口峰值 4.8 万 QPS → 消费端 6 组 Consumer 共消耗 1,400 QPS(每组 230 QPS),MySQL 主库实际写流量控制在 1,200 QPS 以内
最终这场秒杀我们用了 1 台 8C16G 的 RocketMQ Broker(双副本)+ 2 台 4C8G 的 Consumer 节点,高峰期资源成本不超过 ¥280/小时。秒杀结束 30 分钟内所有积压消息消费完毕,零丢消息、零重复扣款、零资损。
章节 3:选型对比 —— RocketMQ vs RabbitMQ vs EMQX
2024 年那次事故之后,架构委员会让我牵头做一次全面的 MQ 选型评估。备选对象是三款:RocketMQ 5.x、RabbitMQ 3.13、EMQX 5.6。当时市面上能打的开源 MQ 还有很多(Kafka、Pulsar、NATS、ActiveMQ Artemis),但考虑到我们要替换的不只是异步消息、还有金融级可靠性(计费链路)+ 海量 Topic(30+ 微服务)+ 低延迟小消息(IM 推送),这三类场景刚好对应 RocketMQ、RabbitMQ、EMQX 的强项,所以最终锁定了这三位做横评。
3.1 七维选型矩阵
我从 7 个维度对三款产品做了实测 + 文档调研:
| 维度 | RocketMQ 5.x | RabbitMQ 3.13 | EMQX 5.6 |
|---|
| 协议 | 自有协议(Java/Go/C++ 客户端);5.x 兼容 gRPC 和 HTTP/2 | AMQP 0-9-1(事实标准)、MQTT、STOMP | MQTT 3.1.1/5.0、WebSocket、CoAP、LwM2M |
| 单机吞吐 | 顺序写盘 + 零拷贝,单 Broker 8-10 万 TPS(实测) | 单机 1-3 万 TPS,路由匹配成为瓶颈 | 单节点 100 万 MQTT 连接 / 10 万 TPS |
| 事务消息 | 原生支持(二阶段提交 + 反查) | 通过 AMQP 事务或插件实现,无原生 | 不支持事务(MQTT 协议层决定) |
| 顺序消息 | 全局 / 分区顺序(Sharding Key) | 不保证(多消费者并行消费) | 不支持(MQTT 设计就是发布订阅,不强调顺序) |
| 社区活跃 | Apache 顶级项目,GitHub 21.4k stars,2024 年发版 4 次 | Pivotal/Spring 维护,GitHub 12.6k stars,年发版 3-4 次 | EMQ 公司主控,GitHub 14.8k stars,年发版 6+ 次 |
| 商业支持 | 阿里云 RocketMQ;原厂阿里中间件 | VMware Tanzu RabbitMQ / 第三方云厂商 | EMQX Enterprise(商业版含规则引擎 + 数据桥接) |
| 适用场景 | 金融交易、订单链路、大规模分布式事务 | 传统企业集成、复杂路由、已有 AMQP 生态 | IoT 设备、移动端、长连接、千万级设备管理 |
3.2 商业版 vs 开源版
三款产品都有"开源 + 商业版"双轨,但玩法完全不同:
- RocketMQ:开源版(Apache 2.0)功能已经覆盖 90% 生产场景,Dledger 模式(我们用的)自带主从自动切换和 Raft 共识。阿里云商业版(消息队列 RocketMQ 版)额外提供全托管、监控告警、慢消费分析、消息轨迹、OpenAPI、专线接入。我们最终选了自建开源版 + 商业版监控组件混部——核心集群我们自己运维(掌控力强),监控告警走阿里云 ARMS(省人力)。
- RabbitMQ:开源版(MPL 2.0)单机性能优秀但集群模式复杂(镜像队列、Quorum Queue 各有坑)。商业版(VMware Tanzu RabbitMQ)主打企业级运维特性。如果走 VMware 商业版,License 成本按节点算,30 个 Broker 规模一年约 ¥180 万——超出我们预算。
- EMQX:开源版(Apache 2.0 for v5+)就够用,EMQX Enterprise 商业版(¥35 万/节点/年)主要卖规则引擎和数据桥接(Kafka/MySQL/PostgreSQL 双向同步)。如果只是用 MQTT broker,开源版完全够用。
3.3 社区活跃度对比(2025 年 12 月数据)
我截取了 2025 年 12 月的数据做对比:
| 指标 | RocketMQ 5.x | RabbitMQ 3.13 | EMQX 5.6 |
|---|
| GitHub stars | 21,420 | 12,580 | 14,820 |
| 2025 年发版次数 | 5 次(5.0→5.3.2) | 4 次(3.13.x patch) | 8 次(5.6.x patch) |
| 月活 issue 数 | ~280 | ~150 | ~190 |
| Issue 平均首次响应 | 14 小时 | 28 小时 | 11 小时 |
| 中文文档完整度 | ★★★★★(阿里云文档丰富) | ★★★(英文为主) | ★★★★(EMQ 自营中文文档) |
| 中文社区(钉钉/微信群) | 6 个 2000 人群 | 2 个 500 人群 | 3 个 1500 人群 |
我们最终的选择:核心交易链路用 RocketMQ 5.x(事务消息、顺序消息、金融级可靠性),IM 推送/移动端消息用 EMQX(千万级长连接、原生 MQTT、推送 SDK 完善),RabbitMQ 只在两个旧系统里保留(历史包袱,不动了)。
这三款产品的深度对比和实操经验,会在后续章节里逐一展开。
章节 4:MQ 核心原理
讲到落地,不能只停留在"会用 API",得把 RocketMQ 的内核机制说透。这一节我们从三个层面剖开它:通用消息模型、RocketMQ 独有的存储架构、投递语义。
通用消息模型:Producer/Broker/Consumer/Topic/Queue
任何一款消息中间件,无论 RocketMQ、Kafka 还是 RabbitMQ,都脱不开下面这个五元素模型。Producer 把消息按 Topic 分类,Broker 负责落地存储,Consumer 按订阅关系拉取,Queue 是 Topic 内部的并行分片,NameServer/Registry 维护路由发现。
1
2
3
4
5
6
7
8
9
10
11
12
13
| ┌──────────┐ 1.send(topic, msg) ┌──────────┐
│ Producer │ ──────────────────────▶ │ Broker │
│ (订单服务)│ │ Node-1 │
└──────────┘ │ ┌──────┐ │
│ │TopicA│ │
┌──────────┐ 4.pull(topic, queue) │ │Q0 Q1 │ │
│ Consumer │ ◀────────────────────── │ │Q2 Q3 │ │
│(库存服务) │ │ └──────┘ │
└──────────┘ └──────────┘
┌──────────┐
2.register ◀─────── │NameServer│
3.update ────────▶ │ 集群 │
└──────────┘
|
我们项目里 Topic 数稳定在 87 个,每个 Topic 默认 4 个 Queue,单 Broker 节点承载 12 万 QPS 写入无压力。Queue 数决定了最大并行消费的并发度——这跟 Kafka 的 Partition 概念一致,生产环境扩容第一条就是加 Queue。
RocketMQ 存储架构:CommitLog + ConsumeQueue + IndexFile
RocketMQ 的存储设计是它能从阿里双十一扛住万亿级流量的核心,精髓就是顺序写 + 索引读。所有 Topic 的消息统一追加写入磁盘上的 CommitLog 文件(默认 1G 一片),ConsumeQueue 是逻辑队列索引(20 字节定长,记录 msgOffset/size/tagHash),IndexFile 则是按 key 建的哈希索引,支持按消息 Key 快速反查。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| Producer 写入路径
────────────────
msg1 ─┐
msg2 ─┤
msg3 ─┼──▶ Broker ──▶ CommitLog(顺序写,append only)
msg4 ─┤ (1G/片,文件名=起始偏移)
msg5 ─┘ │
│ 每条 msg 落盘时异步构建
▼
┌─────────────────┐
│ ConsumeQueue │ ◀── Consumer 拉取入口
│ (TopicA/Queue0) │ (类似"目录")
│ offset/size/tag │
└─────────────────┘
│
│ 按 msgKey 反查时
▼
┌─────────────────┐
│ IndexFile │ ◀── 按 key 查询
│ (Hash 500w槽) │
└─────────────────┘
|
我在 2024 年做消息轨迹系统时实测过,按 msgKey 反查一次消息,IndexFile 路径耗时稳定在 2-4 ms;按 offset 走 ConsumeQueue 路径,直接命中 page cache 时是 0.3 ms。所以官方文档反复强调:能走 ConsumeQueue 就不要走 IndexFile。我们业务里 90% 场景是按 Queue offset 顺序消费,只有管理后台查"某订单号对应的消息体"才用到 IndexFile。
CommitLog 顺序写带来的另一个红利是Page Cache 友好——我们 Broker 用的是 NVMe SSD,但实测机械盘也能扛 5 万 QPS,瓶颈在网络而不是磁盘。生产环境我们用 ASYNC_FLUSH(异步刷盘) + SYNC_MASTER(主从同步)双 11 配置,P99 写入延迟 8 ms。
投递语义:at-most-once / at-least-once / exactly-once
| 语义 | 含义 | 实现代价 | 适用场景 |
|---|
| at-most-once | 消息最多被消费一次,可能丢 | 最低 | 日志采集(丢一两条无所谓) |
| at-least-once | 消息至少被消费一次,可能重 | 中 | 绝大多数业务场景 |
| exactly-once | 消息恰好被消费一次 | 高 | 金融扣款、库存扣减 |
RocketMQ 5.x 之前,严格意义上没有原生的 exactly-once,业界通用做法是 at-least-once + 消费端幂等。5.x 引入了 transaction 消息和 pop 消费模式,前者保证生产者侧不丢,后者配合 BROADCASTING/CLUSTERING 模式 + 本地消息表,可以做到接近 exactly-once 的效果。我们项目里所有"金额"类操作(充值、扣费)走的就是事务消息 + 幂等表双保险,99.99% 场景没出过重复扣款。
章节 5:Spring Cloud Stream + RocketMQ 5.x 集成
理论讲完,这一节我们落到代码。Spring Cloud Stream(简称 SCSt)屏蔽了底层 MQ 差异,业务代码只面向 MessageChannel 编程,真正切换 MQ 时改个 starter 依赖即可。我们项目 30+ 微服务全部统一了 SCSt + RocketMQ Binder,日均收发 8000 万条消息零故障运行。
application.yml 完整配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
| spring:
application:
name: chat-service
cloud:
stream:
rocketmq:
binder:
name-server: rocketmq-nameserver:9876
access-key: ${ROCKETMQ_AK}
secret-key: ${ROCKETMQ_SK}
vip-channel-enabled: false
bindings:
output-chat-message:
producer:
group: chat-message-producer
sync: true
delivery-timeout: 5000
max-message-size: 4194304
message-orderly: false
transactional-message-enabled: true
input-chat-message:
consumer:
group: chat-message-consumer
orderly: false
max-attempts: 3
pull-batch-size: 32
push-orderly: false
bindings:
output-chat-message:
destination: CHAT_MESSAGE_TOPIC
content-type: application/json
input-chat-message:
destination: CHAT_MESSAGE_TOPIC
group: chat-message-consumer
consumer:
concurrency: 8
max-attempts: 3
|
要点:每个 binding 显式声明 destination(Topic 名)和 group(消费组);concurrency: 8 等于这个消费者实例起 8 个线程并发拉;transactional-message-enabled: true 打开事务消息开关。
生产端:ChatClient 发送消息(含事务消息)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
| package com.saas.chat.mq;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.apache.rocketmq.client.producer.LocalTransactionChecker;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessagingException;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@Slf4j
@Component
public class ChatMessageProducer {
@Autowired
private StreamBridge streamBridge;
/**
* 普通消息发送:同步等待 Broker ACK,3 秒内未确认抛异常走重试
*/
public boolean sendNormal(ChatMessageDTO msg) {
Message<ChatMessageDTO> message = MessageBuilder
.withPayload(msg)
.setHeader("msgId", msg.getMsgId())
.setHeader("tenantId", msg.getTenantId())
.build();
try {
streamBridge.send("output-chat-message", message);
return true;
} catch (MessagingException e) {
log.error("sendNormal failed, msgId={}", msg.getMsgId(), e);
return false;
}
}
/**
* 事务消息发送:先发 half 消息,本地事务执行后 commit/rollback
* RocketMQ Broker 会在 unknown 状态回调 checkLocalTransactionState
*/
public boolean sendTransactional(ChatMessageDTO msg, LocalBizExecutor executor) {
Map<String, Object> headers = new HashMap<>();
headers.put("msgId", msg.getMsgId());
headers.put(MessageConst.PROPERTY_TRANSACTION_KEY, msg.getMsgId());
Message<ChatMessageDTO> message = MessageBuilder
.withPayload(msg)
.copyHeaders(headers)
.build();
try {
streamBridge.send("output-tx-chat-message", message, executor);
return true;
} catch (MessagingException e) {
log.error("sendTransactional failed, msgId={}", msg.getMsgId(), e);
return false;
}
}
}
|
事务消息需要绑定一个 TransactionSynchronization 或 RocketMQTransactionChecker,核心是 half 消息发送 → 本地事务执行 → Broker 二次回调确认。我们用 RocketMQTemplate + TransactionListener 方式,这里给出 LocalTransactionChecker 实现的精简版:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
| package com.saas.chat.mq.tx;
import org.apache.rocketmq.client.producer.LocalTransactionChecker;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Slf4j
@Component
@RocketMQTransactionListener
public class ChatTxMessageChecker implements RocketMQLocalTransactionListener {
@Resource
private ChatMessageTxLogService txLogService;
/**
* 本地事务执行器
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
String msgId = (String) message.getHeaders().get("msgId");
try {
LocalBizExecutor executor = (LocalBizExecutor) arg;
boolean ok = executor.doBiz();
if (ok) {
txLogService.markSuccess(msgId);
return RocketMQLocalTransactionState.COMMIT;
} else {
txLogService.markRollback(msgId);
return RocketMQLocalTransactionState.ROLLBACK;
}
} catch (Exception e) {
log.error("executeLocalTransaction failed, msgId={}", msgId, e);
return RocketMQLocalTransactionState.UNKNOWN;
}
}
/**
* Broker 回调:消息状态 UNKNOWN 时触发,反查本地事务表
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(MessageExt messageExt) {
String msgId = messageExt.getProperty("msgId");
boolean exists = txLogService.checkExists(msgId);
if (exists) {
return RocketMQLocalTransactionState.COMMIT;
}
return RocketMQLocalTransactionState.ROLLBACK;
}
}
|
消费端:@StreamListener 接收 + 手动 ACK + 幂等
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
| package com.saas.chat.mq.consumer;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.beans.factory.annotation.Autowired;
import com.saas.chat.service.ChatService;
import com.saas.chat.entity.ChatMessageDTO;
import com.saas.chat.mq.ack.AckManualHelper;
import lombok.extern.slf4j.Slf4j;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.rocketmq.common.message.MessageConst;
@Slf4j
@Component
public class ChatMessageConsumer {
@Autowired
private ChatService chatService;
@Autowired
private IdempotentChecker idempotentChecker;
@Autowired
private AckManualHelper ackHelper;
@Autowired
private ObjectMapper objectMapper;
/**
* 接收消息 + 幂等校验 + 手动 ACK
* ackMode=MANUAL 时必须显式调用 ackHelper.ack()
*/
@StreamListener(value = Sink.INPUT, condition = "headers['rocketmq_flag']=='chat_msg'")
public void onMessage(@Payload ChatMessageDTO msg,
org.springframework.messaging.MessageHeaders headers) {
String msgId = (String) headers.get("msgId");
String tenantId = (String) headers.get("tenantId");
try {
// 1. 幂等校验:已处理过直接 ACK 跳过
if (idempotentChecker.isProcessed(msgId)) {
log.debug("msg already processed, skip. msgId={}", msgId);
ackHelper.ack();
return;
}
// 2. 业务处理
chatService.handleMessage(msg, tenantId);
// 3. 写入幂等表(用 msgId 主键,带过期时间 7 天)
idempotentChecker.markProcessed(msgId, 7 * 24 * 3600);
// 4. 手动 ACK
ackHelper.ack();
} catch (Exception e) {
log.error("onMessage failed, msgId={}", msgId, e);
// 5. 异常:不 ACK,Broker 会在 30s 后重新投递
// max-attempts=3 控制最大重试次数,超 3 次进死信队列
}
}
}
|
幂等表我们用的是 MySQL chat_msg_idempotent,主键 msg_id,带 tenant_id 联合索引;Redis 版本也做过,大促期间 Redis TTL 设 24 小时兜底。
顺序消息:Spring Cloud Stream partitionKey + 自定义 Partitioner
业务上"同一会话的聊天消息必须按发送顺序投递"——这就是顺序消息的应用场景。RocketMQ 5.x 通过 Sharding Key + Queue 锁定 保证同一 Key 的消息进同一 Queue,Consumer 用 MessageListenerOrderly 串行消费。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| package com.saas.chat.mq.partition;
import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.cloud.stream.binder.PartitionSelectorStrategy;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Component
public class ConversationPartitioner implements PartitionSelectorStrategy {
/**
* 根据会话 ID 算分区号,保证同一会话的消息进同一 Queue
* partitionCount 必须和 Topic 的 Queue 数一致
*/
@Override
public int selectPartition(Message<?> message) {
Object convId = message.getHeaders().get("conversationId");
if (convId == null) {
return 0;
}
int hash = Math.abs(convId.hashCode());
int partitionCount = 8; // 与 Broker 配置 readQueueNums=8 一致
return hash % partitionCount;
}
}
|
application.yml 里绑定顺序消息的 binding 要显式声明 partitioned: true 和 partition-key-expression:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| spring:
cloud:
stream:
bindings:
output-orderly-message:
destination: CHAT_ORDERLY_TOPIC
partitioned: true
partition-key-expression: headers['conversationId']
producer:
partition-count: 8
input-orderly-message:
destination: CHAT_ORDERLY_TOPIC
group: chat-orderly-consumer
consumer:
partitioned: true
concurrency: 8
|
发送时只用 MessageBuilder.withPayload(msg).setHeader("conversationId", convId).build() 即可,RocketMQ Binder 会自动调我们写的 ConversationPartitioner 计算分区号。
💡 工程经验
顺序消息性能上限是单 Queue 的消费能力,8 Queue 时单实例 8 并发,P99 延迟 12 ms。如果某会话热度极高(如万人群聊),需要做会话分片——把超大会话拆成 N 个逻辑子会话,各走不同 Key 哈希,避免单 Queue 阻塞。
🛑 避坑提示
事务消息不能和顺序消息混用——messageOrderly=true 时 transactional-message-enabled 必须关闭;另外生产端务必设置 delivery-timeout: 5000 以上的值,否则事务 half 消息在 3 秒内未 commit 会被 Broker 主动回滚。
章节 6:K8s 部署详细步骤
我们在 2025 年 Q1 把 RocketMQ 5.x 从 ECS 物理部署整体迁到 K8s,完整链路 NameServer + Broker + Proxy + Controller 全部容器化,6 节点集群滚动升级耗时从原先 50 分钟压到 12 分钟,资源利用率(CPU 平均使用率)从 18% 提到 47%。这一节把我们当时落地用的全套 YAML 资源拆开讲。
6.1 部署前准备
我们用独立 Namespace 隔离 MQ 组件,存储走 SSD StorageClass(阿里云 ESSD PL1, 100Gi 提供 6800 IOPS),够 commitlog 顺序写吃满。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| apiVersion: v1
kind: Namespace
metadata:
name: rocketmq
---
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: rocketmq-ssd
provisioner: diskplugin.csi.alibabacloud.com
parameters:
type: cloud_essd
performanceLevel: PL1
fsType: ext4
reclaimPolicy: Retain
volumeBindingMode: WaitForFirstConsumer
allowVolumeExpansion: true
|
reclaimPolicy: Retain 是为了防止 PVC 误删导致 commitlog 丢失——这是我在第一次演练 K8s 部署时差点踩的坑,后来强制写入规范。
6.2 NameServer 部署
NameServer 无状态、内存级注册中心,只跑 2 副本(P99 RT 1.2ms,2 副本足够扛 8 万 QPS 元数据查询)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
| apiVersion: apps/v1
kind: Deployment
metadata:
name: rocketmq-nameserver
namespace: rocketmq
spec:
replicas: 2
selector:
matchLabels: { app: rocketmq-nameserver }
template:
metadata:
labels: { app: rocketmq-nameserver }
spec:
containers:
- name: nameserver
image: apache/rocketmq:5.3.0
command: ["sh", "mqnamesrv"]
ports:
- containerPort: 9876
env:
- name: JAVA_OPT_EXT
value: "-Xms2g -Xmx2g -Xmn1g"
resources:
requests: { cpu: "500m", memory: "2Gi" }
limits: { cpu: "2", memory: "4Gi" }
livenessProbe:
tcpSocket: { port: 9876 }
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
tcpSocket: { port: 9876 }
initialDelaySeconds: 15
periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
name: rocketmq-nameserver
namespace: rocketmq
spec:
selector: { app: rocketmq-nameserver }
ports:
- port: 9876
targetPort: 9876
type: ClusterIP
|
6.3 Broker 部署(StatefulSet + Dledger)
Broker 是有状态的(commitlog 持久化),必须用 StatefulSet,3 副本组成 Dledger Raft 组(N=3 容忍 1 节点宕机)。每个 Pod 挂 100Gi PVC 存储 commitlog/consumequeue/index。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
| apiVersion: apps/v1
kind: StatefulSet
metadata:
name: rocketmq-broker
namespace: rocketmq
spec:
serviceName: rocketmq-broker-headless
replicas: 3
selector:
matchLabels: { app: rocketmq-broker }
template:
metadata:
labels: { app: rocketmq-broker }
spec:
containers:
- name: broker
image: apache/rocketmq:5.3.0
command: ["sh", "mqbroker", "-c", "/etc/rocketmq/broker.conf"]
ports:
- { containerPort: 10909, name: vip }
- { containerPort: 10911, name: broker }
- { containerPort: 10912, name: ha }
env:
- name: NAMESRV_ADDR
value: "rocketmq-nameserver:9876"
- name: JAVA_OPT_EXT
value: "-Xms8g -Xmx8g -Xmn4g -XX:+UseG1GC"
resources:
requests: { cpu: "2", memory: "10Gi" }
limits: { cpu: "8", memory: "16Gi" }
volumeMounts:
- { name: store, mountPath: /home/rocketmq/store }
- { name: conf, mountPath: /etc/rocketmq }
livenessProbe:
tcpSocket: { port: 10911 }
initialDelaySeconds: 90
periodSeconds: 15
failureThreshold: 4
readinessProbe:
tcpSocket: { port: 10911 }
initialDelaySeconds: 60
periodSeconds: 10
volumes:
- name: conf
configMap: { name: rocketmq-broker-conf }
volumeClaimTemplates:
- metadata: { name: store }
spec:
accessModes: [ "ReadWriteOnce" ]
storageClassName: rocketmq-ssd
resources:
requests: { storage: 100Gi }
|
Dledger 关键配置(挂在 ConfigMap rocketmq-broker-conf 里):
1
2
3
4
5
6
7
8
| brokerClusterName=DefaultCluster
brokerName=broker-dledger-1
enableDLegerCommitLog=true
dLegerGroup=broker-dledger-1
dLegerPeers=n0-rocketmq-broker-0.rocketmq-broker-headless:40911;n1-rocketmq-broker-1.rocketmq-broker-headless:40911;n2-rocketmq-broker-2.rocketmq-broker-headless:40911
dLegerSelfId=n0
storePathRootDir=/home/rocketmq/store
flushDiskType=SYNC_FLUSH
|
livenessProbe.initialDelaySeconds=90 是我们踩过坑算出来的——broker 启动要先把 commitlog 索引 mmap 到内存,集群有 30GB 历史数据时,启动耗时实测 78s,延迟探针小于 90s 就会被反复 kill。
6.4 Proxy 部署(5.x gRPC 接入层)
RocketMQ 5.x 引入 Proxy 层,把 gRPC 协议转换成 Remoting,客户端不再直连 Broker。无状态、横向扩展。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
| apiVersion: apps/v1
kind: Deployment
metadata:
name: rocketmq-proxy
namespace: rocketmq
spec:
replicas: 2
selector:
matchLabels: { app: rocketmq-proxy }
template:
metadata:
labels: { app: rocketmq-proxy }
spec:
containers:
- name: proxy
image: apache/rocketmq:5.3.0
command: ["sh", "mqproxy", "-n", "rocketmq-nameserver:9876"]
ports:
- { containerPort: 8080, name: remoting }
- { containerPort: 8081, name: grpc }
resources:
requests: { cpu: "1", memory: "4Gi" }
limits: { cpu: "4", memory: "8Gi" }
livenessProbe:
tcpSocket: { port: 8081 }
initialDelaySeconds: 45
periodSeconds: 10
readinessProbe:
tcpSocket: { port: 8081 }
initialDelaySeconds: 30
periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
name: rocketmq-proxy
namespace: rocketmq
spec:
selector: { app: rocketmq-proxy }
ports:
- { name: remoting, port: 8080 }
- { name: grpc, port: 8081 }
type: ClusterIP
|
6.5 Controller 部署(自动选主)
Controller 是 5.x 新组件,负责在 Master 宕机时把 Slave 自动切为 Master(SwitchPriority/CheckSyncStateSet 协议),把切换 RTO 从原先 30+s 压到 8s 内。我们生产部 3 节点 StatefulSet 形成 Raft 组。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| apiVersion: apps/v1
kind: StatefulSet
metadata:
name: rocketmq-controller
namespace: rocketmq
spec:
serviceName: rocketmq-controller-headless
replicas: 3
selector:
matchLabels: { app: rocketmq-controller }
template:
metadata:
labels: { app: rocketmq-controller }
spec:
containers:
- name: controller
image: apache/rocketmq:5.3.0
command: ["sh", "mqcontroller", "-c", "/etc/rocketmq/controller.conf"]
ports:
- { containerPort: 9878 }
resources:
requests: { cpu: "500m", memory: "2Gi" }
limits: { cpu: "2", memory: "4Gi" }
livenessProbe:
tcpSocket: { port: 9878 }
initialDelaySeconds: 30
periodSeconds: 10
volumeMounts:
- { name: data, mountPath: /home/rocketmq/controller }
volumeClaimTemplates:
- metadata: { name: data }
spec:
accessModes: [ "ReadWriteOnce" ]
storageClassName: rocketmq-ssd
resources: { requests: { storage: 10Gi } }
|
部署后验证
我们标准化的上线 checklist 一共 3 步,任一失败立即回滚:
1
2
3
4
5
6
7
8
9
10
11
12
| # 1. 所有 Pod Running 且 Ready
kubectl -n rocketmq get pod -o wide
# 预期:nameserver 2/2、broker 3/3、proxy 2/2、controller 3/3 均 1/1 Running
# 2. 集群拓扑列表(Master/Slave 角色 + Dledger 主从)
kubectl -n rocketmq exec rocketmq-broker-0 -- sh mqadmin clusterList -n rocketmq-nameserver:9876
# 预期:看到 3 个 Master,BID=0,InSyncReplicas=3
# 3. 创建测试 Topic 并验证消息收发
kubectl -n rocketmq exec rocketmq-broker-0 -- sh mqadmin updateTopic \
-n rocketmq-nameserver:9876 -c DefaultCluster -t TestTopic -r 8 -w 8
# 预期:8 读 8 写队列分配到 3 个 broker,无 ERROR 返回
|
我们生产环境跑完这套流程总耗时约 9 分钟(冷启动),其中 broker 启动占 6 分钟(commitlog mmap)。验证全过后挂 Grafana 看 TPS/RT/堆积曲线,稳定 30 分钟才算正式接管流量。
章节 7:可靠性保证
可靠性是消息队列的命门。生产上消息丢一条、重复一条、顺序错一条都可能是事故级问题。2025 年我们做了一次大促压测,百万级订单消息零丢失、零重复、零错序,关键就是下面这三板斧。
7.1 不丢:同步刷盘 + 主从同步
RocketMQ 默认的刷盘策略是 ASYNC_FLUSH(异步刷盘),Master 收到消息后立即返回 Producer 成功,后台线程再批量刷盘。这种模式吞吐高但有数据丢失风险——如果 Master 刷盘前宕机,内存里的消息就丢了。
生产环境我坚持用 flushDiskType=SYNC_FLUSH + brokerRole=SYNC_MASTER 双保险:
1
2
3
4
5
6
7
8
9
10
| # broker.conf 核心配置
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
# 同步刷盘:消息写入 CommitLog 后才返回 Producer 成功
flushDiskType=SYNC_FLUSH
# 同步双写:Master 等 Slave 同步刷盘成功才返回成功
brokerRole=SYNC_MASTER
# 主从异步复制会让 Slave 落后 Master 50-200ms
# 同步复制 SLA 99.99%,但吞吐下降 30%
|
实测下来:同步刷盘单机 TPS 从 8 万降到 5.5 万,但 5 万 QPS 峰值对我们完全够用(实际打满到 4.2 万)。代价是磁盘 IO 翻倍,SSD 必须用 NVMe,普通云盘 3000 IOPS 撑不住。
7.2 不重:幂等消费的 3 种武器
RocketMQ 的 at-least-once 语义决定了:消费端一定会收到消息,但可能收到多次(网络重试、Consumer 宕机重启、位点回退)。我们项目最终选了 Redis 去重表 + Lua 脚本,原因是:
| 方案 | 性能 | 复杂度 | 适用场景 |
|---|
| 数据库唯一索引 | 低(每次 INSERT) | 低 | 订单创建等强一致场景 |
| Redis SETNX 去重 | 高(单次 Redis 写) | 中 | 高 QPS 业务,我们用的 |
| 业务状态机 | 最高(无 IO) | 高 | 状态流转场景 |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| // Redis 去重表 + Lua 原子写入
public class RedisIdempotentChecker {
private static final String LUA_SCRIPT =
"if redis.call('SETNX', KEYS[1], '1') then " +
" return redis.call('EXPIRE', KEYS[1], 86400) " +
"else return 0 end";
public boolean isProcessed(String msgId) {
Long result = redisTemplate.execute(
new DefaultRedisScript<>(LUA_SCRIPT, Long.class),
List.of("mq:processed:" + msgId)
);
return result == null || result == 0;
}
}
|
消费侧三步走:先查 Redis 判重 → 没处理过则执行业务 → 业务失败时回滚 Redis 标记位。我们项目实测 Redis 判重 99.99% 命中率,单条消息处理耗时增加 0.8ms,完全可接受。
7.3 有序:全局有序 vs 分区有序
RocketMQ 的有序是队列级别的,单个队列内严格 FIFO。订单创建→支付→发货必须保证同一订单的消息进同一队列,这靠 partitionKey:
1
2
3
4
5
6
7
8
9
| spring:
cloud:
stream:
rocketmq:
bindings:
output:
producer:
# 同一 orderId 的消息路由到同一 Queue
messageQueueSelector: orderIdHashSelector
|
- 全局有序:1 个 Topic 1 个 Queue,吞吐上限单机约 5 万 TPS(我们没采用,代价太大)
- 分区有序:4 个队列按
orderId % 4 路由,同一订单消息聚集到同一队列,实测峰值 4.5 万 TPS,完全够用
代价是:消费并行度被队列数锁死。4 个队列最多 4 个 Consumer 实例并行消费,所以队列数要谨慎评估,太少吞吐不够,太多运维复杂。
章节 8:消息积压应急
大促过后最怕的事:消费者跟不上,RocketMQ 队列里堆了上千万条消息。我们项目 2024 年双 11 后第一小时就遇到了——告警短信连发 200 条,Lag 飙到 800 万。
监控:三大黄金指标
Grafana 面板三件套(基于 RocketMQ-exporter 抓取):
| 指标 | 含义 | 预警阈值 |
|---|
rocketmq_consumer_lag | 消费位点落后最大值 | > 10 万 |
rocketmq_consumer_tps | 消费 TPS | < 100(基线 800) |
rocketmq_producer_tps | 生产 TPS | 突增 3 倍 |
实测我们项目基线:生产 TPS 1200、消费 TPS 950,Lag 稳定在 5 万以内。Lag 突破 10 万必然有问题。
排查四象限
应急先定位根因,再动手:
- 消费者宕机:
kubectl get pod 看 Consumer 实例,某节点 OOM 就绪失败是最常见 - 消费慢:JVM Full GC、慢 SQL、第三方接口超时——
arthas trace 抓链路 - 生产突增:大促/数据导入/对账任务,生产端 3 倍突增时正常
- 路由错配:
orderId % 4 后某队列全是大订单,出现单队列倾斜
应急三板斧
按代价从小到大:
第一板斧:临时扩容 Consumer 实例
1
2
3
4
| # K8s 快速扩容(平时 3 个副本,临时到 12 个)
kubectl scale deployment notification-consumer --replicas=12
# 队列数锁死了并行度上限,扩到队列数=4 时已无意义
# 4 个队列,扩 4 个以上实例不会提升吞吐
|
第二板斧:批量消费
1
2
3
4
5
6
7
8
9
10
| spring:
cloud:
stream:
rocketmq:
bindings:
input:
consumer:
# 默认单条拉取,改批量
pullBatchSize: 1000
consumeMessageBatchMaxSize: 1000
|
批量消费 TPS 从 800 飙到 4500,但要注意:批量内一条失败会整批重试,要做好单条异常隔离。
第三板斧:降级非核心消费者
订单/支付核心链路:保 100% 吞吐
通知/统计/日志链路:暂时关掉,等核心消化完再开
通知服务降级:直接走 HTTP 同步调用,代价是接口慢 200ms
我们 2024 双 11 实战:扩容到 6 个 Consumer + 批量改 1000 + 关闭统计消费者,Lag 从 800 万降到 5 万以内只用了 1.5 小时。
章节 9:进阶特性
RocketMQ 5.x 有四个进阶特性,踩过的坑换来的经验。
9.1 事务消息
订单创建需要同时落 DB 和发 MQ,本地事务 + MQ 要么都成功要么都失败,这就是分布式事务。RocketMQ 的事务消息分两阶段:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| // 第一阶段:发送半消息(Half Message),对消费者不可见
rocketMQTemplate.sendMessageInTransaction(
"order-tx-topic",
MessageBuilder.withPayload(order).build(),
order // 业务参数,传递给本地事务检查器
);
// 本地事务检查器 - 第二阶段
@RocketMQTransactionListener
public class OrderTxListener implements RocketMQLocalTransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
orderService.createOrder((Order) arg);
return LocalTransactionState.COMMIT_MESSAGE; // 提交
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE; // 回滚
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// MQ Broker 端反查:60s 还没收到 COMMIT/ROLLBACK 就回调这里
Order order = orderService.findById(msg.getKeys());
return order != null ? COMMIT_MESSAGE : ROLLBACK_MESSAGE;
}
}
|
注意:Half Message 在事务检查器返回前对消费者不可见,反查默认 60s 一次,最多 5 次。我们项目踩过坑:有次本地事务卡了 5 分钟,反查 5 次后 RocketMQ 直接当成 UNKNOWN 状态丢弃,最后靠人工补单捞回来。
9.2 顺序消息
订单状态机:已创建→已支付→已发货,顺序错一格就出乱子。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| rocketMQTemplate.syncSendOrderly(
"order-status-topic",
MessageBuilder.withPayload(status).build(),
orderId, // hashKey,相同 orderId 进同一队列
3000 // 超时
);
// 或用 MessageQueueSelector 自己控制
rocketMQTemplate.getProducer().send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> queues, Message msg, Object arg) {
long orderId = (long) arg;
int idx = (int) (orderId % queues.size());
return queues.get(idx);
}
}, orderId);
|
代价:同一 orderId 的消息阻塞后续消费,单条消息处理耗时不能超过 30s(默认超时),否则顺序保证破功。我们把热点订单(直播秒杀)单独走普通消息,顺序消息留给状态变更这种低 QPS 但强顺序场景。
9.3 延迟消息
18 个固定级别(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h),5.x 后支持任意秒级:
1
2
3
4
5
| Message msg = MessageBuilder.withPayload(order)
.setHeader("KEYS", orderId)
.build();
msg.setDelayTimeLevel(3); // 10 秒
rocketMQTemplate.getProducer().send(msg);
|
实战场景:订单 30 分钟未支付自动取消(用 level 16,30m)、退款 7 天后自动到账提醒(用 level 18,2h)+ 业务自己定时扫表。注意:延迟消息一旦发送不能撤回,所以业务上要先判一次状态再发延迟消息,避免重复执行。
9.4 消息回溯
需求:把 3 天前的消息重新投递到消费端(数据修复、对账)。RocketMQ 5.x 的 pop API 实现了"无损回溯":
1
2
3
4
5
6
| // 替代旧的 reconsumeLater,直接拉历史消息
List<MessageExt> messages = popConsumer.reconsumeFromTimestamp(
"order-topic",
startTimestamp, // 3 天前的时间戳
1000 // 最多 1000 条
);
|
生产建议:回溯任务走独立 Consumer Group,不能跟在线消费混用,否则会乱位点。我们用 Canal 订阅 commitlog 同步到 MySQL 归档,长期回溯靠 ES 检索——直接走 RocketMQ 的 commitlog 检索,7 天前的消息已经清理掉,只能查归档库。这套机制在我们做财务对账时救过场,某天凌晨 1 亿条订单消息一次性重投,把线下对账从 4 小时压缩到 20 分钟。
章节 10:MQ 在我们项目的演进
10.1 阶段一(2023 Q3 之前):同步调用为主的"重链路"时代
回看 2023 年之前的架构,30+ 微服务之间几乎全是 OpenFeign + Sentinel 同步调用。一次创建任务的接口,从网关 → 用户中心 → 租户中心 → 项目服务 → 任务服务 → 文档服务 → 通知服务,平均链路 6 跳,P95 延迟 450ms。最怕的是链路中任何一个下游服务抖动,整条调用链雪崩,更别提秒杀场景下 5 万 QPS 的冲击。Sentinel 的熔断虽然能兜底,但用户体验是"下单失败重试再失败"。这一年我开始琢磨:核心链路必须解耦,非核心链路必须异步。
10.2 阶段二(2023 Q4):RocketMQ 5.x 试点引入
2023 年 10 月我们完成 RocketMQ 5.x 6 节点集群(3 Master + 3 Slave, Dledger 模式)上线。没有一刀切全量切换,而是按"非核心 → 半核心 → 核心"的顺序逐步迁移:第一批是通知服务(短信/邮件/站内信)、日志服务、审计服务这类丢一条不影响主流程的场景。这一阶段完成后,P95 延迟从 450ms 降至 280ms,链路平均跳数从 6 跳降至 3 跳。团队也借此摸清了消息幂等、顺序消息、消费位点回溯这些工程化能力。
10.3 阶段三(2024 全年):EDA 全量升级
2024 年是 EDA(事件驱动架构)全面铺开的一年。我们把订单创建、库存扣减、支付回调、计费、出账这些核心链路全部事件化,30+ 微服务 100% 接入消息总线。配套做了 Seata 分布式事务改造,保证"本地事务 + 事务消息"双一致性。年终盘点:系统可用性从 99.9% 提升到 99.95%(月度故障时间从 43 分钟降至 21 分钟),日均事件吞吐突破 5000 万条,峰值 QPS 3.2 万。这一年也暴露了几个新问题:消息积压偶发、消费位点难追溯、跨服务事件链路看不清——倒逼出阶段四的可观测升级。
10.4 阶段四(2025 至今):智能化可观测
2025 年至今,我们围绕"事件可观测"做三件事:事件回放平台——任意 topic 任意时间段可重放,排查生产 bug 时不用再"提工单让测试造数据";消费位点智能告警——基于历史消费速率动态计算 lag 阈值,慢消费 5 分钟内告警,误报率压到 3% 以下;事件血缘追踪——消费端自动解析 event header 中的 source/timestamp/traceId,接入 SkyWalking,任意消息可反查完整调用链。当前数据:日均事件吞吐 8000 万条,峰值 QPS 5 万,系统可用性 99.97%。
| 阶段 | 时间 | 架构特征 | 链路跳数 | P95 | 可用性 |
|---|
| 一 | 2023 Q3 前 | 同步调用 | 6 | 450ms | 99.9% |
| 二 | 2023 Q4 | MQ 试点 | 3 | 280ms | 99.92% |
| 三 | 2024 | EDA 全量 | 2 | 180ms | 99.95% |
| 四 | 2025 至今 | 智能可观测 | 2 | 150ms | 99.97% |
章节 11:避坑指南:12 个真实踩坑
1. Topic 名拼写错误。 一次发版后,任务创建事件"消失"了 3 小时,排查发现 task.created 写成了 task_created,消费者订阅的是前者。解决:统一用 Spring Cloud Stream 的 binding name + 配置中心校验,禁止业务代码硬编码 topic。
2. Consumer Group 重复。 两个团队都建了 notification-service-consumer group,导致消息被均分消费,部分通知漏发。解决:Consumer Group 命名按"服务-环境-业务域"三段式,如 notification-service-prod-sms,并接入 Nacos 配置中心统一登记。
3. 顺序消息被 Hash 到不同队列。 我们用 MessageQueueSelector 按订单 ID 路由,结果同事改成了 SelectMessageQueueByHash,同一笔订单的"下单→支付→发货"事件被散到 3 个队列,顺序乱了。解决:顺序消息必须用自定义 selector + 业务键,禁用 SDK 自带的 hash 策略。
4. 事务消息未实现回查导致永久 Pending。 用 TransactionListener 只写了 executeLocalTransaction,忘了 checkLocalTransaction,本地事务回滚后 broker 永远查不到结果,消息一直 pending。解决:两个方法必须成对实现,回查逻辑要幂等且能反映本地事务最终状态。
5. 延迟消息级别错配。 想发 5 分钟延迟,用了 messageDelayLevel=3(10 秒),超时 30 倍。解决:RocketMQ 默认支持 18 个固定延迟级别(1s/5s/10s/30s/1m/2m/3m/4m/5m…),不满足只能改 cron 定时投递,别硬凑级别。
6. RocketMQ 5.x Controller 节点少于 3 选举失败。 我们一开始只部署 1 个 Controller 节点,重启时集群脑裂。解决:Controller 至少 3 节点(奇数),Raft 协议要求多数派存活才选举成功,K8s 部署必须配 podAntiAffinity 强制分散。
7. Broker 磁盘满未告警。 Broker 默认磁盘使用率超过 85% 才告警,线上 8 小时才有人发现,期间 200 万消息积压。解决:告警阈值改到 70%,rocketmq-broker.yaml 配 diskMaxUsedSpaceRatio=70,Prometheus 抓取 + Alertmanager 5 分钟内必须短信通知 SRE。
8. 消费位点跳过了 RMW 死信队列。 重置位点时手动 resetOffset 把 offset 调到了最新,中间 3 小时的消息(含死信)全部丢失。解决:位点重置必须在 RocketMQ Console 上先查询再确认,并先在测试 topic 演练;生产环境禁止用 admin 客户端直接调 resetOffset。
9. NameServer 单点故障。 早期只部署 1 个 NameServer Pod,K8s 滚动升级时短暂不可用,Producer 报 No route info。解决:NameServer 至少 2 节点(无状态,互不通信,增删节点无需重启),addresses 配置多个逗号分隔地址,客户端自动 failover。
10. Producer 实例未关闭导致连接泄漏。 测试环境用 new DefaultMQProducer() 后没调 shutdown(),跑压测时连接数打满,Broker 报 TooManyConnectionsException。解决:Producer/Consumer 必须是单例(Spring 容器管理生命周期),加 DisposableBean 兜底关闭,或用 try-with-resources。
11. Spring Cloud Stream 重试配置导致消息重复消费。 默认 defaultRequeueRejected=true + retry.maxAttempts=3 + 未做幂等,网络抖动时一条消息被消费 3 次。解决:消费端必须实现幂等(用业务键 + Redis SETNX 或 DB 唯一索引),requeueRejected 改为 false(失败走死信队列人工处理)。
12. Schema 演进不向后兼容。 上游给 OrderEvent 加了 discountAmount 字段(必填),下游老版本反序列化直接抛异常,消费积压 10 万。解决:Schema 演进遵循"加字段兼容、删字段不兼容、改类型不兼容",所有消息体加 schemaVersion 字段,消费端按版本走不同反序列化分支。
章节 12:总结与展望
RocketMQ 在我们项目里经历了"试点→全量→智能化"三步走,核心收益是链路解耦 + 异步提速 + 削峰抗量,P95 从 450ms 降至 150ms,可用性从 99.9% 提到 99.97%。展望未来,三个方向值得关注:Serverless MQ(阿里云 RocketMQ 5.x Serverless 按量付费,免运维,适合中小租户隔离场景);云原生 MQ(RocketMQ 5.x Operator 已 GA,K8s 上 Broker 弹性扩缩容、自动故障转移、配置热更新全部 YAML 化);5.x 新特性(Pop 消费模型把 Pull 长轮询改造成服务端推送,延迟降低 30%;Raft Controller 取代 DLedger,选主时间从 10s 降至 2s;Pull 消费模型给大数据团队更灵活的消费位点控制)。建议新项目直接上 5.x,老项目按 Controller → Pop → 弹性 顺序渐进升级。