开始使用InfluxDB任务
一个InfluxDB 任务是一个计划的 Flux 脚本,它接受一系列输入数据, 以某种方式修改或分析这些数据,然后将修改后的数据写回 InfluxDB 或执行其他操作。
本文将介绍如何编写一个基本的InfluxDB任务,该任务对数据进行下采样并将其存储在一个新的存储桶中。
任务的组成部分
每个 InfluxDB 任务都需要以下组成部分。它们的形式和顺序可能会有所不同,但它们都是任务的基本部分。
定义任务选项
任务选项定义了任务的时间表、名称和其他信息。以下示例展示了如何在 Flux 脚本中设置任务选项:
option task = {name: "downsample_5m_precision", every: 1h, offset: 0m}
请参阅任务配置选项以获取有关每个选项的详细信息。
请注意,InfluxDB不保证任务会在预定时间运行。 有关任务服务级别协议(SLA)的详细信息,请参见查看任务运行日志。
InfluxDB 用户界面提供了一个用于定义任务选项的表单。
可调用脚本的任务选项
使用InfluxDB Cloud API创建引用和运行可调用脚本的任务。当您创建或更新任务时,将任务选项作为请求体中的属性传递——例如:
{
"name": "30-day-avg-temp",
"description": "IoT Center 30d environment average.",
"every": "1d",
"offset": "0m"
...
}
要了解有关创建运行可调用脚本的任务的更多信息,请查看如何创建引用脚本的任务。
检索和筛选数据
一个最小的 Flux 脚本使用以下函数从数据源中检索指定数量的数据,然后根据时间或列值过滤数据:
以下示例 Flux 从 InfluxDB 桶中检索数据,然后按 _measurement 和 host 列过滤:
from(bucket: "example-bucket")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")
要从其他来源检索数据,请参阅 Flux 输入函数。
在您的Flux脚本中使用任务选项
InfluxDB 在一个 task 选项记录中存储选项,您可以在 Flux 脚本中引用它。以下示例 Flux 使用时间范围 -task.every:
from(bucket: "example-bucket")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")
task.every 是点表示法,用于引用 task 选项记录的 every 属性。every 被定义为 1h,因此 -task.every 等于 -1h。
使用任务选项在您的Flux脚本中定义值可以使重用任务变得更容易。
处理或转换您的数据
任务定期自动运行脚本。 脚本以某种方式处理或转换数据——例如:降采样、检测异常或发送通知。
考虑一个每小时运行的任务,通过计算设定时间间隔的平均值来下采样数据。
它使用 aggregateWindow()
将点分组到5分钟(5m)窗口,并使用 mean() 计算每个
窗口的平均值。
以下示例代码显示了带有任务选项的Flux脚本:
option task = {name: "downsample_5m_precision", every: 1h, offset: 0m}
from(bucket: "example-bucket")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")
|> aggregateWindow(every: 5m, fn: mean)
使用偏移量来考虑潜在数据
使用offset任务选项来考虑潜在的延迟数据(如来自边缘设备的数据)。
一个每小时运行一次的任务(every: 1h)若偏移五分钟(offset: 5m)
将在整点后五分钟执行,但查询来自原始一小时间隔的数据。
请参见 常见任务 以获取与 InfluxDB 常用任务的示例。
使用可调用脚本处理数据
在 InfluxDB Cloud 中,您可以创建运行可调用脚本的任务。 您可以使用可调用脚本来管理和重用组织的脚本。 您可以使用任务来调度带有选项和参数的脚本运行。
以下示例 POST /api/v2/scripts 请求体定义了一个新的可调用脚本,使用了上一个示例中的 Flux:
{
"name": "aggregate-intervals",
"description": "Group points into 5 minute windows and calculate the average of each
window.",
"script": "from(bucket: "example-bucket")\
|> range(start: -task.every)\
|> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")\
|> aggregateWindow(every: 5m, fn: mean)",
"language": "flux"
}
注意,脚本不包含任务选项。 一旦您创建了可调用的脚本,您可以使用 POST /api/v2/tasks 创建一个运行该脚本的任务。 以下示例请求体定义了一个包含脚本ID和选项的任务:
{
"every": "1h",
"description": "Downsample host with 5 min precision.",
"name": "downsample_5m_precision",
"scriptID": "09b2136232083000"
}
要创建一个使用参数的脚本和任务,请查看如何创建一个运行可调用脚本的任务。
定义一个目的地
在大多数情况下,您会希望在任务转换数据后发送和存储数据。 目的地可以是单独的 InfluxDB 测量或桶。
下面的例子使用 to()
将转换后的数据写回到另一个 InfluxDB 存储桶:
// ...
|> to(bucket: "example-downsampled", org: "my-org")
要将数据写入InfluxDB, to() 需要以下列:
_time_measurement_field_value
要将数据写入其他目的地,请参见 Flux输出函数。
完整示例 Flux 任务脚本
以下示例 Flux 组合了本指南中描述的所有组件:
// Task options
option task = {name: "downsample_5m_precision", every: 1h, offset: 0m}
// Data source
from(bucket: "example-bucket")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")
// Data processing
|> aggregateWindow(every: 5m, fn: mean)
// Data destination
|> to(bucket: "example-downsampled")
可调用脚本的完整示例任务
以下示例代码展示了一个 POST /api/v2/scripts 请求体,该请求体结合了本指南中描述的组件:
{
"name": "aggregate-intervals-and-export",
"description": "Group points into 5 minute windows and calculate the average of each
window.",
"script": "from(bucket: "example-bucket")\
|> range(start: -task.every)\
|> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")\
// Data processing\
|> aggregateWindow(every: 5m, fn: mean)\
// Data destination\
|> to(bucket: "example-downsampled")",
"language": "flux"
}
以下示例代码显示了一个 POST /api/v2/tasks 请求体,用于调度脚本:
{
"every": "1h",
"description": "Downsample host with 5 min precision.",
"name": "downsample_5m_precision",
"scriptID": "SCRIPT_ID"
}
要了解更多关于InfluxDB任务及其工作原理的信息,请观看以下视频: