dask.dataframe.to_sql
dask.dataframe.to_sql¶
- dask.dataframe.to_sql(df, name: str, uri: str, schema=None, if_exists: str = 'fail', index: bool = True, index_label=None, chunksize=None, dtype=None, method=None, compute=True, parallel=False, engine_kwargs=None)[源代码]¶
将 Dask 数据框存储到 SQL 表中
基于“meta” DataFrame(并符合调用者的“if_exists”偏好)创建一个空表,然后每个块调用 pd.DataFrame.to_sql(使用 if_exists=”append”)。
SQLAlchemy 支持的数据库 [1] 都支持。可以新建表、追加数据或覆盖数据。
- 参数
- 名称str
SQL 表的名称。
- uri字符串
数据库连接的完整 sqlalchemy URI
- 模式str, 可选
指定模式(如果数据库类型支持此功能)。如果为 None,则使用默认模式。
- 如果存在{‘fail’, ‘replace’, ‘append’}, 默认 ‘fail’
如果表格已经存在,如何处理。
fail: 引发 ValueError。
replace: 在插入新值之前删除表。
append: 向现有表格插入新值。
- 索引bool, 默认 True
将 DataFrame 索引写为列。使用 index_label 作为表中的列名。
- index_labelstr 或序列,默认 None
索引列的列标签。如果给定 None(默认)且 index 为 True,则使用索引名称。如果 DataFrame 使用 MultiIndex,则应给定一个序列。
- chunksizeint, 可选
指定每次写入的批次中的行数。默认情况下,所有行将一次性写入。
- dtype字典或标量,可选
指定列的数据类型。如果使用字典,键应为列名,值应为 SQLAlchemy 类型或 sqlite3 旧模式的字符串。如果提供标量值,它将应用于所有列。
- 方法{None, ‘multi’, callable}, 可选
控制使用的SQL插入子句:
None : 使用标准的 SQL
INSERT
子句(每行一个)。‘multi’: 在单个
INSERT
子句中传递多个值。具有签名
(pd_table, conn, keys, data_iter)
的可调用对象。
详细信息和一个示例的可调用实现可以在 插入方法 部分找到。
- 计算bool, 默认 True
当为真时,调用 dask.compute 并执行加载到 SQL 中;否则,返回一个 Dask 对象(或在 parallel=True 时返回每个块对象的数组)
- 并行bool, 默认 False
当为真时,每个块会并发地将自己追加到数据库表中。这可能导致数据库行的顺序与源DataFrame的相应行不同。当为假时,按顺序将每个块加载到SQL数据库中。
- engine_kwargs字典或无
sqlalchemy 的特定数据库引擎参数
- Raises
- ValueError
当表格已经存在且 if_exists 为 ‘fail’(默认值)时。
参见
read_sql
从表中读取一个 DataFrame。
注释
如果数据库支持,时区感知的日期时间列将使用 SQLAlchemy 写为
带时区的Timestamp
类型。否则,日期时间将存储为原始时区本地的无时区时间戳。0.24.0 新版功能.
参考文献
示例
从零开始创建一个包含4行的表格。
>>> import pandas as pd >>> import dask.dataframe as dd >>> df = pd.DataFrame([ {'i':i, 's':str(i)*2 } for i in range(4) ]) >>> ddf = dd.from_pandas(df, npartitions=2) >>> ddf Dask DataFrame Structure: i s npartitions=2 0 int64 object 2 ... ... 3 ... ... Dask Name: from_pandas, 2 tasks
>>> from dask.utils import tmpfile >>> from sqlalchemy import create_engine, text >>> with tmpfile() as f: ... db = 'sqlite:///%s' %f ... ddf.to_sql('test', db) ... engine = create_engine(db, echo=False) ... with engine.connect() as conn: ... result = conn.execute(text("SELECT * FROM test")).fetchall() >>> result [(0, 0, '00'), (1, 1, '11'), (2, 2, '22'), (3, 3, '33')]