Flux与InfluxQL
此页面记录了 InfluxDB OSS 的早期版本。InfluxDB OSS v2 是最新的稳定版本。请参阅 InfluxDB v2 文档。
Flux 是用于查询和分析数据的 InfluxQL 及其他类似 SQL 的查询语言的替代品。 Flux 使用函数式语言模式,使其强大、灵活,并能够克服 InfluxQL 的许多限制。 本文概述了许多使用 Flux 但 InfluxQL 不可能完成的任务,并提供有关 Flux 与 InfluxQL 的相同性的信息。
通过Flux实现的可能性
连接
InfluxQL 从未支持连接。它们可以使用 TICKscript 来实现,但即使是 TICKscript 的连接能力也有限。Flux 的 join() 函数 允许您从任何桶、任何测量和任何列联合数据,只要每个数据集包含要连接的列。这为真正强大和有用的操作开辟了机会。
dataStream1 = from(bucket: "bucket1")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "network" and
r._field == "bytes-transferred"
)
dataStream2 = from(bucket: "bucket1")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "httpd" and
r._field == "requests-per-sec"
)
join(
tables: {d1:dataStream1, d2:dataStream2},
on: ["_time", "_stop", "_start", "host"]
)
有关使用 join() 函数的深入讲解,请参见 如何使用 Flux 连接数据。
跨测量的数学
能够执行跨测量连接还允许您使用来自不同测量的数据进行计算——这是InfluxData社区高度请求的功能。下面的例子取自两个来自不同测量的数据流, mem 和 processes,将它们连接起来,然后计算每个运行进程使用的平均内存量:
// Memory used (in bytes)
memUsed = from(bucket: "telegraf/autogen")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "mem" and
r._field == "used"
)
// Total processes running
procTotal = from(bucket: "telegraf/autogen")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "processes" and
r._field == "total"
)
// Join memory used with total processes and calculate
// the average memory (in MB) used for running processes.
join(
tables: {mem:memUsed, proc:procTotal},
on: ["_time", "_stop", "_start", "host"]
)
|> map(fn: (r) => ({
_time: r._time,
_value: (r._value_mem / r._value_proc) / 1000000
})
)
按标签排序
InfluxQL 的排序功能非常有限,仅允许您使用 ORDER BY time 子句控制 time 的排序顺序。Flux 的 sort() 函数 根据列的列表对记录进行排序。根据列的类型,记录按字典序、数字或时间顺序排序。
from(bucket:"telegraf/autogen")
|> range(start:-12h)
|> filter(fn: (r) =>
r._measurement == "system" and
r._field == "uptime"
)
|> sort(columns:["region", "host", "_value"])
按任意列分组
InfluxQL 允许你按标签或时间间隔进行分组,但没有其他选项。Flux 允许你按数据集中的任何列进行分组,包括 _value。使用 Flux group() 函数 定义按哪些列对数据进行分组。
from(bucket:"telegraf/autogen")
|> range(start:-12h)
|> filter(fn: (r) => r._measurement == "system" and r._field == "uptime" )
|> group(columns:["host", "_value"])
按日历月和年份划分窗口
InfluxQL 不支持按日历月份和年份对数据进行窗口化,因为它们的长度不一致。
Flux 支持日历月份和年份持续时间单位 (1mo, 1y),并允许您
按日历月份和年份对数据进行窗口化和聚合。
from(bucket:"telegraf/autogen")
|> range(start:-1y)
|> filter(fn: (r) => r._measurement == "mem" and r._field == "used_percent" )
|> aggregateWindow(every: 1mo, fn: mean)
处理多个数据源
InfluxQL 只能查询存储在 InfluxDB 中的数据。
Flux 可以从其他数据源查询数据,例如 CSV、PostgreSQL、MySQL、Google BigTable 等。
将这些数据与 InfluxDB 中的数据连接,以丰富查询结果。
import "csv"
import "sql"
csvData = csv.from(csv: rawCSV)
sqlData = sql.from(
driverName: "postgres",
dataSourceName: "postgresql://user:password@localhost",
query:"SELECT * FROM example_table"
)
data = from(bucket: "telegraf/autogen")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "sensor")
auxData = join(tables: {csv: csvData, sql: sqlData}, on: ["sensor_id"])
enrichedData = join(tables: {data: data, aux: auxData}, on: ["sensor_id"])
enrichedData
|> yield(name: "enriched_data")
有关查询SQL数据的深入指南,请参见查询SQL数据源。
类似DatePart的查询
InfluxQL不支持仅在特定小时返回结果的类似DatePart的查询。
Flux hourSelection 函数
仅返回指定小时范围内的时间值的数据。
from(bucket: "telegraf/autogen")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "cpu" and
r.cpu == "cpu-total"
)
|> hourSelection(start: 9, stop: 17)
数据透视
在 InfluxQL 中从未支持数据表的透视。
Flux pivot() 函数 提供了通过指定 rowKey、columnKey 和 valueColumn 参数来透视数据表的能力。
from(bucket: "telegraf/autogen")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "cpu" and
r.cpu == "cpu-total"
)
|> pivot(
rowKey:["_time"],
columnKey: ["_field"],
valueColumn: "_value"
)
直方图
生成直方图的能力是InfluxQL中一个备受期待的功能,但从未得到支持。
Flux的 histogram() function 使用输入
数据生成一个累积直方图,未来将支持其他直方图类型。
from(bucket: "telegraf/autogen")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "mem" and
r._field == "used_percent"
)
|> histogram(
buckets: [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
)
有关使用Flux创建累积直方图的示例,请参阅创建直方图。
协方差
Flux提供用于简单协方差计算的函数。
covariance()函数
计算两列之间的协方差,而cov()函数
计算两个数据流之间的协方差。
两列之间的协方差
from(bucket: "telegraf/autogen")
|> range(start:-5m)
|> covariance(columns: ["x", "y"])
两组数据之间的协方差
table1 = from(bucket: "telegraf/autogen")
|> range(start: -15m)
|> filter(fn: (r) =>
r._measurement == "measurement_1"
)
table2 = from(bucket: "telegraf/autogen")
|> range(start: -15m)
|> filter(fn: (r) =>
r._measurement == "measurement_2"
)
cov(x: table1, y: table2, on: ["_time", "_field"])
将布尔值转换为整数
InfluxQL 支持类型转换,但仅限于数字数据类型(浮点数到整数及其反向转换)。Flux 类型转换函数 提供更广泛的类型转换支持,并允许您执行一些长期请求的操作,例如将布尔值转换为整数。
将布尔字段值转换为整数
from(bucket: "telegraf/autogen")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "m" and
r._field == "bool_field"
)
|> toInt()
字符串操作和数据整形
InfluxQL在查询数据时不支持字符串操作。
Flux Strings package是一个对字符串数据进行操作的函数集合。
与map()函数结合使用时,
字符串包中的函数允许进行字符串清理和规范化等操作。
import "strings"
from(bucket: "telegraf/autogen")
|> range(start: -1h)
|> filter(fn: (r) =>
r._measurement == "weather" and
r._field == "temp"
)
|> map(fn: (r) => ({
r with
location: strings.toTitle(v: r.location),
sensor: strings.replaceAll(v: r.sensor, t: " ", u: "-"),
status: strings.substring(v: r.status, start: 0, end: 8)
}))
处理地理时间数据
InfluxQL 并不提供处理地理时间数据的功能。 Flux Geo package 是一组函数的集合, 允许您对地理时间数据进行整形、过滤和分组。
import "experimental/geo"
from(bucket: "geo/autogen")
|> range(start: -1w)
|> filter(fn: (r) => r._measurement == "taxi")
|> geo.shapeData(latField: "latitude", lonField: "longitude", level: 20)
|> geo.filterRows(
region: {lat: 40.69335938, lon: -73.30078125, radius: 20.0},
strict: true
)
|> geo.asTracks(groupBy: ["fare-id"])
InfluxQL 和 Flux 对等
Flux 正在朝着与 InfluxQL 完全对等的方向努力,并为此添加了新的函数。下面的表格显示了 InfluxQL 语句、子句和函数以及它们等价的 Flux 函数。
有关Flux函数的完整列表,查看所有Flux函数。
InfluxQL 和 Flux 的相等性
* to() 函数仅写入 InfluxDB 2.0。