
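MySQL Real-Time Sync in Practice: Canal + Debezium + Kafka Connect, the Full Stack

This article compares the two main approaches to real-time MySQL synchronization: Canal / Maxwell, which parse the binlog and hand events to custom consumers, and Debezium, which implements CDC on top of Kafka Connect. It then walks through a complete hands-on setup: Docker deployment, Source/Sink Connector registration, JDBC Sink configuration, and troubleshooting (RecordTooLargeException, snapshot mode).

Why this article: there are two mainstream paths for real-time MySQL sync: the application-layer path (Canal / Maxwell parse the binlog → custom consumer) and the CDC-standard path (Debezium → Kafka Connect → JDBC Sink). The former is flexible but requires writing code; the latter is standardized but has many pitfalls. This article walks both paths, focusing on configuration and error messages from a real environment.

Intended readers: middleware developers and DBAs who need real-time MySQL → MySQL/PG/ES synchronization.

Prerequisites: familiarity with the MySQL binlog, a basic idea of what Kafka is, and the ability to run containers with docker.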

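Contents

  1. Comparing the options: Canal vs Debezium vs Flink CDC
  2. Canal: lightweight sync based on binlog parsing
  3. Debezium: an industrial-grade implementation of the CDC standard
  4. JDBC Sink Connector: writing to the target database
  5. Common pitfalls and troubleshooting
  6. Production recommendations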

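1. Comparing the options: Canal vs Debezium vs Flink CDC

| Dimension | Canal | Debezium | Flink CDC |
| --- | --- | --- | --- |
| Vendor | Alibaba | Red Hat / Decodable | Apache |
| Protocol | Custom TCP / Kafka / RocketMQ | Kafka Connect (standard) | Flink SQL (standard) |
| Deployment | canal-server + canal-adapter | Connect + Source/Sink Connector | Flink Job |
| Transformation | Built into the adapter (ES/HBase/MySQL) | Requires a Sink; transformation via SMT | Expressed in SQL |
| Pros | Thorough docs, Chinese-language community, Java friendly | Standardized, broad ecosystem | Strong stream-processing capability |
| Cons | Tightly coupled to the Alibaba stack | K8s integration is relatively new | Heavyweight; needs a Flink cluster |
| Best for | Small to mid-scale sync, Alibaba-stack architectures | Large scale, cross-team standardization | Real-time data warehouse + stream processing |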

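Selection advice

  • Change volume below 100,000 QPS and a fixed target database: choose Canal
  • Cross-team, heterogeneous multi-source (MySQL + PG + Mongo + Oracle): choose Debezium
  • Aggregation, JOINs, streaming ETL: choose Flink CDC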

2. Canal: lightweight sync based on binlog parsing

2.1 Architecture

MySQL master
    │ binlog ROW
canal-server  ← speaks the MySQL replica (slave) protocol
    │ CanalTSDB / MQ
canal-adapter  ← consumes + transforms + writes to the target (ES/HBase/MySQL)

2.2 Docker deployment

docker pull canal/canal-server:latest
docker pull slpcat/canal-adapter:v1.1.5-jdk8
docker pull canal/canal-admin:latest

2.3 MySQL prerequisites

# /etc/my.cnf
[mysqld]
log-bin=mysql-bin
binlog-format=ROW    # must be ROW
server_id=6          # unique across the replication topology

-- Dedicated account for canal
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, INSERT, UPDATE, DELETE, ALTER, DROP,
      REPLICATION SLAVE, REPLICATION CLIENT
ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
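Before pointing canal-server at the instance, it can be worth confirming that the my.cnf settings actually took effect after the MySQL restart. The following is a minimal sketch rather than part of the original setup: `check_binlog_vars` is a hypothetical helper that reads the tab-separated output of `mysql -N -B` on stdin and checks the three variables configured above.

```shell
# Hypothetical helper: validate binlog-related variables.
# Input (stdin): "name<TAB>value" lines, as printed by `mysql -N -B -e "SHOW VARIABLES ..."`.
# Prints "OK" and returns 0 when all checks pass; prints one line per problem otherwise.
check_binlog_vars() {
  awk -F'\t' '
    $1 == "log_bin"       { log_bin = $2 }
    $1 == "binlog_format" { fmt = $2 }
    $1 == "server_id"     { sid = $2 }
    END {
      ok = 1
      if (log_bin != "ON") { print "log_bin must be ON, got: " log_bin; ok = 0 }
      if (fmt != "ROW")    { print "binlog_format must be ROW, got: " fmt; ok = 0 }
      if (sid + 0 < 1)     { print "server_id must be a positive integer, got: " sid; ok = 0 }
      if (ok) print "OK"
      exit(ok ? 0 : 1)
    }'
}
```

A possible invocation, assuming the mysql client is installed and the canal account from above exists:

mysql -h internal.example.com -u canal -p -N -B -e "SHOW VARIABLES WHERE Variable_name IN ('log_bin','binlog_format','server_id')" | check_binlog_vars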

2.4 canal-admin (web management)

# 1. Start a temporary admin container to export its configuration
docker run --name canal-admin -d canal/canal-admin:latest
docker exec -it canal-admin bash
# Config directory: /home/admin/canal-admin/conf/

# 2. Copy the configuration to the host
docker cp canal-admin:/home/admin/canal-admin/conf /home/docker/canal/admin

# 3. Edit application.yml to point at MySQL
vim /home/docker/canal/admin/conf/application.yml
# datasource:
#   url: jdbc:mysql://internal.example.com:3306/canal_manager
#   username: canal
#   password: {{REDACTED}}

# 4. Initialization SQL (canal-1.1.6+)
# https://github.com/alibaba/canal/blob/canal-1.1.6/admin/admin-web/src/main/resources/canal_manager.sql

# 5. Remove the temporary container and start the real one
docker rm -f canal-admin
docker run -d --name canal-admin --restart=always \
  -p 8089:8089 \
  -v /home/docker/canal/admin/conf:/home/admin/canal-admin/conf \
  -v /home/docker/canal/admin/logs:/home/admin/canal-admin/logs \
  canal/canal-admin:latest

Web UI: http://internal.example.com:8089, default account admin/123456

2.5 Starting canal-server

# 1. Copy the configuration out of a temporary container
docker run --name canal-server -d canal/canal-server:v1.1.5
docker exec -it canal-server bash
docker cp canal-server:/home/admin/canal-server/conf /home/docker/canal/server
docker rm -f canal-server

# 2. Edit instance.properties (the MySQL instance to listen to)
vim /home/docker/canal/server/conf/example/instance.properties
# canal.instance.master.address=internal.example.com:3306
# canal.instance.dbUsername=canal
# canal.instance.dbPassword=canal

# To listen to several instances, edit canal.properties
vim /home/docker/canal/server/conf/canal.properties
# canal.destinations = example,example2,example3

# 3. Start
docker run -d --name canal-server --restart=always \
  -p 11110:11110 -p 11111:11111 -p 11112:11112 -p 9100:9100 \
  -v /home/docker/canal/server/conf:/home/admin/canal-server/conf \
  -v /home/docker/canal/server/logs:/home/admin/canal-server/logs \
  canal/canal-server:latest

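2.6 Starting canal-adapter

# Copy the adapter configuration out of a temporary container
docker run --name canal-adapter -d slpcat/canal-adapter:v1.1.5
mkdir -p /home/docker/canal/adapter/rdb
docker cp canal-adapter:/opt/canal-adapter/conf/application.yml /home/docker/canal/adapter
docker cp canal-adapter:/opt/canal-adapter/conf/rdb/mytest_user.yml /home/docker/canal/adapter/rdb
# Remove the temporary container so the name can be reused below
docker rm -f canal-adapter

# Start
docker run -d --name canal-adapter -p 8081:8081 \
  -v /home/docker/canal/adapter:/opt/canal-adapter/conf \
  slpcat/canal-adapter:v1.1.5

Pitfall: the JDK in the official canal-adapter image has poor compatibility; for production, build a custom image (base image lwd/jdk:8u371).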


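3. Debezium: an industrial-grade implementation of the CDC standard

3.1 Three deployment modes

| Mode | Best for |
| --- | --- |
| Kafka Connect (recommended) | Production; the most standardized option, with the best observability |
| Debezium Server | A standalone application for when you do not want to maintain Kafka |
| Embedded Engine | Embedding Debezium in application code (custom streams) |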

3.2 Kafka Connect deployment

mkdir -p /home/docker/debezium/2.5.4/
cd /home/docker/debezium/2.5.4/

# docker-compose.yaml starts Zookeeper + Kafka + Connect + UI
docker-compose up -d

On K8s, see the Strimzi project. It ships three operators: the Cluster Operator, which manages Kafka and Kafka Connect, plus the Topic Operator and the User Operator.

3.3 Key setting: raising the message size limit

Kafka's default maximum message size is 1 MB. Any change event that serializes to more than that (for example a row with large TEXT/BLOB columns) fails with:

The message is 2554545 bytes when serialized which is larger than 1048576,
which is the value of the max.request.size configuration.

Fix (on the Kafka side)

# docker-compose.yaml
kafka:
  environment:
    - KAFKA_MESSAGE_MAX_BYTES=1048576000
    - KAFKA_MAX_REQUEST_SIZE=1048576000
    - KAFKA_PRODUCER_MAX_REQUEST_SIZE=1048576000

connect:
  environment:
    - CONNECT_MESSAGE_MAX_BYTES=1048576000
    - CONNECT_MAX_REQUEST_SIZE=1048576000
    - CONNECT_PRODUCER_MAX_REQUEST_SIZE=1048576000

Pitfall: editing server.properties inside the container has no effect; the settings must be injected through environment variables. Countless people have fallen into this one.
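To see which property each of those environment variables ends up setting, the naming rule can be reproduced in a few lines. This is a sketch under an assumption: it supposes the compose file uses the Debezium container images, whose documented rule is to strip the `KAFKA_` or `CONNECT_` prefix, lowercase the rest, and replace `_` with `.`. The function name `env_to_property` is made up for this illustration.

```shell
# Hypothetical helper: map a container environment variable name to the
# property name it would set, following the assumed Debezium image rule
# (strip KAFKA_/CONNECT_ prefix, lowercase, "_" becomes ".").
env_to_property() {
  name=$1
  case $name in
    KAFKA_*)   name=${name#KAFKA_} ;;
    CONNECT_*) name=${name#CONNECT_} ;;
  esac
  printf '%s\n' "$name" | tr 'A-Z_' 'a-z.'
}
```

Under that assumption, `env_to_property KAFKA_MESSAGE_MAX_BYTES` prints message.max.bytes (a broker setting) and `env_to_property CONNECT_PRODUCER_MAX_REQUEST_SIZE` prints producer.max.request.size (the setting of the producer that the Connect worker uses for source records). Those two appear to be the ones that address the error above; the value 1048576000 is roughly 1000 MiB, so a smaller limit sized to the largest expected row may be preferable.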

3.4 Registering the Source Connector (capturing MySQL)

# 1. Register the MySQL source
curl -i -X POST -H "Content-Type: application/json" \
  http://localhost:28083/connectors/ -d '
{
  "name": "source-hnDev",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "topic.prefix": "hnDev",
    "database.hostname": "internal.example.com",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "{{REDACTED}}",
    "database.server.id": "1",
    "database.include.list": "industry_iot",
    "snapshot.mode": "schema_only",
    "snapshot.locking.mode": "none",
    "tombstones.on.delete": "false",
    "schema.history.internal.kafka.topic": "history.hnDev.industry_iot",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "time.precision.mode": "connect",
    "database.serverTimezone": "Asia/Shanghai"
  }
}'

# 2. Verify
curl http://localhost:28083/connectors/source-hnDev/status

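Four commonly used snapshot.mode values

| Mode | Behavior | Use case |
| --- | --- | --- |
| initial (default) | Takes a full snapshot first, then streams the binlog continuously | First-time onboarding |
| initial_only | Takes a full snapshot only | One-off data migration |
| schema_only | Captures only the schema (no data), then streams DML continuously | Data already exists; only later changes matter |
| schema_only_recovery | Rebuilds only the schema history topic (no data) | Disaster recovery |

Key difference: initial first reads every row of the captured tables and writes it to the topics; schema_only reads no rows and records only the table schemas. DML capture is continuous binlog streaming and does not depend on the snapshot (except with initial_only, which stops after the snapshot).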
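Switching snapshot.mode later means resubmitting the connector configuration, so it can help to generate the JSON from a few parameters instead of editing it by hand. The sketch below is one possible way to do that, not the author's script: `source_config_json` and the `DB_HOST`, `DB_USER`, `DB_PASSWORD` and `DB_SERVER_ID` variables (with their defaults) are hypothetical names. It prints the flat config map that Kafka Connect's `PUT /connectors/{name}/config` endpoint accepts, which creates the connector if it is missing and updates it otherwise.

```shell
# Hypothetical helper: print a Debezium MySQL connector config as a flat JSON map.
# $1 = topic prefix, $2 = database name, $3 = snapshot mode
# Connection details come from environment variables (names and defaults are
# illustrative). Values are inserted verbatim, so they must not contain
# double quotes or backslashes.
source_config_json() {
  cat <<EOF
{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "tasks.max": "1",
  "topic.prefix": "$1",
  "database.hostname": "${DB_HOST:-localhost}",
  "database.port": "3306",
  "database.user": "${DB_USER:-debezium}",
  "database.password": "${DB_PASSWORD:-}",
  "database.server.id": "${DB_SERVER_ID:-5401}",
  "database.include.list": "$2",
  "snapshot.mode": "$3",
  "schema.history.internal.kafka.topic": "history.$1.$2",
  "schema.history.internal.kafka.bootstrap.servers": "kafka:9092"
}
EOF
}
```

A possible invocation against the Connect endpoint used above:

source_config_json hnDev industry_iot schema_only | curl -s -X PUT -H "Content-Type: application/json" --data @- http://localhost:28083/connectors/source-hnDev/config

Reading the password from the environment also keeps it out of the shell history.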

3.5 Common management API calls

# List all connectors
curl -s http://internal.example.com:8083/connectors

# Show the status of one connector
curl -s http://internal.example.com:8083/connectors/inventory-connector/status

# Show the connector configuration
curl -s http://internal.example.com:8083/connectors/inventory-connector/config

# Pause
curl -s -X PUT http://internal.example.com:8083/connectors/inventory-connector/pause

# Resume
curl -s -X PUT http://internal.example.com:8083/connectors/inventory-connector/resume

# Restart
curl -s -X POST http://internal.example.com:8083/connectors/inventory-connector/restart

# Delete
curl -s -X DELETE http://internal.example.com:8083/connectors/inventory-connector
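A plain `POST .../restart` restarts only the Connector instance, not its tasks, which is often not what is wanted after a task failure. On Kafka Connect 3.0 or newer the same endpoint accepts the `includeTasks` and `onlyFailed` query parameters. The wrapper below is a small sketch; `CONNECT_URL`, `restart_url` and `restart_failed` are names invented for this example.

```shell
# Base URL of the Kafka Connect REST API (illustrative default).
CONNECT_URL=${CONNECT_URL:-http://localhost:8083}

# Build the restart URL that also restarts tasks, limited to failed instances.
# $1 = connector name
restart_url() {
  printf '%s/connectors/%s/restart?includeTasks=true&onlyFailed=true\n' "$CONNECT_URL" "$1"
}

# Restart the failed Connector/Task instances of one connector.
# $1 = connector name
restart_failed() {
  curl -s -X POST "$(restart_url "$1")"
}
```

For example, `restart_failed inventory-connector` restarts whichever parts of inventory-connector are currently in the FAILED state and leaves healthy tasks alone.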

4. JDBC Sink Connector: writing to the target database

4.1 Preparing the Sink Connector JAR

The Debezium Connect image ships with the Source Connectors (MySQL/PG/Mongo/Oracle/SQL Server/Db2), but the Sink Connector has to be installed separately:

# 1. Create the plugin directory inside the connect container
docker exec -it connect bash
mkdir -p /kafka/connect/kafka-connect-jdbc
exit

# 2. Download the kafka-connect-jdbc JAR
wget https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/10.7.4/kafka-connect-jdbc-10.7.4.jar

# 3. Copy it into the connect container
docker cp kafka-connect-jdbc-10.7.4.jar connect:/kafka/connect/kafka-connect-jdbc

# 4. The MySQL JDBC driver must be copied as well (otherwise the sink fails with No suitable driver found)
docker cp mysql-connector-java-8.0.23.jar connect:/kafka/connect/kafka-connect-jdbc

# 5. Restart connect so the new plugin is loaded
docker restart connect
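After the restart, Kafka Connect's `GET /connector-plugins` endpoint lists every connector class the worker has loaded, which gives a quick check before registering the sink. A minimal sketch follows; `has_plugin` is a hypothetical helper that reads that JSON on stdin and looks for the quoted class name.

```shell
# Hypothetical helper: succeed if the /connector-plugins JSON on stdin
# mentions the given connector class.
# $1 = fully qualified connector class name
has_plugin() {
  grep -qF "\"$1\""
}
```

A possible invocation, assuming the port mapping used for registration in this article:

curl -s http://localhost:28083/connector-plugins | has_plugin io.confluent.connect.jdbc.JdbcSinkConnector && echo "JDBC sink plugin loaded"

If nothing is printed, the JAR is probably not on the worker's plugin path.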

4.2 Registering the Sink (syncing multiple tables)

curl -i -X POST -H "Content-Type: application/json" \
  http://localhost:28083/connectors/ -d '
{
  "name": "sink-prod-hnDev",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "hnDev.industry_iot.risk_task,hnDev.industry_iot.risk_object,hnDev.industry_iot.risk_unit",
    "connection.url": "jdbc:mysql://internal.example.com:3306/industry_iot_prod?characterEncoding=utf-8&autoReconnect=true",
    "connection.user": "root",
    "connection.password": "{{REDACTED}}",
    "key.converter.encoding": "UTF-8",
    "value.converter.encoding": "UTF-8",
    "pk.mode": "record_key",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "batch.size": "3000",
    "auto.create": "true",
    "auto.evolve": "true",
    "transforms": "dropPrefix,ExtractField,tsFormat1,tsFormat2",
    "transforms.ExtractField.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.ExtractField.field": "after",
    "transforms.tsFormat1.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.tsFormat1.target.type": "string",
    "transforms.tsFormat1.field": "create_time",
    "transforms.tsFormat1.format": "yyyy-MM-dd HH:mm:ss",
    "transforms.tsFormat2.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.tsFormat2.target.type": "string",
    "transforms.tsFormat2.field": "modify_time",
    "transforms.tsFormat2.format": "yyyy-MM-dd HH:mm:ss",
    "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropPrefix.regex": "hnDev.industry_iot.(.*)",
    "transforms.dropPrefix.replacement": "$1",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "hn-dev-errors",
    "errors.deadletterqueue.context.headers.enable": "true",
    "errors.log.enable": "true"
  }
}'

Note that TimestampConverter converts a single field per transform, so create_time and modify_time each get their own instance.

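4.3 Key parameters explained

| Parameter | Meaning |
| --- | --- |
| pk.mode=record_key | Use the Kafka record key as the target table's primary key (the source table must have a primary key) |
| insert.mode=upsert | Update the row if it exists, insert it otherwise |
| auto.create=true | Create the target table automatically if it is missing (use with care; in production, create tables in advance) |
| auto.evolve=true | Add missing columns automatically (it does not drop columns or change types) |
| delete.enabled=true | Propagate DELETE events (requires pk.mode=record_key) |
| transforms.dropPrefix | Strip the topic prefix with a regex so the table name is clean |
| errors.tolerance=all | Failed records go to the dead letter queue instead of stopping the sync |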
4.4 Dead letter queue (DLQ)

With the DLQ enabled, failed records are written to the configured topic, which makes troubleshooting easier:

"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "my-connector-errors",
"errors.deadletterqueue.context.headers.enable": "true",
"errors.log.enable": "true",
"errors.log.include.messages": "false"

5. Common pitfalls and troubleshooting

5.1 RecordTooLargeException

The message is 1691268 bytes when serialized which is larger than 1048576,
which is the value of the max.request.size configuration.

Fix: raise the message size limit as described in 3.3.

5.2 Failed to find any class that implements Connector and which name matches io.confluent.connect.jdbc.JdbcSinkConnector

The Sink Connector JAR is not installed; follow the steps in 4.1.

5.3 No suitable driver found for jdbc:mysql://...

The MySQL JDBC driver JAR has not been copied into the connect container:

docker cp mysql-connector-java-8.0.23.jar connect:/kafka/connect/kafka-connect-jdbc
docker restart connect

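5.4 Source Connector stuck in RUNNING but not producing events

Check the connector and task state:

curl http://internal.example.com:8083/connectors/source-hnDev/status

A task that has not reached RUNNING yet (Kafka Connect reports states such as UNASSIGNED or RESTARTING rather than PENDING) is most likely still starting up or recovering the schema history topic. If the task is RUNNING but the Kafka offsets do not advance, the binlog position it needs may already have been purged: delete the connector and recreate it with snapshot.mode=initial. Recreate it under a new name (or clear its stored offsets first), because Kafka Connect keeps source offsets per connector name and would otherwise resume from the lost position.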
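The `/connectors/{name}/status` response carries one `state` for the connector and one per task, so a quick tally shows at a glance whether anything has failed. The sketch below avoids a jq dependency by pattern-matching the JSON text, which is a simplification that assumes the word "state" does not also appear inside a stack trace; `summarize_states` is a hypothetical helper.

```shell
# Hypothetical helper: read a Kafka Connect /status JSON document on stdin
# and print each distinct state with its count, e.g. "RUNNING=2".
summarize_states() {
  grep -oE '"state" *: *"[A-Z_]+"' |
    sed -E 's/.*"([A-Z_]+)"$/\1/' |
    sort | uniq -c |
    awk '{ print $2 "=" $1 }'
}
```

A possible invocation:

curl -s http://internal.example.com:8083/connectors/source-hnDev/status | summarize_states

Any FAILED line means the `trace` field of the status response is worth reading in full.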

5.5 Time type mismatch

The source column is DATETIME(0) but the Sink write fails. Either set time.precision.mode to connect on the source, or convert the field to a string with an SMT:

"transforms.tsFormat1.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.tsFormat1.target.type": "string",
"transforms.tsFormat1.field": "create_time",
"transforms.tsFormat1.format": "yyyy-MM-dd HH:mm:ss"

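To sanity-check what the string conversion in 5.5 should produce for a given record, the same arithmetic can be done by hand. With time.precision.mode=connect a DATETIME value travels as milliseconds since the epoch, and TimestampConverter formats it in UTC. The sketch below reproduces that for one value; `millis_to_datetime` is a hypothetical helper, and it relies on GNU `date -d @N` with a fallback to BSD `date -r N`.

```shell
# Hypothetical helper: format epoch milliseconds as "yyyy-MM-dd HH:mm:ss" in UTC,
# the same pattern used by the tsFormat1 transform.
# $1 = milliseconds since the epoch
millis_to_datetime() {
  secs=$(( $1 / 1000 ))
  date -u -d "@$secs" '+%Y-%m-%d %H:%M:%S' 2>/dev/null ||
    date -u -r "$secs" '+%Y-%m-%d %H:%M:%S'
}
```

For example, `millis_to_datetime 1700000000000` prints 2023-11-14 22:13:20. If the string that lands in the target column differs from the source DATETIME by a fixed number of hours, a time zone setting somewhere in the pipeline is the likely cause rather than the format pattern.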
5.6 Canal "table meta tsdb is out of sync"

canal-server cannot recover its binlog position after a restart. Two fixes:

  1. Delete /home/admin/canal-server/conf/{instance}/meta.dat so that it rebuilds the snapshot
  2. Set canal.instance.tsdb.db.username/password in instance.properties to store the tsdb in MySQL

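6. Production recommendations

6.1 Selection

  • One-way sync (MySQL → MySQL): Canal adapter is the fastest
  • Multi-source aggregation: Debezium + JDBC Sink
  • Strong consistency + transformation: Flink CDC + Flink SQL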

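6.2 Monitoring

| Metric | Tool |
| --- | --- |
| Canal lag | canal-admin web page |
| Debezium lag | JMX debezium.metrics (for example MilliSecondsBehindSource) + Prometheus + Grafana |
| Kafka consumer lag | kafka-consumer-groups.sh --describe |
| Sink write rate | JDBC Sink task metrics |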
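For the Kafka consumer lag row, the per-partition output of `kafka-consumer-groups.sh --describe` can be reduced to a single number for alerting. The sketch below sums the LAG column, locating it by its header name rather than by position; `total_lag` is a hypothetical helper. A sink connector consumes under the group name `connect-` followed by the connector name.

```shell
# Hypothetical helper: sum the LAG column of `kafka-consumer-groups.sh --describe`
# output read from stdin. Rows whose LAG is not a number (for example "-") are skipped.
total_lag() {
  awk '
    # Until the header row is seen, look for the LAG column and skip the line.
    lagcol == 0 { for (i = 1; i <= NF; i++) if ($i == "LAG") lagcol = i; next }
    $lagcol ~ /^[0-9]+$/ { sum += $lagcol }
    END { print sum + 0 }'
}
```

A possible invocation for the sink registered in 4.2:

kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --group connect-sink-prod-hnDev | total_lag

A total that keeps growing while the source is producing suggests the sink cannot keep up with the target database.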

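6.3 The flip side: when should this not be used?

| Scenario | Why it is not recommended |
| --- | --- |
| Write load is already close to MySQL's limit | Sync can take down the source and the sink together |
| Frequent DDL (adding columns, changing types) | The Debezium schema history keeps growing |
| Tables without a primary key | upsert behavior after sync is unpredictable |
| Large transactions or rows (> 1 MB serialized) | max.request.size must be raised first |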

6.4 Building your own image

The official canal-adapter image is old and has poor glibc compatibility. For production, build your own image:

# Dockerfile
FROM lwd/jdk:8u371
COPY canal.adapter-1.1.6.tar.gz /tmp/
RUN tar -xzf /tmp/canal.adapter-1.1.6.tar.gz -C /opt/ && \
    mv /opt/canal.adapter-1.1.6 /opt/canal-adapter
COPY startup.sh /opt/canal-adapter/bin/
ENTRYPOINT ["/opt/canal-adapter/bin/startup.sh"]

docker build -t local/canal-adapter:v1.1.6 .

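Next steps

  • MySQL backup + binlog reverse parsing: see "MySQL Backup and Recovery in Practice" (《MySQL 备份恢复实战》)
  • Multi-database adaptation (MySQL ↔ PG ↔ Chinese domestic databases): see "Multi-Database Adaptation Architecture" (《多数据库适配架构》)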