# Stop Moving Data by Hand: Real-Time MySQL-to-Hive Sync in 5 Minutes with Flink CDC 2.3.0 + Flink 1.16.2
In the data-driven era, businesses demand ever-fresher data. Traditional batch sync tools such as Sqoop and DataX are mature and stable, but they typically incur T+1 latency, which falls short of real-time analytics needs. A streaming pipeline built on Flink CDC instead syncs changes with second-level latency, keeping the warehouse continuously up to date. This guide walks through building a MySQL-to-Hive real-time sync pipeline with Flink 1.16.2 and Flink CDC 2.3.0. Compared with the traditional approach, this stack offers:

- **True real-time sync**: log-based CDC captures changes within seconds
- **Unified full + incremental load**: no separate handling of historical and incremental data
- **SQL-first workflow**: complex sync logic is expressed entirely in Flink SQL
- **Automatic schema evolution**: source schema changes propagate to the target
- **Lower resource usage**: far cheaper than full-table-scan batch processing

## 1. Environment Preparation and Deployment

### 1.1 Base Software

Make sure the following components are installed and configured:

| Component | Recommended version | Notes |
| --- | --- | --- |
| Java | 1.8 | OpenJDK 8 or 11 recommended |
| MySQL | 5.7/8.0 | binlog must be enabled |
| Hadoop | 3.x | core services running |
| Hive | 3.x | Metastore service running |
| Flink | 1.16.2 | version used in this guide |
| Flink CDC | 2.3.0 | connector version |

> Tip: in production, stick to one tested version combination to avoid compatibility issues.

### 1.2 Standalone Flink Deployment

Download and unpack the Flink binary distribution:

```shell
wget https://archive.apache.org/dist/flink/flink-1.16.2/flink-1.16.2-bin-scala_2.12.tgz
tar -xzvf flink-1.16.2-bin-scala_2.12.tgz
cd flink-1.16.2
```

Key settings in `conf/flink-conf.yaml`:

```yaml
jobmanager.memory.process.size: 4096m
taskmanager.memory.process.size: 4096m
taskmanager.numberOfTaskSlots: 4
state.backend: filesystem
state.checkpoints.dir: hdfs:///flink/checkpoints
execution.checkpointing.interval: 30s
execution.checkpointing.mode: EXACTLY_ONCE
```

Start the cluster:

```shell
./bin/start-cluster.sh
```

Verify by opening the web UI at http://localhost:8081.

### 1.3 Installing the Flink CDC Connector

Download the connector jars and drop them into `lib/`. For SQL Client usage you want the shaded `flink-sql-connector-mysql-cdc` fat jar (the thin `flink-connector-mysql-cdc` artifact does not bundle its dependencies):

```shell
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-sql-connector-mysql-cdc-2.3.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.2_2.12/1.16.2/flink-sql-connector-hive-3.1.2_2.12-1.16.2.jar
mv *.jar ./lib/
```
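A missing or misplaced connector jar is the most common cause of a `Could not find any factory` error at job submission, so it is worth sanity-checking `lib/` before starting the cluster. A minimal Python sketch of such a preflight check (the helper and substring keys are illustrative, not part of Flink):

```python
from pathlib import Path

# Substrings identifying the connector jars that section 1.3 places into lib/
REQUIRED = ["mysql-cdc", "connector-hive"]

def missing_jars(lib_dir: str) -> list[str]:
    """Return the REQUIRED entries with no matching jar file in lib_dir."""
    names = [p.name for p in Path(lib_dir).glob("*.jar")]
    return [key for key in REQUIRED if not any(key in n for n in names)]
```

Run it against Flink's `lib/` directory before `start-cluster.sh`; an empty result means both connectors are in place.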
## 2. Configuring the MySQL Source

### 2.1 Enabling the MySQL binlog

Make sure the binlog is enabled and set to ROW mode:

```sql
-- Check the current binlog setting
SHOW VARIABLES LIKE 'log_bin';
```

```ini
# my.cnf
[mysqld]
server-id = 1
log-bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
expire_logs_days = 7
```

### 2.2 Creating a Test Table

Prepare the source table and some sample data. Note that computed columns such as `PROCTIME()` and watermark declarations are Flink SQL constructs and do not belong in MySQL DDL:

```sql
CREATE DATABASE IF NOT EXISTS source_db;
USE source_db;

CREATE TABLE user_behavior (
    id BIGINT PRIMARY KEY,
    user_id VARCHAR(32),
    item_id VARCHAR(32),
    category_id VARCHAR(32),
    behavior VARCHAR(16),
    ts TIMESTAMP(3)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

INSERT INTO user_behavior VALUES
    (1, 'user1', 'item1', 'cat1', 'click', '2023-01-01 00:00:00'),
    (2, 'user2', 'item2', 'cat2', 'buy', '2023-01-01 00:01:00');
```

## 3. Preparing the Hive Target

### 3.1 Creating the Hive Database and Table

```sql
CREATE DATABASE IF NOT EXISTS target_db;
USE target_db;

CREATE EXTERNAL TABLE user_behavior_sink (
    id BIGINT,
    user_id STRING,
    item_id STRING,
    category_id STRING,
    behavior STRING,
    ts TIMESTAMP
)
PARTITIONED BY (dt STRING, hr STRING)
STORED AS PARQUET
LOCATION '/data/warehouse/target_db.db/user_behavior_sink';
```

### 3.2 Registering the Hive Catalog

Register the Hive catalog in Flink SQL:

```sql
CREATE CATALOG hive WITH (
    'type' = 'hive',
    'hive-conf-dir' = '/path/to/hive/conf'
);
USE CATALOG hive;
```
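The `binlog_format = ROW` and `binlog_row_image = FULL` settings matter because each change event then carries complete before/after row images, which is what lets Flink CDC emit a retract-style changelog rather than plain inserts. A rough Python sketch of that mapping (op codes follow Debezium, which Flink CDC embeds; the event dict shape here is illustrative, not the connector's actual wire format):

```python
def to_changelog(event: dict) -> list[tuple[str, dict]]:
    """Map a row-level binlog event to Flink changelog rows.

    Debezium-style op codes: 'c' = insert, 'u' = update, 'd' = delete.
    Flink row kinds: +I insert, -U retract old, +U new value, -D delete.
    """
    op = event["op"]
    if op == "c":
        return [("+I", event["after"])]
    if op == "u":
        # FULL row image: both the old and new row versions are available
        return [("-U", event["before"]), ("+U", event["after"])]
    if op == "d":
        return [("-D", event["before"])]
    raise ValueError(f"unknown op: {op}")

# An UPDATE becomes a retraction of the old row followed by the new row
rows = to_changelog({
    "op": "u",
    "before": {"id": 1, "behavior": "click"},
    "after": {"id": 1, "behavior": "buy"},
})
```

With `binlog_row_image = MINIMAL`, the `before` image would be incomplete, which is why FULL is required here.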
## 4. Implementing the Real-Time Sync Job

### 4.1 Creating the MySQL CDC Source Table

The source is registered as a temporary table so the CDC definition is not persisted into the Hive Metastore:

```sql
CREATE TEMPORARY TABLE mysql_user_behavior (
    id BIGINT,
    user_id STRING,
    item_id STRING,
    category_id STRING,
    behavior STRING,
    ts TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = 'password',
    'database-name' = 'source_db',
    'table-name' = 'user_behavior',
    'server-time-zone' = 'Asia/Shanghai'
);
```

### 4.2 Configuring the Hive Sink Table

The Hive table created in section 3.1 is already visible through the Hive catalog, so it needs no separate Flink DDL. What it does need are the streaming-sink properties, which can be set as `TBLPROPERTIES` using the Hive dialect:

```sql
SET table.sql-dialect = hive;

ALTER TABLE target_db.user_behavior_sink SET TBLPROPERTIES (
    'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
    'sink.partition-commit.trigger' = 'partition-time'
);

SET table.sql-dialect = default;
```

### 4.3 The Sync SQL

```sql
INSERT INTO target_db.user_behavior_sink
SELECT
    id,
    user_id,
    item_id,
    category_id,
    behavior,
    ts,
    DATE_FORMAT(ts, 'yyyy-MM-dd') AS dt,
    DATE_FORMAT(ts, 'HH') AS hr
FROM mysql_user_behavior;
```

Note that the Hive streaming sink is append-only: if the CDC stream carries updates or deletes, Flink will reject the query, so this pattern fits effectively insert-only source tables such as event logs.

### 4.4 Submitting and Verifying

Submit through the Flink SQL Client:

```shell
./bin/sql-client.sh
```

Run the statements above in the client, then confirm the job is running in the Flink web UI. Finally, query Hive to verify the data:

```sql
SELECT * FROM target_db.user_behavior_sink LIMIT 10;
```

## 5. Production Tuning Tips

### 5.1 Performance Settings

```yaml
# flink-conf.yaml additions
table.exec.source.idle-timeout: 60s
table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 1s   # must be > 0 when mini-batch is enabled
table.exec.mini-batch.size: 5000
```

### 5.2 Partition Commit Strategy

For time-partitioned tables, add the following:

```sql
-- add to the Hive table's TBLPROPERTIES
TBLPROPERTIES (
    'sink.partition-commit.policy.kind' = 'metastore,success-file',
    'sink.partition-commit.delay' = '1 min',
    'sink.partition-commit.trigger' = 'partition-time'
)
```

### 5.3 Monitoring and Alerting

Watch the following metrics:

- **End-to-end latency**: processing delay from source to sink
- **Checkpoint health**: success rate and duration
- **Resource usage**: TaskManager CPU/memory utilization
- **Backpressure**: to locate processing bottlenecks

```sql
-- enable more granular per-operator metrics in Flink SQL
SET 'pipeline.operator-chaining' = 'false';
SET 'table.dynamic-table-options.enabled' = 'true';
```

In real projects we have found that parallelism and the checkpoint interval have a large impact on stability. For a medium-sized sync workload (around 1,000 TPS), start with a parallelism of 4 and tune from there.
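The `dt`/`hr` columns computed by the two `DATE_FORMAT` calls in section 4.3 drive partition routing, and `partition.time-extractor.timestamp-pattern` tells the sink how to turn them back into a timestamp for commit triggering. The equivalent of those two calls, sketched in Python (the helper name is illustrative):

```python
from datetime import datetime

def partition_values(ts: datetime) -> tuple[str, str]:
    """Mirror DATE_FORMAT(ts, 'yyyy-MM-dd') and DATE_FORMAT(ts, 'HH')."""
    return ts.strftime("%Y-%m-%d"), ts.strftime("%H")

# e.g. the second sample row from section 2.2: 2023-01-01 00:01:00
dt, hr = partition_values(datetime(2023, 1, 1, 0, 1, 0))
# dt == "2023-01-01", hr == "00"
```

Reassembled through the pattern `'$dt $hr:00:00'`, these values yield `2023-01-01 00:00:00`, which the sink compares against the watermark (plus `sink.partition-commit.delay`) to decide when a partition is complete and can be committed to the Metastore.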