从钉钉在线表格到 MySQL 的可追溯同步实现版异常分支 重试 幂等在生产数据链路里钉钉在线表格经常被当作临时采集源但真正的问题不是“能不能同步”而是“同步后能否复盘”。这篇写的是可落地实现思路以“批次”为基本单元把每次同步做成可回放、可审计、可恢复的流程。1. 核心目标一条同步任务需要回答三类问题发起时间、发起人是谁这次跑了哪些源表、每个源表读了多少行任一失败都能快速定位到具体阶段并支持重试或人工介入同时要满足每次同步都有稳定批次号sync_run_id异常可分级处理可恢复与不可恢复区分明确支持按批次回滚不产生重复写入幂等2. 数据模型最小集合CREATETABLEsync_runs(sync_run_idBIGINTAUTO_INCREMENTPRIMARYKEY,request_idVARCHAR(64)NOTNULLUNIQUE,operator_idVARCHAR(64)NOTNULL,source_urlVARCHAR(512)NOTNULL,started_atDATETIMENOTNULL,finished_atDATETIMENULL,statusENUM(CREATED,FETCHING,SNAPSHOTTING,TRANSFORMING,LOADING,VERIFYING,SUCCESS,FAILED,ABORTED)NOTNULL,stage_error_codeVARCHAR(64)NULL,stage_error_messageTEXTNULL,sheet_countINTDEFAULT0,row_counts_json JSONNULL,attemptINTDEFAULT0,max_attemptINTDEFAULT3,created_atDATETIMEDEFAULTCURRENT_TIMESTAMP,updated_atDATETIMEDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMP,INDEXidx_status(status),INDEXidx_created(created_at));CREATETABLEsync_raw_rows(sync_run_idBIGINTNOTNULL,source_sheet_nameVARCHAR(128)NOTNULL,source_row_noBIGINTNOTNULL,raw_json JSONNOTNULL,row_hashCHAR(64)NOTNULL,PRIMARYKEY(sync_run_id,source_sheet_name,source_row_no),FOREIGNKEY(sync_run_id)REFERENCESsync_runs(sync_run_id));CREATETABLEsync_incident_rows(idBIGINTAUTO_INCREMENTPRIMARYKEY,sync_run_idBIGINTNOTNULL,biz_idVARCHAR(128)NOTNULL,fault_titleVARCHAR(256),system_nameVARCHAR(128),provinceVARCHAR(64),fault_levelVARCHAR(32),last_update_timeDATETIME,payload_hashCHAR(64)NOTNULL,updated_atDATETIMEDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMP,UNIQUEKEYuk_sync_biz(sync_run_id,biz_id),KEYidx_sync(sync_run_id),FOREIGNKEY(sync_run_id)REFERENCESsync_runs(sync_run_id));3. 幂等策略核心3.1 统一request_id执行入口先生成幂等键防止重复触发产生重复运行request_id sha256( operator_id | sheet_id_list | start_window | end_window | day )sync_runs.request_id建唯一约束重复请求时复用同一个sync_run_id。3.2 两层去重Raw 层PRIMARY KEY(sync_run_id, source_sheet_name, source_row_no)防止同一批次重复写入源行。业务层UNIQUE(sync_run_id, biz_id)防止同批次事实表重复落地。4. 同步状态机CREATED - FETCHING拉取表结构与原始数据FETCHING - SNAPSHOTTING逐行写入sync_raw_rowsSNAPSHOTTING - TRANSFORMING字段映射与清洗TRANSFORMING - LOADING落地到sync_incident_rowsLOADING - VERIFYING对账与完整性校验VERIFYING - SUCCESS写入成功失败时进入FAILED记录失败阶段和错误码保留失败行明细。5. 异常分支与处理动作5.1 拉取失败网络 / 鉴权场景钉钉 API 超时、token 过期、网络抖动动作写FETCH_ERROR触发重试支持指数退避超过重试上限后状态置为FAILED并停止后续阶段5.2 Schema 漂移场景字段缺失、字段新增、类型不一致动作预校验 header 与映射白名单非关键字段可降级补空关键字段缺失则直接SCHEMA_MISMATCH失败5.3 变换失败场景日期格式错误、枚举值异常、金额转型失败动作单条打标后继续处理局部容错若失败率超过阈值如 5%直接FAILED并进入人工复核5.4 落库失败场景数据库短暂连接异常、约束冲突动作连接异常重试后重试成功继续执行约束冲突按幂等策略去重计数后跳过重复行5.5 对账失败raw 行数 ≠ 拉取行数成功行 失败行 ≠ raw 行数结果行数与筛选规则不匹配任一失败则VERIFYING失败状态为FAILED待人工确认后重跑。6. 重试策略可恢复异常才重试避免“业务规则错误”空转浪费时间。defshould_retry(error_code,attempt,max_attempt):recoverable{FETCH_ERROR,API_TIMEOUT,DB_CONN_ERROR}ifattemptmax_attempt:returnFalsereturnerror_codeinrecoverabledefbackoff_ms(attempt):# 1, 3, 5, 9, 15...returnmin(60000,(2**attempt1)*1000)建议参数max_attempt 5max_wait_ms 30000仅对recoverable类错误重试7. 参考执行骨架Pythondefrun_sync(request_id,source_urls,config):runget_or_create_run(request_id,config)try:set_stage(run,FETCHING)sheetsfetch_sheets(source_urls)set_stage(run,SNAPSHOTTING)snapshot_rowswrite_raw_rows(run.id,sheets)set_stage(run,TRANSFORMING)transformed,bad_rowstransform_rows(snapshot_rows,config)set_stage(run,LOADING)write_rows_idempotent(run.id,transformed)set_stage(run,VERIFYING)verify_run(run.id,snapshot_rows,transformed,bad_rows)mark_success(run)exceptRecoverableErrorase:ifshould_retry(e.code,run.attempt1,run.max_attempt):schedule_retry(run.id,delay_msbackoff_ms(run.attempt))else:mark_failed(run,e.code,str(e))exceptExceptionase:mark_failed(run,FATAL,str(e))8. 运营查询上线后建议SELECTsync_run_id,status,sheet_count,row_counts_json,started_at,finished_at,stage_error_code,stage_error_messageFROMsync_runsWHEREstatusIN(FAILED,SUCCESS)ORDERBYsync_run_idDESCLIMIT50;SELECTsync_run_id,COUNT(*)ASraw_rowsFROMsync_raw_rowsWHEREsync_run_id:run_idGROUPBYsync_run_id;SELECTsync_run_id,COUNT(*)ASloaded_rows,SUM(biz_idISNULLORbiz_id)ASempty_bizFROMsync_incident_rowsWHEREsync_run_id:run_idGROUPBYsync_run_id;9. 结论把同步做成“可追溯 可重试 可幂等”的工程化链路比追求一次性完美通过更实用。你真正需要的不是“更多字段”而是三件事每批次有身份request_id / sync_run_id每批次有快照raw 结构化结果每次失败能准确判断“自动重试”还是“人工介入”有了这条链路排障不再是“数据是不是对上了”而是“第几步错了、错在哪行、如何复盘”。