通信

工作者、调度器和客户端通过相互发送Python对象(如 协议 消息或用户数据)进行通信。通信层负责在分布式端点之间对这些Python对象进行适当的编码和传输。通信层能够根据用户选择或(可能)内部优化在不同的传输实现之间进行选择。

通信层位于 distributed.comm 包中。

地址

通信地址通常表示为URI,例如 tcp://127.0.0.1:1234。为了与现有代码兼容,如果省略了URI方案,则假定默认方案为 tcp``(因此 ``127.0.0.1:456 实际上与 tcp://127.0.0.1:456 相同)。默认方案在未来可能会更改。

以下方案目前在 distributed 源码树中已实现:

  • tcp 是主要的传输方式;它使用TCP套接字,并支持IPv4和IPv6地址。

  • tls 是一种使用众所周知的 TLS 协议 通过 TCP 套接字实现的加密传输。使用它需要指定密钥和证书,如 TLS/SSL 中所述。

  • inproc 是一种使用简单对象队列的进程内传输;它消除了序列化和I/O开销,只要端点位于同一进程中,就能提供几乎零成本的通信。

某些URI可能仅用于监听,而不能用于连接。例如,URI tcp:// 将监听所有IPv4和IPv6地址以及任意端口,但你无法连接到该地址。

distributed 中的更高层次 API 可能为了方便或兼容性接受其他地址格式,例如 (主机, 端口) 对。然而,抽象通信层总是处理 URI。

函数

distributed.comm 模块中有许多顶级函数,用于帮助处理地址:

distributed.comm.parse_address(addr: str, strict: bool = False) tuple[str, str][源代码]

将地址拆分为其方案和依赖于方案的位置字符串。

>>> parse_address('tcp://127.0.0.1')
('tcp', '127.0.0.1')

如果 strict 设置为 true,地址必须有一个 scheme。

distributed.comm.unparse_address(scheme: str, loc: str) str[源代码]

撤销 parse_address()。

>>> unparse_address('tcp', '127.0.0.1')
'tcp://127.0.0.1'
distributed.comm.normalize_address(addr: str) str[源代码]

规范化地址,必要时添加默认方案。

>>> normalize_address('tls://[::1]')
'tls://[::1]'
>>> normalize_address('[::1]')
'tcp://[::1]'
distributed.comm.resolve_address(addr: str) str[源代码]

将特定于方案的地址解析应用于 addr,将所有符号引用替换为具体的位置说明符。

在实践中,这可能意味着主机名被解析为IP地址。

>>> resolve_address('tcp://localhost:8786')
'tcp://127.0.0.1:8786'
distributed.comm.get_address_host(addr: str) str[源代码]

返回一个主机名/IP地址,标识此地址所在的机器。

与 get_address_host_port() 相比,对于格式正确的地址,此函数应始终成功。

>>> get_address_host('tcp://1.2.3.4:80')
'1.2.3.4'

通信 API

处理已建立通信的基本单位是 Comm 对象:

class distributed.comm.Comm(deserialize: bool = True)[源代码]

一个面向消息的通信对象,代表一个已建立的通信通道。同一时间应只有一个读取者和一个写入者:为了管理当前的通信,即使与单个对等方通信,也必须创建不同的 Comm 对象。

消息是任意的 Python 对象。此类型的具体实现可以根据底层传输的特性实现不同的序列化机制。

abstract abort()[源代码]

立即且突然地关闭通信。在析构函数或生成器的 finally 块中很有用。

abstract async close()[源代码]

干净地关闭通信。这将尝试在实际关闭底层传输之前刷新输出缓冲区。

此方法返回一个协程。

abstract closed()[源代码]

返回流是否已关闭。

property extra_info

返回关于通信的后端特定信息,作为一个字典。通常,这是在通信建立时初始化的信息,之后不会发生变化。

static handshake_configuration(local: dict[str, Any], remote: dict[str, Any]) dict[str, Any][源代码]

找到一个适合本地和远程的配置

参数
本地

此过程中 handshake_info() 的输出

远程

远程主机上 handshake_info() 的输出

handshake_info() dict[str, Any][源代码]

与可能不同的对等方共享环境信息,例如压缩设置。

注释

当此方法运行时,“自动”压缩设置已更新为实际的压缩算法。如果双方都将压缩设置为’自动’,但只有一方安装了lz4,这一点很重要。请参阅 distributed.protocol.compression._update_and_check_compression_settings()。

abstract property local_address: str

本地地址

abstract property peer_address: str

对等方的地址

abstract async read(deserializers=None)[源代码]

读取并返回一条消息(一个Python对象)。

此方法返回一个协程。

参数
反序列化器dict[str, tuple[Callable, Callable, bool]] | None

一个可选的字典,适用于 distributed.protocol.deserialize。更多信息请参见 序列化

property same_host: bool

如果对等方在本地主机上,则返回 True;否则返回 False

abstract async write(msg, serializers=None, on_error=None)[源代码]

写一条消息(一个 Python 对象)。

此方法返回一个协程。

参数
消息
on_errorstr | None

序列化失败时的行为。有关有效值,请参见 distributed.protocol.core.dumps

你不会直接创建 Comm 对象:你要么 监听 传入的通信,要么 连接 到一个正在监听连接的对等方:

async distributed.comm.connect(addr, timeout=None, deserialize=True, handshake_overrides=None, **connection_args)[源代码]

连接到给定的地址(如 tcp://127.0.0.1:1234 的URI)并生成一个 Comm 对象。如果连接尝试失败,它将重试直到 timeout 过期。

distributed.comm.listen(addr, handle_comm, deserialize=True, **kwargs)[源代码]

使用给定的参数创建一个监听器对象。当调用其 start() 方法时,监听器将在给定的地址(如 tcp://0.0.0.0 的URI)上监听,并为每个传入的连接调用 handle_comm 方法,传入一个 Comm 对象。

handle_comm 可以是一个常规函数或一个协程。

监听器对象暴露了以下接口:

class distributed.comm.core.Listener[源代码]
abstract property contact_address

此监听器可以联系的地址。如果后者是某种通配符地址,如 ‘tcp://0.0.0.0:123’,则此地址可以与 listen_address 不同。

abstract property listen_address

监听地址作为 URI 字符串。

abstract async start()[源代码]

开始监听传入的连接。

abstract stop()[源代码]

停止监听。这不会关闭已经建立的通信,但会阻止接受新的通信。

扩展通信层

每个传输由一个URI方案(如 tcp)表示,并由一个专门的 后端 实现支持,该实现提供了所有传输特定例程的入口点。

树外后端可以在 setuptools entry_points 中的 distributed.comm.backends 组下注册。例如,一个假设的 dask_udp 包可以通过在其 setup.py 文件中包含以下内容来注册其 UDP 后端类:

setup(name="dask_udp",
      entry_points={
        "distributed.comm.backends": [
            "udp=dask_udp.backend:UDPBackend",
        ]
      },
      ...
)
class distributed.comm.registry.Backend[源代码]

一个通信后端,由给定的URI方案(例如’tcp’)选择。

abstract get_address_host(loc)[源代码]

获取一个主机名(通常是一个IP地址),用于标识地址所在的主机。loc 是一个无方案的地址。

get_address_host_port(loc)[源代码]

获取无方案地址 loc 的 (主机, 端口) 元组。这应该仅由基于IP的传输实现。

abstract get_connector()[源代码]

获取一个可用于连接地址的连接器对象。

abstract get_listener(loc, handle_comm, deserialize, **connection_args)[源代码]

获取无方案地址 loc 的监听器对象。

abstract get_local_address_for(loc)[源代码]

获取适合连接到 loc 的本地监听地址。

abstract resolve_address(loc)[源代码]

将地址解析为规范形式。loc 是一个无方案的地址。

简单的实现可能会返回 loc 不变。