本文介绍一种基于线程池与信号机制的健壮方案,用于在多线程硬件 api 系统中实现主线程与接收线程的双向状态同步,并在任一关键线程(尤其是后台接收线程)发生未捕获异常时,安全、可扩展地终止整个程序。
在构建面向硬件的实时通信 API(如通过双 socket 分别收发命令与响应)时,常见的架构包含三个核心组件:API(主业务接口)、Controller(命令发送器)和 ReceiverRTD(高频数据接收器)。其中 ReceiverRTD 通常以独立线程运行,持续监听响应 socket 并更新共享状态;而 API 和 Controller 运行于主线程,由用户脚本直接调用。
这种分工带来一个关键挑战:当主线程因异常退出时,后台接收线程需自动终止;反之,若 ReceiverRTD 线程自身抛出未捕获异常(如 socket 断连、解析失败),则必须能主动触发整个程序的安全退出——但又不能依赖 os._exit() 这类绕过 Python 正常清理流程的“暴力手段”。
我们摒弃继承 threading.Thread 的紧耦合设计,转而采用更灵活、可维护性更高的函数式任务提交模型:
以下为精简可运行示例:
import signal
import time
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Event
# 全局退出事件(可选,用于协作式关闭)
shutdown_event = Event()
def signal_handler(signum, frame):
print(f"[INFO] Received signal {signum}, initiating graceful shutdown...")
shutdown_event.set()
# 执行自定义清理(如关闭 socket、保存缓存等)
sys.exit(0)
signal.signal(signal.SIGTERM, signal_handler)
# 可选:也捕获 Ctrl+C
signal.signal(signal.SIGINT, signal_handler)
class ReceiverRTD:
def __init__(self, rtd_socket=None):
self.rtd_socket = rtd_socket or DummySocket()
self.latest_data = None
def start_receiving(self):
"""模拟持续接收数据;实际中应包含 while not shutdown_event.is_set() 循环"""
print("[RECEIVER] Starting data reception...")
for i in range(5):
if shutdown_event.is_set():
break
time.sleep(1)
self.latest_data = f"DATA_{i}"
print(f"[RECEIVER] Updated: {self.latest_data}")
# 模拟某个时刻发生致命异常
raise RuntimeError("Hardware connection lost: socket timeout")
class API:
def __init__(self, receiver: ReceiverRTD):
self.receiver = receiver
def get_latest_status(self):
return self.receiver.latest_data or "NO_DATA"
# —— 主程序入口 ——
if __name__ == "__main__":
receiver = ReceiverRTD()
api = API(receiver)
# 启动接收任务(单线程池)
with Threa
dPoolExecutor(max_workers=1) as executor:
future = executor.submit(receiver.start_receiving)
try:
# 主线程执行用户逻辑(此处模拟)
print("[MAIN] API ready. Performing business logic...")
time.sleep(2)
print("[MAIN] Current status:", api.get_latest_status())
# 模拟用户长时间操作(不阻塞异常传播)
time.sleep(4)
except KeyboardInterrupt:
print("[MAIN] User interrupted.")
finally:
# 等待接收任务完成或超时(非必需,因 SIGTERM 已触发退出)
pass
# 注意:此处不会执行到——异常会触发 SIGTERM,进程被终止
print("[MAIN] This line will NOT be printed.")通过该方案,你获得了一个可测试、可扩展、符合 Python 生态惯例的线程协同模型:主线程专注业务逻辑,后台线程专注 I/O,异常成为跨线程通信的第一等公民,而退出则始终由单一、可控的入口点(信号处理器)统一调度。