带着问题看 redis 源码 -- stream 实现 - (sunznx) 振翅飞翔
12 October 2019

在 redis 中,stream 和 list/zset/pubsub 有点类似,区别在于 stream 中的数据如果不显式调用 xdel/xtrim 来删除的话,数据还会一直存在,而 list/zset/pubsub 中的数据一旦被客户端取出来之后,便删除

在了解 stream 实现之前,首要先知道 stream 提供了哪些功能,以及如何使用 stream 来使用消息队列。这样再来看 stream 的功能,才能明白为什么每个命令是这样实现的。可以通过阅读如下文章了解大概:

xadd/xread

stream 可以简单的理解为一个 key-value 结构,key 是一个有序的标识号,使用符号 "*" 表示由 redis 自动生成标识号,结构是这样的 {timestamp}-{seq} ,value 为结构为 {key11: val11, key12, val12} 的 map 类型

xadd:
xadd 往 stream 中添加一个 item,redis 会将 item 的 key 转换成 char * 类型,插入到 radix tree 中,并记录 last_id 再用 listpack 将 value 打包起来,作为 radix tree 的 value

xread/xrange:
直接对 radix tree 做 range 操作。如果 key 为 $ ,会把 key 转换成 radix tree 的 last_id ;否则,redis 会读取指定范围内的数据

xgroup

简单说下 xgroup 的特性。在不同 xgroup 中,读取的 stream 的数据是互不影响的。

假设有如下场景:
stream 里面有 3 条数据,两个 xgroup:group1 和 group2。

group:
在 group1 读取了 3 条数据之后,group2 也还能读取 3 条数据,
即我们可以用 group 来实现 rabbitmq 的 fanout 模式。

consumer:
每一个 group 里面又可以有多个消费者来轮流消费 group 里面的消息。

在 redis 中,group 和 consumer 的定义如下:

typedef struct streamCG {
    streamID last_id;
    rax *pel;
    rax *consumers;
} streamCG;

typedef struct streamConsumer {
    mstime_t seen_time;
    sds name;
    rax *pel;
} streamConsumer;

创建 group 的时候可以指定消息的范围:

0 表示 group 的消息范围是从 0 开始的
$ 表示 group 的消息范围是从当前 key 最大值开始的

读取 group 的消息的时候不再使用 xread,使用 xGroupRead。在 group 里面的消息有 4 种状态:

  1. 未被消费者读取
  2. 被消费者读取了,处于 pending 状态
  3. acked(使用 xAck 命令)
  4. 删除(使用 xDel 命令)

consumer 从 group 里面读消息只有两种形式:

  1. 读取的时候指定 id 为 "<",表示只读取 “未被消费者读取” 的消息
  2. 读取的时候指定 id 为 "0 或者其他 id 值",表示读取 “处于 pending 状态” 的消息

xgroup 实现

redis 在创建 group 的时候,会生成一个 group.last_id 属性,如果这个值为 $ ,会直接取 stream.last_id
读消息的实现:

  1. 读取 “未被消费者读取” 的消息(指定了 id 为 ">")
    读取 stream.last_id>=group.last_id 的消息,并将这条信息的 key 写入到 group.pelconsumer.pel ,同时更新 group.last_id
  2. 读取 “处于 pending 状态” 的消息(指定了特定 id)
    读取 consumer.pel.last_id>=id 的消息

另外,需要注意的是,group.pelconsumer.pel 也是一个 radix tree 结构。其中,key 为消息的 key,value 为一个 nack 结构,如下:

typedef struct streamNACK {
    mstime_t delivery_time;   // 记录了什么时候读取的消息
    uint64_t delivery_count;  // 记录了消息被读取了几次
    streamConsumer *consumer;
} streamNACK;

这个结构在介绍 xclaim 命令的时候十分重要

xReadGroup block

上面两种情况在读取消息的时候会对 block 选项带来不同的影响:block 选项值只有在 id 为 ">" 的时候有意义

  1. 指定了 id 为 ">"
    判断 stream.last_id>=group.last_id 。是的话,将 stream.last_id>=group.last_id 的消息返回给客户端(这时候不会被 block),更新 group.last_id=stream.last_id ;否则,会被 block 住,直到有消息
  2. 指定了特定 id
    block 选项将会失效

xgroup 总结

在 group 中,如果一个消息被 consumer 读取了,那么这个消息就会 “属于这个 consumer”

xack

xack 的实现很简单了,只是将指定的消息 id 从 group.pelconsumer.pel 中删除(但是消息还是存在 stream 中,只有 xtrim/xdel 才能将消息从 stream 中删除)

xclaim

上面提到过:

如果一个消息被 consumer 读取了,那么这个消息就会 “属于这个 consumer”

xclaim 命令能将一个 pending 中的消息从 consumer1 转移到 consumer2

可以设置转移条件

  1. minIdleTime
    minIdleTime 表示只有当这个消息 pending 了这么久的时候才发生转移。如果 idle 时间不超过 minIdleTime,不会发生转移。consumer 在获取到消息的时候会设置一个 delivery_time ,当 $minIdleTime+$delivery_time=now() 条件成立
  2. IDEL
    当 xClaim 执行成功的时候,如果 delivery_time 默认会被更新成当前时间,即 delivery_time=now() 如果设置了这个参数,那么 delivery_time=now()-$IDEL
  3. TIME
    用法和 IDEL 一样,只不过 IDEL 不适合记录在 AOF 中。如果设置了这个参数,那么 delivery_time=$TIME
  4. JUSTID
    只返回 ID 给客户端。(这个选项还有个奇葩的功能。一般情况下,如果消息被转移了,delivery_count 会加 1, 如果设置了这个选项,那么 delivery_count 不会加 1 了)
  5. RETRYCOUNT
    当 xClaim 执行成功的时候,RETRYCOUNT 默认会加 1。通过设置这个参数,可以强制设置 RETRYCOUNT