Featured image of post 消息队列镜像实战:RabbitMQ / EMQX / ActiveMQ 部署、集群、鉴权与坑点

消息队列镜像实战:RabbitMQ / EMQX / ActiveMQ 部署、集群、鉴权与坑点

把 RabbitMQ 3.x、EMQX 5.x、ActiveMQ 5.15 三款消息队列的 docker 部署、集群、客户端鉴权、生产坑点整理成实战清单——每款都给完整 docker-compose 模板 + 排错指南。

消息队列(Message Queue)是分布式系统的"解耦器"——把同步调用改成异步消息,把"必须双方都在"改成"消息进来,订阅者啥时候消费都行"。RabbitMQ / ActiveMQ 是 AMQP 协议的"老牌双雄";EMQX 严格说是 MQTT Broker(IoT 场景为主),但在国内实际项目里经常被混用——所以本文把它一起整理。

这篇文章把三款组件的 docker 部署、集群、鉴权、坑点整理成实战清单。和 0.4 批次的"CI 工具对比"不同,本文只聚焦"镜像实战"——拉哪个镜像、怎么配集群、客户端怎么连、鉴权怎么做

阅读对象:需要从零搭一套消息队列、或正在做 IoT(MQTT)基础设施的开发者、运维
覆盖范围:三款组件的核心特性对比、RabbitMQ 3.x 部署 + 镜像集群、EMQX 5.x 部署 + 集群 + MySQL 鉴权、ActiveMQ 5.15 部署 + 控制台改密、常见排错

一、三款消息队列对比

维度RabbitMQEMQXActiveMQ
协议AMQP 0-9-1、STOMP、MQTTMQTT 5.0/3.1.1、WebSocket、CoAPOpenWire、STOMP、AMQP、MQTT
语言ErlangErlangJava
镜像大小200 MB80 MB350 MB
内存占用500 MB~200 MB~1 GB~
启动时间30 秒5 秒60 秒
集群模式内置(多节点镜像)内置(静态种子 / DNS / etcd)Master-Slave(已废弃)/ 第三方 ZK 选主
一致性弱(最终一致)强(Raft,企业版)
客户端语言多语言多语言(IoT 设备为主)多语言
核心场景企业应用异步IoT 设备接入传统企业应用(JMS)

When to use

  • 企业应用异步解耦 → RabbitMQ(社区活跃、Erlang 高并发、AMQP 生态成熟)
  • IoT 设备 / 移动端长连接 → EMQX(百万级连接、MQTT 原生)
  • 遗留 JMS 应用 → ActiveMQ(仅在 Java 老项目里使用)

二、RabbitMQ 部署

2.1 镜像

1
docker pull rabbitmq:3.12-management

3.12-management 自带 rabbitmq_management 插件(WebUI + API),生产推荐用这个

2.2 最小启动

1
2
3
4
5
6
docker run -d \
  --name rabbitmq \
  --restart=always \
  -p 5672:5672    \   # AMQP
  -p 15672:15672  \   # WebUI
  rabbitmq:3.12-management

访问 http://<HOST>:15672/,默认账号 guest/guest仅限 localhost 访问)。

坑 1guest 用户默认只能从 localhost 登录——如果从远程访问 WebUI 报 “User can only log in via localhost”,需要在容器内改:

1
2
3
4
5
6
docker exec -it rabbitmq bash
rabbitmqctl set_permissions -p / guest ".*" ".*" ".*"
# 创建远程用户
rabbitmqctl add_user admin <REDACTED>
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

2.3 持久化 + 主机名

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
docker run -d \
  --name rabbitmq \
  --restart=always \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=<REDACTED> \
  -v /data/rabbitmq:/var/lib/rabbitmq \
  --hostname rabbitmq-1 \
  rabbitmq:3.12-management

坑 2--hostname 必须设——RabbitMQ 集群节点靠 hostname 通信,默认容器 hostname 是随机字符串,每次重启会变。--hostname 固定 hostname

2.4 启用插件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
docker exec -it rabbitmq bash

# 启用管理插件(已默认启用)
rabbitmq-plugins enable rabbitmq_management

# 启用延迟消息插件(社区)
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

# 启用日志追踪(用于排查生产问题)
rabbitmq-plugins enable rabbitmq_tracing

坑 3rabbitmq_tracing 插件会显著影响性能(写文件 IO)——只用于排错时临时开启,用完及时关。

2.5 集群部署

RabbitMQ 集群有 3 种模式:

模式特点适用
普通集群队列只在创建它的节点上,其他节点转发请求几乎不用
镜像队列(HA)队列在多个节点复制,主从同步生产推荐
联邦(Federation)跨地域同步多数据中心

2.5.1 启动 3 节点

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
# 节点 1
docker run -d \
  --name rabbitmq-1 \
  --restart=always \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=<REDACTED> \
  -v /data/rabbitmq-1:/var/lib/rabbitmq \
  --hostname rabbitmq-1 \
  --link rabbitmq-2 \
  --link rabbitmq-3 \
  rabbitmq:3.12-management

# 节点 2 / 节点 3: 同上, hostname 改 rabbitmq-2 / rabbitmq-3

2.5.2 加入集群

1
2
3
4
5
6
7
# 节点 2
docker exec -it rabbitmq-2 bash
rabbitmqctl stop_app
rabbitmqctl join_cluster --ram rabbit@rabbitmq-1
rabbitmqctl start_app

# 节点 3: 同样

2.5.3 配置镜像队列策略

1
2
# 在任一节点执行
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

坑 4ha-mode=all 表示所有队列在所有节点都有副本——最安全但磁盘 IO 最大。生产建议ha-mode: exactly, ha-params: 2(每个队列在 2 个节点上同步)。

2.6 客户端使用

Java(Spring Boot)

1
2
3
4
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
spring:
  rabbitmq:
    host: <INTERNAL_HOST>
    port: 5672
    username: admin
    password: <REDACTED>
    virtual-host: /
    listener:
      simple:
        prefetch: 1           # 一次只处理一条(公平分发)
        acknowledge-mode: manual
        retry:
          enabled: true
          max-attempts: 3

Go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
import "github.com/rabbitmq/amqp091-go"

conn, _ := amqp.Dial("amqp://admin:<REDACTED>@<INTERNAL_HOST>:5672/")
defer conn.Close()

ch, _ := conn.Channel()
ch.QueueDeclare("my-queue", true, false, false, false, nil)

ch.Publish("", "my-queue", false, false, amqp.Publishing{
  ContentType: "application/json",
  Body:        []byte(`{"hello":"world"}`),
})

三、EMQX 部署

EMQX 5.x 是国内最主流的 MQTT Broker——百万级连接、规则引擎、Kafka/数据库桥接都内置。IoT 项目首选

3.1 镜像

1
2
docker pull emqx/emqx:5.0.24
# 升级到 5.5.x 或 5.7.x 也是同样方法

3.2 单机部署

1
2
3
4
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.0.24

# 设置自动启动
docker update --restart=always emqx

端口说明

端口用途
1883TCP(MQTT 主流)
8883TCP + SSL
8083WebSocket
8084WebSocket + SSL
18083后台管理 Web

访问 http://<HOST>:18083/,默认账号 admin/public首次登录强制改密)。

3.3 关闭 api-docs(生产建议)

EMQX 默认开了 api-docs/index.html 暴露所有 API 文档——生产环境应该关

1
2
3
# 复制配置
docker cp emqx:/opt/emqx/etc/emqx.conf .
vim emqx.conf
1
2
3
4
5
6
dashboard {
  listeners.http {
    bind = 18083
  }
  swagger_support = false   # 关闭 api-docs
}
1
2
docker cp emqx.conf emqx:/opt/emqx/etc/emqx.conf
docker restart emqx

3.4 鉴权:MySQL 后端

真实场景:EMQX 用户成千上万,写在配置里不现实。MySQL 鉴权让所有设备凭据从数据库读取。

3.4.1 创建表

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
CREATE TABLE `mqtt_user` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `username` varchar(100) DEFAULT NULL,
  `password_hash` varchar(100) DEFAULT NULL,
  `salt` varchar(35) DEFAULT NULL,
  `is_superuser` tinyint(1) DEFAULT 0,
  `created` datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `mqtt_username` (`username`)
);

3.4.2 EMQX 配置 MySQL 鉴权链

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
# emqx.conf
authentication = [
  {
    mechanism = password_based
    backend = built_in_database
    user_id_type = username
  },
  {
    mechanism = password_based
    backend = mysql
    user_id_type = username
    password_hash_algorithm {
      name = sha256
      salt_position = prefix
    }
    server = "<INTERNAL_HOST>:3306"
    database = "emqx"
    username = "emqx"
    password = "<REDACTED>"
    query = "SELECT password_hash, salt FROM mqtt_user WHERE username = ${username} LIMIT 1"
  }
]

3.4.3 插入用户

1
2
INSERT INTO mqtt_user(username, password_hash, salt, is_superuser)
VALUES ('device-001', SHA2(CONCAT('slat_foo123', 'device-secret-001'), 256), 'slat_foo123', 1);

坑 5:EMQX 鉴权链的顺序很重要——built_in_database 排第一意味着先用内置数据库查,查不到再走 MySQL。如果只想用 MySQL,只保留 MySQL 那个 backend

3.5 集群部署

3.5.1 静态种子集群(推荐)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 节点 1 (10.8.33.6)
cat << "EOF" > /home/docker-compose-emqx.yml
version: '3'
services:
  emqx1:
    image: emqx:5.5.1
    container_name: emqx1
    restart: always
    environment:
    - EMQX_NAME=emqx
    - EMQX_HOST=10.8.33.6
    - EMQX_NODE_NAME=emqx@10.8.33.6
    - EMQX_CLUSTER__DISCOVERY_STRATEGY=static
    - EMQX_CLUSTER__STATIC__SEEDS=[emqx@10.8.33.6,emqx@10.8.33.7,emqx@10.8.33.8]
    network_mode: "host"
    healthcheck:
      test: ["CMD", "/opt/emqx/bin/emqx_ctl", "status"]
      interval: 5s
      timeout: 25s
      retries: 5
EOF

# 节点 2/3: 改 EMQX_HOST / EMQX_NODE_NAME 对应 IP
1
2
3
4
5
# 启动
docker-compose -f /home/docker-compose-emqx.yml up -d

# 查看集群状态
docker exec -it emqx1 emqx ctl cluster status

坑 6:EMQX 5.9.0 之后默认仅支持单节点集群——集群能力需要申请商业 License。生产用 5.5.x 或 5.7.x 即可

3.5.2 Nginx 代理 MQTT

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
stream {
  upstream emqx-tcp {
    server 10.8.33.6:1883;
    server 10.8.33.7:1883;
    server 10.8.33.8:1883;
  }
  server {
    listen 1883;
    proxy_pass emqx_tcp;
    proxy_buffer_size 8k;
    tcp_nodelay on;
  }
}

坑 7Nginx TCP 代理要 stream(不是 http 块)——加错地方 MQTT 设备就连不上。

3.6 客户端测试:MQTTX

MQTTXhttps://mqttx.app/)是 EMQX 团队做的 MQTT 客户端工具,桌面版 + CLI 都好用

1
2
3
4
Broker: mqtt://<YOUR_DOMAIN>:1883
Client ID: mqttx_<random>
Username: device-001
Password: <REDACTED>

订阅主题、发布消息、断线重连——MQTTX 全能搞定。

四、ActiveMQ 部署

ActiveMQ 是 JMS 的"老牌实现"——Java 时代遗留项目还在用。新项目不推荐(RabbitMQ / Kafka 几乎全面取代)。

4.1 镜像

1
2
docker search activemq
docker pull rmohr/activemq

坑 8:ActiveMQ 官方没提供 Docker 镜像——rmohr/activemq 是社区镜像,镜像里 OpenJDK 版本老。生产建议自己 build:

1
2
3
4
5
6
7
FROM openjdk:8-jdk
RUN wget https://archive.apache.org/dist/activemq/5.15.6/apache-activemq-5.15.6-bin.tar.gz && \
    tar -xzf apache-activemq-5.15.6-bin.tar.gz -C /opt && \
    rm apache-activemq-5.15.6-bin.tar.gz
EXPOSE 1883 61616 8161 61614
WORKDIR /opt/apache-activemq-5.15.6
CMD ["bin/activemq", "console"]

4.2 启动

1
2
3
4
5
6
7
8
docker run -d \
  -p 1883:1883   \    # MQTT
  -p 61616:61616 \    # OpenWire
  -p 8161:8161   \    # WebUI
  -p 61614:61614 \    # STOMP
  --restart=always \
  --name myactivemq \
  rmohr/activemq:latest

访问 http://<HOST>:8161/,默认账号 admin/admin

4.3 改密码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# 复制默认配置
docker cp myactivemq:/opt/apache-activemq-5.15.6/conf/jetty-realm.properties ~

# 编辑
vim jetty-realm.properties
# 改:
admin: <NEW_PASSWORD_HASH>,admin
user: <NEW_PASSWORD_HASH>,user

# 复制回去
docker cp jetty-realm.properties myactivemq:/opt/apache-activemq-5.15.6/conf/jetty-realm.properties
docker restart myactivemq

坑 9:ActiveMQ 5.15 之前的版本默认密码是 admin:admin——改密码必须重启容器才能生效(ActiveMQ 把配置加载到内存里)。

4.4 客户端使用

Java(JMS)

1
2
3
4
5
6
7
8
9
import javax.jms.*;

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://<INTERNAL_HOST>:61616");
Connection connection = connectionFactory.createConnection("admin", "<REDACTED>");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination dest = session.createQueue("my-queue");
MessageProducer producer = session.createProducer(dest);
producer.send(session.createTextMessage("hello world"));

坑 10:ActiveMQ 默认匿名访问——必须显式传用户名密码;否则在生产环境被攻击者直接用。

五、客户端选型建议

5.1 Java

队列客户端
RabbitMQspring-boot-starter-amqp / rabbitmq-client
EMQXorg.eclipse.paho.client.mqttv3.MqttClient(Paho)
ActiveMQorg.apache.activemq:activemq-client

5.2 Go

队列客户端
RabbitMQgithub.com/rabbitmq/amqp091-go
EMQXgithub.com/eclipse/paho.mqtt.golang
ActiveMQgithub.com/go-stomp/stomp(STOMP)/ github.com/Azure/go-amqp

5.3 Python

队列客户端
RabbitMQpika / aio-pika
EMQXpaho-mqtt / asyncio-mqtt
ActiveMQstomp.py

六、常见排错

6.1 RabbitMQ 启动报 “nodedown”

症状rabbitmqctl join_clusternodedown

排查

  1. 节点间网络可达(ping / telnet 5672 25672
  2. Erlang Cookie 一致(/var/lib/rabbitmq/.erlang.cookie
  3. hostname 解析正常(/etc/hosts 里加 hostname → IP)

6.2 EMQX 集群节点不同步

症状emqx ctl cluster status 显示有节点没加入

排查

  1. EMQX_CLUSTER__STATIC__SEEDS 里的 hostname / IP 是否能互通
  2. 防火墙 5370/5371 端口是否开放(cluster 通信)
  3. 节点 EMQX_NAME 是否一致

6.3 ActiveMQ WebUI 502

症状http://<HOST>:8161/ 报 502

解决

1
2
docker logs myactivemq | tail -50
# 看启动是否完整, 等 "ActiveMQ WebConsole available at" 才算启动完

坑 11:ActiveMQ 启动慢(60 秒+)——K8s 部署时 readinessProbe 要设 initialDelaySeconds: 60

6.4 RabbitMQ 队列消息堆积

症状:队列里几万条消息消费不动

排查

  1. 消费者代码 bug?查 /api/queues 里的 messages_ready / messages_unacknowledged
  2. 消费者数量不够?加 prefetch 调小 + 多消费者
  3. 慢消费者?加监控告警

七、生产环境最佳实践

7.1 RabbitMQ

  • 队列全部设为镜像队列ha-mode: exactly, ha-params: 2
  • 禁用 guest 远程登录
  • WebUI 用 nginx 限制 IP
  • 监控告警:Prometheus + rabbitmq-prometheus 插件

7.2 EMQX

  • 3 节点集群起步(5 节点更稳)
  • 生产用 5.5.x / 5.7.x(5.9+ 集群需商业 License)
  • 关闭 api-docs
  • Dashboard 限 IP
  • SSL 双向认证(设备带证书)

7.3 ActiveMQ

  • 尽量不用于新项目
  • 老项目用 LevelDB 存储(比 KahaDB 快)
  • WebUI 限 IP
  • 监控 JMXcom.sun.management.jmxremote

八、写在最后

消息队列选型的核心问题不是"用哪个",而是"用对地方":

  • 企业内部异步 → RabbitMQ
  • IoT 设备 / 移动端 → EMQX
  • 遗留 JMS 系统 → ActiveMQ(暂时别动)

下一步建议:

  • RabbitMQ + 死信队列(DLX) 处理失败消息
  • EMQX 规则引擎 桥接 Kafka(设备消息转大数据)
  • ActiveMQ → RabbitMQ 迁移(用 rabbitmq-mqtt 插件兼容 MQTT 客户端)

参考资料

使用 Hugo 构建
主题 StackJimmy 设计