个性化阅读
专注于IT技术分析

Redis流教程(20分钟)

本文概述

1.简介

Redis 5中有一个关键的新功能:流。

流是日志形式的存储结构, 你可以在其中添加数据。它将为每个数据生成一个时间戳ID。而且流还具有读取数据的便捷模型。

因此, 流适用于消息队列和时间序列存储。

2.安装

我们需要使用最新版本的Redis 5.0, 在这里我们使用docker redis容器:


docker run --name redis5 -p 6379:6379 -d redis:5.0-rc3

Redis客户端:


docker run -it --link redis5:redis --rm redis redis-cli -h redis -p 6379

启动后将进入交互式命令行:


redis:6379>

3.用法

3.1向流添加元素

流元素可以是一个或多个键值对。让我们向流中添加元素:


redis:6379> XADD mystream * sensor-id 1234 temperature 19.8
1531989605376-0

从上面我们可以知道:

  • mystream是流的关键;
  • *位置的参数为元素ID, *表示系统自动生成元素ID。
  • 添加的元素包含2个键值对, 传感器ID 1234和温度19.8;
  • 返回的值是新添加的元素的ID, 由时间戳和递增数字组成。

你还可以获取流中的元素数:


redis:6379> XLEN mystream
(integer) 1

3.2范围查询

当我们要使用范围查询时, 我们需要指定开始和结束ID, 这等效于指定时间范围:


redis:6379> XRANGE mystream 1531989605376 1531989605377
1) 1) 1531989605376-0
  2) 1) "sensor-id"
     2) "1234"
     3) "temperature"
     4) "19.8"

你可以将-用于最小的ID, 而将+用于最大的ID:


redis:6379> XRANGE mystream - +
1) 1) 1531989605376-0
  2) 1) "sensor-id"
     2) "1234"
     3) "temperature"
     4) "19.8"

当返回的元素太多时, 你可以限制返回结果的数量, 就像查询数据库并通过COUNT参数指定它时分页一样:


redis:6379> XRANGE mystream - + COUNT 2
1) 1) 1531989605376-0
  2) 1) "sensor-id"
     2) "1234"
     3) "temperature"
     4) "19.8"

你还可以使用XREVRANGE命令来反向查询, 其用法与XRANGE相同。

3.3聆听流的新元素


redis:6379> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
  2) 1) 1) 1531989605376-0
        2) 1) "sensor-id"
           2) "1234"
           3) "temperature"
           4) "19.8"

STREAMS之后的mystream指定目标流的密钥; 0是最小的ID, 我们需要获得大于指定流中指定ID的元素; COUNT是指我们要获取的元素数。

可以一起指定多个流, 例如STREAMS mystream otherstream 0 0。

3.3.1阻止监听器

如果你在客户端1中执行以下操作:


redis:6379> XREAD BLOCK 0 STREAMS mystream $

它将进入等待状态。

如果你向客户端2添加元素:


redis:6379> XADD mystream * test 1

刚添加的元素将显示在客户端1中:


1) 1) "mystream"
  2) 1) 1) 1531994510562-0
        2) 1) "test"
           2) "1"

0是指定的超时, 因此0表示从不超时; $表示流中的最大ID。

3.4消费群体

当流的数量非常大时, 或者当使用者处理非常耗时时, 如果只有一个使用者, 它将承受更大的压力。因此, redis流提供了使用者组的概念, 从而允许多个使用者处理同一个流以实现负载平衡。

例如, 如果有3个使用者C1, C2和C3, 并且流中有7个消息元素, 则这些使用者的分配为:


1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

3.4.1创建消费者组


redis:6379> XGROUP CREATE mystream mygroup01 $
OK

这里为流mystream创建了一个消费者组, 该组的名称为mygroup01; $表示读取当前最大ID之后的元素。

3.4.2添加测试数据

添加一些新数据:


redis:6379> XADD mystream * message apple
1531999977149-0
redis:6379> XADD mystream * message orange
1531999980272-0
redis:6379> XADD mystream * message strawberry
1531999983493-0
redis:6379> XADD mystream * message apricot
1531999988458-0
redis:6379> XADD mystream * message banana
1531999991782-0

3.4.3通过消费者组读取数据


redis:6379> XREADGROUP GROUP mygroup01 Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
  2) 1) 1) 1531999977149-0
        2) 1) "message"
           2) "apple"

爱丽丝(Alice)是该组成员的名称, >表示该​​组的成员到目前为止尚未读取数据。

如你所见, 你无需预先创建组成员, 因为它们将在首次使用时自动创建。

然后让我们创建另一个成员以读取数据:


redis:6379> XREADGROUP GROUP mygroup01 Bob COUNT 1 STREAMS mystream >
1) 1) "mystream"
  2) 1) 1) 1531999980272-0
        2) 1) "message"
           2) "orange"

3.4.4消费历史


redis:6379> XREADGROUP GROUP mygroup01 Alice STREAMS mystream 0
1) 1) "mystream"
  2) 1) 1) 1531999977149-0
        2) 1) "message"
           2) "apple"

此处最后指定的ID为0, 因此你可以获得未决的历史记录数据(已使用但未发送消耗确认的数据), 并且可以帮助恢复后的工作。

3.4.5消费确认


redis:6379> XACK mystream mygroup01 1531999977149-0
(integer) 1

1531999977149-0是Alice消耗的苹果数据。现在让我们检查爱丽丝的消费历史记录:


redis:6379> XREADGROUP GROUP mygroup01 Alice STREAMS mystream 0
1) 1) "mystream"
  2) (empty list or set)

它已经是空的。

3.4.6故障处理

因此, 当使用者遇到问题时, 你可以获取尚未确认的消息数据, 然后将其恢复。这是一种安全机制, 但是如果不能再恢复有问题的使用者, 该怎么办?有什么方法可以处理尚未确认的消息数据?

redis流中提供了一个解决方案来处理这种情况:

  1. 找出所有已传递但未确认的消息数据;
  2. 更改这些数据的所有者。

现在, 它允许新的使用者处理数据。

列出未处理的数据:


redis:6379> XPENDING mystream mygroup01 - + 10
1) 1) 1531999980272-0
  2) "Bob"
  3) (integer) 45126376
  4) (integer) 2
2) 1) 1531999983493-0
  2) "Tom"
  3) (integer) 867475
  4) (integer) 1

你会看到有2个未处理的数据, 并且列出了每个数据的ID, 所有者, 此消息的空闲时间(以毫秒为单位)以及该消息的传递次数。

更改所有者:


redis:6379> XCLAIM mystream mygroup01 Gates 3600 1531999980272-0 1531999983493-0
1) 1) 1531999980272-0
  2) 1) "message"
     2) "orange"
2) 1) 1531999983493-0
  2) 1) "message"
     2) "strawberry"

它将指定2个ID的消息传递给Gates。 3600是指最短的空闲时间, 它将空闲时间大于3600的指定消息分配给Gates。请注意, 盖茨是一个新消费者。之前尚未声明过。

检查到目前为止盖茨尚未处理的数据:


redis:6379> XREADGROUP GROUP mygroup01 Gates STREAMS mystream 0
1) 1) "mystream"
  2) 1) 1) 1531999980272-0
        2) 1) "message"
           2) "orange"
     2) 1) 1531999983493-0
        2) 1) "message"
           2) "strawberry"

有2个新分配的数据。

3.5查看相关信息

查看基本信息:


redis:6379> XINFO STREAM mystream
1) length
2) (integer) 15
3) radix-tree-keys
4) (integer) 1
5) radix-tree-nodes
6) (integer) 2
7) groups
8) (integer) 2
9) first-entry
10) 1) 1531989605376-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
11) last-entry
12) 1) 1531999991782-0
   2) 1) "message"
      2) "banana"

查看消费者组信息:


redis:6379> XINFO GROUPS mystream
1) 1) name
  2) "mygroup"
  3) consumers
  4) (integer) 3
  5) pending
  6) (integer) 5
2) 1) name
  2) "mygroup01"
  3) consumers
  4) (integer) 4
  5) pending
  6) (integer) 2

查看有关组中消费者的信息:


redis:6379> XINFO CONSUMERS mystream mygroup
1) 1) name
  2) "Alice"
  3) pending
  4) (integer) 3
  5) idle
  6) (integer) 2483388
2) 1) name
  2) "Bob"
  3) pending
  4) (integer) 2
  5) idle
  6) (integer) 48453755
3) 1) name
  2) "Gates"
  3) pending
  4) (integer) 0
  5) idle
  6) (integer) 2385114

3.7删除消息数据

首先让我们检查现有数据:


redis:6379> XRANGE mystream - + COUNT 2
1) 1) 1531989605376-0
  2) 1) "sensor-id"
     2) "1234"
     3) "temperature"
     4) "19.8"
2) 1) 1531994510562-0
  2) 1) "test"
     2) "1"

删除第一条数据:


redis:6379> XDEL mystream 1531989605376-0
(integer) 1

再次检查, 你会发现先前的第一个数据不见了:


redis:6379> XRANGE mystream - + COUNT 2
1) 1) 1531994510562-0
  2) 1) "test"
     2) "1"
2) 1) 1531994516257-0
  2) 1) "test"
     2) "2"

注意:如果使用XDEL, 它并不会真正从内存中删除数据, 而只是标记数据。它不会回收内存。

3.8设置流的最大长度

添加数据并将最大长度指定为2:


redis:6379> XADD mystream MAXLEN 2 * value 1
1532049865028-0
redis:6379> XADD mystream MAXLEN 2 * value 2
1532049872075-0
redis:6379> XADD mystream MAXLEN 2 * value 3
1532049877554-0

我们添加了3条数据。现在让我们看一下流的长度和当前内容:


redis:6379> XLEN mystream
(integer) 2

redis:6379> XRANGE mystream - +
1) 1) 1532049872075-0
  2) 1) "value"
     2) "2"
2) 1) 1532049877554-0
  2) 1) "value"
     2) "3"

你可以看到只有2条数据。

4.总结

以上是redis流的基本操作。亲自尝试后, 你将更好地理解流。

赞(0)
未经允许不得转载:srcmini » Redis流教程(20分钟)

评论 抢沙发

评论前必须登录!