本文介绍在 go 中构建高性能流媒体缓存代理的核心方法,重点解决多客户端并发读取、写入阻塞、缓冲区延迟及慢客户端处理等关键问题,通过非阻塞通道、连接级超时、goroutine 池与内存安全复用等技术实现低延迟、高并发的流分发。
构建一个健壮的视频流缓存代理,核心挑战在于:既要避免单个慢客户端拖垮全局流分发,又要保证数据一致性、低延迟与内存安全。你最初的同步 Write 方案会因 TCP 写缓冲区满而阻塞整个循环;简单加 goroutine 会导致无序、资源失控;而固定大小 channel(如 buf_chan)在慢客户端下仍会因缓冲区填满而阻塞写入——这正是 select 非阻塞发送(default 分支)能破局的关键。
type StreamCache struct {
mu sync.RWMutex
clients map[*client]struct{} // 使用 map 替代 slice,O(1) 增删
bufPool sync.Pool // 复用缓冲区,避免 GC 压力
}
func (sc *StreamCache) NewClient(conn net.Conn) *client {
c := &client{
conn: conn,
bufCha
n: make(chan []byte, 32), // 容量适中:兼顾延迟与内存
closed: make(chan struct{}),
}
sc.mu.Lock()
sc.clients[c] = struct{}{}
sc.mu.Unlock()
// 启动独立 writer goroutine(每客户端一个)
go c.writer()
return c
}
func (sc *StreamCache) Stream(source io.Reader) {
// 复用缓冲区提升性能
buf := sc.bufPool.Get().([]byte)
defer sc.bufPool.Put(buf)
for {
n, err := source.Read(buf)
if err != nil {
log.Printf("stream read error: %v", err)
break
}
// 广播到所有活跃客户端(非阻塞)
sc.mu.RLock()
for client := range sc.clients {
select {
case client.bufChan <- append(buf[:0:n], buf[:n]...): // 安全复制
// 成功入队
default:
// 客户端太慢或已断开:主动清理
sc.removeClient(client)
}
}
sc.mu.RUnlock()
}
}
type client struct {
conn net.Conn
bufChan chan []byte
closed chan struct{}
}
func (c *client) writer() {
defer func() {
c.conn.Close()
close(c.closed)
}()
for {
select {
case buf := <-c.bufChan:
// 设置短超时防止永久阻塞
c.conn.SetWriteDeadline(time.Now().Add(500 * time.Millisecond))
if _, err := c.conn.Write(buf); err != nil {
log.Printf("write to client failed: %v", err)
return // 退出 writer,触发 cleanup
}
// 归还缓冲区(若使用 Pool)
case <-c.closed:
return
}
}
}
func (sc *StreamCache) removeClient(c *client) {
sc.mu.Lock()
delete(sc.clients, c)
sc.mu.Unlock()
close(c.closed)
}通过以上设计,你将获得一个可生产部署的流缓存代理骨架:它不依赖第三方包,完全基于 Go 标准库,兼具高性能、可观测性与强健性。