API 参考
内容
API 参考¶
Dask API 通常遵循上游 API:
数组 遵循 NumPy
数据框 遵循 Pandas
Bag 遵循了 Spark 和 Python 迭代器中常见的 map/filter/groupby/reduce 模式
延迟 封装了通用的 Python 代码
Futures 遵循标准库中的 concurrent.futures 进行实时计算。
此外,Dask 有自己的函数来启动计算、将数据持久化在内存中、检查进度等,这些函数补充了上述 API。下面描述了这些更通用的 Dask 函数:
|
一次计算多个 dask 集合。 |
如果 |
|
|
同时优化多个 dask 集合。 |
|
将多个 Dask 集合持久化到内存中 |
|
同时可视化多个dask图。 |
These functions work with any scheduler. More advanced operations are
available when using the newer scheduler and starting a
dask.distributed.Client
(which, despite its name, runs nicely on a
single machine). This API provides the ability to submit, cancel, and track
work asynchronously, and includes many functions for complex inter-task
workflows. These are not necessary for normal operation, but can be useful for
real-time or advanced operation.
这个更高级的API在 Dask分布式文档 中可用。
- dask.annotate(**annotations: Any) collections.abc.Iterator[None] [源代码]¶
用于设置 HighLevelGraph 层注释的上下文管理器。
注解是与任务相关联的元数据或软约束,dask调度器可能会选择尊重这些约束:它们传达意图而不强制执行硬约束。因此,它们主要设计用于分布式调度器。
几乎任何对象都可以作为注释,但建议使用小的Python对象,而像NumPy数组这样的大型对象则不鼓励使用。
作为注解提供的可调用对象应接受一个 key 参数并生成适当的注解。注解集合中的各个任务键将提供给该可调用对象。
- 参数
- **注解键值对
示例
数组 A 中的所有任务应具有优先级 100,并且在失败时应重试 3 次。
>>> import dask >>> import dask.array as da >>> with dask.annotate(priority=100, retries=3): ... A = da.ones((10000, 10000))
在扁平化的块ID内优先处理数组A中的任务。
>>> nblocks = (10, 10) >>> with dask.annotate(priority=lambda k: k[1]*nblocks[1] + k[2]): ... A = da.ones((1000, 1000), chunks=(100, 100))
注解可以嵌套。
>>> with dask.annotate(priority=1): ... with dask.annotate(retries=3): ... A = da.ones((1000, 1000)) ... B = A + 1
- dask.compute(*args, traverse=True, optimize_graph=True, scheduler=None, get=None, **kwargs)[源代码]¶
一次计算多个 dask 集合。
- 参数
- 参数对象
任意数量的对象。如果是 dask 对象,则会计算并返回结果。默认情况下,也会遍历 Python 内置集合以查找 dask 对象(更多信息请参见
traverse
关键字)。非 dask 参数保持不变。- 遍历bool, 可选
默认情况下,dask 会遍历内置的 Python 集合,查找传递给
compute
的 dask 对象。对于大型集合,这可能会很耗费资源。如果没有任何参数包含 dask 对象,请设置traverse=False
以避免进行此遍历。- 调度器字符串,可选
使用哪种调度器,如“线程”、“同步”或“进程”。如果没有提供,默认首先检查全局设置,然后回退到集合的默认设置。
- 优化图bool, 可选
如果为 True [默认],则在计算之前对每个集合应用优化。否则,图表将按原样运行。这对于调试很有用。
- 获取 :
None
无 应保留为
None
get= 关键字已被移除。- kwargs
传递给调度器函数的额外关键字参数。
示例
>>> import dask >>> import dask.array as da >>> a = da.arange(10, chunks=2).sum() >>> b = da.arange(10, chunks=2).mean() >>> dask.compute(a, b) (np.int64(45), np.float64(4.5))
默认情况下,python集合中的dask对象也会被计算:
>>> dask.compute({'a': a, 'b': b, 'c': 1}) ({'a': np.int64(45), 'b': np.float64(4.5), 'c': 1},)
- dask.is_dask_collection(x) bool [源代码]¶
如果
x
是一个 dask 集合,则返回True
。- 参数
- x任何
要测试的对象。
- 返回
- 结果布尔
如果 x 是一个 Dask 集合,则为
True
。
注释
DaskCollection 的 typing.Protocol 实现将 Dask 集合定义为一个类,该类从
__dask_graph__
方法返回一个 Mapping。这个辅助函数在协议实现之前就已经存在。
- dask.optimize(*args, traverse=True, **kwargs)[源代码]¶
同时优化多个 dask 集合。
返回所有共享相同合并和优化基础图的等效dask集合。如果在将多个集合转换为延迟对象时,或者在战略点手动应用优化时,这可能很有用。
请注意,在大多数情况下,您不需要直接调用此方法。
- 参数
- *args对象
任意数量的对象。如果是一个dask对象,其图表会在返回一个等效的dask集合之前,与所有其他dask对象的图表进行优化和合并。非dask参数保持不变。
- 遍历bool, 可选
默认情况下,dask 会遍历内置的 Python 集合,查找传递给
optimize
的 dask 对象。对于大型集合,这可能会很耗费资源。如果没有任何参数包含 dask 对象,请设置traverse=False
以避免进行此遍历。- 优化可调用对象列表,可选
要执行的额外优化过程。
- **kwargs
传递给优化过程的额外关键字参数。
示例
>>> import dask >>> import dask.array as da >>> a = da.arange(10, chunks=2).sum() >>> b = da.arange(10, chunks=2).mean() >>> a2, b2 = dask.optimize(a, b)
>>> a2.compute() == a.compute() np.True_ >>> b2.compute() == b.compute() np.True_
- dask.persist(*args, traverse=True, optimize_graph=True, scheduler=None, **kwargs)[源代码]¶
将多个 Dask 集合持久化到内存中
这将惰性 Dask 集合转换为具有相同元数据的 Dask 集合,但现在其结果已完全计算或正在后台主动计算。
例如,从一个包含许多惰性调用的惰性 dask.array 构建的数组现在将是一个具有相同形状、数据类型、块等的 dask.array,但现在所有这些先前的惰性任务要么在内存中计算为许多小的 :class:`numpy.array`(在单机情况下),要么在集群的后台异步运行(在分布式情况下)。
如果存在并连接到分布式调度器的
dask.distributed.Client
,此函数将会有不同的操作。在这种情况下,此函数将在任务图提交到集群后立即返回,但在计算完成之前。计算将在后台异步继续。当使用此函数与单机调度器时,它会阻塞直到计算完成。在使用单机上的 Dask 时,应确保数据集完全适合内存。
- 参数
- *args: Dask 集合
- 调度器字符串,可选
使用哪种调度器,如“线程”、“同步”或“进程”。如果没有提供,默认首先检查全局设置,然后回退到集合的默认设置。
- 遍历bool, 可选
默认情况下,dask 会遍历内置的 Python 集合,查找传递给
persist
的 dask 对象。对于大型集合,这可能会很耗费资源。如果没有任何参数包含 dask 对象,请设置traverse=False
以避免进行此遍历。- 优化图bool, 可选
如果为 True [默认],图表在计算前会被优化。否则,图表将按原样运行。这对于调试很有用。
- **kwargs
传递给调度器函数的额外关键字参数。
- 返回
- 新的 dask 集合由内存数据支持
示例
>>> df = dd.read_csv('/path/to/*.csv') >>> df = df[df.name == 'Alice'] >>> df['in-debt'] = df.balance < 0 >>> df = df.persist() # triggers computation
>>> df.value().min() # future computations are now fast -10 >>> df.value().max() 100
>>> from dask import persist # use persist function on multiple collections >>> a, b = persist(a, b)
- dask.visualize(*args, filename='mydask', traverse=True, optimize_graph=False, maxval=None, engine: Optional[Literal['cytoscape', 'ipycytoscape', 'graphviz']] = None, **kwargs)[源代码]¶
同时可视化多个dask图。
需要安装
graphviz
。所有不是 dask 图的选项都应该作为关键字参数传递。- 参数
- 参数对象
任意数量的对象。如果它是一个 dask 集合(例如,dask DataFrame、Array、Bag 或 Delayed),其关联的图将被包含在 visualize 的输出中。默认情况下,python 内置集合也会被遍历以查找 dask 对象(更多信息请参见
traverse
关键字)。缺少关联图的参数将被忽略。- 文件名str 或 None, 可选
要写入磁盘的文件名。如果提供的 filename 不包含扩展名,默认将使用 ‘.png’。如果 filename 为 None,则不会写入文件,我们将仅通过管道与 dot 通信。
- 格式{‘png’, ‘pdf’, ‘dot’, ‘svg’, ‘jpeg’, ‘jpg’}, 可选
输出文件的格式。默认是 ‘png’。
- 遍历bool, 可选
默认情况下,dask 会遍历内置的 Python 集合,查找传递给
visualize
的 dask 对象。对于大型集合,这可能会很耗费资源。如果没有任何参数包含 dask 对象,请设置traverse=False
以避免进行此遍历。- 优化图bool, 可选
如果为 True,图表在渲染前会被优化。否则,图表将按原样显示。默认值为 False。
- 颜色{None, ‘顺序’, ‘年龄’, ‘释放’, ‘内存增加’, ‘内存减少’, ‘内存压力’}, 可选
颜色节点的选项。colormap:
None,默认,无颜色。
‘order’, 根据节点在图中的出现顺序为其边框着色。
‘ages’, 节点数据被保留的时间长度。
‘freed’, 运行一个节点后释放的依赖项数量。
‘memoryincreases’, 节点生命周期结束后保留的额外输出数量。较大的值可能表明节点应该在之后运行。
‘memorydecreases’, 在节点生命周期后保留的输出减少的数量。较大的值可能表明节点应该更早运行。
‘memorypressure’, 节点运行时持有的数据数量(圆形),或数据被释放时(矩形)。
- maxval{int, float}, 可选
用于颜色映射归一化的最大值,范围为0到1.0。默认值为
None
,将使其为值的最大数量。- collapse_outputsbool, 可选
是否折叠输出框,这些框通常有空的标签。默认是 False。
- 详细bool, 可选
即使数据未分块,是否也要标记输出和输入框。注意:这些标签可能会变得非常长。默认值为 False。
- 引擎{“graphviz”, “ipycytoscape”, “cytoscape”},可选。
要使用的可视化引擎。如果没有提供,这将检查 dask 配置值 “visualization.engine”。如果未设置,它会尝试导入
graphviz
和ipycytoscape
,使用第一个成功导入的。- **kwargs
传递给可视化引擎的其他关键字参数。
- 返回
- 结果IPython.display.Image, IPython.display.SVG, 或 None
更多信息请参见 dask.dot.dot_graph。
参见
dask.dot.dot_graph
注释
有关优化的更多信息,请参见此处:
http://www.aidoczh.com/dask/zh_CN/latest/optimize.html
示例
>>> x.visualize(filename='dask.pdf') >>> x.visualize(filename='dask.pdf', color='order')
数据集¶
Dask 提供了一些用于生成演示数据集的辅助工具
- dask.datasets.make_people(npartitions=10, records_per_partition=1000, seed=None, locale='en')[源代码]¶
创建一个随机人物的数据集
这将创建一个包含随机生成的人的词典记录的 Dask Bag。这需要可选库
mimesis
来生成记录。- 参数
- npartitions整数
分区数量
- 每个分区的记录数整数
每个分区中的记录数
- 种子int, (可选)
随机种子
- localestr
语言区域设置,如 ‘en’、’fr’、’zh’ 或 ‘ru’
- 返回
- b: Dask 包
- dask.datasets.timeseries(start='2000-01-01', end='2000-01-31', freq='1s', partition_freq='1D', dtypes=None, seed=None, **kwargs)[源代码]¶
使用随机数据创建时间序列数据框
- 参数
- 开始datetime (或类似 datetime 的字符串)
时间序列的开始
- 结束datetime (或类似 datetime 的字符串)
时间序列的结束
- dtypesdict (可选)
列名到类型的映射。有效类型包括 {float, int, str, ‘category’}。
- 频率字符串
时间序列频率的字符串,如 ‘2s’ 或 ‘1H’ 或 ‘12W’
- partition_freq字符串
字符串如 ‘1M’ 或 ‘2Y’ 用于将数据框划分为分区
- 种子int (可选)
随机状态种子
- **kwargs:
传递给单个列创建函数的参数。参数应以列名开头,后跟下划线。
示例
>>> import dask >>> df = dask.datasets.timeseries() >>> df.head() timestamp id name x y 2000-01-01 00:00:00 967 Jerry -0.031348 -0.040633 2000-01-01 00:00:01 1066 Michael -0.262136 0.307107 2000-01-01 00:00:02 988 Wendy -0.526331 0.128641 2000-01-01 00:00:03 1016 Yvonne 0.620456 0.767270 2000-01-01 00:00:04 998 Ursula 0.684902 -0.463278 >>> df = dask.datasets.timeseries( ... '2000', '2010', ... freq='2h', partition_freq='1D', seed=1, # data frequency ... dtypes={'value': float, 'name': str, 'id': int}, # data types ... id_lam=1000 # control number of items in id column ... )
具有定义规范的数据集¶
以下助手仍处于实验阶段:
- dask.dataframe.io.demo.with_spec(spec: dask.dataframe.io.demo.DatasetSpec, seed: int | None = None)[源代码]¶
根据提供的规范生成随机数据集
- 参数
- 规范数据集规范
指定数据集的所有参数
- seed: int (可选)
随机状态种子
注释
此API仍处于实验阶段,未来可能会发生变化。
示例
>>> from dask.dataframe.io.demo import ColumnSpec, DatasetSpec, with_spec >>> ddf = with_spec( ... DatasetSpec( ... npartitions=10, ... nrecords=10_000, ... column_specs=[ ... ColumnSpec(dtype=int, number=2, prefix="p"), ... ColumnSpec(dtype=int, number=2, prefix="n", method="normal"), ... ColumnSpec(dtype=float, number=2, prefix="f"), ... ColumnSpec(dtype=str, prefix="s", number=2, random=True, length=10), ... ColumnSpec(dtype="category", prefix="c", choices=["Y", "N"]), ... ], ... ), seed=42) >>> ddf.head(10) p1 p2 n1 n2 f1 f2 s1 s2 c1 0 1002 972 -811 20 0.640846 -0.176875 L#h98#}J`? _8C607/:6e N 1 985 982 -1663 -777 0.790257 0.792796 u:XI3,omoZ w~@ /d)'-@ N 2 947 970 799 -269 0.740869 -0.118413 O$dnwCuq\ !WtSe+(;#9 Y 3 1003 983 1133 521 -0.987459 0.278154 j+Qr_2{XG& &XV7cy$y1T Y 4 1017 1049 826 5 -0.875667 -0.744359 bJ3E-{:o {+jC).?vK+ Y 5 984 1017 -492 -399 0.748181 0.293761 ~zUNHNgD"! yuEkXeVot| Y 6 992 1027 -856 67 -0.125132 -0.234529 j.7z;o]Gc9 g|Fi5*}Y92 Y 7 1011 974 762 -1223 0.471696 0.937935 yT?j~N/-u] JhEB[W-}^$ N 8 984 974 856 74 0.109963 0.367864 _j"&@ i&;/ OYXQ)w{hoH N 9 1030 1001 -792 -262 0.435587 -0.647970 Pmrwl{{|.K 3UTqM$86Sg N
ColumnSpec
类¶
- class dask.dataframe.io.demo.ColumnSpec(prefix: str | None = None, dtype: str | type | None = None, number: int = 1, nunique: int | None = None, choices: list = <factory>, low: int | None = None, high: int | None = None, length: int | None = None, random: bool = False, method: str | None = None, args: tuple[typing.Any, ...] = <factory>, kwargs: dict[str, typing.Any] = <factory>)[源代码]¶
基类:
object
封装具有相同数据类型的列族的属性。可以为整数数据类型指定不同的方法(如“泊松”、“均匀”、“二项式”等)。
注释
此API仍处于实验阶段,未来可能会发生变化。
RangeIndexSpec
类¶
DatetimeIndexSpec
类¶
DatasetSpec
类¶
- class dask.dataframe.io.demo.DatasetSpec(npartitions: int = 1, nrecords: int = 1000, index_spec: dask.dataframe.io.demo.RangeIndexSpec | dask.dataframe.io.demo.DatetimeIndexSpec = <factory>, column_specs: list[dask.dataframe.io.demo.ColumnSpec] = <factory>)[源代码]¶
基类:
object
定义一个包含随机数据的dataset,例如生成哪些列和数据类型
注释
此API仍处于实验阶段,未来可能会发生变化。
- column_specs: list[dask.dataframe.io.demo.ColumnSpec]¶
列定义列表
- index_spec: dask.dataframe.io.demo.RangeIndexSpec | dask.dataframe.io.demo.DatetimeIndexSpec¶
索引的属性
实用工具¶
Dask 有一些公共实用方法。这些方法主要用于解析配置值。
- dask.utils.apply(func, args, kwargs=None)[源代码]¶
应用一个函数,给定其位置参数和关键字参数。
等同于
func(*args, **kwargs)
大多数 Dask 用户不需要使用apply
函数。它通常仅由需要将关键字参数值注入低级 Dask 任务图的人使用。- 参数
- 函数可调用
你想要应用的函数。
- 参数元组
包含
func
所需的所有位置参数的元组(例如:(arg_1, arg_2, arg_3)
)- kwargsdict, 可选
一个映射关键字参数的字典(例如:
{"kwarg_1": value, "kwarg_2": value}
)
示例
>>> from dask.utils import apply >>> def add(number, second_number=5): ... return number + second_number ... >>> apply(add, (10,), {"second_number": 2}) # equivalent to add(*args, **kwargs) 12
>>> task = apply(add, (10,), {"second_number": 2}) >>> dsk = {'task-name': task} # adds the task to a low level Dask task graph
- dask.utils.format_bytes(n: int) str [源代码]¶
将字节格式化为文本
>>> from dask.utils import format_bytes >>> format_bytes(1) '1 B' >>> format_bytes(1234) '1.21 kiB' >>> format_bytes(12345678) '11.77 MiB' >>> format_bytes(1234567890) '1.15 GiB' >>> format_bytes(1234567890000) '1.12 TiB' >>> format_bytes(1234567890000000) '1.10 PiB'
对于所有小于 2**60 的值,输出总是不超过 10 个字符。
- dask.utils.format_time(n: float) str [源代码]¶
将整数格式化为时间
>>> from dask.utils import format_time >>> format_time(1) '1.00 s' >>> format_time(0.001234) '1.23 ms' >>> format_time(0.00012345) '123.45 us' >>> format_time(123.456) '123.46 s' >>> format_time(1234.567) '20m 34s' >>> format_time(12345.67) '3hr 25m' >>> format_time(123456.78) '34hr 17m' >>> format_time(1234567.89) '14d 6hr'
- dask.utils.parse_bytes(s: float | str) int [源代码]¶
解析字节字符串为数字
>>> from dask.utils import parse_bytes >>> parse_bytes('100') 100 >>> parse_bytes('100 MB') 100000000 >>> parse_bytes('100M') 100000000 >>> parse_bytes('5kB') 5000 >>> parse_bytes('5.4 kB') 5400 >>> parse_bytes('1kiB') 1024 >>> parse_bytes('1e6') 1000000 >>> parse_bytes('1e6 kB') 1000000000 >>> parse_bytes('MB') 1000000 >>> parse_bytes(123) 123 >>> parse_bytes('5 foos') Traceback (most recent call last): ... ValueError: Could not interpret 'foos' as a byte unit
- dask.utils.parse_timedelta(s: None, default: Union[str, Literal[False]] = 'seconds') None [源代码]¶
- dask.utils.parse_timedelta(s: str | float | datetime.timedelta, default: Union[str, Literal[False]] = 'seconds') float
解析时间差字符串为秒数
- 参数
- sstr, float, timedelta, 或 None
- 默认值: str 或 False, 可选
如果没有指定单位,则默认单位为秒。设置为False以要求s明确指定其自己的单位。
示例
>>> from datetime import timedelta >>> from dask.utils import parse_timedelta >>> parse_timedelta('3s') 3 >>> parse_timedelta('3.5 seconds') 3.5 >>> parse_timedelta('300ms') 0.3 >>> parse_timedelta(timedelta(seconds=3)) # also supports timedeltas 3