创建 Dask 包

有几种方法可以在您的数据周围创建Dask包:

db.from_sequence

你可以从现有的 Python 可迭代对象创建一个包:

>>> import dask.bag as db
>>> b = db.from_sequence([1, 2, 3, 4, 5, 6])

您可以控制此数据被分成的分区数量:

>>> b = db.from_sequence([1, 2, 3, 4, 5, 6], npartitions=2)

这控制了你暴露的并行粒度。默认情况下,Dask 会尝试将你的数据分成大约 100 个分区。

重要提示:不要将数据加载到Python中,然后再将该数据加载到Dask包中。相反,使用Dask包来加载数据。这样可以并行化加载步骤并减少工作节点间的通信。

>>> b = db.from_sequence(['1.dat', '2.dat', ...]).map(load_from_filename)

db.read_text

Dask Bag 可以直接从文本文件加载数据。你可以传递单个文件名、文件名列表或通配符字符串。生成的bag将每行一个项目,每个文件一个分区:

>>> b = db.read_text('myfile.txt')
>>> b = db.read_text(['myfile.1.txt', 'myfile.2.txt', ...])
>>> b = db.read_text('myfile.*.txt')

这处理标准的压缩库,如 gzipbz2xz,或任何易于安装的具有类文件对象的压缩库。压缩将通过文件名扩展名推断,或通过使用 compression='gzip' 关键字:

>>> b = db.read_text('myfile.*.txt.gz')

袋子中的结果项是字符串。如果你有像行分隔的JSON这样的编码数据,那么你可能希望在袋子上映射一个解码或加载函数:

>>> import json
>>> b = db.read_text('myfile.*.json').map(json.loads)

或者执行字符串处理任务。为了方便,有一个字符串命名空间直接附加在 bags 上,使用 .str.methodname

>>> b = db.read_text('myfile.*.csv').str.strip().str.split(',')

db.read_avro

如果安装了 fastavro ,Dask Bag 可以读取 Avro 格式的二进制文件。可以从一个或多个文件创建一个包,文件内可以选择分块。生成的包将每个 Avro 记录作为一个项目,这将是一个由 Avro 模式给出的形式的字典。每个输入文件至少会有一个分区:

>>> b = db.read_avro('datafile.avro')
>>> b = db.read_avro('data.*.avro')

默认情况下,Dask 会将数据文件分割成大约 blocksize 字节大小的块。您实际得到的块取决于文件的内部块结构。

对于创建后被压缩的文件(这与Avro使用的内部“编解码器”不同),不应使用分块,并且每个文件将恰好有一个分区:

> b = bd.read_avro('compressed.*.avro.gz', blocksize=None, compression='gzip')

db.from_delayed

你可以使用 db.from_delayed 函数从 dask.delayed 值构建一个 Dask 包。更多信息,请参阅 使用 dask.delayed 与集合的文档

存储 Dask 包

在内存中

你可以通过调用 compute() 或将对象转换为列表来将 Dask 包转换为列表或 Python 可迭代对象:

>>> result = b.compute()
or
>>> result = list(b)

到文本文件

你可以通过调用 .to_textfiles() 方法将 Dask 包转换为磁盘上的文件序列:

dask.bag.core.to_textfiles(b, path, name_function=None, compression='infer', encoding='utf-8', compute=True, storage_options=None, last_endline=False, **kwargs)[源代码]

将 dask Bag 写入磁盘,每个分区一个文件名,每个元素一行。

路径:这将为您包中的每个分区创建一个文件。您可以通过多种方式指定文件名。

使用一个全局字符串

>>> b.to_textfiles('/path/to/data/*.json.gz')  

The * 将被替换为递增序列 1, 2, …

/path/to/data/0.json.gz
/path/to/data/1.json.gz

使用一个 globstring 和一个 name_function= 关键字参数。name_function 函数应接受一个整数并生成一个字符串。name_function 生成的字符串必须保持其各自分区索引的顺序。

>>> from datetime import date, timedelta
>>> def name(i):
...     return str(date(2015, 1, 1) + i * timedelta(days=1))
>>> name(0)
'2015-01-01'
>>> name(15)
'2015-01-16'
>>> b.to_textfiles('/path/to/data/*.json.gz', name_function=name)  
/path/to/data/2015-01-01.json.gz
/path/to/data/2015-01-02.json.gz
...

您还可以提供一个显式的路径列表。

>>> paths = ['/path/to/data/alice.json.gz', '/path/to/data/bob.json.gz', ...]  
>>> b.to_textfiles(paths) 

压缩:带有与已知压缩算法(gz, bz2)相对应扩展名的文件将被相应地压缩。

包内容:调用 to_textfiles 的包必须是一个文本字符串的包。例如,一个字典的包可以通过首先在包上应用 json.dumps ,然后调用 to_textfiles 来写入 JSON 文本文件:

>>> b_dict.map(json.dumps).to_textfiles("/path/to/data/*.json")  

最后一行的结尾:默认情况下,最后一行不以换行符结尾。传递 last_endline=True 以反转默认设置。

到 Avro

Dask 包可以直接使用 fastavro 写入 Avro 二进制格式。每个包分区将写入一个文件。这要求用户提供一个完全指定的模式字典(参见 .to_avro() 方法的文档字符串)。

dask.bag.avro.to_avro(b, filename, schema, name_function=None, storage_options=None, codec='null', sync_interval=16000, metadata=None, compute=True, **kwargs)[源代码]

将数据包写入一组 Avro 文件

该模式是一个描述数据的复杂字典,参见 https://avro.apache.org/docs/1.8.2/gettingstartedpython.html#Defining+a+schemahttps://fastavro.readthedocs.io/en/latest/writer.html 。其结构如下:

{'name': 'Test',
 'namespace': 'Test',
 'doc': 'Descriptive text',
 'type': 'record',
 'fields': [
    {'name': 'a', 'type': 'int'},
 ]}

其中“name”字段是必需的,但“namespace”和“doc”是可选的描述符;“type”必须始终为“record”。字段列表应对输入记录的每个键都有一个条目,类型类似于Avro规范中的原始、复杂或逻辑类型( https://avro.apache.org/docs/1.8.2/spec.html )。

每个输入分区生成一个Avro文件。

参数
b: dask.bag.Bag
文件名: 列表或字符串

要写入的文件名。如果是列表,数量必须与分区数量匹配。如果是字符串,必须包含通配符“*”,该通配符将使用 name_function 进行扩展。

schema: dict

Avro 模式字典,见上文

name_function: None 或 可调用对象

将整数扩展为字符串,参见 dask.bytes.utils.build_name_function

storage_options: None 或 dict

传递给后端文件系统的额外键/值选项

codec: ‘null’, ‘deflate’, 或 ‘snappy’

压缩算法

sync_interval: int

文件中每个块包含的记录数

元数据: None 或 dict

包含在文件头中

compute: bool

如果为 True,文件会立即写入,并阻塞函数块。如果为 False,则返回延迟对象,用户可以在方便时计算这些对象。

**kwargs: 如果 compute=True,则传递给 compute() 方法

示例

>>> import dask.bag as db
>>> b = db.from_sequence([{'name': 'Alice', 'value': 100},
...                       {'name': 'Bob', 'value': 200}])
>>> schema = {'name': 'People', 'doc': "Set of people's scores",
...           'type': 'record',
...           'fields': [
...               {'name': 'name', 'type': 'string'},
...               {'name': 'value', 'type': 'int'}]}
>>> b.to_avro('my-data.*.avro', schema)  
['my-data.0.avro', 'my-data.1.avro']

到 DataFrames

你可以将一个 Dask 包转换为 Dask DataFrame 并使用这些存储解决方案。

Bag.to_dataframe(meta=None, columns=None, optimize_graph=True)[源代码]

从 Dask Bag 创建 Dask Dataframe。

Bag 应包含元组、字典记录或标量。

索引将不会有特别的意义。如有必要,请使用 reindex 之后。

参数
metapd.DataFrame, dict, iterable, 可选

一个空的 pd.DataFrame,其数据类型和列名与输出匹配。此元数据对于 dask dataframe 中的许多算法正常工作是必要的。为了便于使用,还提供了一些替代输入。可以提供一个 {name: dtype} 的字典或 (name, dtype) 的可迭代对象,而不是 DataFrame。如果没有提供或是一个列表,将从第一个分区中计算一个单一元素,这可能会触发一个潜在的高成本调用 compute。这可能会导致意外的结果,因此建议提供 meta。更多信息,请参见 dask.dataframe.utils.make_meta

序列,可选

要使用的列名。如果传递的数据没有与之关联的名称,此参数为列提供名称。否则,此参数指示结果中列的顺序(任何在数据中找不到的名称将变为全NA列)。请注意,如果提供了 meta ,列名将从那里获取,此参数无效。

优化图bool, 可选

如果为 True [默认],图表在转换为 dask.dataframe.DataFrame 之前会进行优化。

示例

>>> import dask.bag as db
>>> b = db.from_sequence([{'name': 'Alice',   'balance': 100},
...                       {'name': 'Bob',     'balance': 200},
...                       {'name': 'Charlie', 'balance': 300}],
...                      npartitions=2)
>>> df = b.to_dataframe()
>>> df.compute()
      name  balance
0    Alice      100
1      Bob      200
0  Charlie      300

延迟值

你可以将一个 Dask 包转换为一个 Dask 延迟值列表 和自定义存储解决方案。

Bag.to_delayed(optimize_graph=True)[源代码]

转换为一个 dask.delayed 对象列表,每个分区一个。

参数
优化图bool, 可选

如果为 True [默认],图表在转换为 dask.delayed 对象之前会进行优化。