Featured image of post 消息队列:Kafka / RocketMQ / RabbitMQ 三件套选型与生产部署

消息队列:Kafka / RocketMQ / RabbitMQ 三件套选型与生产部署

消息队列:Kafka / RocketMQ / RabbitMQ 三件套选型与生产部署

Java Web 微服务系列 · 第 9 篇 · 消息队列 阅读时长:约 65 分钟 本文写于 2026 年 6 月 配套版本:Kafka 3.7 / RocketMQ 5.3 / RabbitMQ 3.13

引子:Kafka 顺序错乱的那次生产事故

2024 年某电商订单系统使用 Kafka 3.0,同一笔订单的"创建订单"和"扣库存"两条消息被分到不同 Partition、不同 Consumer 实例,消费者并行消费导致先扣库存后建订单——库存被扣成负数,订单却还没建。

技术复盘时发现:Kafka 文档里写得很清楚——只保证单 Partition 严格有序,不保证跨 Partition 全局有序。问题是我们把"创建订单"和"扣库存"用 null key(轮询分配)发送,2 条消息被分到 2 个不同 Partition,2 个 Consumer 并行消费,顺序就乱套了

修复方案:把消息 key=orderId(同一笔订单都走同一 Partition + 同一 Consumer)。但生产事故已经造成 30 万元损失

这个事故的根因,是当时选型时没把消息中间件的核心差异讲透。本文就以"选型 + 架构 + 部署"三件套为线索,把 3 大消息中间件(Kafka / RocketMQ / RabbitMQ)一次性讲透——读完你会知道每家的核心架构、关键特性、生产集群怎么搭、6 个避坑点。

一、Kafka:流处理领域的"事实标准"

1.1 是什么 + 解决什么问题

Kafka 是 LinkedIn 在 2010 年开源的分布式事件流平台(注意:不是单纯的 MQ),2011 年进 Apache 孵化,2012 年成为 Apache 顶级项目。GitHub Stars 30k+,是 Java 生态里最成熟的消息基础设施

3 大典型场景

  • 日志管道:取代传统的 Scribe/Flume,统一日志收集
  • 流处理:与 Flink / Spark Streaming 配合做实时计算
  • 业务消息:订单、支付、通知(与 RocketMQ / RabbitMQ 竞争)

与其他 MQ 的本质区别:Kafka 设计哲学是"高吞吐 + 持久化 + 可重放",核心是"日志"。RocketMQ / RabbitMQ 设计哲学是"业务消息",核心是"队列"。前者偏基础设施,后者偏业务工具

💡 原理:Kafka 为什么叫"分布式事件流平台"而不是"消息队列"

2010 年 LinkedIn 的初衷是解决"日志统一收集"——传统 Scribe/Flume 扩展性差、丢失率高。Kafka 用"追加写日志“模型:

  • Producer 只追加 log,不删
  • Consumer 维护自己的 offset,可以重放
  • 持久化到磁盘,支持 TB 级数据保留

这个"日志"模型意外地适合做消息队列——消息可以被多个消费者组反复消费,消息"消费"完不删除。这是 Kafka 跟传统 MQ 最大的架构差异。

1.2 核心架构

Kafka 集群由 4 类角色组成:

角色职责备注
Producer生产消息发到指定 Topic
Consumer消费消息通过消费者组协作
Broker存储 + 转发一台机器一个 Broker
Controller集群元数据管理KRaft 模式 3 节点集群

Topic 与 Partition

  • Topic:消息的逻辑分类(订单 topic、支付 topic)
  • Partition:Topic 的物理分片(每个 Partition 是一份有序日志)
  • 副本:每个 Partition 有 N 个副本(默认 3),1 个 Leader + N-1 个 Follower
  • ISR(In-Sync Replicas):与 Leader 同步的 Follower 集合

存储机制:每个 Partition 在磁盘上是一组 Segment(默认 1GB 一个):

1
2
3
4
5
6
partition-0/
├── 00000000000000000000.log      # 消息数据
├── 00000000000000000000.index     # 位移索引
├── 00000000000000000000.timeindex # 时间戳索引
├── 00000000000123456789.log
└── ...

关键点:Kafka 的高性能来自 3 个设计——顺序写磁盘(HDD 顺序写 600MB/s)+ 零拷贝 sendfile(数据直接从 PageCache 发到网卡,不进用户态)+ PageCache 利用(操作系统文件缓存)。

💡 原理:KRaft 模式 vs 旧 ZK 模式

旧 Kafka 用 ZooKeeper 管理集群元数据(Controller 选举、Partition Leader 选举)。但 ZK 是个独立系统,运维复杂、性能有瓶颈(单 ZK 集群撑不住 5 万 Partition)。

KRaft 模式(Kafka 2.8+ 引入,3.3+ GA):Controller 自己组成 Raft 集群,不再依赖 ZK。3.3+ 已经生产就绪,3.7 已是默认。

生产建议新项目一律 KRaft。ZK 模式只用于必须升级的旧项目。

1.3 关键特性

3 种消息语义

语义配置实现方式代价
at-most-onceacks=0发完就忘可能丢消息
at-least-onceacks=all + 手动 ack重试 + 手动 commit可能重复
exactly-once幂等 + 事务Producer 幂等 + 事务协调性能降 20-30%

顺序保证

  • 单 Partition 严格有序(FIFO)
  • 跨 Partition 不保证有序(开篇引子事故的根因)
  • 同 key 消息落到同一 Partition(用 key=orderId 保证)

副本 ISR 机制

  • acks=0:Producer 不等任何确认(最快,最不安全)
  • acks=1:等 Leader 写入(默认,Leader 挂了可能丢)
  • acks=all:等所有 ISR 写入(最安全,推荐生产用)

消费者组(Consumer Group)

  • 多个 Consumer 实例组成一个 Group
  • 一个 Partition 只能被 Group 内一个 Consumer 消费
  • 多个 Group 可以独立消费同一 Topic(实现"广播"语义)

1.4 典型使用(Java + Spring Boot)

Maven 依赖(Spring Boot 3.2 + Spring Kafka 3.1):

1
2
3
4
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

application.yml

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
spring:
  kafka:
    bootstrap-servers: kafka-cluster:9092
    producer:
      acks: all
      retries: 3
      properties:
        enable.idempotence: true           # 开启幂等
        max.in.flight.requests.per.connection: 5
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      group-id: order-consumer
      auto-offset-reset: earliest
      enable-auto-commit: false            # 关闭自动提交,手动 ack
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
    listener:
      ack-mode: manual_immediate

生产者(带幂等配置):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
@Service
public class OrderEventProducer {
    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;

    public void sendOrderCreated(OrderEvent event) {
        // key=orderId 保证同一订单消息落到同一 Partition
        CompletableFuture<SendResult<String, OrderEvent>> future =
            kafkaTemplate.send("order-topic", event.getOrderId(), event);
        future.whenComplete((result, ex) -> {
            if (ex == null) {
                log.info("消息发送成功: orderId={}, offset={}",
                    event.getOrderId(), result.getRecordMetadata().offset());
            } else {
                log.error("消息发送失败", ex);
                // 业务侧重试或告警
            }
        });
    }
}

消费者(手动 ack + 异常处理):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Component
public class OrderEventConsumer {
    @KafkaListener(topics = "order-topic", groupId = "order-consumer")
    public void onMessage(
        @Payload OrderEvent event,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        @Header(KafkaHeaders.OFFSET) long offset,
        Acknowledgment ack
    ) {
        try {
            // 业务处理
            orderService.handle(event);
            // 手动 ack
            ack.acknowledge();
        } catch (BusinessException e) {
            // 业务异常:nack 不重试(避免死循环)
            log.warn("业务异常,跳过: {}", e.getMessage());
            ack.acknowledge();
        } catch (Exception e) {
            // 系统异常:抛出让 Spring Kafka 重试
            throw e;
        }
    }
}

事务生产者(跨服务数据一致性):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
    KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
    template.setTransactionIdPrefix("order-tx-");
    return template;
}

@Service
public class OrderTxService {
    @Transactional("kafkaTransactionManager")
    public void createOrderWithEvent(Order order) {
        // 1. 写订单库
        orderRepository.save(order);
        // 2. 发 Kafka 消息(事务保证原子性)
        kafkaTemplate.send("order-topic", order.getId(), toEvent(order));
        // 任一失败,整体回滚
    }
}

🎯 避坑点:Kafka 事务 vs RocketMQ 事务消息

Kafka 事务是”Producer 端事务"——只保证"消息发送 + 业务操作"的原子性,不解决 Consumer 端消费 + 业务操作的原子性

RocketMQ 事务消息是"两阶段提交 + 回查“机制,能解决"本地事务 + 消息发送"的最终一致性。

业务场景选型:如果消费侧不需要事务,Kafka 够用;如果需要复杂分布式事务,优先 RocketMQ

1.5 生产集群部署(手把手)

目标拓扑

  • 3 节点 Controller 集群(KRaft 模式)
  • 4 节点 Broker(2 个 Partition Leader + 2 个 Follower)
  • 1 个 Prometheus JMX Exporter
  • 1 个 Kafka UI 容器

完整 docker-compose.yml

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
version: "3.8"

# ============= Controller 集群(KRaft 模式) =============
services:
  kafka-controller-1:
    image: bitnami/kafka:3.7
    container_name: kafka-controller-1
    ports:
      - "9093:9093"
    environment:
      # KRaft 模式关键配置
      KAFKA_CFG_NODE_ID: 1
      KAFKA_CFG_PROCESS_ROLES: "controller,broker"  # 同节点既做 controller 也做 broker(小集群)
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "1@kafka-controller-1:9093,2@kafka-controller-2:9093,3@kafka-controller-3:9093"
      KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
      KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-controller-1:9092"
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
      # 持久化
      KAFKA_CFG_LOG_DIRS: "/bitnami/kafka/data"
      # JMX 导出给 Prometheus
      KAFKA_JMX_PORT: 9095
      KAFKA_JMX_HOSTNAME: localhost
      JMX_PROMETHEUS_PORT: 9095
    volumes:
      - kafka1-data:/bitnami/kafka/data
    networks:
      - kafka-net

  kafka-controller-2:
    image: bitnami/kafka:3.7
    container_name: kafka-controller-2
    ports:
      - "9094:9093"
    environment:
      KAFKA_CFG_NODE_ID: 2
      KAFKA_CFG_PROCESS_ROLES: "controller,broker"
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "1@kafka-controller-1:9093,2@kafka-controller-2:9093,3@kafka-controller-3:9093"
      KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
      KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-controller-2:9092"
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
      KAFKA_CFG_LOG_DIRS: "/bitnami/kafka/data"
      KAFKA_JMX_PORT: 9095
    volumes:
      - kafka2-data:/bitnami/kafka/data
    networks:
      - kafka-net

  kafka-controller-3:
    image: bitnami/kafka:3.7
    container_name: kafka-controller-3
    ports:
      - "9095:9093"
    environment:
      KAFKA_CFG_NODE_ID: 3
      KAFKA_CFG_PROCESS_ROLES: "controller,broker"
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "1@kafka-controller-1:9093,2@kafka-controller-2:9093,3@kafka-controller-3:9093"
      KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
      KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-controller-3:9092"
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
      KAFKA_CFG_LOG_DIRS: "/bitnami/kafka/data"
      KAFKA_JMX_PORT: 9095
    volumes:
      - kafka3-data:/bitnami/kafka/data
    networks:
      - kafka-net

  # ============= Kafka UI(管理界面) =============
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: "kafka-cluster"
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: "kafka-controller-1:9092,kafka-controller-2:9092,kafka-controller-3:9092"
    depends_on:
      - kafka-controller-1
      - kafka-controller-2
      - kafka-controller-3
    networks:
      - kafka-net

volumes:
  kafka1-data:
  kafka2-data:
  kafka3-data:

networks:
  kafka-net:
    driver: bridge

启动与验证

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
# 1. 启动集群
docker-compose up -d

# 2. 验证 Controller 集群状态
docker exec kafka-controller-1 kafka-metadata-quorum --bootstrap-server kafka-controller-1:9092 describe --status
# 应显示 3 个 nodeId 都在线

# 3. 创建 Topic(3 分区 3 副本)
docker exec kafka-controller-1 kafka-topics --create \
  --bootstrap-server kafka-controller-1:9092 \
  --topic order-topic \
  --partitions 3 \
  --replication-factor 3

# 4. 验证 Topic
docker exec kafka-controller-1 kafka-topics --list --bootstrap-server kafka-controller-1:9092
# 应输出:order-topic

# 5. 访问 UI
# 浏览器打开 http://localhost:8080

监控告警(关键指标)

指标告警阈值排查
kafka_server_UnderReplicatedPartitions> 0 持续 5 分钟Follower 同步跟不上,磁盘 IO 瓶颈
kafka_controller_ActiveControllerCount!= 1Controller 选举异常
kafka_consumer_ConsumerLag> 10000 持续 10 分钟消费慢或消费挂
kafka_server_BrokerTopicMetrics_MessagesInPerSec突降 50%Producer 端异常
kafka_log_Log_Size> 80% 磁盘调 retention.ms

故障演练(生产上线必做):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
# 演练 1:杀 Broker 节点
docker stop kafka-controller-2
# 观察:剩余 2 节点仍可服务(replication-factor=3,2 副本够用)
# 30 秒内 kafka-ui 应显示 Broker 2 offline

# 演练 2:杀 Controller 节点(半数以上存活)
docker stop kafka-controller-2
docker stop kafka-controller-3
# 观察:只剩 1 节点 Controller,集群**仍可读不可写**(写入需要 Controller 仲裁)
# 这是为什么 Controller 必须 3 节点(容忍 1 节点故障)

# 演练 3:网络分区模拟
docker network disconnect kafka-net kafka-controller-2
# 观察:Controller 2 被网络隔离,集群 30 秒内完成 Controller 重选

🎯 避坑点:Controller 必须 3 或 5 节点

2 节点 Controller 集群没有容错能力——任意 1 节点故障,剩 1 节点不满足半数(quorum),整个集群无法写入

3 节点容忍 1 节点故障,5 节点容忍 2 节点故障。生产环境至少 3 节点 Controller 集群。

1.6 6 个避坑点

🛑 误区警示:Kafka 6 大经典坑

  1. 消息丢失acks=1 而非 acks=all——Leader 写入即返回,Follower 还没同步,Leader 挂了消息丢
  2. 重复消费:未开启 enable.idempotence=true + 关闭手动 ack——网络抖动导致 Producer 重发 + Consumer 重平衡后重复消费
  3. 顺序错乱:用 null key 发送——分到不同 Partition 后顺序无法保证(开篇事故的根因
  4. 消费者 rebalance 风暴session.timeout.ms 配太小(如 6s)——短暂 GC 停顿被误判为下线,触发全 Group rebalance
  5. PageCache 被打爆:Broker 机器内存只留 1GB 给 PageCache——大量写盘时 PageCache 被踢出,性能暴跌
  6. 日志保留策略配错retention.ms 没设或太长——磁盘被吃满后 Kafka 拒绝写

核心防御acks=all + enable.idempotence=true + 手动 ack + 消息 key 设计 + 监控告警齐全。

1.7 实战经验集锦

Kafka 演进时间线(从诞生到生产主流):

  • 2010 年:LinkedIn 开源,最初用于日志统一收集
  • 2011 年:进 Apache 孵化,Scala 重写
  • 2012 年:成为 Apache 顶级项目,Kafka 0.8 发布
  • 2014 年:0.9 版本加入 Kafka Connect,0.10 加入 Kafka Streams
  • 2017 年:1.0 发布,Kafka API 稳定
  • 2018 年:Confluent 成立,公司化运营
  • 2020 年:2.5 发布,KRaft 模式引入
  • 2022 年:3.3 发布,KRaft GA(生产可用)
  • 2024 年:3.7 发布,KRaft 成为默认,弃用 ZK 模式
  • 2026 年:4.0 计划中,进一步简化运维

典型生产事故与教训

  1. 2018 年某电商大促事故:Kafka 0.10 + ZK 集群,ZK 写入瓶颈导致整个集群不可写。教训:单 ZK 集群撑不住 5 万 Partition,必须 KRaft。
  2. 2020 年某社交平台:用户 Feed 流 Kafka 集群,Consumer rebalance 风暴导致延迟从 100ms 飙升到 10 秒。教训session.timeout.ms 不能配太小,建议 30 秒。
  3. 2022 年某支付公司:消息丢失,根因是 acks=1 而非 acks=all,Leader 写入即返回,Follower 还没同步。教训所有金融场景必须 acks=all
  4. 2023 年某物流公司:Kafka 集群 PageCache 被打爆,磁盘 IO 飙升。教训:Broker 机器内存至少留 50% 给 OS PageCache,JVM 堆不要超过 8GB。

性能调优实战

  • Broker 端
    • num.network.threads = 8(网络线程数)
    • num.io.threads = 16(IO 线程数)
    • log.flush.interval.messages = 10000(刷盘间隔)
    • log.retention.hours = 72(默认 3 天,业务调整)
  • Producer 端
    • batch.size = 65536(64KB 批量)
    • linger.ms = 10(等待 10ms 凑批)
    • compression.type = lz4(压缩降低网络 IO)
  • Consumer 端
    • fetch.min.bytes = 1(小消息不等待)
    • fetch.max.wait.ms = 500(最多等 500ms)
    • max.partition.fetch.bytes = 1048576(1MB 单次拉取)

与 Spark/Flink 集成案例

Flink 作为 Kafka 消费者是实时计算的事实标准。典型架构:

1
业务系统 → Kafka(数据采集层)→ Flink(实时计算层)→ Kafka / MySQL / ES(结果层)

踩坑点:Flink 消费 Kafka 时必须用手动提交 offset(Flink checkpoint 触发 commit),否则故障时会出现"已消费但未 commit"的数据丢失。

国内大厂落地情况

  • 阿里:日志平台、监控数据用 Kafka(日均 PB 级)
  • 字节跳动:A/B 测试数据流、用户行为日志用 Kafka(峰值 2000 万 TPS)
  • 美团:外卖订单状态变更、支付通知用 Kafka
  • 滴滴:行程事件、用户行为分析用 Kafka
  • 小米:IoT 设备上报数据用 Kafka

未来趋势

  • KRaft 取代 ZK:3.7 已默认,2027 年 ZK 模式完全弃用
  • Tiered Storage:冷数据自动转 S3/OSS,磁盘成本降 80%(3.6+ 实验)
  • eBPF 加速:用 eBPF 优化网络路径,Consumer 延迟降 30%
  • Serverless Kafka:Confluent Cloud 推无服务器模式,按量付费

二、RocketMQ:阿里系业务消息的"扛把子”

2.1 是什么 + 解决什么问题

RocketMQ 是阿里在 2012 年研发的企业级消息中间件,最初用于支撑双 11 秒杀场景,2016 年开源,2017 年进 Apache 孵化,2018 年成为 Apache 顶级项目。GitHub Stars 21k+,是国内业务消息的事实标准

3 大典型场景

  • 业务消息:订单、支付、交易(核心场景
  • 事务消息:分布式事务的"本地消息表"替代方案
  • 大规模 IoT:百万 TPS + 严格顺序

与 Kafka 的本质区别:RocketMQ 设计哲学是"业务消息"——核心是"队列 + 事务 + 顺序 + 延迟"四大业务特性。Kafka 设计哲学是"日志"——核心是"高吞吐 + 持久化 + 可重放"。

💡 原理:RocketMQ 为什么从 Kafka 演化而来

阿里在 2010-2012 年调研过 Kafka + ActiveMQ,发现:

  • Kafka 事务消息做不到(早期版本)
  • 延迟消息做不到(早期版本没有内置)
  • 严格顺序消息做不到(多 Partition 顺序难保证)

因此阿里基于 Kafka 的"CommitLog"思想,重新设计了 NameServer + Broker + Dledger 架构,把业务消息的 4 大特性补齐。RocketMQ 5.x 已经把事务/顺序/延迟做成了生产级特性。

2.2 核心架构

RocketMQ 集群由 4 类角色组成:

角色职责备注
NameServer路由注册中心无状态、互相独立、AP 模型
Broker消息存储 + 转发Master-Slave 模式(5.x 推荐 Dledger 自动选主)
Producer生产消息从 NameServer 拉路由
Consumer消费消息从 NameServer 拉路由

Topic 与 MessageQueue

  • Topic:消息逻辑分类(每个 Topic 默认 4 个读队列 + 4 个写队列)
  • MessageQueue:Topic 的物理分片(最小并行单位)
  • Broker 关系:1 个 Broker 可以承载多个 Topic 的多个 MessageQueue

存储机制:Broker 上的消息按 3 类文件组织:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
store/
├── CommitLog/         # 所有消息顺序写(1 个文件 1GB,默认)
│   ├── 00000000000000000000
│   └── 00000000001073741824
├── ConsumeQueue/      # 消费队列(每个 MessageQueue 一个)
│   └── order-topic/
│       └── 0/         # Queue 0 的消费索引
│           ├── 00000000000000000000
│           └── ...
├── Index/             # 按 key / 时间戳 索引
│   └── 202606091230
└── config/            # Broker 配置

关键点

  • CommitLog 顺序写——所有 Topic 的消息追加到同一个 CommitLog,避免多文件随机写
  • ConsumeQueue 索引——按 Topic + QueueId 索引到 CommitLog 偏移
  • IndexFile 检索——按消息 key 或时间戳快速定位

💡 原理:为什么 CommitLog 顺序写比 Kafka 的多 Partition 写还快

Kafka 每个 Partition 是独立文件,多 Producer 并发写不同 Partition 时,磁盘磁头要频繁寻道

RocketMQ 把所有 Topic 的所有消息顺序追加到同一个 CommitLog——单文件顺序追加,磁头不动

实测单 Broker CommitLog 顺序写可达 600MB/s(HDD),SSD 可达 2GB/s。比 Kafka 多 Partition 写高 3-5 倍

2.3 关键特性

事务消息(最核心特性)

解决"本地事务 + 消息发送“的最终一致性。流程:

1
2
3
4
5
6
7
1. Producer 发送 prepared 消息(half message)→ RocketMQ
2. RocketMQ 标记消息为"prepared"(消费者不可见)
3. Producer 执行本地事务(如创建订单)
4. Producer 根据本地事务结果发 commit 或 rollback
   - commit → RocketMQ 让消息对消费者可见
   - rollback → RocketMQ 删除 prepared 消息
5. 如果 Producer 阶段 4 失败,RocketMQ **定期回查** Producer 的本地事务状态

与本地消息表对比

方案业务侵入性能一致性
本地消息表高(要建消息表 + 定时扫)低(DB 双写)最终一致
RocketMQ 事务消息低(@RocketMQTransactionListener)高(无 DB 双写)最终一致

顺序消息

  • 原理:同一业务 key 的消息路由到同一 MessageQueue
  • 代价:MessageQueue 数 ≤ Consumer 实例数(否则有的 Consumer 分不到消息
  • 实战:订单顺序消息,MessageQueueSelectororderId hash 选队列

延迟消息

  • 18 个固定级别1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
  • 使用场景:15 分钟未支付取消订单(用 30 分钟延迟)
  • 限制不支持任意时间延迟(如 7 天),需要用任务调度(xxl-job)+ 延迟消息组合

消息回溯

  • 按时间戳回退消费位点(consumer.resetOffsetByTimestamp
  • 用于补数据、补偿消费

性能参数

  • 单集群百万级 TPS(阿里双 11 实测)
  • 单 Broker 顺序写 600MB/s(HDD)
  • 消息投递延迟 P99 < 10ms(同机房)

2.4 典型使用(Java + Spring Boot)

Maven 依赖(Spring Cloud Alibaba + RocketMQ):

1
2
3
4
5
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.3.0</version>
</dependency>

application.yml

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
rocketmq:
  name-server: rocketmq-namesrv-1:9876,rocketmq-namesrv-2:9876
  producer:
    group: order-producer
    send-message-timeout: 10000
    retry-times-when-send-failed: 3
    retry-times-when-send-async-failed: 3
    max-message-size: 4194304      # 4MB
    compress-message-body-threshold: 4096
  consumer:
    group: order-consumer
    enable-message-trace: false
    pull-batch-size: 32

事务消息生产者(解决分布式事务):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Service
public class OrderTxProducer {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void createOrderWithEvent(Order order) {
        // 发送事务消息
        rocketMQTemplate.sendMessageInTransaction(
            "order-tx-topic",          // Topic
            MessageBuilder.withPayload(order).build(),
            order                       // arg 传给本地事务监听器
        );
    }

    @RocketMQTransactionListener(txProducerGroup = "order-tx-producer")
    public class OrderTransactionListener implements RocketMQLocalTransactionListener {
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            try {
                // 1. 本地事务:创建订单(写 DB)
                Order order = (Order) arg;
                orderRepository.save(order);
                // 2. 成功 → 提交消息
                return RocketMQLocalTransactionState.COMMIT;
            } catch (Exception e) {
                // 失败 → 回滚消息
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        }

        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            // 回查:RocketMQ 询问未确认消息的本地事务状态
            String orderId = msg.getKeys();
            boolean exists = orderRepository.existsById(orderId);
            return exists
                ? RocketMQLocalTransactionState.COMMIT
                : RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}

顺序消费者(保证同一订单消息严格有序):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Component
@RocketMQMessageListener(
    topic = "order-topic",
    consumerGroup = "order-consumer",
    consumeMode = ConsumeMode.ORDERLY   // 顺序消费模式
)
public class OrderSeqConsumer implements RocketMQListener<OrderEvent> {
    @Override
    public void onMessage(OrderEvent event) {
        // 业务处理
        orderService.handle(event);
    }
}

// 生产者侧用 MessageQueueSelector 保证同 orderId 路由到同队列
public void sendOrderEvent(OrderEvent event) {
    rocketMQTemplate.syncSendOrderly(
        "order-topic",
        MessageBuilder.withPayload(event).build(),
        event.getOrderId(),  // hashKey
        3000                 // 超时
    );
}

广播消费(每个 Consumer 收到全量消息):

1
2
3
4
5
6
7
8
@RocketMQMessageListener(
    topic = "config-topic",
    consumerGroup = "config-broadcast",
    messageModel = MessageModel.BROADCASTING   // 广播模式
)
public class ConfigConsumer implements RocketMQListener<ConfigEvent> {
    // 每个 Consumer 收到所有消息(适合配置刷新场景)
}

2.5 生产集群部署(手把手)

目标拓扑

  • 2 节点 NameServer(无状态,2 节点足够)
  • 2 主 4 从 Broker(Dledger 模式,6 节点 = 2 副本组 × 3 节点)
  • 1 节点 Controller(5.x 新增,负责 Dledger 选主)
  • 1 个 RocketMQ Console

完整 docker-compose.yml

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
version: "3.8"

services:
  # ============= NameServer 集群 =============
  rocketmq-namesrv-1:
    image: apache/rocketmq:5.3.0
    container_name: rocketmq-namesrv-1
    command: sh mqnamesrv
    ports:
      - "9876:9876"
    volumes:
      - namesrv1-logs:/home/rocketmq/logs
    networks:
      - rocketmq-net

  rocketmq-namesrv-2:
    image: apache/rocketmq:5.3.0
    container_name: rocketmq-namesrv-2
    command: sh mqnamesrv
    ports:
      - "9877:9876"
    volumes:
      - namesrv2-logs:/home/rocketmq/logs
    networks:
      - rocketmq-net

  # ============= Controller 集群(5.x 负责 Dledger 选主) =============
  rocketmq-controller:
    image: apache/rocketmq:5.3.0
    container_name: rocketmq-controller
    command: sh mqcontroller
    ports:
      - "8080:8080"
    environment:
      JAVA_OPT_EXT: "-Xms512m -Xmx512m"
    volumes:
      - controller-logs:/home/rocketmq/logs
    networks:
      - rocketmq-net

  # ============= Broker 节点 1(Dledger 副本组 1) =============
  rocketmq-broker-1:
    image: apache/rocketmq:5.3.0
    container_name: rocketmq-broker-1
    command: sh mqbroker
    ports:
      - "10911:10911"   # Broker 主端口
      - "10912:10912"   # Broker HA 端口
    environment:
      JAVA_OPT_EXT: "-Xms1g -Xmx1g -Xmn512m"
      NAMESRV_ADDR: "rocketmq-namesrv-1:9876;rocketmq-namesrv-2:9876"
    volumes:
      - broker1-store:/home/rocketmq/store
      - broker1-logs:/home/rocketmq/logs
      - ./broker-1.conf:/home/rocketmq/rocketmq-5.3.0/conf/broker.conf
    depends_on:
      - rocketmq-namesrv-1
      - rocketmq-namesrv-2
      - rocketmq-controller
    networks:
      - rocketmq-net

  # ============= Broker 节点 2、3、4(省略,依同样模式) =============
  # 略:rocketmq-broker-2、3、4 同样配置
  # 重要:每个 broker.conf 里需要:
  #   brokerClusterName=DefaultCluster
  #   brokerName=broker-a
  #   brokerId=0 (master) / 1 (slave)
  #   brokerRole=DLEDGER (5.x 必须)
  #   listenPort=10911
  #   namesrvAddr=rocketmq-namesrv-1:9876;rocketmq-namesrv-2:9876
  #   storePathRootDir=/home/rocketmq/store
  #   dledgerGroup=broker-a-dledger  # 同副本组同名
  #   dledgerPeers=n0;rocketmq-broker-1:40911;n1;rocketmq-broker-2:40911;n2;rocketmq-broker-3:40911
  #   dledgerSelfId=n0  # 当前节点 ID
  #   enableDLegerCommitLog=true

  # ============= RocketMQ Console =============
  rocketmq-console:
    image: apacherocketmq/rocketmq-console-ng:latest
    container_name: rocketmq-console
    ports:
      - "8081:8080"
    environment:
      JAVA_OPTS: "-Drocketmq.namesrv.addr=rocketmq-namesrv-1:9876;rocketmq-namesrv-2:9876"
    depends_on:
      - rocketmq-namesrv-1
      - rocketmq-namesrv-2
    networks:
      - rocketmq-net

volumes:
  namesrv1-logs:
  namesrv2-logs:
  controller-logs:
  broker1-store:
  broker1-logs:
  # 其他 broker 同

networks:
  rocketmq-net:
    driver: bridge

broker-1.conf 示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
brokerRole=DLEDGER
listenPort=10911
namesrvAddr=rocketmq-namesrv-1:9876;rocketmq-namesrv-2:9876
storePathRootDir=/home/rocketmq/store
dledgerGroup=broker-a-dledger
dledgerPeers=n0;rocketmq-broker-1:40911;n1;rocketmq-broker-2:40911;n2;rocketmq-broker-3:40911
dledgerSelfId=n0
enableDLegerCommitLog=true
flushCommitLogLeastPages=4
flushCommitLogThoroughInterval=10000

启动与验证

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
docker-compose up -d
# 等待 30 秒(启动顺序:NameServer → Controller → Broker)

# 1. 验证 NameServer
docker exec rocketmq-namesrv-1 sh mqadmin clusterList -n rocketmq-namesrv-1:9876
# 应看到所有 Broker 节点

# 2. 创建 Topic
docker exec rocketmq-broker-1 sh mqadmin updateTopic \
  -n rocketmq-namesrv-1:9876 \
  -t order-topic \
  -c DefaultCluster \
  -b rocketmq-broker-1:10911 \
  -r 4 -w 4   # 4 读 4 写队列

# 3. 发送测试消息
docker exec rocketmq-broker-1 sh tools.sh org.apache.rocketmq.example.quickstart.Producer

# 4. 访问 Console
# 浏览器打开 http://localhost:8081

监控告警

指标告警阈值排查
rocketmq_dledger_role角色变化Dledger 选主(需关注)
rocketmq_queue_offset_diff消费位点差 > 10000消费慢或挂
rocketmq_broker_tps突降 50%Producer 端异常
rocketmq_commitlog_disk_usage> 80%调 fileReservedTime

故障演练

1
2
3
4
5
6
7
8
9
# 演练 1:杀 Broker Master
docker stop rocketmq-broker-1
# 观察:rocketmq-broker-2 自动升级为新 Master(Dledger Raft 选主,30 秒内)
# 验证:rocketmq-console 看 broker-2 role=MASTER

# 演练 2:杀 NameServer
docker stop rocketmq-namesrv-1
# 观察:剩 1 个 NameServer,**生产消费仍可继续**(NameServer 无状态,路由信息已缓存)
# 启动新的 NameServer 自动加入集群

2.6 6 个避坑点

🛑 误区警示:RocketMQ 6 大经典坑

  1. NameServer 单点:只部署 1 个 NameServer,挂了所有路由失效——必须至少 2 节点
  2. 事务消息无限回查checkLocalTransaction 死循环或返回 UNKNOWN——回查有最大次数(默认 15 次),超了 RocketMQ 强制丢弃
  3. 顺序消息队列数 < 消费者数:比如 4 队列 + 6 消费者,2 个消费者永远分不到消息——队列数 ≥ 消费者数
  4. 延迟消息固定级别不够用:想发 7 天延迟,RocketMQ 不支持任意时间——用任务调度(xxl-job)+ 延迟消息组合
  5. 消费失败无限重试:默认重试 16 次,间隔 10s~10min——消费代码必须幂等,否则重试 16 次会重复扣款/扣库存
  6. Broker 磁盘满:CommitLog 默认保留 72 小时——必须监控磁盘水位 + 配 fileReservedTime + 容量规划

2.7 实战经验集锦

RocketMQ 演进时间线

  • 2012 年:阿里内部启动,代号 “MetaQ”,支撑双 11 秒杀
  • 2013 年:阿里内部规模化使用,NameServer 架构定型
  • 2016 年 11 月:正式开源,捐赠给 Apache
  • 2017 年:进 Apache 孵化,3.x 系列发布
  • 2018 年:成为 Apache 顶级项目,4.x 系列
  • 2019 年:4.4 发布,事务消息生产级
  • 2020 年:4.7 发布,Dledger 模式成熟
  • 2022 年:5.0 发布,Controller 集群、Pop 消费等新特性
  • 2023 年:5.1 发布,Controller 集群多活
  • 2024 年:5.3 发布,存储分层优化、消息轨迹增强

典型生产事故与教训

  1. 2017 年阿里双 11:NameServer 部署 1 节点,挂掉后全集群路由失效。教训NameServer 必须 2 节点以上,无状态可以随意扩
  2. 2019 年某电商:事务消息 checkLocalTransaction 死循环,RocketMQ 频繁回查。教训回查逻辑必须能在 1 秒内返回 + 业务状态机要稳定
  3. 2021 年某支付公司:RocketMQ 4.x 升级 5.x 没灰度,5.x 改用 Controller 选主,配置错误导致 Dledger 一直选主失败。教训先灰度 1% 流量观察 1 周再全量
  4. 2023 年某物流公司:Broker 磁盘满,CommitLog 拒绝写。教训必须监控磁盘水位 + fileReservedTime 不要设太短

阿里双 11 RocketMQ 实战

2012-2024 年双 11,RocketMQ 经历了从 0 到 1 的考验:

  • 2012 年:支撑 100 万 TPS 的秒杀,NameServer 设计就是为了高可用 + 简单(无状态,任意挂任意起)
  • 2015 年:事务消息 1.0 上线,解决订单创建 + 库存扣减 + 支付通知的分布式事务
  • 2017 年:双 11 峰值 1.5 亿 TPS(百万级 QPS × 数百 Topic),RocketMQ 4.0 扛住了
  • 2020 年:Dledger 自动选主,主从切换从 5 分钟降到 30 秒
  • 2023 年:5.x Controller 多活,支持跨机房部署

性能调优实战

  • Broker 端
    • brokerRole = DLEDGER(5.x 必须)
    • flushCommitLogLeastPages = 4(积累 4 页才刷盘)
    • flushCommitLogThoroughInterval = 10000(10 秒强制刷盘)
    • maxMessageSize = 4194304(4MB 单条)
  • Producer 端
    • sendMessageTimeout = 10000(10 秒超时)
    • retryTimesWhenSendFailed = 3(重试 3 次)
    • maxMessageSize = 4194304
  • Consumer 端
    • consumeMessageBatchMaxSize = 32(批量消费 32 条)
    • pullBatchSize = 32(一次拉 32 条)
    • rebalanceInterval = 20000(20 秒一次 rebalance)

业务消息设计模式

  • 事务消息模式:本地事务 + 消息发送,保证最终一致性
  • 顺序消息模式:同一业务 key 路由到同一队列,保证严格顺序
  • 延迟消息模式:用 18 个固定级别实现延迟投递
  • 广播消息模式:每个 Consumer 收到全量消息(适合配置刷新)
  • 过滤消息模式:Consumer 按 tag/SQL 过滤自己关心的消息

Dledger 主从切换实战日志分析

1
2
3
4
5
6
7
# 正常状态
INFO  Dledger - dledger[broker-a-dledger] elected new leader: n0
INFO  BrokerController - broker-a 切换为新 Master

# 异常状态(split brain)
WARN  Dledger - 发现两个节点都认为自己是 Leader
ERROR BrokerController - Dledger 选主冲突,需要人工介入

运维常见命令

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
# 查看集群状态
mqadmin clusterList -n namesrv:9876

# 查看 Topic 列表
mqadmin topicList -n namesrv:9876

# 查看消费进度
mqadmin consumerProgress -n namesrv:9876 -g consumer-group

# 重置消费位点
mqadmin resetOffsetByTime -n namesrv:9876 -t topic -g group -s timestamp

# 查看 Broker 状态
mqadmin brokerStatus -n namesrv:9876 -b broker:10911

国内大厂落地情况

  • 阿里:核心交易(订单/支付)100% 用 RocketMQ,日均 1 万亿条消息
  • 京东:自研 JMQ(基于 RocketMQ 4.x),日均 800 亿条
  • 滴滴:核心交易 + 业务通知,月均 500 亿条
  • 小米:IoT 设备消息(部分场景)
  • 字节跳动:IM 消息、推送通知

未来趋势

  • Controller 多活:5.x 已支持,6.x 计划完善跨机房消息复制
  • Pop 消费:5.0 引入,Consumer 可以"偷看"消息,降低延迟 30%
  • 存储分层:冷数据自动转对象存储(OSS/S3),磁盘成本降 80%
  • 轻量级 SDK:Rust SDK 计划中,嵌入式场景可直连 Broker

三、RabbitMQ:传统企业消息的"事实标准”

3.1 是什么 + 解决什么问题

RabbitMQ 是 2007 年用 Erlang 写的 AMQP 0-9-1 协议实现,2010 年被 Pivotal(VMware 旗下)收购,现在是 Broadcom 旗下产品。GitHub Stars 12k+,是传统企业消息中间件的事实标准

3 大典型场景

  • 传统企业集成:银行、保险、证券的核心系统
  • 复杂路由:4 种 Exchange × 多种 Binding 规则
  • 任务队列:Celery(Python)/ Sidekiq(Ruby)/ Laravel Queue(PHP)的后端

与 Kafka / RocketMQ 的本质区别:RabbitMQ 设计哲学是"灵活路由 + 复杂业务消息"——核心是"Exchange + Queue + Binding"的灵活拓扑。Kafka 偏基础设施,RocketMQ 偏业务消息,RabbitMQ 偏复杂路由

💡 原理:为什么用 Erlang

Erlang 是爱立信 1986 年为电信交换机设计的语言:

  • 进程隔离:每个进程独立内存,单进程崩溃不影响整体
  • OTP 行为库:内置 supervisor tree,自动故障恢复
  • 软实时:消息处理延迟可预测

RabbitMQ 用 Erlang 的"轻量进程 + 监督树“模型实现”每个连接一个进程"——百万级连接管理不卡。这是为什么 RabbitMQ 适合长连接 + 复杂路由

3.2 核心架构

RabbitMQ 集群由 6 类组件构成:

组件职责备注
Publisher生产消息发到 Exchange
Exchange路由4 种类型(direct/topic/fanout/headers)
Queue存储消息绑定到 Exchange
BindingExchange → Queue 的规则含 routing key
Consumer消费消息订阅 Queue
ChannelTCP 多路复用1 个 TCP 连接可多 Channel

4 种 Exchange 路由模式

类型路由规则典型场景
directrouting key 完全匹配点对点
topicrouting key 通配符(* 1 段 / # 多段)复杂路由
fanout广播到所有绑定 Queue通知、广播
headers按消息 header 路由极少见

队列类型(3 种):

类型实现推荐度
Classic Queue经典队列,单节点存储⭐ 已过时
Quorum Queue基于 Raft 共识,多节点复制⭐⭐⭐ 生产推荐
Stream Queue持久化日志流,类似 Kafka适合流场景

💡 原理:Quorum Queue 为什么取代 Classic 镜像队列

经典镜像队列(Classic Mirrored Queue)通过"主从异步复制“实现高可用,但有个致命问题——脑裂

  • 网络分区时主从双方都以为自己是主
  • 分区恢复后两边数据合并,产生重复或丢失消息

Quorum Queue(RabbitMQ 3.8+)用 Raft 共识算法——任意时刻只有一个 Leader,所有写操作走 Leader + 半数以上 Follower 确认

生产建议新项目一律 Quorum Queue。经典镜像队列只用于必须升级的旧项目。

3.3 关键特性

消息确认机制

  • Publisher Confirm:发消息后等 Broker 确认(替代"发完就忘”)
  • Consumer Ack
    • basic.ack——确认消费成功
    • basic.nack——消费失败,可指定 requeue
    • basic.reject——单条拒绝

死信队列(DLX)

  • 触发条件:消息被 nack + requeue=false / TTL 过期 / 队列满
  • 用途:失败消息隔离(不会无限循环)

流控(Flow Control)

  • 内存高水位(默认 40% 内存):Producer 被限速
  • 磁盘高水位(默认 50% 磁盘):所有生产者被阻塞
  • 目的:保护 Broker 不被压垮

延迟消息

  • 官方:rabbitmq-delayed-message-exchange 插件(需安装
  • 精度:毫秒级任意延迟
  • 使用场景:30 分钟未支付取消订单

消费者预取(QoS)

  • basic.qos(prefetch_count=N):限制每个 Consumer 单次预取数量
  • 避免:一个 Consumer 抢占所有消息

3.4 典型使用(Java + Spring Boot)

Maven 依赖

1
2
3
4
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
spring:
  rabbitmq:
    host: rabbitmq-cluster
    port: 5672
    username: admin
    password: ${RABBITMQ_PASSWORD}
    virtual-host: /order
    publisher-confirm-type: correlated   # 开启 Publisher Confirm
    publisher-returns: true              # 开启消息路由失败回调
    listener:
      simple:
        acknowledge-mode: manual         # 手动 ack
        prefetch: 32                     # 消费者预取
        retry:
          enabled: true
          max-attempts: 3
          initial-interval: 2s

Exchange + Queue 声明(Quorum Queue):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Configuration
public class RabbitConfig {
    @Bean
    public TopicExchange orderExchange() {
        return ExchangeBuilder
            .topicExchange("order-events")
            .durable(true)
            .build();
    }

    @Bean
    public Queue orderCreatedQueue() {
        return QueueBuilder.durable("order-created-quorum")
            .quorum()                        // ⭐ Quorum Queue
            .deadLetterExchange("order-dlx")  // 死信交换器
            .deadLetterRoutingKey("dlx")
            .ttl(86400000)                    # 消息 TTL 24 小时
            .build();
    }

    @Bean
    public Binding orderCreatedBinding(Queue orderCreatedQueue, TopicExchange orderExchange) {
        return BindingBuilder
            .bind(orderCreatedQueue)
            .to(orderExchange)
            .with("order.created");           // routing key 通配
    }
}

生产者(Publisher Confirm):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Service
public class OrderEventPublisher {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void publishOrderCreated(OrderEvent event) {
        // 设置 ConfirmCallback
        rabbitTemplate.setConfirmCallback((correlation, ack, cause) -> {
            if (!ack) {
                log.error("消息未确认: {}", cause);
                // 重试或告警
            }
        });

        // 设置 ReturnsCallback(消息路由失败回调)
        rabbitTemplate.setReturnsCallback(returned ->
            log.error("消息路由失败: exchange={}, routingKey={}",
                returned.getExchange(), returned.getRoutingKey())
        );

        rabbitTemplate.convertAndSend(
            "order-events",
            "order.created",
            event,
            message -> {
                message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
                return message;
            }
        );
    }
}

消费者(手动 ack + 死信队列):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Component
public class OrderCreatedConsumer {
    @RabbitListener(queues = "order-created-quorum")
    public void onMessage(
        @Payload OrderEvent event,
        Channel channel,
        @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag
    ) {
        try {
            // 业务处理
            inventoryService.deduct(event);
            // 手动 ack
            channel.basicAck(deliveryTag, false);
        } catch (BusinessException e) {
            // 业务异常:nack 不重试(走死信队列)
            channel.basicNack(deliveryTag, false, false);
        } catch (Exception e) {
            // 系统异常:nack 重试
            channel.basicNack(deliveryTag, false, true);
        }
    }
}

延迟消息(插件方式):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// 1. 启用插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
// 2. 声明延迟 Exchange
@Bean
public CustomExchange delayedExchange() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-delayed-type", "direct");
    return new CustomExchange("order-delayed", "x-delayed-message", true, false, args);
}

// 3. 发送延迟消息(30 分钟)
rabbitTemplate.convertAndSend(
    "order-delayed",
    "order.cancel",
    orderId,
    message -> {
        message.getMessageProperties().setDelay(30 * 60 * 1000);  // 30 min
        return message;
    }
);

3.5 生产集群部署(手把手)

目标拓扑

  • 3 节点 RabbitMQ 集群(Quorum Queue)
  • 2 节点 HAProxy(4 层负载 + 健康检查)
  • 1 个 Prometheus Exporter(rabbitmq_prometheus 插件)

完整 docker-compose.yml

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
version: "3.8"

services:
  # ============= RabbitMQ 节点 1 =============
  rabbitmq-1:
    image: rabbitmq:3.13-management
    container_name: rabbitmq-1
    hostname: rabbitmq-1
    ports:
      - "5672:5672"     # AMQP
      - "15672:15672"   # Management UI
      - "15692:15692"   # Prometheus Exporter
    environment:
      RABBITMQ_ERLANG_COOKIE: "secret-cookie-here"   # 节点间发现用
    volumes:
      - rabbitmq1-data:/var/lib/rabbitmq
    networks:
      - rabbitmq-net

  rabbitmq-2:
    image: rabbitmq:3.13-management
    container_name: rabbitmq-2
    hostname: rabbitmq-2
    ports:
      - "5673:5672"
      - "15673:15672"
      - "15693:15692"
    environment:
      RABBITMQ_ERLANG_COOKIE: "secret-cookie-here"
    volumes:
      - rabbitmq2-data:/var/lib/rabbitmq
    networks:
      - rabbitmq-net

  rabbitmq-3:
    image: rabbitmq:3.13-management
    container_name: rabbitmq-3
    hostname: rabbitmq-3
    ports:
      - "5674:5672"
      - "15674:15672"
      - "15694:15692"
    environment:
      RABBITMQ_ERLANG_COOKIE: "secret-cookie-here"
    volumes:
      - rabbitmq3-data:/var/lib/rabbitmq
    networks:
      - rabbitmq-net

  # ============= HAProxy 负载均衡 =============
  haproxy:
    image: haproxy:3.0
    container_name: rabbitmq-haproxy
    ports:
      - "5672:5672"      # 对外 AMQP 入口
    volumes:
      - ./haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg:ro
    depends_on:
      - rabbitmq-1
      - rabbitmq-2
      - rabbitmq-3
    networks:
      - rabbitmq-net

volumes:
  rabbitmq1-data:
  rabbitmq2-data:
  rabbitmq3-data:

networks:
  rabbitmq-net:
    driver: bridge

haproxy.cfg

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
global
    log /dev/log local0
    maxconn 4096

defaults
    mode tcp
    log global
    timeout connect 5s
    timeout client 30s
    timeout server 30s

# AMQP 入口(4 层负载)
frontend rabbitmq-frontend
    bind *:5672
    default_backend rabbitmq-backend

backend rabbitmq-backend
    balance roundrobin
    option tcp-check
    tcp-check send-binary ff
    tcp-check expect binary 01
    server rabbitmq-1 rabbitmq-1:5672 check inter 5s
    server rabbitmq-2 rabbitmq-2:5672 check inter 5s
    server rabbitmq-3 rabbitmq-3:5672 check inter 5s

启动与验证

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
docker-compose up -d

# 1. 节点加入集群(在节点 2、3 上执行)
docker exec rabbitmq-2 rabbitmqctl stop_app
docker exec rabbitmq-2 rabbitmqctl join_cluster rabbit@rabbitmq-1
docker exec rabbitmq-2 rabbitmqctl start_app

# 2. 启用管理插件 + Quorum Queue 插件
docker exec rabbitmq-1 rabbitmq-plugins enable rabbitmq_management rabbitmq_prometheus

# 3. 验证集群状态
docker exec rabbitmq-1 rabbitmqctl cluster_status
# 应显示 3 节点

# 4. 声明 Quorum Queue
docker exec rabbitmq-1 rabbitmqadmin declare queue \
  name=order-created-quorum \
  durable=true \
  arguments='{"x-queue-type":"quorum","x-dead-letter-exchange":"order-dlx"}'

# 5. 访问 UI
# 浏览器打开 http://localhost:15672 (guest/guest)

监控告警

指标告警阈值排查
rabbitmq_queue_messages持续增长消费慢或挂
rabbitmq_queue_consumers= 0消费者掉线
rabbitmq_process_resident_memory_bytes> 80% 内存高水位调小消费者预取
rabbitmq_disk_space_available_bytes< 5GB清日志或扩盘

故障演练

1
2
3
4
5
6
7
8
9
# 演练 1:杀单节点
docker stop rabbitmq-2
# 观察:HAProxy 30 秒内剔除 rabbitmq-2,剩余 2 节点继续服务
# Quorum Queue 自动选新 Leader

# 演练 2:网络分区
docker network disconnect rabbitmq-net rabbitmq-3
# 观察:剩 2 节点满足 Raft 仲裁(半数 = 2),继续服务
# rabbitmq-3 重新接入后自动同步数据

3.6 6 个避坑点

🛑 误区警示:RabbitMQ 6 大经典坑

  1. 经典镜像队列脑裂:用 Classic 镜像队列,网络分区时主从同时接收消息——换 Quorum Queue
  2. 队列无限增长:未设 x-max-length——队列无限增长直到磁盘满,必须配最大长度
  3. 死信循环:业务 bug 导致消息反复失败 + 死信路由又指回原队列——死信队列单独监控 + 告警
  4. 单 Channel 性能瓶颈:所有生产者共用 1 个 Channel——每个线程独立 Channel + ChannelPool 池化
  5. 内存高水位流速:未监控内存高水位(默认 40%)——超限后 Producer 限速,业务感知
  6. 客户端连接泄漏:每次发消息新建 Connection——用 ConnectionFactory + Spring AMQP 自动管理

3.7 实战经验集锦

RabbitMQ 演进时间线

  • 2007 年:Erlang 写就,最初用于电信行业消息传递
  • 2010 年:被 SpringSource(后被 VMware 收购)收购
  • 2013 年:进 Pivotal(VMware 拆分)
  • 2017 年:3.0 发布,多协议支持
  • 2019 年:3.8 发布,Quorum Queue GA
  • 2020 年:Stream Queue 引入(类似 Kafka)
  • 2021 年:Broadcom 收购 VMware,RabbitMQ 归 Broadcom
  • 2023 年:3.12 发布,Quorum Queue 增强
  • 2024 年:3.13 发布,性能优化 + K8s Operator

典型生产事故与教训

  1. 2018 年某银行:经典镜像队列脑裂,分区恢复后数据不一致。教训新项目必须用 Quorum Queue
  2. 2020 年某电商:RabbitMQ 队列无限增长,磁盘满后无法写入。教训所有队列必须配 x-max-length
  3. 2022 年某物流公司:单 Channel 性能瓶颈,所有生产者共用 1 个 Channel,吞吐量只有 1000 TPS。教训每个线程独立 Channel + ChannelPool 池化
  4. 2023 年某金融公司:死信队列未监控,业务 bug 导致死信队列堆积 1000 万条消息。教训死信队列必须独立监控 + 告警

从经典镜像到 Quorum 的迁移实战

经典镜像队列(Classic Mirrored Queue)迁移到 Quorum Queue 是 RabbitMQ 3.10+ 的强制升级路径。迁移步骤:

  1. 新建 Quorum Queue:用 rabbitmqadmin 声明新队列,指定 x-queue-type: quorum
  2. 双写过渡期:生产者同时写老队列(Classic)和新队列(Quorum),持续 1 周
  3. 消费者切换:消费者从老队列切到新队列,观察 3 天
  4. 下线老队列:确认新队列消费正常后,删除老队列
  5. 监控双队列:迁移期间双队列独立监控,避免遗漏

Erlang 集群调优实战

  • 节点配置
    • RABBITMQ_ERLANG_COOKIE 必须所有节点一致
    • RABBITMQ_NODENAME=rabbit@node-1(节点名固定)
    • RABBITMQ_USE_LONGNAME=true(DNS 解析)
  • 内存调优
    • vm_memory_high_watermark = 0.4(默认 40%,生产建议 0.6
    • vm_memory_high_watermark_paging_ratio = 0.5
  • 磁盘调优
    • disk_free_limit = 1GB(最小 1GB 磁盘)
    • disk_free_limit_relative = 1.0(至少 1 倍 RAM 磁盘)
  • 网络调优
    • tcp_listen_options.backlog = 128
    • tcp_listen_options.nodelay = true

复杂路由实战案例

订单系统涉及多种业务事件(订单创建、支付、库存、物流),用 topic exchange 实现灵活路由:

1
2
3
4
5
6
Exchange: order-events (topic)
├─ routing key: order.created    → Queue: order-created (库存服务订阅)
├─ routing key: order.paid       → Queue: order-paid (财务服务订阅)
├─ routing key: order.shipped    → Queue: order-shipped (物流服务订阅)
├─ routing key: order.cancelled  → Queue: order-cancelled (客服订阅)
└─ routing key: order.#          → Queue: order-all (数仓订阅,全量数据)

关键点

  • * 匹配 1 段(order.created 只匹配 order.created
  • # 匹配多段(order.# 匹配 order.createdorder.paid.x 等)
  • 不同服务订阅不同 routing key,互不干扰

Stream Queue 实战(Kafka 替代方案)

RabbitMQ 3.9+ 引入的 Stream Queue 是基于日志的持久化队列,对标 Kafka

  • 单条消息可保留 100GB+
  • 消费者按 offset 拉取(类似 Kafka
  • 可重放历史消息(类似 Kafka

适用场景:RabbitMQ 已经是团队主流,但又想要 Kafka 的"日志重放"能力。生产建议:如果只需要消息队列 + 简单路由,用 Classic/Quorum Queue;如果需要日志重放 + 大容量历史数据,考虑 Stream Queue 或直接换 Kafka。

国内大厂落地情况

  • 银行:工行、建行、招行的核心系统,AMQP 是事实标准
  • 证券:中信证券、华泰证券的交易通知
  • 政企:国家电网、税务系统的内部集成
  • 互联网老项目:腾讯早期 QQ 农场、阿里早期电商(部分仍在用)
  • 新项目:越来越少(阿里系转 RocketMQ,字节转 Kafka)

未来趋势

  • Quorum Queue 成为默认:3.10+ 已默认推荐,4.x 计划废弃 Classic
  • K8s Operator 完善:Broadcom 推 RabbitMQ Cluster Operator for Kubernetes
  • Serverless 化:CloudAMQP 等 SaaS 提供按量付费
  • 协议扩展:MQTT 5.0 协议插件(部分场景可替代 EMQX)

RabbitMQ vs Kafka vs RocketMQ:本质差异

维度RabbitMQKafkaRocketMQ
设计哲学灵活路由(消息总线)高吞吐(日志)业务消息(队列)
核心优势4 种 Exchange 灵活路由高吞吐 + 持久化事务/顺序/延迟
核心劣势性能(万级 TPS)事务能力弱运维复杂
学习曲线陡(AMQP)
运维工具完善(Management UI)完善(Kafka UI)中等(Console)
生态成熟度完善完善(最大生态)国内成熟

四、横向对比 + 选型决策树

4.1 9 维度对比表

核心维度对比

维度Kafka 3.7RocketMQ 5.3RabbitMQ 3.13
协议自定义二进制(基于 TCP)自定义二进制AMQP 0-9-1
设计哲学分布式事件流平台(日志)企业级业务消息(队列)灵活路由(消息总线)
消息顺序单 Partition 严格有序单 MessageQueue 严格有序单 Queue 内有序(但多 Consumer 不保证)
事务消息支持(Producer 端事务)⭐ 支持(half message + 回查,生产级)支持(AMQP 事务,性能差)
延迟消息不支持(需外部调度)⭐ 支持(18 固定级别)支持(插件,任意延迟)
峰值 TPS百万级/集群百万级/集群万级/集群(受 Erlang 调度限制)
消息回溯⭐ 支持(按 offset)支持(按时间戳)不支持(消费即删除)
集群模式KRaft(无 ZK)NameServer + DledgerErlang 集群 + Quorum Queue
运维成本中(KRaft 简化)中(5.x 简化)中(Quorum Queue 简化)
典型场景日志管道 / 流处理 / 业务消息业务消息 / 事务消息 / IoT传统企业 / 复杂路由 / 任务队列
学习曲线中-高(AMQP 复杂)
国内生产案例阿里(日志)、字节、美团、滴滴阿里、京东、滴滴、得物银行、证券、政企、互联网老项目

💡 原理:为什么 RocketMQ 的事务消息最强

三个 MQ 都有事务消息,但实现方式不同

  • Kafka:Producer 端事务(只解决"发消息 + 业务操作"的原子性,不解决消费侧
  • RabbitMQ:AMQP 事务(性能极差,实战几乎不用
  • RocketMQ:half message + 回查(真正生产级,能解决跨服务数据最终一致性

业务侧选型:如果"本地事务 + 消息发送"原子性是刚需,RocketMQ 是唯一选择

4.2 4 类场景决策树

4 类场景决策清单

业务特征推荐方案关键理由
日志管道 / 大数据流处理Kafka高吞吐 + 持久化 + 与 Flink/Spark 集成
业务消息(事务 / 顺序 / 延迟)RocketMQhalf message + 严格顺序 + 18 延迟级别
传统企业集成 / 复杂路由RabbitMQAMQP 标准 + 4 种 Exchange 灵活路由
金融级(强可靠 + 复杂路由)RocketMQ + RabbitMQ 混用核心交易用 RocketMQ 事务,外围通知用 RabbitMQ 灵活路由

🎯 避坑点:决策树不是"非此即彼"

真实生产中很少有项目只用一种 MQ。常见组合:

  • 阿里:Kafka(日志)+ RocketMQ(业务)
  • 字节:Kafka(数据管道)+ RocketMQ(IM 消息)
  • 美团:Kafka + RocketMQ 双轨

选型原则:按"业务场景"分别选型,不要追求"一套打天下"

4.3 IoT/设备场景:EMQX 对比

EMQX 严格说是 IoT MQTT broker(不是传统消息中间件),但任何涉及 IoT 设备的系统都要选型。这里把 EMQX 也加入对比:

维度EMQX 5.xKafka 3.7RocketMQ 5.3RabbitMQ 3.13
协议MQTT 3.1.1 / 5.0自定义二进制自定义二进制AMQP 0-9-1
客户端IoT 设备(嵌入式 C/MQTT)服务端服务端服务端
Qos 等级⭐ 0/1/2(MQTT 标准)无(消费者自己控制)
离线消息⭐ 支持(持久会话)⭐ 支持(按 offset 重放)支持(按时间戳)不支持(消费即删除)
设备影子⭐ 内置(保留设备最新状态)
峰值连接数⭐ 千万级(单集群)不适用不适用百万级
典型场景IoT 设备接入 / 车联网 / 工业互联网日志 / 流处理业务消息传统集成

IoT 架构推荐

选型建议

  • 设备 → 服务:用 EMQX(MQTT 协议 + QoS + 设备影子)
  • EMQX → 业务消息:用 RocketMQ(事务 / 业务处理)
  • EMQX → 大数据:用 Kafka(日志 / 实时计算)

💡 原理:MQTT vs AMQP vs 自定义协议

MQTT:为 IoT 设备设计,协议头极小(2 字节),适合嵌入式、低带宽、不稳定网络 AMQP:为企业集成设计,协议完整但较重,适合服务-服务通信 Kafka / RocketMQ 自定义协议:为分布式日志设计,性能最优但跨语言兼容性差

设备场景选 EMQX——协议层决定了 90% 的取舍

4.4 真实案例

阿里:RocketMQ 为主 + Kafka 辅助

  • 核心交易(订单/支付/库存):RocketMQ(事务消息 + 严格顺序 + 18 延迟级别)
  • 日志/监控(应用日志/调用链/审计):Kafka(高吞吐 + 持久化)
  • 理由:阿里是 RocketMQ 诞生地,业务消息场景必选 RocketMQ

字节跳动:Kafka + RocketMQ 双轨

  • 数据管道 / 实时计算Kafka(与 Flink 集成做实时推荐、A/B 测试数据流)
  • IM 消息 / 推送通知RocketMQ(顺序消息 + 海量 TPS)
  • 理由:字节数据量大,Kafka 高吞吐优势明显;IM 业务对消息可靠性要求高,RocketMQ 事务消息更稳

美团:Kafka + RocketMQ 双轨

  • 订单/支付核心RocketMQ(事务消息保数据一致性)
  • 用户行为日志Kafka(高吞吐 + 与 Druid/ES 集成做实时分析)
  • 理由:美团业务复杂,不同业务用不同 MQ 反而简化架构

滴滴:RocketMQ 核心交易

  • 核心交易(打车/支付/账户):RocketMQ(强可靠 + 事务)
  • 业务通知/IMRocketMQ(统一一套,运维简单)
  • 理由:滴滴对数据一致性要求高,RocketMQ 事务消息是刚需

京东:JMQ(自研 RocketMQ-like)

  • 核心交易JMQ(自研,基于 RocketMQ 4.x 改造)
  • 理由:京东 2015 年开始用 RocketMQ 4.x,2017 年深度定制成 JMQ,性能优化 + 内部运维工具齐全

启示

  • 业务消息场景,国内主流是 RocketMQ(阿里系生态)
  • 日志/数据管道,Kafka 是事实标准
  • 传统企业,RabbitMQ 仍占大头
  • IoT/设备,EMQX 必选

4.5 混用策略:多 MQ 共存

3 个混用理由

  1. 不同业务用不同 MQ(核心交易用 RocketMQ,日志用 Kafka)
  2. 新旧系统迁移(老系统用 RabbitMQ,新系统用 RocketMQ)
  3. 不同部门技术栈(A 部门用 Kafka,B 部门用 RocketMQ)

统一抽象层:避免业务代码耦合具体 MQ

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 1. 定义统一接口
public interface MessageBus {
    void send(String topic, Object message);
    void sendWithTransaction(String topic, Object message, Runnable localTx);
}

// 2. RocketMQ 实现
@Service
@ConditionalOnProperty(name = "mq.type", havingValue = "rocketmq")
public class RocketMQBus implements MessageBus {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    // 实现接口
}

// 3. Kafka 实现
@Service
@ConditionalOnProperty(name = "mq.type", havingValue = "kafka")
public class KafkaBus implements MessageBus {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    // 实现接口
}

// 4. 业务代码注入接口
@Service
public class OrderService {
    @Autowired
    private MessageBus messageBus;  // 不关心是哪种 MQ
    public void createOrder(Order order) {
        orderRepository.save(order);
        messageBus.send("order-topic", order);
    }
}

Spring Cloud Stream 进一步封装:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
spring:
  cloud:
    stream:
      bindings:
        order-out:
          destination: order-topic
          content-type: application/json
      rocketmq:
        binder:
          name-server: rocketmq-namesrv:9876

数据流转:Kafka → Flink → RocketMQ(典型实时计算链路):

为什么这样组合

  • Kafka:采集层高吞吐 + 大容量
  • Flink:计算层实时聚合 / 关联 / 过滤
  • RocketMQ:结果层业务消息(事务 + 顺序 + 延迟)

这是现代数据架构的事实标准——采集、计算、消费三种组件各司其职

4.6 MQ 选型决策流程图(详版)

第一步:明确业务核心需求

  • Q1:核心场景是什么?

    • 业务消息(订单/支付)→ 跳到第二步
    • 日志管道 → 直接选 Kafka
    • IoT 设备 → EMQX + 业务 MQ 桥接
    • 复杂路由 → RabbitMQ
  • Q2:需要事务消息吗?

    • 是 → RocketMQ(half message + 回查,生产级)
    • 否 → 跳到 Q3
  • Q3:需要严格顺序消息吗?

    • 是 → RocketMQ(单 MessageQueue 严格 FIFO)
    • 否 → 跳到 Q4
  • Q4:需要延迟消息吗?

    • 是(固定级别)→ RocketMQ(18 级别)
    • 是(任意时间)→ RabbitMQ(插件支持任意延迟)
    • 否 → 跳到 Q5
  • Q5:峰值 TPS 多大?

    • 百万级 → Kafka / RocketMQ
    • 万级 → RabbitMQ Quorum 够用

第二步:评估运维能力

  • Q6:团队有 Kafka 运维经验吗?

    • 是 → Kafka 优先
    • 否 → RocketMQ(国内资料多)
  • Q7:团队有 RabbitMQ 运维经验吗?

    • 是 → 复杂路由选 RabbitMQ
    • 否 → 业务消息选 RocketMQ
  • Q8:能否接受多套 MQ 运维?

    • 能 → 多 MQ 混用(按业务分)
    • 不能 → 选一个主用 MQ + 简单场景用其他

第三步:评估成本

  • Q9:预算是否允许商业版?

    • 是 → 阿里云 RocketMQ / AWS MSK / Confluent Cloud
    • 否 → 自建开源版
  • Q10:是否需要 Serverless 化?

    • 是 → Confluent Cloud / AWS MSK Serverless
    • 否 → 自建集群

决策矩阵(按"业务场景 + 团队能力 + 预算"三维度评分):

业务数据一致团队 Kafka 经验团队 RocketMQ 经验团队 RabbitMQ 经验预算推荐
业务消息RocketMQ
业务消息RocketMQ + 培训
业务消息Kafka
日志管道Kafka
IoT 设备EMQX + Kafka/RocketMQ 桥接
金融级RocketMQ + RabbitMQ 混用

4.7 未来趋势:MQ + Serverless + eBPF

趋势 1:Serverless 化

传统自建 MQ 集群需要 1-2 个 SRE 专职维护,Serverless 化后按量付费

  • Confluent Cloud(Kafka):$0.11/GB 写入 + $0.04/GB 存储
  • AWS MSK Serverless:$0.75/小时 + 流量费
  • 阿里云 RocketMQ Serverless:0.5 元/百万条

优势:免运维、弹性扩缩、起步成本低 劣势:跨云锁定、长期成本可能高于自建

趋势 2:eBPF 加速

eBPF(extended Berkeley Packet Filter)允许在内核安全地运行用户态程序,性能比 IPVS(传统 LVS)高 5-10 倍:

  • Cilium 用 eBPF 替代 kube-proxy
  • Merbridge 用 eBPF 加速 Service Mesh(替代 iptables)
  • 未来:eBPF 可能直接加速 Kafka / RocketMQ 内部网络

趋势 3:MQ + 实时计算深度融合

Kafka 已成为 Flink / Spark Streaming 的事实标准数据源。未来:

  • Kafka + Flink:实时计算 + 业务消息统一抽象
  • RocketMQ 5.x Pop 消费:Consumer 可以"偷看"消息,降低延迟 30%
  • 流批一体:Flink 同时处理流 + 批,MQ 作为统一数据通道

趋势 4:AI 驱动的 MQ 运维

AIOps(智能运维)在 MQ 领域的应用:

  • 自动调优:根据流量模式自动调整 Partition / Consumer 数量
  • 异常检测:AI 检测消费延迟突增,自动告警
  • 根因分析:AI 分析日志 + 指标,定位故障根因
  • 预测扩容:AI 预测流量峰值,提前扩容

趋势 5:MQ + 边缘计算

IoT 场景下,边缘节点也需要 MQ

  • EMQX Edge:轻量级 MQTT broker,部署在边缘设备
  • Apache Pulsar:分层架构,天然支持边缘 + 云端
  • Kafka Tiered Storage:冷数据自动转对象存储,降低边缘节点存储压力

总结:3 大消息中间件选型口诀

💡 记忆口诀:3 大 MQ 选型三字诀

Kafka 选吞吐——日志、管道、流处理,吞吐为王 RocketMQ 选业务——订单、支付、事务,业务为王 RabbitMQ 选路由——企业、集成、复杂路由,灵活为王

选型时先问三个问题:

  1. 核心场景是什么?(业务 / 日志 / 路由)
  2. 需要事务/顺序/延迟吗?(RocketMQ 强项)
  3. 能接受多 MQ 运维吗?(多 MQ 混用 vs 单一 MQ)

4.8 选型常见误区

最后总结 6 个选型常见误区,给技术决策者提个醒:

🛑 误区警示:6 个选型经典坑

  1. “最新就是最好的”——听说 Kafka 4.0 出了就上 4.0,结果 4.0 还在 beta。生产稳定 > 功能先进
  2. “选最难的显得有技术”——团队 3 个人,非要上 Kafka + Flink + 三机房。量力而行
  3. “选最便宜的”——用 ActiveMQ 0-day 漏洞,业务停摆 2 天。安全 > 成本
  4. “听大厂的选型”——阿里用 RocketMQ 不代表你也得用。业务规模不同,选型不同
  5. “选型一次性到位”——选了 RabbitMQ 3 年后才换 RocketMQ,演进成本是预期 5 倍
  6. “忽略运维成本”——选 Kafka 但团队没人懂 KRaft,故障时只能干瞪眼

正确心态:选型是"5-10 年的事",演进能力比"一次选对"更重要。

附录 A:常见问题 FAQ

A1: Kafka 应该用 KRaft 还是 ZK?

A新项目一律 KRaft(3.3+ GA,3.7 默认)。ZK 模式只用于必须升级的旧项目。

  • KRaft 性能更好(无 ZK 跨集群 RPC)
  • 运维简单(少一个组件)
  • 3 节点 Controller 集群即可生产

A2: RocketMQ 5.x Dledger 真的可以替代手工切换吗?

A可以,但有限制。Dledger 用 Raft 选主:

  • ✅ 自动选主(30 秒内)
  • ✅ 数据强一致(半数确认)
  • ❌ 不支持平滑切换(有秒级不可用)
  • ❌ 不支持跨集群容灾

如果业务需要"跨机房容灾",需要双活集群 + 消息复制(用 RocketMQ 5.x 的 Controller 集群多活能力)。

A3: RabbitMQ 经典镜像队列还能用吗?

A新项目不要用。经典镜像队列(Classic Mirrored Queue)有脑裂风险。

  • 新项目:Quorum Queue(基于 Raft,无脑裂)
  • 旧项目:经典队列可以继续用,但至少升级到 3.10+(社区支持 + 关键 bug 修复)

A4: 3 个 MQ 能不能共存?

A完全可以。但要遵守 3 个原则:

  1. 统一抽象层(如 MessageBus 接口 + Spring Cloud Stream)
  2. 不跨 MQ 做事务(不要试图"通过 Kafka + RocketMQ 做分布式事务")
  3. 监控独立(3 套 MQ 各自一套监控,不要混在一起)

A5: 怎么选 Kafka 3.x 客户端版本?

A用 Spring Boot 兼容的稳定版本

Spring BootSpring KafkaKafka Client
3.2.x3.1.x3.6.x
3.1.x3.0.x3.5.x
3.0.x3.0.x3.4.x
2.7.x2.9.x3.0.x

不要直接用最新 Kafka Client——可能跟 Spring Boot 不兼容。

A6: 消息积压怎么排查?

排查流程(4 步):

  1. 确认是 Producer 端问题还是 Consumer 端问题——看 Producer TPS vs Consumer TPS
  2. 如果是 Consumer 慢——查 Consumer 日志,看是否有 GC 停顿 / DB 慢查询 / 下游 RPC 超时
  3. 如果是 Consumer 数量不足——临时扩容 Consumer 实例数(注意 Partition 数 ≥ Consumer 数
  4. 如果是 Consumer 代码 bug——修复后重启 + 补消费历史数据

Kafka 排查命令

1
2
3
4
5
6
7
# 看消费位点
kafka-consumer-groups --bootstrap-server kafka:9092 --describe --group order-consumer
# 输出:LAG 列显示积压量

# 手动重置消费位点(积压严重时)
kafka-consumer-groups --bootstrap-server kafka:9092 \
  --group order-consumer --reset-offsets --to-earliest --topic order-topic --execute

A7: 怎么保证消费幂等?

3 种方案(按推荐度排序):

  1. 唯一索引(数据库层):用消息 ID 做唯一索引,重复消费时 INSERT 失败
  2. Redis SETNX(业务层):用消息 ID 在 Redis 上锁,处理完释放
  3. 状态机(业务层):业务状态机保证同一消息只处理一次

生产建议业务层做幂等,不依赖 MQ 自身(MQ 的 exactly-once 是有限制的)。

A8: EMQX 和 Kafka 怎么对接?

A:用 EMQX 的 MQTT 桥接(Bridge) 功能把消息转发到 Kafka。

1
2
3
4
5
6
7
# emqx.conf
bridges.mqtt.kafka {
  server = "kafka-cluster:9092"
  topics = ["device/telemetry"]
  kafka_topic = "device-events"
  # ... 其他参数
}

这样 EMQX 负责设备接入 + QoSKafka 负责大数据流处理,两者各司其职。

A9: 消息队列和数据库事务的区别是什么?

A:两者解决的问题域不同

数据库事务(ACID):

  • 解决单个数据库内的多表数据一致性问题
  • 强一致(隔离级别控制)
  • 性能高(本地操作)

分布式事务(2PC / 3PC / TCC / SAGA / RocketMQ 事务消息):

  • 解决跨服务的数据一致性问题
  • 最终一致(强一致会引入性能问题)
  • 性能较差(跨网络协调)

实战选择

  • 单服务多表:用数据库事务(@Transactional)
  • 跨服务多库:用最终一致性(MQ 异步消息 + 幂等消费)
  • 强一致要求(金融):用 Seata / RocketMQ 事务消息 + 业务补偿

关键认知没有免费的强一致 + 高性能。MQ 事务消息是"最终一致",不是"强一致"——业务侧要能容忍秒级延迟。

A10: 为什么 Kafka 的 Partition 数是性能关键?

A:Partition 数决定并行度上限

  • Partition 数 < Consumer 数:多余的 Consumer 永远分不到消息(最常见的部署错误
  • Partition 数 > Consumer 数:部分 Consumer 处理多个 Partition,单 Consumer 是瓶颈
  • 理想情况:Partition 数 = Consumer 数,每个 Consumer 处理 1 个 Partition

Partition 数调整原则

  • 单 Topic < 1000 Partition:单 Broker 集群够用
  • 单 Topic 1000-10000 Partition:需要 5+ Broker 集群
  • 单 Topic > 10000 Partition:需要 10+ Broker + KRaft 模式

性能对比

  • 100 Partition + 100 Consumer:单 Consumer 处理 1000 TPS,总 100,000 TPS
  • 1000 Partition + 1000 Consumer:单 Consumer 处理 1000 TPS,总 1,000,000 TPS

结论Partition 数 = Consumer 数 是经验最优。

A11: 消息重试机制在 3 个 MQ 中怎么实现的?

A:3 个 MQ 都有重试,但机制不同

Kafka

  • Producer 端:retries=3 自动重试
  • Consumer 端:@RetryableTopic 注解(Spring Kafka 2.7+)
  • 重试 Topic:失败消息进 topic-retry-0/1/2,3 次后进 topic-dlt

RocketMQ

  • Producer 端:retryTimesWhenSendFailed=3 自动重试
  • Consumer 端:默认重试 16 次(间隔 10s ~ 10min)
  • 重试 Topic:失败消息进 %RETRY%consumer-group

RabbitMQ

  • Producer 端:publisher-confirm-type: correlated + ConfirmCallback
  • Consumer 端:ack/nack + 死信队列(DLX)
  • 重试 Topic:失败消息进死信队列 + TTL 触发回原队列

对比

维度KafkaRocketMQRabbitMQ
重试次数3(可配)16(可配)无限(nack requeue=true)
重试间隔立即10s~10min立即(容易死循环)
死信机制DLT Topic%DLQ% TopicDLX + TTL
实战推荐重试 Topic + DLT默认即可必须配死信 + TTL

A12: 怎么监控消息中间件的健康度?

A3 层监控(业务 / 中间件 / 基础设施):

业务层

  • 订单创建成功率
  • 支付通知延迟(P99 < 1s)
  • 库存同步延迟(P99 < 5s)

中间件层

  • 消息 TPS(Producer / Consumer 速率)
  • 消息积压(LAG)
  • 消费延迟(消息入队到消费的时间)
  • 副本同步状态(ISR 数)
  • 节点存活(Controller / Broker 状态)

基础设施层

  • 磁盘水位(> 80% 告警)
  • 网络流量(入 / 出带宽)
  • CPU / 内存使用率
  • 文件描述符数

工具链

  • Kafka:JMX Exporter + Prometheus + Grafana
  • RocketMQ:RocketMQ Exporter + Prometheus + Grafana
  • RabbitMQ:rabbitmq_prometheus 插件 + Prometheus + Grafana

告警策略

  • 紧急(P0):节点宕机、消费完全停止
  • 严重(P1):LAG > 10 万、消费延迟 > 5s
  • 警告(P2):磁盘 > 80%、重试率 > 5%

A13: 怎么评估"我应该升级到哪个版本"?

A升级决策树

  1. 看版本支持周期

    • Kafka:每个 minor 版本支持 12 个月
    • RocketMQ:每个 minor 版本支持 18 个月
    • RabbitMQ:每个 minor 版本支持 18 个月
  2. 看安全漏洞

    • CVE 评级 > 7.0:必须升级
    • CVE 评级 < 7.0:可延后
  3. 看新功能需求

    • 业务需要新功能 → 升级
    • 业务无新需求 → 延后
  4. 看运维成本

    • 当前版本已经 6+ 个月没新 bug fix → 升级
    • 当前版本有持续补丁 → 保持
  5. 看大版本升级风险

    • Kafka 2.x → 3.x:协议有破坏性变更,必须灰度
    • RocketMQ 4.x → 5.x:架构有变化(Controller 集群),必须灰度
    • RabbitMQ 3.10 → 3.13:兼容性好,可一次性升级

A14: 消息中间件和 gRPC 是什么关系?

A两者解决不同问题,但有重叠。

gRPC

  • 是一种 RPC 框架(不是 MQ)
  • 基于 HTTP/2 + Protobuf
  • 适合同步请求-响应模式
  • 不持久化消息(默认)

MQ

  • 消息中间件(中间层)
  • 基于 TCP/AMQP/MQTT 等协议
  • 适合异步发布-订阅模式
  • 持久化消息

实战选择

  • 同步调用(< 100ms 响应):gRPC
  • 异步消息(无强实时要求):MQ
  • 流式数据(持续数据流):gRPC Streaming / Kafka Streams

gRPC 替代 MQ 的场景

  • 极低延迟(< 10ms):gRPC 比 MQ 快
  • 小数据量(KB 级):gRPC 性能优势
  • 强类型契约:Protobuf IDL 强类型

MQ 替代 gRPC 的场景

  • 解耦服务(Producer 不关心 Consumer)
  • 削峰填谷(MQ 做缓冲)
  • 数据持久化(消息可重放)

结论gRPC 和 MQ 是互补的,不是替代关系。现代微服务通常两者都用(内部 gRPC + 异步 MQ)。

A15: 中小公司应该用云厂商 MQ 还是自建?

A直接看预算和团队规模

云厂商 MQ 适用场景

  • 团队 < 10 人(无 SRE)
  • 业务量 < 100 万 TPS
  • 预算允许商业版
  • 不想运维底层

云厂商 MQ 商业版

  • 阿里云 RocketMQ / 阿里云 Kafka / 阿里云 RabbitMQ
  • AWS MSK(Managed Streaming for Apache Kafka)
  • Confluent Cloud(Kafka 商业版鼻祖)
  • 腾讯云 CKafka / 腾讯云 RocketMQ

自建开源版适用场景

  • 团队 > 10 人(有 SRE)
  • 业务量 > 100 万 TPS
  • 成本敏感(长期)
  • 特殊定制需求

实战建议

团队规模业务量推荐
1-3 人< 1 万 TPS云厂商共享版(最简单)
3-10 人1-10 万 TPS云厂商专享版(平衡)
10-50 人10-100 万 TPS云厂商独占实例 / 自建
50+ 人> 100 万 TPS自建 + 商业版混合

关键点不要过早自建。云厂商 MQ 已经覆盖 80% 场景,自建成本是云厂商的 3-5 倍。

附录 B:关键术语对照表

为方便读者快速理解术语,本附录整理本文涉及的关键术语:

术语英文含义所属
BrokerBroker消息代理服务器通用
TopicTopic消息主题(逻辑分类)通用
PartitionPartition分区(Topic 的物理分片)Kafka
MessageQueueMessageQueue消息队列(Topic 的物理分片)RocketMQ
ISRIn-Sync Replicas与 Leader 同步的副本Kafka
KRaftKRaftKafka Raft 共识模式(取代 ZK)Kafka
ControllerController集群元数据管理节点Kafka / RocketMQ
ProducerProducer消息生产者通用
ConsumerConsumer消息消费者通用
Consumer GroupConsumer Group消费者组(多个 Consumer 协作)Kafka / RocketMQ
ExchangeExchange交换机(消息路由)RabbitMQ
QueueQueue队列(消息存储)通用
BindingBindingExchange 到 Queue 的绑定规则RabbitMQ
Routing KeyRouting Key路由键RabbitMQ
ChannelChannel通道(TCP 多路复用)RabbitMQ
VhostVirtual Host虚拟主机(多租户隔离)RabbitMQ
Quorum QueueQuorum Queue基于 Raft 的队列RabbitMQ
Stream QueueStream Queue日志流队列RabbitMQ
DledgerDledgerRocketMQ 5.x Raft 模式RocketMQ
NameServerNameServer路由注册中心RocketMQ
CommitLogCommitLog顺序写日志(消息存储)RocketMQ
ConsumeQueueConsumeQueue消费队列索引RocketMQ
IndexFileIndexFile按 key / time 索引RocketMQ
Half MessageHalf Message半消息(事务消息未确认)RocketMQ
回查CheckRocketMQ 询问未确认事务状态RocketMQ
事务消息Transaction Message本地事务 + 消息发送原子性RocketMQ
顺序消息Orderly Message严格 FIFO 消息RocketMQ
延迟消息Delayed Message延迟投递消息RocketMQ / RabbitMQ
死信队列DLQ / DLX失败消息隔离RabbitMQ / RocketMQ
Publisher ConfirmPublisher Confirm生产者确认机制RabbitMQ
Consumer AckConsumer Ack消费者确认机制RabbitMQ / Kafka
PageCachePageCache操作系统文件缓存Kafka
零拷贝Zero-Copysendfile 系统调用Kafka
顺序写Sequential Write磁盘顺序追加写Kafka / RocketMQ
ACIDACID原子性 / 一致性 / 隔离性 / 持久性事务
最终一致Eventual Consistency异步达成一致分布式系统
TPSTransactions Per Second每秒事务数性能指标
QPSQueries Per Second每秒查询数性能指标
P99P99 Latency99% 请求延迟性能指标
MQTTMQTT消息队列遥测传输协议IoT
QoSQuality of Service服务质量等级MQTT
设备影子Device Shadow设备状态缓存IoT / EMQX
持久会话Persistent SessionMQTT 离线消息保留MQTT
桥接Bridge不同协议 / 系统间消息转发EMQX
规则引擎Rule Engine消息路由规则处理EMQX

附录 C:系列预告 + 推荐阅读

系列预告

这是 Java Web 微服务系列 的第 9 篇。系列当前规划:

序号主题状态
第 1 篇异地多活(高可用终极形态)✅ 已发布
第 2 篇从 Nginx 到 LVS 流量调度✅ 已发布
第 3 篇k8s 微服务编排✅ 已发布
第 4 篇技术选型:Spring Cloud Alibaba + Dubbo 3✅ 已发布
第 5 篇Nacos 实战✅ 已发布
第 6 篇Spring Cloud Gateway 网关✅ 已发布
第 7 篇熔断限流实战✅ 已发布
第 8 篇数据库演化(分库分表 + ShardingSphere)✅ 已发布
第 9 篇消息队列:Kafka / RocketMQ / RabbitMQ 三件套本文
第 10 篇分布式链路追踪(SkyWalking / Jaeger)🔜 计划中
第 11 篇全链路压测 + 混沌工程🔜 计划中

推荐阅读

官方文档

中文社区

真实生产案例

书籍

  • 《数据密集型应用系统设计》(DDIA, Martin Kleppmann)—— 第 7 章消息系统
  • 《深入理解 Kafka:核心设计与实践原理》(朱忠华)—— Kafka 原理 + 实战
  • 《RocketMQ 实战与原理解析》(杨开元)—— RocketMQ 国内最系统的书

工具

参考文章


本文完。下篇预告:第 10 篇 · 分布式链路追踪(SkyWalking / Jaeger)。

本系列共 16 篇,本文为第 13 篇 · 查看全部
使用 Hugo 构建
主题 StackJimmy 设计