跟踪任务执行中的状态变化
问题
使用 InfluxDB tasks 对时间序列数据进行评估和分配状态,并检测状态变化是很常见的做法。任务按批处理数据,但如果跨批边界发生状态变化,会怎样呢?任务不会认识到这种变化,因为不知道上一个任务执行的最终状态。本指南将讲解如何创建一个任务,将状态分配给行,然后使用上一个任务执行的结果检测跨批边界的任何状态变化,以确保您不会错过任何状态变化。
解决方案
根据阈值明确分配级别给您的数据。
解决方案优势
如果你从未使用过monitor包编写任务,那么这是最简单的解决方案。
解决方案的缺点
您必须明确定义您的阈值,这可能需要更多的代码。
解决方案概述
创建一个任务,您需要:
- 模板。导入包并定义任务选项。
- 查询您的数据。
- 根据阈值将状态分配给您的数据。将这些数据存储在一个变量中,即“states”。
- 将“states”写入桶。
- 从上一个任务运行中找到最新值,并将其存储在变量“last_state_previous_task”中。
- 合并“states”和“last_state_previous_task”。将此数据存储在变量“unioned_states”中。
- 发现“unioned_states”中的状态变化。将此数据存储在变量“state_changes”中。
- 通知在最近两项任务之间发生的状态变化,以捕获在任务执行过程中发生的任何状态变化。
解决方案说明
- 导入包并定义任务选项和机密。导入以下包:
Flux InfluxDB 秘密包: 此包包含 secrets.get() 函数,允许您从 InfluxDB 秘密存储中检索秘密。了解如何在 InfluxDB 中 管理秘密 以使用此包。
Flux InfluxDB monitoring package: 该包包含用于监控您的数据的函数和工具。
import "contrib/sranka/telegram" import "influxdata/influxdb/secrets" import "influxdata/influxdb/monitor" option task = {name: "State changes across tasks", every: 30m, offset: 5m} telegram_token = secrets.get(key: "telegram_token") telegram_channel_ID = secrets.get(key: "telegram_channel_ID")
查询您想要监控的数据。
data = from(bucket: "example-bucket") // Query for data from the last successful task run or from the 1 every duration ago. // This ensures that you won’t miss any data. |> range(start: tasks.lastSuccess(orTime: -task.every)) |> filter(fn: (r) => r._measurement == "example-measurement") |> filter(fn: (r) => r.tagKey1 == "example-tag-value") |> filter(fn: (r) => r._field == "example-field")其中
data可能如下所示:_测量 标签键1 _字段 _值 _时间 示例测量 示例标签值 示例字段 30.0 2022-01-01T00:00:00Z 示例测量 示例标签值 示例字段 50.0 2022-01-01T00:00:00Z 根据阈值为您的数据分配状态。将此数据存储在一个变量中,即“states”。为了简化这个例子,只有两种状态:“ok”和“crit。”将状态存储在
_level列中(monitor包所需)。states = data |> map(fn: (r) => ({r with _level: if r._value > 40.0 then "crit" else "ok"}))其中
states可能看起来像:_测量 标签键1 _字段 _值 _级别 _时间 示例计量 示例标签值 示例字段 30.0 正常 2022-01-01T00:00:00Z 示例测量 示例标签值 示例字段 50.0 严重 2022-01-01T00:01:00Z 将“states”写回到InfluxDB。您可以将数据写入新的测量或新的桶。要将数据写入新的测量,请使用
set()来更新您的“states”数据中的_measurement列的值。states // (Optional) Change the measurement name to write the data to a new measurement |> set(key: "_measurement", value: "new-measurement") |> to(bucket : "example-bucket")查找上一个任务运行的最新值并将其存储在变量“last_state_previous_task”中,
last_state_previous_task = from(bucket: "example-bucket") |> range(start: date.sub(d: task.every, from: tasks.lastSuccess(orTime: -task.every)) |> filter(fn: (r) => r._measurement == "example-measurement") |> filter(fn: (r) => r.tagKey == "example-tag-value") |> filter(fn: (r) => r._field == "example-field") |> last()在哪里
last_state_previous_task可能看起来像:_测量 标签键1 _字段 _值 _等级 _时间 示例测量 示例标签值 示例字段 55.0 临界 2021-12-31T23:59:00Z 合并“states”和“last_state_previous_task”。将此数据存储在变量“unioned_states”中。使用
sort()确保行按时间排序。unioned_states = union(tables: [states, last_state_previous_task]) |> sort(columns: ["_time"], desc: true)其中
unioned_states可能看起来像:_测量 标签键1 _字段 _值 _级别 _时间 示例测量 示例标签值 示例字段 55.0 严重 2021-12-31T23:59:00Z 示例计量 示例标签值 示例字段 30.0 正常 2022-01-01T00:00:00Z 示例测量 示例标签值 示例字段 50.0 严重 2022-01-01T00:01:00Z 使用
monitor.stateChangesOnly()来仅返回“unioned_states”中状态发生变化的行。将这些数据存储在变量“state_changes”中。state_changes = unioned_states |> monitor.stateChangesOnly()其中
state_changes可能看起来像:_测量 标签键1 _字段 _值 _级别 _时间 示例计量 示例标签值 示例字段 30.0 正常 2022-01-01T00:00:00Z 示例测量 示例标签值 示例字段 50.0 严重 2022-01-01T00:01:00Z 在最后两个任务之间的状态变化时通知,以捕捉在任务执行过程中发生的任何状态变化。
state_changes = data |> map( fn: (r) => ({ _value: telegram.message( token: telegram_token, channel: telegram_channel_ID, text: "state change at ${r._value} at ${r._time}", ), }), )使用合并的数据,以下警报将发送到Telegram:
状态在 30.0 处变化,时间为 2022-01-01T00:00:00Z在2022-01-01T00:01:00Z时状态变化为50.0