Python 数据库读写分离完全指南本教程演示数据库读写分离架构的实现。涵盖主从复制架构、SQLAlchemy 多引擎绑定、基于 Session 的路由、读写分离中间件以及故障切换。from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, textfrom sqlalchemy.orm import declarative_base, Session, sessionmakerfrom sqlalchemy.ext.declarative import declarative_basefrom datetime import datetimeimport threadingBase declarative_base()# 数据模型 class Product(Base):产品模型——用于演示读写分离操作__tablename__ productsid Column(Integer, primary_keyTrue, autoincrementTrue)name Column(String(100), nullableFalse, comment产品名称)price Column(Float, nullableFalse, comment产品价格)stock Column(Integer, default0, comment库存数量)version Column(Integer, default1, comment乐观锁版本号)# 主从引擎管理 class MasterSlaveEngineManager:主从数据库引擎管理器。管理主库写和从库读的引擎连接。def __init__(self):# 主库引擎处理写操作self.master_engine create_engine(sqlite:///master.db,echoFalse,# 主库连接池配置pool_size10,max_overflow20,)# 从库引擎列表处理读操作实现负载均衡self.slave_engines [create_engine(sqlite:///slave_1.db, echoFalse),create_engine(sqlite:///slave_2.db, echoFalse),create_engine(sqlite:///slave_3.db, echoFalse),]self._slave_index 0 # 轮询计数器self._lock threading.Lock() # 线程安全锁def get_master(self):获取主库引擎print([路由] 使用主库写操作)return self.master_enginedef get_slave(self):获取从库引擎轮询负载均衡。多个从库间均匀分发读请求。with self._lock:engine self.slave_engines[self._slave_index]self._slave_index (self._slave_index 1) % len(self.slave_engines)print(f[路由] 使用从库 {self._slave_index}读操作)return enginedef init_tables(self):在主库上创建表结构同步到从库Base.metadata.create_all(self.master_engine)print(主库表结构创建完成)for i, engine in enumerate(self.slave_engines):Base.metadata.create_all(engine)print(f从库 {i} 表结构创建完成)# 读写分离 Session class RoutingSession(Session):自定义 Session 类自动路由读写操作。写操作INSERT/UPDATE/DELETE路由到主库。读操作SELECT路由到从库。def __init__(self, manager: MasterSlaveEngineManager, *args, **kwargs):self.manager manager# 默认使用从库super().__init__(bindmanager.get_slave(), *args, **kwargs)def get_bind(self, mapperNone, clauseNone, **kwargs):根据执行的 SQL 类型自动选择数据库引擎。这是读写分离的核心重写 get_bind 方法。# 判断当前是否在写操作上下文中if self._is_write_operation(clause):return self.manager.get_master()return self.manager.get_slave()def _is_write_operation(self, clause) - bool:判断 SQL 是否为写操作if clause is None:return Falseclause_str str(clause)# 写操作关键词检测write_keywords [INSERT, UPDATE, DELETE, CREATE, ALTER, DROP]for keyword in write_keywords:if keyword in clause_str.upper():return Truereturn Falseclass ReadWriteRouter:读写分离路由器提供明确的读写方法。比自动路由更可控适合复杂业务场景。def __init__(self, manager: MasterSlaveEngineManager):self.manager managerdef write_session(self) - Session:获取写会话强制路由到主库return Session(bindself.manager.get_master())def read_session(self) - Session:获取读会话路由到从库return Session(bindself.manager.get_slave())def insert_product(self, name: str, price: float, stock: int) - Product:写操作强制使用主库with self.write_session() as session:product Product(namename, priceprice, stockstock)session.add(product)session.commit()session.refresh(product)print(f[写] 产品 {product.name} 已创建ID{product.id})return productdef get_product(self, product_id: int) - Product:读操作使用从库with self.read_session() as session:product session.query(Product).filter(Product.id product_id).first()print(f[读] 查询产品 ID{product_id}: {product.name if product else 未找到})return productdef update_product_price(self, product_id: int, new_price: float):写操作强制使用主库with self.write_session() as session:product session.query(Product).filter(Product.id product_id).first()if product:product.price new_pricesession.commit()print(f[写] 产品 {product_id} 价格已更新为 {new_price})def list_all_products(self) - list:读操作使用从库with self.read_session() as session:products session.query(Product).all()print(f[读] 列出所有产品共 {len(products)} 条)return products# 手动绑定 Session 工厂 def create_routing_session_factory(manager: MasterSlaveEngineManager):创建路由 Session 工厂。通过 sessionmaker 绑定自定义 Session 类。return sessionmaker(class_lambda: RoutingSession(manager))# 故障切换 class FailoverHandler:故障切换处理器。当从库发生故障时自动切换到主库或其他从库。def __init__(self, manager: MasterSlaveEngineManager):self.manager managerself.slave_health {i: True for i in range(len(manager.slave_engines))}def execute_with_failover(self, operation, use_master: bool False):带故障切换的执行函数。从库失败时自动重试到主库或其他从库。engines [self.manager.get_master()] if use_master else self.manager.slave_enginesfor engine in engines:try:with Session(bindengine) as session:result operation(session)return resultexcept Exception as e:print(f数据库节点故障: {e}尝试下一个节点)continueraise RuntimeError(所有数据库节点均不可用)# 演示代码 def demo_read_write_splitting():演示读写分离的各种操作manager MasterSlaveEngineManager()manager.init_tables()router ReadWriteRouter(manager)# 写入数据路由到主库print( 写操作主库)p1 router.insert_product(笔记本电脑, 5999.0, 100)p2 router.insert_product(手机, 3999.0, 200)p3 router.insert_product(平板电脑, 2999.0, 150)# 读取数据路由到从库print(\n 读操作从库)product router.get_product(p1.id)products router.list_all_products()# 更新操作主库print(\n 更新操作主库)router.update_product_price(p1.id, 5499.0)# 再次读取从库可能读到旧数据——最终一致性print(\n 最终一致性演示 )product router.get_product(p1.id)print(f 价格可能未同步主从延迟: ¥{product.price})# 强制从主库读取最新数据print(\n 强制主库读取 )with Session(bindmanager.get_master()) as session:p session.query(Product).filter(Product.id p1.id).first()print(f 直接从主库读取: ¥{p.price})# 故障切换演示print(\n 故障切换演示 )failover FailoverHandler(manager)result failover.execute_with_failover(lambda s: s.query(Product).count())print(f 通过故障切换查询到 {result} 条产品)print(\n所有读写分离演示完成)if __name__ __main__:demo_read_write_splitting()