【Redis】发布订阅与消息队列Day8(2026年)
写在前面消息队列是分布式系统中不可或缺的组件Redis除了作为缓存数据库还提供了发布订阅Pub/Sub和Stream两种消息机制。虽然Redis不是专业的消息队列但在轻量级场景下它是一个不错的选择。今天我们来深入理解Redis的消息能力。文章目录写在前面一、Redis发布订阅Pub/Sub1.1 什么是发布订阅1.2 Pub/Sub基本命令1.3 Pub/Sub使用示例1.4 模式订阅二、发布订阅的局限性2.1 主要局限性2.2 适用场景三、Redis Stream数据结构3.1 什么是Stream3.2 Stream基本命令3.3 Stream使用示例四、Stream消费组4.1 消费组概念4.2 消费组操作4.3 消费组配置五、Stream vs Kafka对比六、踩坑提醒七、实际应用案例7.1 实时消息推送7.2 日志收集7.3 事件溯源八、面试高频考点考点1Redis做消息队列的优缺点考点2Pub/Sub和Stream的区别考点3如何保证消息不丢失考点4Stream的消息ID有什么特点九、参考资料十、互动话题一、Redis发布订阅Pub/Sub1.1 什么是发布订阅实际场景即时通讯系统中用户上线后需要订阅多个群组消息当有人在群组发言时所有在线成员都能实时收到消息。发布订阅Pub/Sub是一种消息通信模式发布者Publisher发送消息到频道订阅者Subscriber订阅频道接收消息频道Channel消息传递的通道┌────────────┐ │ Publisher │──────┐ └────────────┘ │ ▼ ┌──────────────┐ │ Channel │ └──────────────┘ │ ┌───────────┼───────────┐ ▼ ▼ ▼ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ Subscriber │ │ Subscriber │ │ Subscriber │ └────────────┘ └────────────┘ └────────────┘1.2 Pub/Sub基本命令命令说明PUBLISH channel message发布消息SUBSCRIBE channel [channel...]订阅频道UNSUBSCRIBE [channel...]取消订阅PSUBSCRIBE pattern模式订阅PUNSUBSCRIBE [pattern...]取消模式订阅PUBSUB CHANNELS查看活跃频道PUBSUB NUMSUB channel查看频道订阅数1.3 Pub/Sub使用示例订阅消息# 客户端1订阅频道 SUBSCRIBE news:sports # 返回 # 1) subscribe # 2) news:sports # 3) (integer) 1 # 进入阻塞状态等待消息发布消息# 客户端2发布消息 PUBLISH news:sports NBA总决赛湖人vs凯尔特人 # 返回(integer) 1 表示有1个订阅者收到消息订阅者收到消息# 客户端1收到 # 1) message # 2) news:sports # 3) NBA总决赛湖人vs凯尔特人1.4 模式订阅使用通配符订阅多个频道# 订阅所有news开头的频道 PSUBSCRIBE news:* # 可以匹配 # news:sports, news:tech, news:weather 等二、发布订阅的局限性踩坑提醒Redis Pub/Sub是实时的如果订阅者离线消息会丢失2.1 主要局限性局限性说明消息不持久化订阅者离线时收不到消息无确认机制发布后无法确认是否被消费无消费组不支持消费者组负载均衡无消息回溯无法重新消费历史消息阻塞模式订阅时客户端处于阻塞状态2.2 适用场景适用场景不适用场景实时消息推送需要消息持久化即时通讯需要消息确认简单事件通知需要消费组在线状态同步高可靠性要求三、Redis Stream数据结构3.1 什么是Stream经验之谈Redis 5.0引入的Stream是专门为消息队列设计的数据结构解决了Pub/Sub的主要痛点。Stream是Redis 5.0引入的数据结构具有以下特点消息持久化存储支持消费组支持消息确认ACK支持消息回溯类似Kafka的设计理念3.2 Stream基本命令命令说明XADD添加消息XREAD读取消息XRANGE范围读取XGROUP创建消费组XREADGROUP消费组读取XACK确认消息XDEL删除消息XLEN消息长度XINFO查看Stream信息3.3 Stream使用示例添加消息# 添加消息到Stream XADD mystream * name zhangsan age 25 # 返回1640000000000-0消息ID # * 表示由Redis自动生成ID # 格式毫秒时间戳-序列号读取消息# 从头开始读取 XRANGE mystream - # 返回所有消息 # 读取最新消息非阻塞 XREAD COUNT 2 STREAMS mystream 0 # 阻塞读取最新消息 XREAD BLOCK 5000 STREAMS mystream $ # $表示最新消息位置BLOCK 5000表示阻塞5秒范围查询# 按时间范围查询 XRANGE mystream 1640000000000 1640000001000 # 只取2条 XRANGE mystream - COUNT 2四、Stream消费组4.1 消费组概念消费组Consumer Group是Stream的核心特性多个消费者组成一个组每条消息只能被组内一个消费者消费支持消息确认和重新消费┌──────────────┐ │ Stream │ └──────────────┘ │ ┌────────────┼────────────┐ ▼ ▼ ▼ ┌──────────┐ ┌──────────┐ ┌──────────┐ │Consumer-1│ │Consumer-2│ │Consumer-3│ └──────────┘ └──────────┘ └──────────┘ │ │ │ └────────────┼────────────┘ │ ┌──────────────┐ │ Consumer Group│ └──────────────┘4.2 消费组操作创建消费组# 从头开始消费 XGROUP CREATE mystream mygroup 0 # 只消费新消息 XGROUP CREATE mystream mygroup $ # 创建成功后查看信息 XINFO GROUPS mystream消费组读取消息# 消费组读取 XREADGROUP GROUP mygroup consumer1 COUNT 1 STREAMS mystream # 表示读取未消费的消息 # consumer1是消费者名称确认消息# 处理完消息后确认 XACK mystream mygroup 1640000000000-0 # 返回(integer) 1查看待处理消息# 查看待处理消息 XPENDING mystream mygroup # 查看详细信息 XPENDING mystream mygroup - 10转移消息所有权# 将消息转移给其他消费者 XCLAIM mystream mygroup consumer2 0 1640000000000-04.3 消费组配置# 设置最大长度防止Stream无限增长 XADD mystream MAXLEN 1000 * field value # 精确限制长度性能更好 XADD mystream MAXLEN ~ 1000 * field value五、Stream vs Kafka对比对比项Redis StreamKafka消息持久化支持内存磁盘支持磁盘消息容量有限内存限制大磁盘存储吞吐量高10万级/秒极高百万级/秒消费组支持支持消息确认支持支持消息回溯支持支持分区单分区多分区运维复杂度低高适用场景轻量级消息队列大数据流处理六、踩坑提醒踩坑提醒消息丢失问题问题1Pub/Sub消息丢失# 订阅者离线时消息直接丢失 SUBSCRIBE channel # 离线期间发布的消息无法收到解决方案使用Stream替代Pub/Sub使用专业消息队列RabbitMQ、Kafka问题2Stream消息堆积# Stream无限增长会耗尽内存 XADD stream * data value # 持续添加不删除解决方案# 使用MAXLEN限制长度 XADD stream MAXLEN ~ 10000 * data value # 定期清理 XTRIM stream MAXLEN 10000问题3消费者宕机消息未确认# 消费者读取消息后宕机消息处于pending状态 XREADGROUP GROUP mygroup consumer1 STREAMS stream # 宕机...解决方案# 其他消费者接管pending消息 XCLAIM stream mygroup consumer2 60000 pending-message-id # 60000是消息空闲时间毫秒七、实际应用案例7.1 实时消息推送# 发布者 XADD notifications * user_id 1001 content 您有新消息 # 消费者 XREADGROUP GROUP notify_group consumer1 BLOCK 5000 STREAMS notifications 7.2 日志收集# 应用服务写入日志 XADD app:logs * level ERROR service user-api message 连接超时 # 日志处理服务消费 XREADGROUP GROUP log_group log_processor STREAMS app:logs 7.3 事件溯源# 记录领域事件 XADD order:events * event OrderCreated order_id 1001 amount 99.9 XADD order:events * event OrderPaid order_id 1001 paid_at 1640000000 XADD order:events * event OrderShipped order_id 1001 shipped_at 1640000100 # 回放事件 XRANGE order:events - 八、面试高频考点考点1Redis做消息队列的优缺点答案优点缺点部署简单无需额外组件内存限制不适合海量数据低延迟高性能不支持复杂路由支持消费组和消息确认功能不如专业MQ丰富支持消息持久化Stream没有死信队列机制考点2Pub/Sub和Stream的区别答案对比项Pub/SubStream消息持久化不支持支持离线消息不支持支持消费组不支持支持消息确认不支持支持消息回溯不支持支持适用场景实时推送可靠消息队列考点3如何保证消息不丢失答案使用Stream替代Pub/Sub开启AOF持久化appendfsync everysec使用消费组确认机制XACK监控pending消息及时处理未确认消息设置合理的消息重试机制考点4Stream的消息ID有什么特点答案格式毫秒时间戳-序列号如1640000000000-0时间戳由Redis服务器生成保证递增同一毫秒内通过序列号区分支持自定义ID但通常使用*让Redis自动生成九、参考资料Redis官方文档 - Pub/SubRedis官方文档 - Streams十、互动话题你在项目中使用Redis作为消息队列遇到过什么问题Stream和Kafka你会如何选择各自的适用场景是什么如何设计一个基于Stream的可靠消息队列系统欢迎在评论区分享你的经验和见解下一期预告Day9 - Redis主从复制敬请期待