连接节点
join节点连接来自任何数量节点的数据。
当从父节点接收到每个数据点时,它会与来自其他父节点的下一个数据点配对,条件是它们具有匹配的时间戳。每个父节点最多为每个连接点贡献一个点。可以提供一个容差来连接时间戳不完全对齐的点。
任何落在容差范围内的点都会根据时间戳进行连接。
如果多个点落在相同的容差窗口内,则按照它们到达的顺序进行连接。
别名用于为来自各自节点的所有字段添加前缀。
连接可以是内连接或外连接,请参阅 JoinNode.Fill 属性。
示例:连接两个测量值
在下面的示例中,errors 和 requests 流被连接并转换以计算一个组合字段。
var errors = stream
|from()
.measurement('errors')
var requests = stream
|from()
.measurement('requests')
// Join the errors and requests streams
errors
|join(requests)
// Provide prefix names for the fields of the data points.
.as('errors', 'requests')
// points that are within 1 second are considered the same time.
.tolerance(1s)
// fill missing values with 0, implies outer join.
.fill(0.0)
// name the resulting stream
.streamName('error_rate')
// treat a delete from one side of the join as a delete to all sides
.deleteAll(TRUE)
// Both the "value" fields from each parent have been prefixed
// with the respective names 'errors' and 'requests'.
|eval(lambda: "errors.value" / "requests.value")
.as('rate')
...
示例:连接三个或更多测量值
在下面的示例中,errors、missing_page_errors 和 server_errors 被
连接并转换,以计算两个组合字段:404_rate 和 500_rate。
var errors = stream
|from()
.measurement('errors')
var missing_page_errors = stream
|from()
.measurement('errors')
.where(lambda: "type" == '404')
var server_errors = stream
|from()
.measurement('errors')
.where(lambda: "type" == '500')
// Join the errors, missing_page_errors, and server_errors streams
errors
|join(missing_page_errors, server_errors)
// Provide prefix names for the fields of the data points.
.as('errors', '404', '500')
// points that are within 1 second are considered the same time.
.tolerance(1s)
// fill missing values with 0, implies outer join.
.fill(0.0)
// name the resulting stream
.streamName('error_rates')
// The "value" fields from each parent have been prefixed
// with the respective names 'errors', 'missing_page_errors', 'and server_errors'.
// Calculate the percentage of 404 errors
|eval(lambda: "404.value" / "errors.value")
.as('404_rate')
// Calculate the percentage of 500 errors
|eval(lambda: "500.value" / "errors.value")
.as('500_rate')
...
构造函数
| 链式方法 | 描述 |
|---|---|
join(others ...Node) | 将此节点与其他节点连接。数据是基于时间戳连接的。 |
属性方法
| 设置器 | 描述 |
|---|---|
作为(names ...字符串) | 为来自各自节点的所有字段添加前缀。父节点中的每个字段将与提供的名称和.前缀。请参见下面的示例。 |
delimiter(value string) | 字段名称前缀的分隔符。可以是空字符串。 |
deleteAll(value bool) | 无论哪个侧接收删除消息,都删除连接的两侧。 |
fill(value interface{}) | 填充数据。填充选项意味着连接的类型:内连接或完全外连接。 |
on(dims ...string) | 在一组分组维度的子集上连接。这是一个特殊的情况,您希望一个父级的单个点与来自另一个父级的多个点进行连接。 |
| quiet() | 禁止此节点的所有错误日志记录事件。 |
streamName(value string) | 此新连接数据流的名称。如果为空,则使用左侧父级的名称。 |
tolerance(value time.Duration) | 两个输入点之间可以相差的最大时间持续时间,仍被视为在时间上相等。连接的数据点的时间将四舍五入到容差持续时间的最接近的倍数。 |
链式调用方法
警报, 屏障, 底部, 变化检测, 合并, 计数, 累计和, 死人, 默认, 删除, 导数, 差异, 唯一, Ec2自动缩放, 经过时间, 评估, 第一次, 扁平化, 分组, 霍尔特-温特斯, 霍尔特-温特斯与拟合, Http输出, HttpPOST, InfluxDB输出, 连接, K8s自动缩放, Kapacitor循环回路, 最后, 日志, 最大值, 平均值, 中位数, 最小值, 众数, 移动平均, 百分位数, 样本, 移位, 旁加载, 传播, 状态计数, 状态持续时间, 统计, 标准差, 总和, 群体自动缩放, 顶部, 涓涓细流, 并集, 条件, 窗口
属性
属性方法修改调用节点的状态。它们不会向管道中添加另一个节点,并始终返回对调用节点的引用。属性方法使用.运算符标记。
作为
来自各个节点的所有字段的前缀名称。
父节点中的每个字段将以提供的名称和“.”作为前缀。
请参见上面的示例。
名称中不能包含点 ‘.’ 字符。
join.as(names ...string)
分隔符
字段名称前缀的分隔符。可以是空字符串。
join.delimiter(value string)
删除所有
无论哪一方接收删除消息,删除联接的两侧。
join.deleteAll(value bool)
填充
填写数据。 填充选项意味着连接的类型:内部连接或完全外连接。 选项包括:
- none - (默认)跳过缺少点的行,内连接。
- null - 用 null 填充缺失的点,使用全外连接。
- 任何数值 - 用给定值填写字段,完全外连接。
当使用数字填充或空值填充时,字段名称通过从另一个点复制字段名称来确定。 当不同来源具有不同字段名称时,这样做效果不佳。 如果有必要,请使用 DefaultNode 和 DeleteNode 来完成填充操作。
join.fill(value interface{})
示例:
var maintlock = stream
|from()
.measurement('maintlock')
.groupBy('service')
var requests = stream
|from()
.measurement('requests')
.groupBy('service')
// Join the maintlock and requests streams
// The intent it to drop any points in maintenance mode.
maintlock
|join(requests)
// Provide prefix names for the fields of the data points.
.as('maintlock', 'requests')
// points that are within 1 second are considered the same time.
.tolerance(1s)
// fill missing fields with null, implies outer join.
// a better default per field will be set later.
.fill('null')
// name the resulting stream.
.streamName('requests')
|default()
// default maintenance mode to false, overwriting the null value if present.
.field('maintlock.mode', false)
// default the requests to 0, again overwriting the null value if present.
.field('requests.value', 0.0)
// drop any points that are in maintenance mode.
|where(lambda: "maintlock.mode")
|...
处理外连接中的空值填充
在使用Kapacitor执行外连接时,为了连接和填充操作产生的null字段设置默认值是很重要的。这是通过DefaultNode来完成的,它为特定字段键将null值替换为指定的默认值。如果不这样做,可能会导致无效的线协议(因为null并不是所有字段类型的适当值),从而导致连接失败。
source1
|join(source2)
.as('source1', 'source2')
.fill('null')
|default()
// .field('field-key', default-value)
// Define a default for an integer field type
.field('source1.rounded', 0)
// Define a default for a float field type
.field('source1.value', 0.0)
// Define a default for a string field type
.field('source2.location', '')
// Define a default for a boolean field type
.field('source2.maintenance', false)
使用此方法时,您必须知道连接结果的所有字段和字段类型,并提供适当的默认值。
您还可以使用 DeleteNode 来删除由于连接而产生的不必要的字段或标签。
source1
|join(source2)
.as('source1', 'source2')
.fill('null')
|default()
.field('source1.mode', false)
.field('source2.value', 0.0)
|delete()
.field('source1.anon')
.tag('host')
开启
在一组维度的子集上进行连接。 这是一个特殊情况,您希望从一个父项中获取单个点与来自不同父项的多个点连接。
例如给定两个测量值:
- building_power(一个单一值):由建筑标记,值是建筑消耗的总电力。
- floor_power (多个值): 按建筑和楼层标记,值是每层消耗的总功率。
您想要计算每层楼耗电量占总建筑能耗的比例。由于每栋建筑只有一个数据点,您需要将其与每层楼的数据点进行多次连接。通过将 on 维度定义为 building,我们表示希望仅将具有建筑标签的数据点与更具体的具有更多标签的数据点连接,在这种情况下是 floor 标签。换句话说,当我们有带有建筑和楼层标签的数据点时,我们只希望根据建筑标签进行连接。
示例:
var building = stream
|from()
.measurement('building_power')
.groupBy('building')
var floor = stream
|from()
.measurement('floor_power')
.groupBy('building', 'floor')
building
|join(floor)
.as('building', 'floor')
.on('building')
|eval(lambda: "floor.value" / "building.value")
... // Values here are grouped by 'building' and 'floor'
join.on(dims ...string)
安静
抑制来自此节点的所有错误日志事件。
join.quiet()
流名称
这个新组合数据流的名称。
如果为空,则使用左侧父级的名称。
join.streamName(value string)
公差
两个输入点之间可以相隔的最大时间,如果仍然被视为时间相等。连接数据点的时间将四舍五入到公差持续时间的最接近倍数。
join.tolerance(value time.Duration)
链式调用方法
链式方法在调用节点的子节点中创建一个新的节点。它们不会修改调用节点。链式方法使用 | 运算符标记。
警告
创建一个警报节点,可以触发警报。
join|alert()
返回: AlertNode
障碍
创建一个新的障碍节点,它定期发出障碍消息。
每个周期都会发出一条 barrier消息。
join|barrier()
返回: BarrierNode
底部
选择底部 num 点用于 field 并按任何额外标签或字段排序。
join|bottom(num int64, field string, fieldsAndTags ...string)
返回: InfluxQLNode
变更检测
创建一个新的节点,仅在与之前的点不同的情况下发出新点。
join|changeDetect(field string)
返回: ChangeDetectNode
合并
将此节点与自身结合。数据根据时间戳进行结合。
join|combine(expressions ...ast.LambdaNode)
返回: CombineNode
计数
计算点的数量。
join|count(field string)
返回: InfluxQLNode
累积和
计算每个接收到的点的累积和。 每收集到一个点就会发出一个点。
join|cumulativeSum(field string)
返回: InfluxQLNode
死者
用于在低吞吐量时创建警报的辅助函数,也称为死手开关。
- 阈值:如果吞吐量在点/区间中下降到阈值以下,则触发警报。
- 间隔:检查吞吐量的频率。
- 表达式:可选的表达式列表,供评估使用。对于时间警报非常有用。
示例:
var data = stream
|from()...
// Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s.
data
|deadman(100.0, 10s)
//Do normal processing of data
data...
上面的内容等同于这个示例:
var data = stream
|from()...
// Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s.
data
|stats(10s)
.align()
|derivative('emitted')
.unit(10s)
.nonNegative()
|alert()
.id('node \'stream0\' in task \'{{ .TaskName }}\'')
.message('{{ .ID }} is {{ if eq .Level "OK" }}alive{{ else }}dead{{ end }}: {{ index .Fields "emitted" | printf "%0.3f" }} points/10s.')
.crit(lambda: "emitted" <= 100.0)
//Do normal processing of data
data...
可以通过“deadman”配置部分全局配置id和message警报属性。
由于AlertNode是最后一部分,可以像往常一样进一步修改。 示例:
var data = stream
|from()...
// Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s.
data
|deadman(100.0, 10s)
.slack()
.channel('#dead_tasks')
//Do normal processing of data
data...
您可以指定额外的lambda表达式,以进一步限制何时触发死手按钮。 示例:
var data = stream
|from()...
// Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s.
// Only trigger the alert if the time of day is between 8am-5pm.
data
|deadman(100.0, 10s, lambda: hour("time") >= 8 AND hour("time") <= 17)
//Do normal processing of data
data...
join|deadman(threshold float64, interval time.Duration, expr ...ast.LambdaNode)
返回: AlertNode
默认
创建一个节点,可以为缺失的标签或字段设置默认值。
join|default()
返回: DefaultNode
删除
创建一个可以删除标签或字段的节点。
join|delete()
返回: DeleteNode
导数
创建一个新节点,该节点计算相邻点的导数。
join|derivative(field string)
返回: DerivativeNode
差异
计算独立于经过时间的点之间的差异。
join|difference(field string)
返回: InfluxQLNode
唯一
生成仅包含不同点的批次。
join|distinct(field string)
返回: InfluxQLNode
Ec2Autoscale
创建一个可以触发自动缩放事件的 EC2 自动缩放组节点。
join|ec2Autoscale()
返回: Ec2AutoscaleNode
经过时间
计算点之间的经过时间。
join|elapsed(field string, unit time.Duration)
返回: InfluxQLNode
评估
创建一个评估节点,该节点将对每个数据点评估给定的变换函数。可以提供表达式列表,并将按给定顺序进行评估。结果可供后续表达式使用。
join|eval(expressions ...ast.LambdaNode)
返回: EvalNode
第一
选择第一个点。
join|first(field string)
返回: InfluxQLNode
扁平化
将具有相似时间的点合并为一个点。
join|flatten()
返回: FlattenNode
分组
按一组标签对数据进行分组。
可以传递字面量 * 来按所有维度分组。 示例:
|groupBy(*)
join|groupBy(tag ...interface{})
返回: GroupByNode
霍尔特-温特斯
计算一个数据集的霍尔特-温特斯(/influxdb/v1/query_language/functions/#holt-winters)预测。
join|holtWinters(field string, h int64, m int64, interval time.Duration)
返回: InfluxQLNode
霍尔特-冬季法与拟合
计算Holt-Winters (/influxdb/v1/query_language/functions/#holt-winters) 数据集的预测。 此方法还输出用于拟合数据的所有点,除了预测的数据。
join|holtWintersWithFit(field string, h int64, m int64, interval time.Duration)
返回: InfluxQLNode
Http输出
创建一个HTTP输出节点,用于缓存它所接收到的最新数据。缓存的数据可以在给定的端点访问。该端点是从运行任务的API端点的相对路径。例如,如果任务端点位于 /kapacitor/v1/tasks/<task_id> 且端点为 top10,那么数据可以从 /kapacitor/v1/tasks/<task_id>/top10 请求。
join|httpOut(endpoint string)
返回: HTTPOutNode
HttpPost
创建一个HTTP Post节点,将接收到的数据POST到提供的HTTP端点。HttpPost期望0或1个参数。如果提供0个参数,必须指定一个端点属性方法。
join|httpPost(url ...string)
返回: HTTPPostNode
InfluxDB输出
创建一个 influxdb 输出节点,将传入的数据存储到 InfluxDB 中。
join|influxDBOut()
返回: InfluxDBOutNode
加入
将此节点与其他节点连接。数据是基于时间戳进行连接的。
join|join(others ...Node)
返回: JoinNode
K8s自缩放
创建一个可以触发Kubernetes集群自适应缩放事件的节点。
join|k8sAutoscale()
返回: K8sAutoscaleNode
Kapacitor循环回路
创建一个将数据作为流发送回Kapacitor的kapacitor循环节点。
join|kapacitorLoopback()
最后
选择最后一点。
join|last(field string)
返回: InfluxQLNode
日志
创建一个节点,记录它接收到的所有数据。
join|log()
返回: LogNode
最大值
选择最大点。
join|max(field string)
返回: InfluxQLNode
均值
计算数据的平均值。
join|mean(field string)
返回: InfluxQLNode
中位数
计算数据的中位数。
注意:此方法不是选择器。如果你想要中位数,请使用
.percentile(field, 50.0)。
join|median(field string)
返回: InfluxQLNode
最小值
选择最小点。
join|min(field string)
返回: InfluxQLNode
模式
计算数据的众数。
join|mode(field string)
返回: InfluxQLNode
移动平均
计算最后窗口点的移动平均值。 在窗口填满之前不会发出任何点。
join|movingAverage(field string, window int64)
返回: InfluxQLNode
百分位数
在给定百分位数处选择一个点。 这是一个选择器函数,不执行点之间的插值。
join|percentile(field string, percentile float64)
返回: InfluxQLNode
示例
创建一个新节点,该节点对传入的点或批次进行采样。
每个指定的计数或持续时间将会发出一个点。
join|sample(rate interface{})
返回: SampleNode
移位
创建一个新的节点,按时间移动传入的点或批次。
join|shift(shift time.Duration)
返回: ShiftNode
侧载
创建一个可以从外部源加载数据的节点。
join|sideload()
返回: SideloadNode
扩散
计算 min 和 max 点之间的差。
join|spread(field string)
返回: InfluxQLNode
状态计数
创建一个节点,用于跟踪给定状态中连续点的数量。
join|stateCount(expression ast.LambdaNode)
返回: StateCountNode
状态持续时间
创建一个跟踪给定状态下持续时间的节点。
join|stateDuration(expression ast.LambdaNode)
统计
创建一个新的数据流,其中包含节点的内部统计信息。 间隔表示根据实时多长时间发出一次统计信息。 这意味着间隔时间与源节点接收的数据点次数无关。
join|stats(interval time.Duration)
返回结果: StatsNode
标准差
计算标准差。
join|stddev(field string)
返回: InfluxQLNode
总和
计算所有值的总和。
join|sum(field string)
返回: InfluxQLNode
群集自动缩放
创建一个可以触发Docker swarm集群的自动缩放事件的节点。
join|swarmAutoscale()
顶部
选择前 num 个点用于 field 并按任何额外标签或字段排序。
join|top(num int64, field string, fieldsAndTags ...string)
返回: InfluxQLNode
涓流
创建一个新的节点,将批量数据转换为流数据。
join|trickle()
返回: TrickleNode
联合
执行该节点与所有其他给定节点的并集。
join|union(node ...Node)
返回: UnionNode
在哪里
创建一个新节点,该节点根据给定的表达式过滤数据流。
join|where(expression ast.LambdaNode)
返回: WhereNode
窗口
创建一个新的节点,通过时间窗口化流。
注意:窗口只能应用于流边缘。
join|window()
返回: WindowNode