《金融支付架构实战指南》一书讨论了分布式事务。这里研究RocketMQ事务消息方案。在支付系统链路中支付结果通知、订单状态变更、账户余额扣减、优惠券核销分属不同微服务跨库操作天然面临分布式一致性难题。本地事务无法跨数据源生效TCC 开发成本高、侵入业务代码而 RocketMQ 事务消息凭借半消息 回调回查机制成为支付场景轻量化落地最终一致性的主流选型。但原生事务消息暗藏致命空回滚隐患半消息持久化 Broker 后服务宕机本地事务滞后执行极易出现「数据库业务落库、消息被回滚销毁」导致订单与通知脱节、资损隐患。本文结合线上支付落地经验基于数据库主键占位思想落地两套可直接投产的 RocketMQ 事务消息方案适配支付回调、代付扣款两大业务场景附带完整源码与异常全链路验证。一、原生RocketMQ事务消息最大致命漏洞1. 危险场景原生代码必崩完整致命时序Producer 发送Half半消息→ Broker 持久化成功此时机器线程卡顿executeLocalTransaction 完全没执行、一行代码都没跑Broker 定时回查checkLocalTransaction系统查不到订单、查不到事务记录 → 代码误判返回ROLLBACKBroker 删除消息事务作废服务重启后之前卡住的 executeLocalTransaction 继续执行成功最终致命数据不一致✅ 本地事务成功订单创建、库存扣了❌ MQ消息被回滚下游消费者永远收不到消息❌ 上下游业务彻底割裂无通知、无发货、无积分2. 问题根源RocketMQ 回查机制无法区分两种状态状态A本地事务确实执行过且失败状态B本地事务压根还没执行原生写法统一判定为「事务失败」→ 误删消息 → 数据崩坏。二、行业标准终极解决方案无BUG方案核心原理回查发现无事务记录时主动插入一条事务占位记录目的占坑打标记让后续姗姗来迟的本地事务 主键冲突、直接失败回滚从根源杜绝回滚消息 业务成功 的割裂现象完整闭环时序100%无漏洞发送半消息成功服务宕机本地事务未执行Broker 触发回查回查发现无事务记录 →主动 insert 占位记录返回 UNKNOWN继续等待服务恢复之前的 executeLocalTransaction 开始执行执行插入事务日志 →主键冲突异常本地事务整体回滚业务不生成订单下次回查发现事务失败 → 真正 ROLLBACK 消息最终绝对一致业务失败、消息回滚完全匹配三、数据库表结构sqlCREATE TABLE tx_transaction_log (tx_id VARCHAR(64) NOT NULL COMMENT 全局事务ID(订单号),create_time DATETIME DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (tx_id)) ENGINEInnoDB DEFAULT CHARSETutf8mb4 COMMENTRocketMQ事务状态表(防空回滚占位);极简设计只需要主键不需要状态字段靠主键唯一冲突解决所有问题。四、完整生产级代码1. 事务生产者javaServicepublic class SeckillTxProducer {Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendSeckillTxMessage(String orderNo, String json) {MessageString message MessageBuilder.withPayload(json).build();// 发送半消息rocketMQTemplate.sendMessageInTransaction(seckill_tx_topic,message,orderNo);}}2. 核心事务监听器最终无BUG版javaComponentRocketMQTransactionListener(producerGroup seckill_tx_group)public class SeckillTxListener implements RocketMQLocalTransactionListener {Autowiredprivate OrderService orderService;Autowiredprivate TxLogMapper txLogMapper;// 执行本地事务 Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {String orderNo (String) arg;SeckillOrder order JSON.parseObject(msg.getPayload().toString(), SeckillOrder.class);try {// 关键插入事务占位记录// 如果回查已经抢先插入这里直接主键冲突txLogMapper.insertTxLog(orderNo);// 执行业务创建订单 扣库存本地事务orderService.createOrderAndDeductStock(order);// 业务成功提交消息return RocketMQLocalTransactionState.COMMIT;} catch (DuplicateKeyException e) {// 主键冲突 回查已经占位本次事务作废return RocketMQLocalTransactionState.ROLLBACK;} catch (Exception e) {return RocketMQLocalTransactionState.ROLLBACK;}}// 事务回查核心防BUG逻辑 Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {SeckillOrder order JSON.parseObject(msg.getPayload().toString(), SeckillOrder.class);String orderNo order.getOrderNo();// 1. 查询事务记录Integer count txLogMapper.countTxLog(orderNo);if (count null || count 0) {// 【最核心代码】无记录 → 主动占位try {txLogMapper.insertTxLog(orderNo);} catch (DuplicateKeyException e) {// 并发回查插入忽略}// 占位成功继续回查不提交、不回滚return RocketMQLocalTransactionState.UNKNOWN;}// 2. 有记录 本地事务已执行过boolean orderExist orderService.isOrderExist(orderNo);if (orderExist) {return RocketMQLocalTransactionState.COMMIT;} else {return RocketMQLocalTransactionState.ROLLBACK;}}}3. 业务服务本地事务javaServicepublic class OrderService {Autowiredprivate OrderMapper orderMapper;Autowiredprivate StockMapper stockMapper;Transactional(rollbackFor Exception.class)public void createOrderAndDeductStock(SeckillOrder order) {// 创建订单orderMapper.insert(order);// 扣减库存int rows stockMapper.deductStock(order.getGoodsId());if (rows 0) {throw new RuntimeException(库存不足);}}public boolean isOrderExist(String orderNo) {return orderMapper.selectByOrderNo(orderNo) ! null;}}4. MapperjavaMapperpublic interface TxLogMapper {Insert(INSERT INTO tx_transaction_log(tx_id) VALUES(#{txId}))void insertTxLog(Param(txId) String txId);Select(SELECT count(1) FROM tx_transaction_log WHERE tx_id #{txId})Integer countTxLog(Param(txId) String txId);}5. 消费者幂等javaComponentRocketMQMessageListener(topic seckill_tx_topic, consumerGroup seckill_consumer_group)public class SeckillConsumer implements RocketMQListenerString {Autowiredprivate OrderService orderService;Overridepublic void onMessage(String message) {SeckillOrder order JSON.parseObject(message, SeckillOrder.class);// 幂等判断if (!orderService.isOrderExist(order.getOrderNo())) {return;}// 执行下游业务通知、积分、物流等}}五、三套异常场景全覆盖场景1半消息成功本地事务完全没执行回查发现无记录 → 主动插入占位后续本地事务启动 → 主键冲突 → 失败回滚消息最终回滚数据一致场景2本地事务执行中回查提前到来事务已插入日志回查查到记录 → 返回 UNKNOWN事务执行完毕后下次回查 COMMIT场景3本地事务执行失败抛出异常事务回滚日志、订单全部回滚回查无数据 → 占位、最终 ROLLBACK消息作废无数据不一致六、总结RocketMQ事务消息存在空回滚漏洞半消息发送成功后若服务宕机本地事务未执行Broker回查不到数据会误判回滚后续服务恢复滞后的本地事务又会正常提交导致业务成功、消息回滚的数据不一致问题。生产解决方案1.在回查接口中如果查询不到事务记录主动插入一条事务占位数据利用数据库主键唯一约束让后续滞后执行的本地事务触发主键冲突强制回滚彻底杜绝空回滚导致的数据不一致保证分布式事务最终一致性。2.保守做法查不到就返回 UNKNOWN如果业务极度敏感开发人员不想在本地事务中耦合事务日志表或者懒得建表生产上常见的折中方案是回查时如果查不到业务数据订单不存在返回 ROLLBACK。让 Broker 继续等继续回查。直到超过 RocketMQ 的最大回查次数默认 15 次可设置合适的回查次数Broker 会自动回滚消息。前提假设如果本地事务没执行由于前面分析的“线程不可能死而复生”它以后也不会执行了。让 Broker 多等几次回查虽然占用了 Broker 资源但绝对安全不会误杀。