本文概述
1.简介
Redis 5中有一个关键的新功能:流。
流是日志形式的存储结构, 你可以在其中添加数据。它将为每个数据生成一个时间戳ID。而且流还具有读取数据的便捷模型。
因此, 流适用于消息队列和时间序列存储。
2.安装
我们需要使用最新版本的Redis 5.0, 在这里我们使用docker redis容器:
docker run --name redis5 -p 6379:6379 -d redis:5.0-rc3
Redis客户端:
docker run -it --link redis5:redis --rm redis redis-cli -h redis -p 6379
启动后将进入交互式命令行:
redis:6379>
3.用法
3.1向流添加元素
流元素可以是一个或多个键值对。让我们向流中添加元素:
redis:6379> XADD mystream * sensor-id 1234 temperature 19.8
1531989605376-0
从上面我们可以知道:
- mystream是流的关键;
- *位置的参数为元素ID, *表示系统自动生成元素ID。
- 添加的元素包含2个键值对, 传感器ID 1234和温度19.8;
- 返回的值是新添加的元素的ID, 由时间戳和递增数字组成。
你还可以获取流中的元素数:
redis:6379> XLEN mystream
(integer) 1
3.2范围查询
当我们要使用范围查询时, 我们需要指定开始和结束ID, 这等效于指定时间范围:
redis:6379> XRANGE mystream 1531989605376 1531989605377
1) 1) 1531989605376-0
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "19.8"
你可以将-用于最小的ID, 而将+用于最大的ID:
redis:6379> XRANGE mystream - +
1) 1) 1531989605376-0
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "19.8"
当返回的元素太多时, 你可以限制返回结果的数量, 就像查询数据库并通过COUNT参数指定它时分页一样:
redis:6379> XRANGE mystream - + COUNT 2
1) 1) 1531989605376-0
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "19.8"
你还可以使用XREVRANGE命令来反向查询, 其用法与XRANGE相同。
3.3聆听流的新元素
redis:6379> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) 1531989605376-0
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "19.8"
STREAMS之后的mystream指定目标流的密钥; 0是最小的ID, 我们需要获得大于指定流中指定ID的元素; COUNT是指我们要获取的元素数。
可以一起指定多个流, 例如STREAMS mystream otherstream 0 0。
3.3.1阻止监听器
如果你在客户端1中执行以下操作:
redis:6379> XREAD BLOCK 0 STREAMS mystream $
它将进入等待状态。
如果你向客户端2添加元素:
redis:6379> XADD mystream * test 1
刚添加的元素将显示在客户端1中:
1) 1) "mystream"
2) 1) 1) 1531994510562-0
2) 1) "test"
2) "1"
0是指定的超时, 因此0表示从不超时; $表示流中的最大ID。
3.4消费群体
当流的数量非常大时, 或者当使用者处理非常耗时时, 如果只有一个使用者, 它将承受更大的压力。因此, redis流提供了使用者组的概念, 从而允许多个使用者处理同一个流以实现负载平衡。
例如, 如果有3个使用者C1, C2和C3, 并且流中有7个消息元素, 则这些使用者的分配为:
1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1
3.4.1创建消费者组
redis:6379> XGROUP CREATE mystream mygroup01 $
OK
这里为流mystream创建了一个消费者组, 该组的名称为mygroup01; $表示读取当前最大ID之后的元素。
3.4.2添加测试数据
添加一些新数据:
redis:6379> XADD mystream * message apple
1531999977149-0
redis:6379> XADD mystream * message orange
1531999980272-0
redis:6379> XADD mystream * message strawberry
1531999983493-0
redis:6379> XADD mystream * message apricot
1531999988458-0
redis:6379> XADD mystream * message banana
1531999991782-0
3.4.3通过消费者组读取数据
redis:6379> XREADGROUP GROUP mygroup01 Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) 1531999977149-0
2) 1) "message"
2) "apple"
爱丽丝(Alice)是该组成员的名称, >表示该组的成员到目前为止尚未读取数据。
如你所见, 你无需预先创建组成员, 因为它们将在首次使用时自动创建。
然后让我们创建另一个成员以读取数据:
redis:6379> XREADGROUP GROUP mygroup01 Bob COUNT 1 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) 1531999980272-0
2) 1) "message"
2) "orange"
3.4.4消费历史
redis:6379> XREADGROUP GROUP mygroup01 Alice STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) 1531999977149-0
2) 1) "message"
2) "apple"
此处最后指定的ID为0, 因此你可以获得未决的历史记录数据(已使用但未发送消耗确认的数据), 并且可以帮助恢复后的工作。
3.4.5消费确认
redis:6379> XACK mystream mygroup01 1531999977149-0
(integer) 1
1531999977149-0是Alice消耗的苹果数据。现在让我们检查爱丽丝的消费历史记录:
redis:6379> XREADGROUP GROUP mygroup01 Alice STREAMS mystream 0
1) 1) "mystream"
2) (empty list or set)
它已经是空的。
3.4.6故障处理
因此, 当使用者遇到问题时, 你可以获取尚未确认的消息数据, 然后将其恢复。这是一种安全机制, 但是如果不能再恢复有问题的使用者, 该怎么办?有什么方法可以处理尚未确认的消息数据?
redis流中提供了一个解决方案来处理这种情况:
- 找出所有已传递但未确认的消息数据;
- 更改这些数据的所有者。
现在, 它允许新的使用者处理数据。
列出未处理的数据:
redis:6379> XPENDING mystream mygroup01 - + 10
1) 1) 1531999980272-0
2) "Bob"
3) (integer) 45126376
4) (integer) 2
2) 1) 1531999983493-0
2) "Tom"
3) (integer) 867475
4) (integer) 1
你会看到有2个未处理的数据, 并且列出了每个数据的ID, 所有者, 此消息的空闲时间(以毫秒为单位)以及该消息的传递次数。
更改所有者:
redis:6379> XCLAIM mystream mygroup01 Gates 3600 1531999980272-0 1531999983493-0
1) 1) 1531999980272-0
2) 1) "message"
2) "orange"
2) 1) 1531999983493-0
2) 1) "message"
2) "strawberry"
它将指定2个ID的消息传递给Gates。 3600是指最短的空闲时间, 它将空闲时间大于3600的指定消息分配给Gates。请注意, 盖茨是一个新消费者。之前尚未声明过。
检查到目前为止盖茨尚未处理的数据:
redis:6379> XREADGROUP GROUP mygroup01 Gates STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) 1531999980272-0
2) 1) "message"
2) "orange"
2) 1) 1531999983493-0
2) 1) "message"
2) "strawberry"
有2个新分配的数据。
3.5查看相关信息
查看基本信息:
redis:6379> XINFO STREAM mystream
1) length
2) (integer) 15
3) radix-tree-keys
4) (integer) 1
5) radix-tree-nodes
6) (integer) 2
7) groups
8) (integer) 2
9) first-entry
10) 1) 1531989605376-0
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "19.8"
11) last-entry
12) 1) 1531999991782-0
2) 1) "message"
2) "banana"
查看消费者组信息:
redis:6379> XINFO GROUPS mystream
1) 1) name
2) "mygroup"
3) consumers
4) (integer) 3
5) pending
6) (integer) 5
2) 1) name
2) "mygroup01"
3) consumers
4) (integer) 4
5) pending
6) (integer) 2
查看有关组中消费者的信息:
redis:6379> XINFO CONSUMERS mystream mygroup
1) 1) name
2) "Alice"
3) pending
4) (integer) 3
5) idle
6) (integer) 2483388
2) 1) name
2) "Bob"
3) pending
4) (integer) 2
5) idle
6) (integer) 48453755
3) 1) name
2) "Gates"
3) pending
4) (integer) 0
5) idle
6) (integer) 2385114
3.7删除消息数据
首先让我们检查现有数据:
redis:6379> XRANGE mystream - + COUNT 2
1) 1) 1531989605376-0
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "19.8"
2) 1) 1531994510562-0
2) 1) "test"
2) "1"
删除第一条数据:
redis:6379> XDEL mystream 1531989605376-0
(integer) 1
再次检查, 你会发现先前的第一个数据不见了:
redis:6379> XRANGE mystream - + COUNT 2
1) 1) 1531994510562-0
2) 1) "test"
2) "1"
2) 1) 1531994516257-0
2) 1) "test"
2) "2"
注意:如果使用XDEL, 它并不会真正从内存中删除数据, 而只是标记数据。它不会回收内存。
3.8设置流的最大长度
添加数据并将最大长度指定为2:
redis:6379> XADD mystream MAXLEN 2 * value 1
1532049865028-0
redis:6379> XADD mystream MAXLEN 2 * value 2
1532049872075-0
redis:6379> XADD mystream MAXLEN 2 * value 3
1532049877554-0
我们添加了3条数据。现在让我们看一下流的长度和当前内容:
redis:6379> XLEN mystream
(integer) 2
redis:6379> XRANGE mystream - +
1) 1) 1532049872075-0
2) 1) "value"
2) "2"
2) 1) 1532049877554-0
2) 1) "value"
2) "3"
你可以看到只有2条数据。
4.总结
以上是redis流的基本操作。亲自尝试后, 你将更好地理解流。
评论前必须登录!
注册