开发指南

此仓库是 Dask 项目的一部分。 一般开发指南包括在哪里寻求帮助、仓库布局、测试实践以及文档和风格标准,可在主文档中的 Dask 开发者指南 中找到。

安装

  1. 使用 git 克隆此仓库:

    git clone git@github.com:dask/distributed.git
    cd distributed
    
  2. 安装 anaconda 或 miniconda(取决于操作系统)

  3. conda env create --file continuous_integration/environment-3.11.yaml
    conda activate dask-distributed
    python -m pip install -e .
    

要保持分支与上游源代码同步:

cd distributed
git remote add upstream git@github.com:dask/distributed.git
git remote -v
git fetch -a upstream
git checkout main
git pull upstream main
git push origin main

测试

使用 py.test 进行测试:

py.test distributed --verbose

Tornado

Dask.distributed 是一个 Tornado TCP 应用程序。Tornado 为我们提供了基于套接字的通信层,以及用于编写异步协程的语法,类似于 asyncio。你可以在不了解太多 Tornado 的情况下对这个库中的策略进行适度更改,但是中等程度的更改可能需要你理解 Tornado IOLoops、协程以及一些关于非阻塞通信的知识。Tornado API 文档非常出色,我们建议你阅读以下资源:

此外,如果你想在较低层次上与工作者和调度器之间的通信进行交互,那么你应该理解这里可用的 Tornado TCPServerIOStream

Dask.distributed 在 Tornado 周围封装了一些逻辑。更多信息请参见 基础

编写测试

测试分布式系统通常非常困难,因为当出现问题时,很难检查所有组件的状态。幸运的是,Tornado 中的非阻塞异步模型允许我们在单个线程中运行调度器、多个工作者和多个客户端。这为我们提供了可预测的性能、干净的关闭,以及在执行过程中进入代码任何点的能力。同时,有时我们希望所有内容在不同的进程中运行,以模拟更真实的设置。

测试套件包含三种测试

  1. @gen_cluster: 完全异步测试,其中所有组件在主线程的同一事件循环中运行。这些测试适用于测试复杂逻辑并直接检查系统状态。它们也更容易调试,并且在关闭时引发的问题最少。

  2. def test_foo(client): 使用从主进程分叉的多个进程进行测试。这些测试适用于同步(普通用户)API的测试,以及在触发硬故障以进行弹性测试时使用。

  3. popen: 调用命令行启动系统的测试。这些测试很少,主要用于测试命令行接口。

如果你对 Tornado 接口感到满意,那么你会最开心地使用 @gen_cluster 风格的测试,例如:

# tests/test_submit.py

from distributed.utils_test import gen_cluster, inc
from distributed import Client, Future, Scheduler, Worker

@gen_cluster(client=True)
async def test_submit(c, s, a, b):
    assert isinstance(c, Client)
    assert isinstance(s, Scheduler)
    assert isinstance(a, Worker)
    assert isinstance(b, Worker)

    future = c.submit(inc, 1)
    assert isinstance(future, Future)
    assert future.key in c.futures

    # result = future.result()  # This synchronous API call would block
    result = await future
    assert result == 2

    assert future.key in s.tasks
    assert future.key in a.data or future.key in b.data

@gen_cluster 装饰器为你设置调度器、客户端和工作线程,并在测试后清理它们。它还允许你直接检查集群中每个元素的状态。然而,你不能使用正常的同步API(这样做会导致测试永远等待),而是需要使用协程API,其中所有阻塞函数都以一个下划线(_)为前缀,并使用 await 等待。注意,在这些测试中使用阻塞接口是一个常见的错误。

如果你想测试正常的同步API,你可以使用 client pytest fixture 风格的测试,它会在不同的分叉进程中为你设置调度器和工作节点:

from distributed.utils_test import client

def test_submit(client):
    future = client.submit(inc, 10)
    assert future.result() == 11

此外,如果你想访问调度器和工作进程,你也可以添加 s, a, b 固定装置。

from distributed.utils_test import client

def test_submit(client, s, a, b):
    future = client.submit(inc, 10)
    assert future.result() == 11  # use the synchronous/blocking API here

    a['proc'].terminate()  # kill one of the workers

    result = future.result()  # test that future remains valid
    assert result == 2

在这种测试风格中,您无法访问调度器或工作器。变量 s, a, b 现在是包含 multiprocessing.Process 对象和端口整数的字典。然而,您现在可以使用正常的同步API(在这种测试风格中永远不要使用 await),并且可以通过终止它们来轻松关闭进程。

通常,对于大多数面向用户的功能,你会发现这两种测试。@gen_cluster 测试特定的逻辑,而 client pytest 夹具测试基本的接口和弹性。

除非绝对必要,例如需要测试命令行接口,否则应避免使用 popen 风格的测试。

代码格式化

Dask.distributed 使用多个代码检查工具(flake8、black、isort、pyupgrade、mypy),这些工具由 CI 强制执行。开发者在提交 PR 之前,应通过单个命令 pre-commit run --all-files 在本地运行这些工具。这确保了所有开发者的检查工具版本和选项保持一致。

可选地,您可能希望设置 pre-commit hooks 以便在您进行 git 提交时自动运行。这可以通过运行以下命令来完成:

pre-commit install

从分布式仓库的根目录开始。现在每次提交更改时,代码检查器都会运行。你可以使用 git commit --no-verify 或简短版本 git commit -n 跳过这些检查。