Documentation

跟踪任务执行中的状态变化

问题

使用 InfluxDB tasks 对时间序列数据进行评估和分配状态,并检测状态变化是很常见的做法。任务按批处理数据,但如果跨批边界发生状态变化,会怎样呢?任务不会认识到这种变化,因为不知道上一个任务执行的最终状态。本指南将讲解如何创建一个任务,将状态分配给行,然后使用上一个任务执行的结果检测跨批边界的任何状态变化,以确保您不会错过任何状态变化。

解决方案

根据阈值明确分配级别给您的数据。

解决方案优势

如果你从未使用过monitor编写任务,那么这是最简单的解决方案。

解决方案的缺点

您必须明确定义您的阈值,这可能需要更多的代码。

解决方案概述

创建一个任务,您需要:

  1. 模板。导入包并定义任务选项。
  2. 查询您的数据。
  3. 根据阈值将状态分配给您的数据。将这些数据存储在一个变量中,即“states”。
  4. 将“states”写入桶。
  5. 从上一个任务运行中找到最新值,并将其存储在变量“last_state_previous_task”中。
  6. 合并“states”和“last_state_previous_task”。将此数据存储在变量“unioned_states”中。
  7. 发现“unioned_states”中的状态变化。将此数据存储在变量“state_changes”中。
  8. 通知在最近两项任务之间发生的状态变化,以捕获在任务执行过程中发生的任何状态变化。

解决方案说明

  1. 导入包并定义任务选项和机密。导入以下包:
  • Flux Telegram package: 这个包

  • Flux InfluxDB 秘密包: 此包包含 secrets.get() 函数,允许您从 InfluxDB 秘密存储中检索秘密。了解如何在 InfluxDB 中 管理秘密 以使用此包。

  • Flux InfluxDB monitoring package: 该包包含用于监控您的数据的函数和工具。

    import "contrib/sranka/telegram"
    import "influxdata/influxdb/secrets"
    import "influxdata/influxdb/monitor"
    
    option task = {name: "State changes across tasks", every: 30m, offset: 5m}
    
    telegram_token = secrets.get(key: "telegram_token")
    telegram_channel_ID = secrets.get(key: "telegram_channel_ID")
    
  1. 查询您想要监控的数据。

    data = from(bucket: "example-bucket")
        // Query for data from the last successful task run or from the 1 every duration ago.
        // This ensures that you won’t miss any data.
        |> range(start: tasks.lastSuccess(orTime: -task.every))
        |> filter(fn: (r) => r._measurement == "example-measurement")
        |> filter(fn: (r) => r.tagKey1 == "example-tag-value")
        |> filter(fn: (r) => r._field == "example-field")
    

    其中 data 可能如下所示:

    _测量标签键1_字段_值_时间
    示例测量示例标签值示例字段30.02022-01-01T00:00:00Z
    示例测量示例标签值示例字段50.02022-01-01T00:00:00Z
  2. 根据阈值为您的数据分配状态。将此数据存储在一个变量中,即“states”。为了简化这个例子,只有两种状态:“ok”和“crit。”将状态存储在 _level 列中(monitor 包所需)。

    states =
        data
            |> map(fn: (r) => ({r with _level: if r._value > 40.0 then "crit" else "ok"}))
    

    其中 states 可能看起来像:

    _测量标签键1_字段_值_级别_时间
    示例计量示例标签值示例字段30.0正常2022-01-01T00:00:00Z
    示例测量示例标签值示例字段50.0严重2022-01-01T00:01:00Z
  3. 将“states”写回到InfluxDB。您可以将数据写入新的测量或新的桶。要将数据写入新的测量,请使用 set() 来更新您的“states”数据中的 _measurement 列的值。

    states
        // (Optional) Change the measurement name to write the data to a new measurement
        |> set(key: "_measurement", value: "new-measurement")
        |> to(bucket : "example-bucket") 
    
  4. 查找上一个任务运行的最新值并将其存储在变量“last_state_previous_task”中,

    last_state_previous_task =
        from(bucket: "example-bucket")
            |> range(start: date.sub(d: task.every, from: tasks.lastSuccess(orTime: -task.every))
            |> filter(fn: (r) => r._measurement == "example-measurement")
            |> filter(fn: (r) => r.tagKey == "example-tag-value")
            |> filter(fn: (r) => r._field == "example-field")
            |> last() 
    

    在哪里 last_state_previous_task 可能看起来像:

    _测量标签键1_字段_值_等级_时间
    示例测量示例标签值示例字段55.0临界2021-12-31T23:59:00Z
  5. 合并“states”和“last_state_previous_task”。将此数据存储在变量“unioned_states”中。使用 sort() 确保行按时间排序。

    unioned_states =
        union(tables: [states, last_state_previous_task])
            |> sort(columns: ["_time"], desc: true)
    

    其中 unioned_states 可能看起来像:

    _测量标签键1_字段_值_级别_时间
    示例测量示例标签值示例字段55.0严重2021-12-31T23:59:00Z
    示例计量示例标签值示例字段30.0正常2022-01-01T00:00:00Z
    示例测量示例标签值示例字段50.0严重2022-01-01T00:01:00Z
  6. 使用 monitor.stateChangesOnly() 来仅返回“unioned_states”中状态发生变化的行。将这些数据存储在变量“state_changes”中。

    state_changes =
        unioned_states 
            |> monitor.stateChangesOnly()
    

    其中 state_changes 可能看起来像:

    _测量标签键1_字段_值_级别_时间
    示例计量示例标签值示例字段30.0正常2022-01-01T00:00:00Z
    示例测量示例标签值示例字段50.0严重2022-01-01T00:01:00Z
  7. 在最后两个任务之间的状态变化时通知,以捕捉在任务执行过程中发生的任何状态变化。

    state_changes =
        data
            |> map(
                fn: (r) =>
                    ({
                        _value:
                            telegram.message(
                                token: telegram_token,
                                channel: telegram_channel_ID,
                                text: "state change at ${r._value} at ${r._time}",
                            ),
                    }),
            )
    

    使用合并的数据,以下警报将发送到Telegram:

    • 状态在 30.0 处变化,时间为 2022-01-01T00:00:00Z
    • 在2022-01-01T00:01:00Z时状态变化为50.0


Flux的未来

Flux 正在进入维护模式。您可以像现在一样继续使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB 3 开源版本现已公开Alpha测试

InfluxDB 3 Open Source is now available for alpha testing, licensed under MIT or Apache 2 licensing.

我们将发布两个产品作为测试版的一部分。

InfluxDB 3 核心,是我们新的开源产品。 它是一个用于时间序列和事件数据的实时数据引擎。 InfluxDB 3 企业版是建立在核心基础之上的商业版本,增加了历史查询能力、读取副本、高可用性、可扩展性和细粒度安全性。

有关如何开始的更多信息,请查看: