Python项目实战用pg-mcp搞定PostgreSQL集群读写分离附完整配置代码PostgreSQL作为企业级开源数据库在高并发场景下如何保证稳定性和性能一直是开发者关注的焦点。pg-mcp这个Python库的出现让原本复杂的PostgreSQL集群管理变得像调用普通数据库连接一样简单。本文将带你从零开始在Flask和Django项目中实现专业的读写分离方案解决真实业务中的性能瓶颈问题。1. 环境准备与基础配置1.1 安装与最小化配置首先通过pip安装pg-mcp库pip install pg-mcp创建基础配置文件config/db_cluster.json这是整个系统的核心{ masters: [ { host: db-master-1.prod.internal, port: 5432, user: app_user, password: ${DB_PASSWORD}, // 建议使用环境变量 database: order_system } ], replicas: [ { host: db-replica-1.prod.internal, port: 5432, user: app_ro_user, // 只读账号 password: ${DB_RO_PASSWORD}, database: order_system } ], connection_pool: { min_connections: 3, max_connections: 15, idle_timeout: 180 } }提示生产环境建议将密码等敏感信息通过环境变量注入不要直接写在配置文件中1.2 连接池的健康检查初始化连接池时增加健康检查机制from pg_mcp import ConnectionPool from tenacity import retry, stop_after_attempt retry(stopstop_after_attempt(3)) def init_db_pool(): pool ConnectionPool.from_config( config/db_cluster.json, health_check_timeout5, # 秒 health_check_interval30 ) # 立即触发首次健康检查 pool.health_check() return pool db_pool init_db_pool()这种设计可以确保应用启动时数据库就绪避免冷启动问题。2. Web框架集成实战2.1 Flask中的优雅集成在Flask中创建数据库扩展模块extensions/db.pyfrom flask import _app_ctx_stack from pg_mcp import ConnectionPool class PGMCP: def __init__(self, appNone): self.app app if app is not None: self.init_app(app) def init_app(self, app): app.config.setdefault(PGMCP_CONFIG_PATH, config/db_cluster.json) app.teardown_appcontext(self.teardown) def connect(self): ctx _app_ctx_stack.top if not hasattr(ctx, pgmcp_pool): ctx.pgmcp_pool ConnectionPool.from_config( self.app.config[PGMCP_CONFIG_PATH] ) return ctx.pgmcp_pool.get_connection() def teardown(self, exception): ctx _app_ctx_stack.top if hasattr(ctx, pgmcp_pool): ctx.pgmcp_pool.close_all()使用时通过上下文管理器自动管理连接from extensions.db import PGMCP db PGMCP(app) app.route(/users/int:user_id) def get_user(user_id): with db.connect() as conn: with conn.cursor() as cur: cur.execute(SELECT * FROM users WHERE id %s, (user_id,)) return jsonify(cur.fetchone())2.2 Django自定义数据库后端创建自定义数据库引擎django_pgmcp/backend.pyfrom django.db.backends.postgresql.base import DatabaseWrapper as PGWrapper from pg_mcp import ConnectionPool class PGMCPWrapper(PGWrapper): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.pool None def get_new_connection(self, conn_params): if not self.pool: self.pool ConnectionPool.from_config( config/db_cluster.json ) return self.pool.get_connection() def _close(self): if self.connection is not None: self.connection.close() # 实际是归还到连接池在settings.py中配置DATABASES { default: { ENGINE: django_pgmcp.backend.PGMCPWrapper, OPTIONS: { config_path: config/db_cluster.json } } }3. 高级读写分离策略3.1 智能路由的装饰器模式创建路由装饰器decorators/db_router.pyfrom functools import wraps from enum import Enum class DBRoute(Enum): MASTER 1 REPLICA 2 def route_db(route_type): def decorator(f): wraps(f) def wrapper(pool, *args, **kwargs): if route_type DBRoute.MASTER: conn pool.get_master_connection() else: conn pool.get_replica_connection() try: return f(conn, *args, **kwargs) finally: conn.close() return wrapper return decorator业务层使用示例route_db(DBRoute.REPLICA) def get_product_list(conn, category_id): with conn.cursor() as cur: cur.execute( SELECT id, name, price FROM products WHERE category_id %s ORDER BY created_at DESC , (category_id,)) return cur.fetchall() route_db(DBRoute.MASTER) def create_product(conn, product_data): with conn.cursor() as cur: cur.execute( INSERT INTO products (name, price, category_id) VALUES (%s, %s, %s) RETURNING id , (product_data[name], product_data[price], product_data[category_id])) conn.commit() return cur.fetchone()[0]3.2 基于SQL解析的自动路由实现自动判断SQL类型的路由中间件import re from pg_mcp import ConnectionPool READ_ONLY_SQL_PATTERNS [ r^SELECT\s, r^WITH\s, r^EXPLAIN\s ] class AutoRouter: def __init__(self, pool): self.pool pool def execute(self, sql, paramsNone): if self._is_read_only(sql): conn self.pool.get_replica_connection() else: conn self.pool.get_master_connection() try: with conn.cursor() as cur: cur.execute(sql, params or ()) if cur.description: # 有返回结果 return cur.fetchall() return None finally: conn.close() def _is_read_only(self, sql): sql sql.strip().upper() return any( re.match(pattern, sql, re.IGNORECASE) for pattern in READ_ONLY_SQL_PATTERNS )4. 性能优化与监控4.1 连接池调优参数对比参数默认值推荐值说明min_connections1CPU核心数/2避免冷启动延迟max_connections10(max_worker_processes * 2) 1匹配工作进程数idle_timeout30060-180平衡资源与重建成本health_check_timeout03-5快速失败检测connection_timeout305避免长时间阻塞4.2 关键监控指标实现from prometheus_client import Gauge import threading # 定义指标 DB_POOL_SIZE Gauge(db_pool_size, Total connection pool size) DB_ACTIVE_CONNECTIONS Gauge(db_active_connections, Currently active connections) DB_REPLICA_LAG Gauge(db_replica_lag_bytes, Replication lag in bytes, [replica_host]) class DBMetrics: def __init__(self, pool): self.pool pool self._stop_event threading.Event() def start_metrics_loop(self, interval30): def collect(): while not self._stop_event.wait(interval): self._collect_metrics() thread threading.Thread(targetcollect, daemonTrue) thread.start() def _collect_metrics(self): status self.pool.get_status() DB_POOL_SIZE.set(status[max_connections]) DB_ACTIVE_CONNECTIONS.set(status[active_connections]) lag_info self._get_replication_lag() for host, lag in lag_info.items(): DB_REPLICA_LAG.labels(replica_hosthost).set(lag) def _get_replication_lag(self): conn self.pool.get_master_connection() try: with conn.cursor() as cur: cur.execute( SELECT client_addr, pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn) FROM pg_stat_replication ) return {str(row[0]): row[1] for row in cur.fetchall()} finally: conn.close()4.3 批量操作性能对比测试不同批量插入方法的性能对比10,000条记录import time from faker import Faker fake Faker() def generate_test_data(num): return [(fake.name(), fake.random_int(18, 80)) for _ in range(num)] def test_batch_insert(pool, data): start time.time() conn pool.get_master_connection() try: with conn.cursor() as cur: cur.executemany( INSERT INTO users (name, age) VALUES (%s, %s), data ) conn.commit() finally: conn.close() return time.time() - start def test_bulk_copy(pool, data): start time.time() conn pool.get_master_connection() try: with conn.cursor() as cur: f io.StringIO() for row in data: f.write(\t.join(map(str, row)) \n) f.seek(0) cur.copy_from(f, users, columns(name, age)) conn.commit() finally: conn.close() return time.time() - start # 测试结果示例 test_data generate_test_data(10000) print(fexecutemany耗时: {test_batch_insert(db_pool, test_data):.2f}s) print(fCOPY耗时: {test_bulk_copy(db_pool, test_data):.2f}s)典型测试结果executemany: 12.34秒COPY命令: 0.87秒5. 故障处理与最佳实践5.1 实现自动故障转移自定义故障转移策略from pg_mcp import FailoverStrategy from datetime import timedelta failover_strategy FailoverStrategy( check_interval10, # 秒 max_retry_attempts3, retry_delaytimedelta(seconds1), promote_replicaTrue, # 自动提升副本为主库 notification_callbacklambda event: print(f故障转移事件: {event}) ) pool ConnectionPool.from_config( config/db_cluster.json, failover_strategyfailover_strategy )5.2 连接泄露检测模式在开发环境启用严格泄露检测import atexit from weakref import WeakSet class LeakDetector: _connections WeakSet() classmethod def track(cls, conn): cls._connections.add(conn) return conn classmethod def check_leaks(cls): if cls._connections: print(f警告: 检测到{len(cls._connections)}个未关闭的连接!) for conn in cls._connections: print(f泄露连接创建于: {conn.create_stack}) # 修改获取连接方法 def get_connection(): conn pool.get_connection() return LeakDetector.track(conn) # 应用退出时检查 atexit.register(LeakDetector.check_leaks)5.3 生产环境检查清单[ ] 配置了适当的连接池大小避免超过PostgreSQL的max_connections[ ] 启用了SSL连接加密[ ] 为只读副本配置了专用的数据库用户只有SELECT权限[ ] 设置了合理的连接超时和健康检查间隔[ ] 实现了完善的监控指标连接数、查询延迟、复制延迟等[ ] 定期测试故障转移流程[ ] 对敏感配置进行了加密处理