消息发送与持久化机制分析核心流程1. 消息发送处理当客户端A发送消息给B时服务端的处理流程如下消息接收服务端通过 MQTT 协议接收客户端A的消息消息处理SendMessageHandler的action方法处理消息权限检查黑名单、禁言等敏感词过滤消息格式验证消息存储调用saveAndPublish方法m_messagesStore.storeMessage保存消息到 Hazelcastm_messagesStore.getNotifyReceivers确定消息接收者publisher.publish2Receivers发送通知给接收者2. 消息持久化流程消息持久化是通过 Hazelcast 的 MapStore 机制自动完成的存储到 HazelcastMemoryMessagesStore.storeMessage方法将消息存储到 Hazelcast自动持久化Hazelcast 配置了MessageLoader作为 MapStore调用 store 方法Hazelcast 自动调用MessageLoader.store方法持久化到数据库MessageLoader.store方法调用DatabaseStore.persistMessage方法3. 消息拉取流程客户端接收通知客户端B收到通知消息拉取请求客户端B调用PullMessageHandler拉取消息消息返回服务端根据messageSeq从存储中获取消息并返回给客户端关键组件和方法1. 消息处理相关SendMessageHandler处理消息发送请求action方法处理消息发送逻辑路径C:\workspace\im-server\broker\src\main\java\io\moquette\imhandler\SendMessageHandler.javaIMHandler所有消息处理器的基类saveAndPublish方法保存消息并发布通知路径C:\workspace\im-server\broker\src\main\java\io\moquette\imhandler\IMHandler.java2. 消息存储相关MemoryMessagesStore消息存储实现storeMessage方法保存消息到 HazelcastgetNotifyReceivers方法确定消息接收者insertUserMessages方法插入用户消息记录路径C:\workspace\im-server\broker\src\main\java\io\moquette\persistence\MemoryMessagesStore.javaDatabaseStore数据库存储实现persistMessage方法将消息持久化到数据库persistUserMessage方法将用户消息关联持久化到数据库路径C:\workspace\im-server\broker\src\main\java\io\moquette\persistence\DatabaseStore.javaMessageLoaderHazelcast MapStore 实现store方法当消息存储到 Hazelcast 时自动调用路径C:\workspace\im-server\broker\src\main\java\io\moquette\persistence\MessageLoader.java3. 消息发布相关MessagesPublisher消息发布器publish2Receivers方法发送消息通知给接收者路径C:\workspace\im-server\broker\src\main\java\io\moquette\spi\impl\MessagesPublisher.java技术实现细节1. 通知机制服务端不是直接推送消息内容而是发送一个通知包含pullType拉取类型普通消息、聊天室消息等messageSeq消息序列号客户端收到通知后根据messageSeq拉取具体消息这种设计有以下优点减少网络传输量客户端可以根据自己的需要拉取消息支持消息同步和漫游2. 消息存储Hazelcast用于缓存消息设置了过期时间普通消息7天过期透明消息10秒过期数据库用于持久化存储消息内容存储在分片表中用户消息关联存储在t_user_message表中3. 持久化机制消息持久化通过 Hazelcast 的 MapStore 机制实现消息存储到 Hazelcast 时自动调用MessageLoader.store方法MessageLoader.store方法调用DatabaseStore.persistMessage方法DatabaseStore.persistMessage方法将消息异步持久化到数据库代码示例1. 消息存储到 Hazelcast// MemoryMessagesStore.storeMessage 方法publicWFCMessage.MessagestoreMessage(StringfromUser,StringfromClientId,WFCMessage.Messagemessage){HazelcastInstancehzInstancem_Server.getHazelcastInstance();IMapLong,MessageBundlemIMaphzInstance.getMap(MESSAGES_MAP);MessageBundlemessageBundlenewMessageBundle(message.getMessageId(),fromUser,fromClientId,message);if(message.getContent().getPersistFlag()Transparent){mIMap.put(message.getMessageId(),messageBundle,10,TimeUnit.SECONDS);}else{mIMap.put(message.getMessageId(),messageBundle,7,TimeUnit.DAYS);}returnmessage;}2. MapStore 自动持久化// MessageLoader.store 方法Overridepublicvoidstore(LongaLong,MessageBundlemessageBundle){getDatabaseStore().persistMessage(messageBundle.getMessage(),false);}3. 数据库持久化// DatabaseStore.persistMessage 方法voidpersistMessage(finalWFCMessage.Messagemessage,booleanupdate){if(message.getContent().getPersistFlag()Transparent){return;}mScheduler.execute(()-{Connectionconnectionnull;PreparedStatementstatementnull;try{connectionDBUtil.getConnection();StringtableMessageShardingUtil.getMessageTable(message.getMessageId());// SQL 语句省略...// 将消息内容存储到数据库}catch(SQLExceptione){e.printStackTrace();Utility.printExecption(LOG,e,RDBS_Exception);}finally{DBUtil.closeDB(connection,statement);}});}设计优势解耦消息处理逻辑与持久化逻辑分离自动同步利用 Hazelcast 的 MapStore 机制实现内存存储和持久化存储的自动同步性能优化消息首先存储到内存再异步持久化到数据库提高响应速度可靠性即使服务器重启消息也能从数据库中恢复结论该项目采用了通知-拉取的消息传递模式结合 Hazelcast 内存缓存和数据库持久化实现了高效、可靠的消息传递机制。核心流程包括消息接收与处理、消息存储到 Hazelcast、自动持久化到数据库、发送通知、客户端拉取消息。通过这些组件的协作确保了消息的实时传递和持久化存储即使在服务器重启的情况下也能保证消息不丢失。这种设计既保证了系统的性能又确保了数据的可靠性是实时通信系统的理想选择。