本文讲解如何用 python 类实现可调用的有状态回调对象(functor),替代全局变量方案,兼顾线程安全、可复用性与语义清晰性,并指出静态类变量并非最佳选择。
在 Kafka 生产者等异步 API 中,回调函数常用于处理消息投递结果。原始写法依赖全局变量(如 callback_count)来累积成功次数,虽简单但存在明显缺陷:破坏封装性、难以测试、多线程下不安全、无法并行追踪多个生产者流。
你提出的 DeliveryCallbackCounter 类通过实现 __call__ 方法成为合法回调对象,这是 Python 中实现“functor”的标准方式——状态应属于实例,而非全局或类本身。例如:
class DeliveryCallbackCounter:
def __init__(self):
self.count = 0 # ✅ 实例状态:每个对象独立计数
def __call__(self, error, message):
if error:

print(f'ERROR: Kafka: Message delivery failure: {error}')
else:
self.count += 1
print(f'Successfully delivered: {message.key()}')
def get_count(self):
return self.count
def __str__(self):
return f'DeliveryCallbackCounter(count={self.count})'使用时只需创建一个实例并传入:
counter = DeliveryCallbackCounter()
producer.produce(
topic="logs",
key=b"event-1",
value=b'{"status": "ok"}',
callback=counter
)
# 可随时查询当前计数
print(counter.get_count()) # → 1⚠️ 关于“静态类”(即共享类变量)的误区:
虽然可通过 DeliveryCallbackCounter.count_callback = 0 定义类变量,并在 __call__ 中修改 DeliveryCallbackCounter.count_callback,但这会让所有实例共享同一计数器,失去独立性。例如:
c1 = DeliveryCallbackCounter() c2 = DeliveryCallbackCounter() c1(None, msg1) # count → 1 c2(None, msg2) # count → 2 ← c2 的调用意外影响了 c1 的逻辑上下文
这违背了回调函数“按需隔离”的设计初衷——每个生产者任务或业务场景应拥有专属的状态容器。
✅ 正确原则:
总结:Python 的 functor 本质是“带状态的函数对象”,其优雅之处正在于将数据与行为自然绑定在实例中——无需模拟静态类,也不必妥协于全局污染。合理使用 __call__,就能写出清晰、可组合、可测试的回调逻辑。