Configuration¶
Taking full advantage of Dask sometimes requires user configuration. This might be to control logging verbosity, specify cluster configuration, provide credentials for security, or any of several other options that arise in production.

Configuration is specified in one of the following ways:

- YAML files in ~/.config/dask/ or /etc/dask/
- Environment variables like DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING=True
- Default settings within sub-libraries

This combination makes it easy to specify configuration in a variety of settings, ranging from personal workstations, to IT-mandated configuration, to docker images.

Accessing Configuration¶
| dask.config.get | Get elements from global configuration |
Dask's configuration system is usually accessed using the dask.config.get function. You can use . for nested access, for example:
>>> import dask
>>> import dask.distributed # populate config with distributed defaults
>>> dask.config.get("distributed.client") # use `.` for nested access
{'heartbeat': '5s', 'scheduler-info-interval': '2s'}
>>> dask.config.get("distributed.scheduler.unknown-task-duration")
'500ms'
You may wish to inspect the dask.config.config dictionary to get a sense for what configuration is being used by your current system.

Note that the get function treats underscores and hyphens identically. For example, dask.config.get("temporary-directory") is equivalent to dask.config.get("temporary_directory").

Values like "128 MiB" and "10s" are parsed using the functions in Utilities.
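As a rough illustration (a sketch assuming the parse_bytes and parse_timedelta helpers in dask.utils), such strings resolve to plain numbers:

>>> from dask.utils import parse_bytes, parse_timedelta
>>> parse_bytes("128 MiB")   # bytes
134217728
>>> parse_timedelta("10s")   # seconds
10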
Specifying Configuration¶

YAML files¶

You can specify configuration values in YAML files. For example:
array:
  chunk-size: 128 MiB

distributed:
  worker:
    memory:
      spill: 0.85  # default: 0.7
      target: 0.75  # default: 0.6
      terminate: 0.98  # default: 0.95

  dashboard:
    # Locate the dashboard if working on a Jupyter Hub server
    link: /user/<user>/proxy/8787/status
These files can live in any of the following locations:

- The ~/.config/dask directory in the user's home directory
- The {sys.prefix}/etc/dask directory local to Python
- The {prefix}/etc/dask directories for each {prefix} in site.PREFIXES
- The root directory (specified by the DASK_ROOT_CONFIG environment variable, /etc/dask/ by default)

Dask searches for all YAML files within each of these directories and merges them together, preferring configuration files closer to the user over system configuration files (preference follows the order in the list above). Additionally, users can specify a path with the DASK_CONFIG environment variable, which takes precedence at the top of the list above.

The contents of these YAML files are merged together, allowing different Dask subprojects like dask-kubernetes or dask-ml to manage configuration files separately, but have them merge into the same global configuration.
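To see which directories your installation actually searches, you can inspect dask.config.paths (an internal module attribute at the time of writing, so treat this as informational rather than stable API):

>>> import dask.config
>>> dask.config.paths   # e.g. ['/etc/dask', ..., '~/.config/dask'], lowest to highest priority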
Environment Variables¶

You can also specify configuration values with environment variables like the following:
export DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING=True
export DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES=5
export DASK_DISTRIBUTED__DASHBOARD__LINK="/user/<user>/proxy/8787/status"
resulting in configuration values like the following:
{
'distributed': {
'scheduler': {
'work-stealing': True,
'allowed-failures': 5
}
}
}
Dask searches for all environment variables that start with DASK_, then transforms keys by converting to lower case and changing double-underscores to nested structures.

Dask tries to parse all values with ast.literal_eval, letting users pass numeric and boolean values (such as True above) as well as lists, dictionaries, and so on with normal Python syntax.

Environment variables take precedence over configuration values found in YAML files.
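The name-to-key transformation can be pictured with a small, simplified sketch (an illustration of the convention, not Dask's internal implementation; it skips special cases such as DASK_INTERNAL_INHERIT_CONFIG):

import ast
import os

def dask_env_to_config(environ=os.environ):
    """Roughly mimic how DASK_* variables become nested config entries."""
    config = {}
    for name, raw in environ.items():
        if not name.startswith("DASK_"):
            continue
        # DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING -> distributed.scheduler.work-stealing
        keys = name[len("DASK_"):].lower().replace("__", ".").replace("_", "-").split(".")
        try:
            value = ast.literal_eval(raw)   # numbers, booleans, lists, dicts, ...
        except (SyntaxError, ValueError):
            value = raw                     # anything else stays a plain string
        node = config
        for key in keys[:-1]:
            node = node.setdefault(key, {})
        node[keys[-1]] = value
    return config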
Defaults¶

Additionally, individual subprojects may add their own default values when they are imported. These are always added with lower priority than the YAML files or environment variables mentioned above:
>>> import dask.config
>>> dask.config.config # no configuration by default
{}
>>> import dask.distributed
>>> dask.config.config # New values have been added
{
'scheduler': ...,
'worker': ...,
'tls': ...
}
Directly within Python¶
| dask.config.set | Temporarily set configuration values within a context manager |
Configuration is stored within a normal Python dictionary in dask.config.config and can be modified using normal Python operations.

Additionally, you can temporarily set a configuration value using the dask.config.set function. This function accepts a dictionary as an input and interprets "." as nested access:
>>> dask.config.set({'optimization.fuse.ave-width': 4})
This function can also be used as a context manager for consistent cleanup:
>>> with dask.config.set({'optimization.fuse.ave-width': 4}):
... arr2, = dask.optimize(arr)
Note that the set function treats underscores and hyphens identically. For example, dask.config.set({'optimization.fuse.ave_width': 4}) is equivalent to dask.config.set({'optimization.fuse.ave-width': 4}).

Finally, note that persistent objects may acquire configuration settings when they are initialized. These settings may also be cached for performance reasons. This is particularly true for dask.distributed objects such as Client, Scheduler, Worker, and Nanny.
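For that reason, configuration meant for such objects should be in place when they are created, for example by wrapping construction in dask.config.set. A minimal sketch, assuming dask.distributed is installed and using an illustrative timeout value:

import dask
from dask.distributed import Client

# Settings active while the Client is constructed are picked up at init time;
# changing them afterwards may not affect the already-created object.
with dask.config.set({"distributed.comm.timeouts.connect": "60s"}):
    client = Client(processes=False)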
Directly from the command line¶

Configuration can also be set and viewed via the CLI.
$ dask config set optimization.fuse.ave-width 4
Updated [optimization.fuse.ave-width] to [4], config saved to ~/.config/dask/dask.yaml
$ dask config get optimization.fuse.ave-width
4
Distributing configuration¶

It may also be desirable to package up your whole Dask configuration for use on another machine. This is used in some Dask Distributed libraries to ensure remote components have the same configuration as your local system.

This is typically handled by the downstream libraries, which use base64 encoding to pass configuration through the DASK_INTERNAL_INHERIT_CONFIG environment variable.
| dask.config.serialize | Serialize config data into a string. |
| dask.config.deserialize | De-serialize config data into the original object. |
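A rough sketch of the round trip (the distributed libraries handle this for you; the dictionary here is only an example):

>>> import os
>>> import dask
>>> data = {"array": {"chunk-size": "1 MiB"}}
>>> os.environ["DASK_INTERNAL_INHERIT_CONFIG"] = dask.config.serialize(data)
>>> dask.config.deserialize(os.environ["DASK_INTERNAL_INHERIT_CONFIG"])
{'array': {'chunk-size': '1 MiB'}}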
Conversion Utility¶

It is possible to configure Dask inline with dot notation, with YAML, or via environment variables. You can enter your own configuration items below to convert back and forth between the different notations.

Warning

This utility is designed to improve understanding of converting between different notations and does not claim to be a perfect implementation. Please use it for reference only.
Updating Configuration¶

Manipulating configuration dictionaries¶
| dask.config.merge | Update a sequence of nested dictionaries |
| dask.config.update | Update a nested dictionary with values from another |
| dask.config.expand_environment_variables | Expand environment variables in a nested config dictionary |
As described above, configuration can come from many places, including several YAML files, environment variables, and project defaults. Each of these provides a configuration that is possibly nested like the following:
x = {'a': 0, 'c': {'d': 4}}
y = {'a': 1, 'b': 2, 'c': {'e': 5}}
Dask will merge these configurations respecting nested data structures, and respecting order:
>>> dask.config.merge(x, y)
{'a': 1, 'b': 2, 'c': {'d': 4, 'e': 5}}
You can also use the update function to update an existing configuration in place with a new configuration. This can be done with priority being given to either config. This is often used to update the global configuration in dask.config.config:
dask.config.update(dask.config, new, priority='new') # Give priority to new values
dask.config.update(dask.config, new, priority='old') # Give priority to old values
Sometimes it is useful to expand environment variables stored within a configuration. This can be done with the expand_environment_variables function:
dask.config.config = dask.config.expand_environment_variables(dask.config.config)
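For example (a small illustration; TMPDIR stands in for any variable set in your environment):

>>> import os
>>> import dask
>>> os.environ["TMPDIR"] = "/scratch"
>>> dask.config.expand_environment_variables({"temporary-directory": "$TMPDIR/dask"})
{'temporary-directory': '/scratch/dask'}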
Refreshing Configuration¶
| dask.config.collect | Collect configuration from paths and environment variables |
| dask.config.refresh | Update configuration by re-reading yaml files and env variables |
If you change your environment variables or YAML files, Dask will not immediately see the changes. Instead, you can call refresh to go through the configuration collection process again and update the default configuration:
>>> dask.config.config
{}
>>> # make some changes to yaml files
>>> dask.config.refresh()
>>> dask.config.config
{...}
This function uses dask.config.collect, which returns the configuration without modifying the global configuration. You might use this to determine the configuration of particular paths not yet on the config path:
>>> dask.config.collect(paths=[...])
{...}
Downstream Libraries¶
| dask.config.ensure_file | Copy file to default location if it does not already exist |
| dask.config.update | Update a nested dictionary with values from another |
| dask.config.update_defaults | Add a new set of defaults to the configuration |
Downstream Dask libraries often follow a standard convention to use the central Dask configuration. This section uses the fictional project dask-foo as an example and provides recommendations for integration.

Downstream projects typically follow the following convention:

1. Maintain default configuration in a YAML file within the source directory:

setup.py
dask_foo/__init__.py
dask_foo/config.py
dask_foo/core.py
dask_foo/foo.yaml   # <---

2. Place configuration in that file within a namespace for the project:

# dask_foo/foo.yaml
foo:
  color: red
  admin:
    a: 1
    b: 2
3. Within a config.py file (or anywhere) load that default config file and update it into the global configuration:

# dask_foo/config.py
import os

import yaml

import dask.config

fn = os.path.join(os.path.dirname(__file__), 'foo.yaml')

with open(fn) as f:
    defaults = yaml.safe_load(f)

dask.config.update_defaults(defaults)
4. Ensure that this file is run on import by including it in __init__.py:

# dask_foo/__init__.py
from . import config
5. Within dask_foo code, use the dask.config.get function to access configuration values:

# dask_foo/core.py
def process(fn, color=dask.config.get('foo.color')):
    ...
6. You may also want to ensure that your yaml configuration files are included in your package. This can be accomplished by including the following line in your MANIFEST.in:
recursive-include <PACKAGE_NAME> *.yaml
and the following parameter in the setup call within your setup.py file:

from setuptools import setup

setup(...,
      include_package_data=True,
      ...)
This process keeps configuration in a central place, but also keeps it safe within namespaces. It places config files in an easy-to-access location by default (~/.config/dask/*.yaml), so users can easily discover what they can change, but maintains the actual defaults within the source code, so that they more closely track changes in the library.

However, downstream libraries may choose alternative solutions, such as isolating their configuration within their library rather than using the global dask.config system. All functions in the dask.config module also work with parameters and do not need to mutate global state.
API¶
- dask.config.get(key: str, default: Any = _NoDefault.no_default, config: dict | None = None, override_with: Any = None) → Any [source]¶
Get elements from global config

If override_with is not None this value will be passed straight back. Useful for getting kwarg defaults from Dask config.

Use '.' for nested access

Examples
>>> from dask import config
>>> config.get('foo')
{'x': 1, 'y': 2}
>>> config.get('foo.x')
1
>>> config.get('foo.x.y', default=123)
123
>>> config.get('foo.y', override_with=None)
2
>>> config.get('foo.y', override_with=3)
3
- dask.config.set(arg: Mapping | None = None, config: dict = None, lock: threading.Lock = <unlocked _thread.lock object>, **kwargs) [source]¶
Temporarily set configuration values within a context manager

- Parameters
- arg: mapping or None, optional
A mapping of configuration key-value pairs to set.
- **kwargs
Additional key-value pairs to set. If arg is provided, values set in arg will be applied before those in kwargs. Double-underscores (__) in keyword arguments will be replaced with ., allowing nested values to be easily set.

Examples
>>> import dask
Set 'foo.bar' in a context, by providing a mapping.

>>> with dask.config.set({'foo.bar': 123}):
...     pass

Set 'foo.bar' in a context, by providing a keyword argument.

>>> with dask.config.set(foo__bar=123):
...     pass

Set 'foo.bar' globally.

>>> dask.config.set(foo__bar=123)
- dask.config.merge(*dicts: collections.abc.Mapping) → dict [source]¶
Update a sequence of nested dictionaries

This prefers the values in the latter dictionaries to those in the former

Examples

>>> a = {'x': 1, 'y': {'a': 2}}
>>> b = {'y': {'b': 3}}
>>> merge(a, b)
{'x': 1, 'y': {'a': 2, 'b': 3}}
- dask.config.update(old: dict, new: collections.abc.Mapping, priority: Literal['old', 'new', 'new-defaults'] = 'new', defaults: collections.abc.Mapping | None = None) → dict [source]¶
Update a nested dictionary with values from another

This is like dict.update except that it smoothly merges nested values

This operates in-place and modifies old

- Parameters
- priority: string {'old', 'new', 'new-defaults'}
If new (default) then the new dictionary has preference. Otherwise the old one does. If 'new-defaults', then a mapping should be given of the current defaults. Only if a value in old matches the current default will it be updated with new.

Examples

>>> a = {'x': 1, 'y': {'a': 2}}
>>> b = {'x': 2, 'y': {'b': 3}}
>>> update(a, b)
{'x': 2, 'y': {'a': 2, 'b': 3}}

>>> a = {'x': 1, 'y': {'a': 2}}
>>> b = {'x': 2, 'y': {'b': 3}}
>>> update(a, b, priority='old')
{'x': 1, 'y': {'a': 2, 'b': 3}}

>>> d = {'x': 0, 'y': {'a': 2}}
>>> a = {'x': 1, 'y': {'a': 2}}
>>> b = {'x': 2, 'y': {'a': 3, 'b': 3}}
>>> update(a, b, priority='new-defaults', defaults=d)
{'x': 1, 'y': {'a': 3, 'b': 3}}
- dask.config.collect(paths: list[str] = ['/etc/dask', '{sys.prefix}/etc/dask', '~/.config/dask'], env: collections.abc.Mapping[str, str] | None = None) → dict [source]¶
Collect configuration from paths and environment variables

- Parameters
- paths: list[str]
A list of paths to search for yaml config files
- env: Mapping[str, str]
The system environment variables

- Returns
- config: dict

See also

dask.config.refresh
Collect configuration and update into primary config
- dask.config.refresh(config: dict | None = None, defaults: list[collections.abc.Mapping] = <registered dask and distributed defaults>, **kwargs) → None [source]¶
Update configuration by re-reading yaml files and env variables

This mutates the global dask.config.config, or the config parameter if passed in.

This goes through the following stages:

1. Clearing out all old configuration
2. Updating from the stored defaults from downstream libraries (see update_defaults)
3. Updating from yaml files and environment variables
4. Automatically renaming deprecated keys (with a warning)

Note that some functionality only checks configuration once at startup and may not change behavior, even if configuration changes. It is suggested to restart your python process if convenient to ensure that new configuration changes take place.

See also

dask.config.collect
for parameters
dask.config.update_defaults
- dask.config.ensure_file(source: str, destination: str | None = None, comment: bool = True) → None [source]¶
Copy file to default location if it does not already exist

This tries to move a default configuration file to a default location if it does not already exist. It also comments out that file by default.

This is to be used by downstream modules (like dask.distributed) that may have default configuration files that they wish to include in the default configuration path.
- Parameters
- source: string, filename
Source configuration file, typically within a source directory.
- destination: string, directory
Destination directory. Configurable by the DASK_CONFIG environment variable, falling back to ~/.config/dask.
- comment: bool, True by default
Whether or not to comment out the config file when copying.
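As a usage sketch, a downstream package like the fictional dask_foo above might call this at import time (the foo.yaml filename follows that example):

# dask_foo/config.py (continued)
import os

import dask.config

fn = os.path.join(os.path.dirname(__file__), "foo.yaml")

# Copy the bundled defaults (commented out) into the user's config directory
# so that users can discover and edit the available options.
dask.config.ensure_file(source=fn)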
Configuration Reference¶

Note

It is possible to configure Dask inline with dot notation, with YAML, or via environment variables. See the Conversion Utility above for converting the following dot notation to other forms.
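For example, the array.chunk-size option listed below can be written in any of the three forms (the 256 MiB value is only illustrative):

import dask

# Inline with dot notation:
dask.config.set({"array.chunk-size": "256 MiB"})

# The same option in YAML (e.g. in ~/.config/dask/dask.yaml):
#   array:
#     chunk-size: 256 MiB

# And as an environment variable:
#   export DASK_ARRAY__CHUNK_SIZE="256 MiB"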
Dask¶
-
temporary-directory
None ¶ Temporary directory for local disk storage /tmp, /scratch, or /local. This directory is used during dask spill-to-disk operations. When the value is "null" (default), dask will create a directory from where dask was launched: `cwd/dask-worker-space`
-
visualization.engine
None ¶ Visualization engine to use when calling ``.visualize()`` on a Dask collection. Currently supports ``'graphviz'``, ``'ipycytoscape'``, and ``'cytoscape'`` (alias for ``'ipycytoscape'``)
-
tokenize.ensure-deterministic
False ¶ If ``true``, tokenize will error instead of falling back to uuids when a deterministic token cannot be generated. Defaults to ``false``.
-
dataframe.backend
pandas ¶ Backend to use for supported dataframe-creation functions. Default is "pandas".
-
dataframe.shuffle.method
None ¶ The default shuffle method to choose. Possible values are disk, tasks, p2p. If null, pick best method depending on application.
-
dataframe.shuffle.compression
None ¶ Compression algorithm used for on disk-shuffling. Partd, the library used for compression supports ZLib, BZ2, and SNAPPY
-
dataframe.parquet.metadata-task-size-local
512 ¶ The number of files to handle within each metadata-processing task when reading a parquet dataset from a LOCAL file system. Specifying 0 will result in serial execution on the client.
-
dataframe.parquet.metadata-task-size-remote
1 ¶ The number of files to handle within each metadata-processing task when reading a parquet dataset from a REMOTE file system. Specifying 0 will result in serial execution on the client.
-
dataframe.parquet.minimum-partition-size
75000000 ¶ The minimum in-memory size of a single partition after reading from parquet. Smaller parquet files will be combined into a single partitions to reach this threshold.
-
dataframe.convert-string
None ¶ Whether to convert string-like data to pyarrow strings.
-
dataframe.query-planning
None ¶ Whether to use query planning.
-
array.backend
numpy ¶ Backend to use for supported array-creation functions. Default is "numpy".
-
array.chunk-size
128MiB ¶ The default chunk size to target. Default is "128MiB".
-
array.chunk-size-tolerance
1.25 ¶ Upper tolerance for different algorithms when creating output chunks. Default is 1.25. This means that the algorithms can exceed the average input chunk size along this dimension by 25%.
-
array.rechunk.method
None ¶ The method to use for rechunking. Possible values are tasks or p2p. If null, pick best method depending on application.
-
array.rechunk.threshold
4 ¶ The graph growth factor above which task-based shuffling introduces an intermediate step.
-
array.svg.size
120 ¶ The size of pixels used when displaying a dask array as an SVG image. This is used, for example, for nice rendering in a Jupyter notebook
-
array.slicing.split-large-chunks
None ¶ How to handle large chunks created when slicing Arrays. By default a warning is produced. Set to ``False`` to silence the warning and allow large output chunks. Set to ``True`` to silence the warning and avoid large output chunks.
-
array.query-planning
None ¶ Whether to use query planning for arrays.
-
optimization.annotations.fuse
True ¶ If adjacent blockwise layers have different annotations (e.g., one has retries=3 and another has retries=4), Dask can make an attempt to merge those annotations according to some simple rules. ``retries`` is set to the max of the layers, ``priority`` is set to the max of the layers, ``resources`` are set to the max of all the resources, ``workers`` is set to the intersection of the requested workers. If this setting is disabled, then adjacent blockwise layers with different annotations will *not* be fused.
-
optimization.fuse.active
None ¶ Turn task fusion on/off. This option refers to the fusion of a fully-materialized task graph (not a high-Level graph). By default (None), the active task-fusion option will be treated as ``False`` for Dask-Dataframe collections, and as ``True`` for all other graphs (including Dask-Array collections).
-
optimization.fuse.ave-width
1 ¶ Upper limit for width, where width = num_nodes / height, a good measure of parallelizability
-
optimization.fuse.max-width
None ¶ Don't fuse if total width is greater than this. Set to null to dynamically adjust to 1.5 + ave_width * log(ave_width + 1)
-
optimization.fuse.max-height
inf ¶ Don't fuse more than this many levels
-
optimization.fuse.max-depth-new-edges
None ¶ Don't fuse if new dependencies are added after this many levels. Set to null to dynamically adjust to ave_width * 1.5.
-
optimization.fuse.subgraphs
None ¶ Set to True to fuse multiple tasks into SubgraphCallable objects. Set to None to let the default optimizer of individual dask collections decide. If no collection-specific default exists, None defaults to False.
-
optimization.fuse.rename-keys
True ¶ Set to true to rename the fused keys with `default_fused_keys_renamer`. Renaming fused keys can keep the graph more understandable and comprehensible, but it comes at the cost of additional processing. If False, then the top-most key will be used. For advanced usage, a function to create the new name is also accepted.
-
admin.traceback.shorten
['concurrent[\\\\\\/]futures[\\\\\\/]', 'dask[\\\\\\/](base|core|local|multiprocessing|optimization|threaded|utils)\\.py', 'dask[\\\\\\/]array[\\\\\\/]core\\.py', 'dask[\\\\\\/]dataframe[\\\\\\/](core|methods)\\.py', 'distributed[\\\\\\/](client|scheduler|utils|worker)\\.py', 'tornado[\\\\\\/]gen\\.py', 'pandas[\\\\\\/]core[\\\\\\/]'] ¶ Clean up Dask tracebacks for readability. Remove all modules that match one of the listed regular expressions. Always preserve the first and last frame.
Distributed Client¶
-
distributed.client.heartbeat
5s ¶ This value is the time between heartbeats The client sends a periodic heartbeat message to the scheduler. If it misses enough of these then the scheduler assumes that it has gone.
-
distributed.client.scheduler-info-interval
2s ¶ Interval between scheduler-info updates
-
distributed.client.security-loader
None ¶ A fully qualified name (e.g. ``module.submodule.function``) of a callback to use for loading security credentials for the client. If no security object is explicitly passed when creating a ``Client``, this callback is called with a dict containing client information (currently just ``address``), and should return a ``Security`` object to use for this client, or ``None`` to fallback to the default security configuration.
-
distributed.client.preload
[] ¶ Run custom modules during the lifetime of the client You can run custom modules when the client starts up and closes down. See http://www.aidoczh.com/dask/how-to/customize-initialization.html for more information
-
distributed.client.preload-argv
[] ¶ Arguments to pass into the preload scripts described above See http://www.aidoczh.com/dask/how-to/customize-initialization.html for more information
Distributed Comm¶
-
distributed.comm.retry.count
0 ¶ The number of times to retry a connection
-
distributed.comm.retry.delay.min
1s ¶ The first non-zero delay between retry attempts
-
distributed.comm.retry.delay.max
20s ¶ The maximum delay between retries
-
distributed.comm.compression
False ¶ The compression algorithm to use. 'auto' defaults to lz4 if installed, otherwise to snappy if installed, otherwise to false. zlib and zstd are only used if explicitly requested here. Uncompressible data and transfers on localhost are always uncompressed, regardless of this setting. See also distributed.worker.memory.spill-compression.
-
distributed.comm.shard
64MiB ¶ The maximum size of a frame to send through a comm Some network infrastructure doesn't like sending through very large messages. Dask comms will cut up these large messages into many small ones. This attribute determines the maximum size of such a shard.
-
distributed.comm.offload
10MiB ¶ The size of message after which we choose to offload serialization to another thread In some cases, you may also choose to disable this altogether with the value false This is useful if you want to include serialization in profiling data, or if you have data types that are particularly sensitive to deserialization
-
distributed.comm.default-scheme
tcp ¶ The default protocol to use, like tcp or tls
-
distributed.comm.socket-backlog
2048 ¶ When shuffling data between workers, there can really be O(cluster size) connection requests on a single worker socket, make sure the backlog is large enough not to lose any.
-
distributed.comm.ucx.cuda-copy
None ¶ Set environment variables to enable CUDA support over UCX. This may be used even if InfiniBand and NVLink are not supported or disabled, then transferring data over TCP.
-
distributed.comm.ucx.tcp
None ¶ Set environment variables to enable TCP over UCX, even if InfiniBand and NVLink are not supported or disabled.
-
distributed.comm.ucx.nvlink
None ¶ Set environment variables to enable UCX over NVLink, implies ``distributed.comm.ucx.tcp=True``.
-
distributed.comm.ucx.infiniband
None ¶ Set environment variables to enable UCX over InfiniBand, implies ``distributed.comm.ucx.tcp=True``.
-
distributed.comm.ucx.rdmacm
None ¶ Set environment variables to enable UCX RDMA connection manager support, requires ``distributed.comm.ucx.infiniband=True``.
-
distributed.comm.ucx.create-cuda-context
None ¶ Creates a CUDA context before UCX is initialized. This is necessary to enable UCX to properly identify connectivity of GPUs with specialized networking hardware, such as InfiniBand. This permits UCX to choose transports automatically, without specifying additional variables for each transport, while ensuring optimal connectivity. When ``True``, a CUDA context will be created on the first device listed in ``CUDA_VISIBLE_DEVICES``.
-
distributed.comm.zstd.level
3 ¶ Compression level, between 1 and 22.
-
distributed.comm.zstd.threads
0 ¶ Number of threads to use. 0 for single-threaded, -1 to infer from cpu count.
-
distributed.comm.timeouts.connect
30s ¶ No Comment
-
distributed.comm.timeouts.tcp
30s ¶ No Comment
-
distributed.comm.require-encryption
None ¶ Whether to require encryption on non-local comms
-
distributed.comm.tls.ciphers
None ¶ Allowed ciphers, specified as an OpenSSL cipher string.
-
distributed.comm.tls.min-version
1.2 ¶ The minimum TLS version to support. Defaults to TLS 1.2.
-
distributed.comm.tls.max-version
None ¶ The maximum TLS version to support. Defaults to the maximum version supported by the platform.
-
distributed.comm.tls.ca-file
None ¶ Path to a CA file, in pem format
-
distributed.comm.tls.scheduler.cert
None ¶ Path to certificate file
-
distributed.comm.tls.scheduler.key
None ¶ Path to key file. Alternatively, the key can be appended to the cert file above, and this field left blank
-
distributed.comm.tls.worker.key
None ¶ Path to key file. Alternatively, the key can be appended to the cert file above, and this field left blank
-
distributed.comm.tls.worker.cert
None ¶ Path to certificate file
-
distributed.comm.tls.client.key
None ¶ Path to key file. Alternatively, the key can be appended to the cert file above, and this field left blank
-
distributed.comm.tls.client.cert
None ¶ Path to certificate file
-
distributed.comm.websockets.shard
8MiB ¶ The maximum size of a websocket frame to send through a comm. This is somewhat duplicative of distributed.comm.shard, but websockets often have much smaller maximum message sizes than other protocols, so this attribute is used to set a smaller default shard size and to allow separate control of websocket message sharding.
Distributed Dashboard¶
-
distributed.dashboard.link
{scheme}://{host}:{port}/status ¶ The form for the dashboard links This is used wherever we print out the link for the dashboard It is filled in with relevant information like the schema, host, and port number
-
distributed.dashboard.export-tool
False ¶ No Comment
-
distributed.dashboard.graph-max-items
5000 ¶ maximum number of tasks to try to plot in "graph" view
-
distributed.dashboard.prometheus.namespace
dask ¶ Namespace prefix to use for all prometheus metrics.
Distributed Deploy¶
-
distributed.deploy.lost-worker-timeout
15s ¶ Interval after which to hard-close a lost worker job Otherwise we wait for a while to see if a worker will reappear
-
distributed.deploy.cluster-repr-interval
500ms ¶ Interval between calls to update cluster-repr for the widget
Distributed Scheduler¶
-
distributed.scheduler.allowed-failures
3 ¶ The number of retries before a task is considered bad When a worker dies when a task is running that task is rerun elsewhere. If many workers die while running this same task then we call the task bad, and raise a KilledWorker exception. This is the number of workers that are allowed to die before this task is marked as bad.
-
distributed.scheduler.bandwidth
100000000 ¶ The expected bandwidth between any pair of workers This is used when making scheduling decisions. The scheduler will use this value as a baseline, but also learn it over time.
-
distributed.scheduler.blocked-handlers
[] ¶ A list of handlers to exclude The scheduler operates by receiving messages from various workers and clients and then performing operations based on those messages. Each message has an operation like "close-worker" or "task-finished". In some high security situations administrators may choose to block certain handlers from running. Those handlers can be listed here. For a list of handlers see the `dask.distributed.Scheduler.handlers` attribute.
-
distributed.scheduler.contact-address
None ¶ The address that the scheduler advertises to workers for communication with it. To be specified when the address to which the scheduler binds cannot be the same as the address that workers use to contact the scheduler (e.g. because the former is private and the scheduler is in a different network than the workers).
-
distributed.scheduler.default-data-size
1kiB ¶ The default size of a piece of data if we don't know anything about it. This is used by the scheduler in some scheduling decisions
-
distributed.scheduler.events-cleanup-delay
1h ¶ The amount of time to wait until workers or clients are removed from the event log after they have been removed from the scheduler
-
distributed.scheduler.idle-timeout
None ¶ Shut down the scheduler after this duration if no activity has occurred
-
distributed.scheduler.no-workers-timeout
None ¶ Timeout for tasks in an unrunnable state. If task remains unrunnable for longer than this, it fails. A task is considered unrunnable IFF it has no pending dependencies, and the task has restrictions that are not satisfied by any available worker or no workers are running at all. In adaptive clusters, this timeout must be set to be safely higher than the time it takes for workers to spin up.
-
distributed.scheduler.work-stealing
True ¶ Whether or not to balance work between workers dynamically Some times one worker has more work than we expected. The scheduler will move these tasks around as necessary by default. Set this to false to disable this behavior
-
distributed.scheduler.work-stealing-interval
100ms ¶ How frequently to balance worker loads
-
distributed.scheduler.worker-saturation
1.1 ¶ Controls how many root tasks are sent to workers (like a `readahead`). Up to worker-saturation * nthreads root tasks are sent to a worker at a time. If `.inf`, all runnable tasks are immediately sent to workers. The target number is rounded up, so any `worker-saturation` value > 1.0 guarantees at least one extra task will be sent to workers. Allowing oversaturation (> 1.0) means a worker may start running a new root task as soon as it completes the previous, even if there is a higher-priority downstream task to run. This reduces worker idleness, by letting workers do something while waiting for further instructions from the scheduler, even if it's not the most efficient thing to do. This generally comes at the expense of increased memory usage. It leads to "wider" (more breadth-first) execution of the graph. Compute-bound workloads may benefit from oversaturation. Memory-bound workloads should generally leave `worker-saturation` at 1.0, though 1.25-1.5 could slightly improve performance if ample memory is available.
-
distributed.scheduler.worker-ttl
5 minutes ¶ Time to live for workers. If we don't receive a heartbeat faster than this then we assume that the worker has died.
-
distributed.scheduler.preload
[] ¶ Run custom modules during the lifetime of the scheduler You can run custom modules when the scheduler starts up and closes down. See http://www.aidoczh.com/dask/how-to/customize-initialization.html for more information
-
distributed.scheduler.preload-argv
[] ¶ Arguments to pass into the preload scripts described above See http://www.aidoczh.com/dask/how-to/customize-initialization.html for more information
-
distributed.scheduler.unknown-task-duration
500ms ¶ Default duration for all tasks with unknown durations Over time the scheduler learns a duration for tasks. However when it sees a new type of task for the first time it has to make a guess as to how long it will take. This value is that guess.
-
distributed.scheduler.default-task-durations.rechunk-split
1us ¶ No Comment
-
distributed.scheduler.default-task-durations.split-shuffle
1us ¶ No Comment
-
distributed.scheduler.default-task-durations.split-taskshuffle
1us ¶ No Comment
-
distributed.scheduler.default-task-durations.split-stage
1us ¶ No Comment
-
distributed.scheduler.validate
False ¶ Whether or not to run consistency checks during execution. This is typically only used for debugging.
-
distributed.scheduler.dashboard.status.task-stream-length
1000 ¶ The maximum number of tasks to include in the task stream plot
-
distributed.scheduler.dashboard.tasks.task-stream-length
100000 ¶ The maximum number of tasks to include in the task stream plot
-
distributed.scheduler.dashboard.tls.ca-file
None ¶ No Comment
-
distributed.scheduler.dashboard.tls.key
None ¶ No Comment
-
distributed.scheduler.dashboard.tls.cert
None ¶ No Comment
-
distributed.scheduler.dashboard.bokeh-application.allow_websocket_origin
['*'] ¶ No Comment
-
distributed.scheduler.dashboard.bokeh-application.keep_alive_milliseconds
500 ¶ No Comment
-
distributed.scheduler.dashboard.bokeh-application.check_unused_sessions_milliseconds
500 ¶ No Comment
-
distributed.scheduler.locks.lease-validation-interval
10s ¶ The interval in which the scheduler validates staleness of all acquired leases. Must always be smaller than the lease-timeout itself.
-
distributed.scheduler.locks.lease-timeout
30s ¶ Maximum interval to wait for a Client refresh before a lease is invalidated and released.
-
distributed.scheduler.http.routes
['distributed.http.scheduler.prometheus', 'distributed.http.scheduler.info', 'distributed.http.scheduler.json', 'distributed.http.health', 'distributed.http.proxy', 'distributed.http.statics'] ¶ A list of modules like "prometheus" and "health" that can be included or excluded as desired These modules will have a ``routes`` keyword that gets added to the main HTTP Server. This is also a list that can be extended with user defined modules.
-
distributed.scheduler.allowed-imports
['dask', 'distributed'] ¶ A list of trusted root modules the schedular is allowed to import (incl. submodules). For security reasons, the scheduler does not import arbitrary Python modules.
-
distributed.scheduler.active-memory-manager.start
True ¶ set to true to auto-start the AMM on Scheduler init
-
distributed.scheduler.active-memory-manager.interval
2s ¶ Time expression, e.g. "2s". Run the AMM cycle every <interval>.
-
distributed.scheduler.active-memory-manager.measure
optimistic ¶ One of the attributes of distributed.scheduler.MemoryState
-
distributed.scheduler.active-memory-manager.policies
[{'class': 'distributed.active_memory_manager.ReduceReplicas'}] ¶ No Comment
Distributed Worker¶
-
distributed.worker.blocked-handlers
[] ¶ A list of handlers to exclude The scheduler operates by receiving messages from various workers and clients and then performing operations based on those messages. Each message has an operation like "close-worker" or "task-finished". In some high security situations administrators may choose to block certain handlers from running. Those handlers can be listed here. For a list of handlers see the `dask.distributed.Scheduler.handlers` attribute.
-
distributed.worker.multiprocessing-method
spawn ¶ How we create new workers, one of "spawn", "forkserver", or "fork" This is passed to the ``multiprocessing.get_context`` function.
-
distributed.worker.use-file-locking
True ¶ Whether or not to use lock files when creating workers Workers create a local directory in which to place temporary files. When many workers are created on the same process at once these workers can conflict with each other by trying to create this directory all at the same time. To avoid this, Dask usually used a file-based lock. However, on some systems file-based locks don't work. This is particularly common on HPC NFS systems, where users may want to set this to false.
-
distributed.worker.transfer.message-bytes-limit
50MB ¶ The maximum amount of data for a worker to request from another in a single gather operation Tasks are gathered in batches, and if the first task in a batch is larger than this value, the task will still be gathered to ensure progress. Hence, this limit is not absolute. Note that this limit applies to a single gather operation and a worker may gather data from multiple workers in parallel.
-
distributed.worker.connections.outgoing
50 ¶ No Comment
-
distributed.worker.connections.incoming
10 ¶ No Comment
-
distributed.worker.preload
[] ¶ Run custom modules during the lifetime of the worker You can run custom modules when the worker starts up and closes down. See http://www.aidoczh.com/dask/how-to/customize-initialization.html for more information
-
distributed.worker.preload-argv
[] ¶ Arguments to pass into the preload scripts described above See http://www.aidoczh.com/dask/how-to/customize-initialization.html for more information
-
distributed.worker.daemon
True ¶ Whether or not to run our process as a daemon process
-
distributed.worker.validate
False ¶ Whether or not to run consistency checks during execution. This is typically only used for debugging.
-
distributed.worker.lifetime.duration
None ¶ The time after creation to close the worker, like "1 hour"
-
distributed.worker.lifetime.stagger
0 seconds ¶ Random amount by which to stagger lifetimes If you create many workers at the same time, you may want to avoid having them kill themselves all at the same time. To avoid this you might want to set a stagger time, so that they close themselves with some random variation, like "5 minutes" That way some workers can die, new ones can be brought up, and data can be transferred over smoothly.
-
distributed.worker.lifetime.restart
False ¶ Do we try to resurrect the worker after the lifetime deadline?
-
distributed.worker.profile.enabled
True ¶ Whether or not to enable profiling
-
distributed.worker.profile.interval
10ms ¶ The time between polling the worker threads, typically short like 10ms
-
distributed.worker.profile.cycle
1000ms ¶ The time between bundling together this data and sending it to the scheduler This controls the granularity at which people can query the profile information on the time axis.
-
distributed.worker.profile.low-level
False ¶ Whether or not to use the libunwind and stacktrace libraries to gather profiling information at the lower level (beneath Python) To get this to work you will need to install the experimental stacktrace library at conda install -c numba stacktrace See https://github.com/numba/stacktrace
-
distributed.worker.memory.recent-to-old-time
30s ¶ When there is an increase in process memory (as observed by the operating system) that is not accounted for by the dask keys stored on the worker, ignore it for this long before considering it in non-time-sensitive heuristics. This should be set to be longer than the duration of most dask tasks.
-
distributed.worker.memory.rebalance.measure
optimistic ¶ Which of the properties of distributed.scheduler.MemoryState should be used for measuring worker memory usage
-
distributed.worker.memory.rebalance.sender-min
0.3 ¶ Fraction of worker process memory at which we start potentially transferring data to other workers.
-
distributed.worker.memory.rebalance.recipient-max
0.6 ¶ Fraction of worker process memory at which we stop potentially receiving data from other workers. Ignored when max_memory is not set.
-
distributed.worker.memory.rebalance.sender-recipient-gap
0.1 ¶ Fraction of worker process memory, around the cluster mean, where a worker is neither a sender nor a recipient of data during a rebalance operation. E.g. if the mean cluster occupation is 50%, sender-recipient-gap=0.1 means that only nodes above 55% will donate data and only nodes below 45% will receive them. This helps avoid data from bouncing around the cluster repeatedly.
-
distributed.worker.memory.transfer
0.1 ¶ When the total size of incoming data transfers gets above this amount, we start throttling incoming data transfers
-
distributed.worker.memory.target
0.6 ¶ When the process memory (as observed by the operating system) gets above this amount, we start spilling the dask keys holding the oldest chunks of data to disk
-
distributed.worker.memory.spill
0.7 ¶ When the process memory (as observed by the operating system) gets above this amount, we spill data to disk, starting from the dask keys holding the oldest chunks of data, until the process memory falls below the target threshold.
-
distributed.worker.memory.pause
0.8 ¶ When the process memory (as observed by the operating system) gets above this amount, we no longer start new tasks or fetch new data on the worker.
-
distributed.worker.memory.terminate
0.95 ¶ When the process memory reaches this level the nanny process will kill the worker (if a nanny is present)
-
distributed.worker.memory.max-spill
False ¶ Limit of number of bytes to be spilled on disk.
-
distributed.worker.memory.spill-compression
auto ¶ The compression algorithm to use. 'auto' defaults to lz4 if installed, otherwise to snappy if installed, otherwise to false. zlib and zstd are only used if explicitly requested here. Uncompressible data is always uncompressed, regardless of this setting. See also distributed.comm.compression.
-
distributed.worker.memory.monitor-interval
100ms ¶ Interval between checks for the spill, pause, and terminate thresholds
-
distributed.worker.http.routes
['distributed.http.worker.prometheus', 'distributed.http.health', 'distributed.http.statics'] ¶ A list of modules like "prometheus" and "health" that can be included or excluded as desired These modules will have a ``routes`` keyword that gets added to the main HTTP Server. This is also a list that can be extended with user defined modules.
Distributed Nanny¶
-
distributed.nanny.preload
[] ¶ Run custom modules during the lifetime of the nanny You can run custom modules when the nanny starts up and closes down. See http://www.aidoczh.com/dask/how-to/customize-initialization.html for more information
-
distributed.nanny.preload-argv
[] ¶ Arguments to pass into the preload scripts described above See http://www.aidoczh.com/dask/how-to/customize-initialization.html for more information
-
distributed.nanny.pre-spawn-environ.MALLOC_TRIM_THRESHOLD_
65536 ¶ No Comment
-
distributed.nanny.pre-spawn-environ.OMP_NUM_THREADS
1 ¶ No Comment
-
distributed.nanny.pre-spawn-environ.MKL_NUM_THREADS
1 ¶ No Comment
-
distributed.nanny.pre-spawn-environ.OPENBLAS_NUM_THREADS
1 ¶ No Comment
Distributed Admin¶
-
distributed.admin.large-graph-warning-threshold
10MB ¶ Threshold in bytes for when a warning is raised about a large submitted task graph. Default is 10MB.
-
distributed.admin.tick.interval
20ms ¶ The time between ticks, default 20ms
-
distributed.admin.tick.limit
3s ¶ The time allowed before triggering a warning
-
distributed.admin.tick.cycle
1s ¶ The time in between verifying event loop speed
-
distributed.admin.max-error-length
10000 ¶ Maximum length of traceback as text Some Python tracebacks can be very very long (particularly in stack overflow errors) If the traceback is larger than this size (in bytes) then we truncate it.
-
distributed.admin.log-length
10000 ¶ Maximum length of worker/scheduler logs to keep in memory. They can be retrieved with get_scheduler_logs() / get_worker_logs(). Set to null for unlimited.
-
distributed.admin.log-format
%(asctime)s - %(name)s - %(levelname)s - %(message)s ¶ The log format to emit. See https://docs.python.org/3/library/logging.html#logrecord-attributes
-
distributed.admin.low-level-log-length
1000 ¶ Maximum length of various event logs for developers. Set to null for unlimited.
-
distributed.admin.pdb-on-err
False ¶ Enter Python Debugger on scheduling error
-
distributed.admin.system-monitor.interval
500ms ¶ Polling time to query cpu/memory statistics default 500ms
-
distributed.admin.system-monitor.log-length
7200 ¶ Maximum number of samples to keep in memory. Multiply by `interval` to obtain log duration. Set to null for unlimited.
-
distributed.admin.system-monitor.disk
True ¶ Should we include disk metrics? (they can cause issues in some systems)
-
distributed.admin.system-monitor.host-cpu
False ¶ Should we include host-wide CPU usage, with very granular breakdown?
-
distributed.admin.system-monitor.gil.enabled
True ¶ Enable monitoring of GIL contention
-
distributed.admin.system-monitor.gil.interval
1ms ¶ GIL polling interval. More frequent polling will reflect a more accurate GIL contention metric but will be more likely to impact runtime performance.
-
distributed.admin.event-loop
tornado ¶ The event loop to use, Must be one of tornado, asyncio, or uvloop