应使用 asyncio.wait + done/pending 分离处理:启动 task 后调用 asyncio.wait(tasks, timeout=5.0),从 done 提取结果,对 pending 显式 cancel;封装为 timeout_gather 函数可复用,注意协程内捕获 CancelledError 做清理。
直接用 asyncio.wait_for 包裹整个 asyncio.gather,会导致超时后所有子任务一并被 cancel——这通常不是你想要的。真正需要的是:只中断超时的那几个,其余已完成或正在运行的任务结果照常返回。
asyncio.wait 不会主动 cancel 未完成任务,它只帮你区分哪些已结束、哪些还在跑。结合 return_when=asyncio.FIRST_COMPLETED 或手动控制超时,就能实现“只放弃超时项,保留已有结果”。
tasks = [asyncio.create_task(coro) for coro in coros]
asyncio.wait(tasks, timeout=5.0) 等待指定时间done 集合中用 task.result() 提取成功结果;对 pending 中的 task 显式调用 task.cancel()(仅清理,不影响已 done 的)pending 里的 task 若后续自行完成,其结果不会自动加入返回值——你得决定是否 await 它们(通常不等,否则就失去超时意义)下面这个轻量封装能替代原生 gather,行为更可控:
async def timeout_gather(*coros, timeout=None):
if timeout is None:
return await asyncio.gather(*coros)
tasks = [asyncio.create_task(c) for c in coros]
done, pending = await asyncio.wait(tasks, timeout=timeout)
results = []
for t in done:
try:
results.append(t.re
sult())
except Exception as e:
results.append(e) # 保留异常,而非抛出
for t in pending:
t.cancel()
return results
调用方式和 gather 一致:results = await timeout_gather(coro_a(), coro_b(), timeout=3.0)。每个位置的结果对应原始顺序,失败/超时项以异常对象或 None 占位(可根据需要调整)。
被 cancel 的 task 如果在 await 某个 IO 操作(如 asyncio.sleep、HTTP 请求),会抛出 CancelledError。若没在协程内捕获,可能污染日志或导致未关闭连接。
try/except CancelledError 块,做必要清理(如关闭 client session)asyncio.shield() 不能用于防 cancel——它只防外层取消,对 wait 中的 pending 无作用超时逻辑本质是“放弃等待”,不是“强制终止执行”。真要中止不可中断的计算,得靠协程自己轮询 asyncio.current_task().cancelled() 并主动退出。