dask.dataframe.to_sql

dask.dataframe.to_sql

dask.dataframe.to_sql(df, name: str, uri: str, schema=None, if_exists: str = 'fail', index: bool = True, index_label=None, chunksize=None, dtype=None, method=None, compute=True, parallel=False, engine_kwargs=None)[源代码]

将 Dask 数据框存储到 SQL 表中

基于“meta” DataFrame(并符合调用者的“if_exists”偏好)创建一个空表,然后每个块调用 pd.DataFrame.to_sql(使用 if_exists=”append”)。

SQLAlchemy 支持的数据库 [1] 都支持。可以新建表、追加数据或覆盖数据。

参数
名称str

SQL 表的名称。

uri字符串

数据库连接的完整 sqlalchemy URI

模式str, 可选

指定模式(如果数据库类型支持此功能)。如果为 None,则使用默认模式。

如果存在{‘fail’, ‘replace’, ‘append’}, 默认 ‘fail’

如果表格已经存在,如何处理。

  • fail: 引发 ValueError。

  • replace: 在插入新值之前删除表。

  • append: 向现有表格插入新值。

索引bool, 默认 True

将 DataFrame 索引写为列。使用 index_label 作为表中的列名。

index_labelstr 或序列,默认 None

索引列的列标签。如果给定 None(默认)且 index 为 True,则使用索引名称。如果 DataFrame 使用 MultiIndex,则应给定一个序列。

chunksizeint, 可选

指定每次写入的批次中的行数。默认情况下,所有行将一次性写入。

dtype字典或标量,可选

指定列的数据类型。如果使用字典,键应为列名,值应为 SQLAlchemy 类型或 sqlite3 旧模式的字符串。如果提供标量值,它将应用于所有列。

方法{None, ‘multi’, callable}, 可选

控制使用的SQL插入子句:

  • None : 使用标准的 SQL INSERT 子句(每行一个)。

  • ‘multi’: 在单个 INSERT 子句中传递多个值。

  • 具有签名 (pd_table, conn, keys, data_iter) 的可调用对象。

详细信息和一个示例的可调用实现可以在 插入方法 部分找到。

计算bool, 默认 True

当为真时,调用 dask.compute 并执行加载到 SQL 中;否则,返回一个 Dask 对象(或在 parallel=True 时返回每个块对象的数组)

并行bool, 默认 False

当为真时,每个块会并发地将自己追加到数据库表中。这可能导致数据库行的顺序与源DataFrame的相应行不同。当为假时,按顺序将每个块加载到SQL数据库中。

engine_kwargs字典或无

sqlalchemy 的特定数据库引擎参数

Raises
ValueError

当表格已经存在且 if_exists 为 ‘fail’(默认值)时。

参见

read_sql

从表中读取一个 DataFrame。

注释

如果数据库支持,时区感知的日期时间列将使用 SQLAlchemy 写为 带时区的Timestamp 类型。否则,日期时间将存储为原始时区本地的无时区时间戳。

0.24.0 新版功能.

参考文献

1

https://docs.sqlalchemy.org

2

https://www.python.org/dev/peps/pep-0249/

示例

从零开始创建一个包含4行的表格。

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> df = pd.DataFrame([ {'i':i, 's':str(i)*2 } for i in range(4) ])
>>> ddf = dd.from_pandas(df, npartitions=2)
>>> ddf  
Dask DataFrame Structure:
                   i       s
npartitions=2
0              int64  object
2                ...     ...
3                ...     ...
Dask Name: from_pandas, 2 tasks
>>> from dask.utils import tmpfile
>>> from sqlalchemy import create_engine, text
>>> with tmpfile() as f:
...     db = 'sqlite:///%s' %f
...     ddf.to_sql('test', db)
...     engine = create_engine(db, echo=False)
...     with engine.connect() as conn:
...         result = conn.execute(text("SELECT * FROM test")).fetchall()
>>> result
[(0, 0, '00'), (1, 1, '11'), (2, 2, '22'), (3, 3, '33')]