RocketMQ源码深度解析(二)Netty通信、Broker心跳注册、消息收发、客户端负载均衡原理
一、RocketMQ 底层 Netty 通信框架原理1.1 核心定位RocketMQ所有网络交互全部基于 Netty NIO 通信实现是整个中间件的通信基石。所有组件NameServer、Broker、Producer、Consumer的请求、响应、心跳、数据传输都统一基于 Netty 异步非阻塞模型完成。Netty的所有远程通信功能都由remoting模块实现。remoting模块中有两个对象最为重要。就是RPC的服务端RemotingServer以及客户端RemotingClient。在RocketMQ中涉及到的远程服务⾮常多同⼀个服务可能既是RPC的服务端也可以是RPC的客户端。例如Broker服务对于Client来说他需要作为服务端响应他们发送消息以及拉取消息等请求所以Broker是需要RemotingServer的。⽽另⼀⽅⾯Broker需要主动向NameServer发送⼼跳请求这时Broker⼜需要RemotingClient。因此Broker既是RPC的服务端⼜是RPC的客户端。核心设计特点基于Reactor 主从多线程模型高吞吐、低延迟统一协议封装RemotingCommand所有请求、响应统一对象全异步通信支持同步/异步/单向调用模式服务端NameServer(9876)、Broker(10911)客户端Producer、Consumer、Broker 内部远程调用1.2 Netty 服务端启动初始化流程以 Broker、NameServer 公共 Netty 启动逻辑为例启动流程标准化加载NettyServerConfig端口、线程数、缓冲区配置创建 Boss 主线程组1个线程负责监听连接创建 Worker 工作线程组多线程负责读写处理初始化 Channel 流水线注册自定义编解码器、空闲检测、业务处理器绑定监听端口启动 NIO 服务循环接收客户端连接异步处理所有 RemotingCommand 请求1.3 Netty 通信流程图1.4 核心源码片段Netty 启动模板Override public void start() { this.defaultEventExecutorGroup new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl(NettyServerCodecThread_)); //初始化netty服务端参数信息 initServerBootstrap(serverBootstrap); //绑定对应通道信息 try { ChannelFuture sync serverBootstrap.bind().sync(); InetSocketAddress addr (InetSocketAddress) sync.channel().localAddress(); if (0 nettyServerConfig.getListenPort()) { this.nettyServerConfig.setListenPort(addr.getPort()); } log.info(RemotingServer started, listening {}:{}, this.nettyServerConfig.getBindAddress(), this.nettyServerConfig.getListenPort()); this.remotingServerTable.put(this.nettyServerConfig.getListenPort(), this); } catch (Exception e) { throw new IllegalStateException(String.format(Failed to bind to %s:%d, nettyServerConfig.getBindAddress(), nettyServerConfig.getListenPort()), e); } if (this.channelEventListener ! null) { this.nettyEventExecutor.start(); } //其他定时器启动 ...... }1.5 服务端构建处理链的核心代码protected ChannelPipeline configChannel(SocketChannel ch) { return ch.pipeline() .addLast(getDefaultEventExecutorGroup(), HANDSHAKE_HANDLER_NAME, new HandshakeHandler()) .addLast(getDefaultEventExecutorGroup(), encoder,//请求编码器 new NettyDecoder(),//请求解码器 distributionHandler,//请求计数器 new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),//⼼跳管理器 connectionManageHandler,//连接管理器 serverHandler//核⼼的业务处理器 ); }1.5.1 请求参数从请求的编解码器可以看出RocketMQ的所有RPC请求数据都封装成RemotingCommand对象。RemotingCommand对象中有⼏个重要的属性private int code; //响应码表示请求处理成功还是失败 private int opaque requestId.getAndIncrement(); //服务端内部会构建唯⼀的请求ID。 private transient CommandCustomHeader customHeader; //⾃定义的请求头。⽤来区分不同的业务请求 private transient byte[] body; //请求参数体 private int flag 0; //参数类型 默认0表示请求1表示响应1.5.2 处理逻辑所有核⼼的业务请求都是通过⼀个NettyServerHandler进⾏统⼀处理。他处理时的核⼼代码如下ChannelHandler.Sharable public class NettyServerHandler extends SimpleChannelInboundHandlerRemotingCommand { //统一处理所有业务请求 Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) { int localPort RemotingHelper.parseSocketAddressPort(ctx.channel().localAddress()); NettyRemotingAbstract remotingAbstract NettyRemotingServer.this.remotingServerTable.get(localPort); if (localPort ! -1 remotingAbstract ! null) { remotingAbstract.processMessageReceived(ctx, msg);//核⼼处理请求的⽅法 return; } // The related remoting server has been shutdown, so close the connected channel RemotingHelper.closeChannel(ctx.channel()); } Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { //调整channel的读写属性 } }public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) { if (msg ! null) { switch (msg.getType()) { case REQUEST_COMMAND: processRequestCommand(ctx, msg); break; case RESPONSE_COMMAND: processResponseCommand(ctx, msg); break; default: break; } } }在最核⼼的处理请求的processMessageReceived⽅法中会将请求类型分为 REQUEST__COMMAND 和 RESPONSE_COMMAND来处理。**为什么会有两种不同类型的请求呢这是因为客户端的业务请求会有两种类型⼀种是客户端发过来的业务请求另⼀种是客户上次发过来的业务请求可能并没有同步给出相应。这时就需要客户端再发⼀个response类型的请求获取上⼀次请求的响应。这也就能⽀持异步的RPC调⽤。如何处理request类型的请求服务端和客户端都会维护⼀个processorTable。这是个HashMap,key是服务码也就对应RemotingCommand的code。value是对应的运行单元PairNettyRequestProcessor,ExecutorService。包含了执行线程的线程池和具体处理业务的Processor。而这些Processor是由业务系统自主注册的。也就是说想要看每个服务具体有哪些业务能力就只要看他们注册了哪些Processor就知道了。public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) { //根据业务类型找到线程中的事务处理器 final PairNettyRequestProcessor, ExecutorService matched this.processorTable.get(cmd.getCode()); //未找到对应的事务处理器,则初始化一个默认的处理器 final PairNettyRequestProcessor, ExecutorService pair null matched ? this.defaultRequestProcessorPair : matched; final int opaque cmd.getOpaque(); //无默认处理器,抛出异常 if (pair null) { String error request type cmd.getCode() not supported; final RemotingCommand response RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error); response.setOpaque(opaque); this.writeResponse(ctx.channel(), cmd, response, null); log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) error); return; } //构建请求信息,并执行请求获取最终结果 Runnable run buildProcessRequestHandler(ctx, cmd, pair, opaque); ...... }Broker服务注册详⻅ BrokerController.registerProcssor()⽅法。NameServer的服务注册⽅法重点如下private void registerProcessor() { if (namesrvConfig.isClusterTest()) {//是否测试集群模式默认是false。也就是说现在阶段不推荐。 this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()), this.defaultExecutor); } else { // Support get route info only temporarily ClientRequestProcessor clientRequestProcessor new ClientRequestProcessor(this); this.remotingServer.registerProcessor(RequestCode.GET_ROUTEINFO_BY_TOPIC, clientRequestProcessor, this.clientRequestExecutor); this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.defaultExecutor); } }如何处理response类型的请求NettyServer处理完request请求后会先缓存到responseTable中等NettyClient下次发送response类型的请求再来获取。这样就不⽤阻塞Channel提升请求的吞吐量。优雅的⽀持了异步请求。RocketMQ的RemotingServer服务端会维护一个responseTable这是一个线程同步的Map结构。 key为请求的IDvalue是异步的消息结果。ConcurrentMapInteger /* opaque */, ResponseFuture 。处理同步请求(NettyRemotingAbstract#invokeSyncImpl)时处理的结果会存responseTable通过ResponseFuture提供一定的服务端异步处理支持提升服务端的吞吐量。 请求返回后⽴即从responseTable中移除请求记录。//org.apache.rocketmq.remoting.netty.ResponseFuture //发送消息后通过countDownLatch阻塞当前线程造成同步等待的效果。 public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException { this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS); return this.responseCommand; } //等待异步获取到消息后再通过countDownLatch释放当前线程。 public void putResponse(final RemotingCommand responseCommand) { this.responseCommand responseCommand; this.countDownLatch.countDown(); }处理异步请求(NettyRemotingAbstract#invokeAsyncImpl)时处理的结果依然会存responsTable等待客户端后续再来请求结果。但是他保存的依然是一个ResponseFuture也就是在客户端请求结果时再去获取真正的结果。另外在RemotingServer启动时会启动⼀个定时的线程任务不断扫描responseTable将其中过期的response清除掉。TimerTask timerScanResponseTable new TimerTask() { Override public void run(Timeout timeout) { try { NettyRemotingServer.this.scanResponseTable(); } catch (Throwable e) { log.error(scanResponseTable exception, e); } finally { timer.newTimeout(this, 1000, TimeUnit.MILLISECONDS); } } }; this.timer.newTimeout(timerScanResponseTable, 1000 * 3, TimeUnit.MILLISECONDS);整体流程可以看到RocketMQ基于Netty框架实现的这一套基于服务码的服务注册机制即可以让各种不同的组件都按照自己的需求注册自己的服务方法又可以以一种统一的方式同时支持同步请求和异步请求。所以这一套框架其实是非常简洁易用的。在使用Netty框架进行相关应用开发时都可以借鉴他的这一套服务注册机制。例如开发一个大型的IM项目要添加好友、发送文本、发送图片、发送附件、甚至还有表情、红包等等各种各样的请求。这些请求如何封装就可以参考这一套服务注册框架。二、Broker 心跳注册与路由管理机制2.1 机制核心作用Broker 启动后必须主动向 NameServer注册 定时心跳保活让 NameServer 维护全局最新的集群路由信息为生产者、消费者提供路由发现能力。2.2 完整运行机制启动立即注册Broker 初始化完成后立刻调用 registerBrokerAll 向所有 NameServer 注册自身信息定时心跳保活每30s执行一次注册请求复用注册接口作为心跳NameServer 维护路由NameServer 将 Broker 信息、Topic 队列信息存入内存 RouteInfoManager,失效剔除机制NameServer 每10s扫描一次120s 无心跳则判定 Broker 下线剔除路由2.3 Broker 心跳注册流程图2.4 核心源码定时心跳任务//K4 Broker向NameServer进⾏⼼跳注册 if (!isIsolated !this.messageStoreConfig.isEnableDLegerCommitLog() !this.messageStoreConfig.isDuplicationEnable()) { changeSpecialServiceStatus(this.brokerConfig.getBrokerId() MixAll.MASTER_ID); this.registerBrokerAll(true, false, true); } //启动后定时注册 scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) { Override public void run0() { try { if (System.currentTimeMillis() shouldStartTime) { BrokerController.LOG.info(Register to namesrv after {}, shouldStartTime); return; } if (isIsolated) { BrokerController.LOG.info(Skip register for broker is isolated); return; } BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); } catch (Throwable e) { BrokerController.LOG.error(registerBrokerAll Exception, e); } } }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS));2.5 整体运行流程三、Producer 消息发送源码完整流程3.1Producer的核⼼启动流程所有Producer的启动过程最终都会调用到DefaultMQProducerlmpl#start方法。在start方法中的通过一个mQClientFactory对象启动生产者的一大堆重要服务。这个mQClientFactory是最为重要的一个对象负责生产所有的Client,包括Producer和Consumer。这里其实就是一种设计模式虽然有很多种不同的客户端但是这些客户端的启动流程最终都是统一的全是交由mQClientFactory对象来启动。而不同之处在于这些客户端在启动过程中按照服务端的要求注册不同的信息。例如生产者注册到producerTable消费者注册到consumerTable管理控制端注册到adminExtTable3.2 发送消息的核⼼流程发送消息时会维护一个本地的topicPublishInfoTable缓存DefaultMQProducer会尽量保证这个缓存数据是最新的。但是如果NameServer挂了那么DefaultMQProducer还是会基于这个本地缓存去找Broker。只要能找到Broker还是可以正常发送消息到Broker的。-可以在生产者示例中start后打一个断点然后把NameServer停掉这时Producer还是可以发送消息的。生产者如何找MessageQueue默认情况下生产者是按照轮训的方式依次轮训各个MessageQueue。但是如果某一次往一个Broker发送请求失败后下一次就会跳过这个Broker。//org.apache.rocketmq.client.impl.producer.TopicPublishInfo //QueueFilter是⽤来过滤掉上⼀次失败的Broker的表示上⼀次向这个Broker发送消息是失败的这时就尽量不要再往这个Broker发送消息了。 private MessageQueue selectOneMessageQueue(ListMessageQueue messageQueueList, ThreadLocalIndex sendQueue, QueueFilter ...filter) { if (messageQueueList null || messageQueueList.isEmpty()) { return null; } if (filter ! null filter.length ! 0) { for (int i 0; i messageQueueList.size(); i) { int index Math.abs(sendQueue.incrementAndGet() % messageQueueList.size()); MessageQueue mq messageQueueList.get(index); boolean filterResult true; for (QueueFilter f: filter) { Preconditions.checkNotNull(f); filterResult f.filter(mq); } if (filterResult) { return mq; } } return null; } int index Math.abs(sendQueue.incrementAndGet() % messageQueueList.size()); return messageQueueList.get(index); }如果在发送消息时传了Selector那么Producer就不会⾛这个负载均衡的逻辑⽽是会使⽤Selector去寻找⼀个队列。 具体参⻅org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl ⽅法//K4 Producer顺序消息的发送⽅法 public MessageQueue invokeMessageQueueSelector(Message msg, MessageQueueSelector selector, Object arg, final long timeout) throws MQClientException, RemotingTooMuchRequestException { long beginStartTime System.currentTimeMillis(); this.makeSureStateOK(); Validators.checkMessage(msg, this.defaultMQProducer); TopicPublishInfo topicPublishInfo this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo ! null topicPublishInfo.ok()) { MessageQueue mq null; try { ListMessageQueue messageQueueList mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList()); Message userMessage MessageAccessor.cloneMessage(msg); String userTopic NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace()); userMessage.setTopic(userTopic); //由selector选择出⽬标mq mq mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg)); } catch (Throwable e) { throw new MQClientException(select message queue threw exception., e); } long costTime System.currentTimeMillis() - beginStartTime; if (timeout costTime) { throw new RemotingTooMuchRequestException(sendSelectImpl call timeout); } if (mq ! null) { return mq; } else { throw new MQClientException(select message queue return null., null); } } validateNameServerSetting(); throw new MQClientException(No route info for this topic, msg.getTopic(), null); }四、Consumer 消息拉取源码完整流程4.1 核心特点RocketMQ 消费者为主动 Pull 拉取模式区别于推模式流量完全可控、不会压垮消费者稳定性极强。4.1 启动流程:Consumer的核⼼启动过程和Producer是⼀样的 最终都是通过mQClientFactory对象启动。不过之间添加了⼀些注册信息。整体的启动过程如下4.3 ⼴播模式与集群模式的Offset处理在DefaultMQPushConsumerImpl的start⽅法中启动了⾮常多的核⼼服务。 ⽐如对于⼴播模式与集群模式的Offset处理if (this.defaultMQPushConsumer.getOffsetStore() ! null) { this.offsetStore this.defaultMQPushConsumer.getOffsetStore(); } else { switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: this.offsetStore new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; case CLUSTERING: this.offsetStore new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; default: break; } this.defaultMQPushConsumer.setOffsetStore(this.offsetStore); } this.offsetStore.load();可以看到⼴播模式是使⽤LocalFileOffsetStore在Consumer本地保存Offset⽽集群模式是使⽤RemoteBrokerOffsetStore在Broker端远程保存offset。 ⽽这两种Offset的存储⽅式最终都是通过维护本地的offsetTable缓存来管理Offset。4.4Consumer与MessageQueue建⽴绑定关系在 Consumer 的 start 方法中有一处关键逻辑是给rebalanceImpl设置队列分配策略AllocateMessageQueueStrategy该策略用于完成 Consumer 和 MessageQueue 的分配绑定。相关源码this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); //Consumer负载均衡策略 this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());AllocateMessageQueueStrategy负责建立 Consumer 与 MessageQueue 的对应关系Topic 的队列数量、同一个消费组下 Consumer 实例数量没有变动时单个 Consumer 固定消费分配到自身的一个或多个 MessageQueue同组其他 Consumer 不会抢占该队列。AllocateMessageQueueStrategy 的 7 个实现类AllocateMessageQueueStrategy是队列分配顶层接口RocketMQ 原生提供 7 种策略实现AbstractAllocateMessageQueueStrategy抽象父类AllocateMachineRoomNearbyAllocateMessageQueueAveragelyAllocateMessageQueueAveragelyByCircleAllocateMessageQueueByConfigAllocateMessageQueueByMachineRoomAllocateMessageQueueConsistentHash思考一下:为什么一个 MessageQueue 只能被同一个消费组中的一个 Consumer 消费Broker 按照ConsumerGroupMessageQueue维度管理队列的 Offset 消费位点。 如果同一个队列被同组多个 Consumer 实例消费不同消费者的消息处理进度不一致会造成队列的 Offset 错乱因此 RocketMQ 做了约束单个 MessageQueue 在同一个消费组内仅能由一个 Consumer 实例负责消费。4.5 顺序消费与并发消费启动逻辑在 Consumer 的start()方法内会根据注册的监听器类型创建不同的消息拉取服务ConsumerMessageService以此区分顺序消费和并发消费。// 判断是否为顺序消费监听器 if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {//顺序消费监听器 this.consumeOrderly true; this.consumeMessageService new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); //POPTODO reuse ExecutorPOP消费模式本篇暂不关注 this.consumeMessagePopService new ConsumeMessagePopOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {//并发消费监听器 this.consumeOrderly false; this.consumeMessageService new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); //POPTODO reuse ExecutorPOP消费模式本篇暂不关注 this.consumeMessagePopService new ConsumeMessagePopConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); } //启动消息消费服务 this.consumeMessageService.start(); //POPTODO //this.consumeMessagePopService.start();核心规则注册MessageListenerOrderly→ 标记consumeOrderlytrue创建顺序消费服务 ConsumeMessageOrderlyService注册MessageListenerConcurrently→ 标记consumeOrderlyfalse创建并发消费服务 ConsumeMessageConcurrentlyService代码预留POP新模式实现ConsumeMessagePopOrderlyService/ConsumeMessagePopConcurrentlyService源码 TODO 标记当前暂不启用。消费者通过registerMessageListener注册的回调监听最终会被封装为对应ConsumerMessageService的实现类。二、POP/PULL 消息拉取入口PullMessageService#runPullMessageService随客户端一同启动run()为消息拉取主循环方法区分PULL、POP两种拉取模式Override public void run() { logger.info(this.getServiceName() service started); while (!this.isStopped()) { try { MessageRequest messageRequest this.messageRequestQueue.take(); if (messageRequest.getMessageRequestMode() MessageRequestMode.POP) { this.popMessage((PopRequest) messageRequest); } else { this.pullMessage((PullRequest) messageRequest); } } catch (InterruptedException ignored) { } catch (Exception e) { logger.error(Pull Message Service Run Method exception, e); } } logger.info(this.getServiceName() service end); }代码说明死循环阻塞从messageRequestQueue队列获取拉取任务根据枚举MessageRequestMode区分POP执行popMessage()新增 POP 工作模式TODO 暂不深究PULL执行pullMessage()传统主动拉取消息逻辑异常捕获只打印日志不终止循环保证消息拉取服务持续运行。4.6 PullCallback 与顺序 / 并发消费提交逻辑消费者拉取消息后的逻辑流转会进入DefaultMQPushConsumerImpl#pullCallback回调对象Consumer 每从 Broker 拉取到一批消息就通过该回调提交消费请求由此印证顺序消费仅在异步推送 (Push) 模式生效同步拉取模式无法实现顺序消费原因是同步拉取场景下 pullCallback 不会被传递执行但拉模式可由业务代码手动指定消费队列实现自定义顺序消费。PullCallback 核心源码PullCallback pullCallback new PullCallback() { Override public void onSuccess(PullResult pullResult) { if (pullResult ! null) { switch (pullResult.getPullStatus()) { case FOUND: // 向消费服务提交消费任务 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); break; // ...其他状态 } } } }拉取到消息状态为FOUND时调用consumeMessageService.submitConsumeRequest()提交消费任务生成ConsumeRequest交由消费线程处理。并发消费ConsumeMessageConcurrentlyService#submitConsumeRequest源码逻辑Override public void submitConsumeRequest( final ListMessageExt msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispatchToConsume) { final int consumeBatchSize this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); // 批量消息小于等于单次消费上限整批提交 if (msgs.size() consumeBatchSize) { ConsumeRequest consumeRequest new ConsumeRequest(msgs, processQueue, messageQueue); try { this.consumeExecutor.submit(consumeRequest); } catch (RejectedExecutionException e) { this.submitConsumeRequestLater(consumeRequest); } } else { // 消息量大时按批次拆分循环提交多个消费任务 for (int total 0; total msgs.size(); ) { ListMessageExt msgThis new ArrayList(); for (int i 0; i consumeBatchSize; i, total) { if (total msgs.size()) { msgThis.add(msgs.get(total)); } else { break; } } ConsumeRequest consumeRequest new ConsumeRequest(msgThis, processQueue, messageQueue); try { this.consumeExecutor.submit(consumeRequest); } catch (RejectedExecutionException e) { for (; total msgs.size(); total) { msgThis.add(msgs.get(total)); } this.submitConsumeRequestLater(consumeRequest); } } } }并发消费特点仅控制单次批量消费消息条数consumeBatchSize不锁定 MessageQueue线程池consumeExecutor多线程并行执行消费任务同一个队列的消息会被多个线程并发处理天然无法保证消息有序。顺序消费ConsumeMessageOrderlyService 核心规则和并发消费最大区别锁定单个 MessageQueue当前队列全部消息处理完成后才会拉取消费下一个队列消息同一个队列消息单线程串行消费以此实现消息顺序性。五、客户端负载均衡机制Producer ConsumerRocketMQ 负载均衡分为发送负载均衡Producer和消费负载均衡Consumer Rebalance两者完全独立是集群高可用、高吞吐的核心保障。5.1 Producer 发送负载均衡核心目的将消息均匀分散到所有队列、所有 Broker 节点最大化集群吞吐量避免单队列热点瓶颈。默认策略轮询 RoundRobin基于原子计数器自增对队列数量取模消息均匀打散到所有队列无状态、高性能、无需协调5.2 Consumer 消费负载均衡Rebalance重平衡核心目的同一消费组内将 Topic 的所有队列平均分配给组内所有消费者实例保证消费均匀、不重复、不遗漏。触发时机默认20s一次服务启动、服务上下线、集群扩容缩容Topic 队列数量变更默认策略平均分配 AllocateMessageQueueAveragely队列总数、消费者总数做平均计算尽量保证每个消费者分配队列数量一致保证有序消息不冲突、消费负载均衡5.3 消费者重平衡流程图5.4 核心源码消费者平均分配算法public ListMessageQueue allocate( String consumerGroup, String currentCID, ListMessageQueue mqAll, ListString cidAll) { int currentIndex cidAll.indexOf(currentCID); int mod mqAll.size() % cidAll.size(); // 计算当前消费者需要分配的队列数量 int avgSize mqAll.size() / cidAll.size() (mod currentIndex ? 1 : 0); int start currentIndex * avgSize; return mqAll.subList(start, start avgSize); }