不能,Redis Pub/Sub 不适合微服务间可靠通信。它是发即忘机制,无持久化、无ACK、订阅者离线消息丢失,仅适用于低频广播场景如配置刷新;可靠通信应选Kafka/RabbitMQ或Redis Streams。
不能直接用作生产级服务间通信。Redis 的 PUB/SUB 是「发即忘」机制:消息不持久、无 ACK、订阅者离线期间消息全丢,且不支持多消费者组语义。它适合广播通知(如配置刷新、缓存失效),但不适合订单创建、支付回调这类需要至少一次投递的业务场景。
SUBSCRIBE 后收不到消息 → 检查是否在 redis-cli 中用了 MONITOR 占用连接,或 Go 客户端未保持长连接Redis Streams 是更接近消息队列的结构,支持消费者组(Consumer Group)、消息 ID 自增、ACK 和 pending list,可模拟至少一次语义。
package mainimport ( "context" "fmt" "time" "github.com/go-redis/redis/v8" )
var rdb = redis.NewClient(&redis.Options{ Addr: "localhost:6379", })
func sendOrderEvent() { ctx := context.Background() _, err := rdb.XAdd(ctx, &redis.XAddArgs{ Stream: "order_stream", Values: map[string]interface{}{"order_id": "12345", "status": "created"}, }).Result() if err != nil { panic(err) } }
func consumeOrderEvents() { ctx := context.Background() // 创建消费者组(仅需执行一次) rdb.XGroupCreateMkStream(ctx, "order_stream", "order_service_group", "$").Err()
for { msgs, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ Group: "order_service_group", Consumer: "svc-a-01", Streams: []string{"order_stream", ">"}, Count: 1, Block: 100 * time.Millisecond, }).Result() if err != nil && err != redis.Nil { fmt.Printf("read error: %v\n", err) continue } if len(msgs) == 0 { continue } for _, msg := range msgs[0].Messages { fmt.Printf("received: %v\n", msg.Values) // 处理成功后手动 ACK rdb.XAck(ctx, "order_stream", "order_service_group", msg.ID) // 从 pending list 中移除(可选,ACK 后自动清理) } }}
Streams: []string{"order_stream", ">"} 中的 > 表示只读取新消息;首次消费用 0 可回溯历史order_service_group)必须全局唯一,不同微服务应使用不同组名,否则会争抢同一条消息Consumer 名(如 svc-a-01),便于排查 pending 消息归属XAck 不影响吞吐,但若处理逻辑失败又未 NAK(Redis 无原生 NAK,需用 XClaim 抢回重试),pending 消息会堆积根本上不是靠 Redis 隔离,而是靠「服务职责划分 + 幂等设计」。Redis Streams 本身不阻止多组消费,但你可以通过命名和路由策略控制:
event_stream,而是分 user_event_stream、payment_event_stream 等
流 + 多个消费者组」做扇出:这会导致每组都收到全量消息,浪费带宽和解析开销;改用 XADD 时显式写入多个 stream(如同时发到 notify_stream 和 audit_stream)id(如 UUID 或业务单号 + 时间戳),消费者写入前先查 SETNX order_id_processed 1 EX 3600,失败则跳过Go 微服务常因连接复用不当或序列化不一致导致通信失败或 CPU 暴涨。
redis.NewClient 默认是单连接,必须显式设 PoolSize(如 10–30),并确保整个服务共用一个 *redis.Client 实例json: tag,json.Marshal 后全是空对象;或用了 map[interface{}]interface{} 导致 Redis 存的是 "key":{} 这种无法反序列化的格式XREADGROUP 拿不到预期数据redis: nil reply from server → 通常是连接被服务端踢出(timeout 设置过短或 maxclients 超限);invalid character 'x' looking for beginning of value → 接收方用 json.Unmarshal 解析了非 JSON 字符串(比如原始字符串没包在 {} 里)实际部署时,Streams 的 MAXLEN 策略(如 XADD ... MAXLEN ~ 10000)比单纯依赖 TTL 更可控;而真正难的是跨服务的错误传播——比如支付服务消费失败,怎么让订单服务知道要重试?这已经超出 Redis 能力范围,得靠补偿事务或 Saga 模式。