Documentation

Flux与InfluxQL

此页面记录了 InfluxDB OSS 的早期版本。InfluxDB OSS v2 是最新的稳定版本。请参阅 InfluxDB v2 文档

Flux 是用于查询和分析数据的 InfluxQL 及其他类似 SQL 的查询语言的替代品。 Flux 使用函数式语言模式,使其强大、灵活,并能够克服 InfluxQL 的许多限制。 本文概述了许多使用 Flux 但 InfluxQL 不可能完成的任务,并提供有关 Flux 与 InfluxQL 的相同性的信息。

通过Flux实现的可能性

连接

InfluxQL 从未支持连接。它们可以使用 TICKscript 来实现,但即使是 TICKscript 的连接能力也有限。Flux 的 join() 函数 允许您从任何桶、任何测量和任何列联合数据,只要每个数据集包含要连接的列。这为真正强大和有用的操作开辟了机会。

dataStream1 = from(bucket: "bucket1")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "network" and
    r._field == "bytes-transferred"
  )

dataStream2 = from(bucket: "bucket1")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "httpd" and
    r._field == "requests-per-sec"
    )

join(
    tables: {d1:dataStream1, d2:dataStream2},
    on: ["_time", "_stop", "_start", "host"]
  )

有关使用 join() 函数的深入讲解,请参见 如何使用 Flux 连接数据


跨测量的数学

能够执行跨测量连接还允许您使用来自不同测量的数据进行计算——这是InfluxData社区高度请求的功能。下面的例子取自两个来自不同测量的数据流, memprocesses,将它们连接起来,然后计算每个运行进程使用的平均内存量:

// Memory used (in bytes)
memUsed = from(bucket: "telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "mem" and
    r._field == "used"
  )

// Total processes running
procTotal = from(bucket: "telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "processes" and
    r._field == "total"
    )

// Join memory used with total processes and calculate
// the average memory (in MB) used for running processes.
join(
    tables: {mem:memUsed, proc:procTotal},
    on: ["_time", "_stop", "_start", "host"]
  )
  |> map(fn: (r) => ({
    _time: r._time,
    _value: (r._value_mem / r._value_proc) / 1000000
  })
)

按标签排序

InfluxQL 的排序功能非常有限,仅允许您使用 ORDER BY time 子句控制 time 的排序顺序。Flux 的 sort() 函数 根据列的列表对记录进行排序。根据列的类型,记录按字典序、数字或时间顺序排序。

from(bucket:"telegraf/autogen")
  |> range(start:-12h)
  |> filter(fn: (r) =>
    r._measurement == "system" and
    r._field == "uptime"
  )
  |> sort(columns:["region", "host", "_value"])

按任意列分组

InfluxQL 允许你按标签或时间间隔进行分组,但没有其他选项。Flux 允许你按数据集中的任何列进行分组,包括 _value。使用 Flux group() 函数 定义按哪些列对数据进行分组。

from(bucket:"telegraf/autogen")
  |> range(start:-12h)
  |> filter(fn: (r) => r._measurement == "system" and r._field == "uptime" )
  |> group(columns:["host", "_value"])

按日历月和年份划分窗口

InfluxQL 不支持按日历月份和年份对数据进行窗口化,因为它们的长度不一致。 Flux 支持日历月份和年份持续时间单位 (1mo, 1y),并允许您 按日历月份和年份对数据进行窗口化和聚合。

from(bucket:"telegraf/autogen")
  |> range(start:-1y)
  |> filter(fn: (r) => r._measurement == "mem" and r._field == "used_percent" )
  |> aggregateWindow(every: 1mo, fn: mean)

处理多个数据源

InfluxQL 只能查询存储在 InfluxDB 中的数据。
Flux 可以从其他数据源查询数据,例如 CSV、PostgreSQL、MySQL、Google BigTable 等。
将这些数据与 InfluxDB 中的数据连接,以丰富查询结果。

import "csv"
import "sql"

csvData = csv.from(csv: rawCSV)
sqlData = sql.from(
  driverName: "postgres",
  dataSourceName: "postgresql://user:password@localhost",
  query:"SELECT * FROM example_table"
)
data = from(bucket: "telegraf/autogen")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "sensor")

auxData = join(tables: {csv: csvData, sql: sqlData}, on: ["sensor_id"])
enrichedData = join(tables: {data: data, aux: auxData}, on: ["sensor_id"])

enrichedData
  |> yield(name: "enriched_data")

有关查询SQL数据的深入指南,请参见查询SQL数据源


类似DatePart的查询

InfluxQL不支持仅在特定小时返回结果的类似DatePart的查询。 Flux hourSelection 函数 仅返回指定小时范围内的时间值的数据。

from(bucket: "telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "cpu" and
    r.cpu == "cpu-total"
  )
  |> hourSelection(start: 9, stop: 17)

数据透视

在 InfluxQL 中从未支持数据表的透视。 Flux pivot() 函数 提供了通过指定 rowKeycolumnKeyvalueColumn 参数来透视数据表的能力。

from(bucket: "telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "cpu" and
    r.cpu == "cpu-total"
  )
  |> pivot(
    rowKey:["_time"],
    columnKey: ["_field"],
    valueColumn: "_value"
  )

直方图

生成直方图的能力是InfluxQL中一个备受期待的功能,但从未得到支持。 Flux的 histogram() function 使用输入 数据生成一个累积直方图,未来将支持其他直方图类型。

from(bucket: "telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "mem" and
    r._field == "used_percent"
  )
  |> histogram(
    buckets: [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]
  )

有关使用Flux创建累积直方图的示例,请参阅创建直方图


协方差

Flux提供用于简单协方差计算的函数。 covariance()函数 计算两列之间的协方差,而cov()函数 计算两个数据流之间的协方差。

两列之间的协方差
from(bucket: "telegraf/autogen")
  |> range(start:-5m)
  |> covariance(columns: ["x", "y"])
两组数据之间的协方差
table1 = from(bucket: "telegraf/autogen")
  |> range(start: -15m)
  |> filter(fn: (r) =>
    r._measurement == "measurement_1"
  )

table2 = from(bucket: "telegraf/autogen")
  |> range(start: -15m)
  |> filter(fn: (r) =>
    r._measurement == "measurement_2"
  )

cov(x: table1, y: table2, on: ["_time", "_field"])

将布尔值转换为整数

InfluxQL 支持类型转换,但仅限于数字数据类型(浮点数到整数及其反向转换)。Flux 类型转换函数 提供更广泛的类型转换支持,并允许您执行一些长期请求的操作,例如将布尔值转换为整数。

将布尔字段值转换为整数
from(bucket: "telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "m" and
    r._field == "bool_field"
  )
  |> toInt()

字符串操作和数据整形

InfluxQL在查询数据时不支持字符串操作。 Flux Strings package是一个对字符串数据进行操作的函数集合。 与map()函数结合使用时, 字符串包中的函数允许进行字符串清理和规范化等操作。

import "strings"

from(bucket: "telegraf/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) =>
    r._measurement == "weather" and
    r._field == "temp"
  )
  |> map(fn: (r) => ({
    r with
    location: strings.toTitle(v: r.location),
    sensor: strings.replaceAll(v: r.sensor, t: " ", u: "-"),
    status: strings.substring(v: r.status, start: 0, end: 8)
  }))

处理地理时间数据

InfluxQL 并不提供处理地理时间数据的功能。 Flux Geo package 是一组函数的集合, 允许您对地理时间数据进行整形、过滤和分组。

import "experimental/geo"

from(bucket: "geo/autogen")
  |> range(start: -1w)
  |> filter(fn: (r) => r._measurement == "taxi")
  |> geo.shapeData(latField: "latitude", lonField: "longitude", level: 20)
  |> geo.filterRows(
    region: {lat: 40.69335938, lon: -73.30078125, radius: 20.0},
    strict: true
  )
  |> geo.asTracks(groupBy: ["fare-id"])

InfluxQL 和 Flux 对等

Flux 正在朝着与 InfluxQL 完全对等的方向努力,并为此添加了新的函数。下面的表格显示了 InfluxQL 语句、子句和函数以及它们等价的 Flux 函数。

有关Flux函数的完整列表,查看所有Flux函数

InfluxQL 和 Flux 的相等性

InfluxQLFlux函数
选择filter()
WHEREfilter(), range()
分组依据group()
插入到到() *
按顺序排列sort()
限制limit()
SLIMIT
OFFSET
SOFFSET
SHOW DATABASESbuckets()
显示测量值v1.measurements
显示字段键keys()
显示保留策略buckets()
显示标签键v1.tagKeys(), v1.measurementTagKeys()
显示标签值v1.tagValues(), v1.measurementTagValues()
显示系列
创建数据库
删除数据库
删除序列
删除
删除测量
删除分片
创建保留策略
更改保留策略
删除保留策略
计数count()
唯一distinct()
积分integral()
平均值mean()
中位数median()
模式mode()
SPREADspread()
STDDEVstddev()
SUMsum()
底部bottom()
首个first()
最后last()
最大值max()
最小值min()
百分位数quantile()
样本sample()
顶部top()
绝对值math.abs()
反余弦math.acos()
ASINmath.asin()
反正切math.atan()
ATAN2math.atan2()
向上取整math.ceil()
COSmath.cos()
CUMULATIVE_SUMcumulativeSum()
导数derivative()
差异difference()
已耗时elapsed()
EXPmath.exp()
向下取整math.floor()
直方图histogram()
LNmath.log()
日志math.logb()
LOG2math.log2()
LOG10math.log10()
移动平均movingAverage()
非负导数derivative(nonNegative:true)
非负差difference(nonNegative:true)
POWmath.pow()
ROUNDmath.round()
SINmath.sin()
SQRTmath.sqrt()
TANmath.tan()
HOLT_WINTERSholtWinters()
CHANDE_MOMENTUM_OSCILLATORchandeMomentumOscillator()
指数移动平均exponentialMovingAverage()
双指数移动平均doubleEMA()
KAUFMANS_EFFICIENCY_RATIOkaufmansER()
KAUFMANS_ADAPTIVE_MOVING_AVERAGEkaufmansAMA()
三重指数移动平均tripleEMA()
三重指数导数tripleExponentialDerivative()
相对强弱指数relativeStrengthIndex()

* to() 函数仅写入 InfluxDB 2.0。



Flux的未来

Flux 正在进入维护模式。您可以像现在一样继续使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB 3 开源版本现已公开Alpha测试

InfluxDB 3 Open Source is now available for alpha testing, licensed under MIT or Apache 2 licensing.

我们将发布两个产品作为测试版的一部分。

InfluxDB 3 核心,是我们新的开源产品。 它是一个用于时间序列和事件数据的实时数据引擎。 InfluxDB 3 企业版是建立在核心基础之上的商业版本,增加了历史查询能力、读取副本、高可用性、可扩展性和细粒度安全性。

有关如何开始的更多信息,请查看: