redisstream实现消息队列的实践
在数据库实战开发的过程中,我们经常会遇到一些这样那样的问题,然后要卡好半天,等问题解决了才发现原来一些细节知识点还是没有掌握好。今天golang学习网就整理分享《redisstream实现消息队列的实践》,聊聊消息队列、RedisStream,希望可以帮助到正在努力赚钱的你。
Redis5.0带来了Stream类型。从字面上看是流类型,但其实从功能上看,应该是Redis对消息队列(MQ,Message Queue)的完善实现。
基于redis实现消息队列的方式有很多:
- PUB/SUB,订阅/发布模式
- 基于List的 LPUSH+BRPOP 的实现
redis 实现消息对列4中方法
发布订阅
发布订阅优点: 典型的一对的,所有消费者都能同时消费到消息。主动通知订阅者而不是订阅者轮询去读。
发布订阅缺点: 不支持多个消费者公平消费消息,消息没有持久化,不管订阅者是否收到消息,消息都会丢失。
使用场景:微服务间的消息同步,如 分布式webSocker,数据同步等。
list 队列
生产者通过lpush生成消息,消费者通过blpop阻塞读取消息。
**list队列优点:**支持多个消费者公平消费消息,对消息进行存储,可以通过lrange查询队列内的消息。
**list队列缺点:**blpop仍然会阻塞当前连接,导致连接不可用。一旦blpop成功消息就丢弃了,期间如果服务器宕机消息会丢失,不支持一对多消费者。
zset 队列
生产者通过zadd 创建消息时指定分数,可以确定消息的顺序,消费者通过zrange获取消息后进行消费,消费完后通zrem删除消息。
zset优点: 保证了消息的顺序,消费者消费失败后重新入队不会打乱消费顺序。
zset缺点: 不支持一对多消费,多个消费者消费时可能出现读取同一条消息的情况,得通过加锁或其他方式解决消费的幂等性。
zset使用场景:由于数据是有序的,常常被用于延迟队列,如 redisson的DelayQueue
Stream 队列
Redis5.0带来了Stream类型。从字面上看是流类型,但其实从功能上看,应该是Redis对消息队列(MQ,Message Queue)的完善实现。
参考kafka的思想,通过多个消费者组和消费者支持一对多消费,公平消费,消费者内维护了pending列表防止消息丢失。
提供消息ack机制。

基本命令
xadd 生产消息
往 stream 内创建消息 语法为:
XADD key ID field string [field string …]
# * 表示自动生成id redis会根据时间戳+序列号自动生成id,不建议我们自己指定id xadd stream1 * name zs age 23
读取消息
读取stream内的消息,这个并不是消费,只是提供了查看数据的功能,语法为:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]
#表示从 stream1 内取出一条消息,从第0条消息读取(0表示最小的id) xread count 1 streams stream1 0 #表示从 stream1 内 id=1649143363972-0 开始读取一条消息,读取的是指定id的下一条消息 xread count 1 streams msg 1649143363972-0 #表示一直阻塞读取最新的消息($表示获取下一个生成的消息) xread count 1 block 0 streams stream1 $ xrange stream - + 10
XRANGE key startID endID count
#表示从stream1内取10条消息 起始位置为 -(最小ID) 结束位置为+(最大ID) xrange stream1 - + 10
xgroup 消费者组
redis stream 借鉴了kafka的设计,采用了消费者和消费者组的概念。允许多个消费者组消费stream的消息,每个消费者组都能收到完整的消息,例如:stream内有10条消息,消费者组A和消费者组B同时消费时,都能获取到这10条消息。
每个消费者组内可以有多个消费者消费,消息会平均分摊给各个消费者,例如:stream有10条消息,消费者A,B,C同时在同一个组内消费,A接收到 1,4,7,10,B接收到 2,5,8,C接收到 3,6,9
创建消费者组:
#消费消息首先得创建消费者组 # 表示为队列 stream1 创建一个消费者组 group1 从消息id=0(第一条消息)开始读取消息 xgroup create stream1 group1 0 #查询stream1内的所有消费者组信息 xinfo groups stream1
xreadgroup 消费消息
通过xreadgroup可以在消费者组内创建消费者消费消息
XREADGROUP group groupName consumerName [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]
#创建消费者读取消息 #在group1消费者组内通过consumer1消费stream1内的消息,消费1条未分配的消息 (> 表示未分配过消费者的消息) xreadgrup group group1 consumer1 count 1 streams stream1 >
Pending 等待列表
通过 xreadgroup 读取消息时消息会分配给对应的消费者,每个消费者内都维护了一个Pending列表用于保存接收到的消息,当消息ack后会从pending列表内移除,也就是说pending列表内维护的是所有未ack的消息id
每个Pending的消息有4个属性:
- 消息ID
- 所属消费者
- IDLE,已读取时长
- delivery counter,消息被读取次数
XPENDING key group [start end count] [consumer]
#查看pending列表 # 查看group1组内的consumer1的pending列表 - 表示最小的消息id + 表示最大的消息ID xpending stream1 group1 - + 10 consumer1 # 查看group1组内的所有消费者pending类表 xpending stream1 group1 - + 10
消息确认
当消费者消费了消息,需要通过 xack 命令确认消息,xack后的消息会从pending列表移除
XACK key gruopName ID
xack stream1 group1 xxx
消息转移
当消费者接收到消息却不能正确消费时(报错或其他原因),可以使用 XCLAIM 将消息转移给其他消费者消费,需要设置组、转移的目标消费者和消息ID,同时需要提供IDLE(已被读取时长),只有超过这个时长,才能被转移。
通过xclaim转移的消息只是将消息移入另一个消费者的pending列表,消费者并不能通过xreadgroup读取到消息,只能通过xpending读取到。
# 表示将ID为 1553585533795-1 的消息转移到消费者B消费,前提是消费 XCLAIM stream1 group1 consumer1 3600000 1553585533795-1
信息监控
redis提供了xinfo来查看stream的信息
#查看sream信息 xinfo stream steam1 #查询消费者组信息 xinfo groups group1 #查询消费者信息 xinfo consumers consumer1
SpringBoot 整合
1 引入依赖
org.springframework.boot spring-boot-starter-data-redis
2 编写消费者
@Slf4j @Component public class EmailConsumer implements StreamListener> { public final String streamName = "emailStream"; public final String groupName = "emailGroup"; public final String consumerName = "emailConsumer"; @Autowired private StringRedisTemplate stringRedisTemplate; @Override public void onMessage(MapRecord message) { //log.info("stream名称-->{}",message.getStream()); //log.info("消息ID-->{}",message.getId()); log.info("消息内容-->{}",message.getValue()); Map msgMap = message.getValue(); if( msgMap.get("sID")!=null && Integer.valueOf(msgMap.get("sID")) % 3 ==0 ){ //消费异常导致未能ack时,消息会进入pending列表,我们可以启动定时任务来读取pending列表处理失败的任务 log.info("消费异常-->"+message); return; } StreamOperations streamOperations = stringRedisTemplate.opsForStream(); //消息应答 streamOperations.acknowledge( streamName,groupName,message.getId() ); } //我们可以启动定时任务不断监听pending列表,处理死信消息 }
3 配置redis
序列化配置
@EnableCaching
@Configuration
public class RedisConfig {
/**
* 设置redis序列化规则
*/
@Bean
public Jackson2JsonRedisSerializer
消费者组和消费者配置
@Slf4j
@Configuration
public class RedisStreamConfig {
@Autowired
private EmailConsumer emailConsumer;
@Autowired
private RedisTemplate redisTemplate;
@Bean
public StreamMessageListenerContainer.StreamMessageListenerContainerOptions> emailListenerContainerOptions(){
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
return StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
//block读取超时时间
.pollTimeout(Duration.ofSeconds(3))
//count 数量(一次只获取一条消息)
.batchSize(1)
//序列化规则
.serializer( stringRedisSerializer )
.build();
}
/**
* 开启监听器接收消息
*/
@Bean
public StreamMessageListenerContainer> emailListenerContainer(RedisConnectionFactory factory,
StreamMessageListenerContainer.StreamMessageListenerContainerOptions> streamMessageListenerContainerOptions){
StreamMessageListenerContainer> listenerContainer = StreamMessageListenerContainer.create(factory,
streamMessageListenerContainerOptions);
//如果 流不存在 创建 stream 流
if( !redisTemplate.hasKey(emailConsumer.streamName)){
redisTemplate.opsForStream().add(emailConsumer.streamName, Collections.singletonMap("", ""));
log.info("初始化stream {} success",emailConsumer.streamName);
}
//创建消费者组
try {
redisTemplate.opsForStream().createGroup(emailConsumer.streamName,emailConsumer.groupName);
} catch (Exception e) {
log.info("消费者组 {} 已存在",emailConsumer.groupName);
}
//注册消费者 消费者名称,从哪条消息开始消费,消费者类
// > 表示没消费过的消息
// $ 表示最新的消息
listenerContainer.receive(
Consumer.from(emailConsumer.groupName, emailConsumer.consumerName),
StreamOffset.create(emailConsumer.streamName, ReadOffset.lastConsumed()),
emailConsumer
);
listenerContainer.start();
return listenerContainer;
}
}
4.生产者生产消息
@GetMapping("/redis/ps")
public String redisPublish(String content,Integer count){
StreamOperations streamOperations = redisTemplate.opsForStream();
for (int i = 0; i
参考文档:
SpringBoot整合redis stream 实现消息队列
终于介绍完啦!小伙伴们,这篇关于《redisstream实现消息队列的实践》的介绍应该让你收获多多了吧!欢迎大家收藏或分享给更多需要学习的朋友吧~golang学习网公众号也会发布数据库相关知识,快来关注吧!
Redis中键和数据库通用指令详解
- 上一篇
- Redis中键和数据库通用指令详解
- 下一篇
- Redis分布式锁之红锁的实现
-
- 活力的鲜花
- 很有用,一直没懂这个问题,但其实工作中常常有遇到...不过今天到这,帮助很大,总算是懂了,感谢大佬分享技术贴!
- 2023-03-05 08:29:06
-
- 彩色的小蝴蝶
- 这篇博文真是及时雨啊,太详细了,很棒,已收藏,关注师傅了!希望师傅能多写数据库相关的文章。
- 2023-03-03 12:50:51
-
- 完美的泥猴桃
- 细节满满,收藏了,感谢老哥的这篇博文,我会继续支持!
- 2023-02-04 10:32:17
-
- 数据库 · Redis | 22小时前 | Redis · 缓存治理 · Keyspace Notifications · 过期事件 · redis Pub/Sub Keyspace Notifications 过期事件 缓存监听 补偿任务
- Redis 过期事件监听实践:用 Keyspace Notifications 做轻量补偿
- 181浏览 收藏
-
- 数据库 · Redis | 2星期前 | Redis · Streams · 消费者组 · Pending · XACK · 消息堆积 消费者组 XACK XPENDING XAUTOCLAIM Redis Streams
- Redis Streams 消费者组消息堆积怎么办:从 XPENDING 到 XACK 一步步排查
- 385浏览 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 543次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 516次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 500次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 485次学习
-
- ljg-skills
- ljg-skills 是李继刚开源的 AI 技能与提示词集合,面向大模型使用者整理了一批可复用的 prompt、角色设定和任务技能模板,适合用于学习提示词设计、搭建个人 AI 工作流和沉淀团队常用智能体能力。
- 2915次使用
-
- MELO音乐
- MELO音乐是一站式AI视频与音乐制作助手,对标suno, udio的高品质体验。提供伴奏生成、原创写词、无损导出、哼唱识曲、混音变声等全套音频与短视频编辑工具。无论是流行Kpop、电音说唱、民谣古风、摇滚儿歌还是商用轻音乐,MELO为你免费谱曲,轻松做同款!
- 2700次使用
-
- UniScribe
- UniScribe 是一款 AI 音视频转文字与内容整理工具,支持上传音频、视频文件或粘贴 YouTube 链接,自动生成转写文本、摘要、思维导图和关键问题,并支持多格式导出,适合会议记录、课程学习、访谈整理和内容创作复盘。
- 2630次使用
-
- 剧云
- 剧云是专业中文剧本创作平台,安全稳定运行十余年,集成AI编剧、剧本医生审核、人物小传、剧情关系图、大纲编写、多人协作、Word导入导出、版权管控功能,数据安全防护,轻松高效创作剧本。
- 2867次使用
-
- 万象有声
- 万象有声,一个专为有声创作者打造的新一代智能有声内容创作平台。平台提供专业的智能拆章、智能画本编辑、AI配音、AI生成音效、后期制作、智能对轨、智能审听等有声创作全流程工具,可以帮助创作者高效、低成本创作出引人入胜的有声作品。立即体验,让有声书制作更简单!
- 2806次使用
-
- Redis 使用 List 实现消息队列的优缺点
- 2022-12-30 114浏览
-
- go+redis实现消息队列发布与订阅的详细过程
- 2023-01-07 161浏览
-
- 详解RedisStream做消息队列
- 2022-12-31 170浏览
-
- Redis Stream 消息队列实战:消费组、ACK 和失败重投怎么配
- 2026-06-13 187浏览
-
- Golang中优秀的消息队列NSQ基础安装及使用详解
- 2022-12-27 260浏览

