Dask 数据框和 SQL

SQL 是一种在数据库服务器上执行表格计算的方法。类似的操作可以在 Dask Dataframes 上完成。用户通常希望将两者结合起来。

本文档描述了Dask与SQL数据库之间的连接,并旨在澄清我们经常从用户那里收到的一些问题。

Dask 是否实现了 SQL?

简短的答案是“否”。Dask 没有用于 SQL 查询的解析器或查询计划器。然而,Dask DataFrames 的 Pandas API 在很大程度上是相同的,并且有许多类似于 SQL 操作的功能。关于将 SQL 映射到 Pandas 语法的良好描述可以在 pandas 文档 中找到。

以下包可能值得关注:

  • dask-sql 在 Dask 之上添加了一个 SQL 查询引擎。除了在 CPU 上工作外,它还通过 cuDF 等 RAPIDS 库提供了对 CUDA 支持的 GPU 的实验性支持。

  • FugueSQL 提供了一个统一的接口,用于在各种不同的计算框架上运行SQL代码。将 DaskExecutionEngineDaskSQLExecutionEngine 指定为查询的执行引擎,分别允许它们使用Dask或dask-sql进行计算。

数据库还是Dask?

数据库服务器能够处理表格数据并生成结果,就像 Dask Dataframe 一样。为什么你会选择使用其中一个而不是另一个?

如今,数据库服务器可以是一个分片/分布式系统,能够处理包含数百万行的表。大多数数据库实现都针对行检索和小部分表的(原子)更新进行了优化。为特定类型的查询配置数据库以提高速度可能具有挑战性,但如果所有数据都已经在数据库中,这可能是一个很好的解决方案——特别是如果你对SQL查询计划优化有所了解。SQL实现可以非常高效地分析查询,仅提取表的一小部分进行考虑,当其余部分被条件排除时。

Dask 比数据库灵活得多,并且专门设计用于处理大于内存的数据集,可以并行处理,并且可能分布在集群中。如果你的工作流程不适合SQL,请使用dask。如果你的数据库服务器在处理数据量时遇到困难,dask可能会表现得更好。最好对你的查询进行性能分析(并记住其他资源用户!)。如果你需要从不同来源组合数据,dask可能是你的最佳选择。

你可能会发现 Dask API 比编写 SQL 更容易使用(如果你已经习惯使用 Pandas),并且诊断反馈更有用。这些点可以说是 Dask 的优势。

从SQL加载数据使用read_sql_table或read_sql_query

Dask 允许你使用函数 dask.dataframe.read_sql_table()dask.dataframe.read_sql_query() 从 SQL 表和查询构建数据框,基于 Pandas 版本,共享大多数参数,并使用 SQLAlchemy 来实际处理查询。你可能需要为所选的数据库服务器安装额外的驱动程序包。

由于 Dask 设计用于处理大于内存的数据集,或在集群上分布式运行,以下是与 Pandas 相比需要注意的主要差异

  • Dask 不支持任意文本查询,只支持整个表和 SQLAlchemy sql 表达式

  • con 参数必须是一个 URI字符串,而不是一个SQLAlchemy引擎/连接

  • 分区信息是*必需的*,这可以简单到提供一个索引列参数,或者可以更明确(见下文)

  • 由于分区必须通过索引列进行,因此不使用 chunksize 参数。

如果你需要比这更灵活的东西,或者这个方法对你失败了(例如,在类型推断上),那么跳到下一节。

为什么有差异

Dask 旨在使处理大量数据成为可能,包括可能在集群中分布该处理。对于从 SQL 服务器检索数据,这意味着查询必须是可分区的:每个分区可以独立于其他分区获取,并且不依赖于某些全局状态,并且任务的定义必须是可序列化的,即可以表示为传递给工作者的字节流。

约束意味着我们不能直接接受 SQLAlchemy 引擎或连接对象,因为它们具有无法序列化的内部状态(缓冲区等)。必须使用 URI 字符串,它可以在工作线程上重新创建为新的引擎。同样,我们不能容纳依赖数据库游标内部状态的分块查询;也不能容纳 LIMIT/OFFSET 查询,这些查询不能保证是可重复的,并且涉及在服务器上扫描整个查询(这是非常低效的)。

如果 你的数据量小到不需要 Dask 的核外和/或分布式功能,那么你可能更适合直接使用 Pandas 或 SQLAlchemy。

索引列

我们需要一种方法将单个主查询转换为每个分区的子查询。对于大多数合理的数据库表,应该有一个明显的列可以用于分区——它可能是数值型的,并且肯定应该在数据库中建立索引。后一个条件很重要,因为一旦Dask开始计算,许多同时进行的查询将访问您的服务器。

通过仅为索引参数提供一个列名,您暗示该列是数值型的,Dask 会通过在最小值和最大值之间均匀分割空间,将其分成 npartitions 个区间来猜测一个合理的分区。您还可以提供希望考虑的最大值/最小值,这样 Dask 就不需要查询这些值。或者,您可以让 Dask 获取前几行(默认5行),并使用它们来猜测每行的典型字节数,并基于此来确定分区大小。不用说,对于不常见的同质表,结果会有很大差异。

特定分区

在某些情况下,您可能对如何划分数据有一个很好的想法,例如基于具有有限数量唯一值或类别的列。这使得可以使用字符串列,或任何具有自然顺序的内容,作为索引列,而不仅仅是数值类型。

在这种情况下,您将提供一组特定的 divisions,即每个分区的索引列的起始/结束值。例如,如果某一列恰好包含一个随机十六进制字符串格式的ID,那么您可以指定16个分区。

df = read_sql_table("mytable", divisions=list("0123456789abcdefh"),
                    index_col="hexID")

因此,第一个分区将具有值为 "0" <= hexID < "1" 的ID,即前导字符为 “0”。

SQLAlchemy 表达式

由于我们只发送数据库连接URI,而不是引擎对象,因此我们无法依赖SQLAlchemy的表类推断和ORM来执行查询。然而,我们可以使用“select” sql表达式,这些表达式仅在执行时才被格式化为文本查询。

from sqlalchemy import sql
number = sql.column("number")
name = sql.column("name")
s1 = sql.select([
        number, name, sql.func.length(name).label("lenname")
    ]
    ).select_from(sql.table("test"))
data = read_sql_query(
    s1, db, npartitions=2, index_col=number
)

这里我们还演示了使用函数 length 在服务器端执行操作。请注意,必须为这些操作 标记 ,但只要它也在选定的列集合中,您就可以将其用于索引列。如果用于索引/分区,该列仍应在数据库中建立索引,以提高性能。需要考虑的最重要功能之一是 cast ,用于在数据库中指定输出数据类型或转换,以防 pandas 难以推断数据类型。

你应该注意,SQLAlchemy 表达式需要一些时间来适应,你可以先通过 Pandas 进行练习,只阅读查询的第一小部分,直到看起来正确为止。你可以在 这个 gist 中找到一个更完整的面向对象示例。

从SQL加载,手动方法

如果 read_sql_table 不能满足你的需求,你可以尝试以下方法之一。

从地图功能

通常你对你的数据和服务器的了解比上述通用方法所允许的要多。确实,一些类似数据库的服务器可能根本不受SQLAlchemy支持,或者提供了优化更好的替代API。

如果你已经有了一种从数据库分区获取数据的方法,那么你可以使用 dask.dataframe.from_map() 并以此方式构建数据框。它可能看起来像这样。

import dask.dataframe as dd

def fetch_partition(part):
    conn = establish_connection()
    df = fetch_query(base_query.format(part))
    return df.astype(known_types)

ddf = dd.from_map(fetch_partition,
                  parts,
                  meta=known_types,
                  divisions=div_from_parts(parts))

在你必须为设置与服务器的连接、自己的查询以及将该查询格式化为特定于每个分区的方式提供自己的函数的地方。例如,你可能有范围或带有WHERE子句的特定唯一值。这里的``known_types``用于转换数据框分区并提供一个``meta``,以帮助保持一致性并避免Dask必须提前分析一个分区来猜测列/类型;你可能还想显式设置索引。

通过客户端流式传输

在某些情况下,工作者可能无法访问数据,但客户端可以;或者数据的初始加载时间并不重要,只要数据集随后保存在集群内存中,并且可以用于 dask-dataframe 查询。可以通过从客户端上传数据块来构建数据框:

查看如何完成此操作的完整示例 这里

直接访问数据文件

一些数据库系统,如 Apache Hive,将其数据存储在可以直接访问 Dask 的位置和格式中,例如 S3 或 HDFS 上的 parquet 文件。在您的 SQL 查询会读取整个数据集并将其传递给 Dask 的情况下,从数据库流式传输数据很可能是瓶颈,直接读取源数据文件可能会更快。

查询下推?

如果你基于数据库表定义了一个查询,然后只使用输出的一些列,你可能会期望Dask能够告诉数据库服务器只发送表中的一部分数据。Dask目前无法进行这种“下推”优化,你需要使用SQL表达式语法来修改你的查询。我们未来可能会解决这个问题(GH#6388)。

如果你的数据框的分区定义良好,那么在索引上的选择可能会成功避免读取不相关的分区。