Mysql--基础知识点--105--分布式事务
MySQL 分布式事务主流方案对比与实战修正版一、XA 协议两阶段提交原理XA 规范定义了事务管理器TM与资源管理器RM的交互接口。MySQL 通过XA START、XA END、XA PREPARE、XA COMMIT/XA ROLLBACK命令实现两阶段提交。修正后的 Python 示例使用mysql-connector-python的原生 XA 命令importmysql.connectorfrommysql.connectorimportErrordefxa_transfer_example():conn1Noneconn2Nonexidbtransfer_txn_001# XID 必须是字节串try:# 连接两个不同的 MySQL 数据库或同一实例的不同库conn1mysql.connector.connect(hostlocalhost,useruser1,passwordpass1,databasedb1)conn2mysql.connector.connect(hostlocalhost,useruser2,passwordpass2,databasedb2)# 开启 XA 事务每个参与者独立执行 XA STARTcursor1conn1.cursor()cursor1.execute(fXA START {xid.decode()})cursor2conn2.cursor()cursor2.execute(fXA START {xid.decode()})# 执行 SQL 操作注意UPDATE 语句不需要 FROM 子句cursor1.execute(UPDATE accounts SET balance balance - 100 WHERE user_id 1)cursor2.execute(UPDATE accounts SET balance balance 100 WHERE user_id 2)# 结束 XA 事务分支cursor1.execute(fXA END {xid.decode()})cursor2.execute(fXA END {xid.decode()})# 第一阶段预提交cursor1.execute(fXA PREPARE {xid.decode()})cursor2.execute(fXA PREPARE {xid.decode()})# 第二阶段提交cursor1.execute(fXA COMMIT {xid.decode()})cursor2.execute(fXA COMMIT {xid.decode()})print(XA 事务提交成功)exceptErrorase:print(fXA 事务失败:{e})# 回滚对已 PREPARE 的分支执行 XA ROLLBACKifconn1:cursor1.execute(fXA ROLLBACK {xid.decode()})ifconn2:cursor2.execute(fXA ROLLBACK {xid.decode()})finally:ifconn1:conn1.close()ifconn2:conn2.close()二、TCC 模式原理TCC 将事务拆分为 Try预留资源、Confirm确认提交、Cancel回滚释放三个阶段由业务代码实现每个阶段的接口配合全局事务协调器如 DTM完成两阶段提交。示例使用 DTM 框架HTTP 接口方式# 需要先部署 DTM 服务https://github.com/dtm-labs/dtmfromdtmcliimporttccimportrequests SVChttp://localhost:8080# 业务服务地址DTM_SERVERhttp://localhost:36789deftcc_transfer():# 发起 TCC 全局事务gidtcc.tcc_global_transaction(DTM_SERVER,None,tcc_trans)print(fTCC 事务成功GID:{gid})deftcc_trans(t):req{amount:100,from_user:1,to_user:2}# 注册转出和转入的 Try/Confirm/Cancel 接口t.call_branch(req,f{SVC}/api/TransOutTry,f{SVC}/api/TransOutConfirm,f{SVC}/api/TransOutCancel)t.call_branch(req,f{SVC}/api/TransInTry,f{SVC}/api/TransInConfirm,f{SVC}/api/TransInCancel)# 对应的业务接口实现示例使用 Flask app.route(/api/TransOutTry, methods[POST]) def trans_out_try(): data request.json # 检查余额并冻结金额在用户表中增加冻结字段 cursor.execute(UPDATE accounts SET frozen frozen %s WHERE user_id %s AND balance - frozen %s, (data[amount], data[from_user], data[amount])) return {status: ok} app.route(/api/TransOutConfirm, methods[POST]) def trans_out_confirm(): data request.json # 扣减余额并解冻 cursor.execute(UPDATE accounts SET balance balance - %s, frozen frozen - %s WHERE user_id %s, (data[amount], data[amount], data[from_user])) return {status: ok} app.route(/api/TransOutCancel, methods[POST]) def trans_out_cancel(): data request.json # 解冻金额 cursor.execute(UPDATE accounts SET frozen frozen - %s WHERE user_id %s, (data[amount], data[from_user])) return {status: ok} 三、Saga 模式原理将一个长事务拆分为多个本地事务每个事务都有对应的补偿操作。正向操作依次执行若某个失败则逆序执行补偿。示例使用 DTMfromdtmcliimportsaga SVChttp://localhost:8080DTM_SERVERhttp://localhost:36789defsaga_order_flow():ssaga.Saga(DTM_SERVER,None)# 添加子事务正向操作补偿操作s.add(f{SVC}/api/CreateOrder,f{SVC}/api/CancelOrder,{order_id:ORD001,amount:100})s.add(f{SVC}/api/DeductStock,f{SVC}/api/AddBackStock,{product_id:P001,quantity:1})s.add(f{SVC}/api/DeductBalance,f{SVC}/api/AddBackBalance,{user_id:1,amount:100})gids.submit()print(fSaga 事务成功GID:{gid})四、本地消息表最终一致性原理将跨库操作通过本地事务写入业务表和消息表然后异步轮询消息表将消息投递到下游服务实现最终一致性。修正后的完整示例含正确的 SELECT FROM 子句importmysql.connectorimportjsonimporttimedefcreate_order_with_message(order_id,user_id,product_id,quantity):connNonetry:connmysql.connector.connect(hostlocalhost,userapp_user,passwordpass,databaseorder_db)cursorconn.cursor()conn.start_transaction()# 1. 插入订单订单表cursor.execute(INSERT INTO orders (order_id, user_id, product_id, quantity, status) VALUES (%s, %s, %s, %s, %s),(order_id,user_id,product_id,quantity,created))# 2. 插入消息消息表payloadjson.dumps({order_id:order_id,product_id:product_id,quantity:quantity})cursor.execute(INSERT INTO outbox_message (message_id, topic, payload, status, next_retry_at) VALUES (%s, %s, %s, %s, %s),(fmsg_{order_id},inventory_deduct,payload,pending,None))conn.commit()print(订单及消息写入成功)exceptExceptionase:ifconn:conn.rollback()print(f失败:{e})finally:ifconn:conn.close()defconsume_messages():独立轮询线程/进程connmysql.connector.connect(hostlocalhost,userapp_user,passwordpass,databaseorder_db)cursorconn.cursor()whileTrue:# 查询待发送消息注意 FROM 子句明确cursor.execute(SELECT message_id, topic, payload FROM outbox_message WHERE status pending AND (next_retry_at IS NULL OR next_retry_at NOW()) LIMIT 10 FOR UPDATE SKIP LOCKED)messagescursor.fetchall()formsg_id,topic,payloadinmessages:try:# 调用下游服务如扣库存接口send_to_downstream(topic,json.loads(payload))# 更新消息状态为已发送cursor.execute(UPDATE outbox_message SET status sent WHERE message_id %s,(msg_id,))conn.commit()exceptException:# 重试递增重试次数计算下次重试时间cursor.execute(UPDATE outbox_message SET status pending, retry_count retry_count 1, next_retry_at DATE_ADD(NOW(), INTERVAL POW(2, retry_count) SECOND) WHERE message_id %s,(msg_id,))conn.commit()time.sleep(5)defsend_to_downstream(topic,payload):iftopicinventory_deduct:print(f调用库存服务扣减:{payload})# 实际 HTTP 请求略else:raiseValueError(fUnknown topic:{topic})五、Seata AT 模式简述Seata是阿里巴巴开源的分布式事务解决方案支持AT、TCC、Saga、XA四种事务模式。其中AT模式是Seata独创的、对业务无侵入的事务模式。它在二阶段提交的基础上采用本地undo log记录数据修改前后的状态一阶段执行完后可立即释放锁和连接资源吞吐量比XA模式更高。用户在接入AT模式时只需配置好数据源事务提交/回滚流程均由Seata自动完成对业务代码几乎零侵入。AT模式要求所有SQL必须走代理默认全局隔离级别为读未提交Read Uncommitted若需全局读已提交需使用SELECT FOR UPDATE语句。但该模式目前仅官方 Java 客户端成熟Python 生态暂无正式实现。Python 项目可考虑使用 dtm框架支持 TCC/Saga/XA/事务消息跨语言友好。核心事务部分用 Java 实现Python 通过 RPC 参与。六、方案选型速查表方案一致性性能开发成本典型场景XA强一致低低已废弃不推荐TCC强一致高高支付、秒杀等短事务高并发场景Saga最终一致高中长流程、多参与方如订票本地消息表最终一致高低异步解耦如订单→扣库存Seata AT强一致高低(Java)Java 微服务低侵入需求所有示例中的 SQL 语句INSERT、UPDATE、SELECT均包含完整的表名和 FROM 子句您可以直接在 MySQL 中执行建表语句后测试。如需完整的建表 DDL 或某个方案的详细部署步骤请告知。