Python API (高级)

在某些罕见情况下,专家可能希望在 Python 中显式创建 SchedulerWorkerNanny 对象。这在制作自动部署 Dask 到自定义环境的工具时通常是必要的。

更常见的是创建一个 在单机上使用 Client() 的本地集群 或使用 命令行界面 (CLI)。新读者建议从那里开始。

如果你确实想要自己启动 Scheduler 和 Worker 对象,你应该对 async/await 风格的 Python 语法有一些了解。这些对象是可等待的,并且通常在 async with 上下文管理器中使用。以下是一些示例,展示了几种启动和完成任务的方法。

完整示例

Scheduler([loop, services, service_kwargs, ...])

动态分布式任务调度器

Worker([scheduler_ip, scheduler_port, ...])

Dask 分布式集群中的工作节点

Client([address, loop, timeout, ...])

连接到 Dask 集群并提交计算

我们首先从一个综合示例开始,设置一个调度器、两个工作者和一个客户端在同一个事件循环中,运行一个简单的计算,然后清理所有内容。

import asyncio
from dask.distributed import Scheduler, Worker, Client

async def f():
    async with Scheduler() as s:
        async with Worker(s.address) as w1, Worker(s.address) as w2:
            async with Client(s.address, asynchronous=True) as client:
                future = client.submit(lambda x: x + 1, 10)
                result = await future
                print(result)

asyncio.get_event_loop().run_until_complete(f())

现在我们来看一些更简单的例子,这些例子逐步构建到这种情况。

调度器

Scheduler([loop, services, service_kwargs, ...])

动态分布式任务调度器

我们通过创建一个 Scheduler() 对象来创建调度器,然后 await 该对象以等待其启动。之后,我们可以等待 .finished 方法,直到它关闭。在此期间,调度器将处于活动状态,管理集群。

import asyncio
from dask.distributed import Scheduler, Worker

async def f():
    s = Scheduler()        # scheduler created, but not yet running
    s = await s            # the scheduler is running
    await s.finished()     # wait until the scheduler closes

asyncio.get_event_loop().run_until_complete(f())

这个程序将一直运行,直到某个外部进程连接到调度器并告诉它停止。如果你想自己关闭程序,可以通过等待 .close 方法来关闭任何 SchedulerWorkerNannyClient 类:

await s.close()

工人

Worker([scheduler_ip, scheduler_port, ...])

Dask 分布式集群中的工作节点

worker 遵循相同的 API。唯一的区别是 worker 需要知道调度器的地址。

import asyncio
from dask.distributed import Scheduler, Worker

async def f(scheduler_address):
    w = await Worker(scheduler_address)
    await w.finished()

asyncio.get_event_loop().run_until_complete(f("tcp://127.0.0.1:8786"))

在单个事件循环中启动多个

Scheduler([loop, services, service_kwargs, ...])

动态分布式任务调度器

Worker([scheduler_ip, scheduler_port, ...])

Dask 分布式集群中的工作节点

我们可以在同一个事件循环中运行任意数量的这些对象。

import asyncio
from dask.distributed import Scheduler, Worker

async def f():
    s = await Scheduler()
    w = await Worker(s.address)
    await w.finished()
    await s.finished()

asyncio.get_event_loop().run_until_complete(f())

使用上下文管理器

我们也可以使用 async with 上下文管理器来确保我们正确清理。以下是与上面相同的示例:

import asyncio
from dask.distributed import Scheduler, Worker

async def f():
    async with Scheduler() as s:
        async with Worker(s.address) as w:
            await w.finished()
            await s.finished()

asyncio.get_event_loop().run_until_complete(f())

另外,在下面的例子中,我们还包含了一个 Client ,运行了一个小的计算,然后允许在计算后进行清理。

import asyncio
from dask.distributed import Scheduler, Worker, Client

async def f():
    async with Scheduler() as s:
        async with Worker(s.address) as w1, Worker(s.address) as w2:
            async with Client(s.address, asynchronous=True) as client:
                future = client.submit(lambda x: x + 1, 10)
                result = await future
                print(result)

asyncio.get_event_loop().run_until_complete(f())

这相当于创建并 await 每个服务器,然后在离开上下文时对每个服务器调用 .close。在这个例子中,我们没有等待 s.finished(),因此这将相对较快地终止。如果你想让它永远运行,你可以调用 await s.finished()

保姆

Nanny([scheduler_ip, scheduler_port, ...])

管理工作进程的流程

另外,如果我们希望你的工作者在单独的进程中被管理,我们可以用 Nanny 替换 WorkerNanny 构造函数遵循相同的 API。这允许工作者在失败时自行重启。此外,它提供了一些额外的监控功能,并且在协调许多应该生活在不同进程中的工作者以避免 GIL 时非常有用。

# w = await Worker(s.address)
w = await Nanny(s.address)

API

这些类有多种关键字参数,您可以使用它们来控制其行为。有关更多信息,请参阅下面的API文档。

调度器

class distributed.Scheduler(loop=None, services=None, service_kwargs=None, allowed_failures=None, extensions=None, validate=None, scheduler_file=None, security=None, worker_ttl=None, idle_timeout=None, interface=None, host=None, port=0, protocol=None, dashboard_address=None, dashboard=None, http_prefix='/', preload=None, preload_argv=(), plugins=(), contact_address=None, transition_counter_max=False, jupyter=False, **kwargs)[源代码]

动态分布式任务调度器

调度器跟踪工作者、数据和计算的当前状态。调度器监听事件,并通过适当控制工作者来响应。它不断尝试利用工作者来执行一个不断增长的dask图。

所有事件都能快速处理,时间复杂度与输入成线性关系(输入通常是常量大小),并且通常在毫秒内完成。为了实现这一点,调度器跟踪大量状态。每个操作都维护此状态的一致性。

调度器通过 Comm 对象与外界通信。即使在同时监听多个客户端时,它也能保持对世界的一致且有效的视图。

调度器通常通过 dask scheduler 可执行文件启动:

$ dask scheduler
Scheduler started at 127.0.0.1:8786

或者在 LocalCluster 中,Client 在没有连接信息的情况下启动:

>>> c = Client()  
>>> c.cluster.scheduler  
Scheduler(...)

用户通常不直接与调度器交互,而是通过客户端对象 Client 进行交互。

contact_address 参数允许向工作节点通告一个特定的地址,用于与调度器的通信,该地址与调度器绑定的地址不同。当调度器监听一个私有地址时,这非常有用,因为工作节点无法使用该私有地址来联系调度器。

状态

调度器包含以下状态变量。每个变量都列出了它存储的内容和简要描述。

  • 任务: {任务键: 任务状态}

    调度器当前已知的任务

  • 不可运行: {TaskState}

    处于“无工作者”状态的任务

  • 工人: {工人键: 工人状态}

    当前连接到调度器的工人

  • 空闲: {WorkerState}:

    未充分利用的工作者集合

  • 饱和状态: {WorkerState}:

    未过度利用的工作者集合

  • host_info: {hostname: dict}:

    每个工作主机的信息

  • 客户端: {客户端键: 客户端状态}

    当前连接到调度器的客户端

  • 服务: {str: 端口}:

    此调度器上运行的其他服务,如 Bokeh

  • 循环: IOLoop:

    正在运行的 Tornado IOLoop

  • client_comms: {客户端密钥: 通信}

    对于每个客户端,使用一个 Comm 对象来接收任务请求并报告任务状态更新。

  • stream_comms: {worker key: Comm}

    对于每个工作者,我们从一个Comm对象中接受刺激并报告结果。

  • 任务持续时间: {key-prefix: time}

    我们预期某些函数所需的时间,例如 {'sum': 0.25}

adaptive_target(target_duration=None)[源代码]

根据当前工作负载所需的工人数

这会查看当前运行的任务和内存使用情况,并返回所需的工作者数量。这通常用于自适应调度。

参数
目标持续时间str

计算所需的时间长度。这会影响调度器请求扩展的速度。

async add_client(comm: distributed.comm.core.Comm, client: str, versions: dict[str, Any]) None[源代码]

将客户端添加到网络

我们接收来自此通信的所有未来消息。

add_keys(worker: str, keys: collections.abc.Collection[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]] = (), stimulus_id: str | None = None) Literal['OK', 'not found'][源代码]

了解一个工人拥有某些钥匙

这在实践中不应使用,主要是出于历史原因而存在。然而,工作人员偶尔会发送它。

add_plugin(plugin: distributed.diagnostics.plugin.SchedulerPlugin, *, idempotent: bool = False, name: str | None = None, **kwargs: Any) None[源代码]

将外部插件添加到调度器。

请参阅 https://distributed.readthedocs.io/en/latest/plugins.html

参数
插件SchedulerPlugin

SchedulerPlugin 实例以添加

幂等布尔

如果为真,则假定插件已经存在,不会采取任何行动。

名称str

插件的名称,如果为 None,则检查 Plugin 实例上的 name 属性,并在未发现时生成。

async add_worker(comm: distributed.comm.core.Comm, *, address: str, status: str, server_id: str, nthreads: int, name: str, resolve_address: bool = True, now: float, resources: dict[str, float], host_info: None = None, memory_limit: int | None, metrics: dict[str, Any], pid: int = 0, services: dict[str, int], local_directory: str, versions: dict[str, Any], nanny: str, extra: dict, stimulus_id: str) None[源代码]

向集群添加新工作节点

async benchmark_hardware() dict[str, dict[str, float]][源代码]

在工作人员上运行基准测试,以测试内存、磁盘和网络带宽

返回
结果: dict

一个映射字典,将名称“disk”、“memory”和“network”映射到将大小映射到带宽的字典。这些带宽是许多工作者在集群上运行计算的平均值。

async broadcast(*, msg: dict, workers: collections.abc.Collection[str] | None = None, hosts: collections.abc.Collection[str] | None = None, nanny: bool = False, serializers: Any = None, on_error: Literal['raise', 'return', 'return_pickle', 'ignore'] = 'raise') dict[str, Any][源代码]

向工作节点广播消息,返回所有结果

client_heartbeat(client: str) None[源代码]

处理来自客户端的心跳

client_releases_keys(keys: collections.abc.Collection[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], client: str, stimulus_id: str | None = None) None[源代码]

从客户端期望列表中移除键

client_send(client, msg)[源代码]

向客户端发送消息

async close(fast=None, close_workers=None, reason='unknown')[源代码]

向所有协程发送清理信号,然后等待直到完成

参见

Scheduler.cleanup
close_worker(worker: str) None[源代码]

要求一个工作者自行关闭。不要等待其生效。注意,无法保证工作者会实际接受该命令。

注意,如果 close=True,remove_worker() 会在内部发送相同的命令。

coerce_address(addr: str | tuple, resolve: bool = True) str[源代码]

将可能的输入地址强制转换为规范形式。resolve 可以在使用假主机名进行测试时禁用。

处理字符串、元组或别名。

async delete_worker_data(worker_address: str, keys: collections.abc.Collection[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], stimulus_id: str) None[源代码]

从工作者中删除数据并更新相应的工作者/任务状态

参数
worker_address: str

要从中删除键的工作者地址

keys: list[Key]

在指定工作节点上删除的键列表

async dump_cluster_state_to_url(url: str, exclude: collections.abc.Collection[str], format: Literal['msgpack', 'yaml'], **storage_options: dict[str, Any]) None[源代码]

将集群状态转储写入一个与 fsspec 兼容的 URL。

async feed(comm: distributed.comm.core.Comm, function: bytes | None = None, setup: bytes | None = None, teardown: bytes | None = None, interval: str | float = '1s', **kwargs: Any) None[源代码]

提供对外部请求者的数据通信

注意:这会在调度器上运行任意 Python 代码。这最终应该被淘汰。它主要用于诊断。

async gather(keys: collections.abc.Collection[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], serializers: list[str] | None = None) dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], object][源代码]

从工作者收集数据到调度器

async gather_on_worker(worker_address: str, who_has: dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], list[str]]) set[源代码]

从多个工作者到单个工作者的密钥点对点复制

参数
worker_address: str

接收工作者的地址以复制密钥到

who_has: dict[Key, list[str]]

{键: [发送者地址, 发送者地址, …], 键: …}

返回
returns:

一组未能复制的键

async get_cluster_state(exclude: collections.abc.Collection[str]) dict[源代码]

生成用于集群状态转储的状态字典

async get_story(keys_or_stimuli: collections.abc.Iterable[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]]) list[distributed.scheduler.Transition][源代码]

用于 SchedulerState.story() 的 RPC 钩子。

请注意,msgpack 序列化/反序列化往返会将 Transition 命名元组转换为普通元组。

get_worker_service_addr(worker: str, service_name: str, protocol: bool = False) tuple[str, int] | str | None[源代码]

获取 worker 上命名服务的 (主机, 端口) 地址。如果服务不存在,则返回 None。

参数
工人地址
服务名称str

常见服务包括 ‘bokeh’ 和 ‘nanny’

协议布尔

是否包含带有协议的完整地址(True)或仅包含(主机,端口)对

handle_long_running(key: Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], worker: str, compute_duration: float | None, stimulus_id: str) None[源代码]

一个任务已从线程池中脱离

我们阻止任务在未来被窃取,并更改任务持续时间计算,就好像任务已经停止一样。

handle_request_refresh_who_has(keys: collections.abc.Iterable[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], worker: str, stimulus_id: str) None[源代码]

来自工作者的请求,要求刷新某些键的 who_has 信息。不要与 scheduler.who_has 混淆,后者是客户端的专用通信 RPC 请求。

async handle_worker(comm: distributed.comm.core.Comm, worker: str) None[源代码]

监听来自单个工作者的响应

这是调度器与工作线程交互的主循环

参见

Scheduler.handle_client

客户端的等效协程

identity() dict[str, Any][源代码]

关于我们自己和我们集群的基本信息

log_event(topic: str | collections.abc.Collection[str], msg: Any) None[源代码]

在给定主题下记录事件

参数
主题str, list[str]

记录事件的主题名称。要将同一事件记录在多个主题下,请传递一个主题名称列表。

消息

事件消息以记录。注意这必须是 msgpack 可序列化的。

async proxy(msg: dict, worker: str, serializers: Any = None) Any[源代码]

通过调度器代理通信到其他工作节点

async rebalance(keys: collections.abc.Iterable[Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]] | None = None, workers: collections.abc.Iterable[str] | None = None, stimulus_id: str | None = None) dict[源代码]

重新平衡键,使得每个工作线程最终拥有大致相同的进程内存(托管+非托管)。

警告

此操作通常未经过调度器正常操作的充分测试。不建议在等待计算时使用它。

算法

  1. 查找集群的平均占用率,定义为 dask 管理的数据 + 至少存在 30 秒的未管理进程内存(distributed.worker.memory.recent-to-old-time)。这使我们能够忽略由任务堆使用引起的临时峰值。

    或者,您可以通过 distributed.worker.memory.rebalance.measure 更改内存的测量方式,包括单个工作者的内存测量以及计算平均值。具体来说,这可以用于忽略不准确的系统内存测量。

  2. 丢弃占用率在集群平均占用率5%以内的工作节点(distributed.worker.memory.rebalance.sender-recipient-gap / 2)。这有助于避免数据在集群中反复跳动。

  3. 高于平均水平的工人是发送者;低于平均水平的是接收者。

  4. 丢弃发送者,其绝对占用率低于30%(distributed.worker.memory.rebalance.sender-min)。换句话说,只要所有工作者的占用率都低于30%,无论不平衡程度如何,都不会移动数据。

  5. 丢弃绝对占用率超过60%的接收者(distributed.worker.memory.rebalance.recipient-max)。请注意,此阈值默认与``distributed.worker.memory.target``相同,以防止工作者在接受数据后立即将其溢出到磁盘。

  6. 迭代地选择与平均值最远的发送者和接收者,并在两者之间移动*最近最少插入*的键,直到所有发送者或所有接收者都在平均值的5%以内。

    如果接收者已经拥有数据的副本,它将被跳过。换句话说,此方法不会降低复制效果。如果没有足够的内存来接受密钥且尚未持有副本的接收者可用,则该密钥将被跳过。

最近最少插入 (LRI) 策略是一种具有 O(1) 时间复杂度优势的贪心选择,实现简单(依赖于 Python 字典的插入排序),并且在大多数情况下应该足够好。被淘汰的替代策略有:

  • 从最大开始。O(n*log(n)) 节省了非平凡的额外数据结构,并可能导致最大的数据块像弹球一样在集群中反复移动。

  • 最近最少使用 (LRU)。此信息目前仅在工作节点上可用,并且在调度器上复制并不简单;通过网络传输它将非常昂贵。此外,请注意,dask 会尽力最小化中间键在内存中保留的时间,因此在这种情况下的 LRI 是 LRU 的近似值。

参数
键: 可选

应考虑移动的 dask 键的允许列表。所有其他键将被忽略。请注意,这并不保证键实际上会被移动(例如,因为它是不必要的,或者因为没有可行的接收工作者)。

workers: 可选

允许的工作者地址列表,作为发送者或接收者。所有其他工作者将被忽略。集群占用率的平均值将仅使用允许的工作者计算。

async register_nanny_plugin(comm: None, plugin: bytes, name: str, idempotent: bool | None = None) dict[str, distributed.core.OKMessage][源代码]

在所有运行中和未来的保姆上注册一个保姆插件

async register_scheduler_plugin(plugin: bytes | distributed.diagnostics.plugin.SchedulerPlugin, name: str | None = None, idempotent: bool | None = None) None[源代码]

在调度器上注册一个插件。

async register_worker_plugin(comm: None, plugin: bytes, name: str, idempotent: bool | None = None) dict[str, distributed.core.OKMessage][源代码]

在所有正在运行和未来的工作进程上注册一个工作插件

remove_client(client: str, stimulus_id: str | None = None) None[源代码]

从网络中移除客户端

remove_plugin(name: str | None = None) None[源代码]

从调度器中移除外接插件

参数
名称str

要移除的插件名称

remove_worker(address: str, *, stimulus_id: str, expected: bool = False, close: bool = True) Literal['OK', 'already-removed'][源代码]

从集群中移除工作节点。

当一个工作者报告它计划离开或当它看起来无响应时,我们会这样做。这可能会将其任务发送回释放状态。

async replicate(comm=None, keys=None, n=None, workers=None, branching_factor=2, delete=True, stimulus_id=None)[源代码]

在整个集群中复制数据

这会在网络中的每一份数据上单独执行数据的树形复制。

参数
键: 可迭代对象

要复制的键列表

n: int

我们期望在集群内看到的复制次数

branching_factor: int, 可选

每个生成中可以复制数据的工人数量。分支因子越大,我们在单个步骤中复制的数据越多,但给定的工人面临的数据请求风险也越大。

report(msg: dict, ts: distributed.scheduler.TaskState | None = None, client: str | None = None) None[源代码]

发布更新到所有监听的队列和通信

如果消息包含一个键,那么我们只将消息发送给那些关心该键的通信。

request_acquire_replicas(addr: str, keys: collections.abc.Iterable[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], *, stimulus_id: str) None[源代码]

异步请求一个工作者从其他工作者获取所列键的副本。这是一个没有反馈的成功或失败的触发即忘操作,旨在用于家务管理而非计算。

request_remove_replicas(addr: str, keys: list[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], *, stimulus_id: str) None[源代码]

异步请求一个工作者丢弃其列出键的副本。这绝不能用于销毁键的最后一个副本。这是一个即发即弃的操作,适用于家务管理而非计算。

副本会立即从调度器端的 TaskState.who_has 中消失;如果工作节点拒绝删除,例如因为该任务是运行在其上的另一个任务的依赖项,它将(同样异步地)通知调度器重新将自己添加到 who_has 中。如果工作节点同意丢弃任务,则不会有反馈。

async restart(*, client: str | None = None, timeout: float = 30, wait_for_workers: bool = True, stimulus_id: str) None[源代码]

忘记所有任务并在所有工作器上调用 restart_workers。

参数
超时:

参见 restart_workers

wait_for_workers:

参见 restart_workers

async restart_workers(workers: list[str] | None = None, *, client: str | None = None, timeout: float = 30, wait_for_workers: bool = True, on_error: Literal['raise', 'return'] = 'raise', stimulus_id: str) dict[str, Literal['OK', 'removed', 'timed out']][源代码]

重启选定的工人。可以选择等待工人返回。

没有保姆的工人会被关闭,希望外部部署系统会重新启动它们。因此,如果不使用保姆,并且您的部署系统不会自动重启工人,restart 将只会关闭所有工人,然后超时!

restart 之后,所有连接的工作者都是新的,无论是否引发了 TimeoutError。任何未能及时关闭的工作者都会被移除,并且可能在将来自行关闭,也可能不会。

参数
工人:

要重启的工作者地址列表。如果省略,则重启所有工作者。

超时:

如果 wait_for_workers 为 True,等待工作线程关闭并返回的时间长度,否则仅等待工作线程关闭的时间长度。如果超过此时间,则引发 asyncio.TimeoutError

wait_for_workers:

是否等待所有工作进程重新连接,还是仅等待它们关闭(默认 True)。使用 restart(wait_for_workers=False) 结合 Client.wait_for_workers() 可以实现对等待多少工作进程的精细控制。

on_error:

如果为 ‘raise’(默认),则在重新启动工作进程时,如果有任何保姆超时,则引发异常。如果为 ‘return’,则返回错误消息。

返回
{worker address: “OK”, “no nanny”, 或 “timed out” 或错误信息}
async retire_workers(workers: list[str], *, close_workers: bool = 'False', remove: bool = 'True', stimulus_id: str | None = 'None') dict[str, distributed.utils.Any][源代码]
async retire_workers(*, names: list, close_workers: bool = 'False', remove: bool = 'True', stimulus_id: str | None = 'None') dict[str, distributed.utils.Any]
async retire_workers(*, close_workers: bool = 'False', remove: bool = 'True', stimulus_id: str | None = 'None', memory_ratio: int | float | None = 'None', n: int | None = 'None', key: Callable[[WorkerState], Hashable] | bytes | None = 'None', minimum: int | None = 'None', target: int | None = 'None', attribute: str = "'address'") dict[str, distributed.utils.Any]

优雅地从集群中退役工作节点。任何仅在退役工作节点内存中的键将被复制到其他地方。

参数
workers: list[str] (可选)

待退休的工人地址列表。

names: list (可选)

要退休的工人名称列表。与 workers 互斥。如果既没有提供 workers 也没有提供 names,我们将调用 workers_to_close,它会找到一个合适的选择。

close_workers: bool (默认为 False)

是否从这里实际明确地关闭工作线程。否则,我们期望一些外部作业调度程序来完成工作线程。

remove: bool (默认为 True)

是否立即删除工作者的元数据,或者等待工作者与我们联系。

如果 close_workers=False 且 remove=False,此方法仅将内存中的任务从工作线程中刷新并返回。如果 close_workers=True 且 remove=False,此方法将在工作线程仍在集群中时返回,尽管它们不会接受新任务。如果 close_workers=False 或由于任何原因工作线程不接受关闭命令,它将永久无法接受新任务,并预期以其他方式关闭。

**kwargs: 字典

传递给 workers_to_close 的额外选项,用于确定我们应该丢弃哪些工作线程。仅在 workersnames 被省略时接受。

返回
字典映射工作器ID/地址到关于该工作器信息的字典
每个退休工人对应一个工人。
如果存在仅在即将退役的工作节点上内存中存在的键和它
无法在其他地方复制它们(例如,因为没有
如果有任何其他正在运行的工作进程),持有这些键的工作进程将不会被退役并且
不会出现在返回的字典中。
run_function(comm: distributed.comm.core.Comm, function: collections.abc.Callable, args: tuple = (), kwargs: dict | None = None, wait: bool = True) Any[源代码]

在此进程中运行一个函数

async scatter(comm=None, data=None, workers=None, client=None, broadcast=False, timeout=2)[源代码]

将数据发送给工作者

send_all(client_msgs: dict[str, list[dict[str, Any]]], worker_msgs: dict[str, list[dict[str, Any]]]) None[源代码]

向客户端和工作者发送消息

send_task_to_worker(worker: str, ts: distributed.scheduler.TaskState, duration: float = - 1) None[源代码]

将单个计算任务发送给一个工作者

async start_unsafe() Self[源代码]

清除旧状态并重新启动所有正在运行的协程

stimulus_cancel(keys: collections.abc.Collection[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], client: str, force: bool = False) None[源代码]

在键列表上停止执行

stimulus_queue_slots_maybe_opened(*, stimulus_id: str) None[源代码]

响应一个可能在工作线程池中打开空位的事件

根据工作线程上可用的任务槽总数(可能为0),从队列前端选择适当数量的任务,并将它们转换为 processing

注释

与此刺激相关的其他转换应在事先完全处理完毕,因此任何变得可运行的任务都已经在 processing 中。否则,如果队列中的任务在下游任务之前被调度,可能会发生过度生产。

必须在调用 check_idle_saturated 之后调用;即 idle_task_count 必须是最新的。

stimulus_task_erred(key=None, worker=None, exception=None, stimulus_id=None, traceback=None, run_id=None, **kwargs)[源代码]

标记某个任务在特定的工作节点上出错

stimulus_task_finished(key: Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], worker: str, stimulus_id: str, run_id: int, **kwargs: Any) tuple[dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']], dict[str, list[dict[str, typing.Any]]], dict[str, list[dict[str, typing.Any]]]][源代码]

标记一个任务在特定的工作节点上已完成执行

transition(key: Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], finish: Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten'], stimulus_id: str, **kwargs: Any) dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']][源代码]

将一个键从其当前状态转换到完成状态

返回
未来过渡的建议字典

参见

Scheduler.transitions

此函数的传递版本

示例

>>> self.transition('x', 'waiting')
{'x': 'processing'}
transitions(recommendations: dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']], stimulus_id: str) None[源代码]

处理所有剩余的转换过程

这包括了从前几次过渡中得到的反馈,并持续进行,直到我们达到一个稳定状态

async unregister_nanny_plugin(comm: None, name: str) dict[str, distributed.core.ErrorMessage | distributed.core.OKMessage][源代码]

注销一个工作插件

async unregister_scheduler_plugin(name: str) None[源代码]

在调度器上注销一个插件。

async unregister_worker_plugin(comm: None, name: str) dict[str, distributed.core.ErrorMessage | distributed.core.OKMessage][源代码]

注销一个工作插件

update_data(*, who_has: dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], list[str]], nbytes: dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], int], client: str | None = None) None[源代码]

了解到有新数据从外部来源进入网络

worker_send(worker: str, msg: dict[str, Any]) None[源代码]

向工作者发送消息

这还通过在下一周期添加回调来处理连接失败,以移除工作线程。

workers_list(workers: collections.abc.Iterable[str] | None) list[str][源代码]

合格工人名单

接受一个工作地址或主机名的列表。返回所有匹配的工作地址列表。

workers_to_close(memory_ratio: int | float | None = None, n: int | None = None, key: collections.abc.Callable[[distributed.scheduler.WorkerState], collections.abc.Hashable] | bytes | None = None, minimum: int | None = None, target: int | None = None, attribute: str = 'address') list[str][源代码]

找到我们可以以低成本关闭的工人

这将返回一组适合退休的工作者。这些工作者没有运行任何任务,并且相对于其他工作者存储的数据量较少。如果所有工作者都处于空闲状态,我们仍然会保留足够的工作者,以确保有足够的RAM来存储我们的数据,并留有舒适的缓冲区。

这是用于 distributed.deploy.adaptive 等系统。

参数
memory_ratio数字

我们希望为存储的数据额外保留的空间量。默认为2,即我们希望保留的内存量是当前数据量的两倍。

n整数

要关闭的工人数量

最小整数

保留的最少工作线程数

关键Callable(WorkerState)

一个可选的可调用对象,将 WorkerState 对象映射到一个组隶属关系。组将一起关闭。这在需要集体关闭工作进程时很有用,例如按主机名关闭。

目标整数

在我们关闭后,目标工作者的数量

属性str

要返回的 WorkerState 对象的属性,如“address”或“name”。默认为“address”。

返回
to_close: 可以关闭的工作地址列表

示例

>>> scheduler.workers_to_close()
['tcp://192.168.0.1:1234', 'tcp://192.168.0.2:1234']

在关闭之前按主机名对工作线程进行分组

>>> scheduler.workers_to_close(key=lambda ws: ws.host)
['tcp://192.168.0.1:1234', 'tcp://192.168.0.1:4567']

移除两名工人

>>> scheduler.workers_to_close(n=2)

保持足够的工人,使其内存是我们所需的两倍。

>>> scheduler.workers_to_close(memory_ratio=2)

工人

class distributed.Worker(scheduler_ip: str | None = None, scheduler_port: int | None = None, *, scheduler_file: str | None = None, nthreads: int | None = None, loop: IOLoop | None = None, local_directory: str | None = None, services: dict | None = None, name: Any | None = None, reconnect: bool | None = None, executor: Executor | dict[str, Executor] | Literal['offload'] | None = None, resources: dict[str, float] | None = None, silence_logs: int | None = None, death_timeout: Any | None = None, preload: list[str] | None = None, preload_argv: list[str] | list[list[str]] | None = None, security: Security | dict[str, Any] | None = None, contact_address: str | None = None, heartbeat_interval: Any = '1s', extensions: dict[str, type] | None = None, metrics: Mapping[str, Callable[[Worker], Any]] = {}, startup_information: Mapping[str, Callable[[Worker], Any]] = {}, interface: str | None = None, host: str | None = None, port: int | str | Collection[int] | None = None, protocol: str | None = None, dashboard_address: str | None = None, dashboard: bool = False, http_prefix: str = '/', nanny: Nanny | None = None, plugins: tuple[WorkerPlugin, ...] = (), low_level_profiler: bool | None = None, validate: bool | None = None, profile_cycle_interval=None, lifetime: Any | None = None, lifetime_stagger: Any | None = None, lifetime_restart: bool | None = None, transition_counter_max: int | Literal[False] = False, memory_limit: str | float = 'auto', data: WorkerDataParameter = None, memory_target_fraction: float | Literal[False] | None = None, memory_spill_fraction: float | Literal[False] | None = None, memory_pause_fraction: float | Literal[False] | None = None, scheduler_sni: str | None = None, **kwargs)[源代码]

Dask 分布式集群中的工作节点

工人执行两项功能:

  1. 从本地字典提供数据

  2. 对这些数据以及来自同行的数据进行计算

工作者会向调度器报告他们的数据,并在必要时使用该调度器从其他工作者那里收集数据以执行计算。

你可以使用 dask worker 命令行应用程序启动一个工作进程:

$ dask worker scheduler-ip:port

使用 --help 标志查看更多选项:

$ dask worker --help

此文档字符串的其余部分是关于工作线程用于管理和跟踪内部计算的内部状态。

状态

信息状态

这些属性在执行过程中不会显著变化。

  • nthreads: int:

    此工作进程使用的 nthreads 数量

  • 执行器: dict[str, concurrent.futures.Executor]:

    执行器用于执行计算。始终包含默认执行器。

  • 本地目录: 路径:

    本地机器上存储临时文件的路径

  • 调度器: PooledRPCCall:

    调度器的位置。参见 .ip/.port 属性。

  • 名称: 字符串:

    别名

  • 服务: {str: Server}:

    在此工作节点上运行的辅助Web服务器

  • service_ports: {str: port}:

  • transfer_outgoing_count_limit: int

    并发传出数据传输的最大数量。另请参见 distributed.worker_state_machine.WorkerState.transfer_incoming_count_limit

  • batched_stream: BatchedSend

    我们与调度器通信的批处理流

  • 日志: [(消息)]

    一个结构化且可查询的日志。请参见 Worker.story

易失状态

这些属性跟踪此工作程序尝试完成的任务的进度。在下面的描述中,key 是我们想要计算的任务的名称,而 dep 是我们想要从他人那里收集的依赖数据的名称。

  • 线程{key: int}

    任务运行的线程ID

  • active_threads: {int: key}

    当前在活动线程上运行的键

  • 状态: WorkerState

    封装的状态机。参见 BaseWorkerWorkerState

参数
scheduler_ip: str, 可选
scheduler_port: int, 可选
scheduler_file: str, 可选
host: str, 可选
数据: MutableMapping, 类型, 无

用于存储的对象,默认构建一个基于磁盘的LRU字典。

如果提供了一个可调用对象来构造存储对象,并且调用签名中有一个名为 worker_local_directory 的参数,那么它将接收到工作者的 attr:local_directory 作为参数。

nthreads: int, 可选
local_directory: str, 可选

我们放置本地资源的目录

名称: str, 可选
memory_limit: int, float, string

此工作线程应使用的内存字节数。设置为零表示无限制。设置为 ‘auto’ 则按系统.MEMORY_LIMIT * min(1, nthreads / total_cores) 计算。可以使用字符串或数字,如 5GB 或 5e9。

memory_target_fraction: float 或 False

尝试保持在内存的分数(默认:从配置键 distributed.worker.memory.target 读取)

memory_spill_fraction: float 或 False

开始将数据溢出到磁盘的内存比例(默认:从配置键 distributed.worker.memory.spill 读取)

memory_pause_fraction: float 或 False

我们停止运行新任务时的内存比例(默认值:从配置键 distributed.worker.memory.pause 读取)

max_spill: int, string 或 False

溢出到磁盘的字节数限制。(默认:从配置键 distributed.worker.memory.max-spill 读取)

executor: concurrent.futures.Executor, dict[str, concurrent.futures.Executor], “offload”
要使用的执行器。根据类型,它有以下含义:
  • 执行器实例:默认执行器。

  • Dict[str, Executor]: 将名称映射到 Executor 实例。如果字典中没有 “default” 键,将使用 ThreadPoolExecutor(nthreads) 创建一个 “default” 执行器。

  • 字符串 “offload”,它指的是用于卸载通信的同一个线程池。这导致用于反序列化和计算的是同一个线程。

资源: dict

该工作人员拥有的资源,例如 {'GPU': 2}

nanny: str

联系保姆的地址,如果存在

lifetime: str

在“1小时”等时间量之后,我们优雅地关闭工作线程。默认值为None,表示没有明确的关闭时间。

lifetime_stagger: str

时间量如“5分钟”来错开生命周期值 实际生命周期将在生命周期 +/- 生命周期错开范围内均匀随机选择

lifetime_restart: bool

是否在工作进程达到其生命周期后重新启动,默认值为 False

kwargs: 可选

ServerNode 构造函数的附加参数

示例

使用命令行启动一个工作进程:

$ dask scheduler
Start scheduler at 127.0.0.1:8786

$ dask worker 127.0.0.1:8786
Start worker at:               127.0.0.1:1234
Registered with scheduler at:  127.0.0.1:8786
batched_send(msg: dict[str, Any]) None[源代码]

实现 BaseWorker 抽象方法。

通过批量通信向调度器发送一个“发送即忘”的消息。

如果我们当前没有连接到调度器,消息将会被静默丢弃!

async close(timeout: float = 30, executor_wait: bool = True, nanny: bool = True, reason: str = 'worker-close') str | None[源代码]

关闭工作线程

关闭工作线程上运行的异步操作,停止所有执行器和通信。如果请求,这也会关闭保姆。

参数
超时

关闭单个指令的超时时间(秒)

executor_wait

如果为 True,则同步关闭执行器,否则异步关闭。

保姆

如果为真,关闭保姆

原因

关闭工作者的理由

返回
str | None

如果工作线程已经在关闭状态或失败,则返回 None,否则返回 “OK”

async close_gracefully(restart=None, reason: str = 'worker-close-gracefully')[源代码]

优雅地关闭一个工作进程

这首先通知调度器我们正在关闭,并请求它将我们的数据移动到其他地方。之后,我们正常关闭。

property data: collections.abc.MutableMapping[Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], object]

所有已完成任务的 task payload,无论这些任务是在此Worker上计算的,还是在其他地方计算并通过网络传输到这里的。

在使用默认配置时,这是一个 zict 缓冲区,当目标阈值超过时会自动溢出到磁盘。如果禁用了溢出,它将是一个普通的字典。它也可能是用户在初始化 Worker 或 Nanny 时传递的自定义类似字典的对象。Worker 逻辑应将其视为不透明对象,并坚持使用 MutableMapping API。

备注

这个相同的集合也可以在 self.state.dataself.memory_manager.data 中找到。

类型

toctree 是一个 reStructuredText 指令 ,这是一个非常多功能的标记。指令可以有参数、选项和内容。

digest_metric(name: collections.abc.Hashable, value: float) None[源代码]

通过调用 Server.digest_metric 实现 BaseWorker.digest_metric

async execute(key: Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], *, stimulus_id: str) distributed.worker_state_machine.StateMachineEvent[源代码]

执行一个任务。实现 BaseWorker 抽象方法。

async gather(who_has: dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], list[str]]) dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], object][源代码]

调度器使用的端点。由 Scheduler.rebalance() 和 Scheduler.replicate() 使用。

async gather_dep(worker: str, to_gather: collections.abc.Collection[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], total_nbytes: int, *, stimulus_id: str) distributed.worker_state_machine.StateMachineEvent[源代码]

实现 BaseWorker 抽象方法

get_current_task() Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]][源代码]

获取我们当前正在运行的任务的键

这只能在任务中运行才有意义

参见

get_worker

示例

>>> from dask.distributed import get_worker
>>> def f():
...     return get_worker().get_current_task()
>>> future = client.submit(f)  
>>> future.result()  
'f-1234'
handle_stimulus(*stims: distributed.worker_state_machine.StateMachineEvent) None[源代码]

覆盖 BaseWorker 方法以增加验证

log_event(topic: str | collections.abc.Collection[str], msg: Any) None[源代码]

在给定主题下记录事件

参数
主题str, list[str]

记录事件的主题名称。要将同一事件记录在多个主题下,请传递一个主题名称列表。

消息

事件消息以记录。注意这必须是 msgpack 可序列化的。

async retry_busy_worker_later(worker: str) distributed.worker_state_machine.StateMachineEvent[源代码]

等待一段时间,然后将一个对等工作线程从忙碌状态中取出。实现 BaseWorker 抽象方法。

async start_unsafe()[源代码]

尝试启动服务器。这不是幂等的,也没有防止并发启动尝试的保护措施。

这旨在被子类覆盖或调用。为了安全启动,请使用 Server.start 代替。

如果配置了 death_timeout,我们将要求此协程在该超时到达之前完成。如果达到超时,我们将关闭实例并引发 asyncio.TimeoutError

transfer_outgoing_bytes: int

当前向其他工作者开放的数据传输总大小

transfer_outgoing_bytes_total: int

传输到其他工作者的数据总量(包括进行中和失败的传输)

transfer_outgoing_count: int

当前向其他工作者开放的数据传输数量

transfer_outgoing_count_total: int

自工作进程启动以来,向其他工作进程传输的数据总数

trigger_profile() None[源代码]

从所有正在计算的线程中获取一个帧

将这些帧合并到现有的配置文件计数中

property worker_address

为了与Nanny的API兼容

保姆

class distributed.Nanny(scheduler_ip=None, scheduler_port=None, scheduler_file=None, worker_port: int | str | collections.abc.Collection[int] | None = 0, nthreads=None, loop=None, local_directory=None, services=None, name=None, memory_limit='auto', reconnect=True, validate=False, quiet=False, resources=None, silence_logs=None, death_timeout=None, preload=None, preload_argv=None, preload_nanny=None, preload_nanny_argv=None, security=None, contact_address=None, listen_address=None, worker_class=None, env=None, interface=None, host=None, port: int | str | collections.abc.Collection[int] | None = None, protocol=None, config=None, **worker_kwargs)[源代码]

管理工作进程的流程

保姆进程会启动工作进程,监视它们,并在必要时杀死或重启它们。如果你想使用 Client.restart 方法,或者在工作进程达到其内存限制的终止部分时自动重启它,这是必要的。

Nanny 的参数大部分与 Worker 的参数相同,以下列出例外情况。

参数
env: dict, 可选

在Nanny初始化时设置的环境变量将被确保在Worker进程中也设置。此参数允许覆盖或设置Worker的环境变量。也可以使用选项 distributed.nanny.environ 来设置环境变量。优先级如下

  1. 保姆参数

  2. 现有的环境变量

  3. Dask 配置

备注

一些环境变量,如 OMP_NUM_THREADS,必须在导入 numpy 之前设置才能生效。其他变量,如 MALLOC_TRIM_THRESHOLD_``(参见 :ref:`memtrim`),必须在启动 Linux 进程之前设置。如果在本处或 ``distributed.nanny.environ 中设置这些变量将无效;它们必须在 distributed.nanny.pre-spawn-environ 中设置,以便在生成子进程之前设置,即使这意味着会污染运行 Nanny 的进程。

出于同样的原因,请注意将 distributed.worker.multiprocessing-methodspawn 更改为 forkforkserver 可能会抑制某些环境变量;如果这样做,您应该在启动 dask-worker 之前在 shell 中自行设置这些变量。

参见

Worker
async close(timeout: float = 5, reason: str = 'nanny-close') Literal['OK'][源代码]

关闭工作进程,停止所有通信。

close_gracefully(reason: str = 'nanny-close-gracefully') None[源代码]

如果我们不应该尝试在工作者消失时重新启动它们,这是一个信号。

这作为集群关闭过程的一部分使用。

async instantiate() distributed.core.Status[源代码]

启动一个本地工作进程

直到进程启动并且调度器正确通知为止,一直阻塞

async kill(timeout: float = 5, reason: str = 'nanny-kill') None[源代码]

终止本地工作进程

阻塞直到进程停止并且调度器被正确通知

log_event(topic, msg)[源代码]

在给定主题下记录事件

参数
主题str, list[str]

记录事件的主题名称。要将同一事件记录在多个主题下,请传递一个主题名称列表。

消息

事件消息以记录。注意这必须是 msgpack 可序列化的。

async start_unsafe()[源代码]

启动保姆进程,启动本地进程,开始监控