API
内容
API¶
客户端
客户端连接到一个 Dask 集群(例如 distributed.LocalCluster
)并提交计算任务。
|
连接到 Dask 集群并提交计算 |
线程本地、任务本地的上下文管理器,使得 Client.current 类方法返回 self。 |
|
在工作人员上运行基准测试,以测试内存、磁盘和网络带宽 |
|
|
所有相关键的主动运行调用栈 |
|
取消正在运行的未来任务 如果尚未运行,这将阻止未来任务被调度,如果已经运行,则删除它们。 |
|
关闭此客户端 |
|
在优化之后,将多个集合转换为一个 dask 图。 |
|
在集群上计算 dask 集合 |
|
在 as_client 的上下文环境中运行时,返回上下文本地的当前客户端。 |
|
提取整个集群状态的转储并持久化到磁盘或URL。 |
|
开始将给定的记录器(默认是根记录器)及其下的所有记录器从工作任务转发到客户端进程。 |
|
futures_of 的包装方法 |
|
从分布式内存中收集未来 |
|
计算 dask 图 |
|
如果存在,从调度器获取命名数据集。 |
|
获取结构化主题日志 |
|
返回一个 concurrent.futures 执行器,用于在此客户端上提交任务 |
|
从调度器获取任意元数据 |
从调度器获取日志 |
|
|
从调度器获取任务流数据 |
|
返回调度器、所有工作节点以及我自己的版本信息 |
|
从工作节点获取日志 |
|
哪些键由哪些工作者持有 |
|
列出调度器上可用的命名数据集 |
|
在给定主题下记录事件 |
|
将函数映射到参数序列上 |
|
集群中每个键占用的字节数 |
|
每个工作节点上可用的线程/核心数 |
|
如果存在,用已存在的期货替换集合的任务 |
|
每个工作节点上可用的线程/核心数 |
|
在集群上持久化dask集合 |
|
每个工作节点上当前运行的任务 |
|
收集有关近期工作的统计分析信息 |
|
将命名数据集发布到调度器 |
|
在网络内重新平衡数据 |
|
注册一个插件。 |
|
注册一个调度器插件。 |
|
为所有当前和未来的工作线程注册一个设置回调函数。 |
|
为所有当前和未来的工作器注册一个生命周期工作器插件。 |
|
在网络中设置期货的复制 |
|
重启所有工作进程。 |
|
重启指定的一组工作进程 |
|
在调度器上退役某些工作者 |
|
重试失败的 futures |
|
在任务调度系统之外对所有工作节点运行一个函数 |
|
在调度器进程上运行一个函数 |
|
将数据分散到分布式内存中 |
|
集群中工人的基本信息 |
|
在调度器中设置任意元数据 |
关闭连接的调度器和工作节点 |
|
|
在单独的线程中启动调度器运行 |
|
返回给定键或刺激ID的集群范围的故事 |
|
向调度器提交一个函数应用 |
|
订阅一个主题并在每次接收到事件时执行一个处理程序 |
|
停止将给定的记录器(默认根)从工作任务转发到客户端进程。 |
|
从调度器中移除命名数据集 |
取消注册调度器插件 |
|
|
取消注册一个生命周期工作插件 |
|
取消订阅一个主题并移除事件处理程序 |
|
将本地包上传到调度器和工作节点 |
|
阻塞调用以等待 n 个工作线程完成后再继续 |
|
存储每个未来数据的工人 |
|
将调度器信息写入一个json文件。 |
|
获取此线程的客户端 |
获取当前正在运行此任务的工作者 |
|
|
在任务中获取客户端。 |
|
让此任务从工作线程池中分离 |
|
让这个线程重新加入 ThreadPoolExecutor |
|
一个内置 |
|
一个内置 |
重新安排此任务 |
对于任何计算,无论成功与否,都在本地执行任务以进行调试。 |
|
对于失败的计算,在本地执行被指责的任务以进行调试。 |
未来
|
远程运行的计算 |
当未来完成时调用回调 |
|
|
|
|
取消运行此未来的请求 |
如果未来已被取消,则返回 True |
|
返回计算是否完成。 |
|
|
返回失败任务的异常 |
注释 |
|
|
等待计算完成,将结果收集到本地进程。 |
|
如果此未来失败,请重试 |
|
返回失败任务的回溯信息 |
同步
|
分布式集中事件等同于 asyncio.Event |
|
分布式集中锁 |
|
分布式集中锁 |
|
这个 信号量 将跟踪调度器上的租约,这些租约可以被该类的实例获取和释放。 |
|
分布式队列 |
|
分布式全局变量 |
其他
|
按完成顺序返回期货 |
跟踪期货的进展 |
|
|
等待所有/任意未来完成 |
|
即使我们释放了未来,也要至少运行一次任务 |
|
集合中的未来对象 |
|
在上下文块中收集任务流 |
在上下文块中收集任务元数据 |
|
|
收集性能报告 |
实用工具
用于换行符分隔的日志条目字符串的容器 |
|
一个包含字典的容器,该字典将名称映射到日志条目的字符串 |
|
在调度器上生成一个 Memray 配置文件并下载生成的报告。 |
|
在workers上生成一个Memray配置文件并下载生成的报告。 |
异步方法¶
大多数方法和函数都可以在阻塞或异步环境中使用 Tornado 协程同样良好地工作。如果在 Tornado IOLoop 中使用,那么你应该适当地 yield 或 await 阻塞操作。
你必须告知客户端你打算在异步环境中使用它,通过传递 asynchronous=True
关键字
# blocking
client = Client()
future = client.submit(func, *args) # immediate, no blocking/async difference
result = client.gather(future) # blocking
# asynchronous Python 2/3
client = yield Client(asynchronous=True)
future = client.submit(func, *args) # immediate, no blocking/async difference
result = yield client.gather(future) # non-blocking/asynchronous
# asynchronous Python 3
client = await Client(asynchronous=True)
future = client.submit(func, *args) # immediate, no blocking/async difference
result = await client.gather(future) # non-blocking/asynchronous
异步变体必须在 Tornado 协程中运行。更多信息请参阅 异步 文档。
客户端¶
- class distributed.Client(address=None, loop=None, timeout=_NoDefault.no_default, set_as_default=True, scheduler_file=None, security=None, asynchronous=False, name=None, heartbeat_interval=None, serializers=None, deserializers=None, extensions={'pubsub': <class 'distributed.pubsub.PubSubClientExtension'>}, direct_to_workers=None, connection_limit=512, **kwargs)[源代码]¶
连接到 Dask 集群并提交计算
客户端将用户连接到 Dask 集群。它提供了一个围绕函数和未来的异步用户界面。此类类似于
concurrent.futures
中的执行器,但也允许在submit/map
调用中使用Future
对象。当实例化客户端时,默认情况下它会接管所有dask.compute
和dask.persist
调用。通常也可以在不指定调度器地址的情况下创建一个客户端,例如
Client()
。在这种情况下,客户端会在后台创建一个LocalCluster
并连接到它。任何额外的关键字参数在这种情况下都会从客户端传递给 LocalCluster。更多信息请参阅 LocalCluster 文档。- 参数
- 地址: 字符串, 或集群
这可以是
Scheduler
服务器的地址,例如字符串'127.0.0.1:8786'
,或者是集群对象,例如LocalCluster()
- 循环
事件循环
- timeout: int (默认为配置 ``distributed.comm.timeouts.connect``)
初始连接到调度器的超时时间
- set_as_default: bool (True)
将此客户端用作全局 dask 调度器
- scheduler_file: 字符串 (可选)
如果可用,指向包含调度器信息的文件的路径
- security: 安全或布尔值,可选
可选的安全信息。如果创建本地集群,也可以传入
True
,在这种情况下,将自动创建临时的自签名凭证。- 异步: bool (默认值为 False)
如果在异步/等待函数或Tornado gen.coroutines中使用此客户端,请设置为True。否则,在正常使用时应保持为False。
- 名称: 字符串 (可选)
为客户端指定一个名称,该名称将包含在调度程序生成的与该客户端相关的日志中
- heartbeat_interval: int (可选)
心跳到调度器之间的时间(以毫秒为单位)
- 序列化器
在序列化对象时要使用的迭代方法。更多信息请参见 序列化。
- 反序列化器
反序列化对象时要使用的迭代方法。更多信息请参见 序列化。
- 扩展列表
扩展
- direct_to_workers: bool (可选)
是否直接连接到工作节点,或者要求调度器作为中介。
- 连接限制整数
连接池中同时维护的开放通信数量
- **kwargs:
如果你没有传递调度器地址,Client 将创建一个
LocalCluster
对象,并传递任何额外的关键字参数。
示例
在初始化时提供集群的调度节点地址:
>>> client = Client('127.0.0.1:8786')
使用
submit
方法将单个计算任务发送到集群>>> a = client.submit(add, 1, 2) >>> b = client.submit(add, 10, 20)
继续使用提交或映射结果来构建更大的计算
>>> c = client.submit(add, a, b)
使用
gather
方法收集结果。>>> client.gather(c) 33
你也可以在没有参数的情况下调用 Client 来创建你自己的本地集群。
>>> client = Client() # makes your own local "cluster"
额外的关键词将直接传递给 LocalCluster
>>> client = Client(n_workers=2, threads_per_worker=4)
- as_current()[源代码]¶
线程本地、任务本地的上下文管理器,使得 Client.current 类方法返回 self。在此上下文管理器内反序列化的任何 Future 对象将自动附加到此 Client。
- benchmark_hardware() dict [源代码]¶
在工作人员上运行基准测试,以测试内存、磁盘和网络带宽
- 返回
- 结果: dict
一个映射字典,将名称“disk”、“memory”和“network”映射到将大小映射到带宽的字典。这些带宽是许多工作者在集群上运行计算的平均值。
- call_stack(futures=None, keys=None)[源代码]¶
所有相关键的主动运行调用栈
你可以通过在
futures=
关键字中提供期货或集合,或在keys=
关键字中提供显式键的列表来指定感兴趣的数据。如果两者都没有提供,则将返回所有调用栈。- 参数
- 未来列表(可选)
未来列表,默认为所有数据
- 键列表(可选)
关键名称列表,默认为所有数据
示例
>>> df = dd.read_parquet(...).persist() >>> client.call_stack(df) # call on collections
>>> client.call_stack() # Or call with no arguments for all activity
- cancel(futures, asynchronous=None, force=False, reason=None, msg=None)[源代码]¶
取消正在运行的未来任务 如果尚未运行,这将阻止未来任务被调度,如果已经运行,则删除它们。调用后,此结果及其所有依赖结果将不再可访问。
- 参数
- 未来List[Future]
期货列表
- 异步: bool
如果为真,客户端处于异步模式
- 强制布尔值 (False)
即使其他客户端希望,也要取消这个未来
- reason: str
取消期货的原因
- 消息str
将附加到已取消未来的消息
- close(timeout=_NoDefault.no_default)[源代码]¶
关闭此客户端
当你的Python会话结束时,客户端也会自动关闭。
如果你在没有参数的情况下启动了一个客户端,例如
Client()
,那么这也会关闭同时启动的本地集群。- 参数
- 超时数字
在引发
dask.distributed.TimeoutError
之前的秒数
- compute(collections, sync=False, optimize_graph=True, workers=None, allow_other_workers=False, resources=None, retries=0, priority=0, fifo_timeout='60s', actors=None, traverse=True, **kwargs)[源代码]¶
在集群上计算 dask 集合
- 参数
- 集合dask对象的可迭代对象或单个dask对象
像 dask.array 或 dataframe 或 dask.value 对象这样的集合
- 同步bool (可选)
如果为 False(默认)则返回 Futures,如果为 True 则返回具体值。
- 优化图布尔
是否优化底层图表
- 工人字符串或字符串的可迭代对象
一组可以执行计算的工作主机名。留空以默认使用所有工作节点(常见情况)
- 允许其他工作者bool (默认为 False)
与 workers 一起使用。指示计算是否可以在不属于 workers 集合的 worker 上执行。
- 重试int (默认为 0)
如果计算结果失败,允许的自动重试次数
- 优先级数字
任务的优先级可选。默认值为零。优先级越高,优先级越高。
- fifo_timeouttimedelta 字符串 (默认值为 ‘60s’)
允许的调用之间的时间量,以视为相同优先级
- 遍历bool (默认为 True)
默认情况下,dask 会遍历内置的 Python 集合,查找传递给
compute
的 dask 对象。对于大型集合,这可能会很耗费资源。如果没有任何参数包含 dask 对象,请设置traverse=False
以避免进行此遍历。- 资源dict (默认为 {})
定义了此映射任务的每个实例在工作节点上所需的 资源;例如
{'GPU': 2}
。有关定义资源的详细信息,请参阅 工作节点资源。- 演员布尔值或字典(默认 None)
这些任务是否应在工作节点上作为有状态的执行者存在。可以在全局(True/False)或每个任务(
{'x': True, 'y': False}
)的基础上指定。有关更多详细信息,请参阅 演员。- **kwargs
传递给图优化调用的选项
- 返回
- 如果输入是一个序列,则列出所有未来项;否则,列出单个未来项。
参见
Client.get
普通的同步 dask.get 函数
示例
>>> from dask import delayed >>> from operator import add >>> x = delayed(add)(1, 2) >>> y = delayed(add)(x, x) >>> xx, yy = client.compute([x, y]) >>> xx <Future: status: finished, key: add-8f6e709446674bad78ea8aeecfee188e> >>> xx.result() 3 >>> yy.result() 6
也支持单个参数
>>> xx = client.compute(x)
- classmethod current(allow_global=True)[源代码]¶
在 as_client 的上下文中运行时,返回上下文本地的当前客户端。否则,返回最新初始化的客户端。如果没有客户端实例存在,则引发 ValueError。如果 allow_global 设置为 False,则在 as_client 上下文管理器之外运行时引发 ValueError。
- 参数
- allow_global布尔
如果为真,返回默认客户端
- 返回
- 客户端
当前客户端
- Raises
- ValueError
如果没有设置客户端,则会引发 ValueError。
参见
default_client
- property dashboard_link¶
链接到调度器的仪表板。
- 返回
- str
仪表板 URL。
示例
在默认的网络浏览器中打开仪表板:
>>> import webbrowser >>> from distributed import Client >>> client = Client() >>> webbrowser.open(client.dashboard_link)
- dump_cluster_state(filename: str = 'dask-cluster-dump', write_from_scheduler: bool | None = None, exclude: collections.abc.Collection[str] = ('run_spec',), format: Literal['msgpack', 'yaml'] = 'msgpack', **storage_options)[源代码]¶
提取整个集群状态的转储并持久化到磁盘或URL。这仅用于调试目的。
警告:调度器(以及客户端,如果本地写入转储)的内存使用量可能会很大。在大型或长时间运行的集群上,这可能需要几分钟时间。在处理转储时,调度器可能会无响应。
结果将存储在一个字典中:
{ "scheduler": {...}, # scheduler state "workers": { worker_addr: {...}, # worker state ... } "versions": { "scheduler": {...}, "workers": { worker_addr: {...}, ... } } }
- 参数
- 文件名:
要写入的路径或URL。适当的文件后缀(
.msgpack.gz
或.yaml
)将自动附加。必须是
fsspec.open()
支持的路径(例如s3://my-bucket/cluster-dump
或cluster-dumps/dump
)。请参阅write_from_scheduler
以控制转储是从调度器直接写入filename
,还是通过网络发送回客户端,然后在本地写入。- write_from_scheduler:
如果为 None(默认),则根据
filename
是否看起来像 URL 或本地路径来推断:如果文件名包含://``(例如 ``s3://my-bucket/cluster-dump
),则为 True,否则为 False(例如local_dir/cluster-dump
)。如果为真,直接从调度器将集群状态写入
filename
。如果filename
是本地路径,转储将被写入调度器文件系统上的该路径,因此如果调度器运行在临时硬件上,请小心。当调度器连接到网络文件系统或持久磁盘时,或者用于写入存储桶时,这很有用。如果为 False,则通过网络将集群状态从调度器传输回客户端,然后将其写入
filename
。这对于大型转储来说效率较低,但在调度器无法访问任何持久存储时非常有用。- 排除:
一组属性名称,这些属性在转储时应被排除,例如排除代码、回溯、日志等。
默认排除
run_spec
,这是序列化的用户代码。这通常不需要用于调试。要允许序列化此内容,请传递一个空元组。- 格式:
可以是
"msgpack"
或"yaml"
。如果使用 msgpack(默认),输出将以 msgpack 格式存储在 gzip 压缩文件中。阅读:
import gzip, msgpack with gzip.open("filename") as fd: state = msgpack.unpack(fd)
或者:
import yaml try: from yaml import CLoader as Loader except ImportError: from yaml import Loader with open("filename") as fd: state = yaml.load(fd, Loader=Loader)
- **存储选项:
在写入URL时,传递给
fsspec.open()
的任何额外参数。
- forward_logging(logger_name=None, level=0)[源代码]¶
开始将给定的记录器(默认是根记录器)及其下的所有记录器从工作任务转发到客户端进程。每当命名记录器在工作端处理一个 LogRecord 时,该记录将被序列化,发送到客户端,并由客户端上同名的记录器处理。
请注意,工作端日志记录器只有在它们的级别设置得当的情况下才会处理 LogRecords,而客户端日志记录器只有在它自己的级别同样设置得当的情况下才会发出转发的 LogRecord。例如,如果你的提交任务将一条 DEBUG 消息记录到日志记录器“foo”,那么为了使
forward_logging()
在你的客户端会话中发出该消息,你必须确保在工作进程和客户端进程中,日志记录器“foo”的级别都设置为 DEBUG(或更低)。- 参数
- logger_namestr, 可选
开始转发的记录器名称。
logging
模块的分层命名系统的通常规则适用。例如,如果name
是"foo"
,那么不仅"foo"
,还有"foo.bar"
、"foo.baz"
等也将被转发。如果name
是None
,这表示根记录器,因此 所有 记录器都将被转发。请注意,只有当记录器的级别足够处理给定的 LogRecord 时,记录器才会转发该 LogRecord。
- 级别str | int, 可选
可选地限制转发到此级别或更高级别的 LogRecords,即使转发日志记录器的自身级别较低。
示例
为了举例说明,假设我们像用户一样配置客户端日志:使用一个附加到根记录器的 StreamHandler,输出级别为 INFO,并使用简单的输出格式:
import logging import distributed import io, yaml TYPICAL_LOGGING_CONFIG = ''' version: 1 handlers: console: class : logging.StreamHandler formatter: default level : INFO formatters: default: format: '%(asctime)s %(levelname)-8s [worker %(worker)s] %(name)-15s %(message)s' datefmt: '%Y-%m-%d %H:%M:%S' root: handlers: - console ''' config = yaml.safe_load(io.StringIO(TYPICAL_LOGGING_CONFIG)) logging.config.dictConfig(config)
现在创建一个客户端,并开始将工作线程的根日志转发回我们的本地客户端进程。
>>> client = distributed.Client() >>> client.forward_logging() # forward the root logger at any handled level
然后提交一个在工作者上进行一些错误日志记录的任务。我们看到了来自客户端 StreamHandler 的输出。
>>> def do_error(): ... logging.getLogger("user.module").error("Hello error") ... return 42 >>> client.submit(do_error).result() 2022-11-09 03:43:25 ERROR [worker tcp://127.0.0.1:34783] user.module Hello error 42
注意,dask 还会向转发的 LogRecord 添加一个
"worker"
属性,我们的自定义格式化程序会使用它。这对于准确识别哪个工作节点记录了错误非常有用。值得强调的一点细微差别:尽管我们的客户端根日志记录器配置了INFO级别,但工作端根日志记录器仍然保持其默认的ERROR级别,因为我们没有在工作端进行任何显式的日志记录配置。因此,工作端的INFO日志将*不会*被转发,因为它们从一开始就没有被处理。
>>> def do_info_1(): ... # no output on the client side ... logging.getLogger("user.module").info("Hello info the first time") ... return 84 >>> client.submit(do_info_1).result() 84
在处理并将信息消息转发到客户端之前,必须将客户端日志记录器的级别设置为INFO。换句话说,客户端转发日志记录的“有效”级别是每个日志记录器的客户端和工作者端级别的最大值。
>>> def do_info_2(): ... logger = logging.getLogger("user.module") ... logger.setLevel(logging.INFO) ... # now produces output on the client side ... logger.info("Hello info the second time") ... return 84 >>> client.submit(do_info_2).result() 2022-11-09 03:57:39 INFO [worker tcp://127.0.0.1:42815] user.module Hello info the second time 84
- gather(futures, errors='raise', direct=None, asynchronous=None)[源代码]¶
从分布式内存中收集未来
接受一个未来、未来的嵌套容器、迭代器或队列。返回类型将与输入类型匹配。
- 参数
- 未来未来集合
这可能是一个可能嵌套的 Future 对象集合。集合可以是列表、集合或字典。
- 错误字符串
如果一个 future 出错,我们应该 ‘raise’ 或者 ‘skip’,即抛出错误或跳过其在输出集合中的包含。
- 直接布尔
是否直接连接到工作节点,或者请求调度器作为中介。这也可以在创建客户端时设置。
- 异步: bool
如果为真,客户端处于异步模式
- 返回
- 结果:与输入类型相同的集合,但现在具有
- 收集的结果而不是未来
参见
Client.scatter
将数据发送到集群
示例
>>> from operator import add >>> c = Client('127.0.0.1:8787') >>> x = c.submit(add, 1, 2) >>> c.gather(x) 3 >>> c.gather([x, [x], x]) # support lists and dicts [3, [3], 3]
- get(dsk, keys, workers=None, allow_other_workers=None, resources=None, sync=True, asynchronous=None, direct=None, retries=None, priority=0, fifo_timeout='60s', actors=None, **kwargs)[源代码]¶
计算 dask 图
- 参数
- dskdict
- 键对象,或对象的嵌套列表
- 工人字符串或字符串的可迭代对象
一组工作地址或主机名,可以在其上执行计算。留空以默认使用所有工作节点(常见情况)
- 允许其他工作者bool (默认为 False)
与
workers
一起使用。指示计算是否可以在不属于 workers 集合的 worker 上执行。- 资源dict (默认为 {})
定义了此映射任务的每个实例在工作节点上所需的
资源
;例如{'GPU': 2}
。有关定义资源的详细信息,请参阅 工作节点资源。- 同步bool (可选)
如果为 False 则返回 Futures,如果为 True 则返回具体值(默认)。
- 异步: bool
如果为真,客户端处于异步模式
- 直接布尔
是否直接连接到工作节点,或者请求调度器作为中介。这也可以在创建客户端时设置。
- 重试int (默认为 0)
如果计算结果失败,允许的自动重试次数
- 优先级数字
任务的优先级可选。默认值为零。优先级越高,优先级越高。
- fifo_timeouttimedelta 字符串 (默认值为 ‘60s’)
允许的调用之间的时间量,以视为相同优先级
- 演员布尔值或字典(默认 None)
这些任务是否应在工作节点上作为有状态的执行者存在。可以在全局(True/False)或每个任务(
{'x': True, 'y': False}
)的基础上指定。有关更多详细信息,请参阅 演员。
- 返回
- 结果
如果 ‘sync’ 为 True,则返回结果。否则,返回已知数据。如果 ‘sync’ 为 False,则返回已知数据。否则,返回结果。
参见
Client.compute
计算异步集合
示例
>>> from operator import add >>> c = Client('127.0.0.1:8787') >>> c.get({'x': (add, 1, 2)}, 'x') 3
- get_dataset(name, default=_NoDefault.no_default, **kwargs)[源代码]¶
如果存在,从调度器获取命名数据集。如果不存在,返回默认值或引发 KeyError。
- 参数
- 名称str
要检索的数据集名称
- 默认str
可选,默认不设置 如果设置,当名称不存在时不会引发 KeyError,而是返回此默认值
- kwargsdict
传递给 _get_dataset 的额外关键字参数
- 返回
- 调度器的数据集,如果存在
- get_events(topic: str | None = None)[源代码]¶
获取结构化主题日志
- 参数
- 主题str, 可选
要检索事件的主题日志名称。如果没有提供
topic
,则将返回所有主题的日志。
- get_executor(**kwargs)[源代码]¶
返回一个 concurrent.futures 执行器,用于在此客户端上提交任务
- 参数
- **kwargs
任何与 submit() 或 map() 兼容的参数,例如 workers 或 resources。
- 返回
- ClientExecutor
一个完全兼容 concurrent.futures API 的 Executor 对象。
- get_metadata(keys, default=_NoDefault.no_default)[源代码]¶
从调度器获取任意元数据
参见 set_metadata 以获取包含示例的完整文档字符串
- 参数
- 键键或列表
访问键。如果是列表,则获取嵌套集合中的内容。
- 默认可选的
如果键不存在,则返回此值。如果未提供,则当键不存在时会引发 KeyError。
- get_scheduler_logs(n=None)[源代码]¶
从调度器获取日志
- 参数
- n整数
要检索的日志数量。默认最大值为10000,可通过
distributed.admin.log-length
配置值进行配置。
- 返回
- 按倒序记录日志(最新在前)
- get_task_stream(start=None, stop=None, count=None, plot=False, filename='task-stream.html', bokeh_resources=None)[源代码]¶
从调度器获取任务流数据
这收集了仪表板上诊断“任务流”图表中呈现的数据。它包括特定时间段内每个任务的开始、停止、传输和反序列化时间。
请注意,任务流诊断默认不运行。您可能希望在工作开始前调用此函数一次,以确保记录开始,然后在完成后再次调用。
- 参数
- 开始数字或字符串
当你想要开始记录时,如果是一个数字,它应该是调用 time() 的结果;如果是一个字符串,那么它应该是一个相对于现在的时间差,比如 ‘60s’ 或 ‘500 ms’。
- 停止数字或字符串
当你想要停止录制时
- 计数整数
所需记录的数量,如果同时指定了开始和结束,则忽略此项
- 绘图布尔值, 字符串
如果为真,则还返回一个 Bokeh 图形。如果 plot == ‘save’,则将图形保存到文件中。
- 文件名str (可选)
如果你设置
plot='save'
,保存的文件名- bokeh_resourcesbokeh.resources.Resources (可选)
指定资源组件是 INLINE 还是 CDN
- 返回
- L: List[Dict]
参见
get_task_stream
此方法的上下文管理器版本
示例
>>> client.get_task_stream() # prime plugin if not already connected >>> x.compute() # do some work >>> client.get_task_stream() [{'task': ..., 'type': ..., 'thread': ..., ...}]
传递
plot=True
或plot='save'
关键字以返回一个 Bokeh 图形>>> data, figure = client.get_task_stream(plot='save', filename='myfile.html')
或者考虑上下文管理器
>>> from dask.distributed import get_task_stream >>> with get_task_stream() as ts: ... x.compute() >>> ts.data [...]
- get_versions(check: bool = False, packages: collections.abc.Sequence[str] | None = None) distributed.client.VersionsDict | collections.abc.Coroutine[Any, Any, distributed.client.VersionsDict] [源代码]¶
返回调度器、所有工作节点以及我自己的版本信息
- 参数
- 检查
如果所有必需和可选的包不匹配,则引发 ValueError
- 包
额外需要检查的包名
示例
>>> c.get_versions()
>>> c.get_versions(packages=['sklearn', 'geopandas'])
- get_worker_logs(n=None, workers=None, nanny=False)[源代码]¶
从工作节点获取日志
- 参数
- n整数
要检索的日志数量。默认最大值为10000,可通过
distributed.admin.log-length
配置值进行配置。- 工人可迭代对象
要检索的工人地址列表。默认情况下获取所有工人。
- 保姆bool, 默认 False
是否从工作进程(False)或保姆进程(True)获取日志。如果指定,workers 中的地址仍应为工作进程地址,而不是保姆进程地址。
- 返回
- 字典映射工作地址到日志。
- 日志以倒序返回(最新的在前)
- has_what(workers=None, **kwargs)[源代码]¶
哪些键由哪些工作者持有
这将返回每个工作进程内存中保存的数据的键。
- 参数
- 工人列表(可选)
工作地址列表,默认为所有地址
- **kwargsdict
远程函数的可选关键字参数
示例
>>> x, y, z = c.map(inc, [1, 2, 3]) >>> wait([x, y, z]) >>> c.has_what() {'192.168.1.141:46784': ['inc-1c8dd6be1c21646c71f76c16d09304ea', 'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b', 'inc-1e297fc27658d7b67b3a758f16bcf47a']}
- log_event(topic: str | collections.abc.Collection[str], msg: Any)[源代码]¶
在给定主题下记录事件
- 参数
- 主题str, list[str]
记录事件的主题名称。要将同一事件记录在多个主题下,请传递一个主题名称列表。
- 消息
事件消息以记录。注意这必须是 msgpack 可序列化的。
示例
>>> from time import time >>> client.log_event("current-time", time())
- map(func: Callable, *iterables: collections.abc.Collection, key: str | list | None = None, workers: str | collections.abc.Iterable[str] | None = None, retries: int | None = None, resources: dict[str, Any] | None = None, priority: int = 0, allow_other_workers: bool = False, fifo_timeout: str = '100 ms', actor: bool = False, actors: bool = False, pure: bool = True, batch_size=None, **kwargs)[源代码]¶
将函数映射到参数序列上
参数可以是普通对象或 Futures
- 参数
- 函数可调用
要安排执行的可调用对象。如果
func
返回一个协程,它将在工作线程的主事件循环中运行。否则,func
将在工作线程的任务执行器池中运行(更多信息请参见Worker.executors
)。- 可迭代对象可迭代对象
类似列表的对象用于映射。它们应该具有相同的长度。
- 关键str, list
如果为字符串,则为任务名称添加前缀。如果为列表,则为显式名称。
- 工人字符串或字符串的可迭代对象
一组可以执行计算的工作主机名。留空以默认使用所有工作节点(常见情况)
- 重试int (默认为 0)
任务失败时允许的自动重试次数
- 资源dict (默认为 {})
定义了此映射任务的每个实例在工作节点上所需的 资源;例如
{'GPU': 2}
。有关定义资源的详细信息,请参阅 工作节点资源。- 优先级数字
任务的优先级可选。默认值为零。优先级越高,优先级越高。
- 允许其他工作者bool (默认为 False)
与 workers 一起使用。指示计算是否可以在不属于 workers 集合的 worker 上执行。
- fifo_timeoutstr timedelta (默认 ‘100ms’)
允许的调用之间的时间量,以视为相同优先级
- 演员bool (默认 False)
这些任务是否应在工作节点上作为有状态的执行者存在。有关更多详细信息,请参阅 执行者。
- 演员bool (默认 False)
actor 的别名
- 纯bool (默认为 True)
函数是否为纯函数。对于像
np.random.random
这样的非纯函数,设置pure=False
。请注意,如果actor
和pure
关键字参数都设置为 True,那么pure
的值将被重置为 False,因为 actor 是有状态的。更多详情请参见 纯函数。- batch_sizeint, 可选 (默认: 只有一个批次,其大小为整个可迭代对象)
以(最多)``batch_size`` 为批次向调度器提交任务。批次大小的权衡在于,大批次可以避免更多的每批次开销,但过大的批次可能会花费很长时间提交,并不合理地延迟集群开始处理的时间。
- **kwargsdict
发送给函数的额外关键字参数。较大的值将显式包含在任务图中。
- 返回
- 根据类型的不同,可以是 future 的列表、迭代器或队列。
- 输入。
参见
Client.submit
提交一个单一函数
注释
当前的任务图解析实现会搜索
key
的出现,并将其替换为相应的Future
结果。如果这些字符串作为任务的参数传递,并且这些字符串与集群上已存在的某些key
匹配,这可能导致不希望的字符串替换。为了避免这种情况,如果手动设置key
,则需要使用唯一值。请参阅 https://github.com/dask/dask/issues/9969 以跟踪此问题的解决进度。示例
>>> L = client.map(func, sequence)
- nbytes(keys=None, summary=True, **kwargs)[源代码]¶
集群中每个键占用的字节数
这是通过
sys.getsizeof
测量的,可能无法准确反映真实成本。- 参数
- 键列表(可选)
键的列表,默认为所有键
- 摘要布尔值,(可选)
将键汇总为键类型
- **kwargsdict
远程函数的可选关键字参数
示例
>>> x, y, z = c.map(inc, [1, 2, 3]) >>> c.nbytes(summary=False) {'inc-1c8dd6be1c21646c71f76c16d09304ea': 28, 'inc-1e297fc27658d7b67b3a758f16bcf47a': 28, 'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b': 28}
>>> c.nbytes(summary=True) {'inc': 84}
- ncores(workers=None, **kwargs)¶
每个工作节点上可用的线程/核心数
- 参数
- 工人列表(可选)
我们特别关心的工作人员列表。留空以接收所有工作人员的信息。
- **kwargsdict
远程函数的可选关键字参数
示例
>>> c.nthreads() {'192.168.1.141:46784': 8, '192.167.1.142:47548': 8, '192.167.1.143:47329': 8, '192.167.1.144:37297': 8}
- normalize_collection(collection)[源代码]¶
如果存在,用已存在的期货替换集合的任务
这将对集合任务图中的任务进行规范化,使其与调度器中的已知未来任务相匹配。它返回一个包含重叠未来任务的任务图的集合副本。
- 参数
- 集合dask 对象
类似 dask.array 或 dataframe 或 dask.value 对象的集合
- 返回
- 集合dask 对象
集合,其任务被任何现有的期货所替代。
参见
Client.persist
触发集合任务的计算
示例
>>> len(x.__dask_graph__()) # x is a dask collection with 100 tasks 100 >>> set(client.futures).intersection(x.__dask_graph__()) # some overlap exists 10
>>> x = client.normalize_collection(x) >>> len(x.__dask_graph__()) # smaller computational graph 20
- nthreads(workers=None, **kwargs)[源代码]¶
每个工作节点上可用的线程/核心数
- 参数
- 工人列表(可选)
我们特别关心的工作人员列表。留空以接收所有工作人员的信息。
- **kwargsdict
远程函数的可选关键字参数
示例
>>> c.nthreads() {'192.168.1.141:46784': 8, '192.167.1.142:47548': 8, '192.167.1.143:47329': 8, '192.167.1.144:37297': 8}
- persist(collections, optimize_graph=True, workers=None, allow_other_workers=None, resources=None, retries=None, priority=0, fifo_timeout='60s', actors=None, **kwargs)[源代码]¶
在集群上持久化dask集合
在集群后台启动集合的计算。提供一个新的 dask 集合,它在语义上与之前的集合相同,但现在基于当前正在执行的 futures。
- 参数
- 集合序列或单个 dask 对象
像 dask.array 或 dataframe 或 dask.value 对象这样的集合
- 优化图布尔
是否优化底层图表
- 工人字符串或字符串的可迭代对象
一组可以执行计算的工作主机名。留空以默认使用所有工作节点(常见情况)
- 允许其他工作者bool (默认为 False)
与 workers 一起使用。指示计算是否可以在不属于 workers 集合的 worker 上执行。
- 重试int (默认为 0)
如果计算结果失败,允许的自动重试次数
- 优先级数字
任务的优先级可选。默认值为零。优先级越高,优先级越高。
- fifo_timeouttimedelta 字符串 (默认值为 ‘60s’)
允许的调用之间的时间量,以视为相同优先级
- 资源dict (默认为 {})
定义了此映射任务的每个实例在工作节点上所需的 资源;例如
{'GPU': 2}
。有关定义资源的详细信息,请参阅 工作节点资源。- 演员布尔值或字典(默认 None)
这些任务是否应在工作节点上作为有状态的执行者存在。可以在全局(True/False)或每个任务(
{'x': True, 'y': False}
)的基础上指定。有关更多详细信息,请参阅 演员。- **kwargs
传递给图优化调用的选项
- 返回
- 集合列表,或单一集合,取决于输入的类型。
示例
>>> xx = client.persist(x) >>> xx, yy = client.persist([x, y])
- processing(workers=None)[源代码]¶
每个工作节点上当前运行的任务
- 参数
- 工人列表(可选)
工作地址列表,默认为所有地址
示例
>>> x, y, z = c.map(inc, [1, 2, 3]) >>> c.processing() {'192.168.1.141:46784': ['inc-1c8dd6be1c21646c71f76c16d09304ea', 'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b', 'inc-1e297fc27658d7b67b3a758f16bcf47a']}
- profile(key=None, start=None, stop=None, workers=None, merge_workers=True, plot=False, filename=None, server=False, scheduler=False)[源代码]¶
收集有关近期工作的统计分析信息
- 参数
- 关键str
选择的关键前缀,这通常是一个函数名,如 ‘inc’。留空为 None 以收集所有数据
- 开始时间
- 停止时间
- 工人列表
限制个人资料信息的员工列表
- 服务器布尔
如果为真,返回工作者管理线程的配置文件,而不是工作者线程的配置文件。这在分析Dask本身而不是用户代码时很有用。
- 调度器布尔
如果为真,从调度器的管理线程返回配置文件信息,而不是从工作线程返回。这在分析 Dask 的调度本身时很有用。
- 绘图布尔值或字符串
是否返回绘图对象
- 文件名str
保存图形的文件名
示例
>>> client.profile() # call on collections >>> client.profile(filename='dask-profile.html') # save to html file
- publish_dataset(*args, **kwargs)[源代码]¶
将命名数据集发布到调度器
这会在调度器上存储一个对 dask 集合或未来列表的命名引用。这些引用对其他客户端可用,它们可以通过
get_dataset
下载集合或未来。数据集不会立即计算。在发布数据集之前,您可能希望调用
Client.persist
。- 参数
- 参数要发布为名称的对象列表
- kwargsdict
命名集合以在调度器上发布
- 返回
- 无
示例
发布客户端:
>>> df = dd.read_csv('s3://...') >>> df = c.persist(df) >>> c.publish_dataset(my_dataset=df)
替代调用 >>> c.publish_dataset(df, name=’my_dataset’)
接收客户端:
>>> c.list_datasets() ['my_dataset'] >>> df2 = c.get_dataset('my_dataset')
- rebalance(futures=None, workers=None, **kwargs)[源代码]¶
在网络内重新平衡数据
在工作者之间移动数据以大致平衡内存负担。这根据关键字参数的影响,可能只影响部分键/工作者或整个网络。
有关算法的详细信息和配置选项,请参阅匹配的调度器端方法
rebalance()
。警告
此操作通常未经过调度器正常操作的充分测试。不建议在等待计算时使用它。
- 参数
- 未来列表,可选
要平衡的未来列表,默认为所有数据
- 工人列表,可选
要在其上进行负载均衡的工人列表,默认为所有工人
- **kwargsdict
函数的可选关键字参数
- register_plugin(plugin: distributed.diagnostics.plugin.NannyPlugin | distributed.diagnostics.plugin.SchedulerPlugin | distributed.diagnostics.plugin.WorkerPlugin, name: str | None = None, idempotent: bool | None = None)[源代码]¶
注册一个插件。
请参阅 https://distributed.readthedocs.io/en/latest/plugins.html
- 参数
- 插件
要注册的保姆、调度器或工作插件。
- 名称
插件的名称;如果为 None,则从插件实例中获取名称,如果不存在则自动生成。
- 幂等
如果给定名称的插件已经存在,请勿重新注册。如果为 None,则使用
plugin.idempotent
的定义值,否则为 False。
- register_scheduler_plugin(plugin: distributed.diagnostics.plugin.SchedulerPlugin, name: str | None = None, idempotent: bool | None = None)[源代码]¶
注册一个调度器插件。
2023.9.2 版后已移除: 请使用
Client.register_plugin()
代替。参见 https://distributed.readthedocs.io/en/latest/plugins.html#scheduler-plugins
- 参数
- 插件
SchedulerPlugin
传递给调度器的 SchedulerPlugin 实例。
- 名称str
插件的名称;如果为 None,则从插件实例中获取名称,如果不存在则自动生成。
- 幂等布尔
如果已存在指定名称的插件,请勿重新注册。
- 插件
- register_worker_callbacks(setup=None)[源代码]¶
为所有当前和未来的工作线程注册一个设置回调函数。
这为集群中的工作线程注册了一个新的设置函数。该函数将立即在所有当前连接的工作线程上运行。它还将在未来添加的任何工作线程连接时运行。可以注册多个设置函数 - 这些函数将按照它们被添加的顺序调用。
如果函数接受一个名为
dask_worker
的输入参数,那么该变量将被填充为工作节点本身。- 参数
- 设置callable(dask_worker: Worker) -> None
在所有工作节点上注册并运行的函数
- register_worker_plugin(plugin: distributed.diagnostics.plugin.NannyPlugin | distributed.diagnostics.plugin.WorkerPlugin, name: str | None = None, nanny: bool | None = None)[源代码]¶
为所有当前和未来的工作器注册一个生命周期工作器插件。
2023.9.2 版后已移除: 请使用
Client.register_plugin()
代替。这注册了一个新对象来处理此集群中工作者的设置、任务状态转换和拆卸。插件将在所有当前连接的工作者上实例化自身。它还将在未来连接的任何工作者上运行。
该插件可能包含方法
setup
、teardown
、transition
和release_key
。请参阅dask.distributed.WorkerPlugin
类或下面的示例以了解接口和文档字符串。它必须可以通过 pickle 或 cloudpickle 模块进行序列化。如果插件有一个
name
属性,或者使用了name=
关键字,那么这将控制幂等性。如果已经注册了具有该名称的插件,那么它将被移除并替换为新的插件。对于插件的替代方案,您可能还希望了解预加载脚本。
- 参数
- 插件WorkerPlugin 或 NannyPlugin
要注册的 WorkerPlugin 或 NannyPlugin 实例。
- 名称str, 可选
插件的名称。使用相同名称注册插件将不会有任何效果。如果插件没有名称属性,则使用随机名称。
- 保姆bool, 可选
是否将插件注册到工作节点或保姆节点。
参见
distributed.WorkerPlugin
unregister_worker_plugin
示例
>>> class MyPlugin(WorkerPlugin): ... def __init__(self, *args, **kwargs): ... pass # the constructor is up to you ... def setup(self, worker: dask.distributed.Worker): ... pass ... def teardown(self, worker: dask.distributed.Worker): ... pass ... def transition(self, key: str, start: str, finish: str, ... **kwargs): ... pass ... def release_key(self, key: str, state: str, cause: str | None, reason: None, report: bool): ... pass
>>> plugin = MyPlugin(1, 2, 3) >>> client.register_plugin(plugin)
你可以通过
get_worker
函数访问插件>>> client.register_plugin(other_plugin, name='my-plugin') >>> def f(): ... worker = get_worker() ... plugin = worker.plugins['my-plugin'] ... return plugin.my_state
>>> future = client.run(f)
- replicate(futures, n=None, workers=None, branching_factor=2, **kwargs)[源代码]¶
在网络中设置期货的复制
将数据复制到多个工作节点。这有助于广播频繁访问的数据,并可以提高系统的韧性。
这会在网络中的每一份数据上单独执行数据的树形复制。此操作会阻塞直到完成。它不保证数据会复制到未来的工作节点。
备注
此方法与 Active Memory Manager 的 ReduceReplicas 策略不兼容。如果您希望使用它,您必须首先禁用该策略或完全禁用 AMM。
- 参数
- 未来未来列表
我们希望复制的未来
- nint, 可选
在集群上复制数据的进程数。默认为全部。
- 工人worker 地址列表
我们希望限制复制的工作者。默认为所有。
- branching_factorint, 可选
每个生成中可以复制数据的工人数
- **kwargsdict
远程函数的可选关键字参数
示例
>>> x = c.submit(func, *args) >>> c.replicate([x]) # send to all workers >>> c.replicate([x], n=3) # send to three workers >>> c.replicate([x], workers=['alice', 'bob']) # send to specific >>> c.replicate([x], n=1, workers=['alice', 'bob']) # send to one of specific workers >>> c.replicate([x], n=1) # reduce replications
- restart(timeout: typing.Union[str, int, float, typing.Literal[<no_default>]] = _NoDefault.no_default, wait_for_workers: bool = True)[源代码]¶
重启所有工作进程。重置本地状态。可选地等待工作进程返回。
没有保姆的工人会被关闭,希望外部部署系统会重新启动它们。因此,如果不使用保姆,并且您的部署系统不会自动重启工人,
restart
将只会关闭所有工人,然后超时!在
restart
之后,所有连接的工作者都是新的,无论是否引发了TimeoutError
。任何未能及时关闭的工作者都会被移除,并且可能在将来自行关闭,也可能不会。- 参数
- 超时:
如果
wait_for_workers
为 True,等待工作线程关闭并返回的时间长度,否则仅等待工作线程关闭的时间长度。如果超过此时间,则引发asyncio.TimeoutError
。- wait_for_workers:
是否等待所有工作进程重新连接,还是仅等待它们关闭(默认 True)。使用
restart(wait_for_workers=False)
结合Client.wait_for_workers()
可以实现对等待多少工作进程的精细控制。
参见
Scheduler.restart
Client.restart_workers
- restart_workers(workers: list[str], timeout: typing.Union[str, int, float, typing.Literal[<no_default>]] = _NoDefault.no_default, raise_for_error: bool = True)[源代码]¶
重启指定的一组工作进程
备注
只有被
distributed.Nanny
监控的工人才能被重启。更多详情请参见Nanny.restart
。- 参数
- 工人list[str]
需要重启的工作者。这可以是一个工作者地址、名称的列表,或者两者都有。
- 超时int | float | None
等待的秒数
- raise_for_error: bool (默认 True)
如果在
timeout
时间内重启工作进程未完成,是否引发TimeoutError
,或者由重启工作进程引起的其他异常。
- 返回
- dict[str, “OK” | “已移除” | “已超时”]
工作进程和重启状态的映射,键将与通过
workers
传入的原始值匹配。
注释
此方法与
Client.restart()
不同之处在于,此方法仅重启指定的工人集合,而Client.restart
将重启所有工人并重置集群上的本地状态(例如,所有键都被释放)。此外,这种方法在处理工作进程重启时正在执行的任务时表现不佳。这些任务可能会失败或其可疑计数增加。
示例
你可以使用以下方法获取有关活跃工作者的信息:
>>> workers = client.scheduler_info()['workers']
从该列表中,您可能希望选择一些工作进程进行重启
>>> client.restart_workers(workers=['tcp://address:port', ...])
- retire_workers(workers: list[str] | None = None, close_workers: bool = True, **kwargs)[源代码]¶
在调度器上退役某些工作者
查看
distributed.Scheduler.retire_workers()
获取完整的文档字符串。- 参数
- 工人
- 关闭工作者
- **kwargsdict
远程函数的可选关键字参数
参见
dask.distributed.Scheduler.retire_workers
示例
你可以使用以下方法获取有关活跃工作者的信息:
>>> workers = client.scheduler_info()['workers']
从该列表中,您可能希望选择一些工人来关闭
>>> client.retire_workers(workers=['tcp://address:port', ...])
- run(function, *args, workers: list[str] | None = None, wait: bool = True, nanny: bool = False, on_error: Literal['raise', 'return', 'ignore'] = 'raise', **kwargs)[源代码]¶
在任务调度系统之外对所有工作节点运行一个函数
此方法立即在所有当前已知的worker上调用一个函数,阻塞直到这些结果返回,并异步地以worker地址为键的字典形式返回结果。此方法通常用于收集诊断信息或安装库等副作用。
如果你的函数接受一个名为
dask_worker
的输入参数,那么该变量将被填充为工作节点本身。- 参数
- 函数可调用
要运行的函数
- *args元组
远程函数的可选参数
- **kwargsdict
远程函数的可选关键字参数
- 工人列表
运行函数的工人。默认为所有已知的工人。
- 等待布尔值(可选)
如果函数是异步的,是否等待该函数完成。
- 保姆bool, 默认 False
是否在保姆进程上运行
function
。默认情况下,该函数在工作进程上运行。如果指定,workers
中的地址仍应为工作进程地址,而不是保姆进程地址。- on_error: “raise” | “return” | “ignore”
如果在工作节点上函数引发错误:
- raise
(默认) 在客户端重新引发异常。其他工作者的输出将会丢失。
- 返回
返回异常对象而不是工作函数的输出
- 忽略
忽略异常并将工作者从结果字典中移除
示例
>>> c.run(os.getpid) {'192.168.0.100:9000': 1234, '192.168.0.101:9000': 4321, '192.168.0.102:9000': 5555}
使用
workers=
关键字参数将计算限制在特定的工人上。>>> c.run(os.getpid, workers=['192.168.0.100:9000', ... '192.168.0.101:9000']) {'192.168.0.100:9000': 1234, '192.168.0.101:9000': 4321}
>>> def get_status(dask_worker): ... return dask_worker.status
>>> c.run(get_status) {'192.168.0.100:9000': 'running', '192.168.0.101:9000': 'running}
在后台运行异步函数:
>>> async def print_state(dask_worker): ... while True: ... print(dask_worker.status) ... await asyncio.sleep(1)
>>> c.run(print_state, wait=False)
- run_on_scheduler(function, *args, **kwargs)[源代码]¶
在调度器进程上运行一个函数
这通常用于实时调试。该函数应接受一个关键字参数
dask_scheduler=
,该参数将被赋予调度器对象本身。- 参数
- 函数可调用
在调度器进程上运行的函数
- *args元组
函数的可选参数
- **kwargsdict
函数的可选关键字参数
参见
Client.run
在所有工作节点上运行一个函数
示例
>>> def get_number_of_tasks(dask_scheduler=None): ... return len(dask_scheduler.tasks)
>>> client.run_on_scheduler(get_number_of_tasks) 100
在后台运行异步函数:
>>> async def print_state(dask_scheduler): ... while True: ... print(dask_scheduler.status) ... await asyncio.sleep(1)
>>> c.run(print_state, wait=False)
- scatter(data, workers=None, broadcast=False, direct=None, hash=True, timeout=_NoDefault.no_default, asynchronous=None)[源代码]¶
将数据分散到分布式内存中
这将从本地客户端进程将数据移动到分布式调度器的工人中。请注意,通常更好的做法是向工人提交作业,让它们加载数据,而不是在本地加载数据,然后将其分散到它们中。
- 参数
- 数据列表、字典或对象
要分散给工人的数据。输出类型与输入类型匹配。
- 工人元组列表(可选)
可选地限制数据位置。将工作者指定为主机名/端口对,例如
('127.0.0.1', 8787)
。- 广播bool (默认为 False)
是否将每个数据元素发送给所有工作节点。默认情况下,我们根据核心数量进行轮询。
备注
将此标志设置为 True 与 Active Memory Manager 的 ReduceReplicas 策略不兼容。如果您希望使用它,您必须首先禁用该策略或完全禁用 AMM。
- 直接bool (默认为自动检查)
是否直接连接到工作节点,或者请求调度器作为中介。这也可以在创建客户端时设置。
- 哈希bool (可选)
是否对数据进行哈希以确定键。如果为 False,则使用随机键。
- 超时数字,可选
在引发
dask.distributed.TimeoutError
之前的秒数- 异步: bool
如果为真,客户端处于异步模式
- 返回
- 与输入类型匹配的 futures 列表、字典、迭代器或队列。
参见
Client.gather
将数据收集回本地进程
注释
散布字典使用
dict
键来创建Future
键。当前的任务图解析实现会搜索key
的出现,并用相应的Future
结果替换它。如果这些字符串作为参数传递给任务,并且这些字符串与集群上已存在的某些key
匹配,这可能会导致不需要的字符串替换。为了避免这种情况,如果手动设置key
,则需要使用唯一值。请参阅 https://github.com/dask/dask/issues/9969 以跟踪此问题的解决进度。示例
>>> c = Client('127.0.0.1:8787') >>> c.scatter(1) <Future: status: finished, key: c0a8a20f903a4915b94db8de3ea63195>
>>> c.scatter([1, 2, 3]) [<Future: status: finished, key: c0a8a20f903a4915b94db8de3ea63195>, <Future: status: finished, key: 58e78e1b34eb49a68c65b54815d1b158>, <Future: status: finished, key: d3395e15f605bc35ab1bac6341a285e2>]
>>> c.scatter({'x': 1, 'y': 2, 'z': 3}) {'x': <Future: status: finished, key: x>, 'y': <Future: status: finished, key: y>, 'z': <Future: status: finished, key: z>}
将数据位置限制在部分工作节点
>>> c.scatter([1, 2, 3], workers=[('hostname', 8788)])
向所有工作节点广播数据
>>> [future] = c.scatter([element], broadcast=True)
使用客户端期货接口将分散的数据发送到并行化函数
>>> data = c.scatter(data, broadcast=True) >>> res = [c.submit(func, data, i) for i in range(100)]
- scheduler_info(**kwargs)[源代码]¶
集群中工人的基本信息
- 参数
- **kwargsdict
远程函数的可选关键字参数
示例
>>> c.scheduler_info() {'id': '2de2b6da-69ee-11e6-ab6a-e82aea155996', 'services': {}, 'type': 'Scheduler', 'workers': {'127.0.0.1:40575': {'active': 0, 'last-seen': 1472038237.4845693, 'name': '127.0.0.1:40575', 'services': {}, 'stored': 0, 'time-delay': 0.0061032772064208984}}}
- set_metadata(key, value)[源代码]¶
在调度器中设置任意元数据
这允许你在中央调度器进程上存储少量数据,用于管理目的。数据应为 msgpack 可序列化(整数、字符串、列表、字典)。
如果键对应于一个任务,那么当调度器忘记该任务时,该键将被清理。
如果键是一个列表,那么将假定您想要使用这些键索引到嵌套的字典结构中。例如,如果您调用以下内容:
>>> client.set_metadata(['a', 'b', 'c'], 123)
那么这与设置相同
>>> scheduler.task_metadata['a']['b']['c'] = 123
低层级字典将按需创建。
参见
示例
>>> client.set_metadata('x', 123) >>> client.get_metadata('x') 123
>>> client.set_metadata(['x', 'y'], 123) >>> client.get_metadata('x') {'y': 123}
>>> client.set_metadata(['x', 'w', 'z'], 456) >>> client.get_metadata('x') {'y': 123, 'w': {'z': 456}}
>>> client.get_metadata(['x', 'w']) {'z': 456}
- shutdown()[源代码]¶
关闭连接的调度器和工作节点
注意,这可能会干扰其他可能使用相同调度器和工作节点的客户端。
参见
Client.close
仅关闭此客户端
- submit(func, *args, key=None, workers=None, resources=None, retries=None, priority=0, fifo_timeout='100 ms', allow_other_workers=False, actor=False, actors=False, pure=True, **kwargs)[源代码]¶
向调度器提交一个函数应用
- 参数
- 函数可调用
可调用对象将被调度为
func(*args **kwargs)
。如果func
返回一个协程,它将在工作线程的主事件循环中运行。否则func
将在工作线程的任务执行器池中运行(更多信息请参见Worker.executors
。)- *args元组
可选的位置参数
- 关键str
任务的唯一标识符。默认为函数名和哈希值。
- 工人字符串或字符串的可迭代对象
一组工作地址或主机名,可以在其上执行计算。留空以默认使用所有工作节点(常见情况)
- 资源dict (默认为 {})
定义了此映射任务的每个实例在工作节点上所需的
资源
;例如{'GPU': 2}
。有关定义资源的详细信息,请参阅 工作节点资源。- 重试int (默认为 0)
任务失败时允许的自动重试次数
- 优先级数字
任务的优先级可选。默认值为零。优先级越高,优先级越高。
- fifo_timeoutstr timedelta (默认 ‘100ms’)
允许的调用之间的时间量,以视为相同优先级
- 允许其他工作者bool (默认为 False)
与
workers
一起使用。指示计算是否可以在不属于 workers 集合的 worker 上执行。- 演员bool (默认 False)
此任务是否应在工作节点上作为有状态的actor存在。有关更多详细信息,请参阅 演员。
- 演员bool (默认 False)
actor 的别名
- 纯bool (默认为 True)
函数是否为纯函数。对于像
np.random.random
这样的非纯函数,设置pure=False
。请注意,如果actor
和pure
关键字参数都设置为 True,那么pure
的值将被重置为 False,因为 actor 是有状态的。更多详情请参见 纯函数。- **kwargs
- 返回
- 未来
如果在异步模式下运行,返回未来。否则返回具体值。
- Raises
- 类型错误
如果 ‘func’ 不可调用,则会引发 TypeError。
- ValueError
如果 ‘allow_other_workers’ 为 True 且 ‘workers’ 为 None,则会引发 ValueError。
参见
Client.map
一次性提交多个参数
注释
当前的任务图解析实现会搜索
key
的出现,并将其替换为相应的Future
结果。如果这些字符串作为任务的参数传递,并且这些字符串与集群上已存在的某些key
匹配,这可能导致不希望的字符串替换。为了避免这种情况,如果手动设置key
,则需要使用唯一值。请参阅 https://github.com/dask/dask/issues/9969 以跟踪此问题的解决进度。示例
>>> c = client.submit(add, a, b)
- subscribe_topic(topic, handler)[源代码]¶
订阅一个主题并在每次接收到事件时执行一个处理程序
- 参数
- topic: str
主题名称
- handler: 可调用对象或协程函数
为每个接收到的消息调用的处理程序。处理程序必须接受一个参数 event,这是一个元组 (timestamp, msg),其中时间戳指的是调度器上的时钟。
参见
dask.distributed.Client.unsubscribe_topic
dask.distributed.Client.get_events
dask.distributed.Client.log_event
示例
>>> import logging >>> logger = logging.getLogger("myLogger") # Log config not shown >>> client.subscribe_topic("topic-name", lambda: logger.info)
- unpublish_dataset(name, **kwargs)[源代码]¶
从调度器中移除命名数据集
- 参数
- 名称str
要取消发布的数据集名称
示例
>>> c.list_datasets() ['my_dataset'] >>> c.unpublish_dataset('my_dataset') >>> c.list_datasets() []
- unregister_scheduler_plugin(name)[源代码]¶
取消注册调度器插件
参见 https://distributed.readthedocs.io/en/latest/plugins.html#scheduler-plugins
- 参数
- 名称str
要取消注册的插件名称。更多信息请参阅
Client.register_scheduler_plugin()
文档字符串。
示例
>>> class MyPlugin(SchedulerPlugin): ... def __init__(self, *args, **kwargs): ... pass # the constructor is up to you ... async def start(self, scheduler: Scheduler) -> None: ... pass ... async def before_close(self) -> None: ... pass ... async def close(self) -> None: ... pass ... def restart(self, scheduler: Scheduler) -> None: ... pass
>>> plugin = MyPlugin(1, 2, 3) >>> client.register_plugin(plugin, name='foo') >>> client.unregister_scheduler_plugin(name='foo')
- unregister_worker_plugin(name, nanny=None)[源代码]¶
取消注册一个生命周期工作插件
这将注销一个现有的工作插件。作为注销过程的一部分,插件的
teardown
方法将被调用。- 参数
- 名称str
要取消注册的插件名称。更多信息请参阅
Client.register_plugin()
文档字符串。
示例
>>> class MyPlugin(WorkerPlugin): ... def __init__(self, *args, **kwargs): ... pass # the constructor is up to you ... def setup(self, worker: dask.distributed.Worker): ... pass ... def teardown(self, worker: dask.distributed.Worker): ... pass ... def transition(self, key: str, start: str, finish: str, **kwargs): ... pass ... def release_key(self, key: str, state: str, cause: str | None, reason: None, report: bool): ... pass
>>> plugin = MyPlugin(1, 2, 3) >>> client.register_plugin(plugin, name='foo') >>> client.unregister_worker_plugin(name='foo')
- unsubscribe_topic(topic)[源代码]¶
取消订阅一个主题并移除事件处理程序
参见
dask.distributed.Client.subscribe_topic
dask.distributed.Client.get_events
dask.distributed.Client.log_event
- upload_file(filename, load: bool = True)[源代码]¶
将本地包上传到调度器和工作节点
这将一个本地文件上传到调度器和所有工作节点。该文件被放置在每个节点的工作目录中,请参阅配置选项
temporary-directory
(默认为tempfile.gettempdir()
)。此目录将被添加到Python的系统路径中,因此任何
.py
、.egg
或.zip
文件都将可导入。- 参数
- 文件名字符串
要发送给工作者的
.py
、.egg
或.zip
文件的文件名- 加载bool, 可选
是否在上传过程中导入模块。默认为
True
。
示例
>>> client.upload_file('mylibrary.egg') >>> from mylibrary import myfunc >>> L = client.map(myfunc, seq) >>> >>> # Where did that file go? Use `dask_worker.local_directory`. >>> def where_is_mylibrary(dask_worker): >>> path = pathlib.Path(dask_worker.local_directory) / 'mylibrary.egg' >>> assert path.exists() >>> return str(path) >>> >>> client.run(where_is_mylibrary)
- wait_for_workers(n_workers: int, timeout: float | None = None) None [源代码]¶
阻塞调用以等待 n 个工作线程完成后再继续
- 参数
- n_workers整数
工人的数量
- 超时数字,可选
在引发
dask.distributed.TimeoutError
之前的秒数
- who_has(futures=None, **kwargs)[源代码]¶
存储每个未来数据的工人
- 参数
- 未来列表(可选)
未来列表,默认为所有数据
- **kwargsdict
远程函数的可选关键字参数
示例
>>> x, y, z = c.map(inc, [1, 2, 3]) >>> wait([x, y, z]) >>> c.who_has() {'inc-1c8dd6be1c21646c71f76c16d09304ea': ['192.168.1.141:46784'], 'inc-1e297fc27658d7b67b3a758f16bcf47a': ['192.168.1.141:46784'], 'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b': ['192.168.1.141:46784']}
>>> c.who_has([x, y]) {'inc-1c8dd6be1c21646c71f76c16d09304ea': ['192.168.1.141:46784'], 'inc-1e297fc27658d7b67b3a758f16bcf47a': ['192.168.1.141:46784']}
- write_scheduler_file(scheduler_file)[源代码]¶
将调度器信息写入一个json文件。
这通过文件系统方便了调度器信息的轻松共享。调度器文件可以用于实例化使用相同调度器的第二个客户端。
- 参数
- scheduler_filestr
写入调度器文件的路径。
示例
>>> client = Client() >>> client.write_scheduler_file('scheduler.json') # connect to previous client's scheduler >>> client2 = Client(scheduler_file='scheduler.json')
- class distributed.recreate_tasks.ReplayTaskClient(client)[源代码]¶
一个客户端插件,允许在本地重放远程任务
为给定的客户端添加以下方法:
recreate_error_locally
: 主要用户方法,用于重放失败的任务recreate_task_locally
: 用于重放任何任务的主要用户方法
- recreate_error_locally(future)[源代码]¶
对于失败的计算,在本地执行被指责的任务以进行调试。
如果堆栈跟踪不足以诊断问题,应在未来的操作(
gather
、compute``等的结果)返回“错误”状态后执行此操作。将从调度器中获取负责错误的特定任务(指向未来的图的一部分)及其输入值。然后执行该函数,以便可以使用 ``pdb
进行调试。- 参数
- 未来future 或失败的集合
与提供给
gather
的内容相同,但返回时带有异常/堆栈跟踪。也可以是一个包含任何错误未来的(持久化)dask集合。
- 返回
- 无;函数运行并应引发异常,允许
- 要运行的调试器。
示例
>>> future = c.submit(div, 1, 0) >>> future.status 'error' >>> c.recreate_error_locally(future) ZeroDivisionError: division by zero
如果你在 IPython 中,你可能会利用这个机会使用 pdb
>>> %pdb Automatic pdb calling has been turned ON
>>> c.recreate_error_locally(future) ZeroDivisionError: division by zero 1 def div(x, y): ----> 2 return x / y ipdb>
- recreate_task_locally(future)[源代码]¶
对于任何计算,无论成功与否,都在本地执行任务以进行调试。
此操作应在未来的状态(来自
gather
、compute
等的结果)返回非“pending”状态后执行。你可能想要调试成功完成但返回意外结果的未来情况。常见的调试过程可能包括在调试模式下本地运行任务,使用 pdb.runcall。- 参数
- 未来未来
与提供给
gather
的相同内容。
- 返回
- 任何;将返回任务未来的结果。
示例
>>> import pdb >>> future = c.submit(div, 1, 1) >>> future.status 'finished' >>> pdb.runcall(c.recreate_task_locally, future)
未来¶
- class distributed.Future(key, client=None, state=None, _id=None)[源代码]¶
远程运行的计算
Future 是运行在远程工作节点上的结果的本地代理。用户在本地 Python 进程中管理 future 对象,以决定在大规模集群中发生的事情。
备注
用户不应手动实例化 futures。这可能导致状态损坏和集群死锁。
- 参数
- 键: str, 或 tuple
此未来所引用的远程数据的键
- 客户端: 客户端
应拥有此未来的客户端。默认为 _get_global_client()。
- inform: bool
我们是否通知调度器我们需要更新这个未来
- 状态: 未来状态
未来的状态
参见
Client
创建期货
示例
Future 通常从客户端计算中产生
>>> my_future = client.submit(add, 1, 2)
我们可以跟踪未来的进展和结果。
>>> my_future <Future: status: finished, key: add-8f6e709446674bad78ea8aeecfee188e>
我们可以从 future 中获取结果或异常和回溯
>>> my_future.result()
- add_done_callback(fn)[源代码]¶
当未来完成时调用回调
回调
fn
应仅以 future 作为其参数。无论 future 是成功完成、出错还是被取消,都会调用此回调。回调函数在单独的线程中执行。
- 参数
- fn可调用
要调用的方法或函数
- exception(timeout=None, **kwargs)[源代码]¶
返回失败任务的异常
- 参数
- 超时数字,可选
在引发
dask.distributed.TimeoutError
之前的秒数- **kwargsdict
函数的可选关键字参数
- 返回
- 异常
如果在返回之前经过 timeout 秒,则会引发
dask.distributed.TimeoutError
异常。
- property executor¶
返回执行器,即客户端。
- 返回
- 客户端
执行器
- result(timeout=None)[源代码]¶
等待计算完成,将结果收集到本地进程。
- 参数
- 超时数字,可选
在引发
dask.distributed.TimeoutError
之前的秒数
- 返回
- 结果
计算的结果。如果客户端是异步的,则可能是一个协程。
- Raises
- dask.distributed.TimeoutError
如果在返回之前经过了 timeout 秒,则会引发
dask.distributed.TimeoutError
。
- property status¶
返回状态
- 返回
- str
状态
- traceback(timeout=None, **kwargs)[源代码]¶
返回失败任务的回溯信息
这将返回一个回溯对象。你可以使用
traceback
模块检查这个对象。或者,如果你调用future.result()
,这个回溯将伴随引发的异常。- 参数
- 超时数字,可选
如果在 timeout 秒后仍未返回,则会引发
dask.distributed.TimeoutError
。如果在返回之前经过了 timeout 秒,则会引发dask.distributed.TimeoutError
。
- 返回
- traceback
回溯对象。如果客户端是异步的,则返回一个协程。
示例
>>> import traceback >>> tb = future.traceback() >>> traceback.format_tb(tb) [...]
- property type¶
返回类型
同步¶
- class distributed.Event(name=None, client=None)[源代码]¶
分布式集中事件等同于 asyncio.Event
一个事件存储一个标志,该标志在开始时设置为假。该标志可以通过 set() 调用设置为真,或者通过 clear() 调用重新设置为假。每次调用 wait() 都会阻塞,直到事件标志被设置为真。
- 参数
- 名称: 字符串 (可选)
事件的名称。选择相同的名称可以使两个不相连的进程协调一个事件。如果没有给出,将生成一个随机名称。
- client: 客户端 (可选)
用于与调度器通信的客户端。如果未指定,将使用默认的全局客户端。
示例
>>> event_1 = Event('a') >>> event_1.wait(timeout=1) >>> # in another process >>> event_2 = Event('a') >>> event_2.set() >>> # now event_1 will stop waiting
- class distributed.Lock(name=None, client=<object object>, scheduler_rpc=None, loop=None)[源代码]¶
分布式集中锁
警告
这是使用
distributed.Semaphore
作为后端,它容易受到租约超额预订的影响。对于锁来说,这意味着如果租约超时,两个或更多的实例可能会同时获取锁。要禁用租约超时,请将distributed.scheduler.locks.lease-timeout
设置为 inf,例如。with dask.config.set({"distributed.scheduler.locks.lease-timeout": "inf"}): lock = Lock("x") ...
注意,如果没有租约超时,在集群缩减或工作节点故障的情况下,锁可能会死锁。
- 参数
- 名称: 字符串 (可选)
要获取的锁的名称。选择相同的名称可以使两个不相关的进程协调一个锁。如果没有给出,将生成一个随机名称。
- client: 客户端 (可选)
用于与调度器通信的客户端。如果未指定,将使用默认的全局客户端。
示例
>>> lock = Lock('x') >>> lock.acquire(timeout=1) >>> # do things with protected resource >>> lock.release()
- class distributed.MultiLock(names: list[str] | None = None, client: distributed.client.Client | None = None)[源代码]¶
分布式集中锁
- 参数
- 名称
要获取的锁的名称。选择相同的名称可以让两个不相关的进程协调一个锁。
- 客户端
用于与调度器通信的客户端。如果未指定,将使用默认的全局客户端。
示例
>>> lock = MultiLock(['x', 'y']) >>> lock.acquire(timeout=1) >>> # do things with protected resource 'x' and 'y' >>> lock.release()
- acquire(blocking=True, timeout=None, num_locks=None)[源代码]¶
获取锁
- 参数
- 阻塞bool, 可选
如果为假,调度器中完全不等待锁。
- 超时字符串或数字或时间差,可选
在调度器中等待锁的时间(秒)。这不包括本地协程时间、网络传输时间等。当阻塞为假时,禁止指定超时。除了秒数,还可以指定字符串格式的timedelta,例如“200ms”。
- num_locksint, 可选
所需的锁的数量。如果为None,则需要所有的锁。
- 返回
- 是否成功获取锁的判断:真或假
示例
>>> lock = MultiLock(['x', 'y']) >>> lock.acquire(timeout="1s")
- class distributed.Semaphore(max_leases=1, name=None, scheduler_rpc=None, loop=None)[源代码]¶
这个 信号量 将跟踪调度器上的租约,这些租约可以被该类的实例获取和释放。如果已经获取了最大数量的租约,则无法获取更多租约,调用者将等待直到另一个租约被释放。
生命周期或租约通过超时来控制。此超时由该实例的
Client
定期刷新,并在工作节点失败时提供防止死锁或资源耗尽的保护。超时可以通过配置选项distributed.scheduler.locks.lease-timeout
来控制,调度器验证超时的间隔通过选项distributed.scheduler.locks.lease-validation-interval
设置。与Python标准库中的信号量相比,一个显著的区别是这个实现不允许释放的次数多于获取的次数。如果发生这种情况,会发出警告,但内部状态不会被修改。
警告
此实现容易在租约超时的情况下出现租约超订。建议监控日志信息,并根据用户应用程序调整上述配置选项为合适的值。
- 参数
- max_leases: int (可选)
可以同时授予的最大租约数量。这实际上为特定资源的并行访问设置了一个上限。默认为1。
- 名称: 字符串 (可选)
要获取的信号量的名称。选择相同名称可以让两个不相关的进程进行协调。如果未指定,将生成一个随机名称。
- register: bool
如果为 True,将信号量注册到调度器。这需要在获取任何租约之前完成。如果未在初始化期间完成,也可以通过调用此类型的 register 方法来完成。注册时,需要等待完成。
- scheduler_rpc: 连接池
连接到调度器的 ConnectionPool。如果没有提供,则使用工作节点或客户端的池。此参数主要用于测试。
- 循环: IOLoop
此实例正在使用的事件循环。如果提供 None,则重用活动工作线程或客户端的循环。
注释
如果客户端尝试释放信号量但没有获取租约,这将引发异常。
dask 默认情况下假设函数是纯函数,当在这样的函数内部使用信号量获取/释放时,必须注意实际上存在副作用,因此,该函数不能再被视为纯函数。如果不考虑这一点,可能会导致意外行为。
示例
>>> from distributed import Semaphore ... sem = Semaphore(max_leases=2, name='my_database') ... ... def access_resource(s, sem): ... # This automatically acquires a lease from the semaphore (if available) which will be ... # released when leaving the context manager. ... with sem: ... pass ... ... futures = client.map(access_resource, range(10), sem=sem) ... client.gather(futures) ... # Once done, close the semaphore to clean up the state on scheduler side. ... sem.close()
- class distributed.Queue(name=None, client=None, maxsize=0)[源代码]¶
分布式队列
这允许多个客户端通过多生产者/多消费者队列相互共享期货或少量数据。所有元数据都通过调度器进行序列化。
队列的元素必须是 Futures 或 msgpack 可编码的数据(整数、字符串、列表、字典)。所有数据都通过调度器发送,因此最好不要发送大型对象。要共享大型对象,请分散数据并共享 future。
警告
此对象为实验性
- 参数
- 名称: 字符串 (可选)
其他客户端和调度器用来标识队列的名称。如果未提供,将生成一个随机名称。
- client: 客户端 (可选)
用于与调度器通信的客户端。如果未指定,将使用默认的全局客户端。
- maxsize: int (可选)
队列中允许的项目数量。如果为0(默认值),则队列大小无限制。
参见
Variable
客户端之间的共享变量
示例
>>> from dask.distributed import Client, Queue >>> client = Client() >>> queue = Queue('x') >>> future = client.submit(f, x) >>> queue.put(future)
- get(timeout=None, batch=False, **kwargs)[源代码]¶
从队列中获取数据
- 参数
- 超时数字或字符串或时间差,可选
超时前等待的时间(以秒为单位)。除了秒数,还可以指定字符串格式的timedelta,例如“200ms”。
- 批处理布尔值, 整数 (可选)
如果为 True,则返回队列中当前等待的所有元素。如果为整数,则返回队列中的那么多元素。如果为 False(默认),则一次返回一个项目。
- class distributed.Variable(name=None, client=None)[源代码]¶
分布式全局变量
这允许多个客户端通过一个可变的变量相互共享期货和数据。所有元数据都通过调度器进行序列化。可能会发生竞态条件。
值必须是 Futures 或 msgpack 可编码的数据(整数、列表、字符串等)。所有数据都将被保留并通过调度器发送,因此最好不要发送太多数据。如果你想共享大量数据,那么请使用
scatter
并共享 future 对象。- 参数
- 名称: 字符串 (可选)
其他客户端和调度器用来识别变量的名称。如果未提供,将生成一个随机名称。
- client: 客户端 (可选)
用于与调度器通信的客户端。如果未指定,将使用默认的全局客户端。
参见
Queue
客户端之间的共享多生产者/多消费者队列
示例
>>> from dask.distributed import Client, Variable >>> client = Client() >>> x = Variable('x') >>> x.set(123) # docttest: +SKIP >>> x.get() # docttest: +SKIP 123 >>> future = client.submit(f, x) >>> x.set(future)
集群¶
与集群创建和管理相关的类。其他库(如 dask-jobqueue、dask-gateway、dask-kubernetes、dask-yarn 等)提供了额外的集群对象。
|
创建本地调度器和工作节点 |
|
需要完全指定工作节点的集群 |
- class distributed.LocalCluster(name=None, n_workers=None, threads_per_worker=None, processes=None, loop=None, start=None, host=None, ip=None, scheduler_port=0, silence_logs=30, dashboard_address=':8787', worker_dashboard_address=None, diagnostics_port=None, services=None, worker_services=None, service_kwargs=None, asynchronous=False, security=None, protocol=None, blocked_handlers=None, interface=None, worker_class=None, scheduler_kwargs=None, scheduler_sync_interval=1, **worker_kwargs)[源代码]¶
创建本地调度器和工作节点
这会在本地机器上创建一个调度器和工作节点的“集群”。
- 参数
- n_workers: int
启动的工作者数量
- memory_limit: str, float, int, 或 None, 默认 “auto”
设置每个工作者的内存限制。
关于参数数据类型的说明:
如果为 None 或 0,则不应用限制。
如果为“auto”,系统总内存将在工作进程之间平均分配。
如果是浮点数,则系统内存的该比例将用于 每个工作进程。
如果给定一个表示字节数的字符串(如
"1GiB"
),则该数量将 每个工作进程 使用。如果是整数,则每个工作线程使用该数量的字节。
请注意,限制仅在
processes=True
时执行,并且该限制仅基于最大努力原则执行——工作者仍有可能超过此限制。- processes: bool
是否使用进程(True)或线程(False)。默认为True,除非worker_class=Worker,在这种情况下默认为False。
- threads_per_worker: int
每个工作者的线程数
- scheduler_port: int
调度器的端口。使用 0 选择一个随机端口(默认)。8786 是一个常见的选择。
- silence_logs: 日志级别
要输出到标准输出的日志级别。默认为
logging.WARN
。使用 False 或 None 等假值表示不更改。- host: 字符串
调度器将监听的主机地址,默认为仅限本地主机
- ip: 字符串
已弃用。请参见上面的
host
。- dashboard_address: str
监听 Bokeh 诊断服务器的地址,如 ‘localhost:8787’ 或 ‘0.0.0.0:8787’。默认为 ‘:8787’。设置为
None
以禁用仪表板。使用 ‘:0’ 表示随机端口。当仅指定端口如 ‘:8787’ 时,仪表板将绑定到host
参数中的给定接口。如果host
为空,绑定将发生在所有接口 ‘0.0.0.0’ 上。为避免本地部署时的防火墙问题,请将host
设置为 ‘localhost’。- worker_dashboard_address: str
监听 Bokeh 工作线程诊断服务器地址,如 ‘localhost:8787’ 或 ‘0.0.0.0:8787’。默认为 None,即禁用仪表盘。使用 ‘:0’ 表示随机端口。
- 诊断端口: int
已弃用。请参阅 dashboard_address。
- 异步: bool (默认值为 False)
如果在异步/等待函数或Tornado gen.coroutines中使用此集群,请设置为True。正常使用时应保持为False。
- blocked_handlers: List[str]
指定调度器上不允许的处理器块列表,例如
['feed', 'run_function']
- service_kwargs: Dict[str, Dict]
传递给运行服务的额外关键字
- 安全Security 或 bool, 可选
配置此集群中的通信安全。可以是安全对象,或为 True。如果为 True,将自动创建临时自签名凭据。
- 协议: str (可选)
使用的协议,如
tcp://
、tls://
、inproc://
。默认会根据其他关键字参数如processes
和security
做出合理选择。- 接口: str (可选)
要使用的网络接口。默认为 lo/localhost。
- worker_class: Worker
用于实例化工作者的类。默认为如果 processes=False 则使用 Worker,如果 processes=True 或省略则使用 Nanny。
- **worker_kwargs:
额外的工作者参数。任何额外的关键字参数都将传递给
Worker
类构造函数。
示例
>>> cluster = LocalCluster() # Create a local cluster >>> cluster LocalCluster("127.0.0.1:8786", workers=8, threads=8)
>>> c = Client(cluster) # connect to local cluster
将集群扩展到三个工作节点
>>> cluster.scale(3)
将额外的关键字参数传递给 Bokeh
>>> LocalCluster(service_kwargs={'dashboard': {'prefix': '/foo'}})
- class distributed.SpecCluster(workers=None, scheduler=None, worker=None, asynchronous=False, loop=None, security=None, silence_logs=False, name=None, shutdown_on_close=True, scheduler_sync_interval=1)[源代码]¶
需要完全指定工作节点的集群
SpecCluster 类期望使用完整的调度器和工作节点规范。它移除了任何对用户输入的处理(如线程与进程、核心数量等)以及任何对集群资源管理器(如pod、作业等)的处理。相反,它期望这些信息通过调度器和工作节点规范传递。此类处理了所有关于在正确时间异步干净地设置和拆除事物的逻辑。希望它可以作为其他更以用户为中心的类的基类。
- 参数
- workers: dict
一个将名称映射到工作类及其规范的字典。请参见下面的示例。
- 调度器: dict, 可选
调度器的类似映射
- worker: dict
单个工作者的规范。这用于任何新创建的工作者。
- 异步: bool
如果这是要在事件循环中直接使用 async/await
- silence_logs: bool
在设置集群时是否应该静默日志记录。
- 名称: str, 可选
打印集群时使用的名称,默认为类型名称
示例
要创建一个 SpecCluster,你需要指定如何设置调度器和工作节点。
>>> from dask.distributed import Scheduler, Worker, Nanny >>> scheduler = {'cls': Scheduler, 'options': {"dashboard_address": ':8787'}} >>> workers = { ... 'my-worker': {"cls": Worker, "options": {"nthreads": 1}}, ... 'my-nanny': {"cls": Nanny, "options": {"nthreads": 2}}, ... } >>> cluster = SpecCluster(scheduler=scheduler, workers=workers)
工作器规范存储为
.worker_spec
属性>>> cluster.worker_spec { 'my-worker': {"cls": Worker, "options": {"nthreads": 1}}, 'my-nanny': {"cls": Nanny, "options": {"nthreads": 2}}, }
虽然此规范的实例化存储在
.workers
属性中>>> cluster.workers { 'my-worker': <Worker ...> 'my-nanny': <Nanny ...> }
如果规范发生变化,我们可以等待集群或调用
._correct_state
方法以使实际状态与指定状态一致。我们也可以
.scale(...)
集群,这将添加给定形式的新工作节点。>>> worker = {'cls': Worker, 'options': {}} >>> cluster = SpecCluster(scheduler=scheduler, worker=worker) >>> cluster.worker_spec {}
>>> cluster.scale(3) >>> cluster.worker_spec { 0: {'cls': Worker, 'options': {}}, 1: {'cls': Worker, 'options': {}}, 2: {'cls': Worker, 'options': {}}, }
请注意,上面我们使用的是标准的
Worker
和Nanny
类,但在实际操作中,可以使用其他处理资源管理的类,如KubernetesPod
或SLURMJob
。规范不需要符合标准 Dask Worker 类的预期。它只需要使用提供的选项调用,支持__await__
和close
方法以及worker_address
属性。另请注意,规范的统一性不是必需的。其他 API 可以在外部(在子类中)添加,将不同规范的工人添加到同一个字典中。
如果在规范中的一个条目将生成多个dask工作线程,请在规范中提供一个`”group”`元素,该元素包含将添加到每个名称的后缀(这应由您的工作线程类处理)。
>>> cluster.worker_spec { 0: {"cls": MultiWorker, "options": {"processes": 3}, "group": ["-0", "-1", -2"]} 1: {"cls": MultiWorker, "options": {"processes": 2}, "group": ["-0", "-1"]} }
这些后缀应与工人在部署时使用的名称相对应。
>>> [ws.name for ws in cluster.scheduler.workers.values()] ["0-0", "0-1", "0-2", "1-0", "1-1"]
- adapt(Adaptive: type[distributed.deploy.adaptive.Adaptive] = <class 'distributed.deploy.adaptive.Adaptive'>, minimum: float = 0, maximum: float = inf, minimum_cores: int | None = None, maximum_cores: int | None = None, minimum_memory: str | None = None, maximum_memory: str | None = None, **kwargs: typing.Any) distributed.deploy.adaptive.Adaptive [源代码]¶
开启自适应
此功能根据调度器活动自动扩展Dask集群。
- 参数
- 最小整数
最小工作线程数
- 最大值整数
最大工作线程数
- minimum_cores整数
集群中保持的最小核心/线程数
- maximum_cores整数
集群中保留的最大核心/线程数
- 最小内存str
集群中保持的最小内存量 以“100 GiB”这样的字符串形式表示
- 最大内存str
集群中保留的最大内存量 以“100 GiB”这样的字符串形式表示
参见
dask.distributed.Adaptive
更多关键字参数
示例
>>> cluster.adapt(minimum=0, maximum_memory="100 GiB", interval='500ms')
- classmethod from_name(name: str) distributed.deploy.spec.ProcessInterface [源代码]¶
通过名称创建该类的一个实例来表示一个现有的集群。
- scale(n=0, memory=None, cores=None)[源代码]¶
将集群扩展到 n 个工作节点
- 参数
- n整数
目标工人数
示例
>>> cluster.scale(10) # scale cluster to ten workers
- scale_up(n=0, memory=None, cores=None)¶
将集群扩展到 n 个工作节点
- 参数
- n整数
目标工人数
示例
>>> cluster.scale(10) # scale cluster to ten workers
其他¶
- class distributed.as_completed(futures=None, loop=None, with_results=False, raise_errors=True, *, timeout=None)[源代码]¶
按完成顺序返回期货
这将返回一个迭代器,按完成顺序生成输入的未来对象。在迭代器上调用
next
将阻塞,直到下一个未来对象完成,无论顺序如何。此外,您还可以在计算过程中使用
.add
方法向此对象添加更多功能- 参数
- futures: 未来集合
一个 Future 对象的列表,按照它们完成的顺序进行迭代
- with_results: bool (False)
是否等待并包含未来的结果;在这种情况下,
as_completed
产生一个 (future, result) 的元组- raise_errors: bool (True)
当未来的结果引发异常时,我们是否应该抛出异常;仅在
with_results=True
时影响行为。- timeout: int (可选)
如果调用
__next__()
或__anext__()
并且结果在从原始调用as_completed()
起 timeout 秒后仍不可用,返回的迭代器将引发dask.distributed.TimeoutError
。如果未指定 timeout 或为None
,则等待时间没有限制。
示例
>>> x, y, z = client.map(inc, [1, 2, 3]) >>> for future in as_completed([x, y, z]): ... print(future.result()) 3 2 4
在计算过程中添加更多功能
>>> x, y, z = client.map(inc, [1, 2, 3]) >>> ac = as_completed([x, y, z]) >>> for future in ac: ... print(future.result()) ... if random.random() < 0.5: ... ac.add(c.submit(double, future)) 4 2 8 3 6 12 24
可选地等待结果也被收集完毕
>>> ac = as_completed([x, y, z], with_results=True) >>> for future, result in ac: ... print(result) 2 4 3
- batches()[源代码]¶
一次性产生所有已完成的future,而不是一个接一个地产生
这将返回一个未来列表或(未来,结果)元组列表的迭代器,而不是单个未来或单个(未来,结果)元组。它将尽快生成这些结果,而不会等待。
示例
>>> for batch in as_completed(futures).batches(): ... results = client.gather(batch) ... print(results) [4, 2] [1, 3, 7] [5] [6]
- distributed.diagnostics.progressbar.progress(*futures, notebook=None, multi=True, complete=True, group_by='prefix', **kwargs)[源代码]¶
跟踪期货的进展
这在笔记本和控制台中的操作方式不同
Notebook: 这会立即返回,屏幕上留下一个 IPython 小部件
控制台:此操作会一直阻塞,直到计算完成
- 参数
- 未来未来
要跟踪的未来列表或键
- 笔记本bool (可选)
是否在笔记本中运行(默认为猜测)
- 多bool (可选)
独立跟踪不同的功能(默认为True)
- 完成bool (可选)
跟踪所有键(True)或仅跟踪尚未运行的键(False)(默认为True)
- 按组可调用对象 | 字面量[“spans”] | 字面量[“prefix”]
使用跨度而不是任务键名来分组任务(默认为“前缀”)
注释
在笔记本中,progress 的输出必须是单元格中的最后一条语句。通常,这意味着在单元格的末尾调用 progress。
示例
>>> progress(futures) [########################################] | 100% Completed | 1.7s
- distributed.wait(fs, timeout=None, return_when='ALL_COMPLETED')[源代码]¶
等待所有/任意未来完成
- 参数
- fsList[Future]
- 超时数字, 字符串, 可选
在引发
dask.distributed.TimeoutError
之前的时间。可以是像"10 分钟"
这样的字符串,或者是等待的秒数。- return_whenstr, 可选
ALL_COMPLETED 或 FIRST_COMPLETED 之一
- 返回
- 已完成、未完成的命名元组
- distributed.fire_and_forget(obj)[源代码]¶
即使我们释放了未来,也要至少运行一次任务
在正常操作下,Dask 不会运行没有活动 future 的任务(这在许多情况下避免了不必要的工作)。然而,有时你只想启动一个任务,不跟踪它的 future,并期望它最终完成。你可以对 future 或 future 集合使用此函数,请求 Dask 即使没有活动客户端跟踪也要完成任务。
任务完成后,结果将不会保存在内存中(除非有活跃的未来),因此这仅对依赖副作用的任务有用。
- 参数
- obj未来, 列表, 字典, dask 集合
你希望至少运行一次的未来
示例
>>> fire_and_forget(client.submit(func, *args))
- distributed.futures_of(o, client=None)[源代码]¶
集合中的未来对象
- 参数
- o集合
一个可能是嵌套的Dask对象集合
- 客户端客户端,可选
客户端
- 返回
- 未来List[Future]
这些集合持有的未来列表
- Raises
- CancelledError
如果其中一个 future 被取消,则会引发 CancelledError
示例
>>> futures_of(my_dask_dataframe) [<Future: finished key: ...>, <Future: pending key: ...>]
- distributed.worker_client(timeout=None, separate_thread=True)[源代码]¶
获取此线程的客户端
这个上下文管理器旨在在我们在工作节点上运行的函数中调用。当作为上下文管理器运行时,它会提供一个
Client
对象,该对象可以直接从该工作节点提交其他任务。- 参数
- 超时数字或字符串
超时后将出错。默认为
distributed.comm.timeouts.connect
配置值。- 单独线程bool, 可选
是否在正常线程池之外运行此函数,默认为 True
示例
>>> def func(x): ... with worker_client() as c: # connect from worker back to scheduler ... a = c.submit(inc, x) # this task can submit more tasks ... b = c.submit(dec, x) ... result = c.gather([a, b]) # and gather results ... return result
>>> future = client.submit(func, 1) # submit func(1) on cluster
- distributed.get_worker() distributed.worker.Worker [源代码]¶
获取当前正在运行此任务的工作者
示例
>>> def f(): ... worker = get_worker() # The worker on which this task is running ... return worker.address
>>> future = client.submit(f) >>> future.result() 'tcp://127.0.0.1:47373'
- distributed.get_client(address=None, timeout=None, resolve_address=True) Client [源代码]¶
在任务中获取客户端。
此客户端连接到与工作节点相同的调度器
- 参数
- 地址str, 可选
要连接的调度器的地址。默认为工作进程连接的调度器。
- 超时int 或 str
获取客户端的超时时间(以秒为单位)。默认为
distributed.comm.timeouts.connect
配置值。- resolve_addressbool, 默认 True
是否将 address 解析为其规范形式。
- 返回
- 客户端
示例
>>> def f(): ... client = get_client(timeout="10s") ... futures = client.map(lambda x: x + 1, range(10)) # spawn many tasks ... results = client.gather(futures) ... return sum(results)
>>> future = client.submit(f) >>> future.result() 55
- distributed.secede()[源代码]¶
让此任务从工作线程池中分离
这为新任务打开了一个新的调度槽和一个新的线程。这使得客户端可以在这个节点上调度任务,这在等待其他作业完成时特别有用(例如,使用
client.gather
)。示例
>>> def mytask(x): ... # do some work ... client = get_client() ... futures = client.map(...) # do some remote work ... secede() # while that work happens, remove ourself from the pool ... return client.gather(futures) # return gathered results
- distributed.rejoin()[源代码]¶
让这个线程重新加入 ThreadPoolExecutor
这将阻塞,直到执行器中有一个新的槽位打开。下一个完成任务的线程将离开池子,以允许这个线程加入。
参见
secede
离开线程池
- distributed.print(*args, sep: str | None = ' ', end: str | None = '\n', file: TextIO | None = None, flush: bool = False) None [源代码]¶
一个用于从工作节点向客户端进行远程打印的内置
print
函数的替代品。如果在 dask 工作节点之外调用,其参数将直接传递给builtins.print()
。如果由在工作节点上运行的代码调用,除了本地打印外,任何连接到管理此工作节点的调度器的客户端(可能远程)都将收到一个事件,指示它们将相同的输出打印到它们自己的标准输出或标准错误流中。例如,用户可以通过在提交的代码中包含对此print
函数的调用来执行简单的远程计算调试,并在本地 Jupyter 笔记本或解释器会话中检查输出。所有参数的行为与
builtins.print()
的参数相同,但file
关键字参数除外,如果指定,必须是sys.stdout
或sys.stderr
;不允许使用任意类似文件的对象。所有非关键字参数都使用
str()
转换为字符串并写入流中,由sep
分隔并在end
后跟随。sep
和end
都必须是字符串;它们也可以是None
,这意味着使用默认值。如果没有给出对象,print()
将只写入end
。- 参数
- sepstr, 可选
插入在值之间的字符串,默认是一个空格。
- 结束str, 可选
在最后一个值之后附加的字符串,默认为换行符。
- 文件 :
sys.stdout
或sys.stderr
,可选sys.stdout 或 sys.stderr,可选 默认为当前的 sys.stdout。
- flushbool, 默认 False
是否强制刷新流。
示例
>>> from dask.distributed import Client, print >>> client = distributed.Client(...) >>> def worker_function(): ... print("Hello from worker!") >>> client.submit(worker_function) <Future: finished, type: NoneType, key: worker_function-...> Hello from worker!
- distributed.warn(message: str | Warning, category: type[Warning] | None = <class 'UserWarning'>, stacklevel: int = 1, source: typing.Any = None) None [源代码]¶
一个内置
warnings.warn()
函数的即插即用替代品,用于从工作线程向客户端远程发出警告。如果从 dask 工作线程外部调用,其参数将直接传递给
warnings.warn()
。如果由在工作线程上运行的代码调用,除了在本地发出警告外,任何连接(可能远程)到管理此工作线程的调度程序的客户端都将收到一个事件,指示它们发出相同的警告(受其本地过滤器等的影响)。在实现可能在工作线程上运行的计算时,用户可以调用此warn
函数,以确保任何远程客户端会话将看到他们的警告,例如在 Jupyter 输出单元格中。虽然所有参数都被本地发出的警告所尊重(与
warnings.warn()
中的含义相同),但stacklevel
和source
被客户端忽略,因为它们在客户端的线程中没有意义。示例
>>> from dask.distributed import Client, warn >>> client = Client() >>> def do_warn(): ... warn("A warning from a worker.") >>> client.submit(do_warn).result() /path/to/distributed/client.py:678: UserWarning: A warning from a worker.
- class distributed.Reschedule[源代码]¶
重新安排此任务
引发此异常将停止当前任务的执行,并请求调度器重新调度此任务,可能会在不同的机器上执行。
这并不能保证任务会转移到不同的机器上。调度器将通过其正常的启发式方法来确定接受此任务的最佳机器。如果在首次调度任务后集群的负载发生了显著变化,机器很可能会改变。
- class distributed.get_task_stream(client=None, plot=False, filename='task-stream.html')[源代码]¶
在上下文块中收集任务流
这提供了在此块活动期间运行的每个任务的诊断信息。
这必须作为上下文管理器使用。
- 参数
- plot: boolean, str
如果为真,则还返回一个 Bokeh 图形。如果 plot == ‘save’,则将图形保存到文件中。
- filename: str (可选)
如果你设置
plot='save'
,保存的文件名
参见
Client.get_task_stream
此上下文管理器的函数版本
示例
>>> with get_task_stream() as ts: ... x.compute() >>> ts.data [...]
获取一个 Bokeh 图形,并可选择保存到文件
>>> with get_task_stream(plot='save', filename='task-stream.html') as ts: ... x.compute() >>> ts.figure <Bokeh Figure>
要与他人共享此文件,您可能希望将其上传并在网上提供。一种常见的方法是将其作为gist上传,然后在 https://raw.githack.com 上提供服务:
$ python -m pip install gist $ gist task-stream.html https://gist.github.com/8a5b3c74b10b413f612bb5e250856ceb
然后你可以导航到该站点,点击
task-stream.html
文件右侧的“Raw”按钮,然后将该URL提供给 https://raw.githack.com 。这个过程应该会提供一个可分享的链接,其他人可以使用该链接查看你的任务流图。
- class distributed.get_task_metadata[源代码]¶
在上下文块中收集任务元数据
这收集了在此上下文管理器范围内提交并完成的任务的
TaskState
元数据和最终状态。示例
>>> with get_task_metadata() as tasks: ... x.compute() >>> tasks.metadata {...} >>> tasks.state {...}
- class distributed.performance_report(filename='dask-report.html', stacklevel=1, mode=None, storage_options=None)[源代码]¶
收集性能报告
这将创建一个静态HTML文件,其中包含仪表板中许多相同的图表,以便稍后查看。
生成的文件使用JavaScript,因此必须使用网络浏览器查看。在本地,我们推荐使用
python -m http.server
或在线托管文件。- 参数
- filename: str, 可选
要保存性能报告的本地文件名
- stacklevel: int, 可选
用于填充报告的 Calling Code 部分的代码执行帧。默认为 1,即调用
performance_report
的帧。- mode: str, 可选
传递给
bokeh.io.output.output_file()
的模式参数。默认为None
。- storage_options: dict, 可选
在写入URL时,传递给
fsspec.open()
的任何额外参数。
示例
>>> with performance_report(filename="myfile.html", stacklevel=1): ... x.compute()
实用工具¶
- distributed.diagnostics.memray.memray_scheduler(directory: str | pathlib.Path = 'memray-profiles', report_args: Union[collections.abc.Sequence[str], Literal[False]] = ('flamegraph', '--temporal', '--leaks'), **memray_kwargs: Any) collections.abc.Iterator[None] [源代码]¶
在调度器上生成一个 Memray 配置文件并下载生成的报告。
示例:
with memray_scheduler(): client.submit(my_function).result() # Or even while the computation is already running fut = client.submit(my_function) with memray_scheduler(): time.sleep(10) fut.result()
- 参数
- 目录str
保存报告的目录。
- report_argstuple[str]
特别是对于 native_traces=True,报告必须在生成配置文件的同一主机上使用相同的 Python 解释器生成。否则,本地追踪将产生不可用的结果。因此,我们在调度器上生成报告,然后下载它们。您可以通过提供额外的参数来修改报告生成,我们将按如下方式生成报告:
memray *report_args -f <filename> -o <filename>.html
如果应该获取原始数据而不是报告,请将此设置为 False。
- **memray_kwargs
传递给 memray.Tracker 的关键字参数,例如 {“native_traces”: True}
- distributed.diagnostics.memray.memray_workers(directory: str | pathlib.Path = 'memray-profiles', workers: int | None | list[str] = None, report_args: Union[collections.abc.Sequence[str], Literal[False]] = ('flamegraph', '--temporal', '--leaks'), fetch_reports_parallel: bool | int = True, **memray_kwargs: Any) collections.abc.Iterator[None] [源代码]¶
在workers上生成一个Memray配置文件并下载生成的报告。
示例:
with memray_workers(): client.submit(my_function).result() # Or even while the computation is already running fut = client.submit(my_function) with memray_workers(): time.sleep(10) fut.result()
- 参数
- 目录str
保存报告的目录。
- 工人int | None | list[str]
要分析的工作者。如果是整数,将使用前 n 个工作者。如果是 None,将使用所有工作者。如果是 list[str],将使用具有给定地址的工作者。
- report_argstuple[str]
特别是对于 native_traces=True,报告必须在生成配置文件的同一主机上使用相同的 Python 解释器生成。否则,本地跟踪将产生不可用的结果。因此,我们将在工作节点上生成报告,然后下载它们。您可以通过提供额外的参数来修改报告生成,我们将按如下方式生成报告:
memray *report_args -f <filename> -o <filename>.html
如果应该获取原始数据而不是报告,请将此设置为 False。
- fetch_reports_parallel布尔 | 整数
获取结果有时会很慢,有时不希望在收到第一个报告之前等待所有工作线程完成。这控制了同时获取的工作线程数量。
int: 并发获取的工人数量 True: 所有工人同时 False: 一次一个工人
- **memray_kwargs
传递给 memray.Tracker 的关键字参数,例如 {“native_traces”: True}
自适应¶
- class distributed.deploy.Adaptive(cluster: Cluster, interval: str | float | timedelta | None = None, minimum: int | None = None, maximum: int | float | None = None, wait_count: int | None = None, target_duration: str | float | timedelta | None = None, worker_key: Callable[[WorkerState], Hashable] | None = None, **kwargs: Any)[源代码]¶
根据调度器负载自适应分配工作者。这是一个超类。
包含根据当前使用情况动态调整 Dask 集群大小的逻辑。此类需要与能够使用集群资源管理器创建和销毁 Dask 工作者的系统配对。通常它被内置于已有的解决方案中,而不是直接由用户使用。它最常从各种 Dask 集群类的
.adapt(...)
方法中使用。- 参数
- 集群: 对象
必须有 scale 和 scale_down 方法/协程
- 间隔timedelta 或 str, 默认值为 “1000 ms”
检查之间的毫秒数
- wait_count: int, default 3
在移除一个worker之前,应该建议移除它的连续次数。
- target_duration: timedelta 或 str, 默认 “5s”
我们希望计算所需的时间。这会影响我们扩展的积极性。
- worker_key: Callable[WorkerState]
在缩减规模时将工作者分组的功能 更多信息请参见 Scheduler.workers_to_close
- 最小值: int
保留的最少工作线程数
- 最大值: int
最大工作线程数
- **kwargs:
传递给 Scheduler 的额外参数。workers_to_close
注释
子类可以重写
Adaptive.target()
和Adaptive.workers_to_close()
来控制集群何时应该调整大小。默认实现检查每个工作者的任务是否过多或可用内存是否过少(参见distributed.Scheduler.adaptive_target()
)。间隔、最小值、最大值、等待计数和目标持续时间的值可以在 dask 配置中的 distributed.adaptive 键下指定。示例
这通常从现有的 Dask 类中使用,例如 KubeCluster
>>> from dask_kubernetes import KubeCluster >>> cluster = KubeCluster() >>> cluster.adapt(minimum=10, maximum=100)
或者,您可以通过从 Dask 的 Cluster 超类继承来在自己的 Cluster 类中使用它。
>>> from distributed.deploy import Cluster >>> class MyCluster(Cluster): ... def scale_up(self, n): ... """ Bring worker count up to n """ ... def scale_down(self, workers): ... """ Remove worker addresses from cluster """
>>> cluster = MyCluster() >>> cluster.adapt(minimum=10, maximum=100)
- property loop: tornado.ioloop.IOLoop¶
覆盖 Adaptive.loop
- state: AdaptiveStateState¶
这种自适应策略是否定期调整