Python API
内容
Python API¶
你可以通过导入并创建一个不带参数的 Client
来创建一个 dask.distributed
调度器。这将覆盖之前设置的任何默认值。
from dask.distributed import Client
client = Client()
如果你安装了 Bokeh,可以导航到 http://localhost:8787/status
查看诊断仪表盘。
客户端¶
你可以通过实例化一个不带参数的 Dask Client 来轻松地在你的机器上设置一个本地集群。
from dask.distributed import Client
client = Client()
这会在您的本地进程中设置一个调度器,以及与机器内核数量相关的多个工作线程和每个工作线程的线程数。
如果你想在同一个进程中运行工作线程,可以传递 processes=False
关键字参数。
client = Client(processes=False)
如果你想要避免工作线程之间的通信,并且你的计算会释放GIL,这种情况有时是更可取的。当主要使用NumPy或Dask Array时,这种情况很常见。
LocalCluster¶
上述 Client()
调用是创建 LocalCluster 并将其传递给客户端的简写形式。
from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)
这等效,但稍微更明确。
您可能希望查看 LocalCluster
上可用的关键字参数,以了解在处理线程和进程混合时可用的选项,例如指定显式端口等。
要创建一个本地集群,其中所有工作线程都在专用子进程中运行,dask.distributed
还提供了实验性的 SubprocessCluster
。
集群管理器功能¶
实例化一个集群管理器类,如 LocalCluster
,然后将其传递给 Client
是一种常见模式。集群管理器还提供了有用的工具来帮助你理解正在发生的事情。
例如,您可以获取仪表板URL。
>>> cluster.dashboard_link
'http://127.0.0.1:8787/status'
您可以从集群组件中检索日志。
>>> cluster.get_logs()
{'Cluster': '',
'Scheduler': "distributed.scheduler - INFO - Clear task state\ndistributed.scheduler - INFO - S...
如果你使用的是支持扩展的集群管理器,你可以根据工作负载手动或自动修改工作者的数量。
>>> cluster.scale(10) # Sets the number of workers to 10
>>> cluster.adapt(minimum=1, maximum=10) # Allows the cluster to auto scale to 10 when tasks are computed
参考¶
- class distributed.deploy.local.LocalCluster(name=None, n_workers=None, threads_per_worker=None, processes=None, loop=None, start=None, host=None, ip=None, scheduler_port=0, silence_logs=30, dashboard_address=':8787', worker_dashboard_address=None, diagnostics_port=None, services=None, worker_services=None, service_kwargs=None, asynchronous=False, security=None, protocol=None, blocked_handlers=None, interface=None, worker_class=None, scheduler_kwargs=None, scheduler_sync_interval=1, **worker_kwargs)[源代码]¶
创建本地调度器和工作节点
这会在本地机器上创建一个调度器和工作节点的“集群”。
- 参数
- n_workers: int
启动的工作者数量
- memory_limit: str, float, int, 或 None, 默认 “auto”
设置每个工作者的内存限制。
关于参数数据类型的说明:
如果为 None 或 0,则不应用限制。
如果为“auto”,系统总内存将在工作进程之间平均分配。
如果是浮点数,则系统内存的该比例将用于 每个工作进程。
如果给定一个表示字节数的字符串(如
"1GiB"
),则该数量将 每个工作进程 使用。如果是整数,则每个工作线程使用该数量的字节。
请注意,限制仅在
processes=True
时执行,并且该限制仅基于最大努力原则执行——工作者仍有可能超过此限制。- processes: bool
是否使用进程(True)或线程(False)。默认为True,除非worker_class=Worker,在这种情况下默认为False。
- threads_per_worker: int
每个工作者的线程数
- scheduler_port: int
调度器的端口。使用 0 选择一个随机端口(默认)。8786 是一个常见的选择。
- silence_logs: 日志级别
要输出到标准输出的日志级别。默认为
logging.WARN
。使用 False 或 None 等假值表示不更改。- host: 字符串
调度器将监听的主机地址,默认为仅限本地主机
- ip: 字符串
已弃用。请参见上面的
host
。- dashboard_address: str
监听 Bokeh 诊断服务器的地址,如 ‘localhost:8787’ 或 ‘0.0.0.0:8787’。默认为 ‘:8787’。设置为
None
以禁用仪表板。使用 ‘:0’ 表示随机端口。当仅指定端口如 ‘:8787’ 时,仪表板将绑定到host
参数中的给定接口。如果host
为空,绑定将发生在所有接口 ‘0.0.0.0’ 上。为避免本地部署时的防火墙问题,请将host
设置为 ‘localhost’。- worker_dashboard_address: str
监听 Bokeh 工作线程诊断服务器地址,如 ‘localhost:8787’ 或 ‘0.0.0.0:8787’。默认为 None,即禁用仪表盘。使用 ‘:0’ 表示随机端口。
- 诊断端口: int
已弃用。请参阅 dashboard_address。
- 异步: bool (默认值为 False)
如果在异步/等待函数或Tornado gen.coroutines中使用此集群,请设置为True。正常使用时应保持为False。
- blocked_handlers: List[str]
指定调度器上不允许的处理器块列表,例如
['feed', 'run_function']
- service_kwargs: Dict[str, Dict]
传递给运行服务的额外关键字
- 安全Security 或 bool, 可选
配置此集群中的通信安全。可以是安全对象,或为 True。如果为 True,将自动创建临时自签名凭据。
- 协议: str (可选)
使用的协议,如
tcp://
、tls://
、inproc://
。默认会根据其他关键字参数如processes
和security
做出合理选择。- 接口: str (可选)
要使用的网络接口。默认为 lo/localhost。
- worker_class: Worker
用于实例化工作者的类。默认为如果 processes=False 则使用 Worker,如果 processes=True 或省略则使用 Nanny。
- **worker_kwargs:
额外的工作者参数。任何额外的关键字参数都将传递给
Worker
类构造函数。
示例
>>> cluster = LocalCluster() # Create a local cluster >>> cluster LocalCluster("127.0.0.1:8786", workers=8, threads=8)
>>> c = Client(cluster) # connect to local cluster
将集群扩展到三个工作节点
>>> cluster.scale(3)
将额外的关键字参数传递给 Bokeh
>>> LocalCluster(service_kwargs={'dashboard': {'prefix': '/foo'}})