直接用数据库事务发消息会出问题,因为SaveChanges()后调用消息发送若失败,业务已提交但消息丢失,破坏一致性;Transactional Outbox通过将消息写入同事务的outbox表,再由独立幂等投递器轮询发送来解决。
在 C# 应用中,你可能写过类似这样的代码:SaveChanges() 之后立刻调用 bus.Publish() 或 producer.SendAsync()。表面看是“先存库再发消息”,但一旦消息发送失败(网络抖动、Broker 不可用、序列化异常),业务已提交,消息却丢了——违反了“要么都成功,要么都不发生”的一致性要求。
Transactional Outbox 的核心思路是:把要发的消息也当作业务数据,写进同一个数据库事务里。消息不是“发出去”,而是“记下来”,后续由一个独立的、幂等的投递器(Outbox Processor)去轮询并转发。
你需要一张 OutboxMessages 表,字段至少包含:Id(GUID)、TypeName(事件全名)、Content(JSON 字符串)、ProcessedAt(NULL 表示未处理)、CreatedAt。关键在于:它必须和你的业务实体共享同一个 DbContext 实例,并在同一个 SaveChanges() 中被插入。
推荐做法是封装一个 OutboxService,在业务逻辑中调用 AddOutboxMessage,内部只是 new 一个 OutboxMessage 并 context.OutboxMessages.Add()。EF Core 会把它当成普通实体参与事务。
public class OutboxMessage
{
public Guid Id { get; set; }
public string TypeName { get; set; } = null!;
public string Content { get; set; } = null!;
public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
public DateTime? ProcessedAt { get; set; }
}OutboxMessages DbSet 在 OnModelCreating 中配置了 HasIndex(x => x.ProcessedAt).IsDescending(),方便后续查询未处理项SaveChanges() 多次;所有操作(业务实体 + outbox 记录)必须在一次 SaveChanges() 中完成SaveChangesAsync(cancellationToken) 配合超时控制,避免事务卡死投递器不能和业务应用跑在同一个进程里(否则进程崩溃会导致消息丢失),建议作为独立后台服务(如 .NET Worker Service),或用 Quartz.NET / Hangfire 定时触发。每次只取少量(例如 100 条)ProcessedAt IS NULL 的记录,按 CreatedAt 排序,逐条尝试发送到消息队列(如 RabbitMQ、Kafka)。
重点在于“发送成功后才更新 ProcessedAt”——这步更新也必须走数据库事务,且必须是**同一个数据库连接**(不能新开 DbContext)。否则会出现消息已发、但 DB 更新失败,导致重复投递。
SELECT ... FOR UPDATE(PostgreSQL)或 UPDLOCK, ROWLOCK(SQL Server)锁定待处理行,防止多个投递器实例并发处理同一条消息ProcessedAt),下次轮询重试;不要 throw 异常中断整个批次InitTransactions + SendOffsetsToTransaction)将 offset 提交与 ProcessedAt 更新绑定,但实现复杂,多数场景用 DB 事务更稳Outbox 表里的 Content 是 JSON,必须保证序列化前后完全一致。别用 System.Text.Json 默认设置——它会忽略 null 字段、按字母序排序属性。务必显式配置 JsonSerializerOptions,并全局复用同一实例:
var options = new JsonSerializerOptions
{
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
WriteIndented = false
};投递失败后的重试天然带来重复风险。解决方案不在 outbox 层,而在消费者端:每条消息带唯一 MessageId(通常就是 outbox 表的 Id),消费者需维护已处理 ID 的去重表(或 Redis Set),收到重复 ID 直接丢弃。
ORDER BY CreatedAt,并在消息队列端确保单分区/单队列消费最易被忽略的是:投递器的数据库连接字符串是否启用了连接池?是否设置了合理的 Max Pool Size?高吞吐下连接耗尽会导致投递停滞,而业务库仍在持续写入 outbox,最终填满磁盘。