Stream triggers

Execute a JavaScript function when an item is added to a stream


The triggers and functions feature of Redis Stack comes with a full stream API for processing data from Redis streams. Unlike RedisGears v1, which provided a micro-batching API, triggers and functions provide a real streaming API, meaning that data is processed as soon as it arrives in the stream.

Register a stream consumer

Triggers and functions provide an API that allows you to register a stream trigger. Stream triggers are not to be confused with Redis streams consumer groups; triggers and functions use the Redis Module API to read the stream and manage its consumers efficiently. This approach gives better performance, since no Redis commands need to be invoked in order to read from the stream. Let's look at a simple example:

#!js api_version=1.0 name=myFirstLibrary

redis.registerStreamTrigger(
    "consumer", // consumer name
    "stream", // streams prefix
    function(c, data) {
        // callback to run on each element added to the stream
        redis.log(JSON.stringify(data, (key, value) =>
            typeof value === 'bigint'
                ? value.toString()
                : value // return everything else unchanged
        ));
    }
);

Argument Description:

  • consumer - the consumer name.
  • stream - the stream name prefix on which to trigger the callback.
  • callback - the callback to invoke for each element added to the stream. It follows the same rules of sync and async invocation. The callback will be invoked only on the primary shard.

If we register this library (see the quick start section to learn how to register a RedisGears function) and run the following commands on our Redis:

XADD stream:1 * foo1 bar1
XADD stream:1 * foo2 bar2
XADD stream:2 * foo1 bar1
XADD stream:2 * foo2 bar2

We will see the following line on the Redis log file:

2630021:M 05 Jul 2022 17:13:22.506 * <redisgears_2> {"id":["1657030402506","0"],"stream_name":"stream:1","record":[["foo1","bar1"]]}
2630021:M 05 Jul 2022 17:13:25.323 * <redisgears_2> {"id":["1657030405323","0"],"stream_name":"stream:1","record":[["foo2","bar2"]]}
2630021:M 05 Jul 2022 17:13:29.475 * <redisgears_2> {"id":["1657030409475","0"],"stream_name":"stream:2","record":[["foo1","bar1"]]}
2630021:M 05 Jul 2022 17:13:32.715 * <redisgears_2> {"id":["1657030412715","0"],"stream_name":"stream:2","record":[["foo2","bar2"]]}

The data argument passed to the stream consumer callback has the following format:

{
    "id": ["<ms>", "<seq>"],
    "stream_name": "<stream name>",
    "stream_name_raw": "<stream name as ArrayBuffer>",
    "record":[
        ["<key>", "<value>"],
        .
        .
        ["<key>", "<value>"]
    ],
    "record_raw":[
        ["<key_raw>", "<value_raw>"],
        .
        .
        ["<key_raw>", "<value_raw>"]
    ]
}

The reason why the record is a list of tuples and not an object is that the Redis stream specification allows duplicate keys.
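To make the duplicate-key point concrete, here is a small sketch that folds a record into a Map from field name to all values seen for that field; recordToMap is a hypothetical helper, not part of the triggers and functions API.

```javascript
// record is a list of [field, value] pairs rather than an object because
// the Redis stream specification allows the same field to appear more than
// once in a single entry. This helper preserves the duplicates.
function recordToMap(record) {
    const fields = new Map();
    for (const [key, value] of record) {
        if (!fields.has(key)) {
            fields.set(key, []);
        }
        fields.get(key).push(value);
    }
    return fields;
}

// Both "foo" values survive, where a plain object would keep only the last.
const fields = recordToMap([["foo", "bar1"], ["foo", "bar2"], ["baz", "qux"]]);
console.log(fields.get("foo")); // [ 'bar1', 'bar2' ]
console.log(fields.get("baz")); // [ 'qux' ]
```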

Notice that the stream_name and record fields might contain nulls if the data cannot be decoded as a string. The *_raw fields will always be provided and will contain the data as JS ArrayBuffer objects.
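The sketch below illustrates why the *_raw fields matter: a value that is not valid UTF-8 cannot be decoded into a JS string, so a consumer must fall back to the ArrayBuffer. tryDecode is a hypothetical helper mimicking that behavior, not part of the triggers and functions API.

```javascript
// Attempt to decode an ArrayBuffer as UTF-8, returning null on failure,
// which mirrors the null the non-raw fields would contain.
function tryDecode(buffer) {
    try {
        // fatal: true makes TextDecoder throw on invalid UTF-8 instead of
        // silently inserting replacement characters.
        return new TextDecoder("utf-8", { fatal: true }).decode(buffer);
    } catch (err) {
        return null;
    }
}

const utf8 = new Uint8Array([104, 101, 108, 108, 111]).buffer;  // "hello"
const binary = new Uint8Array([0xff, 0xfe, 0x00, 0x01]).buffer; // not UTF-8

console.log(tryDecode(utf8));   // hello
console.log(tryDecode(binary)); // null
```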

We can observe the streams that are tracked by our registered consumer using the TFUNCTION LIST command:

127.0.0.1:6379> TFUNCTION LIST LIBRARY lib vvv
1)  1) "engine"
    2) "js"
    3) "api_version"
    4) "1.0"
    5) "name"
    6) "lib"
    7) "pending_jobs"
    8) (integer) 0
    9) "user"
   10) "default"
   11) "functions"
   12) (empty array)
   13) "stream_triggers"
   14) 1)  1) "name"
           2) "consumer"
           3) "prefix"
           4) "stream"
           5) "window"
           6) (integer) 1
           7) "trim"
           8) "disabled"
           9) "num_streams"
          10) (integer) 2
          11) "streams"
          12) 1)  1) "name"
                  2) "stream:2"
                  3) "last_processed_time"
                  4) (integer) 0
                  5) "avg_processed_time"
                  6) "0"
                  7) "last_lag"
                  8) (integer) 0
                  9) "avg_lag"
                 10) "0"
                 11) "total_record_processed"
                 12) (integer) 2
                 13) "id_to_read_from"
                 14) "1657030412715-0"
                 15) "last_error"
                 16) "None"
                 17) "pending_ids"
                 18) (empty array)
              2)  1) "name"
                  2) "stream:1"
                  3) "last_processed_time"
                  4) (integer) 1
                  5) "avg_processed_time"
                  6) "0.5"
                  7) "last_lag"
                  8) (integer) 1
                  9) "avg_lag"
                 10) "0.5"
                 11) "total_record_processed"
                 12) (integer) 2
                 13) "id_to_read_from"
                 14) "1657030405323-0"
                 15) "last_error"
                 16) "None"
                 17) "pending_ids"
                 18) (empty array)
   15) "keyspace_triggers"
   16) (empty array)

Enable trimming and set window

We can enable stream trimming by adding the isStreamTrimmed optional argument after the trigger callback. We can also set the window argument, which controls how many elements can be processed simultaneously. Example:

#!js api_version=1.0 name=myFirstLibrary

redis.registerStreamTrigger(
    "consumer", // consumer name
    "stream", // streams prefix
    function(c, data) {
        // callback to run on each element added to the stream
        redis.log(JSON.stringify(data, (key, value) =>
            typeof value === 'bigint'
                ? value.toString()
                : value // return everything else unchanged
        ));
    }, 
    {
        isStreamTrimmed: true,
        window: 3   
    }
);

The default values are:

  • isStreamTrimmed - false
  • window - 1
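The window semantics can be illustrated with a small simulation: a counter caps how many elements are "in flight" at once, and further elements wait for a slot. This is only a sketch of the behavior, not the module's implementation; the Window class and main function are hypothetical names.

```javascript
// A slot-counting gate: at most `size` tasks hold a slot at any moment.
class Window {
    constructor(size) {
        this.size = size;
        this.inFlight = 0;
        this.waiting = [];
    }
    async acquire() {
        if (this.inFlight < this.size) {
            this.inFlight += 1;
            return;
        }
        // Wait; release() hands the slot over without decrementing,
        // so inFlight already counts us when we resume.
        await new Promise(resolve => this.waiting.push(resolve));
    }
    release() {
        const next = this.waiting.shift();
        if (next) {
            next(); // transfer the slot directly to a waiter
        } else {
            this.inFlight -= 1;
        }
    }
}

async function main() {
    const window = new Window(3); // like window: 3 in the registration
    let peak = 0;
    const tasks = [];
    for (let i = 0; i < 10; i++) {
        tasks.push((async () => {
            await window.acquire();
            peak = Math.max(peak, window.inFlight);
            await new Promise(r => setTimeout(r, 10)); // simulate work
            window.release();
        })());
    }
    await Promise.all(tasks);
    return peak;
}

main().then(peak => console.log("peak in-flight:", peak)); // never exceeds 3
```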

It is enough for a single consumer to enable trimming for the stream to be trimmed. The stream is trimmed according to the slowest consumer consuming it at a given time (even if that is not the consumer that enabled the trimming). Raising an exception during the callback invocation will not prevent the trimming. The callback should decide how to handle failures, by invoking a retry or writing an error log. The error will be added to the last_error field shown by the TFUNCTION LIST command.
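Since trimming proceeds even when the callback throws, a callback that must not drop records can retry internally before letting the error surface. The following is a minimal sketch of such a retry wrapper; withRetries and flakyHandler are hypothetical stand-ins, not part of the triggers and functions API.

```javascript
// Re-invoke a failing handler up to `attempts` times before rethrowing;
// the final error is what would end up recorded in last_error.
function withRetries(handler, data, attempts) {
    let lastError;
    for (let i = 0; i < attempts; i++) {
        try {
            return handler(data);
        } catch (err) {
            lastError = err; // remember the failure and try again
        }
    }
    throw lastError;
}

// Stand-in handler that fails twice before succeeding.
let calls = 0;
function flakyHandler(data) {
    calls += 1;
    if (calls < 3) throw new Error("transient failure");
    return `processed ${data.stream_name}`;
}

console.log(withRetries(flakyHandler, { stream_name: "stream:1" }, 5));
// processed stream:1 (succeeds on the third attempt)
```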

Data processing guarantees

As long as the primary shard is up and running, we guarantee an exactly-once property (the callback is triggered exactly one time for each element in the stream). In case of failure, such as a shard crash, we guarantee an at-least-once property (the callback is triggered at least one time for each element in the stream).
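Because a crash downgrades the guarantee to at-least-once, a callback with external side effects may see the same stream entry twice. One defensive sketch is to remember the last processed ID per stream and skip a redelivered entry; in a real deployment this bookkeeping would live in Redis itself rather than in process memory, and lastSeen and processOnce are hypothetical names.

```javascript
const lastSeen = new Map(); // stream name -> last processed "<ms>-<seq>" id

function processOnce(data, handler) {
    const id = `${data.id[0]}-${data.id[1]}`;
    if (lastSeen.get(data.stream_name) === id) {
        return "skipped duplicate"; // redelivery after a crash
    }
    const result = handler(data);
    lastSeen.set(data.stream_name, id); // mark as done only after success
    return result;
}

const entry = { id: ["1657030402506", "0"], stream_name: "stream:1", record: [["foo1", "bar1"]] };
console.log(processOnce(entry, () => "handled")); // handled
console.log(processOnce(entry, () => "handled")); // skipped duplicate
```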

Upgrades

When upgrading consumer code (using the REPLACE option of the TFUNCTION LOAD command), the following consumer parameters can be updated:

  • Window
  • Trimming

Any attempt to update any other parameter will result in an error when loading the library.