PHP数据同步与CDC变更数据捕获CDCChange Data Capture是一种跟踪数据库变更的技术。PHP可以通过多种方式实现CDC和数据同步。今天说说PHP中CDC的实现方案。CDC的核心是捕获数据的插入、更新和删除操作并将变更传播到其他系统。phpclass ChangeTracker{private PDO $pdo;private string $table;public function __construct(PDO $pdo, string $table){$this-pdo $pdo;$this-table $table;$this-initTrackingTable();}private function initTrackingTable(): void{$this-pdo-exec(CREATE TABLE IF NOT EXISTS cdc_log (id BIGINT AUTO_INCREMENT PRIMARY KEY,table_name VARCHAR(100) NOT NULL,operation ENUM(insert, update, delete) NOT NULL,entity_id INT NOT NULL,old_data JSON,new_data JSON,changed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,processed BOOLEAN DEFAULT FALSE,INDEX idx_unprocessed (processed, changed_at)));}public function logInsert(int $entityId, array $data): void{$stmt $this-pdo-prepare(INSERT INTO cdc_log (table_name, operation, entity_id, new_data)VALUES (?, insert, ?, ?));$stmt-execute([$this-table, $entityId, json_encode($data, JSON_UNESCAPED_UNICODE)]);}public function logUpdate(int $entityId, array $oldData, array $newData): void{$stmt $this-pdo-prepare(INSERT INTO cdc_log (table_name, operation, entity_id, old_data, new_data)VALUES (?, update, ?, ?, ?));$stmt-execute([$this-table,$entityId,json_encode($oldData, JSON_UNESCAPED_UNICODE),json_encode($newData, JSON_UNESCAPED_UNICODE),]);}public function logDelete(int $entityId, array $oldData): void{$stmt $this-pdo-prepare(INSERT INTO cdc_log (table_name, operation, entity_id, old_data)VALUES (?, delete, ?, ?));$stmt-execute([$this-table, $entityId, json_encode($oldData, JSON_UNESCAPED_UNICODE)]);}public function getUnprocessed(int $limit 100): array{$stmt $this-pdo-prepare(SELECT * FROM cdc_logWHERE processed FALSEORDER BY id ASCLIMIT ?);$stmt-execute([$limit]);return $stmt-fetchAll();}public function markProcessed(int $id): void{$this-pdo-prepare(UPDATE cdc_log SET processed TRUE WHERE id ?)-execute([$id]);}}class UserServiceWithCDC{private PDO $pdo;private ChangeTracker $tracker;public function __construct(PDO $pdo, ChangeTracker $tracker){$this-pdo $pdo;$this-tracker $tracker;}public function createUser(string $name, string $email): int{$stmt $this-pdo-prepare(INSERT INTO users (name, email) VALUES (?, ?));$stmt-execute([$name, $email]);$id (int)$this-pdo-lastInsertId();$this-tracker-logInsert($id, [name $name, email $email]);return $id;}public function updateUser(int $id, array $data): void{$oldStmt $this-pdo-prepare(SELECT * FROM users WHERE id ?);$oldStmt-execute([$id]);$oldData $oldStmt-fetch(PDO::FETCH_ASSOC);$sets [];$params [];foreach ($data as $key $value) {$sets[] {$key} ?;$params[] $value;}$params[] $id;$sql UPDATE users SET . implode(, , $sets) . WHERE id ?;$this-pdo-prepare($sql)-execute($params);$this-tracker-logUpdate($id, $oldData, $data);}public function deleteUser(int $id): void{$stmt $this-pdo-prepare(SELECT * FROM users WHERE id ?);$stmt-execute([$id]);$oldData $stmt-fetch(PDO::FETCH_ASSOC);$this-pdo-prepare(DELETE FROM users WHERE id ?)-execute([$id]);$this-tracker-logDelete($id, $oldData);}}?CDC消费者将变更同步到其他系统phpclass CdcConsumer{private ChangeTracker $tracker;private array $handlers [];public function __construct(ChangeTracker $tracker){$this-tracker $tracker;}public function registerHandler(string $operation, callable $handler): void{$this-handlers[$operation][] $handler;}public function process(): int{$processed 0;$changes $this-tracker-getUnprocessed(50);foreach ($changes as $change) {try {$handlers $this-handlers[$change[operation]] ?? [];foreach ($handlers as $handler) {$handler($change);}$this-tracker-markProcessed($change[id]);$processed;} catch (\Exception $e) {error_log(CDC处理失败: {$e-getMessage()});}}return $processed;}}?CDC是数据同步和事件驱动架构的基础技术。通过捕获数据库变更日志可以将数据同步到缓存、搜索引擎或数据仓库。CDC的关键是变更的顺序性和幂等性确保数据最终一致。PHP实现的CDC适合轻量级的同步场景大规模场景建议使用Debezium等专业工具。