查询计划
查询计划是一系列步骤,InfluxDB 3 查询器 构思并执行这些步骤以计算查询的结果。查询器使用 DataFusion 和 Arrow 来构建和执行查询计划,这些计划调用 DataFusion 和 InfluxDB 特定的操作符,从 对象存储 和 数据摄取器 中读取数据,并应用查询转换,如去重、过滤、聚合、合并、投影和排序,以计算最终结果。
像许多其他数据库一样,Querier包含一个查询优化器。 在解析传入的查询后,Querier构建一个逻辑计划——一系列高层次步骤,如扫描、过滤和排序,满足查询的要求。 根据逻辑计划,Querier然后构建最佳物理计划,以在最短的时间内计算出正确的结果。 该计划利用Ingester的数据分区,以并行化计划操作并在执行计划之前修剪不必要的数据。 Querier还应用了谓词和投影下推的常见技术,以尽早修剪数据。
显示语法
逻辑 和 物理查询计划 在 树语法 中表示(例如,在 EXPLAIN 报告中)。
- 每个计划被表示为一个反向树,组成部分为节点。
- 一个父节点在等待其子节点的输出。
- 数据从树的最底部的最内层节点流向顶部的最外层根节点。
示例逻辑和物理计划
以下查询生成一个 EXPLAIN 报告,其中包含逻辑计划和物理计划:
EXPLAIN SELECT city, min_temp, time FROM h2o ORDER BY city ASC, time DESC;
输出如下:
图 1. EXPLAIN 报告
| plan_type | plan |
+---------------+--------------------------------------------------------------------------+
| logical_plan | Sort: h2o.city ASC NULLS LAST, h2o.time DESC NULLS FIRST |
| | TableScan: h2o projection=[city, min_temp, time] |
| physical_plan | SortPreservingMergeExec: [city@0 ASC NULLS LAST,time@2 DESC] |
| | UnionExec |
| | SortExec: expr=[city@0 ASC NULLS LAST,time@2 DESC] |
| | ParquetExec: file_groups={...}, projection=[city, min_temp, time] |
| | SortExec: expr=[city@0 ASC NULLS LAST,time@2 DESC] |
| | ParquetExec: file_groups={...}, projection=[city, min_temp, time] |
| | |
来自 EXPLAIN SELECT city, min_temp, time FROM h2o ORDER BY city ASC, time DESC; 的输出
图 1 物理计划中的叶子节点是并行 ParquetExec 节点:
ParquetExec: file_groups={...}, projection=[city, min_temp, time]
...
ParquetExec: file_groups={...}, projection=[city, min_temp, time]
数据流
A 物理计划 节点代表 ExecutionPlan 的特定实现,该实现接收输入流,应用过滤和排序的表达式,然后将输出流传递给其父节点。
下图显示了图 1 物理计划中ExecutionPlan 节点的数据流和顺序:
SortPreservingMergeExecUnionExecSortExecParquetExecSortExecParquetExecInfluxDB Cloud Serverless 包括以下计划表达式:
逻辑计划
查询的逻辑计划:
- 是一种高级计划,表达了查询的“意图”和计算结果所需的步骤。
- 需要有关数据架构的信息
- 独立于 物理执行、集群配置、数据源(Ingester或对象存储)或数据的组织或分区方式
- 显示为DataFusion
LogicalPlan节点的树
LogicalPlan 节点
InfluxDB Cloud Serverless 逻辑计划树中的每个节点表示一个 LogicalPlan 实现,该实现接收从查询中提取的条件,并应用关系运算符和优化,将输入数据转换为输出表。
以下是一些在InfluxDB逻辑计划中使用的 LogicalPlan 节点。
TableScan
Tablescan 通过引用或上下文从表提供者检索行。
Projection
Projection 在输入上评估任意表达式列表;相当于 SQL SELECT 语句中的表达式列表。
Filter
Filter 从输入中筛选出不满足指定表达式的行;相当于SQL中的WHERE子句,带有谓词表达式。
Sort
Sort 按照一系列排序表达式对输入进行排序;用于实现 SQL ORDER BY。
有关 LogicalPlan 实现的详细信息和列表,请参阅 Enum datafusion::logical_expr::LogicalPlan 变体 在 DataFusion 文档中。
物理计划
查询的物理计划,或 执行计划:
- 是一个优化的计划,它来源于逻辑计划,并包含查询执行的低级步骤。
- 考虑集群配置(例如,CPU和内存分配)和数据组织(例如:分区、文件数量,以及文件是否重叠)——例如:
- 如果在不同配置的不同集群上运行相同的数据和相同的查询,则每个集群可能会为查询生成不同的物理计划。
- 如果在同一集群上不同时间运行相同的查询,则物理计划可能每次都不同,这取决于查询时的数据。
- 如果使用
ANALYZE生成,则包括在查询执行期间采样的运行时指标 - 显示为
ExecutionPlan节点的树
ExecutionPlan 节点
InfluxDB Cloud Serverless 物理计划中的每个节点代表对特定实现的DataFusion ExecutionPlan的调用,该实现接收输入数据、查询条件表达式和输出模式。
以下是一些在InfluxDB物理计划中使用的ExecutionPlan节点。
DeduplicateExec
InfluxDB DeduplicateExec 接收一个按 sort_key 排序的 RecordBatch 输入流,并应用特定于 InfluxDB 的去重逻辑。输出取决于具有相同键的输入行的顺序。
EmptyExec
DataFusion EmptyExec 是一个空关系的执行计划,表示在查询的时间范围内该表不包含数据。
FilterExec
执行计划为 Filter LogicalPlan。
DataFusion FilterExec 对所有输入批次评估一个布尔谓词,以确定哪些行包含在输出批次中。
ParquetExec
DataFusion ParquetExec 扫描一个或多个 Parquet 分区。
ParquetExec 表达式
file_groups
A file group 是一个要扫描的文件列表。
文件通过路径引用:
1/1/b862a7e9b.../243db601-....parquet1/1/b862a7e9b.../f5fb7c7d-....parquet
在 InfluxDB 3 中,路径结构表示数据是如何组织的。
路径具有以下结构:
<namespace_id>/<table_id>/<partition_hash_id>/<uuid_of_the_file>.parquet
1 / 1 /b862a7e9b329ee6a4.../243db601-f3f1-4....parquet
namespace_id: 正在查询的命名空间(数据库)table_id: 正在查询的表(测量)partition_hash_id: 该文件所属的分区。 你可以计算分区 ID 以找出查询读取了多少个分区。uuid_of_the_file: 文件标识符。
ParquetExec 并行处理组,并顺序读取每个组中的文件。
projection
projection 列出了查询计划需要读取的表列以执行查询。 参数名称 projection 指的是 投影下推,即过滤列的动作。
考虑以下包含多个列的示例数据:
h2o,state=CA,city=SF min_temp=68.4,max_temp=85.7,area=500u 600
| 表格 | 状态 | 城市 | 最小温度 | 最大温度 | 面积 | 时间 |
|---|---|---|---|---|---|---|
| h2o | CA | SF | 68.4 | 85.7 | 500u | 600 |
然而,以下SQL查询仅指定了三列 (city, state, 和 time):
SELECT city, count(1)
FROM h2o
WHERE time >= to_timestamp(200) AND time < to_timestamp(700)
AND state = 'MA'
GROUP BY city
ORDER BY city ASC;
在处理查询时,Querier 指定了投影中三个必需的列,并且投影被“向下推送”到叶节点——未指定的列在查询执行期间尽早被剪裁。
projection=[city, state, time]
output_ordering
output_ordering 指定输出的排序顺序。
如果输出应该是有序的,并且Querier知道顺序,则查询器指定output_ordering。
在将数据存储到Parquet文件时,InfluxDB对数据进行排序,以提高存储压缩和查询效率,计划程序尽可能长时间地保持该顺序。一般来说,output_ordering值是ParquetExec接收的存储数据的排序(或排序的子集)。
根据设计, RecordBatchesExec 数据是未排序的。
在以下示例中,查询规划器指定输出排序顺序 state ASC, city ASC, time ASC,:
output_ordering=[state@2 ASC, city@1 ASC, time@3 ASC, __chunk_order@0 ASC]
predicate
predicate 是查询中指定的数据过滤器,用于在扫描Parquet文件时进行行过滤。
例如,给出以下SQL查询:
SELECT city, count(1)
FROM h2o
WHERE time >= to_timestamp(200) AND time < to_timestamp(700)
AND state = 'MA'
GROUP BY city
ORDER BY city ASC;
predicate 值是 WHERE 语句中的布尔表达式:
predicate=time@5 >= 200 AND time@5 < 700 AND state@4 = MA
pruning predicate
pruning_predicate 是从 predicate 值创建的,用于从选择的分区中修剪数据和文件。
例如,给定从SQL中解析的以下predicate:
predicate=time@5 >= 200 AND time@5 < 700 AND state@4 = MA,
查询器创建了以下 pruning_predicate:
pruning_predicate=time_max@0 >= 200 AND time_min@1 < 700 AND state_min@2 <= MA AND MA <= state_max@3
默认情况下,过滤文件的依据是 time。
在物理计划生成之前,额外的 partition pruning 步骤使用分区列上的谓词来修剪分区。
ProjectionExec
DataFusion ProjectionExec 评估输入上的任意表达式列表;Projection LogicalPlan 的执行计划。
RecordBatchesExec
InfluxDB RecordBatchesExec 实现从 InfluxDB 3 Ingester 检索和扫描最近写入的、尚未持久化的数据。
在生成计划时,Querier 将查询条件,如数据库(桶)、表(测量)和列,发送给 Ingester 以检索尚未持久化到 Parquet 文件的数据。如果 Ingester 有满足条件的数据(块大小非零),则计划包括 RecordBatchesExec。
RecordBatchesExec 属性
chunks
chunks 是来自 Ingester 的数据块数量。通常是一个 (1),但也可以是很多。
projection
projection 指定要读取和输出的列的列表。
__chunk_order 在列的列表中是一个由InfluxDB生成的列,用于保持块和文件的顺序以便去重——例如:
projection=[__chunk_order, city, state, time]
有关其他 DataFusion ExecutionPlan 实现的详细信息,请参阅 Struct datafusion::datasource::physical_plan 实现者 在 DataFusion 文档中。
SortExec
Sort LogicalPlan 的执行计划。
DataFusion SortExec 支持对大于内存管理器分配的内存的数据集进行排序,通过溢出到磁盘。
SortPreservingMergeExec
DataFusion SortPreservingMergeExec 接受一个输入执行计划和一组排序表达式,并且在确保输入计划的每个分区都是按照这些排序表达式排序的情况下,生成一个相对于这些排序表达式排序的单一分区。
UnionExec
DataFusion UnionExec 是用于合并具有相同架构的多个输入的 UNION ALL 执行计划。UnionExec 连接分区,并且不在分区内或跨分区混合或复制数据。
重叠数据和去重
重叠数据指的是时间范围(由时间戳表示)相交的文件或批次。如果两个数据块在相同的时间段内都有数据,则这两个数据块重叠。
重叠数据的示例
例如,以下数据块表示写入InfluxDB的线协议:
// Chunk 4: stored parquet file
// - time range: 400-600
// - no duplicates in its own chunk
// - overlaps chunk 3
[
"h2o,state=CA,city=SF min_temp=68.4,max_temp=85.7,area=500u 600",
"h2o,state=CA,city=SJ min_temp=69.5,max_temp=89.2 600", // duplicates row 3 in chunk 5
"h2o,state=MA,city=Bedford max_temp=80.75,area=742u 400", // overlaps chunk 3
"h2o,state=MA,city=Boston min_temp=65.40,max_temp=82.67 400", // overlaps chunk 3
],
// Chunk 5: Ingester data
// - time range: 550-700
// - overlaps & duplicates data in chunk 4
[
"h2o,state=MA,city=Bedford max_temp=88.75,area=742u 600", // overlaps chunk 4
"h2o,state=CA,city=SF min_temp=68.4,max_temp=85.7,area=500u 650",
"h2o,state=CA,city=SJ min_temp=68.5,max_temp=90.0 600", // duplicates row 2 in chunk 4
"h2o,state=CA,city=SJ min_temp=75.5,max_temp=84.08 700",
"h2o,state=MA,city=Boston min_temp=67.4 550", // overlaps chunk 4
]
Chunk 4跨越时间范围400-600,并表示持久化到 对象存储 中的 Parquet 文件的数据。Chunk 5跨越时间范围550-700,并表示来自 Ingester 的尚未持久化数据。- 这些块重叠范围
550-600。
如果在查询时数据重叠,Querier 必须在查询计划中包含去重过程,这使用与Ingester相同的多列排序合并操作符。与使用排序合并操作符的摄取计划相比,查询计划更复杂,并确保数据在去重后流经计划。
由于去重中使用的排序合并操作具有非平凡的执行成本,InfluxDB 3 尝试避免去重的需求。 由于 InfluxDB 组织数据的方式,Parquet 文件从不包含它所存储数据的重复项;只有重叠的数据可能包含重复项。 在压缩过程中,Compactor 对存储的数据进行排序,以减少重叠并优化查询性能。 对于没有重叠的数据,Querier 不需要包含去重过程,查询计划可以进一步分配无重叠数据以进行并行处理。
数据融合查询计划
有关 DataFusion 查询计划和在 InfluxDB 3 中使用的 DataFusion API 的更多信息,请参见以下内容: