在Kapacitor中使用Flux任务
使用 Kapacitor 1.6+ 针对 InfluxDB 和其他数据源运行 Flux 任务。利用完整的 Flux 功能库构建强大的数据处理和监控任务在 Kapacitor 中。
开始之前
在开始使用Flux任务之前,请考虑:
- Kapacitor Flux 任务无法使用 Kapacitor 主题或事件处理程序。 你只能通过 Flux 通知端点在 Flux 脚本中发送警报。
- Flux任务是由内置于Kapacitor 1.6+中的Flux任务引擎调度和执行的。 这个引擎与Kapacitor TICKscript任务引擎是分开的。
- Flux 任务在 Flux 任务脚本中使用 Flux
task选项进行配置。这包括任务名称和执行计划。
选择您使用的InfluxDB版本:
在InfluxDB中设置Flux任务数据库
(可选但建议使用)
当Kapacitor执行Flux任务时,它可以在InfluxDB数据库中存储有关任务执行(运行)的信息。 要存储此数据,请执行以下操作:
在InfluxDB中创建一个新数据库以存储Flux任务运行和日志数据。
CREATE DATABASE kapacitorfluxtasks为了防止磁盘上大量的 Kapacitor Flux 任务日志数据,请更新默认的
autogen保留策略,设置有限的保留期限,或创建一个具有有限保留期限的新保留策略 (RP)。
-- Syntax
ALTER RETENTION POLICY <rp-name> ON <db-name> DURATION <new-retention-duration>
-- Example
ALTER RETENTION POLICY autogen ON kapacitorfluxtasks DURATION 3d
-- Syntax
CREATE RETENTION POLICY <rp-name> on <db-name> DURATION <retention-duration>
-- Example
CREATE RETENTION POLICY threedays on kapacitorfluxtasks DURATION 3d
配置Kapacitor Flux任务
在您的 kapacitor.conf 中的 [fluxtask] 下更新或添加以下设置:
- 启用:
true - task-run-influxdb: 在你的 kapacitor.conf 中用于存储 Flux 任务数据的 InfluxDB 配置名称。
要禁用 Flux 任务日志记录,请设置为
"none"。 - task-run-bucket: InfluxDB数据库,用于存储Flux任务数据和日志。我们建议将此项留空。默认情况下,数据写入
kapacitor_fluxtask_logs数据库。要指定其他数据库以写入任务日志数据,请使用"db-name"命名约定(包括保留策略"db-name/rp"不受支持)。如果指定的数据库在InfluxDB中尚不存在,Kapacitor会尝试创建该数据库。如果启用了身份验证,则需要具有CREATE DATABASE权限。有关更多信息,请参阅InfluxDB中的身份验证和授权。 - 提供以下任一项:
- task-run-org: 保持为空字符串 (
"") - task-run-orgid: 保持为空字符串 (
"")
- task-run-org: 保持为空字符串 (
- task-run-measurement: InfluxDB 衡量标准,用于存储任务运行和日志数据。默认值是
"runs"。
Kapacitor Flux 任务配置示例
# ...
[fluxtask]
enabled = true
task-run-influxdb = "default"
task-run-bucket = "kapacitor_fluxtask_logs"
task-run-org = ""
task-run-orgid = ""
task-run-measurement = "runs"
# ...
有关Kapacitor [fluxtask] 配置选项的更多信息,请参阅 配置Kapacitor。
创建一个 Flux 任务
创建一个 Flux 任务脚本。在您的脚本中包含 任务选项 以配置 Kapacitor Flux 任务。 有关编写 Flux 任务的更多信息,请参见:
提供 InfluxDB 连接凭据
from()和to()函数 需要您的 InfluxDB 主机 和 令牌。- host: InfluxDB URL。
- token: 如果启用了 InfluxDB 身份验证,使用
username:password语法。否则,使用空字符串("")作为您的令牌。
桶名称语法
在使用Flux查询或写入InfluxDB 1.x时,使用
database-name/retention-policy-name模式来指定你的桶。示例任务.flux
option task = { name: "example-task-name", every: 1h, offset: 10m } host = "http://localhost:8086" token = "" from(bucket: "example-db/example-rp", host: host, token: token) |> range(start: -task.every) |> filter(fn: (r) => r._measurement == "example-measurement") |> aggregateWindow(every: 10m, fn: mean) |> to(bucket: "example-db/example-rp-downsampled", host: host, token: token)使用
kapacitor flux task create命令将您的 Flux 脚本添加为 Kapacitor Flux 任务。kapacitor flux task create --file /path/to/example-task.flux
有关创建Kapacitor Flux任务的更多细节,请参见 创建Kapacitor Flux任务。
考虑使用 InfluxDB 任务
如果您正在使用 InfluxDB Cloud 或 InfluxDB OSS 2.x,建议使用 原生 InfluxDB 任务 进行数据处理。
为 InfluxDB Cloud 或 2.x 设置 Kapacitor
配置Kapacitor以连接到InfluxDB Cloud或InfluxDB OSS 2.x。有关详细说明,请参见以下内容:
为 InfluxDB Cloud 或 2.x 配置 Kapacitor Flux 任务
在您的 kapacitor.conf 中更新或添加以下设置 [fluxtask]:
- 启用:
true - task-run-influxdb: 在你的 kapacitor.conf 中用于存储 Flux 任务数据的 InfluxDB 配置名称。
要禁用 Flux 任务日志记录,请设置为
"none"。 - task-run-bucket: InfluxDB 存储 Flux 任务数据和日志的桶。我们建议将其留空。默认情况下,数据写入
kapacitor_fluxtask_logs桶。要指定另一个桶以写入任务日志数据,请使用 _tasks system bucket 或 创建一个新桶。如果指定的桶在 InfluxDB 中尚不存在,Kapacitor 会尝试使用POST /api/v2/buckets创建它,此时您的 API 令牌必须具有在 InfluxDB 中创建桶的权限。有关更多信息,请参见 管理 API 令牌。 - 提供以下内容之一:
- task-run-org: InfluxDB 组织名称。
- task-run-orgid: InfluxDB 组织 ID。
- task-run-measurement: InfluxDB 衡量标准,用于存储任务运行和日志数据。默认值是
"runs"。
# ...
[fluxtask]
enabled = true
task-run-influxdb = "InfluxDB"
task-run-bucket = "kapacitor_fluxtask_logs"
task-run-org = "example-org"
task-run-measurement = "runs"
# ...
创建一个 Flux 任务
创建一个 Flux 任务脚本。在您的脚本中包含 任务选项 以配置 Kapacitor Flux 任务。 有关编写 Flux 任务的更多信息,请参见:
提供 InfluxDB 连接凭据
from()](/flux/v0/stdlib/influxdata/influxdb/from/) 和to()函数 需要你的 InfluxDB 主机 和 令牌。- host: InfluxDB 的 URL。
- 令牌: 如果 启用了 InfluxDB 认证,使用
username:password语法。否则,使用空字符串 ("") 作为您的令牌。
示例任务.flux
option task = { name: "example-task-name", every: 1h, offset: 10m } host = "http://localhost:8086" token = "" from(bucket: "example-bucket", host: host, token: token) |> range(start: -task.every) |> filter(fn: (r) => r._measurement == "example-measurement") |> aggregateWindow(every: 10m, fn: mean) |> to(bucket: "example-bucket-downsampled", host: host, token: token)使用
kapacitor flux task create命令将您的 Flux 脚本添加为 Kapacitor Flux 任务。kapacitor flux task create --file /path/to/example-task.flux
有关创建 Kapacitor Flux 任务的更多详细信息,请参阅 Create a Kapacitor Flux task。