Documentation

开始使用InfluxDB任务

一个InfluxDB 任务是一个计划的 Flux 脚本,它接受一系列输入数据, 以某种方式修改或分析这些数据,然后将修改后的数据写回 InfluxDB 或执行其他操作。

本文将介绍如何编写一个基本的InfluxDB任务,该任务对数据进行下采样并将其存储在一个新的存储桶中。

任务的组成部分

每个 InfluxDB 任务都需要以下组成部分。它们的形式和顺序可能会有所不同,但它们都是任务的基本部分。

跳转到完整的示例任务脚本

定义任务选项

任务选项定义了任务的时间表、名称和其他信息。以下示例展示了如何在 Flux 脚本中设置任务选项:

option task = {name: "downsample_5m_precision", every: 1h, offset: 0m}

请参阅任务配置选项以获取有关每个选项的详细信息。

请注意,InfluxDB不保证任务会在预定时间运行。 有关任务服务级别协议(SLA)的详细信息,请参见查看任务运行日志

InfluxDB 用户界面提供了一个用于定义任务选项的表单。

可调用脚本的任务选项

使用InfluxDB Cloud API创建引用和运行可调用脚本的任务。当您创建或更新任务时,将任务选项作为请求体中的属性传递——例如:

  {
   "name": "30-day-avg-temp",
   "description": "IoT Center 30d environment average.",
   "every": "1d",
   "offset": "0m"
   ...
  }

要了解有关创建运行可调用脚本的任务的更多信息,请查看如何创建引用脚本的任务

检索和筛选数据

一个最小的 Flux 脚本使用以下函数从数据源中检索指定数量的数据,然后根据时间或列值过滤数据:

  1. from(): 从 InfluxDB Cloud 查询数据。
  2. range(): 定义返回数据的时间范围。
  3. filter(): 根据列值过滤数据。

以下示例 Flux 从 InfluxDB 桶中检索数据,然后按 _measurementhost 列过滤:

from(bucket: "example-bucket")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")

要从其他来源检索数据,请参阅 Flux 输入函数

在您的Flux脚本中使用任务选项

InfluxDB 在一个 task 选项记录中存储选项,您可以在 Flux 脚本中引用它。以下示例 Flux 使用时间范围 -task.every

from(bucket: "example-bucket")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")

task.every 是点表示法,用于引用 task 选项记录的 every 属性。every 被定义为 1h,因此 -task.every 等于 -1h

使用任务选项在您的Flux脚本中定义值可以使重用任务变得更容易。

处理或转换您的数据

任务定期自动运行脚本。 脚本以某种方式处理或转换数据——例如:降采样、检测异常或发送通知。

考虑一个每小时运行的任务,通过计算设定时间间隔的平均值来下采样数据。 它使用 aggregateWindow() 将点分组到5分钟(5m)窗口,并使用 mean() 计算每个 窗口的平均值。

以下示例代码显示了带有任务选项的Flux脚本:

option task = {name: "downsample_5m_precision", every: 1h, offset: 0m}

from(bucket: "example-bucket")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")
    |> aggregateWindow(every: 5m, fn: mean)

使用偏移量来考虑潜在数据

使用offset任务选项来考虑潜在的延迟数据(如来自边缘设备的数据)。 一个每小时运行一次的任务(every: 1h)若偏移五分钟(offset: 5m) 将在整点后五分钟执行,但查询来自原始一小时间隔的数据。

请参见 常见任务 以获取与 InfluxDB 常用任务的示例。

使用可调用脚本处理数据

在 InfluxDB Cloud 中,您可以创建运行可调用脚本的任务。 您可以使用可调用脚本来管理和重用组织的脚本。 您可以使用任务来调度带有选项和参数的脚本运行。

以下示例 POST /api/v2/scripts 请求体定义了一个新的可调用脚本,使用了上一个示例中的 Flux:

{
   "name": "aggregate-intervals",
   "description": "Group points into 5 minute windows and calculate the average of each
   window.",
   "script": "from(bucket: "example-bucket")\
                |> range(start: -task.every)\
                |> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")\
                |> aggregateWindow(every: 5m, fn: mean)",
    "language": "flux"
}

注意,脚本不包含任务选项。 一旦您创建了可调用的脚本,您可以使用 POST /api/v2/tasks 创建一个运行该脚本的任务。 以下示例请求体定义了一个包含脚本ID和选项的任务:

{
   "every": "1h",
   "description": "Downsample host with 5 min precision.",
   "name": "downsample_5m_precision",
   "scriptID": "09b2136232083000"
}

要创建一个使用参数的脚本和任务,请查看如何创建一个运行可调用脚本的任务

定义一个目的地

在大多数情况下,您会希望在任务转换数据后发送和存储数据。 目的地可以是单独的 InfluxDB 测量或桶。

下面的例子使用 to() 将转换后的数据写回到另一个 InfluxDB 存储桶:

// ...
    |> to(bucket: "example-downsampled", org: "my-org")

要将数据写入InfluxDB, to() 需要以下列:

  • _time
  • _measurement
  • _field
  • _value

要将数据写入其他目的地,请参见 Flux输出函数

完整示例 Flux 任务脚本

以下示例 Flux 组合了本指南中描述的所有组件:

// Task options
option task = {name: "downsample_5m_precision", every: 1h, offset: 0m}

// Data source
from(bucket: "example-bucket")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")
    // Data processing
    |> aggregateWindow(every: 5m, fn: mean)
    // Data destination
    |> to(bucket: "example-downsampled")

可调用脚本的完整示例任务

以下示例代码展示了一个 POST /api/v2/scripts 请求体,该请求体结合了本指南中描述的组件:

{
   "name": "aggregate-intervals-and-export",
   "description": "Group points into 5 minute windows and calculate the average of each
   window.",
   "script": "from(bucket: "example-bucket")\
                |> range(start: -task.every)\
                |> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")\
                // Data processing\
                |> aggregateWindow(every: 5m, fn: mean)\
                // Data destination\
                |> to(bucket: "example-downsampled")",
    "language": "flux"
}

以下示例代码显示了一个 POST /api/v2/tasks 请求体,用于调度脚本:

{
   "every": "1h",
   "description": "Downsample host with 5 min precision.",
   "name": "downsample_5m_precision",
   "scriptID": "SCRIPT_ID"
}

要了解更多关于InfluxDB任务及其工作原理的信息,请观看以下视频:



Flux的未来

Flux 正在进入维护模式。您可以像现在一样继续使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB 3 开源版本现已公开Alpha测试

InfluxDB 3 Open Source is now available for alpha testing, licensed under MIT or Apache 2 licensing.

我们将发布两个产品作为测试版的一部分。

InfluxDB 3 核心,是我们新的开源产品。 它是一个用于时间序列和事件数据的实时数据引擎。 InfluxDB 3 企业版是建立在核心基础之上的商业版本,增加了历史查询能力、读取副本、高可用性、可扩展性和细粒度安全性。

有关如何开始的更多信息,请查看:

由TSM驱动的InfluxDB Cloud