处理迟到的数据
在某些情况下,由于网络延迟或其他问题,您的时间序列数据可能会晚于预期到达InfluxDB。为了确保您计算的聚合结果是正确的,您必须在聚合和降采样任务中考虑数据延迟。本文指南将介绍一种方法,该方法使用InfluxDB任务和 API-invokable scripts 检测和考虑晚到的数据。
场景
您正在收集和存储100个不同地点的水位数据。 每个地点的数据每10秒报告一次。 每个地点的网络连接性各不相同,但可以自信地每小时至少将报告的数据写入InfluxDB,但可能会更频繁。
设置
为了遵循本指南,您需要创建以下资源:
- 一个 All-Access token。
- 三个 InfluxDB 桶:
- water_level_raw: 存储原始水位数据。
- water_level_mean: 存储水位的一分钟平均值。 平均值包含来自最后一小时的迟到数据。
- water_level_checksum: 存储水位的一分钟计数。 该计数用作每一分钟窗口的校验和。
- 一个可通过API调用的脚本:
water_level_process.flux: 这个脚本计算每分钟的水位平均值,并统计在水位平均值计算中使用的点的数量。平均值和数量分别写入water_level_mean和water_level_checksum桶中。
- 一个任务:
water_level_checksum.flux: 该任务触发water_level_process.flux脚本。该任务还会重新计算用于计算最新水位平均值的点的数量。它将 water_level_checksum 存储桶中的最新计数与这个新计数进行比较,并触发水位平均值的重新计算,以适应因迟到数据导致的计数增加。
在这个过程中,您计算每个位置在一分钟窗口内的平均水位。 它被设计为处理最多延迟一小时到达的数据。 每个位置的数据每10秒钟写入一次。 此外,每10秒钟,在每个位置的最后一小时内写入一个延迟数据点。
概述
在深入代码之前,先高层次地了解一下Flux脚本的逻辑。

这个 water_level_checksum.flux 任务每分钟运行一次。它计算存在于 water_level_raw 存储桶中的点的数量(新计数),并将该计数与 water_level_checksum 存储桶中的计数(旧计数)进行比较。如果 water_level_raw 存储桶中的新计数不等于 water_level_checksum 存储桶中的计数,则该任务调用 water_level_process.flux 可调用的脚本,重新计算旧计数和聚合。
Flux脚本详细信息
水位处理.flux
water_level_process.flux 是一个可调用的脚本,它完成两件事:
- 计算由
start和stop脚本参数定义的时间范围内值的平均值,并将计算得到的平均值写入 water_level_mean 存储桶。 - 计算由
start和stop脚本参数定义的时间范围内的点数或总数,并将计数写入water_level_checksum桶。
// Compute and store the mean for the window
from(bucket: "water_level_raw")
|> range(start: params.start, stop: params.stop)
|> mean()
|> to(bucket: "water_level_mean", timeColumn: "_stop")
|> yield(name: "means")
// Compute and store the new checksum for this window
from(bucket: "water_level_raw")
|> range(start: params.start, stop: params.stop)
|> group(columns: ["_measurement", "_field", "_stop"])
|> count()
|> to(bucket: "water_level_checksum", timeColumn: "_stop")
|> yield(name: "checksums")
水位检查.flux
water_level_process.flux 是一个执行以下操作的任务:
- 计算过去一小时内每分钟窗口中water_level_raw桶中的点数(新计数)。
- 调用
water_level_process.flux可调用脚本以计算一分钟窗口内的新平均值和新计数。 - 收集过去一小时内water_level_checksum桶中之前的计数(旧计数)。
- 将旧流和新流连接,并将旧计数与新计数进行比较。
- 不匹配的计数过滤器。
- 调用
water_level_process.flux可调用脚本以重新计算每一分钟窗口内计数不匹配的均值和计数。
任务详情
task选项提供任务的配置设置:name: 为任务提供一个名称。every: 定义任务运行的频率(每一分钟),在这种情况下,用于计算均值和计数的时间窗口间隔。offset: 定义在执行任务之前等待的时间。 _偏移量不会改变任务查询的时间范围。_
invokeScripts()是一个自定义函数,用于调用water_level_process.flux可调用脚本。
option task = {name: "water_level_checksum", every: 1m, offset: 10s}
invokeScript = (start, stop, scriptID) =>
requests.post(
url: "https://cloud2.influxdata.com/api/v2/scripts/${scriptID}/invoke",
headers: ["Authorization": "Token ${token}", "Accept": "application/json", "Content-Type": "application/json"],
body: json.encode(v: {params: {start: string(v: start), stop: string(v: stop)}}),
)
首先,新计数被计算并存储在变量 newCounts 中。newCounts =
from(bucket: "water_level_raw")
|> range(start: start, stop: stop)
|> group(columns: ["_measurement", "_field"])
|> aggregateWindow(every: every, fn: count)
其中范围的开始值和结束值定义为:start = date.truncate(t: -late_window, unit: every)
stop = date.truncate(t: now(), unit: every. late_window 等于您愿意等待的最久时间,以便接收延迟到达的数据(在此示例中等于)。 date.truncate() 函数用于将开始和停止时间截断到最新的分钟,以确保您在相同的时间戳上成功重新计算值。 其中 every = task.every。由于任务以 1 分钟的间隔运行,every 等于 1m。此外,请记住 aggregateWindow 函数默认使用 _stop 列作为汇总值的新时间值来源。
接下来,使用以下代码计算当前的平均值和计数:// 始终计算最新的区间
newCounts
|> filter(fn: (r) => r._time == stop)
|> map(
fn: (r) => {
response = invokeScript(start: date.sub(d: every, from: r._time), stop: r._time)
return {r with code: response.statusCode}
},
)
|> yield(name: "current")
过滤最后的 newCount 值。请记住,时间值等于停止值,因为 aggregateWindow() 函数的默认行为。然后遍历该单行表以调用一次 invokeScript 函数。在这里,您还将 date.sub(d: every, from: r._time) 和 r._time 的开始和停止参数的值传递进去。请记住,every 变量等于 1m。实际上,这意味着您将在 1 分钟的间隔内计算均值和计数(时间戳适当截断,以确保后来重新计算的均值不会被覆盖)。这段代码确保您至少会调用一次 water_level_process.flux 脚本,以分别将新的均值和计数写入 water_level_mean 和 water_level_checksum 桶。
接下来,查询过去一小时的 water_level_checksum 桶:
oldCounts =
from(bucket: "water_level_checksum")
|> range(start: start, stop: stop)
|> group(columns: ["_measurement", "_field"])
请记住,开始和结束时间在这里等于 - 和现在() 截断到分钟。
现在将旧计数和新计数结合在一起。你还需要过滤计数不同的情况。如果它们确实不同,那么响应中将会有可以映射的记录。遍历这些记录,通过调用 level_water_process.flux 脚本来重新计算平均值和计数:
experimental.join(
left: oldCounts,
right: newCounts,
fn: (left, right) => ({left with old_count: left._value, new_count: right._value}),
)
// Recompute any windows where the checksum is different
|> filter(fn: (r) => r.old_count != r.new_count)
|> map(
fn: (r) => {
response = invokeScript(start: date.sub(d: every, from: r._time), stop: r._time)
return {r with code: response.statusCode}
},
)
|> yield(name: "diffs")
完整的 water_level_checsum.flux 如下所示:
import "influxdata/influxdb/secrets"
import "experimental/http/requests"
import "json"
import "date"
import "experimental"
option task = {name: "water_level_checksum", every: 1m, offset: 10s}
// Size of the window to aggregate
every = task.every
// Longest we are willing to wait for late data
late_window = 1h
token = secrets.get(key: "SELF_TOKEN")
// invokeScript calls a Flux script with the given start stop
// parameters to recompute the window.
invokeScript = (start, stop) =>
requests.post(
// We have hardcoded the script ID here
url: "https://eastus-1.azure.cloud2.influxdata.com/api/v2/scripts/095fabd404108000/invoke",
headers: ["Authorization": "Token ${token}", "Accept": "application/json", "Content-Type": "application/json"],
body: json.encode(v: {params: {start: string(v: start), stop: string(v: stop)}}),
)
// Only query windows that span a full minute
start = date.truncate(t: -late_window, unit: every)
stop = date.truncate(t: now(), unit: every)
newCounts =
from(bucket: "water_level_raw")
|> range(start: start, stop: stop)
|> group(columns: ["_measurement", "_field"])
|> aggregateWindow(every: every, fn: count)
// Always compute the most recent interval
newCounts
|> filter(fn: (r) => r._time == stop)
|> map(
fn: (r) => {
response = invokeScript(start: date.sub(d: every, from: r._time), stop: r._time)
return {r with code: response.statusCode}
},
)
|> yield(name: "current")
oldCounts =
from(bucket: "water_level_checksum")
|> range(start: start, stop: stop)
|> group(columns: ["_measurement", "_field"])
// Compare old and new checksum
experimental.join(
left: oldCounts,
right: newCounts,
fn: (left, right) => ({left with old_count: left._value, new_count: right._value}),
)
// Recompute any windows where the checksum is different
|> filter(fn: (r) => r.old_count != r.new_count)
|> map(
fn: (r) => {
response = invokeScript(start: date.sub(d: every, from: r._time), stop: r._time)
return {r with code: response.statusCode}
},
)
|> yield(name: "diffs")