本文详解如何基于 `aiohttp` 和 `asyncio` 构建线程安全、协程安全的单例 http 缓存类,重点解决并发请求同一 url 时的重复拉取问题,并优化时间精度与资源竞争控制。
在使用 aiohttp 构建异步 HTTP 缓存服务(如配合 Tornado 或 FastAPI)时,一个常见误区是认为“只要用了 async/await 就天然线程安全”。实际上,Python 的 GIL 仅保障纯 Python 字节码层面的线程互斥,但无法阻止 asyncio 任务在单线程内并发修改共享状态(如 dict)所引发的逻辑竞态——尤其是当
多个协程同时检测到缓存过期并触发 _fetch_update(url) 时,可能造成多次重复请求,浪费资源且增加服务压力。
原代码中 self._cache[url] = {...} 虽为原子操作(CPython 中 dict 赋值是线程安全的),但其前置判断 url not in self._cache or ... 与后续写入之间存在时间窗口。若两个协程几乎同时执行该判断,均得出“需更新”结论,则会并发执行两次 session.get(),导致:
这不是内存损坏,却是典型的 “check-then-act” 竞态(TOCTOU),必须通过同步机制消除。
我们不采用全局 asyncio.Lock()(会串行化所有 URL 请求,严重损害并发性能),而是为每个待请求的 URL 动态维护一个 asyncio.Event,实现按 URL 粒度的协同等待:
import asyncio
import logging
import aiohttp
import time
DEFAULT_TIMEOUT = 20
HTTP_READ_TIMEOUT = 1
class HTTPRequestCache:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._cache = {}
cls._instance._time_out = DEFAULT_TIMEOUT
cls._instance._http_read_timeout = HTTP_READ_TIMEOUT
cls._instance._fetching_now = {} # {url: asyncio.Event()}
cls._instance._lock = asyncio.Lock() # 仅用于保护 _fetching_now 字典本身
return cls._instance
async def _fetch_update(self, url):
# Step 1: 获取对 _fetching_now 的独占访问权,检查/注册当前 URL 的 fetch 状态
async with self._lock:
if url in self._fetching_now:
# 另一协程已在处理该 URL → 等待其完成
event = self._fetching_now[url]
await event.wait()
# 检查是否已成功缓存(避免重复等待后仍去请求)
if url in self._cache and self._cache[url]["cached_at"] >= time.monotonic() - self._time_out:
return
else:
# 首次标记该 URL 正在被获取
self._fetching_now[url] = asyncio.Event()
# Step 2: 执行实际 HTTP 请求(此时无锁,允许多 URL 并发)
try:
async with aiohttp.ClientSession() as session:
logging.info(f"Fetching {url}")
async with session.get(url, timeout=self._http_read_timeout) as resp:
resp.raise_for_status()
data = await resp.json()
cached_at = time.monotonic() # ✅ 使用 monotonic 时间,避免系统时钟跳变影响
self._cache[url] = {
"cached_at": cached_at,
"config": data,
"errors": 0
}
logging.info(f"Updated cache for {url}")
except aiohttp.ClientError as e:
logging.error(f"Failed to fetch {url}: {e}")
finally:
# Step 3: 清理状态,通知所有等待者
async with self._lock:
if url in self._fetching_now:
self._fetching_now[url].set()
del self._fetching_now[url]
async def get(self, url):
# 使用 monotonic 时间进行过期判断,更可靠
now = time.monotonic()
if (url not in self._cache
or self._cache[url]["cached_at"] < now - self._time_out):
await self._fetch_update(url)
return self._cache.get(url, {}).get("config")此方案在保持异步高性能的同时,彻底消除了缓存更新的逻辑竞态,是构建生产级异步 HTTP 客户端缓存的推荐实践。