Python API

你可以通过导入并创建一个不带参数的 Client 来创建一个 dask.distributed 调度器。这将覆盖之前设置的任何默认值。

from dask.distributed import Client
client = Client()

如果你安装了 Bokeh,可以导航到 http://localhost:8787/status 查看诊断仪表盘。

客户端

你可以通过实例化一个不带参数的 Dask Client 来轻松地在你的机器上设置一个本地集群。

from dask.distributed import Client
client = Client()

这会在您的本地进程中设置一个调度器,以及与机器内核数量相关的多个工作线程和每个工作线程的线程数。

如果你想在同一个进程中运行工作线程,可以传递 processes=False 关键字参数。

client = Client(processes=False)

如果你想要避免工作线程之间的通信,并且你的计算会释放GIL,这种情况有时是更可取的。当主要使用NumPy或Dask Array时,这种情况很常见。

LocalCluster

上述 Client() 调用是创建 LocalCluster 并将其传递给客户端的简写形式。

from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)

这等效,但稍微更明确。

您可能希望查看 LocalCluster 上可用的关键字参数,以了解在处理线程和进程混合时可用的选项,例如指定显式端口等。

要创建一个本地集群,其中所有工作线程都在专用子进程中运行,dask.distributed 还提供了实验性的 SubprocessCluster

集群管理器功能

实例化一个集群管理器类,如 LocalCluster ,然后将其传递给 Client 是一种常见模式。集群管理器还提供了有用的工具来帮助你理解正在发生的事情。

例如,您可以获取仪表板URL。

>>> cluster.dashboard_link
'http://127.0.0.1:8787/status'

您可以从集群组件中检索日志。

>>> cluster.get_logs()
{'Cluster': '',
'Scheduler': "distributed.scheduler - INFO - Clear task state\ndistributed.scheduler - INFO -   S...

如果你使用的是支持扩展的集群管理器,你可以根据工作负载手动或自动修改工作者的数量。

>>> cluster.scale(10)  # Sets the number of workers to 10

>>> cluster.adapt(minimum=1, maximum=10)  # Allows the cluster to auto scale to 10 when tasks are computed

参考

class distributed.deploy.local.LocalCluster(name=None, n_workers=None, threads_per_worker=None, processes=None, loop=None, start=None, host=None, ip=None, scheduler_port=0, silence_logs=30, dashboard_address=':8787', worker_dashboard_address=None, diagnostics_port=None, services=None, worker_services=None, service_kwargs=None, asynchronous=False, security=None, protocol=None, blocked_handlers=None, interface=None, worker_class=None, scheduler_kwargs=None, scheduler_sync_interval=1, **worker_kwargs)[源代码]

创建本地调度器和工作节点

这会在本地机器上创建一个调度器和工作节点的“集群”。

参数
n_workers: int

启动的工作者数量

memory_limit: str, float, int, 或 None, 默认 “auto”

设置每个工作者的内存限制。

关于参数数据类型的说明:

  • 如果为 None 或 0,则不应用限制。

  • 如果为“auto”,系统总内存将在工作进程之间平均分配。

  • 如果是浮点数,则系统内存的该比例将用于 每个工作进程

  • 如果给定一个表示字节数的字符串(如 "1GiB"),则该数量将 每个工作进程 使用。

  • 如果是整数,则每个工作线程使用该数量的字节。

请注意,限制仅在 processes=True 时执行,并且该限制仅基于最大努力原则执行——工作者仍有可能超过此限制。

processes: bool

是否使用进程(True)或线程(False)。默认为True,除非worker_class=Worker,在这种情况下默认为False。

threads_per_worker: int

每个工作者的线程数

scheduler_port: int

调度器的端口。使用 0 选择一个随机端口(默认)。8786 是一个常见的选择。

silence_logs: 日志级别

要输出到标准输出的日志级别。默认为 logging.WARN。使用 False 或 None 等假值表示不更改。

host: 字符串

调度器将监听的主机地址,默认为仅限本地主机

ip: 字符串

已弃用。请参见上面的 host

dashboard_address: str

监听 Bokeh 诊断服务器的地址,如 ‘localhost:8787’ 或 ‘0.0.0.0:8787’。默认为 ‘:8787’。设置为 None 以禁用仪表板。使用 ‘:0’ 表示随机端口。当仅指定端口如 ‘:8787’ 时,仪表板将绑定到 host 参数中的给定接口。如果 host 为空,绑定将发生在所有接口 ‘0.0.0.0’ 上。为避免本地部署时的防火墙问题,请将 host 设置为 ‘localhost’。

worker_dashboard_address: str

监听 Bokeh 工作线程诊断服务器地址,如 ‘localhost:8787’ 或 ‘0.0.0.0:8787’。默认为 None,即禁用仪表盘。使用 ‘:0’ 表示随机端口。

诊断端口: int

已弃用。请参阅 dashboard_address。

异步: bool (默认值为 False)

如果在异步/等待函数或Tornado gen.coroutines中使用此集群,请设置为True。正常使用时应保持为False。

blocked_handlers: List[str]

指定调度器上不允许的处理器块列表,例如 ['feed', 'run_function']

service_kwargs: Dict[str, Dict]

传递给运行服务的额外关键字

安全Security 或 bool, 可选

配置此集群中的通信安全。可以是安全对象,或为 True。如果为 True,将自动创建临时自签名凭据。

协议: str (可选)

使用的协议,如 tcp://tls://inproc://。默认会根据其他关键字参数如 processessecurity 做出合理选择。

接口: str (可选)

要使用的网络接口。默认为 lo/localhost。

worker_class: Worker

用于实例化工作者的类。默认为如果 processes=False 则使用 Worker,如果 processes=True 或省略则使用 Nanny。

**worker_kwargs:

额外的工作者参数。任何额外的关键字参数都将传递给 Worker 类构造函数。

示例

>>> cluster = LocalCluster()  # Create a local cluster  
>>> cluster  
LocalCluster("127.0.0.1:8786", workers=8, threads=8)
>>> c = Client(cluster)  # connect to local cluster  

将集群扩展到三个工作节点

>>> cluster.scale(3)  

将额外的关键字参数传递给 Bokeh

>>> LocalCluster(service_kwargs={'dashboard': {'prefix': '/foo'}})