17370845950

如何使用Golang实现观察者异步通知_使用Observer Pattern触发事件
Go实现异步观察者模式的核心是用channel+goroutine解耦Subject与Observer:Notify()仅发事件到缓冲channel,dispatcher goroutine异步分发并并发调用各Observer.Update(),配合map存储+互斥锁支持安全注册/注销,Close()确保资源清理。

用 Go 实现观察者模式的异步通知,核心是解耦事件发布者(Subject)与订阅者(Observer),并通过 goroutine 避免阻塞主线程。Go 本身没有内置 Observer 接口,但可以轻量、清晰地自己构建。

定义 Observer 和 Subject 接口

先约定行为:Observer 要能接收事件;Subject 要支持注册、移除和广播。

  • Observer 接口只需一个 Update(event interface{}) 方法
  • Subject 可用结构体实现,内部维护 []Observer 切片 + 互斥锁(防止并发修改)
  • 为避免强耦合,event 类型建议用自定义 struct(如 OrderCreatedEvent{ID string, Time time.Time}),而非空接口

用 channel + goroutine 实现非阻塞通知

关键在 Notify() 方法:不直接调用每个 Observer 的 Update,而是把事件发到一个共享 channel,另起 goroutine 消费并分发。

  • 启动一个长期运行的 dispatcher goroutine:go func() { for event := range subject.eventCh { /* 遍历 observers 并 go obs.Update(event) */ } }()
  • Subject.Notify() 只需 subject.eventCh ,立刻返回
  • 每个 observer 的 Update 在独立 goroutine 中执行,彼此不干扰,也不会拖慢发布者

支持取消订阅与资源清理

真实场景中 Observer 可能生命周期短于 Subject(比如 HTTP handler 注册后退出),需提供 Unsubscribe。

  • map[uintptr]Observer 替代切片,key 可用 unsafe.Pointer(reflect.ValueOf(obs).UnsafeAddr()) 或更稳妥的自定义 ID
  • Unsubscribe 时加锁删除 map 条目
  • Subject 可暴露 Close() 方法:关闭 eventCh,等待 dispatcher 退出,避免 goroutine 泄漏

简单可用示例(无第三方依赖)

以下是最小可运行骨架:

// Observer 定义
type Observer interface {
    Update(event interface{})
}

// Subject 实现 type Subject struct { mu sync.RWMutex obs map[uintptr]Observer eventCh chan interface{} }

func NewSubject() *Subject { s := &Subject{ obs: make(map[uintptr]Observer), eventCh: make(chan interface{}, 100), // 缓冲防压垮 } go s.dispatcher() return s }

func (s *Subject) Register(obs Observer) { s.mu.Lock() defer s.mu.Unlock() ptr := uintptr(unsafe.Pointer(reflect.ValueOf(obs).UnsafeAddr())) s.obs[ptr] = obs }

func (s *Subject) Unregister(obs Observer) { s.mu.Lock() defer s.mu.Unlock() ptr := uintptr(unsafe.Pointer(reflect.ValueOf(obs).UnsafeAddr())) delete(s.obs, ptr) }

func (s *Subject) Notify(event interface{}) { select { case s.eventCh <- event: default: // 可选:日志告警或丢弃,避免阻塞 } }

func (s *Subject) dispatcher() { for event := range s.eventCh { s.mu.RLock() obsList := make([]Observer, 0, len(s.obs)) for _, o := range s.obs { obsList = append(obsList, o) } s.mu.RUnlock()

    for _, o := range obsList {
        go o.Update(event) // 异步触发
    }
}

}

func (s *Subject) Close() { close(s.eventCh) }

基本上就这些。不需要复杂框架,几份结构+channel+goroutine 就能稳稳跑起异步观察者。重点是别让 Update 同步阻塞 Notify,也别忘了清理。