查询流节点
这个 queryFlux 节点定义了一个源和一个处理批量数据的时间表。 数据是从一个 InfluxDB bucket 中查询的,然后传递到数据管道中。
示例
以下示例每20秒查询一次InfluxDB,并返回最近一分钟的数据。
batch
|queryFlux('''
from(bucket: "example-bucket")
|> range(start: -1m)
|> filter(fn: (r) =>
r._measurement == "example-measurement" and
r._field == "example-field"
)
''')
.period(1m)
.every(20s)
构造函数
| 链式方法 | 描述 |
|---|---|
queryFlux( queryStr string) | 要执行的Flux查询。 |
属性方法
| 设置器 | 描述 |
|---|---|
| align( ) | 将开始和结束时间与 every 值对齐。如果使用 cron 则不适用。 |
cluster( value string) | 配置的 InfluxDB 集群的名称。如果为空,将使用默认集群。 |
cron( value string) | 使用cron语法定义一个时间表。与every属性互斥。 |
every( value time.Duration) | 查询InfluxDB的频率。与cron属性互斥。 |
offset( value time.Duration) | 查询从当前时间往回多远的时间。 |
org( value string) | InfluxDB Cloud 或 2.x 组织名称。 |
orgID( value string) | InfluxDB Cloud 或 2.x 组织 ID。 |
period( value time.Duration) | 从InfluxDB查询的时间段或长度。 |
| quiet( ) | 抑制来自该节点的所有错误日志事件。 |
链式调用方法
警报, 障碍, 底部, 变化检测, 组合, 计数, 累积和, 死信阀, 默认, 删除, 导数, 差异, 不同, Ec2自动缩放, 经过时间, 评估, 第一个, 扁平化, 霍尔特-温特斯, 霍尔特-温特斯拟合, Http输出, Http发布, InfluxDB输出, 连接, K8s自动缩放, Kapacitor循环反馈, 最后, 日志, 最大值, 均值, 中位数, 最小值, 众数, 移动平均, 百分位数, 样本, 偏移, 并行加载, 扩散, 状态计数, 状态持续时间, 统计, 标准差, 和, 群体自动缩放, 顶部, 涓流, 联合, 哪里, 窗口
属性
属性方法修改调用节点的状态。它们不会向管道中添加另一个节点,并始终返回对调用节点的引用。属性方法使用.运算符标记。
对齐
对齐查询的开始和结束时间,使其与QueryFluxNode.Every属性的偶数边界一致。如果使用QueryFluxNode.Cron属性,则不适用。
queryFlux.align()
聚类
配置的 InfluxDB 集群的名称。
如果为空,则使用默认集群。
queryFlux.cluster(value string)
定时任务
使用cron语法定义一个调度。
具体的cron实现文档在这里: https://github.com/gorhill/cronexpr#implementation
Cron属性与Every属性是互斥的。
queryFlux.cron(value string)
每一个
多长时间查询一次InfluxDB。
每个属性与Cron属性是互斥的。
queryFlux.every(value time.Duration)
偏移量
从当前时间查询的时间回溯多久。
例如,使用2小时的offset和5分钟的every,Kapacitor将每5分钟查询一次InfluxDB,获取2小时前的数据窗口。
这也适用于 Cron 调度。如果 cron 指定每星期天凌晨 1 点运行,而时区偏移为 1 小时。那么在星期天凌晨 1 点时,将查询前一天晚上 12 点的数据。
queryFlux.offset(value time.Duration)
周期
从InfluxDB查询的时间段或持续时间。
queryFlux.period(value time.Duration)
组织
要查询的 InfluxDB Cloud 或 2.x 组织名称。
如果为空,则使用默认的 org。
queryFlux.org(value string)
组织ID
要查询的 InfluxDB Cloud 或 2.x 组织 ID。
如果为空,则使用默认 orgID。
queryFlux.orgID(value string)
安静
抑制来自此节点的所有错误日志事件。
queryFlux.quiet()
链式调用方法
链式方法在调用节点的子节点中创建一个新的节点。它们不会修改调用节点。链式方法使用 | 运算符标记。
警告
创建一个警报节点,可以触发警报。
queryFlux|alert()
返回: AlertNode
障碍
创建一个新的障碍节点,它定期发出障碍消息。
每个周期都会发出一条 barrier消息。
queryFlux|barrier()
返回: BarrierNode
底部
选择底部 num 点用于 field 并按任何额外标签或字段排序。
queryFlux|bottom(num int64, field string, fieldsAndTags ...string)
返回: InfluxQLNode
变更检测
创建一个新节点,只有在与前一个点不同的情况下才发出新点。
queryFlux|changeDetect(field string)
返回: ChangeDetectNode
合并
将此节点与自身结合。数据根据时间戳进行结合。
queryFlux|combine(expressions ...ast.LambdaNode)
返回: CombineNode
计数
计算点的数量。
queryFlux|count(field string)
返回: InfluxQLNode
累积和
计算每个接收到的点的累积和。 每收集到一个点就会发出一个点。
queryFlux|cumulativeSum(field string)
返回: InfluxQLNode
死者
用于在低吞吐量时创建警报的辅助函数,也称为死手开关。
- 阈值:如果吞吐量在点/区间中下降到阈值以下,则触发警报。
- 间隔:检查吞吐量的频率。
- 表达式:可选的表达式列表,供评估使用。对于时间警报非常有用。
示例:
var data = batch
|queryFlux()...
// Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s.
data
|deadman(100.0, 10s)
//Do normal processing of data
data...
上面的内容等同于这个示例:
var data = batch
|queryFlux()...
// Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s.
data
|stats(10s)
.align()
|derivative('emitted')
.unit(10s)
.nonNegative()
|alert()
.id('node \'stream0\' in task \'{{ .TaskName }}\'')
.message('{{ .ID }} is {{ if eq .Level "OK" }}alive{{ else }}dead{{ end }}: {{ index .Fields "emitted" | printf "%0.3f" }} points/10s.')
.crit(lambda: "emitted" <= 100.0)
//Do normal processing of data
data...
可以通过“deadman”配置部分全局配置id和message警报属性。
由于AlertNode是最后一部分,可以像往常一样进一步修改。 示例:
var data = batch
|queryFlux()...
// Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s.
data
|deadman(100.0, 10s)
.slack()
.channel('#dead_tasks')
//Do normal processing of data
data...
您可以指定额外的lambda表达式,以进一步限制何时触发死手按钮。 示例:
var data = batch
|queryFlux()...
// Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s.
// Only trigger the alert if the time of day is between 8am-5pm.
data
|deadman(100.0, 10s, lambda: hour("time") >= 8 AND hour("time") <= 17)
//Do normal processing of data
data...
queryFlux|deadman(threshold float64, interval time.Duration, expr ...ast.LambdaNode)
返回: AlertNode
默认
创建一个节点,可以为缺失的标签或字段设置默认值。
queryFlux|default()
返回: DefaultNode
删除
创建一个可以删除标签或字段的节点。
queryFlux|delete()
返回: DeleteNode
导数
创建一个新节点,该节点计算相邻点的导数。
queryFlux|derivative(field string)
返回: DerivativeNode
差异
计算独立于经过时间的点之间的差异。
queryFlux|difference(field string)
返回: InfluxQLNode
唯一
生成仅包含不同点的批次。
queryFlux|distinct(field string)
返回: InfluxQLNode
Ec2Autoscale
创建一个可以触发自动缩放事件的 EC2 自动缩放组节点。
queryFlux|ec2Autoscale()
返回: Ec2AutoscaleNode
经过时间
计算点之间的经过时间。
queryFlux|elapsed(field string, unit time.Duration)
返回: InfluxQLNode
评估
创建一个评估节点,该节点将对每个数据点评估给定的变换函数。可以提供表达式列表,并将按给定顺序进行评估。结果可供后续表达式使用。
queryFlux|eval(expressions ...ast.LambdaNode)
返回: EvalNode
第一
选择第一个点。
queryFlux|first(field string)
返回: InfluxQLNode
扁平化
将具有相似时间的点合并为一个点。
queryFlux|flatten()
返回: FlattenNode
霍尔特-温特斯
计算一个数据集的霍尔特-温特斯(/influxdb/v1/query_language/functions/#holt-winters)预测。
queryFlux|holtWinters(field string, h int64, m int64, interval time.Duration)
返回: InfluxQLNode
霍尔特-冬季法与拟合
计算Holt-Winters (/influxdb/v1/query_language/functions/#holt-winters) 数据集的预测。 此方法还输出用于拟合数据的所有点,除了预测的数据。
queryFlux|holtWintersWithFit(field string, h int64, m int64, interval time.Duration)
返回: InfluxQLNode
Http输出
创建一个HTTP输出节点,用于缓存它所接收到的最新数据。缓存的数据可以在给定的端点访问。该端点是从运行任务的API端点的相对路径。例如,如果任务端点位于 /kapacitor/v1/tasks/<task_id> 且端点为 top10,那么数据可以从 /kapacitor/v1/tasks/<task_id>/top10 请求。
queryFlux|httpOut(endpoint string)
返回: HTTPOutNode
HttpPost
创建一个HTTP Post节点,将接收到的数据POST到提供的HTTP端点。HttpPost期望0或1个参数。如果提供0个参数,必须指定一个端点属性方法。
queryFlux|httpPost(url ...string)
返回: HTTPPostNode
InfluxDB输出
创建一个 influxdb 输出节点,将传入的数据存储到 InfluxDB 中。
queryFlux|influxDBOut()
返回: InfluxDBOutNode
加入
将此节点与其他节点连接。数据是基于时间戳进行连接的。
queryFlux|join(others ...Node)
返回: JoinNode
K8s自缩放
创建一个可以触发Kubernetes集群自适应缩放事件的节点。
queryFlux|k8sAutoscale()
返回: K8sAutoscaleNode
Kapacitor循环回路
创建一个将数据作为流发送回Kapacitor的kapacitor循环节点。
queryFlux|kapacitorLoopback()
最后
选择最后一点。
queryFlux|last(field string)
返回: InfluxQLNode
日志
创建一个节点,记录它接收到的所有数据。
queryFlux|log()
返回: LogNode
最大值
选择最大点。
queryFlux|max(field string)
返回: InfluxQLNode
均值
计算数据的平均值。
queryFlux|mean(field string)
返回: InfluxQLNode
中位数
计算数据的中位数。
注意:此方法不是选择器。如果你想要中位数,请使用
.percentile(field, 50.0)。
queryFlux|median(field string)
返回: InfluxQLNode
最小值
选择最小点。
queryFlux|min(field string)
返回: InfluxQLNode
模式
计算数据的众数。
queryFlux|mode(field string)
返回: InfluxQLNode
移动平均
计算最后窗口点的移动平均值。 在窗口填满之前不会发出任何点。
queryFlux|movingAverage(field string, window int64)
返回: InfluxQLNode
百分位数
在给定百分位数处选择一个点。 这是一个选择器函数,不执行点之间的插值。
queryFlux|percentile(field string, percentile float64)
返回: InfluxQLNode
示例
创建一个新节点,该节点对传入的点或批次进行采样。
每个指定的计数或持续时间将会发出一个点。
queryFlux|sample(rate interface{})
返回: SampleNode
移位
创建一个新的节点,按时间移动传入的点或批次。
queryFlux|shift(shift time.Duration)
返回: ShiftNode
侧载
创建一个可以从外部源加载数据的节点。
queryFlux|sideload()
返回: SideloadNode
扩散
计算 min 和 max 点之间的差。
queryFlux|spread(field string)
返回: InfluxQLNode
状态计数
创建一个节点,用于跟踪给定状态中连续点的数量。
queryFlux|stateCount(expression ast.LambdaNode)
返回: StateCountNode
状态持续时间
创建一个跟踪给定状态下持续时间的节点。
queryFlux|stateDuration(expression ast.LambdaNode)
统计
创建一个新的数据流,其中包含节点的内部统计信息。 间隔表示根据实时多长时间发出一次统计信息。 这意味着间隔时间与源节点接收的数据点次数无关。
queryFlux|stats(interval time.Duration)
返回结果: StatsNode
标准差
计算标准差。
queryFlux|stddev(field string)
返回: InfluxQLNode
总和
计算所有值的总和。
queryFlux|sum(field string)
返回: InfluxQLNode
群集自动缩放
创建一个可以触发Docker swarm集群的自动缩放事件的节点。
queryFlux|swarmAutoscale()
顶部
选择前 num 个点用于 field 并按任何额外标签或字段排序。
queryFlux|top(num int64, field string, fieldsAndTags ...string)
返回: InfluxQLNode
涓流
创建一个新的节点,将批量数据转换为流数据。
queryFlux|trickle()
返回: TrickleNode
联合
执行该节点与所有其他给定节点的并集。
queryFlux|union(node ...Node)
返回: UnionNode
在哪里
创建一个新节点,该节点根据给定的表达式过滤数据流。
queryFlux|where(expression ast.LambdaNode)
返回: WhereNode
窗口
创建一个新的节点,通过时间窗口化流。
注意:窗口只能应用于流边缘。
queryFlux|window()
返回: WindowNode