RabbitMQ 发送方确认与重试机制
RabbitMQ 可靠投递发送方确认与消息重试机制实战在使用 RabbitMQ 做异步解耦时消息可靠性通常不只取决于“消息有没有持久化”。持久化解决的是消息到达 RabbitMQ 之后Broker 异常重启时尽量不丢数据的问题但如果生产者发送消息时网络抖动、交换机不存在或者消息已经到达交换机却没有路由到任何队列单靠持久化就无能为力了。所以生产者侧需要关注两件事消息有没有成功到达交换机。消息到达交换机后能不能正确路由到队列。对应到 Spring AMQP 中常用的方案就是ConfirmCallback和ReturnsCallback。前者关注生产者到交换机后者关注交换机到队列。发送方确认判断消息是否到达交换机生产者发送消息之后可以通过 confirm 机制感知 RabbitMQ 是否已经接收到了这条消息。只要开启发送确认并设置确认回调无论发送成功还是失败回调方法都会被触发。在 Spring Boot 中可以先开启 publisher confirmspring:rabbitmq:addresses:amqp://user:passwordhost:port/vhostlistener:simple:acknowledge-mode:manualpublisher-confirm-type:correlatedpublisher-confirm-type: correlated表示开启带关联数据的确认模式。发送消息时可以携带CorrelationData这样在回调里就能知道是哪一条消息收到了确认结果。下面是一个常见的RabbitTemplate配置Bean(confirmRabbitTemplate)publicRabbitTemplateconfirmRabbitTemplate(ConnectionFactoryconnectionFactory){RabbitTemplaterabbitTemplatenewRabbitTemplate(connectionFactory);rabbitTemplate.setConfirmCallback((correlationData,ack,cause)-{StringidcorrelationDatanull?unknown:correlationData.getId();if(ack){System.out.printf(消息到达交换机成功id: %s%n,id);}else{System.out.printf(消息到达交换机失败id: %s原因: %s%n,id,cause);}});returnrabbitTemplate;}发送消息时传入CorrelationDataResource(nameconfirmRabbitTemplate)privateRabbitTemplateconfirmRabbitTemplate;RequestMapping(/confirm)publicStringconfirm(){CorrelationDatacorrelationDatanewCorrelationData(msg-1001);confirmRabbitTemplate.convertAndSend(confirm_exchange,confirm,confirm test...,correlationData);return发送成功;}如果交换机存在回调中的ack会是true。如果交换机不存在例如把交换机名称写错ack会是falsecause中通常会包含类似no exchange的错误信息。这样生产者就能及时记录失败、触发告警或者把消息落库等待补偿。ConfirmCallback的核心参数含义如下correlationData发送消息时附带的关联数据通常用来标识消息。ack交换机是否成功接收到消息。cause失败原因成功时一般为null。如果直接使用 RabbitMQ Java Client也能通过ConfirmListener处理确认事件在 Spring Boot 项目中通常使用RabbitTemplate.ConfirmCallback它和 Spring 的配置、依赖注入配合得更自然。消息退回处理无法路由到队列的消息confirm 只能说明消息有没有到达交换机。消息到达交换机之后还要根据 routing key 和绑定关系路由到队列。如果 routing key 写错或者交换机没有绑定匹配的队列消息就可能无法被任何队列接收。这类问题需要使用 return 机制处理。关键点是把mandatory设置为true并配置ReturnsCallback。Bean(confirmRabbitTemplate)publicRabbitTemplateconfirmRabbitTemplate(CachingConnectionFactoryconnectionFactory){RabbitTemplaterabbitTemplatenewRabbitTemplate(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(returned-{System.out.printf(消息被退回replyCode: %dreplyText: %sexchange: %sroutingKey: %s%n,returned.getReplyCode(),returned.getReplyText(),returned.getExchange(),returned.getRoutingKey());});returnrabbitTemplate;}发送时故意使用一个无法匹配队列的 routing keyRequestMapping(/msgReturn)publicStringmsgReturn(){CorrelationDatacorrelationDatanewCorrelationData(msg-1002);confirmRabbitTemplate.convertAndSend(confirm_exchange,wrong.routing.key,message return test...,correlationData);return发送成功;}此时交换机能收到消息所以 confirm 可能是成功的但路由失败return 回调会被触发。回调参数ReturnedMessage中包含消息体、回复码、回复文本、交换机名称和 routing key 等信息便于定位是哪条消息、在哪个路由环节出现问题。实际业务中return 回调里不建议只打印日志。更稳妥的做法是把失败消息、失败原因、交换机、routing key 等信息保存下来再由补偿任务或人工处理流程兜底。可靠传输的完整链路RabbitMQ 的消息可靠性可以按链路拆开看生产者发送到 RabbitMQ 失败使用 confirm 机制确认消息是否到达交换机。消息到达交换机但无法进入队列使用 return 机制处理不可路由消息。消息到达队列后 Broker 异常开启交换机、队列和消息持久化关键业务可以结合高可用队列方案。消息到达消费者后处理失败使用消费者手动确认、重试、死信队列等方式兜底。confirm 和 return 解决的是生产者投递环节的可观测性。它们不会替代消息持久化也不会替代消费者确认。要想整体可靠通常需要把这些能力组合起来使用。重试机制给临时故障恢复机会消息消费过程中经常会遇到临时故障例如网络波动、依赖服务短暂不可用、数据库连接抖动等。这类问题可能过几秒就恢复直接丢弃消息不合适因此可以开启消费者重试。在 Spring Boot 中可以通过配置开启监听容器的重试能力spring:rabbitmq:addresses:amqp://user:passwordhost:port/vhostlistener:simple:acknowledge-mode:autoretry:enabled:trueinitial-interval:5000msmax-attempts:5这段配置表示消费失败后开启重试首次等待 5 秒最多尝试 5 次。这里的次数包含首次消费本身。可以准备一个普通的交换机和队列publicstaticfinalStringRETRY_QUEUEretry_queue;publicstaticfinalStringRETRY_EXCHANGE_NAMEretry_exchange;Bean(retryExchange)publicExchangeretryExchange(){returnExchangeBuilder.fanoutExchange(RETRY_EXCHANGE_NAME).durable(true).build();}Bean(retryQueue)publicQueueretryQueue(){returnQueueBuilder.durable(RETRY_QUEUE).build();}Bean(retryBinding)publicBindingretryBinding(Qualifier(retryExchange)FanoutExchangeexchange,Qualifier(retryQueue)Queuequeue){returnBindingBuilder.bind(queue).to(exchange);}生产者发送一条测试消息RequestMapping(/retry)publicStringretry(){rabbitTemplate.convertAndSend(RETRY_EXCHANGE_NAME,,retry test...);return发送成功;}消费者中故意制造异常RabbitListener(queuesRETRY_QUEUE)publicvoidlistenerQueue(Messagemessage){System.out.printf(接收到消息: %s, deliveryTag: %d%n,newString(message.getBody(),StandardCharsets.UTF_8),message.getMessageProperties().getDeliveryTag());intnum3/0;System.out.println(num);}如果异常继续向外抛出Spring AMQP 会根据重试配置再次执行消费逻辑。需要注意的是如果在业务代码中把异常捕获掉并且没有继续抛出框架会认为本次处理已经结束重试也就不会触发。try{intnum3/0;System.out.println(num);}catch(Exceptione){System.out.println(处理失败);}上面这种写法只是打印了失败信息没有把异常交给监听容器因此不会进入重试流程。实际项目里如果希望触发框架重试要么不要吞掉异常要么在记录日志后继续抛出业务异常。自动确认与手动确认下的差异重试机制和确认模式关系很密切。在自动确认模式下消费者方法抛出异常后监听容器会按配置进行重试。达到最大尝试次数后消息会进入失败恢复逻辑。如果没有配置死信队列或自定义 recoverer失败消息可能会被拒绝并不再回到原队列。因此自动重试最好搭配死信队列或失败记录表避免最终失败的消息无处可查。在手动确认模式下是否确认、是否拒绝、是否重新入队主要由代码控制。例如RabbitListener(queuesRETRY_QUEUE)publicvoidlistenerQueue(Messagemessage,Channelchannel)throwsIOException{longdeliveryTagmessage.getMessageProperties().getDeliveryTag();try{System.out.printf(接收到消息: %s, deliveryTag: %d%n,newString(message.getBody(),StandardCharsets.UTF_8),deliveryTag);intnum3/0;System.out.println(num);channel.basicAck(deliveryTag,false);}catch(Exceptione){channel.basicNack(deliveryTag,false,true);}}如果basicNack的requeue设置为true消息会重新回到队列随后再次投递。这样可以实现重试但如果错误来自业务逻辑本身消息可能会无限循环投递造成日志刷屏、消费者空转甚至消息堆积。更推荐的处理方式是为消息设计最大重试次数。超过限制后不再重新入队而是投递到死信队列、失败表或人工处理通道。这样既能给临时故障恢复机会也能避免一条坏消息拖垮整个消费链路。实战建议confirm 负责确认消息是否到达交换机适合处理生产者到 Broker 之间的失败。return 负责处理不可路由消息适合发现 routing key、绑定关系、队列配置错误。消费者重试适合处理临时异常不适合解决确定性的代码错误或脏数据问题。自动确认模式下要关注重试耗尽后的去向最好接入死信队列或失败落库。手动确认模式下不要简单地无限requeue要有次数限制和兜底通道。关键业务消息建议配合持久化、发送方确认、消费端确认、重试、死信队列一起使用。可靠投递不是某一个配置项就能彻底解决的问题而是要把生产者、交换机、队列、消费者这条链路上的每个风险点都看见并为每个风险点准备对应的处理策略。