异步 Session 必须用 AsyncSession 和 async_sessionmaker,不能复用同步 sessionmaker;事务需显式 await session.begin() 或用 async with session.begin(),expire_on_commit=False 防止提交后字段为 None。
AsyncSession,不能复用同步 sessionmaker
SQLAlchemy 2.x 的异步支持是彻底分离的:同步用 sessionmaker,异步必须用 async_sessionmaker。直接把同步工厂传给 async 会报 
RuntimeError: This event loop is already running 或静默失败。
正确写法是:
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker from sqlalchemy import textengine = create_async_engine("sqlite+aiosqlite:///db.sqlite", echo=True) AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
✅ 正确:await 获取 session 实例
async def get_user(user_id: int): async with AsyncSessionLocal() as session: result = await session.execute(text("SELECT * FROM users WHERE id = :id"), {"id": user_id}) return result.fetchone()
expire_on_commit=False 很关键——否则 commit() 后对象字段变 None,尤其在返回 Pydantic 模型时直接崩create_async_engine 必须用带 aiosqlite、asyncpg 或 aiomysql 的 URL,sqlite:/// 这种同步驱动不支持异步async with 外保存 session 引用,它不是线程/协程安全的await session.begin() 或用 async with session.begin()
异步 Session 不会自动开启事务。执行 session.execute() 或 session.add() 时,若没处于事务中,SQLAlchemy 会临时开一个只读事务(对写操作无效),导致 commit 报错 InvalidRequestError: No transaction is in progress。
两种可靠方式:
# 方式一:显式 begin + commit/rollback(适合复杂控制流)
async def transfer_money(from_id: int, to_id: int, amount: float):
async with AsyncSessionLocal() as session:
await session.begin() # ⚠️ 必须 await
try:
await session.execute(
text("UPDATE accounts SET balance = balance - :amt WHERE id = :id"),
{"amt": amount, "id": from_id}
)
await session.execute(
text("UPDATE accounts SET balance = balance + :amt WHERE id = :id"),
{"amt": amount, "id": to_id}
)
await session.commit()
except Exception:
await session.rollback()
raise
方式二:async with session.begin()(更简洁,自动 rollback on exception)
async def create_post(title: str, content: str):
async with AsyncSessionLocal() as session:
async with session.begin(): # ✅ 自动 commit,异常时自动 rollback
session.add(Post(title=title, content=content))
session.begin() 是协程函数,必须 await;漏掉 await 会导致后续操作在无事务上下文中执行async with session.begin() 内部已包含 await session.begin(),无需再手动调session.rollback() 和 session.commit() 也必须 await,否则事务挂起不提交async with 事务会报 InvalidRequestError: Transaction is already begun
SQLAlchemy 2.x 异步不支持真正的嵌套事务(savepoint 是另一回事)。如果外层用了 async with session.begin(),内层再 async with session.begin() 就会触发该错误。
需要“子事务”语义时,改用 session.begin_nested():
async def outer_logic():
async with AsyncSessionLocal() as session:
async with session.begin():
await session.execute(text("INSERT INTO logs (msg) VALUES ('start')"))
# 子事务:失败不影响外层
nested = await session.begin_nested() # ✅ 返回 SavepointTransaction
try:
await session.execute(text("INSERT INTO users (name) VALUES ('test')"))
await nested.commit() # 提交 savepoint
except Exception:
await nested.rollback() # 回滚到 savepoint,外层仍可 commit
begin_nested() 返回的是 SavepointTransaction,不是新 AsyncSession,所有操作仍在同一 session 中savepoint 在 PostgreSQL / MySQL 上有效,SQLite 的 aiosqlite 不支持(会静默退化为普通事务)session.begin_nested() 和 async_sessionmaker(begin_nested=True) —— 后者无效,参数只存在于同步版AsyncSession 要注意生命周期和 scopeFastAPI 的 Depends 默认每次请求新建依赖,但如果你手动 yield 一个未关闭的 AsyncSession,或在中间件里缓存 session 实例,容易引发 ResourceClosedError 或连接泄漏。
安全做法是让依赖函数本身负责 async with 并 yield:
from fastapi import Dependsasync def get_db(): async with AsyncSessionLocal() as session: yield session # ✅ yield session,而非 yield AsyncSessionLocal()
@app.get("/users/{id}") async def read_user(id: int, session: AsyncSession = Depends(get_db)): result = await session.execute(select(User).where(User.id == id)) return result.scalar_one_or_none()
async with 确保 session 在请求结束时自动 closereturn session —— 那样 session 会在依赖返回后立即被 GC 关闭,后续使用报 ResourceClosedError
Depends 共享 session(比如 repo 层 + service 层),必须用同一个 get_db 实例,不能定义两个独立依赖实际用的时候,最常踩的坑是忘了 await 所有 session 方法,以及误以为 async_sessionmaker 创建的是“自动事务 session”。事务边界永远要自己画清楚,async 版本不会替你猜。