消息不丢失需生产者、Broker、消费者三端协同保障:开启confirm机制并配异步监听、队列与消息均持久化、消费者手动ACK,金融级系统还需本地消息表+定时对账。
消息不丢失不是靠单个配置或一次确认就能解决的,而是生产者、Broker、消费者三端协同保障的结果。漏掉任意一环,都可能在高并发或异常场景下丢消息。
channel.confirmSelect()
这是防止消息“发出去但没到交换机”的第一道防线。不启用它,basicPublish 调用成功只代表 TCP 写入成功,不代表 Broker 已接收。
channel 创建后、声明队列前调用 channel.confirmSelect()
addConfirmListener),而非阻塞式 waitForConfirms(),否则吞吐暴跌publisher-return 监听路由失败(比如 routingKey 错误导致消息被丢弃)confirmSelect,却没注册监听器,失败时完全无感知durable=true 和 MessageProperties.PERSISTENT_TEXT_PLAIN
持久化是防 Broker 重启丢消息的关键。但很多人只设了队列持久化,忘了消息本身也要标记为持久化。
true:channel.queueDeclare("q1", true, false, false, null)
channel.basicPublish("", "q1", MessageProperties.PERSISTENT_TEXT_PLAIN, body)
channel.exchangeDeclare("ex", "direct", true)),否则重启后绑定关系丢失,新消息路由失败acks=all + min.insync.replicas=2,不是简单设个 retention.ms
channel.basicAck()
这是最容易被跳过的环节。开 autoAck=true 意味着只要消息一推过去,RabbitMQ 就认为消费成功,哪怕你的业务逻辑还没跑完就崩了。
autoAck=false:channel.basicConsume("q1", false, consumer)
channel.basicAck(deliveryTag, false)
channel.basicNack(deliveryTag, false, true)(重入队)或转发至死
信队列上面三步能覆盖 95% 场景,但遇到网络分区、磁盘损坏、回调丢失等极端情况,仍可能出问题。金融/订单类系统建议加一层业务级保障。
msg_log 表,字段至少含:msg_id(UUID)、content、exchange、routing_key、status('sending')、next_retry_time
'sent';消费方处理完,通过回调或反向 MQ 通知发送方,将其改为 'confirmed'
status = 'sent' AND next_retry_time 的记录,重新投递并更新重试时间
msg_id,且所有写操作走同一 DB 实例,避免分布式事务复杂度真正难的不是写几行确认代码,而是在每个环节都预设“它会失败”——网络会断、磁盘会坏、回调会丢、人会配错。所谓可靠性,就是把所有“应该发生”的事,都变成“必须验证发生”。