17370845950

asyncio.timeout() 如何与 asyncio.wait() 组合使用不泄漏任务
asyncio.timeout() 不能直接包裹 asyncio.wait(),因其只限制 wait() 调用等待时间,不中断已提交的 pending task;正确做法是用 wait() 的 timeout 参数获取 done/pending,再显式 cancel 并 await gather(pending, return_exceptions=True) 清理。

asyncio.timeout() 不能直接包裹 asyncio.wait() 调用

因为 asyncio.wait() 本身返回的是 (done, pending) 集合,并不 await 任何协程,所以把 asyncio.timeout() 放在它外面(比如 async with asyncio.timeout(1): await asyncio.wait(...))根本不会中断内部仍在运行的 task —— 它只限制了 wait() 这个函数调用本身的等待时间,而那些已提交但未完成的 task 会继续后台执行,造成任务泄漏。

必须显式取消 pending 任务并 awai

t 其 cleanup

正确做法是:先用 asyncio.wait()timeout 参数获取结果,再检查哪些 task 还在 pending 中,手动调用 .cancel(),然后 await asyncio.gather(*pending, return_exceptions=True) 确保它们真正结束。

  • asyncio.wait()timeout 参数是“最多等多久”,不是“强制中断”,它只影响等待行为,不触达 task 内部
  • task.cancel() 只是设 flag,需后续 await taskgather 才真正让协程退出(比如响应 CancelledError
  • return_exceptions=True 是必须的,否则任意一个 task 因 cancel 抛出 CancelledError 会让整个 gather 中断

推荐封装成可复用的 timeout_wait 函数

避免每次重复写 cancel + gather 逻辑。例如:

async def timeout_wait(fs, *, timeout):
    done, pending = await asyncio.wait(fs, timeout=timeout)
    if pending:
        for t in pending:
            t.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
    return done, pending

注意:fs 应该是 asyncio.Task 实例列表(不是协程对象),否则 task.cancel() 无效;如果传入的是协程,先用 asyncio.create_task() 包一层。

asyncio.timeout() 更适合包裹单个协程,而非 wait 场景

如果你实际只需要等「任意一个完成」或「全部完成」且有超时控制,优先考虑 asyncio.wait_for()asyncio.shield() + 显式 cancel 模式。例如:

  • 等第一个完成:await asyncio.wait_for(asyncio.shield(your_coro), timeout=1)
  • 等全部完成但整体限时:await asyncio.wait_for(asyncio.gather(*coros), timeout=1)
  • asyncio.timeout() 最自然的用法是 async with asyncio.timeout(1): await some_coro()

混合使用时最容易忽略的是:cancel 后没 await cleanup,导致 task 状态停留在 CANCELLED 但协程栈未退出,event loop 里残留引用,下次 run_until_complete 可能报 Task was destroyed but it is pending!