RabbitMQ 高级特性消息确认与持久性在 RabbitMQ 的基础使用中我们已经知道了生产者、交换机、队列、消费者之间的基本流转关系。但真正落到业务系统里仅仅“能发、能收”是不够的。消息可能在消费者处理失败时丢失也可能在 RabbitMQ 服务重启后消失。消息确认解决“消息到达消费者后是否真的被成功处理”的问题。持久性解决“RabbitMQ 服务异常或重启后交换机、队列、消息是否还在”的问题。1. 消息确认1.1 消息确认机制生产者把消息发送到 RabbitMQ 后消费者拿到消息并处理时可能出现两种结果消息处理成功。消息处理异常。如果 RabbitMQ 只要把消息推给消费者就立刻把消息从队列中删除那么第二种情况就会造成消息丢失。比如消费者刚收到消息业务代码还没执行完就宕机了此时 RabbitMQ 如果已经删除消息这条消息就无法再次被消费。为了解决这个问题RabbitMQ 提供了消息确认机制也就是 message acknowledgement。消费者订阅队列时可以通过autoAck参数决定确认方式。自动确认当autoAcktrue时RabbitMQ 会认为消息只要投递给消费者就已经消费成功然后直接从内存或磁盘中删除。这种方式代码简单吞吐量也高但可靠性较弱。它适合日志、统计等对消息丢失不敏感的场景。Java Client 中消费者订阅队列的核心方法如下StringbasicConsume(Stringqueue,booleanautoAck,Consumercallback)throwsIOException;示例代码DefaultConsumerconsumernewDefaultConsumer(channel){OverridepublicvoidhandleDelivery(StringconsumerTag,Envelopeenvelope,AMQP.BasicPropertiesproperties,byte[]body)throwsIOException{System.out.println(接收到消息: newString(body));}};channel.basicConsume(Constants.TOPIC_QUEUE_NAME1,true,consumer);这里第二个参数传入true表示自动确认。也就是说即使handleDelivery中后续业务处理失败RabbitMQ 也不会再重新投递这条消息。手动确认当autoAckfalse时RabbitMQ 不会在投递后立即删除消息而是等待消费者显式回复确认信号。此时队列中的消息会出现两种状态Ready等待投递给消费者的消息。Unacked已经投递给消费者但还没有收到确认信号的消息。如果 RabbitMQ 一直没有收到确认并且对应消费者连接断开RabbitMQ 会把这条消息重新放回队列等待投递给下一个消费者。这也是实际业务中更常用的方式因为它能让 RabbitMQ 感知消费者是否真的处理成功。1.2 手动确认方法手动确认时消费者可以根据业务处理结果选择确认、拒绝或者批量拒绝。RabbitMQ Java Client 主要提供三个方法。basicAck肯定确认channel.basicAck(longdeliveryTag,booleanmultiple);表示消费者已经成功处理消息RabbitMQ 可以删除该消息。参数说明deliveryTag消息在当前 Channel 内的唯一标识是单调递增的长整型值。multiple是否批量确认。为true时会确认所有小于等于当前deliveryTag的未确认消息为false时只确认当前这条消息。注意deliveryTag是按 Channel 维护的因此在哪个 Channel 收到消息就要在哪个 Channel 上确认。basicReject拒绝单条消息channel.basicReject(longdeliveryTag,booleanrequeue);表示消费者拒绝当前消息。参数说明deliveryTag要拒绝的消息标识。requeue是否重新入队。为true时消息会重新进入队列等待再次投递为false时消息会从队列中移除。basicReject一次只能拒绝一条消息。basicNack否定确认支持批量拒绝channel.basicNack(longdeliveryTag,booleanmultiple,booleanrequeue);basicNack可以理解为增强版拒绝方法。它比basicReject多了一个multiple参数可以批量拒绝消息。参数说明deliveryTag消息标识。multiple是否批量拒绝。为true时拒绝当前deliveryTag之前所有未确认消息。requeue是否重新入队。业务中常见的写法是处理成功就basicAck处理失败就basicNack并根据失败类型决定是否重新入队。1.3 Spring Boot 中的消息确认示例Spring AMQP 对确认机制做了一层封装常见确认模式有三种publicenumAcknowledgeMode{NONE,MANUAL,AUTO;}AcknowledgeMode.NONENONE表示不启用消费者确认机制。消息一旦投递给消费者RabbitMQ 就会认为消费成功并删除消息。配置示例spring:rabbitmq:addresses:amqp://user:passwordhost:port/vhostlistener:simple:acknowledge-mode:none先准备交换机、队列和绑定关系publicclassConstant{publicstaticfinalStringACK_EXCHANGE_NAMEack_exchange;publicstaticfinalStringACK_QUEUEack_queue;}ConfigurationpublicclassAckConfig{Bean(ackExchange)publicExchangeackExchange(){returnExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build();}Bean(ackQueue)publicQueueackQueue(){returnQueueBuilder.durable(Constant.ACK_QUEUE).build();}Bean(ackBinding)publicBindingackBinding(Qualifier(ackExchange)Exchangeexchange,Qualifier(ackQueue)Queuequeue){returnBindingBuilder.bind(queue).to(exchange).with(ack).noargs();}}生产者发送消息RestControllerRequestMapping(/producer)publicclassProducerController{AutowiredprivateRabbitTemplaterabbitTemplate;RequestMapping(/ack)publicStringack(){rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME,ack,consumer ack test...);return发送成功!;}}消费者模拟异常ComponentpublicclassAckQueueListener{RabbitListener(queuesConstant.ACK_QUEUE)publicvoidlistenerQueue(Messagemessage,Channelchannel)throwsException{System.out.printf(接收到消息: %s, deliveryTag: %d%n,newString(message.getBody(),StandardCharsets.UTF_8),message.getMessageProperties().getDeliveryTag());intnum3/0;System.out.println(处理完成);}}在NONE模式下即使消费者抛出异常消息也已经被 RabbitMQ 视为消费成功并移除。因此可以看到Ready0Unacked0但业务实际并没有处理完成。AcknowledgeMode.AUTOAUTO是 Spring AMQP 的默认确认模式。消费者方法正常执行完成时Spring 会自动确认消息如果方法抛出异常则不会确认消息。配置示例spring:rabbitmq:addresses:amqp://user:passwordhost:port/vhostlistener:simple:acknowledge-mode:auto如果继续使用上面带异常的消费者代码消息会被不断重新投递。日志中可以看到deliveryTag持续递增接收到消息: consumer ack test..., deliveryTag: 1 接收到消息: consumer ack test..., deliveryTag: 2 接收到消息: consumer ack test..., deliveryTag: 3这种模式比NONE更可靠因为异常时不会直接删除消息。但如果业务代码一直异常又没有配合重试次数、死信队列等机制就可能造成消息反复重投或积压。AcknowledgeMode.MANUALMANUAL是手动确认模式。消费者需要自己决定什么时候确认、什么时候拒绝。配置示例spring:rabbitmq:addresses:amqp://user:passwordhost:port/vhostlistener:simple:acknowledge-mode:manual消费者代码ComponentpublicclassAckQueueListener{RabbitListener(queuesConstant.ACK_QUEUE)publicvoidlistenerQueue(Messagemessage,Channelchannel)throwsException{longdeliveryTagmessage.getMessageProperties().getDeliveryTag();try{System.out.printf(接收到消息: %s, deliveryTag: %d%n,newString(message.getBody(),StandardCharsets.UTF_8),deliveryTag);System.out.println(处理业务逻辑);channel.basicAck(deliveryTag,false);}catch(Exceptione){channel.basicNack(deliveryTag,false,true);}}}这里的处理逻辑是业务处理成功调用basicAck确认消息。业务处理失败调用basicNack拒绝消息。requeuetrue消息重新进入队列等待下一次投递。如果为了测试异常把业务代码改成下面这样System.out.println(处理业务逻辑);intnum3/0;channel.basicAck(deliveryTag,false);异常会进入catch然后执行channel.basicNack(deliveryTag,false,true);由于requeuetrue消息会不断重新入队并再次投递控制台会看到类似输出接收到消息: consumer ack test..., deliveryTag: 1 处理业务逻辑 接收到消息: consumer ack test..., deliveryTag: 2 处理业务逻辑 接收到消息: consumer ack test..., deliveryTag: 3 处理业务逻辑手动确认模式最灵活也最适合可靠性要求高的业务。但实际项目中不要让失败消息无限重试通常要配合重试次数、死信队列、告警日志等机制。2. 持久性刚刚解决的是消费者处理消息时如何避免丢失。接下来要解决另一个问题如果 RabbitMQ 服务停止、重启或异常崩溃交换机、队列和消息是否还能保留下来默认情况下如果没有进行持久化配置RabbitMQ 在退出或崩溃后可能会忽略原来的队列和消息。RabbitMQ 的持久化主要分为三个部分交换机持久化。队列持久化。消息持久化。这三者都很重要。只持久化其中一部分通常无法完整保证消息在重启后仍然可用。2.1 交换机持久化交换机持久化通过声明交换机时设置durabletrue实现。ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build();交换机持久化后RabbitMQ 会保存交换机的元数据。即使 RabbitMQ 服务重启交换机也会恢复不需要应用程序重新创建。如果交换机没有持久化RabbitMQ 重启后交换机元数据会丢失。对于长期使用的业务交换机建议都声明为持久化。2.2 队列持久化队列持久化通过声明队列时设置durabletrue实现。QueueBuilder.durable(Constant.ACK_QUEUE).build();Spring AMQP 中QueueBuilder.durable(name)默认就是创建持久化队列。源码逻辑大致如下publicstaticQueueBuilderdurable(Stringname){returnnewQueueBuilder(name).setDurable();}privateQueueBuildersetDurable(){this.durabletrue;returnthis;}如果要创建非持久化队列可以使用QueueBuilder.nonDurable(Constant.ACK_QUEUE).build();需要注意的是队列持久化只能保证队列本身的元数据不会因为 RabbitMQ 重启而丢失不能保证队列里的消息一定还在。原因很简单队列还在不代表队列中的消息被持久化了。要让消息也在重启后保留还必须设置消息持久化。2.3 消息持久化消息持久化需要把消息的投递模式设置为持久化也就是MessageDeliveryMode.PERSISTENT。Spring AMQP 中的枚举如下publicenumMessageDeliveryMode{NON_PERSISTENT,PERSISTENT}如果使用 RabbitMQ Java Client可以在发送消息时设置MessageProperties.PERSISTENT_TEXT_PLAIN// 非持久化消息channel.basicPublish(,QUEUE_NAME,null,msg.getBytes());// 持久化消息channel.basicPublish(,QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());PERSISTENT_TEXT_PLAIN本质上是把deliveryMode设置为2publicstaticfinalBasicPropertiesPERSISTENT_TEXT_PLAINnewBasicProperties(text/plain,null,null,2,0,null,null,null,null,null,null,null,null,null);如果使用RabbitTemplate发送持久化消息可以手动构造MessageStringtextThis is a persistent message;MessagePropertiesmessagePropertiesnewMessageProperties();messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);MessagemessagenewMessage(text.getBytes(StandardCharsets.UTF_8),messageProperties);rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME,ack,message);消费端打印MessageProperties时也可以观察消息是否是持久化投递例如receivedDeliveryModePERSISTENT队列持久化和消息持久化要一起使用持久化时要特别注意组合关系只设置队列持久化RabbitMQ 重启后队列还在但消息可能丢失。只设置消息持久化RabbitMQ 重启后队列没了消息也没有地方存放。队列和消息都持久化RabbitMQ 重启后消息才有机会继续保留在队列中。所以在可靠性要求高的场景中通常至少要做到BeanpublicExchangeackExchange(){returnExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build();}BeanpublicQueueackQueue(){returnQueueBuilder.durable(Constant.ACK_QUEUE).build();}publicvoidsendPersistentMessage(Stringtext){MessagePropertiespropertiesnewMessageProperties();properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);MessagemessagenewMessage(text.getBytes(StandardCharsets.UTF_8),properties);rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME,ack,message);}也就是交换机持久化、队列持久化、消息持久化一起配置。持久化不是百分之百不丢把交换机、队列、消息都设置为持久化之后是不是就能百分之百保证数据不丢失答案是否定的。第一种情况和消费者确认有关。如果消费者使用自动确认RabbitMQ 把消息投递出去后就删除消息但消费者还没处理完就宕机消息仍然会丢失。因此关键业务需要结合第一章的手动确认机制。第二种情况和落盘时机有关。持久化消息到达 RabbitMQ 后并不代表每一条消息都会立刻同步刷盘。RabbitMQ 可能先把消息写入操作系统缓存再由系统刷入磁盘。如果在这个极短的时间窗口里 RabbitMQ 节点宕机仍然可能丢失少量消息。针对这个问题给出两个方向引入 RabbitMQ 仲裁队列提高高可用能力。主节点异常时可以切换到从节点降低单节点宕机造成消息丢失的风险。在发送端引入事务机制或发送方确认机制保证消息已经正确发送并存储到 RabbitMQ。实际项目中事务机制性能开销较大使用得相对少。更常见的方案是发送方确认也就是后续章节会讲到的 publisher confirm。持久化的性能取舍持久化会提高可靠性但也会带来性能成本。写磁盘比写内存慢得多如果所有消息都强制持久化会影响 RabbitMQ 的整体吞吐量。因此是否持久化要结合业务重要程度做权衡订单、支付、库存等关键消息建议开启持久化并配合手动确认、发送方确认、死信队列等机制。日志、埋点、实时统计等允许少量丢失的消息可以根据吞吐量要求选择非持久化。小结RabbitMQ 的可靠性不是靠某一个配置单独完成的而是由多个环节共同保证。第一章的消息确认主要解决消费者侧问题消息到达消费者后只有真正处理成功才应该通知 RabbitMQ 删除消息。自动确认简单但可能丢消息手动确认更可靠也更适合关键业务。第二章的持久性主要解决 RabbitMQ 服务侧问题交换机、队列和消息都需要持久化RabbitMQ 重启后消息才有机会保留下来。但持久化也不是绝对可靠还需要结合手动确认、仲裁队列、发送方确认等机制才能构建更完整的可靠消息链路。可以用一句话概括这两章的重点消费端靠手动确认防止“处理失败却删除消息”服务端靠持久化防止“服务重启后消息消失”。