坑一pymysql —— 异常吞掉钱扣了没到账案发现场importpymysql connpymysql.connect(hostlocalhost,userroot,passwordpwd,databasetest)cursorconn.cursor()# 转账扣款cursor.execute(UPDATE accounts SET balance balance - 100 WHERE id 1)# 假设这里程序崩溃 / 网络超时 / 抛出异常...# 加款永远没执行cursor.execute(UPDATE accounts SET balance balance 100 WHERE id 2)conn.commit()# 第一条已经生效无法回滚结果用户A被扣了100块用户B没收到钱。数据不一致且无法挽回。根因pymysql 默认autocommitFalse但每条 execute 本身不会自动提交——问题出在异常发生后第一条 SQL 已经被 MySQL 隐式提交因为 pymysql 在某些异常场景下会触发自动提交或者更常见的情况是开发者以为异常会自动回滚但实际上没有显式调用rollback()。解决方法try:cursor.execute(UPDATE accounts SET balance balance - 100 WHERE id 1)cursor.execute(UPDATE accounts SET balance balance 100 WHERE id 2)conn.commit()exceptExceptionase:conn.rollback()# ← 必须显式回滚raise# 或记录日志后重新抛出finally:cursor.close()conn.close()关键原则try里写业务 SQLexcept里必须有rollback()finally里关闭连接。三件套缺一不可。坑二mysql-connector-python —— 连接池泄漏服务跑着跑着就挂了案发现场importmysql.connectorfrommysql.connectorimportpooling dbconfig{host:localhost,user:root,password:pwd,database:test}connection_poolpooling.MySQLConnectionPool(pool_namemypool,pool_size5,**dbconfig)defget_user(user_id):connconnection_pool.get_connection()# 从池中取连接cursorconn.cursor()cursor.execute(SELECT * FROM users WHERE id %s,(user_id,))resultcursor.fetchall()# 忘记关闭连接直接返回returnresult结果前5次请求正常第6次开始报错Too many connections。因为连接取走后没归还池子里的5个连接被永久占用。根因mysql-connector-python的连接池行为是连接常驻——取走后不会自动回收。与 SQLAlchemy 的QueuePool空闲超时后销毁连接不同这里的连接一旦泄漏池子容量就永久减少直到耗尽。解决方法defget_user(user_id):connconnection_pool.get_connection()try:cursorconn.cursor()cursor.execute(SELECT * FROM users WHERE id %s,(user_id,))returncursor.fetchall()finally:cursor.close()conn.close()# ← 必须归还连接到池中或者用上下文管理器fromcontextlibimportclosingdefget_user(user_id):withclosing(connection_pool.get_connection())asconn:withconn.cursor()ascursor:cursor.execute(SELECT * FROM users WHERE id %s,(user_id,))returncursor.fetchall()# 退出 with 自动关闭连接归还池中监控指标定期执行SHOW PROCESSLIST观察Sleep状态的连接数是否持续增长。坑三SQLAlchemy —— 批量更新时事件监听器沉默了案发现场fromsqlalchemyimportevent,updatefromsqlalchemy.ormimportSessionclassUser(Base):__tablename__usersidColumn(Integer,primary_keyTrue)nameColumn(String(50))updated_atColumn(DateTime)# 定义事件每次更新自动维护 updated_atevent.listens_for(User,before_update)defset_updated_at(mapper,connection,target):target.updated_atdatetime.now()# 业务代码批量更新用户状态sessionSession()stmtupdate(User).where(User.statuspending).values(statusprocessed)session.execute(stmt)# ← 监听器根本没触发session.commit()结果1000条记录的status改了但updated_at全是 NULL。因为session.execute(stmt)走的是 Core 层绕过了 ORM 的对象生命周期事件监听器根本不知道发生了更新。根因SQLAlchemy 的事件系统绑定的是ORM 对象的状态变化。session.execute(update(...))是直接执行 SQL不经过 ORM 对象加载所以before_update/after_update全部静默。解决方法三选一方案适用场景代码回归 ORM 遍历更新数据量 1万for u in session.query(User).filter_by(statuspending): u.statusprocessedSQL 里直接写追求性能values(statusprocessed, updated_atfunc.now())数据库触发器强一致性要求交给 MySQL 层ON UPDATE CURRENT_TIMESTAMP推荐大多数场景用方案二性能和一致性兼顾fromsqlalchemyimportfunc stmt(update(User).where(User.statuspending).values(statusprocessed,updated_atfunc.now()))session.execute(stmt)session.commit()三个坑的本质对比坑一 pymysql坑二 mysql-connector坑三 SQLAlchemy表面问题异常没回滚连接耗尽事件没触发真实原因异常处理不完整连接池机制不同Core vs ORM 路径差异一句话解法except里必写rollback()finally里必关连接批量操作别用 Core 绕过 ORM这三个坑覆盖了异常流、资源管理、抽象层边界三个维度踩过一次就不会再踩第二次。