使用 SQL 操作 Dask 数据框
内容
实时笔记
你可以在 live session 中运行此笔记本,或查看 Github 上的内容。
使用 SQL 操作 Dask 数据框¶
Dask-SQL 是一个开源项目和 Python 包,利用 Apache Calcite 为 Dask 数据帧操作提供 SQL 前端,使 SQL 用户能够利用 Dask 的分布式能力,而无需深入了解数据帧 API。
[ ]:
! pip install dask-sql
设置一个 Dask 集群¶
设置一个 Dask 集群 是可选的,但它可以通过让我们访问 GPU 上的 Dask 工作者、远程机器、常见的云提供商等,显著扩展我们分布式计算的选项。此外,将我们的集群连接到 Dask 客户端 将使我们能够访问一个仪表板,该仪表板可用于监控活动计算的进度和诊断问题。
对于这个笔记本,我们将创建一个本地集群并将其连接到客户端。一旦客户端创建完成,其关联的仪表板的链接将会出现,可以在后续的计算过程中查看。
[ ]:
from dask.distributed import Client
client = Client(n_workers=2, threads_per_worker=2, memory_limit='1GB')
client
创建一个上下文¶
dask_sql.Context
是 Python 中相当于 SQL 数据库的类,作为注册 SQL 查询中使用的所有表和函数的接口,以及执行查询本身。在典型用法中,一个 Context
对象在 Python 脚本或笔记本的整个生命周期中创建并使用。
[ ]:
from dask_sql import Context
c = Context()
加载并注册数据¶
一旦创建了 Context
,有多种方式在其中注册表。最简单的方法是通过 create_table
方法,该方法接受多种输入类型,Dask-SQL 随后使用这些输入类型来推断表的创建方法。支持的输入类型包括:
Dask / Pandas 类似的数据框
本地或远程数据集的字符串位置
通过 PyHive 或 SQLAlchemy 提供的 Apache Hive 表
输入类型也可以通过提供 format
来明确指定。在注册时,可以通过传递 persist=True
将表可选地持久化到内存中,这可以大大加快对同一张表的重复查询速度,但代价是将整个表加载到内存中。更多信息,请参见 数据加载和输入。
[ ]:
import pandas as pd
from dask.datasets import timeseries
# register and persist a dask table
ddf = timeseries()
c.create_table("dask", ddf, persist=True)
# register a pandas table (implicitly converted to a dask table)
df = pd.DataFrame({"a": [1, 2, 3]})
c.create_table("pandas", df)
# register a table from local storage; kwargs are passed on to the underlying table creation method
c.create_table(
"local",
"surveys/data/2021-user-survey-results.csv.gz",
format="csv",
parse_dates=['Timestamp'],
blocksize=None
)
表格也可以通过 SQL CREATE TABLE WITH
或 CREATE TABLE AS
语句,使用 sql
方法来注册。
[ ]:
# replace our table from local storage
c.sql("""
CREATE OR REPLACE TABLE
"local"
WITH (
location = 'surveys/data/2021-user-survey-results.csv.gz',
format = 'csv',
parse_dates = ARRAY [ 'Timestamp' ]
)
""")
# create a new table from a SQL query
c.sql("""
CREATE TABLE filtered AS (
SELECT id, name FROM dask WHERE name = 'Zelda'
)
""")
所有已注册的表都可以通过 SHOW TABLES
语句列出。
[ ]:
c.sql("SHOW TABLES FROM root").compute()
Dask-SQL 目前提供实验性的 GPU 支持,由 RAPIDS 套件的开源 GPU 数据科学库驱动。输入支持目前仅限于 Dask / Pandas 类似的数据框和本地/远程存储中的数据,尽管大多数查询运行无问题,但用户应预期会出现一些错误或未定义行为。要注册一个表并将其标记为在 GPU 上使用,可以在标准的 create_table
调用中传递 gpu=True
,或者其等效的 CREATE TABLE WITH
查询(请注意,这需要 cuDF 和 Dask-cuDF)。
# register a dask table for use on GPUs (not possible in this binder)
c.create_table("gpu_dask", ddf, gpu=True)
# load in a table from disk using GPU-accelerated IO operations
c.sql("""
CREATE TABLE
"gpu_local"
WITH (
location = 'surveys/data/2021-user-survey-results.csv.gz',
format = 'csv',
parse_dates = ARRAY [ 'Timestamp' ],
gpu = True
)
""")
查询数据¶
当调用 sql
方法时,Dask-SQL 将查询交给 Apache Calcite 转换为关系代数——本质上是一系列必须执行的 SQL 任务,以获得结果。任何查询的关系代数都可以使用 explain
方法直接查看。
[ ]:
print(c.explain("SELECT AVG(x) FROM dask"))
从这里开始,这个关系代数被转换成一个Dask计算图,最终返回(或在 CREATE TABLE
语句的情况下,隐式赋值)一个Dask数据框。
[ ]:
c.sql("SELECT AVG(x) FROM dask")
Dask 数据帧是惰性的,这意味着在它们创建时,其依赖的任务尚未执行。要实际执行这些任务并获得结果,我们必须调用 compute
。
[ ]:
c.sql("SELECT AVG(x) FROM dask").compute()
查看仪表盘,我们可以看到执行此查询已触发了一些 Dask 计算。
因为查询的返回值是一个 Dask 数据框,所以也可以使用 Dask 的数据框 API 对其进行后续操作。如果我们想在一个数据框上执行一些通过 Dask 无法实现的复杂操作,然后通过数据框 API 轻松表达一些更简单的操作,这可能会很有用。
[ ]:
# perform a multi-column sort that isn't possible in Dask
res = c.sql("""
SELECT * FROM dask ORDER BY name ASC, id DESC, x ASC
""")
# now do some follow groupby aggregations
res.groupby("name").agg({"x": "sum", "y": "mean"}).compute()
自定义函数和聚合¶
当标准SQL功能不足时,可以注册自定义函数以在查询中使用。这些函数可以分为以下几类之一:
按列函数
逐行函数
聚合
按列函数¶
按列函数可以接受列或字面值作为输入,并返回一个长度相同的列。按列函数可以通过 register_function
方法在 Context
中注册。
[ ]:
import numpy as np
def f(x):
return x ** 2
c.register_function(f, "f", [("x", np.float64)], np.float64)
函数注册需要以下输入:
可调用函数
函数在查询中引用的名称
一个元组列表,表示输入变量及其各自类型,可以是 Pandas 或 NumPy 类型
输出列的类型
一旦函数被注册,它就可以像其他标准SQL函数一样被调用。
[ ]:
c.sql("SELECT F(x) FROM dask").compute()
逐行函数¶
在某些情况下,编写一个处理类似字典的 row
对象的自定义函数可能会更容易——也就是所谓的行函数。这些函数也可以通过传递 row_udf=True
使用 register_function
进行注册,并以与列函数相同的方式使用。
[ ]:
def g(row):
if row["x"] > row["y"]:
return row["x"] - row["y"]
return row["y"] - row["x"]
c.register_function(g, "g", [("x", np.float64), ("y", np.float64)], np.float64, row_udf=True)
c.sql("SELECT G(x, y) FROM dask").compute()
需要注意的是,与直接使用指定列和字面量作为输入调用的列式函数不同,行式函数是通过 apply
调用的,其性能可能因底层数据框库(例如 Pandas、cuDF)和函数本身而不可预测。
聚合¶
聚合操作以单个列作为输入并返回单个值 - 因此,它们只能用于减少 GROUP BY
查询的结果。聚合可以通过 register_aggregation
方法注册,该方法在功能上类似于 register_function
,但接受 Dask Aggregation 作为输入,而不是可调用函数。
[ ]:
import dask.dataframe as dd
my_sum = dd.Aggregation("my_sum", lambda x: x.sum(), lambda x: x.sum())
c.register_aggregation(my_sum, "my_sum", [("x", np.float64)], np.float64)
c.sql("SELECT MY_SUM(x) FROM dask").compute()
SQL中的机器学习¶
Dask-SQL 支持模型训练和预测,使得机器学习工作流可以通过 Python 和 SQL 的灵活组合来实现。模型可以通过 register_model
方法或 CREATE MODEL
语句在 Context
中注册。
[ ]:
from dask_ml.linear_model import LinearRegression
from sklearn.ensemble import GradientBoostingClassifier
# create a dask-ml model and train it
model = GradientBoostingClassifier()
data = c.sql("SELECT x, y, x * y > 0 AS target FROM dask LIMIT 50")
model.fit(data[["x", "y"]], data["target"])
# register this model in the context
c.register_model("python_model", model, training_columns=["x", "y"])
# create and train a model directly from SQL
c.sql("""
CREATE MODEL sql_model WITH (
model_class = 'sklearn.ensemble.GradientBoostingClassifier',
wrap_predict = True,
target_column = 'target'
) AS (
SELECT x, y, x * y > 0 AS target
FROM dask
LIMIT 50
)
""")
注册的模型必须遵循 scikit-learn 接口,通过实现一个 predict
方法。与表格一样,所有注册的模型都可以通过 SHOW MODEL
语句列出。
[ ]:
c.sql("SHOW MODELS").compute()
从这里开始,可以使用 PREDICT
关键字作为 SELECT
查询的一部分,利用模型进行预测。
[ ]:
c.sql("""
SELECT * FROM PREDICT (
MODEL sql_model,
SELECT x, y, x * y > 0 AS actual FROM dask
OFFSET 50
)
""").compute()
[ ]: