Flink CDC实战:MySQL到Doris的整库同步与DDL自动处理
1. Flink CDC整库同步的核心价值当你需要把MySQL的整个数据库迁移到Doris时传统方案往往需要停服维护或者编写大量ETL脚本。而Flink CDC的出现彻底改变了这个局面——它就像个智能的数据搬运工能在业务无感知的情况下把数据实时、准确地搬运到目标库。我去年帮一家电商客户做迁移时他们原有方案需要停服6小时而改用Flink CDC后整个迁移过程业务完全无感知。最神奇的是当他们在MySQL新增了一个用户积分字段这个变更自动同步到了Doris开发团队甚至没意识到迁移正在进行。为什么选择Flink CDC做整库同步全量增量一体化先批量拷贝历史数据再无缝切换至实时变更捕获异构数据库支持MySQL到Doris的类型转换自动处理零代码配置通过SQL或简单配置文件即可完成复杂同步任务断点续传能力网络中断后能从最后位置恢复不丢数据2. 环境准备与基础配置2.1 组件版本选择这是我在多个项目中验证过的稳定组合dependency groupIdcom.ververica/groupId artifactIdflink-connector-mysql-cdc/artifactId version2.3.0/version /dependency dependency groupIdorg.apache.doris/groupId artifactIdflink-doris-connector/artifactId version1.2.0/version /dependency注意Flink版本建议用1.14MySQL需要开启binlog并设置为ROW模式2.2 MySQL关键配置在my.cnf中必须开启以下参数[mysqld] server-id 1 log_bin /var/log/mysql/mysql-bin.log binlog_format ROW binlog_row_image FULL expire_logs_days 10创建专用账号CREATE USER flink_cdc% IDENTIFIED BY YourPassword123!; GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO flink_cdc%;3. 整库同步实战3.1 基础同步配置这是最简化的整库同步SQL示例CREATE TABLE mysql_source ( database_name STRING METADATA FROM database_name VIRTUAL, table_name STRING METADATA FROM table_name VIRTUAL, id BIGINT, name STRING, -- 其他字段... PRIMARY KEY (id) NOT ENFORCED ) WITH ( connector mysql-cdc, hostname mysql-host, port 3306, username flink_cdc, password YourPassword123!, database-name your_db, table-name .* -- 正则匹配所有表 ); CREATE TABLE doris_sink ( id BIGINT, name STRING, -- 其他字段... PRIMARY KEY (id) ) WITH ( connector doris, fenodes doris-fe:8030, table.identifier ${database_name}.${table_name}, username doris_user, password doris_pwd, sink.properties.format json, sink.properties.strip_outer_array true ); INSERT INTO doris_sink SELECT * FROM mysql_source;3.2 分库分表处理技巧当源库采用分库分表时可以通过字段路由实现合并。比如用户表分散在user_00到user_99这100个分表中CREATE TABLE mysql_user_source ( user_id BIGINT, name STRING, shard_key AS MOD(user_id, 100), -- 计算分片键 WATERMARK FOR ts AS ts - INTERVAL 5 SECOND ) WITH ( connector mysql-cdc, scan.incremental.snapshot.chunk.size 5000, table-name user_[0-9]{2} -- 正则匹配分表 ); -- Doris端创建合并后的目标表 CREATE TABLE doris_user_sink ( user_id BIGINT, name STRING, PRIMARY KEY (user_id) ) WITH ( table.identifier ods.user -- 其他参数同上 );4. DDL自动同步方案4.1 原理与实现Flink CDC通过监控MySQL的DDL binlog事件配合Doris的Light Schema Change特性实现自动同步。整个过程分为三个阶段DDL捕获解析ALTER TABLE等语句Schema转换MySQL到Doris的类型映射自动执行在Doris执行等效DDL需要开启以下参数scan.incremental.snapshot.enabled true, debezium.schema.history.internal io.debezium.relational.history.MemorySchemaHistory, debezium.schema.history.internal.store.only.monitored.tables.ddl true4.2 常见问题处理字段类型映射问题MySQL的DATETIME → Doris的DATETIMEV2MySQL的TEXT → Doris的STRING枚举类型需要特殊处理添加列的特殊情况-- MySQL端 ALTER TABLE orders ADD COLUMN coupon_amount DECIMAL(10,2); -- Flink CDC配置 schema.evolution enabled, column.type.adapter com.yourapp.CustomTypeAdapter5. 生产环境优化建议5.1 性能调优参数# 并行度设置 parallelism.default 4, scan.incremental.snapshot.chunk.size 5000, # 网络优化 heartbeat.interval 10s, connect.timeout 30s, # 检查点配置 execution.checkpointing.interval 1min, execution.checkpointing.tolerable-failed-checkpoints 35.2 监控与告警建议监控以下指标source.latest.offsetbinlog消费进度sink.num-records-out写入Doris的记录数currentFetchEventTimeLag数据延迟秒数Prometheus配置示例metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter metrics.reporter.prom.port: 92506. 典型问题解决方案问题1大表初始化导致源库负载高方案设置scan.snapshot.fetch.size 1000限制批次大小方案在业务低峰期启动任务问题2Doris写入瓶颈方案调整sink.batch.size 5000方案增加Doris BE节点问题3网络闪断导致同步中断方案配置restart-strategy exponential-delay方案设置合理的server-id范围避免冲突7. 完整案例演示某金融公司支付系统的迁移配置-- 监控所有以pay_开头的表 CREATE TABLE mysql_pay_source ( db STRING METADATA FROM database_name VIRTUAL, tbl STRING METADATA FROM table_name VIRTUAL, id STRING, amount DECIMAL(18,2), create_time TIMESTAMP(3), PRIMARY KEY (id) NOT ENFORCED ) WITH ( connector mysql-cdc, hostname 10.0.0.1, scan.startup.mode timestamp, scan.startup.timestamp-millis 1654041600000, -- 开始时间戳 database-name payment, table-name pay_.* ); -- Doris动态路由 CREATE TABLE doris_pay_sink ( id STRING, amount DECIMAL(18,2), create_time DATETIMEV2(3), PRIMARY KEY (id) ) WITH ( table.identifier ${db}.${tbl}, sink.label-prefix ${db}_${tbl} );这个配置实现了按业务时间点启动同步自动路由到对应Doris数据库每个表独立label保证幂等性