Documentation

窗口节点

窗口节点在移动的时间范围内缓存数据。 windowperiod 属性定义了 window 所覆盖的时间范围。

窗口的 every 属性定义了窗口向管道中的下一个节点发出的频率。

windowalign属性定义了如何对齐窗口边缘。 (默认情况下,边缘是相对于window节点接收到的第一个数据点定义的。)

示例:

stream
  |window()
    .period(10m)
    .every(5m)
  |httpOut('recent')

此示例每 5 minutes 发送最后 10 minute 期间到管道的 httpOut 节点。 因为 every 小于 period,每次窗口发出时包含 5 minutes 的 新数据和 5 minutes 的前一个期间的数据。

注意: 因为没有定义 align 属性,所以 window 边缘是相对于第一个数据点定义的。

构造函数

链式方法描述
window ( )创建一个按时间窗口化流的新节点。

属性方法

设置器描述
align ( )如果未使用align属性来修改window节点,则窗口对齐假定从它接收到的第一个数据点的时间开始。如果设置了align属性,则窗口时间边缘将被截断到every属性(例如,如果一个数据点的时间是12:06,而every属性是5m,则该数据点的窗口将范围从12:05到12:10)。
every ( value time.Duration)当前窗口多久发射到管道中。如果等于零,则每一个新的点都会发射当前窗口。
everyCount ( value int64)EveryCount根据点的数量确定窗口发出的频率。值为1意味着每一个新点都会发出窗口。
fillPeriod ( )FillPeriod指示WindowNode在发出第一批数据之前等待时间段的结束。这仅在时间段大于每个值时适用。
period ( value time.Duration)窗口的周期或时间长度。
periodCount ( value int64)PeriodCount是每个窗口中的点数。
quiet ( )抑制此节点的所有错误日志事件。

链式调用方法

警报, 屏障, 底部, 变化检测, 合并, 计数, 累计和, 死人, 默认, 删除, 导数, 差异, 唯一, Ec2自动缩放, 经过时间, 评估, 第一次, 扁平化, 分组, 霍尔特-温特斯, 霍尔特-温特斯与拟合, Http输出, HttpPOST, InfluxDB输出, 连接, K8s自动缩放, Kapacitor循环回路, 最后, 日志, 最大值, 平均值, 中位数, 最小值, 众数, 移动平均, 百分位数, 样本, 移位, 旁加载, 传播, 状态计数, 状态持续时间, 统计, 标准差, 总和, 群体自动缩放, 顶部, 涓涓细流, 并集, 条件, 窗口


属性

属性方法修改调用节点的状态。它们不会向管道中添加另一个节点,并始终返回对调用节点的引用。属性方法使用.运算符标记。

对齐

align 属性设置为截断窗口时间边缘到 every 属性。例如,如果数据点的时间是 12:06 而 every 属性是 5m,那么数据点的窗口范围是从 12:05 到 12:10)。

如果没有使用align属性来修改window节点,窗口对齐将在接收到第一个数据点时开始。

window.align()

注意:在不规则间隔内摄取数据时,我们建议使用 window.align() 来对数据进行分组。

每一个

当前窗口向管道发出的频率。 如果等于零,则每个新点将发出当前窗口。

window.every(value time.Duration)

每次计数

EveryCount 根据点的数量确定窗口发出的频率。 值为 1 意味着每个新点都会发出窗口。

window.everyCount(value int64)

填充周期

FillPeriod 指示 WindowNode 在周期结束前等待,才会发出第一批。这仅在周期大于每个值时适用。

window.fillPeriod()

周期

窗口的时间长度或期间。

window.period(value time.Duration)

周期计数

PeriodCount 是每个窗口内的点数。

window.periodCount(value int64)

安静

抑制来自此节点的所有错误日志事件。

window.quiet()

链式调用方法

链式方法在调用节点的子节点中创建一个新的节点。它们不会修改调用节点。链式方法使用 | 运算符标记。

警告

创建一个警报节点,可以触发警报。

window|alert()

返回: AlertNode

障碍

创建一个新的障碍节点,它定期发出障碍消息。

每个周期都会发出一条 barrier消息。

window|barrier()

返回: BarrierNode

底部

选择底部 num 点用于 field 并按任何额外标签或字段排序。

window|bottom(num int64, field string, fieldsAndTags ...string)

返回: InfluxQLNode

变更检测

创建一个新节点,只有在与前一个点不同的情况下才发出新点。

window|changeDetect(field string)

返回: ChangeDetectNode

合并

将此节点与自身结合。数据根据时间戳进行结合。

window|combine(expressions ...ast.LambdaNode)

返回: CombineNode

计数

计算点的数量。

window|count(field string)

返回: InfluxQLNode

累积和

计算每个接收到的点的累积和。 每收集到一个点就会发出一个点。

window|cumulativeSum(field string)

返回: InfluxQLNode

死者

用于在低吞吐量时创建警报的辅助函数,也称为死手开关。

  • 阈值:如果吞吐量在点/区间中下降到阈值以下,则触发警报。
  • 间隔:检查吞吐量的频率。
  • 表达式:可选的表达式列表,供评估使用。对于时间警报非常有用。

示例:

    var data = stream
        |from()...
    // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s.
    data
        |deadman(100.0, 10s)
    //Do normal processing of data
    data...

上面的内容等同于这个示例:

    var data = stream
        |from()...
    // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s.
    data
        |stats(10s)
            .align()
        |derivative('emitted')
            .unit(10s)
            .nonNegative()
        |alert()
            .id('node \'stream0\' in task \'{{ .TaskName }}\'')
            .message('{{ .ID }} is {{ if eq .Level "OK" }}alive{{ else }}dead{{ end }}: {{ index .Fields "emitted" | printf "%0.3f" }} points/10s.')
            .crit(lambda: "emitted" <= 100.0)
    //Do normal processing of data
    data...

可以通过“deadman”配置部分全局配置idmessage警报属性。

由于AlertNode是最后一部分,可以像往常一样进一步修改。 示例:

    var data = stream
        |from()...
    // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s.
    data
        |deadman(100.0, 10s)
            .slack()
            .channel('#dead_tasks')
    //Do normal processing of data
    data...

您可以指定额外的lambda表达式,以进一步限制何时触发死手按钮。 示例:

    var data = stream
        |from()...
    // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s.
    // Only trigger the alert if the time of day is between 8am-5pm.
    data
        |deadman(100.0, 10s, lambda: hour("time") >= 8 AND hour("time") <= 17)
    //Do normal processing of data
    data...
window|deadman(threshold float64, interval time.Duration, expr ...ast.LambdaNode)

返回: AlertNode

默认

创建一个节点,可以为缺失的标签或字段设置默认值。

window|default()

返回: DefaultNode

删除

创建一个可以删除标签或字段的节点。

window|delete()

返回: DeleteNode

导数

创建一个新节点,该节点计算相邻点的导数。

window|derivative(field string)

返回: DerivativeNode

差异

计算独立于经过时间的点之间的差异。

window|difference(field string)

返回: InfluxQLNode

唯一

生成仅包含不同点的批次。

window|distinct(field string)

返回: InfluxQLNode

Ec2Autoscale

创建一个可以触发自动缩放事件的 EC2 自动缩放组节点。

window|ec2Autoscale()

返回: Ec2AutoscaleNode

经过时间

计算点之间的经过时间。

window|elapsed(field string, unit time.Duration)

返回: InfluxQLNode

评估

创建一个评估节点,该节点将对每个数据点评估给定的变换函数。可以提供表达式列表,并将按给定顺序进行评估。结果可供后续表达式使用。

window|eval(expressions ...ast.LambdaNode)

返回: EvalNode

第一

选择第一个点。

window|first(field string)

返回: InfluxQLNode

扁平化

将具有相似时间的点合并为一个点。

window|flatten()

返回: FlattenNode

分组

按一组标签对数据进行分组。

可以传递字面量 * 来按所有维度分组。 示例:

    |groupBy(*)
window|groupBy(tag ...interface{})

返回: GroupByNode

霍尔特-温特斯

计算一个数据集的霍尔特-温特斯(/influxdb/v1/query_language/functions/#holt-winters)预测。

window|holtWinters(field string, h int64, m int64, interval time.Duration)

返回: InfluxQLNode

霍尔特-冬季法与拟合

计算Holt-Winters (/influxdb/v1/query_language/functions/#holt-winters) 数据集的预测。 此方法还输出用于拟合数据的所有点,除了预测的数据。

window|holtWintersWithFit(field string, h int64, m int64, interval time.Duration)

返回: InfluxQLNode

Http输出

创建一个HTTP输出节点,用于缓存它所接收到的最新数据。缓存的数据可以在给定的端点访问。该端点是从运行任务的API端点的相对路径。例如,如果任务端点位于 /kapacitor/v1/tasks/<task_id> 且端点为 top10,那么数据可以从 /kapacitor/v1/tasks/<task_id>/top10 请求。

window|httpOut(endpoint string)

返回: HTTPOutNode

HttpPost

创建一个HTTP Post节点,将接收到的数据POST到提供的HTTP端点。HttpPost期望0或1个参数。如果提供0个参数,必须指定一个端点属性方法。

window|httpPost(url ...string)

返回: HTTPPostNode

InfluxDB输出

创建一个 influxdb 输出节点,将传入的数据存储到 InfluxDB 中。

window|influxDBOut()

返回: InfluxDBOutNode

加入

将此节点与其他节点连接。数据是基于时间戳进行连接的。

window|join(others ...Node)

返回: JoinNode

K8s自缩放

创建一个可以触发Kubernetes集群自适应缩放事件的节点。

window|k8sAutoscale()

返回: K8sAutoscaleNode

Kapacitor循环回路

创建一个将数据作为流发送回Kapacitor的kapacitor循环节点。

window|kapacitorLoopback()

返回: KapacitorLoopbackNode

最后

选择最后一点。

window|last(field string)

返回: InfluxQLNode

日志

创建一个节点,记录它接收到的所有数据。

window|log()

返回: LogNode

最大值

选择最大点。

window|max(field string)

返回: InfluxQLNode

均值

计算数据的平均值。

window|mean(field string)

返回: InfluxQLNode

中位数

计算数据的中位数。

注意:此方法不是选择器。如果你想要中位数,请使用 .percentile(field, 50.0)

window|median(field string)

返回: InfluxQLNode

最小值

选择最小点。

window|min(field string)

返回: InfluxQLNode

模式

计算数据的众数。

window|mode(field string)

返回: InfluxQLNode

移动平均

计算最后窗口点的移动平均值。 在窗口填满之前不会发出任何点。

window|movingAverage(field string, window int64)

返回: InfluxQLNode

百分位数

在给定百分位数处选择一个点。 这是一个选择器函数,不执行点之间的插值。

window|percentile(field string, percentile float64)

返回: InfluxQLNode

示例

创建一个新节点,该节点对传入的点或批次进行采样。

每个指定的计数或持续时间将会发出一个点。

window|sample(rate interface{})

返回: SampleNode

移位

创建一个新的节点,按时间移动传入的点或批次。

window|shift(shift time.Duration)

返回: ShiftNode

侧载

创建一个可以从外部源加载数据的节点。

window|sideload()

返回: SideloadNode

扩散

计算 minmax 点之间的差。

window|spread(field string)

返回: InfluxQLNode

状态计数

创建一个节点,用于跟踪给定状态中连续点的数量。

window|stateCount(expression ast.LambdaNode)

返回: StateCountNode

状态持续时间

创建一个跟踪给定状态下持续时间的节点。

window|stateDuration(expression ast.LambdaNode)

返回: StateDurationNode

统计

创建一个新的数据流,其中包含节点的内部统计信息。 间隔表示根据实时多长时间发出一次统计信息。 这意味着间隔时间与源节点接收的数据点次数无关。

window|stats(interval time.Duration)

返回结果: StatsNode

标准差

计算标准差。

window|stddev(field string)

返回: InfluxQLNode

总和

计算所有值的总和。

window|sum(field string)

返回: InfluxQLNode

群集自动缩放

创建一个可以触发Docker swarm集群的自动缩放事件的节点。

window|swarmAutoscale()

返回: SwarmAutoscaleNode

顶部

选择前 num 个点用于 field 并按任何额外标签或字段排序。

window|top(num int64, field string, fieldsAndTags ...string)

返回: InfluxQLNode

涓流

创建一个新的节点,将批量数据转换为流数据。

window|trickle()

返回: TrickleNode

联合

执行该节点与所有其他给定节点的并集。

window|union(node ...Node)

返回: UnionNode

在哪里

创建一个新节点,该节点根据给定的表达式过滤数据流。

window|where(expression ast.LambdaNode)

返回: WhereNode

窗口

创建一个新的节点,通过时间窗口化流。

注意:窗口只能应用于流边缘。

window|window()

返回: WindowNode



Flux的未来

Flux 正在进入维护模式。您可以像现在一样继续使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB 3 开源版本现已公开Alpha测试

InfluxDB 3 Open Source is now available for alpha testing, licensed under MIT or Apache 2 licensing.

我们将发布两个产品作为测试版的一部分。

InfluxDB 3 核心,是我们新的开源产品。 它是一个用于时间序列和事件数据的实时数据引擎。 InfluxDB 3 企业版是建立在核心基础之上的商业版本,增加了历史查询能力、读取副本、高可用性、可扩展性和细粒度安全性。

有关如何开始的更多信息,请查看: