API 参考

Dask API 通常遵循上游 API:

此外,Dask 有自己的函数来启动计算、将数据持久化在内存中、检查进度等,这些函数补充了上述 API。下面描述了这些更通用的 Dask 函数:

compute(*args[, traverse, optimize_graph, ...])

一次计算多个 dask 集合。

is_dask_collection(x)

如果 x 是一个 dask 集合,则返回 True

optimize(*args[, traverse])

同时优化多个 dask 集合。

persist(*args[, traverse, optimize_graph, ...])

将多个 Dask 集合持久化到内存中

visualize(*args[, filename, traverse, ...])

同时可视化多个dask图。

These functions work with any scheduler. More advanced operations are available when using the newer scheduler and starting a dask.distributed.Client (which, despite its name, runs nicely on a single machine). This API provides the ability to submit, cancel, and track work asynchronously, and includes many functions for complex inter-task workflows. These are not necessary for normal operation, but can be useful for real-time or advanced operation.

这个更高级的API在 Dask分布式文档 中可用。

dask.annotate(**annotations: Any) collections.abc.Iterator[None][源代码]

用于设置 HighLevelGraph 层注释的上下文管理器。

注解是与任务相关联的元数据或软约束,dask调度器可能会选择尊重这些约束:它们传达意图而不强制执行硬约束。因此,它们主要设计用于分布式调度器。

几乎任何对象都可以作为注释,但建议使用小的Python对象,而像NumPy数组这样的大型对象则不鼓励使用。

作为注解提供的可调用对象应接受一个 key 参数并生成适当的注解。注解集合中的各个任务键将提供给该可调用对象。

参数
**注解键值对

示例

数组 A 中的所有任务应具有优先级 100,并且在失败时应重试 3 次。

>>> import dask
>>> import dask.array as da
>>> with dask.annotate(priority=100, retries=3):
...     A = da.ones((10000, 10000))

在扁平化的块ID内优先处理数组A中的任务。

>>> nblocks = (10, 10)
>>> with dask.annotate(priority=lambda k: k[1]*nblocks[1] + k[2]):
...     A = da.ones((1000, 1000), chunks=(100, 100))

注解可以嵌套。

>>> with dask.annotate(priority=1):
...     with dask.annotate(retries=3):
...         A = da.ones((1000, 1000))
...     B = A + 1
dask.get_annotations() dict[str, Any][源代码]

获取当前注释。

返回
所有当前注释的字典

参见

annotate
dask.compute(*args, traverse=True, optimize_graph=True, scheduler=None, get=None, **kwargs)[源代码]

一次计算多个 dask 集合。

参数
参数对象

任意数量的对象。如果是 dask 对象,则会计算并返回结果。默认情况下,也会遍历 Python 内置集合以查找 dask 对象(更多信息请参见 traverse 关键字)。非 dask 参数保持不变。

遍历bool, 可选

默认情况下,dask 会遍历内置的 Python 集合,查找传递给 compute 的 dask 对象。对于大型集合,这可能会很耗费资源。如果没有任何参数包含 dask 对象,请设置 traverse=False 以避免进行此遍历。

调度器字符串,可选

使用哪种调度器,如“线程”、“同步”或“进程”。如果没有提供,默认首先检查全局设置,然后回退到集合的默认设置。

优化图bool, 可选

如果为 True [默认],则在计算之前对每个集合应用优化。否则,图表将按原样运行。这对于调试很有用。

获取 : None

应保留为 None get= 关键字已被移除。

kwargs

传递给调度器函数的额外关键字参数。

示例

>>> import dask
>>> import dask.array as da
>>> a = da.arange(10, chunks=2).sum()
>>> b = da.arange(10, chunks=2).mean()
>>> dask.compute(a, b)
(np.int64(45), np.float64(4.5))

默认情况下,python集合中的dask对象也会被计算:

>>> dask.compute({'a': a, 'b': b, 'c': 1})
({'a': np.int64(45), 'b': np.float64(4.5), 'c': 1},)
dask.is_dask_collection(x) bool[源代码]

如果 x 是一个 dask 集合,则返回 True

参数
x任何

要测试的对象。

返回
结果布尔

如果 x 是一个 Dask 集合,则为 True

注释

DaskCollection 的 typing.Protocol 实现将 Dask 集合定义为一个类,该类从 __dask_graph__ 方法返回一个 Mapping。这个辅助函数在协议实现之前就已经存在。

dask.optimize(*args, traverse=True, **kwargs)[源代码]

同时优化多个 dask 集合。

返回所有共享相同合并和优化基础图的等效dask集合。如果在将多个集合转换为延迟对象时,或者在战略点手动应用优化时,这可能很有用。

请注意,在大多数情况下,您不需要直接调用此方法。

参数
*args对象

任意数量的对象。如果是一个dask对象,其图表会在返回一个等效的dask集合之前,与所有其他dask对象的图表进行优化和合并。非dask参数保持不变。

遍历bool, 可选

默认情况下,dask 会遍历内置的 Python 集合,查找传递给 optimize 的 dask 对象。对于大型集合,这可能会很耗费资源。如果没有任何参数包含 dask 对象,请设置 traverse=False 以避免进行此遍历。

优化可调用对象列表,可选

要执行的额外优化过程。

**kwargs

传递给优化过程的额外关键字参数。

示例

>>> import dask
>>> import dask.array as da
>>> a = da.arange(10, chunks=2).sum()
>>> b = da.arange(10, chunks=2).mean()
>>> a2, b2 = dask.optimize(a, b)
>>> a2.compute() == a.compute()
np.True_
>>> b2.compute() == b.compute()
np.True_
dask.persist(*args, traverse=True, optimize_graph=True, scheduler=None, **kwargs)[源代码]

将多个 Dask 集合持久化到内存中

这将惰性 Dask 集合转换为具有相同元数据的 Dask 集合,但现在其结果已完全计算或正在后台主动计算。

例如,从一个包含许多惰性调用的惰性 dask.array 构建的数组现在将是一个具有相同形状、数据类型、块等的 dask.array,但现在所有这些先前的惰性任务要么在内存中计算为许多小的 :class:`numpy.array`(在单机情况下),要么在集群的后台异步运行(在分布式情况下)。

如果存在并连接到分布式调度器的 dask.distributed.Client,此函数将会有不同的操作。在这种情况下,此函数将在任务图提交到集群后立即返回,但在计算完成之前。计算将在后台异步继续。当使用此函数与单机调度器时,它会阻塞直到计算完成。

在使用单机上的 Dask 时,应确保数据集完全适合内存。

参数
*args: Dask 集合
调度器字符串,可选

使用哪种调度器,如“线程”、“同步”或“进程”。如果没有提供,默认首先检查全局设置,然后回退到集合的默认设置。

遍历bool, 可选

默认情况下,dask 会遍历内置的 Python 集合,查找传递给 persist 的 dask 对象。对于大型集合,这可能会很耗费资源。如果没有任何参数包含 dask 对象,请设置 traverse=False 以避免进行此遍历。

优化图bool, 可选

如果为 True [默认],图表在计算前会被优化。否则,图表将按原样运行。这对于调试很有用。

**kwargs

传递给调度器函数的额外关键字参数。

返回
新的 dask 集合由内存数据支持

示例

>>> df = dd.read_csv('/path/to/*.csv')  
>>> df = df[df.name == 'Alice']  
>>> df['in-debt'] = df.balance < 0  
>>> df = df.persist()  # triggers computation  
>>> df.value().min()  # future computations are now fast  
-10
>>> df.value().max()  
100
>>> from dask import persist  # use persist function on multiple collections
>>> a, b = persist(a, b)  
dask.visualize(*args, filename='mydask', traverse=True, optimize_graph=False, maxval=None, engine: Optional[Literal['cytoscape', 'ipycytoscape', 'graphviz']] = None, **kwargs)[源代码]

同时可视化多个dask图。

需要安装 graphviz 。所有不是 dask 图的选项都应该作为关键字参数传递。

参数
参数对象

任意数量的对象。如果它是一个 dask 集合(例如,dask DataFrame、Array、Bag 或 Delayed),其关联的图将被包含在 visualize 的输出中。默认情况下,python 内置集合也会被遍历以查找 dask 对象(更多信息请参见 traverse 关键字)。缺少关联图的参数将被忽略。

文件名str 或 None, 可选

要写入磁盘的文件名。如果提供的 filename 不包含扩展名,默认将使用 ‘.png’。如果 filename 为 None,则不会写入文件,我们将仅通过管道与 dot 通信。

格式{‘png’, ‘pdf’, ‘dot’, ‘svg’, ‘jpeg’, ‘jpg’}, 可选

输出文件的格式。默认是 ‘png’。

遍历bool, 可选

默认情况下,dask 会遍历内置的 Python 集合,查找传递给 visualize 的 dask 对象。对于大型集合,这可能会很耗费资源。如果没有任何参数包含 dask 对象,请设置 traverse=False 以避免进行此遍历。

优化图bool, 可选

如果为 True,图表在渲染前会被优化。否则,图表将按原样显示。默认值为 False。

颜色{None, ‘顺序’, ‘年龄’, ‘释放’, ‘内存增加’, ‘内存减少’, ‘内存压力’}, 可选

颜色节点的选项。colormap:

  • None,默认,无颜色。

  • ‘order’, 根据节点在图中的出现顺序为其边框着色。

  • ‘ages’, 节点数据被保留的时间长度。

  • ‘freed’, 运行一个节点后释放的依赖项数量。

  • ‘memoryincreases’, 节点生命周期结束后保留的额外输出数量。较大的值可能表明节点应该在之后运行。

  • ‘memorydecreases’, 在节点生命周期后保留的输出减少的数量。较大的值可能表明节点应该更早运行。

  • ‘memorypressure’, 节点运行时持有的数据数量(圆形),或数据被释放时(矩形)。

maxval{int, float}, 可选

用于颜色映射归一化的最大值,范围为0到1.0。默认值为 None ,将使其为值的最大数量。

collapse_outputsbool, 可选

是否折叠输出框,这些框通常有空的标签。默认是 False。

详细bool, 可选

即使数据未分块,是否也要标记输出和输入框。注意:这些标签可能会变得非常长。默认值为 False。

引擎{“graphviz”, “ipycytoscape”, “cytoscape”},可选。

要使用的可视化引擎。如果没有提供,这将检查 dask 配置值 “visualization.engine”。如果未设置,它会尝试导入 graphvizipycytoscape,使用第一个成功导入的。

**kwargs

传递给可视化引擎的其他关键字参数。

返回
结果IPython.display.Image, IPython.display.SVG, 或 None

更多信息请参见 dask.dot.dot_graph。

参见

dask.dot.dot_graph

注释

有关优化的更多信息,请参见此处:

http://www.aidoczh.com/dask/zh_CN/latest/optimize.html

示例

>>> x.visualize(filename='dask.pdf')  
>>> x.visualize(filename='dask.pdf', color='order')  

数据集

Dask 提供了一些用于生成演示数据集的辅助工具

dask.datasets.make_people(npartitions=10, records_per_partition=1000, seed=None, locale='en')[源代码]

创建一个随机人物的数据集

这将创建一个包含随机生成的人的词典记录的 Dask Bag。这需要可选库 mimesis 来生成记录。

参数
npartitions整数

分区数量

每个分区的记录数整数

每个分区中的记录数

种子int, (可选)

随机种子

localestr

语言区域设置,如 ‘en’、’fr’、’zh’ 或 ‘ru’

返回
b: Dask 包
dask.datasets.timeseries(start='2000-01-01', end='2000-01-31', freq='1s', partition_freq='1D', dtypes=None, seed=None, **kwargs)[源代码]

使用随机数据创建时间序列数据框

参数
开始datetime (或类似 datetime 的字符串)

时间序列的开始

结束datetime (或类似 datetime 的字符串)

时间序列的结束

dtypesdict (可选)

列名到类型的映射。有效类型包括 {float, int, str, ‘category’}。

频率字符串

时间序列频率的字符串,如 ‘2s’ 或 ‘1H’ 或 ‘12W’

partition_freq字符串

字符串如 ‘1M’ 或 ‘2Y’ 用于将数据框划分为分区

种子int (可选)

随机状态种子

**kwargs:

传递给单个列创建函数的参数。参数应以列名开头,后跟下划线。

示例

>>> import dask
>>> df = dask.datasets.timeseries()
>>> df.head()  
          timestamp    id     name         x         y
2000-01-01 00:00:00   967    Jerry -0.031348 -0.040633
2000-01-01 00:00:01  1066  Michael -0.262136  0.307107
2000-01-01 00:00:02   988    Wendy -0.526331  0.128641
2000-01-01 00:00:03  1016   Yvonne  0.620456  0.767270
2000-01-01 00:00:04   998   Ursula  0.684902 -0.463278
>>> df = dask.datasets.timeseries(
...     '2000', '2010',
...     freq='2h', partition_freq='1D', seed=1,  # data frequency
...     dtypes={'value': float, 'name': str, 'id': int},  # data types
...     id_lam=1000  # control number of items in id column
... )

具有定义规范的数据集

以下助手仍处于实验阶段:

dask.dataframe.io.demo.with_spec(spec: dask.dataframe.io.demo.DatasetSpec, seed: int | None = None)[源代码]

根据提供的规范生成随机数据集

参数
规范数据集规范

指定数据集的所有参数

seed: int (可选)

随机状态种子

注释

此API仍处于实验阶段,未来可能会发生变化。

示例

>>> from dask.dataframe.io.demo import ColumnSpec, DatasetSpec, with_spec
>>> ddf = with_spec(
...     DatasetSpec(
...         npartitions=10,
...         nrecords=10_000,
...         column_specs=[
...             ColumnSpec(dtype=int, number=2, prefix="p"),
...             ColumnSpec(dtype=int, number=2, prefix="n", method="normal"),
...             ColumnSpec(dtype=float, number=2, prefix="f"),
...             ColumnSpec(dtype=str, prefix="s", number=2, random=True, length=10),
...             ColumnSpec(dtype="category", prefix="c", choices=["Y", "N"]),
...         ],
...     ), seed=42)
>>> ddf.head(10)  
     p1    p2    n1    n2        f1        f2          s1          s2 c1
0  1002   972  -811    20  0.640846 -0.176875  L#h98#}J`?  _8C607/:6e  N
1   985   982 -1663  -777  0.790257  0.792796  u:XI3,omoZ  w~@ /d)'-@  N
2   947   970   799  -269  0.740869 -0.118413  O$dnwCuq\  !WtSe+(;#9  Y
3  1003   983  1133   521 -0.987459  0.278154  j+Qr_2{XG&  &XV7cy$y1T  Y
4  1017  1049   826     5 -0.875667 -0.744359  bJ3E-{:o  {+jC).?vK+  Y
5   984  1017  -492  -399  0.748181  0.293761  ~zUNHNgD"!  yuEkXeVot|  Y
6   992  1027  -856    67 -0.125132 -0.234529  j.7z;o]Gc9  g|Fi5*}Y92  Y
7  1011   974   762 -1223  0.471696  0.937935  yT?j~N/-u]  JhEB[W-}^$  N
8   984   974   856    74  0.109963  0.367864  _j"&@ i&;/  OYXQ)w{hoH  N
9  1030  1001  -792  -262  0.435587 -0.647970  Pmrwl{{|.K  3UTqM$86Sg  N

ColumnSpec

class dask.dataframe.io.demo.ColumnSpec(prefix: str | None = None, dtype: str | type | None = None, number: int = 1, nunique: int | None = None, choices: list = <factory>, low: int | None = None, high: int | None = None, length: int | None = None, random: bool = False, method: str | None = None, args: tuple[typing.Any, ...] = <factory>, kwargs: dict[str, typing.Any] = <factory>)[源代码]

基类:object

封装具有相同数据类型的列族的属性。可以为整数数据类型指定不同的方法(如“泊松”、“均匀”、“二项式”等)。

注释

此API仍处于实验阶段,未来可能会发生变化。

args: tuple[Any, ...]

传递给方法的参数

choices: list

对于“类别”或字符串列,列出可能的值

dtype: str | type | None = None

列数据类型。仅支持 numpy 数据类型。

high: int | None = None

对于一个整数列,范围的高端

kwargs: dict[str, Any]

传递给方法的任何其他 kwargs

length: int | None = None

对于一个带有 random=True 的 str 或 “category” 列,生成多大的字符串

low: int | None = None

int 列的起始值。如果 random=True,则为可选,因为 randint 不接受 high 和 low。

method: str | None = None

对于整数列,生成值时使用的方法,例如“poisson”、“uniform”、“binomial”。默认为“poisson”。委托给 RandomState 的相同方法。

number: int = 1

使用这些属性创建多少列。默认值为1。如果指定了多个列,它们将被编号:“int1”、“int2”等。

nunique: int | None = None

对于“类别”列,生成多少个唯一的类别

prefix: str | None = None

列前缀。如果未指定,将默认为 str(dtype)

random: bool = False

对于一个整数列,是否使用 randint。对于一个字符串列,生成指定 长度 的随机字符串

RangeIndexSpec

class dask.dataframe.io.demo.RangeIndexSpec(dtype: str | type = <class 'int'>, step: int = 1)[源代码]

基类:object

数据框 RangeIndex 的属性

注释

此API仍处于实验阶段,未来可能会发生变化。

dtype

索引数据类型

:py:class:`int`的别名

step: int = 1

RangeIndex 的步骤

DatetimeIndexSpec

class dask.dataframe.io.demo.DatetimeIndexSpec(dtype: str | type = <class 'int'>, start: str | None = None, freq: str = '1H', partition_freq: str | None = None)[源代码]

基类:object

数据框 DatetimeIndex 的属性

注释

此API仍处于实验阶段,未来可能会发生变化。

dtype

索引数据类型

:py:class:`int`的别名

freq: str = '1H'

索引的频率(“1H”,“1D”等)

partition_freq: str | None = None

分区频率(“1D”,“1M”等)

start: str | None = None

索引的第一个值

DatasetSpec

class dask.dataframe.io.demo.DatasetSpec(npartitions: int = 1, nrecords: int = 1000, index_spec: dask.dataframe.io.demo.RangeIndexSpec | dask.dataframe.io.demo.DatetimeIndexSpec = <factory>, column_specs: list[dask.dataframe.io.demo.ColumnSpec] = <factory>)[源代码]

基类:object

定义一个包含随机数据的dataset,例如生成哪些列和数据类型

注释

此API仍处于实验阶段,未来可能会发生变化。

column_specs: list[dask.dataframe.io.demo.ColumnSpec]

列定义列表

index_spec: dask.dataframe.io.demo.RangeIndexSpec | dask.dataframe.io.demo.DatetimeIndexSpec

索引的属性

npartitions: int = 1

数据框中生成了多少分区。如果数据框具有 DatetimeIndex,请指定其 partition_freq

nrecords: int = 1000

要生成的记录总数

实用工具

Dask 有一些公共实用方法。这些方法主要用于解析配置值。

dask.utils.apply(func, args, kwargs=None)[源代码]

应用一个函数,给定其位置参数和关键字参数。

等同于 func(*args, **kwargs) 大多数 Dask 用户不需要使用 apply 函数。它通常仅由需要将关键字参数值注入低级 Dask 任务图的人使用。

参数
函数可调用

你想要应用的函数。

参数元组

包含 func 所需的所有位置参数的元组(例如:(arg_1, arg_2, arg_3)

kwargsdict, 可选

一个映射关键字参数的字典(例如:{"kwarg_1": value, "kwarg_2": value}

示例

>>> from dask.utils import apply
>>> def add(number, second_number=5):
...     return number + second_number
...
>>> apply(add, (10,), {"second_number": 2})  # equivalent to add(*args, **kwargs)
12
>>> task = apply(add, (10,), {"second_number": 2})
>>> dsk = {'task-name': task}  # adds the task to a low level Dask task graph
dask.utils.format_bytes(n: int) str[源代码]

将字节格式化为文本

>>> from dask.utils import format_bytes
>>> format_bytes(1)
'1 B'
>>> format_bytes(1234)
'1.21 kiB'
>>> format_bytes(12345678)
'11.77 MiB'
>>> format_bytes(1234567890)
'1.15 GiB'
>>> format_bytes(1234567890000)
'1.12 TiB'
>>> format_bytes(1234567890000000)
'1.10 PiB'

对于所有小于 2**60 的值,输出总是不超过 10 个字符。

dask.utils.format_time(n: float) str[源代码]

将整数格式化为时间

>>> from dask.utils import format_time
>>> format_time(1)
'1.00 s'
>>> format_time(0.001234)
'1.23 ms'
>>> format_time(0.00012345)
'123.45 us'
>>> format_time(123.456)
'123.46 s'
>>> format_time(1234.567)
'20m 34s'
>>> format_time(12345.67)
'3hr 25m'
>>> format_time(123456.78)
'34hr 17m'
>>> format_time(1234567.89)
'14d 6hr'
dask.utils.parse_bytes(s: float | str) int[源代码]

解析字节字符串为数字

>>> from dask.utils import parse_bytes
>>> parse_bytes('100')
100
>>> parse_bytes('100 MB')
100000000
>>> parse_bytes('100M')
100000000
>>> parse_bytes('5kB')
5000
>>> parse_bytes('5.4 kB')
5400
>>> parse_bytes('1kiB')
1024
>>> parse_bytes('1e6')
1000000
>>> parse_bytes('1e6 kB')
1000000000
>>> parse_bytes('MB')
1000000
>>> parse_bytes(123)
123
>>> parse_bytes('5 foos')
Traceback (most recent call last):
    ...
ValueError: Could not interpret 'foos' as a byte unit
dask.utils.parse_timedelta(s: None, default: Union[str, Literal[False]] = 'seconds') None[源代码]
dask.utils.parse_timedelta(s: str | float | datetime.timedelta, default: Union[str, Literal[False]] = 'seconds') float

解析时间差字符串为秒数

参数
sstr, float, timedelta, 或 None
默认值: str 或 False, 可选

如果没有指定单位,则默认单位为秒。设置为False以要求s明确指定其自己的单位。

示例

>>> from datetime import timedelta
>>> from dask.utils import parse_timedelta
>>> parse_timedelta('3s')
3
>>> parse_timedelta('3.5 seconds')
3.5
>>> parse_timedelta('300ms')
0.3
>>> parse_timedelta(timedelta(seconds=3))  # also supports timedeltas
3