dask.bag.Bag.to_textfiles

dask.bag.Bag.to_textfiles

Bag.to_textfiles(path, name_function=None, compression='infer', encoding='utf-8', compute=True, storage_options=None, last_endline=False, **kwargs)[源代码]

将 dask Bag 写入磁盘,每个分区一个文件名,每个元素一行。

路径:这将为您包中的每个分区创建一个文件。您可以通过多种方式指定文件名。

使用一个全局字符串

>>> b.to_textfiles('/path/to/data/*.json.gz')  

The * 将被替换为递增序列 1, 2, …

/path/to/data/0.json.gz
/path/to/data/1.json.gz

使用一个 globstring 和一个 name_function= 关键字参数。name_function 函数应接受一个整数并生成一个字符串。name_function 生成的字符串必须保持其各自分区索引的顺序。

>>> from datetime import date, timedelta
>>> def name(i):
...     return str(date(2015, 1, 1) + i * timedelta(days=1))
>>> name(0)
'2015-01-01'
>>> name(15)
'2015-01-16'
>>> b.to_textfiles('/path/to/data/*.json.gz', name_function=name)  
/path/to/data/2015-01-01.json.gz
/path/to/data/2015-01-02.json.gz
...

您还可以提供一个显式的路径列表。

>>> paths = ['/path/to/data/alice.json.gz', '/path/to/data/bob.json.gz', ...]  
>>> b.to_textfiles(paths) 

压缩:带有与已知压缩算法(gz, bz2)相对应扩展名的文件将被相应地压缩。

包内容:调用 to_textfiles 的包必须是一个文本字符串的包。例如,一个字典的包可以通过首先在包上应用 json.dumps ,然后调用 to_textfiles 来写入 JSON 文本文件:

>>> b_dict.map(json.dumps).to_textfiles("/path/to/data/*.json")  

最后一行的结尾:默认情况下,最后一行不以换行符结尾。传递 last_endline=True 以反转默认设置。