多线程下需用 threading.Lock 串行化 rich.progress.update() 调用,主线程创建 Progress 和锁,子线程持 task_id 并在锁内更新;或改用 Live 配合线程安全状态管理;multiprocessing 不支持 Progress 共享。
直接在多个线程里调用 progress.update() 会导致 RuntimeError: Cannot update progress bar from multiple threads。这是因为 Progress 内部的渲染器(ConsoleRender)不是线程安全的,且进度条状态(如任务计数、已用时间)共享但未加锁。
最轻量、可控性最强的做法:所有对 progress.update() 的调用都包裹在

threading.Lock 里。这不是“绕过” rich 的限制,而是显式地串行化 UI 更新逻辑。
Progress 实例和 Lock,然后传给子线程progress.update(task_id, advance=1) 前必须 lock.acquire(),完成后 lock.release()(推荐用 with lock:)task_id 必须由主线程分配并传入线程,不能在线程里调用 progress.add_task()
from rich.progress import Progress import threading import timeprogress = Progress() lock = threading.Lock() task = progress.add_task("[cyan]Processing...", total=100)
def worker(i): time.sleep(0.1) # 模拟工作 with lock: progress.update(task, advance=1)
threads = [threading.Thread(target=worker, args=(i,)) for i in range(100)] for t in threads: t.start() for t in threads: t.join()
progress.stop()
当需要更灵活的多线程 UI(比如显示多个实时指标、非进度类状态),Live 比 Progress 更适合——它只负责刷新整个渲染结果,不管理任务生命周期,因此线程安全责任完全落在你身上。
threading.local() 或带锁的 dict)收集各线程状态live.update()
live.update() 中做计算;所有聚合逻辑放在刷新线程外Live 默认启用 refresh_per_second=4,太高会刷屏,太低会卡顿Progress 对象无法被序列化(pickle),也不能跨进程传递。即使你用 Manager 或 Queue 试图同步进度,在子进程中调用 progress.update() 会直接报 AttributeError 或静默失败。
Queue 发送完成信号,由主线程统一更新 Progress
Progress —— 控制台输出会混乱,甚至阻塞tqdm + concurrent.futures.as_completed 这类更适配多进程的组合真正麻烦的不是“怎么更新”,而是“谁来决定更新时机”。rich 的设计默认把进度控制权交给单一线程,强行多线程更新容易掩盖数据竞争问题——比如两个线程同时 advance=1,却只生效一次。锁只是表层解法,背后得理清你的工作流是否真的需要并发更新 UI。