1. 项目概述与核心价值最近在搞一个需要快速迭代的后台服务数据模型复杂并发请求还不少。选型的时候我又一次把目光投向了 FastAPI SQLAlchemy 这套黄金组合。不过这次我想玩点不一样的——把同步的数据库驱动彻底换掉拥抱全异步。在网上搜了一圈发现了一个挺有意思的仓库grillazz/fastapi-sqlalchemy-asyncpg。这名字一看就直白FastAPI 做 Web 框架SQLAlchemy 做 ORMAsyncpg 做异步 PostgreSQL 驱动目标就是构建一个高性能、全异步的 Python 后端服务模板。这个项目本质上不是一个库而是一个精心设计的项目脚手架或者说最佳实践示例。它解决了一个很实际的问题当我们想用 FastAPI 和 SQLAlchemy 2.0 的异步特性时如何正确地组织代码结构处理好数据库会话Session的生命周期、依赖注入以及如何与像 Alembic 这样的迁移工具无缝协作。很多新手甚至是有经验的开发者在初次尝试将 SQLAlchemy 异步模式集成到 FastAPI 中时都会在async_sessionmaker的创建、scoped_session的替代方案、以及如何避免在异步上下文中错误地使用同步连接这些问题上踩坑。这个项目模板把这些“坑”都提前填平了给出了一套经过验证的、生产可用的代码组织方案。它适合正在从同步 Flask/SQLAlchemy 架构向异步 FastAPI 迁移的团队也适合那些希望从一开始就采用清晰、健壮架构来启动新项目的开发者。通过研究这个模板你不仅能快速搭建起服务更能深入理解 FastAPI 的依赖注入系统与 SQLAlchemy 异步会话管理是如何优雅结合的。2. 技术栈深度解析与选型理由2.1 为什么是 FastAPI SQLAlchemy 2.0 Asyncpg这个组合的每一个选择都经过了深思熟虑目标直指现代 Python 后端的高并发与开发效率。FastAPI的地位已经无需多言。基于 Pydantic 的自动请求/响应验证、基于 Python 类型提示的极致开发体验、自动生成的交互式 API 文档Swagger UI 和 ReDoc以及底层基于 Starlette 带来的高性能异步支持让它成为了构建 API 的首选。对于需要频繁与前端或其他服务交互的应用FastAPI 能极大减少样板代码和潜在的类型错误。SQLAlchemy 2.0是一个里程碑式的版本其核心变革就是原生、一流的异步支持asyncio。在 1.x 时代使用异步需要借助greenlet等第三方库存在一定的复杂性和性能损耗。2.0 版本通过asyncpg、aiomysql等方言dialect直接提供了异步核心使得异步查询能真正释放出事件循环的威力。它的 ORM 层依然强大声明式模型、关系管理、查询构建等功能完整迁移到了异步世界保证了开发体验的连续性。Asyncpg是针对 PostgreSQL 的高性能异步驱动。与传统的psycopg2通过asyncpg或psycopg2-binary配合sqlalchemy.ext.asyncio的适配层使用相比asyncpg是直接用 Python 的asyncio协议实现的没有中间层转换因此性能极高特别是在处理大量小查询或复杂事务时。它直接理解 PostgreSQL 的二进制协议能高效地序列化/反序列化数据。对于确定使用 PostgreSQL 且追求极致性能的项目asyncpg是不二之选。这个技术栈的协同效应非常明显FastAPI 处理高并发 HTTP 请求每个请求在独立的asyncio.Task中运行当需要访问数据库时通过 SQLAlchemy 2.0 的异步引擎将 IO 操作委托给asyncpg此时事件循环可以腾出手来处理其他请求的运算或新的 IO 事件从而实现真正的并发有效提升系统吞吐量尤其是在 IO 密集型的场景下。2.2 项目模板解决了哪些架构痛点直接裸用 FastAPI 和 SQLAlchemy 异步你很快会遇到几个棘手的问题会话管理混乱在哪个地方创建AsyncSession如何在每个请求开始时获取会话在请求结束时确保它被正确关闭并归还到连接池手动管理极易导致会话泄露或过早关闭。依赖注入集成如何将数据库会话优雅地注入到各个路径操作函数中如何保证在同一个请求的多个依赖项中使用的是同一个会话实例这对于事务一致性很重要事务边界界定业务逻辑应该在哪里开启和提交事务是在路由层面还是在服务层错误时如何回滚与 Alembic 协作Alembic 是 SQLAlchemy 的黄金搭档但它的命令行工具和迁移脚本默认是同步的。如何配置 Alembic 使其能运行在异步环境中执行我们的异步模型定义配置管理数据库连接字符串、连接池大小等配置如何从环境变量或配置文件中安全、灵活地加载代码组织模型Models、数据库操作CRUD、路由Routers、服务Services应该如何分层如何避免循环导入grillazz/fastapi-sqlalchemy-asyncpg这个模板项目正是通过一套清晰的目录结构和代码模式系统地回答了以上所有问题。它不是一个封装好的框架而是一个“最佳实践”的展示你可以完全理解其每一行代码的意图并根据自己项目的需要进行调整。3. 项目结构拆解与核心模块实现让我们深入项目内部看看它是如何组织的。一个典型的结构可能如下根据具体 commit 略有不同但核心思想一致fastapi-sqlalchemy-asyncpg/ ├── app/ │ ├── __init__.py │ ├── main.py # FastAPI 应用创建和生命周期事件处理 │ ├── core/ │ │ ├── __init__.py │ │ ├── config.py # 配置管理Pydantic Settings │ │ └── database.py # 异步引擎、会话工厂、依赖注入定义 │ ├── models/ │ │ ├── __init__.py # 导入所有模型方便 Alembic 发现 │ │ └── user.py # 示例用户模型 │ ├── schemas/ │ │ └── user.py # Pydantic 模型用于请求/响应验证 │ ├── crud/ │ │ └── user.py # 数据库原子操作层CRUD │ ├── api/ │ │ ├── __init__.py │ │ ├── deps.py # 可能存放其他依赖项如获取当前用户 │ │ └── v1/ │ │ ├── __init__.py │ │ ├── endpoints/ │ │ │ └── users.py # 具体的路由处理函数 │ │ └── router.py # API 路由注册 │ └── services/ # 可选业务逻辑层 │ └── user_service.py ├── alembic/ # Alembic 迁移目录 │ ├── versions/ │ ├── env.py # 修改为支持异步 │ └── alembic.ini ├── tests/ # 测试目录 ├── requirements.txt ├── .env.example └── README.md3.1 配置管理 (core/config.py)使用 Pydantic 的BaseSettings来管理配置是 FastAPI 社区的标配。它支持从环境变量、.env文件自动加载并做类型验证。from pydantic_settings import BaseSettings from pydantic import PostgresDsn class Settings(BaseSettings): PROJECT_NAME: str My FastAPI Async Project API_V1_STR: str /api/v1 # 数据库配置 POSTGRES_SERVER: str POSTGRES_USER: str POSTGRES_PASSWORD: str POSTGRES_DB: str POSTGRES_PORT: str 5432 # 构造数据库 URL使用 asyncpg 驱动 property def SQLALCHEMY_DATABASE_URI(self) - PostgresDsn: return PostgresDsn.build( schemepostgresqlasyncpg, usernameself.POSTGRES_USER, passwordself.POSTGRES_PASSWORD, hostself.POSTGRES_SERVER, portself.POSTGRES_PORT, pathself.POSTGRES_DB, ) # 连接池配置 POOL_SIZE: int 20 MAX_OVERFLOW: int 10 POOL_PRE_PING: bool True # 推荐开启检查连接是否存活 POOL_RECYCLE: int 3600 # 连接回收时间秒 class Config: env_file .env case_sensitive True settings Settings()注意这里使用了postgresqlasyncpg作为 scheme。POOL_PRE_PING在异步环境中尤为重要因为网络或数据库的临时中断可能导致连接池中的连接失效这个选项会在每次从池中取连接时执行一个轻量级查询如SELECT 1来验证连接有效性避免在业务逻辑中抛出连接错误。3.2 数据库核心 (core/database.py)这是整个模板的灵魂它创建了 SQLAlchemy 的异步引擎和会话工厂并定义了 FastAPI 的依赖项。from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker from sqlalchemy.orm import declarative_base from app.core.config import settings # 1. 创建异步引擎 # echoTrue 在开发时很有用可以打印所有SQL语句 engine create_async_engine( settings.SQLALCHEMY_DATABASE_URI, echoTrue, pool_sizesettings.POOL_SIZE, max_overflowsettings.MAX_OVERFLOW, pool_pre_pingsettings.POOL_PRE_PING, pool_recyclesettings.POOL_RECYCLE, ) # 2. 创建异步会话工厂 # async_sessionmaker 替代了同步世界的 sessionmaker # class_AsyncSession 指定生成的会话类 # expire_on_commitFalse 是异步会话的推荐设置避免在commit后访问延迟加载的属性时出现问题。 AsyncSessionLocal async_sessionmaker( bindengine, class_AsyncSession, expire_on_commitFalse, autocommitFalse, autoflushFalse, ) # 3. 声明基类所有模型都继承自它 Base declarative_base() # 4. 定义FastAPI依赖项 async def get_db() - AsyncSession: 依赖项函数用于在路径操作函数中注入数据库会话。 它为每个请求创建一个新的 AsyncSession并在请求结束时确保关闭。 async with AsyncSessionLocal() as session: try: yield session # 请求正常处理完毕在这里可以执行session.commit()但更推荐在业务层控制事务。 # await session.commit() except Exception as e: # 如果发生异常回滚事务 await session.rollback() raise finally: # 确保会话被关闭归还连接到池中 await session.close()关键点解析async_sessionmaker这是一个工厂每次调用AsyncSessionLocal()都会产生一个独立的AsyncSession对象。它本身不是会话而是创建会话的工具。expire_on_commitFalse在同步 SQLAlchemy 中默认行为是提交后会话中所有实例过期再次访问属性会触发延迟加载lazy load。在异步环境中延迟加载可能会引发greenlet错误或需要在await上下文中处理变得复杂。设置为False后提交后实例状态保持不变行为更可预测。但开发者需要更主动地管理对象状态比如在更新后重新从数据库加载。get_db依赖项这是连接 FastAPI 和 SQLAlchemy 的桥梁。FastAPI 会为每个请求运行这个异步生成器 (async generator)。yield session将会话对象提供给路径操作函数使用。finally块确保无论请求处理成功还是异常会话都会被关闭。事务的提交和回滚通常不在这个通用依赖项中完成而是交给具体的业务逻辑CRUD层或服务层来控制这样更灵活。3.3 模型定义 (models/user.py)使用 SQLAlchemy 2.0 的声明式映射。from sqlalchemy import Column, Integer, String, DateTime from sqlalchemy.sql import func from app.core.database import Base class User(Base): __tablename__ users id Column(Integer, primary_keyTrue, indexTrue) email Column(String, uniqueTrue, indexTrue, nullableFalse) hashed_password Column(String, nullableFalse) full_name Column(String) created_at Column(DateTime(timezoneTrue), server_defaultfunc.now()) updated_at Column(DateTime(timezoneTrue), onupdatefunc.now()) # 注意在异步ORM中定义关系relationship需要格外小心。 # 必须使用 lazyraise 或 lazyselectin 等异步安全的加载策略避免默认的 lazyselect。 # from sqlalchemy.orm import relationship # items relationship(Item, back_populatesowner, lazyselectin)3.4 CRUD 层 (crud/user.py)这是与数据库直接交互的一层封装原子操作。所有数据库操作都必须使用await。from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.models.user import User from app.schemas.user import UserCreate class CRUDUser: async def get(self, db: AsyncSession, user_id: int) - User | None: 根据ID获取用户 result await db.execute(select(User).where(User.id user_id)) return result.scalar_one_or_none() async def get_by_email(self, db: AsyncSession, email: str) - User | None: 根据邮箱获取用户 result await db.execute(select(User).where(User.email email)) return result.scalar_one_or_none() async def create(self, db: AsyncSession, *, obj_in: UserCreate) - User: 创建用户 # 这里应该包含密码哈希的逻辑示例中省略 db_user User( emailobj_in.email, hashed_passwordfake_hashed_ obj_in.password, # 请使用如passlib库进行哈希 full_nameobj_in.full_name, ) db.add(db_user) await db.commit() # 注意这里提交了事务 await db.refresh(db_user) # 刷新以获取数据库生成的默认值如id, created_at return db_user async def update(self, db: AsyncSession, *, db_obj: User, obj_in: dict) - User: 更新用户信息 for field, value in obj_in.items(): if value is not None: setattr(db_obj, field, value) db.add(db_obj) await db.commit() await db.refresh(db_obj) return db_obj user CRUDUser()实操心得await db.execute(select(...))是执行查询的标准方式。result.scalar_one_or_none()用于获取单个结果或 None。事务控制在create和update方法中我们直接进行了await db.commit()。这意味着每个create或update调用都是一个独立的事务。对于更复杂的业务逻辑比如创建用户的同时初始化其配置你可能需要在服务层管理一个更大的事务范围这时 CRUD 方法就不应提交而是由上层调用commit。await db.refresh()在插入或更新后立即从数据库重新加载该对象确保你拿到的是最新的数据包括任何由数据库生成的默认值如自增ID、server_default的时间戳。3.5 路由与端点 (api/v1/endpoints/users.py)在这里FastAPI 的依赖注入系统大显身手。from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.ext.asyncio import AsyncSession from app import crud, schemas from app.core.database import get_db router APIRouter() router.post(/, response_modelschemas.User) async def create_user( *, db: AsyncSession Depends(get_db), # 注入数据库会话 user_in: schemas.UserCreate, ): 创建新用户。 # 检查邮箱是否已存在 user await crud.user.get_by_email(db, emailuser_in.email) if user: raise HTTPException( status_codestatus.HTTP_400_BAD_REQUEST, detailA user with this email already exists., ) # 调用CRUD层创建用户 user await crud.user.create(dbdb, obj_inuser_in) return user router.get(/{user_id}, response_modelschemas.User) async def read_user( user_id: int, db: AsyncSession Depends(get_db), ): 根据ID获取用户信息。 user await crud.user.get(db, user_iduser_id) if not user: raise HTTPException( status_codestatus.HTTP_404_NOT_FOUND, detailUser not found, ) return user可以看到路径操作函数非常干净。它只负责 HTTP 层面的逻辑参数验证、简单的业务规则判断如重复检查、调用下层服务、处理 HTTP 异常和返回响应。数据库会话通过Depends(get_db)自动注入你无需关心它的创建和关闭。3.6 应用主入口与生命周期 (app/main.py)这里创建 FastAPI 应用实例并设置启动和关闭事件。from fastapi import FastAPI from app.core.config import settings from app.api.v1.router import api_router from app.core.database import engine, Base app FastAPI(titlesettings.PROJECT_NAME) # 包含API路由 app.include_router(api_router, prefixsettings.API_V1_STR) app.on_event(startup) async def startup_event(): 应用启动时执行。 注意通常不建议在这里用 Base.metadata.create_all 创建表。 表结构管理应该交给 Alembic 迁移工具。 这里可以连接外部服务、初始化缓存等。 # 示例可以在这里验证数据库连接 # async with engine.connect() as conn: # await conn.execute(text(SELECT 1)) print(Application startup...) app.on_event(shutdown) async def shutdown_event(): 应用关闭时执行。清理资源如关闭数据库连接池。 await engine.dispose() print(Application shutdown...)重要提示在生产环境中应避免使用Base.metadata.create_all()。Alembic 提供了版本控制的、可重复的迁移能力是管理数据库模式变更的正确工具。启动事件更适合做轻量级的健康检查或初始化一些与数据库无关的组件。4. 异步 Alembic 迁移配置详解这是让很多开发者头疼的一步。Alembic 默认是同步的但我们的模型和引擎是异步的。我们需要修改alembic/env.py来适配。# alembic/env.py import asyncio from logging.config import fileConfig from sqlalchemy import pool from sqlalchemy.engine import Connection from sqlalchemy.ext.asyncio import async_engine_from_config from alembic import context from app.core.config import settings from app.models import * # 导入所有模型以便 Alembic 能发现它们 from app.core.database import Base # ... 其他标准配置 ... def run_migrations_offline() - None: 在离线模式下运行迁移。 这里我们仍然使用同步引擎因为离线模式通常只生成SQL文件。 # ... 离线模式配置通常不需要大改... def do_run_migrations(connection: Connection) - None: 实际运行迁移的核心函数。 context.configure( connectionconnection, target_metadataBase.metadata, compare_typeTrue, # 推荐开启能检测字段类型变化 compare_server_defaultTrue, # 推荐开启检测默认值变化 ) with context.begin_transaction(): context.run_migrations() async def run_async_migrations() - None: 在此函数中我们创建异步引擎并运行迁移。 # 从 alembic.ini 读取配置但用我们应用中的异步数据库URL覆盖 configuration config.get_section(config.config_ini_section) configuration[sqlalchemy.url] settings.SQLALCHEMY_DATABASE_URI # 创建异步引擎 connectable async_engine_from_config( configuration, prefixsqlalchemy., poolclasspool.NullPool, # 迁移期间不需要连接池 ) async with connectable.connect() as connection: await connection.run_sync(do_run_migrations) await connectable.dispose() def run_migrations_online() - None: 在在线模式下运行迁移。 asyncio.run(run_async_migrations()) # 关键使用 asyncio.run 启动异步函数 # 根据 context.is_offline_mode 选择执行哪个函数 if context.is_offline_mode(): run_migrations_offline() else: run_migrations_online()关键修改点导入模型from app.models import *确保 Alembic 能访问到Base.metadata从而知道要迁移哪些表。异步引擎使用async_engine_from_config创建异步引擎并传入我们的异步数据库 URL。连接适配await connection.run_sync(do_run_migrations)是精髓。do_run_migrations是一个同步函数它需要接收一个同步的Connection对象。run_sync方法将异步连接“适配”成一个同步连接供 Alembic 内部使用。事件循环asyncio.run(run_async_migrations())在在线模式下启动异步迁移流程。配置好后Alembic 命令的使用方式就和同步环境下完全一样了# 生成初始迁移如果是从零开始 alembic revision --autogenerate -m Initial migration # 升级到最新版本 alembic upgrade head # 降级一个版本 alembic downgrade -15. 高级话题与生产环境考量5.1 会话生命周期与事务管理策略在get_db依赖项中我们使用了yield将会话提供给请求。但事务的提交/回滚并没有放在这里这是有意为之的设计。这提供了两种事务管理模式模式A自动提交/回滚在依赖项中控制你可以修改get_db在yield之后没有异常时commit有异常时rollback。这种方式简单每个请求一个事务。但对于需要跨多个 CRUD 操作或调用外部服务的事务它不够灵活且一旦提交后续代码无法回滚。模式B手动控制在业务层控制—— 推荐这也是模板默认隐含的方式。在 CRUD 或 Service 层的方法内部进行commit和rollback。这要求开发者对事务边界有清晰的认识。对于复杂的业务逻辑可以创建一个更高层的“单元 of work”模式async def get_db_transactional() - AsyncSession: 一个用于需要显式事务控制的依赖项 async with AsyncSessionLocal() as session: try: yield session # 不在依赖项中提交由调用方控制 finally: await session.close() router.post(/complex-operation) async def complex_operation(db: AsyncSession Depends(get_db_transactional)): async with db.begin(): # 开启一个事务块 # 执行多个数据库操作 await crud.user.update(...) await crud.item.create(...) # 如果所有操作成功事务块退出时会自动提交 # 如果发生异常会自动回滚 return {msg: success}使用async with db.begin():可以创建一个事务上下文管理器是更清晰的手动控制方式。5.2 性能优化与连接池调优连接池配置 (POOL_SIZE,MAX_OVERFLOW,POOL_RECYCLE) 对性能至关重要。POOL_SIZE连接池中保持的常驻连接数。设置太小高并发时需要频繁创建新连接设置太大浪费数据库资源。根据应用的实际并发度和数据库承受能力调整通常建议在 10-30 之间。MAX_OVERFLOW当池中连接用完时最多可以额外创建的连接数。超过POOL_SIZE MAX_OVERFLOW的请求将会等待或报错。POOL_RECYCLE连接在池中存放的最大时间秒。超过这个时间连接在下一次被检出时会被回收并新建。这可以防止数据库端因长时间空闲而关闭连接导致的“连接失效”问题。通常设置为小于数据库的wait_timeout配置。监控与调试开启echoTrue在开发时很有用但在生产环境一定要关闭。可以考虑使用像sqlalchemy-stubs或数据库自身的监控工具来观察连接池状态和查询性能。5.3 测试策略测试异步应用需要支持asyncio的测试框架如pytest-asyncio。# tests/conftest.py import pytest import asyncio from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from app.core.database import Base, get_db from app.main import app from httpx import AsyncClient # 使用测试数据库URL TEST_DATABASE_URL postgresqlasyncpg://user:passlocalhost/test_db pytest.fixture(scopesession) def event_loop(): 为测试会话创建一个事件循环 loop asyncio.get_event_loop_policy().new_event_loop() yield loop loop.close() pytest.fixture(scopesession) async def test_engine(): 创建测试引擎并创建所有表 engine create_async_engine(TEST_DATABASE_URL, echoFalse) async with engine.begin() as conn: await conn.run_sync(Base.metadata.drop_all) # 清空旧表 await conn.run_sync(Base.metadata.create_all) # 创建新表 yield engine await engine.dispose() pytest.fixture async def db_session(test_engine): 为每个测试函数提供一个干净的会话 async_session async_sessionmaker(test_engine, expire_on_commitFalse) async with async_session() as session: yield session # 每个测试后回滚保证测试隔离 await session.rollback() pytest.fixture def override_get_db(db_session: AsyncSession): 用测试会话覆盖应用的 get_db 依赖 async def _override_get_db(): yield db_session return _override_get_db pytest.fixture async def async_client(override_get_db): 创建测试用的异步HTTP客户端 app.dependency_overrides[get_db] override_get_db async with AsyncClient(appapp, base_urlhttp://test) as ac: yield ac app.dependency_overrides.clear() # tests/test_users.py pytest.mark.asyncio async def test_create_user(async_client: AsyncClient): 测试创建用户接口 response await async_client.post( /api/v1/users/, json{email: testexample.com, password: secret, full_name: Test User} ) assert response.status_code 200 data response.json() assert data[email] testexample.com assert hashed_password not in data # 确保密码哈希值没有泄露测试的关键是隔离每个测试使用独立的事务通过yield session和await session.rollback()实现确保测试之间不互相影响。使用dependency_overrides可以方便地替换掉真实的数据库依赖。6. 常见陷阱、问题排查与经验总结6.1 “这个连接正在被使用”错误问题在日志中看到This connection is still associated with an open transaction; you must close that transaction before closing the connection或类似的错误。原因这通常是因为会话Session没有在请求结束时被正确关闭。可能你在某个地方手动创建了AsyncSession但没有使用async with上下文管理器或者在异常路径下没有执行到finally块中的session.close()。排查确保所有获取会话的地方都使用了async with AsyncSessionLocal() as session:模式。检查自定义的依赖项或中间件确保它们正确地处理了会话生命周期。如果使用了yield模式的依赖项如get_db确保finally块中的await session.close()一定会被执行。6.2 异步上下文中的延迟加载Lazy Load问题问题在视图函数返回一个包含关系属性的 ORM 对象后序列化如通过jsonable_encoder时触发延迟加载导致RuntimeError: greenlet_spawn has not been called或类似的异步错误。原因SQLAlchemy 的关系默认使用lazyselect当访问未加载的关系属性时它会尝试执行一个新的查询。但在异步环境中这个查询需要在await上下文中执行而序列化过程通常不在await上下文中。解决方案急切加载Eager Loading在查询时使用selectinload或joinedload一次性加载所需关系。from sqlalchemy.orm import selectinload stmt select(User).options(selectinload(User.items)).where(User.id user_id) result await db.execute(stmt) user result.scalar_one() # 现在访问 user.items 不会触发额外的查询修改关系定义在模型关系上设置lazyraise或lazyselectin避免意外的延迟加载。class User(Base): # ... items relationship(Item, back_populatesowner, lazyselectin)使用 Pydantic 响应模型在 FastAPI 的response_model中使用 Pydantic 模型来定义返回的数据结构而不是直接返回 ORM 对象。Pydantic 模型会从 ORM 对象中提取指定的字段不会触发未加载的关系。6.3 连接池耗尽问题应用运行一段时间后出现大量超时或TimeoutError: QueuePool limit of size X overflow Y reached错误。原因数据库连接没有及时归还到池中。最常见的原因是会话没有关闭“连接泄露”。也可能是POOL_SIZE设置过小无法应对实际并发量。排查与解决检查会话关闭这是首要怀疑点。确保所有代码路径都会关闭会话。调整连接池参数适当增加POOL_SIZE和MAX_OVERFLOW。但不要盲目调大要结合数据库服务器的max_connections参数。启用POOL_PRE_PING这可以自动处理数据库端因超时关闭的连接。监控在数据库端监控活跃连接数看是否与应用配置的池大小匹配。6.4 与同步代码或第三方库的兼容性问题项目中有些遗留的同步代码或第三方库如某些同步的 Redis 客户端、机器学习库需要调用如果在异步视图函数中直接调用会阻塞事件循环。解决方案使用asyncio.to_thread对于 CPU 密集型或会阻塞的 IO 操作可以将其放到一个单独的线程中运行避免阻塞主事件循环。import asyncio import some_sync_library router.get(/slow-sync-operation) async def slow_operation(): # 将同步函数放到线程池中执行 result await asyncio.to_thread(some_sync_library.heavy_computation, arg1, arg2) return {result: result}寻找异步替代品优先寻找支持asyncio的库例如用aioredis替代redis用aiohttp替代requests。重构隔离如果同步代码块很大考虑将其重构为一个独立的微服务或进程通过消息队列如 RabbitMQ或 RPC 与主异步服务通信。6.5 部署注意事项ASGI 服务器不要使用像uvicorn main:app --reload这样的开发服务器直接生产。使用uvicorn配合多进程--workers或gunicorn配合uvicorn worker类。# 使用 gunicorn 管理多个 uvicorn worker gunicorn -w 4 -k uvicorn.workers.UvicornWorker app.main:app注意gunicorn的-k uvicorn.workers.UvicornWorker是关键它告诉gunicorn使用uvicorn的 ASGI worker。数据库连接池与 Worker 数总连接数 ≈POOL_SIZE * worker 数量。确保数据库的max_connections大于这个值。健康检查在 Kubernetes 或 Docker Swarm 中为你的服务配置livenessProbe和readinessProbe指向一个简单的健康检查端点如/health该端点可以快速检查数据库连接。这个grillazz/fastapi-sqlalchemy-asyncpg模板项目为我们展示了一条构建现代、高性能、可维护的 Python 异步后端服务的清晰路径。它不仅仅是代码的堆砌更体现了对框架特性、异步编程模型和软件设计原则的深刻理解。在实际使用中你可以以此为基础根据自己项目的复杂程度引入更高级的特性如基于 Redis 的缓存、Celery 或 ARQ 处理后台任务、更复杂的认证授权机制OAuth2, JWT、以及完善的日志和监控体系。记住好的架构是演进而来的理解其背后的原理比复制粘贴代码更重要。