直接用 queue.Queue 易卡死,因其 get() 默认无限阻塞且无超时/异常穿透机制;asyncio.Queue 需配 timeout 和 task_done;Redis 用 zset + bzpopmin 支持优先级与持久化;须通过 full() 或 zcard 实现反压控制。
queue.Queue 在爬虫里容易卡死多线程爬虫中,如果直接用标准库的 queue.Queue 做任务分发,常出现消费者线程全部阻塞在 get()、生产者却因异常退出而不再放新任务——队列既没满也没空,但整个调度就僵住了。根本原因是它默认的阻塞行为缺乏超时兜底和异常穿透机制。
get(block=True) 会无限等待,一旦上游
断流,线程就挂起不响应中断asyncio.Queue 实现轻量异步调度的关键配置对中小规模 HTTP 爬取(比如每秒 10–50 请求),asyncio.Queue 比线程队列更省资源,但必须显式控制生命周期,否则协程会泄漏。
import asyncioasync def worker(queue: asyncio.Queue, session): while True: try: url = await asyncio.wait_for(queue.get(), timeout=3.0) # 必须设超时 async with session.get(url) as resp:
处理响应...
queue.task_done() # 必须调用,否则 join() 不返回 except asyncio.TimeoutError: break # 超时即退出,避免死循环 except Exception as e: print(f"Worker error on {url}: {e}") queue.task_done() # 错误也要标记完成,否则队列卡住
asyncio.wait_for(..., timeout=...) 是刚需,不能依赖 get_nowait() —— 它抛 queue.Empty 异常,但协程里没地方 catchget() 后必须配对 task_done(),哪怕出错也要调,否则 queue.join() 永远不结束await queue.put(...) 回填重试任务——容易引发循环等待,应由独立的 retry manager 处理Redis 的最小可行方案当爬虫要跑几天、或需横向加机器时,内存队列不可靠。用 redis-py 的 lpop/rpush 组合比引入 Celery 更轻,且天然支持失败重入队。
import redis import jsonr = redis.Redis()
def add_task(url: str, priority: int = 0): payload = json.dumps({"url": url, "retry": 0}) r.zadd("pending_tasks", {payload: priority}) # 用有序集合支持优先级
def get_task(timeout=1) -> dict | None:
阻塞式取一个,超时返回 None
result = r.bzpopmin("pending_tasks", timeout=timeout) if result: return json.loads(result[1]) return None
list 类型的 lpop —— 无法去重、不支持优先级、无超时原语;zset 或 stream 更稳妥bzpopmin 是原子操作,避免“取到但崩溃未处理”导致任务丢失retry 字段,失败时 r.zadd("pending_tasks", {payload: time.time() + 60}) 实现指数退避很多爬虫把 URL 批量塞进队列就不管了,结果内存暴涨 OOM。真正的调度必须让生产者知道“慢点来”。
queue.qsize() 做阈值判断不可靠(多线程下非原子),改用 queue.full() + time.sleep() 组合put() 前加 if queue.qsize() > MAX_SIZE: await asyncio.sleep(0.1)
r.zcard("pending_tasks") 监控积压量,超过阈值则暂停解析新页面链接队列不是管道,是缓冲区;缓冲区满了还硬塞,系统就从调度问题变成运维事故。