通信
内容
通信¶
工作者、调度器和客户端通过相互发送Python对象(如 协议 消息或用户数据)进行通信。通信层负责在分布式端点之间对这些Python对象进行适当的编码和传输。通信层能够根据用户选择或(可能)内部优化在不同的传输实现之间进行选择。
通信层位于 distributed.comm
包中。
地址¶
通信地址通常表示为URI,例如 tcp://127.0.0.1:1234
。为了与现有代码兼容,如果省略了URI方案,则假定默认方案为 tcp``(因此 ``127.0.0.1:456
实际上与 tcp://127.0.0.1:456
相同)。默认方案在未来可能会更改。
以下方案目前在 distributed
源码树中已实现:
tcp
是主要的传输方式;它使用TCP套接字,并支持IPv4和IPv6地址。tls
是一种使用众所周知的 TLS 协议 通过 TCP 套接字实现的加密传输。使用它需要指定密钥和证书,如 TLS/SSL 中所述。inproc
是一种使用简单对象队列的进程内传输;它消除了序列化和I/O开销,只要端点位于同一进程中,就能提供几乎零成本的通信。
某些URI可能仅用于监听,而不能用于连接。例如,URI tcp://
将监听所有IPv4和IPv6地址以及任意端口,但你无法连接到该地址。
在 distributed
中的更高层次 API 可能为了方便或兼容性接受其他地址格式,例如 (主机, 端口)
对。然而,抽象通信层总是处理 URI。
函数¶
在 distributed.comm
模块中有许多顶级函数,用于帮助处理地址:
- distributed.comm.parse_address(addr: str, strict: bool = False) tuple[str, str] [源代码]¶
将地址拆分为其方案和依赖于方案的位置字符串。
>>> parse_address('tcp://127.0.0.1') ('tcp', '127.0.0.1')
如果 strict 设置为 true,地址必须有一个 scheme。
- distributed.comm.unparse_address(scheme: str, loc: str) str [源代码]¶
撤销 parse_address()。
>>> unparse_address('tcp', '127.0.0.1') 'tcp://127.0.0.1'
- distributed.comm.normalize_address(addr: str) str [源代码]¶
规范化地址,必要时添加默认方案。
>>> normalize_address('tls://[::1]') 'tls://[::1]' >>> normalize_address('[::1]') 'tcp://[::1]'
通信 API¶
处理已建立通信的基本单位是 Comm
对象:
- class distributed.comm.Comm(deserialize: bool = True)[源代码]¶
一个面向消息的通信对象,代表一个已建立的通信通道。同一时间应只有一个读取者和一个写入者:为了管理当前的通信,即使与单个对等方通信,也必须创建不同的
Comm
对象。消息是任意的 Python 对象。此类型的具体实现可以根据底层传输的特性实现不同的序列化机制。
- property extra_info¶
返回关于通信的后端特定信息,作为一个字典。通常,这是在通信建立时初始化的信息,之后不会发生变化。
- static handshake_configuration(local: dict[str, Any], remote: dict[str, Any]) dict[str, Any] [源代码]¶
找到一个适合本地和远程的配置
- 参数
- 本地
此过程中 handshake_info() 的输出
- 远程
远程主机上 handshake_info() 的输出
- handshake_info() dict[str, Any] [源代码]¶
与可能不同的对等方共享环境信息,例如压缩设置。
注释
当此方法运行时,“自动”压缩设置已更新为实际的压缩算法。如果双方都将压缩设置为’自动’,但只有一方安装了lz4,这一点很重要。请参阅 distributed.protocol.compression._update_and_check_compression_settings()。
你不会直接创建 Comm
对象:你要么 监听
传入的通信,要么 连接
到一个正在监听连接的对等方:
- async distributed.comm.connect(addr, timeout=None, deserialize=True, handshake_overrides=None, **connection_args)[源代码]¶
连接到给定的地址(如
tcp://127.0.0.1:1234
的URI)并生成一个Comm
对象。如果连接尝试失败,它将重试直到 timeout 过期。
- distributed.comm.listen(addr, handle_comm, deserialize=True, **kwargs)[源代码]¶
使用给定的参数创建一个监听器对象。当调用其
start()
方法时,监听器将在给定的地址(如tcp://0.0.0.0
的URI)上监听,并为每个传入的连接调用 handle_comm 方法,传入一个Comm
对象。handle_comm 可以是一个常规函数或一个协程。
监听器对象暴露了以下接口:
扩展通信层¶
每个传输由一个URI方案(如 tcp
)表示,并由一个专门的 后端
实现支持,该实现提供了所有传输特定例程的入口点。
树外后端可以在 setuptools entry_points 中的 distributed.comm.backends
组下注册。例如,一个假设的 dask_udp
包可以通过在其 setup.py
文件中包含以下内容来注册其 UDP 后端类:
setup(name="dask_udp",
entry_points={
"distributed.comm.backends": [
"udp=dask_udp.backend:UDPBackend",
]
},
...
)