"""Complete guide to database sharding patterns in Python.

This tutorial demonstrates several strategies for horizontal database
sharding: choosing a shard key, SQLAlchemy-based sharding, application-level
sharding, hash-based shard selection, and query routing.
"""

import hashlib
from datetime import datetime, timezone
from typing import Optional

from sqlalchemy import Column, DateTime, Float, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


def _utc_now() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Produces the same value as the deprecated ``datetime.utcnow()`` without
    triggering its deprecation warning on Python 3.12+.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


# ===== Data model =====

class Order(Base):
    """Order model whose rows are stored across the shards.

    The same table definition is created on every shard engine, so the
    auto-increment ``id`` is only unique within one shard; ``order_id`` is
    the identifier intended to be global.
    """

    __tablename__ = "orders"

    id = Column(Integer, primary_key=True, autoincrement=True)
    order_id = Column(String(30), unique=True, nullable=False, comment="全局订单号")
    # user_id is the shard key used by ShardManager.get_shard_for_user.
    user_id = Column(Integer, nullable=False, comment="用户 ID分片键")
    amount = Column(Float, nullable=False, comment="订单金额")
    region = Column(String(20), nullable=False, comment="所属区域")
    created_at = Column(DateTime, default=_utc_now, comment="创建时间")


# ===== Shard engine management =====

class ShardManager:
    """Own the engines of all database shards and map keys to shard names.

    Implements horizontal sharding: a shard key is translated into the name
    of one of the engines held in ``self.shards``.
    """

    def __init__(self):
        # One SQLite file per shard; the names must stay "shard_<index>"
        # because the routing methods build shard names from an index.
        self.shards = {
            "shard_0": create_engine("sqlite:///shard_0.db", echo=False),
            "shard_1": create_engine("sqlite:///shard_1.db", echo=False),
            "shard_2": create_engine("sqlite:///shard_2.db", echo=False),
            "shard_3": create_engine("sqlite:///shard_3.db", echo=False),
        }
        self.shard_count = len(self.shards)

    def init_tables(self):
        """Create the table structure on every shard."""
        for shard_name, engine in self.shards.items():
            Base.metadata.create_all(engine)
            print(f"分片 {shard_name} 表结构创建完成")

    def get_shard_for_user(self, user_id: int) -> str:
        """Return the shard name for ``user_id`` using modulo routing.

        This is the simplest shard-key strategy: ``user_id % shard_count``.
        """
        shard_index = user_id % self.shard_count
        return f"shard_{shard_index}"

    def get_shard_by_region(self, region: str) -> str:
        """Return the shard name assigned to a geographic region.

        Suited to data with a natural regional attribute. Regions that are
        not in the mapping fall back to ``shard_0``.
        """
        region_shard_map = {
            "华北": "shard_0",
            "华东": "shard_1",
            "华南": "shard_2",
            "西部": "shard_3",
        }
        return region_shard_map.get(region, "shard_0")

    def get_shard_consistent(self, key: str) -> str:
        """Return the shard name for a string key via MD5 hash modulo.

        The MD5 digest of ``key`` is reduced modulo ``shard_count``, which
        spreads keys evenly across shards.

        NOTE: despite the method name this is plain hash-modulo routing, not
        a consistent-hash ring; changing ``shard_count`` remaps most keys.
        """
        hash_val = int(hashlib.md5(key.encode()).hexdigest(), 16)
        shard_index = hash_val % self.shard_count
        return f"shard_{shard_index}"


# ===== Application-level sharding operations =====

class ShardedOrderService:
    """Application-level sharding service wrapping shard-aware CRUD calls."""

    def __init__(self, shard_manager: ShardManager):
        self.manager = shard_manager

    def create_order(self, order_data: dict) -> Order:
        """Create an order on the shard selected by its ``user_id``.

        This is the core of application-level sharding: the data location is
        decided at write time.

        Args:
            order_data: Keyword arguments for ``Order``; must contain
                ``"user_id"``.

        Returns:
            The persisted ``Order`` (detached once the session has closed).
        """
        shard_name = self.manager.get_shard_for_user(order_data["user_id"])
        engine = self.manager.shards[shard_name]

        with Session(engine) as session:
            order = Order(**order_data)
            session.add(order)
            session.commit()
            # Reload the attributes expired by commit so they stay readable
            # on the returned object after the session is closed.
            session.refresh(order)
            print(f"订单 {order.order_id} 写入分片 {shard_name}")
            return order

    def get_order_by_user(self, user_id: int, order_id: str) -> Optional[Order]:
        """Look up one order by routing on ``user_id`` first.

        This is the most efficient query path because it goes straight to
        the single shard that can hold the row.

        Returns:
            The matching ``Order``, or ``None`` when no row matches.
        """
        shard_name = self.manager.get_shard_for_user(user_id)
        engine = self.manager.shards[shard_name]

        with Session(engine) as session:
            order = session.query(Order).filter(
                Order.order_id == order_id,
                Order.user_id == user_id,
            ).first()
            if order:
                print(f"订单 {order_id} 在分片 {shard_name} 中找到")
            return order

    def get_user_orders(self, user_id: int) -> list:
        """Return all orders of one user from that user's single shard.

        Because ``user_id`` is the shard key, only one shard is queried.
        """
        shard_name = self.manager.get_shard_for_user(user_id)
        engine = self.manager.shards[shard_name]

        with Session(engine) as session:
            orders = session.query(Order).filter(Order.user_id == user_id).all()
            print(f"用户 {user_id} 在分片 {shard_name} 上有 {len(orders)} 笔订单")
            return orders

    def scan_all_shards(self, condition: Optional[dict] = None) -> list:
        """Run the same query on every shard and concatenate the results.

        Used for queries that cannot be routed by the shard key.

        Args:
            condition: Optional mapping of ``Order`` attribute name to the
                value it must equal. ``None`` or an empty mapping selects
                every row.

        Returns:
            The matching orders from all shards, in shard iteration order.

        Raises:
            AttributeError: If a key in ``condition`` is not an attribute of
                ``Order``.
        """
        results = []
        for shard_name, engine in self.manager.shards.items():
            with Session(engine) as session:
                query = session.query(Order)
                if condition:
                    for key, value in condition.items():
                        query = query.filter(getattr(Order, key) == value)
                shard_results = query.all()
                results.extend(shard_results)
                print(f"分片 {shard_name}: 查询到 {len(shard_results)} 条")
        return results

    def get_shard_statistics(self) -> dict:
        """Return a mapping of shard name to its number of stored orders."""
        stats = {}
        for shard_name, engine in self.manager.shards.items():
            with Session(engine) as session:
                stats[shard_name] = session.query(Order).count()
        return stats


# ===== Demo code =====

def demo_sharding():
    """Demonstrate the sharding operations end to end.

    NOTE: the shards are on-disk SQLite files and ``order_id`` is unique, so
    running the demo again without deleting ``shard_*.db`` fails on insert
    with a uniqueness violation.
    """
    manager = ShardManager()
    manager.init_tables()
    service = ShardedOrderService(manager)

    # Insert test data; the differing user_ids spread it over the shards.
    print("=== 写入数据到各分片 ===")
    test_orders = [
        {"order_id": "ORD_001", "user_id": 1, "amount": 99.9, "region": "华北"},
        {"order_id": "ORD_002", "user_id": 2, "amount": 199.0, "region": "华东"},
        {"order_id": "ORD_003", "user_id": 3, "amount": 299.0, "region": "华南"},
        {"order_id": "ORD_004", "user_id": 4, "amount": 399.0, "region": "西部"},
        {"order_id": "ORD_005", "user_id": 5, "amount": 499.0, "region": "华北"},
        {"order_id": "ORD_006", "user_id": 6, "amount": 599.0, "region": "华东"},
        {"order_id": "ORD_007", "user_id": 1001, "amount": 1599.0, "region": "华北"},
        {"order_id": "ORD_008", "user_id": 1002, "amount": 2699.0, "region": "华东"},
    ]

    for data in test_orders:
        order = service.create_order(data)
        print(
            f" {order.order_id} -> user_id={data['user_id']} -> "
            f"分片={manager.get_shard_for_user(data['user_id'])}"
        )

    # Query the orders of one specific user (single-shard lookup).
    print("\n=== 按用户查询 ===")
    orders = service.get_user_orders(1)
    for o in orders:
        print(f" 用户 1 的订单: {o.order_id}, 金额: {o.amount}")

    # Cross-shard scan: filtering by region has no shard key to route on.
    print("\n=== 全分片扫描 ===")
    all_orders = service.scan_all_shards({"region": "华北"})
    print(f"所有华北订单: {len(all_orders)} 条")

    # Inspect how the rows are distributed over the shards.
    print("\n=== 数据分布 ===")
    stats = service.get_shard_statistics()
    for shard, count in stats.items():
        print(f" {shard}: {count} 条记录")

    print("\n所有分片模式演示完成")


if __name__ == "__main__":
    demo_sharding()