创建 Dask 包
内容
创建 Dask 包¶
有几种方法可以在您的数据周围创建Dask包:
db.from_sequence
¶
你可以从现有的 Python 可迭代对象创建一个包:
>>> import dask.bag as db
>>> b = db.from_sequence([1, 2, 3, 4, 5, 6])
您可以控制此数据被分成的分区数量:
>>> b = db.from_sequence([1, 2, 3, 4, 5, 6], npartitions=2)
这控制了你暴露的并行粒度。默认情况下,Dask 会尝试将你的数据分成大约 100 个分区。
重要提示:不要将数据加载到Python中,然后再将该数据加载到Dask包中。相反,使用Dask包来加载数据。这样可以并行化加载步骤并减少工作节点间的通信。
>>> b = db.from_sequence(['1.dat', '2.dat', ...]).map(load_from_filename)
db.read_text
¶
Dask Bag 可以直接从文本文件加载数据。你可以传递单个文件名、文件名列表或通配符字符串。生成的bag将每行一个项目,每个文件一个分区:
>>> b = db.read_text('myfile.txt')
>>> b = db.read_text(['myfile.1.txt', 'myfile.2.txt', ...])
>>> b = db.read_text('myfile.*.txt')
这处理标准的压缩库,如 gzip
、bz2
、xz
,或任何易于安装的具有类文件对象的压缩库。压缩将通过文件名扩展名推断,或通过使用 compression='gzip'
关键字:
>>> b = db.read_text('myfile.*.txt.gz')
袋子中的结果项是字符串。如果你有像行分隔的JSON这样的编码数据,那么你可能希望在袋子上映射一个解码或加载函数:
>>> import json
>>> b = db.read_text('myfile.*.json').map(json.loads)
或者执行字符串处理任务。为了方便,有一个字符串命名空间直接附加在 bags 上,使用 .str.methodname
:
>>> b = db.read_text('myfile.*.csv').str.strip().str.split(',')
db.read_avro
¶
如果安装了 fastavro ,Dask Bag 可以读取 Avro 格式的二进制文件。可以从一个或多个文件创建一个包,文件内可以选择分块。生成的包将每个 Avro 记录作为一个项目,这将是一个由 Avro 模式给出的形式的字典。每个输入文件至少会有一个分区:
>>> b = db.read_avro('datafile.avro')
>>> b = db.read_avro('data.*.avro')
默认情况下,Dask 会将数据文件分割成大约 blocksize
字节大小的块。您实际得到的块取决于文件的内部块结构。
对于创建后被压缩的文件(这与Avro使用的内部“编解码器”不同),不应使用分块,并且每个文件将恰好有一个分区:
> b = bd.read_avro('compressed.*.avro.gz', blocksize=None, compression='gzip')
db.from_delayed
¶
你可以使用 db.from_delayed
函数从 dask.delayed 值构建一个 Dask 包。更多信息,请参阅 使用 dask.delayed 与集合的文档。
存储 Dask 包¶
在内存中¶
你可以通过调用 compute()
或将对象转换为列表来将 Dask 包转换为列表或 Python 可迭代对象:
>>> result = b.compute()
or
>>> result = list(b)
到文本文件¶
你可以通过调用 .to_textfiles()
方法将 Dask 包转换为磁盘上的文件序列:
- dask.bag.core.to_textfiles(b, path, name_function=None, compression='infer', encoding='utf-8', compute=True, storage_options=None, last_endline=False, **kwargs)[源代码]¶
将 dask Bag 写入磁盘,每个分区一个文件名,每个元素一行。
路径:这将为您包中的每个分区创建一个文件。您可以通过多种方式指定文件名。
使用一个全局字符串
>>> b.to_textfiles('/path/to/data/*.json.gz')
The * 将被替换为递增序列 1, 2, …
/path/to/data/0.json.gz /path/to/data/1.json.gz
使用一个 globstring 和一个
name_function=
关键字参数。name_function 函数应接受一个整数并生成一个字符串。name_function 生成的字符串必须保持其各自分区索引的顺序。>>> from datetime import date, timedelta >>> def name(i): ... return str(date(2015, 1, 1) + i * timedelta(days=1))
>>> name(0) '2015-01-01' >>> name(15) '2015-01-16'
>>> b.to_textfiles('/path/to/data/*.json.gz', name_function=name)
/path/to/data/2015-01-01.json.gz /path/to/data/2015-01-02.json.gz ...
您还可以提供一个显式的路径列表。
>>> paths = ['/path/to/data/alice.json.gz', '/path/to/data/bob.json.gz', ...] >>> b.to_textfiles(paths)
压缩:带有与已知压缩算法(gz, bz2)相对应扩展名的文件将被相应地压缩。
包内容:调用
to_textfiles
的包必须是一个文本字符串的包。例如,一个字典的包可以通过首先在包上应用json.dumps
,然后调用to_textfiles
来写入 JSON 文本文件:>>> b_dict.map(json.dumps).to_textfiles("/path/to/data/*.json")
最后一行的结尾:默认情况下,最后一行不以换行符结尾。传递
last_endline=True
以反转默认设置。
到 Avro¶
Dask 包可以直接使用 fastavro 写入 Avro 二进制格式。每个包分区将写入一个文件。这要求用户提供一个完全指定的模式字典(参见 .to_avro()
方法的文档字符串)。
- dask.bag.avro.to_avro(b, filename, schema, name_function=None, storage_options=None, codec='null', sync_interval=16000, metadata=None, compute=True, **kwargs)[源代码]¶
将数据包写入一组 Avro 文件
该模式是一个描述数据的复杂字典,参见 https://avro.apache.org/docs/1.8.2/gettingstartedpython.html#Defining+a+schema 和 https://fastavro.readthedocs.io/en/latest/writer.html 。其结构如下:
{'name': 'Test', 'namespace': 'Test', 'doc': 'Descriptive text', 'type': 'record', 'fields': [ {'name': 'a', 'type': 'int'}, ]}
其中“name”字段是必需的,但“namespace”和“doc”是可选的描述符;“type”必须始终为“record”。字段列表应对输入记录的每个键都有一个条目,类型类似于Avro规范中的原始、复杂或逻辑类型( https://avro.apache.org/docs/1.8.2/spec.html )。
每个输入分区生成一个Avro文件。
- 参数
- b: dask.bag.Bag
- 文件名: 列表或字符串
要写入的文件名。如果是列表,数量必须与分区数量匹配。如果是字符串,必须包含通配符“*”,该通配符将使用 name_function 进行扩展。
- schema: dict
Avro 模式字典,见上文
- name_function: None 或 可调用对象
将整数扩展为字符串,参见
dask.bytes.utils.build_name_function
- storage_options: None 或 dict
传递给后端文件系统的额外键/值选项
- codec: ‘null’, ‘deflate’, 或 ‘snappy’
压缩算法
- sync_interval: int
文件中每个块包含的记录数
- 元数据: None 或 dict
包含在文件头中
- compute: bool
如果为 True,文件会立即写入,并阻塞函数块。如果为 False,则返回延迟对象,用户可以在方便时计算这些对象。
- **kwargs: 如果 compute=True,则传递给 compute() 方法
示例
>>> import dask.bag as db >>> b = db.from_sequence([{'name': 'Alice', 'value': 100}, ... {'name': 'Bob', 'value': 200}]) >>> schema = {'name': 'People', 'doc': "Set of people's scores", ... 'type': 'record', ... 'fields': [ ... {'name': 'name', 'type': 'string'}, ... {'name': 'value', 'type': 'int'}]} >>> b.to_avro('my-data.*.avro', schema) ['my-data.0.avro', 'my-data.1.avro']
到 DataFrames¶
你可以将一个 Dask 包转换为 Dask DataFrame 并使用这些存储解决方案。
- Bag.to_dataframe(meta=None, columns=None, optimize_graph=True)[源代码]¶
从 Dask Bag 创建 Dask Dataframe。
Bag 应包含元组、字典记录或标量。
索引将不会有特别的意义。如有必要,请使用
reindex
之后。- 参数
- metapd.DataFrame, dict, iterable, 可选
一个空的
pd.DataFrame
,其数据类型和列名与输出匹配。此元数据对于 dask dataframe 中的许多算法正常工作是必要的。为了便于使用,还提供了一些替代输入。可以提供一个{name: dtype}
的字典或(name, dtype)
的可迭代对象,而不是DataFrame
。如果没有提供或是一个列表,将从第一个分区中计算一个单一元素,这可能会触发一个潜在的高成本调用compute
。这可能会导致意外的结果,因此建议提供meta
。更多信息,请参见dask.dataframe.utils.make_meta
。- 列序列,可选
要使用的列名。如果传递的数据没有与之关联的名称,此参数为列提供名称。否则,此参数指示结果中列的顺序(任何在数据中找不到的名称将变为全NA列)。请注意,如果提供了
meta
,列名将从那里获取,此参数无效。- 优化图bool, 可选
如果为 True [默认],图表在转换为
dask.dataframe.DataFrame
之前会进行优化。
示例
>>> import dask.bag as db >>> b = db.from_sequence([{'name': 'Alice', 'balance': 100}, ... {'name': 'Bob', 'balance': 200}, ... {'name': 'Charlie', 'balance': 300}], ... npartitions=2) >>> df = b.to_dataframe()
>>> df.compute() name balance 0 Alice 100 1 Bob 200 0 Charlie 300
延迟值¶
你可以将一个 Dask 包转换为一个 Dask 延迟值列表 和自定义存储解决方案。