合并数据
使用 Flux join 包 根据公共值连接两个数据集。了解如何使用以下连接方法连接两个数据集:
何时使用连接包
我们推荐使用 join 包来连接大部分模式不同或来自两个独立数据源的流。如果您正在连接来自相同数据源且模式相同的数据,使用 union() 和 pivot() 来合并数据可能会更加高效。
欲了解更多信息,请参阅 何时使用并集和透视而不是连接函数。
连接函数的工作原理
join 函数将 两个 表流根据每个输入流中的共同值连接在一起。
输入流
每个输入流被分配到 left 或 right 参数中。输入流可以从任何有效的数据源定义。有关更多信息,请参见:
- 查询数据源
- 使用
array.from()定义临时表
数据要求
要连接数据,每个输入流必须具有以下内容:
一个或多个具有共同值以进行连接的列.
列的标签不需要完全相同,但它们需要具有可比较的值。相同的 组键.
在join包中的函数使用组键快速确定每个输入流中的哪些表应该配对并进行连接操作的评估。
两个输入流应该具有相同的组键。 如果它们不相同,您的连接操作可能找不到任何匹配的表,并且会 返回意外输出。 如果您的输入流的组键不相同,请使用group()在将它们连接在一起之前重新分组每个输入流。只有具有相同 group key instance 的表才会被连接。
连接谓词函数(on)
join 包的函数需要 on 参数来比较来自每个输入流的值(由 l(左)和 r(右)表示),并返回 true 或 false。返回 true 的行将被连接。这个参数是一个 谓词函数。
(l, r) => l.column == r.column
连接输出函数(作为)
join 包函数 (不包括 join.time())
需要 as 参数来定义连接的输出模式。
as 参数使用来自连接行的值返回一个新记录——左侧 (l) 和右侧 (r)。
(l, r) => ({l with name: r.name, location: r.location})
请勿修改分组键列
不要修改组键列。as 函数必须返回与两个输入流相同的组键,以成功执行连接。
执行连接操作
该join包支持以下连接类型和特殊用例:
执行内连接
使用 join.inner() 进行两个数据流的内连接。内连接会删除两个输入流中没有匹配行的任何行。
import "join"
left = from(bucket: "example-bucket-1") |> //...
right = from(bucket: "example-bucket-2") |> //...
join.inner(
left: left,
right: right,
on: (l, r) => l.column == r.column,
as: (l, r) => ({l with name: r.name, location: r.location}),
)
执行左外连接
使用 join.left() 对两个数据流执行外左连接。左连接为每个在 left 数据流中的行输出一行,并从 right 数据流中匹配数据。如果在 right 数据流中没有匹配的数据,非分组键列的值将是 null。
import "join"
left = from(bucket: "example-bucket-1") |> //...
right = from(bucket: "example-bucket-2") |> //...
join.left(
left: left,
right: right,
on: (l, r) => l.column == r.column,
as: (l, r) => ({l with name: r.name, location: r.location}),
)
执行右外连接
使用 join.right() 对两个数据流进行右外连接。右连接为每个 右 数据流中的行输出一行,匹配来自 左 数据流的数据。如果 左 数据流中没有匹配的数据,则来自 左 数据流的非分组键列的值为 null。
import "join"
left = from(bucket: "example-bucket-1") |> //...
right = from(bucket: "example-bucket-2") |> //...
join.right(
left: left,
right: right,
on: (l, r) => l.column == r.column,
as: (l, r) => ({r with name: l.name, location: l.location}),
)
执行全外连接
使用 join.full() 执行两个数据流的完全外连接。完全外连接为左和右输入流中的所有行输出一行,并根据 on 谓词连接匹配的行。
import "join"
left = from(bucket: "example-bucket-1") |> //...
right = from(bucket: "example-bucket-2") |> //...
join.full(
left: left,
right: right,
on: (l, r) => l.id== r.id,
as: (l, r) => {
id = if exists l.id then l.id else r.id
return {name: l.name, location: r.location, id: id}
},
)
按时间连接
使用 join.time() 根据 _time 列中的时间值连接两个数据流。这种类型的连接操作在连接两条 时间序列数据 时很常见。
import "join"
left = from(bucket: "example-bucket-1") |> //...
right = from(bucket: "example-bucket-2") |> //...
join.time(
left: left,
right: right,
as: (l, r) => ({l with field1: l._value, field2: r._value_}),
)
故障排除连接操作
有关使用 join 包时出现意外行为和错误的信息,请参阅 Troubleshoot join operations。