本文详解如何在 asyncio 中确保子任务(如 `a_task`)在父任务(如 `fetch_values`)完成后立即并发启动,且后续任务(如 `other_task`)严格等待同一批数据的前置任务全部完成,避免因阻塞调用和错误 await 导致的串行化问题。
在 Python 的异步编程中,一个常见误区是混淆同步阻塞与异步挂起——例如使用 time.sleep() 会完全阻塞事件循环,导致所有协程“假并发”;而真正的并发必须依赖 await asyncio.sleep() 或其他 awaitable 对象(如 aiohttp.get()、asyncpg.fetch() 等)。此外,任务依赖关系必须通过 await 显式表达:只有当 fetch_values() 完成并返回结果后,才能基于该结果派生 a_task 和 other_task,且 other_task 必须在同 item 的全部 a_task 结束后才执行。
以下是修正后的完整可运行示例,已修复三大核心问题:

import asyncio
import pandas as pd
async def execute_check():
print("execute_check")
items = [1, 2, 3, 4]
# 并发启动 4 个独立 item 处理流程
tasks = [fetch_values_and_process(item) for item in items]
await asyncio.gather(*tasks)
async def fetch_values_and_process(item):
print(f"fetch_values_and_process for item {item}")
values_df = await fetch_values(item) # ✅ 关键:await 确保数据就绪
a_values = values_df[values_df["Label"] == "A"]
other_values = values_df[values_df["Label"] != "A"]
# 同一 item 的所有 a_task 并发执行
if not a_values.empty:
a_tasks = [a_task(row) for _, row in a_values.iterrows()]
await asyncio.gather(*a_tasks) # ✅ 所有 a_task 完成后才继续
# other_task 严格依赖本 item 的全部 a_task 完成
await other_task(other_values)
async def fetch_values(item):
print(f"fetch_values for item {item}")
await asyncio.sleep(5) # ✅ 非阻塞挂起,允许其他协程运行
# 为区分 item,使用 item 值填充 DataFrame(原代码逻辑有歧义)
return pd.DataFrame({"Item": [item] * 4, "Label": ["A", "B", "C", "D"]})
async def a_task(row):
print(f"a_task for item {row['Item']}, label {row['Label']}")
await asyncio.sleep(2)
async def other_task(other_values):
count = len(other_values)
print(f"other_task processing {count} non-A rows")
await asyncio.sleep(2)
if __name__ == "__main__":
asyncio.run(execute_check())预期输出特征(体现正确并发与依赖):
关键注意事项:
掌握“await 显式声明依赖”与“非阻塞 I/O 替代同步调用”这两条原则,即可构建真正高效、可预测的异步数据处理流水线。