使用可调用类(Functor)实现有状态的回调函数

本文介绍如何在 python 中通过定义带 `__call__` 方法的类来替代全局变量回调,实现线程安全、可复用且状态隔离的回调逻辑,特别适用于 kafka 等异步生产者场景。

在异步消息系统(如 Apache Kafka)中,生产者常需传入回调函数以处理消息投递结果。传统做法是使用带全局变量的普通函数,例如:

callback_count = 0

def delivery_callback(error, message_payload):
    if error:
        print(f'ERROR: {error}')
    else:
        global callback_count
        callback_count += 1

这种方式虽简单,但存在明显缺陷:全局状态污染、不可重入、难以单元测试、多线程下非线程安全,且无法为不同生产者实例维护独立计数。

更优解是使用 functor(可调用对象) —— 即实现 __call__ 方法的类。它天然封装状态,支持实例级隔离:

class DeliveryCallbackCounter:
    def __init__(self):
        self.count_callback = 0  # 实例变量 → 每个对象独立计数

    def __call__

(self, error, message): if error: print(f'ERROR: Kafka: Message delivery failure: {error}') else: self.count_callback += 1 def get_count(self): return self.count_callback def __str__(self): return f'DeliveryCallbackCounter(count={self.count_callback})'

使用时只需实例化并传入:

counter_a = DeliveryCallbackCounter()
counter_b = DeliveryCallbackCounter()

producer_a.produce(topic="logs", value=b"msg1", callback=counter_a)
producer_b.produce(topic="alerts", value=b"msg2", callback=counter_b)

# 各自独立计数,互不影响
print(counter_a.get_count())  # 可能为 1
print(counter_b.get_count())  # 可能为 1

优势显著

  • ✅ 状态封装在对象内,避免全局污染;
  • ✅ 支持多实例并行,天然适配多生产者或多主题场景;
  • ✅ 易于扩展(如添加日志、错误统计、超时监控等);
  • ✅ 可直接继承、mock 或注入依赖,大幅提升可测试性。

⚠️ 注意:若你确实需要所有实例共享同一计数器(即模拟“静态全局语义”),可改用类变量(class variable),而非实例变量:

class SharedDeliveryCallbackCounter:
    count_callback = 0  # 类变量,所有实例共享

    def __call__(self, error, message):
        if not error:
            SharedDeliveryCallbackCounter.count_callback += 1

    @classmethod
    def reset(cls):
        cls.count_callback = 0

    @classmethod
    def get_count(cls):
        return cls.count_callback

此时无论创建多少实例,count_callback 均为同一份内存地址的数据。但需谨慎使用——它本质上仍是全局状态,仅语法上更清晰,不解决线程安全问题。如需并发安全,应配合 threading.Lock 或使用 atomic 操作(如 threading.local() 或 concurrent.futures 工具)。

总结:Python 中的 functor 是替代全局回调的理想模式。优先使用实例变量实现状态隔离;仅当业务明确要求跨实例共享状态时,才考虑类变量,并务必评估并发与可维护性风险。