17370845950

如何正确实现 Python 异步任务的依赖顺序与并发执行

本文详解如何在 asyncio 中确保子任务(如 `a_task`)在父任务(如 `fetch_values`)完成后立即并发启动,且后续任务(如 `other_task`)严格等待同一批数据的前置任务全部完成,避免因阻塞调用和错误 await 导致的串行化问题。

在 Python 的异步编程中,一个常见误区是混淆同步阻塞异步挂起——例如使用 time.sleep() 会完全阻塞事件循环,导致所有协程“假并发”;而真正的并发必须依赖 await asyncio.sleep() 或其他 awaitable 对象(如 aiohttp.get()、asyncpg.fetch() 等)。此外,任务依赖关系必须通过 await 显式表达:只有当 fetch_values() 完成并返回结果后,才能基于该结果派生 a_task 和 other_task,且 other_task 必须在同 item 的全部 a_task 结束后才执行。

以下是修正后的完整可运行示例,已修复三大核心问题:

  1. ✅ 将 time.sleep() 替换为 await asyncio.sleep(),释放事件循环控制权;
  2. ✅ 确保 fetch_values() 被 await,使 a_values/other_values 分析基于真实返回数据;
  3. ✅ 将 a_task 的并发执行与 other_task 的串行依赖(按 item 隔离)严格对齐——即每个 item 内部形成 fetch → [a_task

    × N] → other_task 的原子链,不同 item 之间完全并发。
import asyncio
import pandas as pd


async def execute_check():
    print("execute_check")
    items = [1, 2, 3, 4]
    # 并发启动 4 个独立 item 处理流程
    tasks = [fetch_values_and_process(item) for item in items]
    await asyncio.gather(*tasks)


async def fetch_values_and_process(item):
    print(f"fetch_values_and_process for item {item}")
    values_df = await fetch_values(item)  # ✅ 关键:await 确保数据就绪

    a_values = values_df[values_df["Label"] == "A"]
    other_values = values_df[values_df["Label"] != "A"]

    # 同一 item 的所有 a_task 并发执行
    if not a_values.empty:
        a_tasks = [a_task(row) for _, row in a_values.iterrows()]
        await asyncio.gather(*a_tasks)  # ✅ 所有 a_task 完成后才继续

    # other_task 严格依赖本 item 的全部 a_task 完成
    await other_task(other_values)


async def fetch_values(item):
    print(f"fetch_values for item {item}")
    await asyncio.sleep(5)  # ✅ 非阻塞挂起,允许其他协程运行
    # 为区分 item,使用 item 值填充 DataFrame(原代码逻辑有歧义)
    return pd.DataFrame({"Item": [item] * 4, "Label": ["A", "B", "C", "D"]})


async def a_task(row):
    print(f"a_task for item {row['Item']}, label {row['Label']}")
    await asyncio.sleep(2)


async def other_task(other_values):
    count = len(other_values)
    print(f"other_task processing {count} non-A rows")
    await asyncio.sleep(2)


if __name__ == "__main__":
    asyncio.run(execute_check())

预期输出特征(体现正确并发与依赖):

  • 四组 fetch_values for item X 几乎同时打印(因 asyncio.gather 并发启动);
  • 每组 fetch 完成后,立即并发打印其对应的 a_task(如 item 1 的 a_task 不等待 item 2);
  • 每个 item 的 other_task 仅在其自身 a_task 全部结束后触发;
  • 总耗时约 7 秒左右(最长路径:5s fetch + 2s a_task/other_task 串行),而非原始代码的 ~14s(因 time.sleep 强制串行)。

关键注意事项:

  • ❌ 切勿在协程中使用 time.sleep()、requests.get()、pandas.read_csv() 等同步阻塞操作——它们会冻结整个事件循环;
  • ✅ I/O 密集型操作请改用异步库(如 aiohttp、aiosqlite、asyncpg),CPU 密集型任务应使用 loop.run_in_executor() 脱离事件循环;
  • ✅ asyncio.gather() 是并发执行多个协程的首选,但需确保所有参数均为 awaitable;
  • ? 若需更复杂的依赖图(如跨 item 协调),可考虑 asyncio.Semaphore、asyncio.Event 或 asyncio.Queue 进行同步控制。

掌握“await 显式声明依赖”与“非阻塞 I/O 替代同步调用”这两条原则,即可构建真正高效、可预测的异步数据处理流水线。