实时笔记

你可以在 live session Binder 中运行此笔记本,或查看 Github 上的内容。

使用 SQL 操作 Dask 数据框

Dask-SQL 是一个开源项目和 Python 包,利用 Apache CalciteDask 数据帧操作提供 SQL 前端,使 SQL 用户能够利用 Dask 的分布式能力,而无需深入了解数据帧 API。

[ ]:
! pip install dask-sql

设置一个 Dask 集群

设置一个 Dask 集群 是可选的,但它可以通过让我们访问 GPU 上的 Dask 工作者、远程机器、常见的云提供商等,显著扩展我们分布式计算的选项。此外,将我们的集群连接到 Dask 客户端 将使我们能够访问一个仪表板,该仪表板可用于监控活动计算的进度和诊断问题。

对于这个笔记本,我们将创建一个本地集群并将其连接到客户端。一旦客户端创建完成,其关联的仪表板的链接将会出现,可以在后续的计算过程中查看。

[ ]:
from dask.distributed import Client

client = Client(n_workers=2, threads_per_worker=2, memory_limit='1GB')
client

创建一个上下文

dask_sql.Context 是 Python 中相当于 SQL 数据库的类,作为注册 SQL 查询中使用的所有表和函数的接口,以及执行查询本身。在典型用法中,一个 Context 对象在 Python 脚本或笔记本的整个生命周期中创建并使用。

[ ]:
from dask_sql import Context
c = Context()

加载并注册数据

一旦创建了 Context ,有多种方式在其中注册表。最简单的方法是通过 create_table 方法,该方法接受多种输入类型,Dask-SQL 随后使用这些输入类型来推断表的创建方法。支持的输入类型包括:

输入类型也可以通过提供 format 来明确指定。在注册时,可以通过传递 persist=True 将表可选地持久化到内存中,这可以大大加快对同一张表的重复查询速度,但代价是将整个表加载到内存中。更多信息,请参见 数据加载和输入

[ ]:
import pandas as pd
from dask.datasets import timeseries

# register and persist a dask table
ddf = timeseries()
c.create_table("dask", ddf, persist=True)

# register a pandas table (implicitly converted to a dask table)
df = pd.DataFrame({"a": [1, 2, 3]})
c.create_table("pandas", df)

# register a table from local storage; kwargs are passed on to the underlying table creation method
c.create_table(
    "local",
    "surveys/data/2021-user-survey-results.csv.gz",
    format="csv",
    parse_dates=['Timestamp'],
    blocksize=None
)

表格也可以通过 SQL CREATE TABLE WITHCREATE TABLE AS 语句,使用 sql 方法来注册。

[ ]:
# replace our table from local storage
c.sql("""
    CREATE OR REPLACE TABLE
        "local"
    WITH (
        location = 'surveys/data/2021-user-survey-results.csv.gz',
        format = 'csv',
        parse_dates = ARRAY [ 'Timestamp' ]
    )
""")

# create a new table from a SQL query
c.sql("""
    CREATE TABLE filtered AS (
        SELECT id, name FROM dask WHERE name = 'Zelda'
    )
""")

所有已注册的表都可以通过 SHOW TABLES 语句列出。

[ ]:
c.sql("SHOW TABLES FROM root").compute()

Dask-SQL 目前提供实验性的 GPU 支持,由 RAPIDS 套件的开源 GPU 数据科学库驱动。输入支持目前仅限于 Dask / Pandas 类似的数据框和本地/远程存储中的数据,尽管大多数查询运行无问题,但用户应预期会出现一些错误或未定义行为。要注册一个表并将其标记为在 GPU 上使用,可以在标准的 create_table 调用中传递 gpu=True,或者其等效的 CREATE TABLE WITH 查询(请注意,这需要 cuDF 和 Dask-cuDF)。

# register a dask table for use on GPUs (not possible in this binder)
c.create_table("gpu_dask", ddf, gpu=True)

# load in a table from disk using GPU-accelerated IO operations
c.sql("""
    CREATE TABLE
        "gpu_local"
    WITH (
        location = 'surveys/data/2021-user-survey-results.csv.gz',
        format = 'csv',
        parse_dates = ARRAY [ 'Timestamp' ],
        gpu = True
    )
""")

查询数据

当调用 sql 方法时,Dask-SQL 将查询交给 Apache Calcite 转换为关系代数——本质上是一系列必须执行的 SQL 任务,以获得结果。任何查询的关系代数都可以使用 explain 方法直接查看。

[ ]:
print(c.explain("SELECT AVG(x) FROM dask"))

从这里开始,这个关系代数被转换成一个Dask计算图,最终返回(或在 CREATE TABLE 语句的情况下,隐式赋值)一个Dask数据框。

[ ]:
c.sql("SELECT AVG(x) FROM dask")

Dask 数据帧是惰性的,这意味着在它们创建时,其依赖的任务尚未执行。要实际执行这些任务并获得结果,我们必须调用 compute

[ ]:
c.sql("SELECT AVG(x) FROM dask").compute()

查看仪表盘,我们可以看到执行此查询已触发了一些 Dask 计算。

因为查询的返回值是一个 Dask 数据框,所以也可以使用 Dask 的数据框 API 对其进行后续操作。如果我们想在一个数据框上执行一些通过 Dask 无法实现的复杂操作,然后通过数据框 API 轻松表达一些更简单的操作,这可能会很有用。

[ ]:
# perform a multi-column sort that isn't possible in Dask
res = c.sql("""
    SELECT * FROM dask ORDER BY name ASC, id DESC, x ASC
""")

# now do some follow groupby aggregations
res.groupby("name").agg({"x": "sum", "y": "mean"}).compute()

自定义函数和聚合

当标准SQL功能不足时,可以注册自定义函数以在查询中使用。这些函数可以分为以下几类之一:

  • 按列函数

  • 逐行函数

  • 聚合

按列函数

按列函数可以接受列或字面值作为输入,并返回一个长度相同的列。按列函数可以通过 register_function 方法在 Context 中注册。

[ ]:
import numpy as np

def f(x):
    return x ** 2

c.register_function(f, "f", [("x", np.float64)], np.float64)

函数注册需要以下输入:

  • 可调用函数

  • 函数在查询中引用的名称

  • 一个元组列表,表示输入变量及其各自类型,可以是 Pandas 或 NumPy 类型

  • 输出列的类型

一旦函数被注册,它就可以像其他标准SQL函数一样被调用。

[ ]:
c.sql("SELECT F(x) FROM dask").compute()

逐行函数

在某些情况下,编写一个处理类似字典的 row 对象的自定义函数可能会更容易——也就是所谓的行函数。这些函数也可以通过传递 row_udf=True 使用 register_function 进行注册,并以与列函数相同的方式使用。

[ ]:
def g(row):
    if row["x"] > row["y"]:
        return row["x"] - row["y"]
    return row["y"] - row["x"]

c.register_function(g, "g", [("x", np.float64), ("y", np.float64)], np.float64, row_udf=True)

c.sql("SELECT G(x, y) FROM dask").compute()

需要注意的是,与直接使用指定列和字面量作为输入调用的列式函数不同,行式函数是通过 apply 调用的,其性能可能因底层数据框库(例如 Pandas、cuDF)和函数本身而不可预测。

聚合

聚合操作以单个列作为输入并返回单个值 - 因此,它们只能用于减少 GROUP BY 查询的结果。聚合可以通过 register_aggregation 方法注册,该方法在功能上类似于 register_function,但接受 Dask Aggregation 作为输入,而不是可调用函数。

[ ]:
import dask.dataframe as dd

my_sum = dd.Aggregation("my_sum", lambda x: x.sum(), lambda x: x.sum())

c.register_aggregation(my_sum, "my_sum", [("x", np.float64)], np.float64)

c.sql("SELECT MY_SUM(x) FROM dask").compute()

SQL中的机器学习

Dask-SQL 支持模型训练和预测,使得机器学习工作流可以通过 Python 和 SQL 的灵活组合来实现。模型可以通过 register_model 方法或 CREATE MODEL 语句在 Context 中注册。

[ ]:
from dask_ml.linear_model import LinearRegression
from sklearn.ensemble import GradientBoostingClassifier

# create a dask-ml model and train it
model = GradientBoostingClassifier()
data = c.sql("SELECT x, y, x * y > 0 AS target FROM dask LIMIT 50")
model.fit(data[["x", "y"]], data["target"])

# register this model in the context
c.register_model("python_model", model, training_columns=["x", "y"])

# create and train a model directly from SQL
c.sql("""
    CREATE MODEL sql_model WITH (
        model_class = 'sklearn.ensemble.GradientBoostingClassifier',
        wrap_predict = True,
        target_column = 'target'
    ) AS (
        SELECT x, y, x * y > 0 AS target
        FROM dask
        LIMIT 50
    )
""")

注册的模型必须遵循 scikit-learn 接口,通过实现一个 predict 方法。与表格一样,所有注册的模型都可以通过 SHOW MODEL 语句列出。

[ ]:
c.sql("SHOW MODELS").compute()

从这里开始,可以使用 PREDICT 关键字作为 SELECT 查询的一部分,利用模型进行预测。

[ ]:
c.sql("""
    SELECT * FROM PREDICT (
        MODEL sql_model,
        SELECT x, y, x * y > 0 AS actual FROM dask
        OFFSET 50
    )
""").compute()
[ ]: