Redis Stream,是Redis 5.0版本新增加的一个高级数据结构,从字面意思看就是一个流结构,但是在功能上,Redis Stream是Redis对消息队列的一个最佳实现,几乎满足了一个典型消息队列组件应该具备的所有功能,这是Redis作者谋划多年,也是Redis 5.0版本最大的一个Feature了。 上图是我根据Redis Stream的功能和命令画的大体结构图,可以看出Redis Stream有一个Stream消息队列将所有的消息都串起来,Stream内的每个消息都有一个唯一的ID和相应的消息内容,并且消息还是持久化的,在Redis重启后,消息内容还在;Stream可以接收多个生产者的消息;每个Stream都有一个唯一标识,本质上就是Redis的一个key;Stream可以挂多个消费者组或者单消费者,它们之间是相互独立的,都可以消费到Stream内的所有消息。 在消费者组内部,每个消费者有一个组内名称,它们之间是竞争的,共同维护一个last_delivered_id变量,向前推进消费消息;在每个消费者内部会有一个状态数组变量pending_ids,记录了当前已经被Client读取,但是还没有ack的消息,如果Client没有ack,这个变量里面的消息ID会越来越多,ack之后,就开始减少了。 Redis Stream是一个新的强大的支持多播的可持久化的消息队列,作者坦言Redis Stream狠狠地借鉴了Kafka的设计,难道是想取代Kafka吗?来看一下它实现了哪些内容:包括但不限于: 消息ID的序列化生成:由两部分组成:时间戳-序号。时间戳是毫秒级单位,是生成消息的Redis服务器时间,它是个64位整型(int64);序号是在这个毫秒时间点内的消息序号,也是个64位整型。 消息遍历 消息的阻塞和非阻塞读取 消息的分组消费 未完成消息的处理 坏消息问题、Dead Letter、死信问题 消息队列监控、pending等待ack列表 持久化、主从复制、高可用 Redis Stream涉及的基本命令: 生产消息: ID为消息ID,最常使用*,表示由Redis自动生成消息ID,这也是强烈建议的方案。当然也支持自己设定。 field string [field string]就是当前消息内容,由1个或多个key-value构成。 消费消息: [BLOCK milliseconds],用于设置XREAD为阻塞模式,默认为非阻塞模式。非阻塞模式下,读取完毕(即使没有任何消息)立即返回,而在阻塞模式下,若读取不到内容,则阻塞等待,需要设置一个等待时间。 key为消息队列的名称。 ID,用于设置由哪个消息ID开始读取。ID是单调递增,0表示从第一条消息开始,可以设置ID向后读取,使用Block模式,配合$作为ID,表示读取最新的消息。 消费者组管理: 在消息队列mq上创建消费组 mpGroup,最后一个参数0,表示该消费组是从第一条消息开始消费的。 消息丢失: 命令XPENDING用来获消费者组或组内消费者的未处理完毕的消息列表;命令XACK则负责确认消费者组内消息处理已完成。 每个Pending的消息有4个属性,在执行命令后可以看到,分别是:消息ID、所属消费者、已读取时长IDLE、消息被读取次数delivery counter。 消息转移: 坏消息删除: 信息监控: Redis Stream命令汇总: 一个最典型的消息队列就是XADD命令配合XREAD BLOCK命令完成,XADD负责生成消息,XREAD BLOCK负责堵塞消费消息。Redis Stream可以进行组内消费的基本原理是,Stream类型会为每个组记录一个最后处理(交付)的消息ID(last_delivered_id),这样在组内消费时,就可以从这个值后面开始读取,保证不重复消费。可以参考:Redis Stream消费组结构图。 Redis Stream和基于BLOCKED LIST、PUB/SUB、ZSET的消息队列比较: Redis现有的BLOCKED LIST、PUB/SUB、ZSET数据类型,虽然也都可以在简单的场景下作为消息队列来使用,但是Redis Stream无疑要完善很多:Redis Stream提供了消息的持久化和主备复制功能、采用新的RadixTree(基数树)数据结构来支持更高效的内存使用和消息读取、甚至是类似于Kafka的Consumer Group功能。 好了,有关Redis Stream的基础知识,我们就讲到这里,由于文章篇幅有限,更深层的知识我们讲的不多,对Redis Stream感兴趣的同学可以自行百度搜索,包括RadixTree、Redis版本更新、命令等等,也可以亲自动手实践一下,说不定哪一天Redis Stream会帮上大忙。 PS:如有任何问题或疑问,请留言告诉我。 喜欢这篇文章的朋友,欢迎关注公众号,第一时间收到更新内容。
XADD key ID field string [field string ...]
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
XGROUP CREATE mq mqGroup 0
XREADGROUP GROUP mqGroup consumerA COUNT 1 STREAMS mq >
XPENDING mq mqGroup [start end count] [consumerA]
XACK mq mqGroup ID
/*转移超过3600s的消息1553585533795-1到消费者A的Pending列表*/
XCLAIM mq mqGroup consumerA 3600000 1553585533795-1
XDEL mq 1553585533795-1 /*删除队列mq中的消息*/
XRANGE mq - + /*查看队列中消息*/
XINFO STREAM mq /*消息队列信息*/
XINFO GROUPS mq /*消费组信息*/
XINFO CONSUMERS mq mqGroup /*消费组成员信息*/
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox网页视频下载器 下载地址: ImovieBox网页视频下载器-最新版本下载
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算