Documentation

流节点

stream 节点表示通过其任何输入流向 Kapacitor 的数据源。stream 变量在流任务中是 StreamNode. 的一个实例。StreamNode.From 是此节点的方法/属性。

构造函数

链式方法描述
stream没有构造函数签名。

属性方法

设置器描述
quiet ( )抑制此节点的所有错误日志事件。

链式调用方法

死者, 来自, 统计


属性

属性方法修改调用节点的状态。它们不会向管道中添加另一个节点,并始终返回对调用节点的引用。属性方法使用.运算符标记。

安静

抑制来自此节点的所有错误日志事件。

stream.quiet()

链式调用方法

链式方法在调用节点的子节点中创建一个新的节点。它们不会修改调用节点。链式方法使用 | 运算符标记。

死者

用于在低吞吐量时创建警报的辅助函数,也称为死手开关。

  • 阈值:如果吞吐量在点/区间中下降到阈值以下,则触发警报。
  • 间隔:检查吞吐量的频率。
  • 表达式:可选的表达式列表,供评估使用。对于时间警报非常有用。

示例:

    var data = stream
        |from()...
    // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s.
    data
        |deadman(100.0, 10s)
    //Do normal processing of data
    data...

上面的内容等同于这个示例:

    var data = stream
        |from()...
    // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s.
    data
        |stats(10s)
            .align()
        |derivative('emitted')
            .unit(10s)
            .nonNegative()
        |alert()
            .id('node \'stream0\' in task \'{{ .TaskName }}\'')
            .message('{{ .ID }} is {{ if eq .Level "OK" }}alive{{ else }}dead{{ end }}: {{ index .Fields "emitted" | printf "%0.3f" }} points/10s.')
            .crit(lambda: "emitted" <= 100.0)
    //Do normal processing of data
    data...

可以通过“deadman”配置部分全局配置idmessage警报属性。

由于AlertNode是最后一部分,可以像往常一样进一步修改。 示例:

    var data = stream
        |from()...
    // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s.
    data
        |deadman(100.0, 10s)
            .slack()
            .channel('#dead_tasks')
    //Do normal processing of data
    data...

您可以指定额外的lambda表达式,以进一步限制何时触发死手按钮。 示例:

    var data = stream
        |from()...
    // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s.
    // Only trigger the alert if the time of day is between 8am-5pm.
    data
        |deadman(100.0, 10s, lambda: hour("time") >= 8 AND hour("time") <= 17)
    //Do normal processing of data
    data...
stream|deadman(threshold float64, interval time.Duration, expr ...ast.LambdaNode)

返回: AlertNode

来自

创建一个新的 FromNode,可以进一步使用数据库、保留策略、测量和条件属性进行过滤。可以多次调用 From 来创建数据流的多个独立分支。

示例:

    // Select the 'cpu' measurement from just the database 'mydb'
    // and retention policy 'myrp'.
    var cpu = stream
        |from()
            .database('mydb')
            .retentionPolicy('myrp')
            .measurement('cpu')
    // Select the 'load' measurement from any database and retention policy.
    var load = stream
        |from()
            .measurement('load')
    // Join cpu and load streams and do further processing.
    cpu
        |join(load)
            .as('cpu', 'load')
        ...
stream|from()

返回: FromNode

统计

创建一个新的数据流,其中包含节点的内部统计信息。 间隔表示根据实时多长时间发出一次统计信息。 这意味着间隔时间与源节点接收的数据点次数无关。

stream|stats(interval time.Duration)

返回结果: StatsNode



Flux的未来

Flux 正在进入维护模式。您可以像现在一样继续使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB 3 开源版本现已公开Alpha测试

InfluxDB 3 Open Source is now available for alpha testing, licensed under MIT or Apache 2 licensing.

我们将发布两个产品作为测试版的一部分。

InfluxDB 3 核心,是我们新的开源产品。 它是一个用于时间序列和事件数据的实时数据引擎。 InfluxDB 3 企业版是建立在核心基础之上的商业版本,增加了历史查询能力、读取副本、高可用性、可扩展性和细粒度安全性。

有关如何开始的更多信息,请查看: