本文详解如何在 asyncio 中确保子任务(如 `a_task`)在父任务(如 `fetch_values`)完成后立即并发执行,且后续任务(如 `other_task`)严格等待同一批数据的前置任务全部完成——关键在于避免阻塞调用、合理组织 await 链与并发粒度。
在你的原始代码中,看似使用了 async/await,但核心问题在于混用了同步阻塞调用与异步编程模型,导致整个协程被“冻结”,丧失并发能力。具体有两大陷阱:
✅ 正确做法是:
以下是修复后的完整可运行示例:
import asyncio
import pandas as pd
async def execute_check():
print("execute_check")
items = [1, 2, 3, 4]
# 并发启动 4 个独立 item 处理流程
tasks = [fetch_values_and_process(item) for item in items]
await asyncio.gather(*tasks)
async def fetch_values_and_process(
item):
print(f"fetch_values_and_process for item {item}")
values_df = await fetch_values(item) # ✅ 等待本 item 数据就绪
a_values = values_df[values_df["Label"] == "A"]
other_values = values_df[values_df["Label"] != "A"]
# ✅ 对当前 item 的所有 'A' 行,并发执行 a_task
if not a_values.empty:
a_tasks = [a_task(row) for _, row in a_values.iterrows()]
await asyncio.gather(*a_tasks) # ⚠️ 必须 await,否则不执行
# ✅ other_task 严格依赖本 item 的 a_task 全部完成
await other_task(other_values)
async def fetch_values(item):
print(f"fetch_values for item {item}")
await asyncio.sleep(5) # ✅ 异步等待,不阻塞事件循环
# 修正 DataFrame 数据:按 item 区分,避免混淆
return pd.DataFrame({
"Item": [item] * 4,
"Label": ["A", "B", "C", "D"]
})
async def a_task(row):
print(f"a_task for Item={row['Item']}, Label={row['Label']}")
await asyncio.sleep(2)
async def other_task(other_values):
count = len(other_values)
print(f"other_task for {count} non-A rows")
await asyncio.sleep(2)
if __name__ == "__main__":
asyncio.run(execute_check())? 预期输出特征(体现正确并发与依赖):
⚠️ 重要注意事项:
掌握“每个协程封装一个完整业务单元 + 依赖步骤显式 await + 所有延迟用 asyncio.sleep 替代”这三点,就能精准控制异步任务的并发粒度与执行时序。