协议

调度器、工作者和客户端之间传递消息。这些消息在语义上编码了命令、状态更新和数据,例如:

  • 请在数据 x 上计算函数 sum 并将结果存储在 y

  • 计算 y 已完成

  • 请注意,一个新的工作者 alice 已可供使用

  • 这里是键 'x''y' 的数据

在实践中,我们用字典/映射来表示这些消息:

{'op': 'compute',
 'function': ...
 'args': ['x']}

{'op': 'task-complete',
 'key': 'y',
 'nbytes': 26}

{'op': 'register-worker',
 'address': '192.168.1.42',
 'name': 'alice',
 'nthreads': 4}

{'x': b'...',
 'y': b'...'}

当我们在节点之间传递这些消息时,我们需要将这些消息序列化为字节串,然后在另一端反序列化为内存中的字典形式。对于简单的情况,有几种选项,如 JSON、MsgPack、Protobuffers 和 Thrift。由于需要考虑序列化 Python 函数和对象、可选的压缩、跨语言支持、大消息和效率等问题,情况变得更加复杂。

本文档描述了 dask.distributed 当前使用的协议。请注意,随着我们继续优化性能,此协议可能会迅速变化。

概述

我们可能会将一条消息拆分为多个消息部分以适应不同的协议。通常,小数据块使用 MsgPack 编码,而大的字节串和复杂数据类型则由自定义格式处理。每个消息部分都有自己的头,头始终使用 msgpack 编码。在序列化所有消息部分后,我们得到一系列字节串或 ,这些帧在发送时会加上长度信息。

应用程序对此一无所知,它只是向我们发送包含各种数据类型的Python字典,我们生成一个字节串列表,这些字节串会被写入一个套接字。这种格式对于频繁发送的许多消息和大型消息都非常快速。

消息的MsgPack

大多数消息使用 MsgPack 编码,这是一种自描述的半结构化序列化格式,与 JSON 非常相似,但更小、更快,不可读,并且支持字节串和(即将支持)时间戳。我们选择 MsgPack 作为基础序列化格式的原因如下:

  • 它不需要单独的标题,因此使用起来既简单又灵活,这对于像 dask.distributed 这样的早期阶段项目尤为重要。

  • 它非常快,比 JSON 快得多,并且有优化得很好的实现。除了少数例外(稍后描述),MsgPack 在大量使用的情况下也不会成为瓶颈。

  • 与 JSON 不同,它支持字节字符串

  • 它涵盖了编码大多数信息所需的标准类型集。

  • 它在许多语言中得到了广泛实现(见下面的跨语言部分)

然而,MsgPack 在以下方面失败(正确地):

  • 它没有提供任何方式来编码Python函数或用户定义的数据类型

  • 它不支持大于4GB的字节串,并且对于非常大的消息通常效率不高。

由于这些不足,我们通过特定语言的协议和针对大字节字符串的特殊情况来补充它。

CloudPickle 用于函数和一些数据

Pickle 和 CloudPickle 是 Python 库,用于序列化几乎任何 Python 对象,包括函数。我们使用这些库在将用户的函数和数据转换为字节后,再将其包含在我们传递给 msgpack 的字典/映射中。在入门示例中,您可能已经注意到我们跳过了为函数参数提供示例:

{'op': 'compute',
 'function': ...
 'args': ['x']}

这是因为这个值 ... 实际上将是调用 cloudpickle.dumps(myfunction) 的结果。这些字节将被包含在我们发送给 msgpack 的字典中,msgpack 只需要处理字节而不是晦涩的 Python 函数。

注意:我们实际上会根据情况调用 pickle 和 cloudpickle 的某种组合。这是出于性能原因。

CloudPickle 可以通过引用(通过模块和名称引用它们)或通过值(序列化对象的实际代码)来序列化对象。默认情况下,如果可以,它会通过引用进行序列化,但从 CloudPickle 2.0 开始,您可以注册一个模块以通过值进行序列化。如果您想发送接收端不存在的模块中的对象,这可能会很有用:

import mymodule
cloudpickle.register_pickle_by_value(mymodule)

跨语言专业化

客户端和工作者必须就特定语言的序列化格式达成一致。在标准的 dask.distributed 客户端和工作对象中,最终结果如下:

bytes = cloudpickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
obj = cloudpickle.loads(bytes)

这在 Python 2 和 3 之间有所不同,因此您的客户端和工作程序必须匹配它们的 Python 版本和软件环境。

然而,调度器从不使用特定语言的序列化,而是只处理 MsgPack。如果客户端将一个经过 pickle 处理的函数发送给调度器,调度器将不会解包该函数,而是将其作为字节保留。最终,这些字节将被发送到一个工作节点,该工作节点随后会将这些字节解包成一个适当的 Python 函数。因为调度器从不解包特定语言的序列化字节,所以它可能是另一种语言。

客户端和工作者必须共享相同的语言和软件环境,调度器可能有所不同。

这有几个优点:

  1. 调度器受到保护,防止解封装不安全的代码

  2. 我们可以设想为其他语言(如R或Julia)实现工作者和客户端,并重用Python调度器。工作者和客户端代码相当简单,比复杂的调度器更容易重新实现。

  3. 调度器可能会在未来的某一天被重写为更加高度优化的 C 或 Go 语言。

压缩

像 LZ4 或 Snappy 这样的快速压缩库可以通过在发送前压缩数据并在接收后解压缩数据来提高有效带宽。这在低带宽网络中尤其有价值。

如果这些库中的任何一个可用(我们更倾向于 LZ4 而不是 Snappy),那么对于每个大于 1kB 的消息,我们会尝试压缩消息,如果压缩至少提高了 10%,我们就会发送压缩后的字节而不是原始负载。我们会在头部记录使用的压缩方式,例如 'lz4''snappy'

为了避免压缩大量不可压缩的数据,我们首先尝试压缩一个样本。我们从数据集中的五个位置提取10kB的数据块,将它们排列在一起,并尝试压缩结果。如果这没有导致显著的压缩,那么我们就不尝试压缩完整的结果。

序列化数据

对于更新状态等管理消息,msgpack 已经足够。然而,对于大型结果或特定于 Python 的数据,如 NumPy 数组或 Pandas 数据帧,或者对于更大的结果,我们需要使用其他方法将 Python 对象转换为字节字符串。具体如何实现这一点在 序列化文档 中有更详细的描述。

应用程序代码使用 to_serialize 函数标记特定于Python的结果:

>>> import numpy as np
>>> x = np.ones(5)

>>> from distributed.protocol import to_serialize
>>> msg = {'status': 'OK', 'data': to_serialize(x)}
>>> msg
{'data': <Serialize: [ 1.  1.  1.  1.  1.]>, 'status': 'OK'}

我们将消息分为两条消息,一条编码所有要序列化的数据,另一条编码其他所有内容:

{'key': 'x', 'address': 'alice'}
{'data': <Serialize: [ 1.  1.  1.  1.  1.]>}

我们通常使用 msgpack 传递第一个消息。第二个消息我们分多个部分传递,每个部分对应一个序列化的数据片段(参见 序列化),以及一个包含类型、压缩等信息的头部,用于每个值:

{'keys': ['data'],
 'compression': ['lz4']}
b'...'
b'...'

框架

在管道的末端,我们有一个字节串或帧的序列。我们需要告诉接收端有多少帧以及每个帧的长度。我们按以下顺序排列帧和帧长度:

  1. 帧数,存储为8字节无符号整数

  2. 每个帧的长度,每个存储为8字节无符号整数

  3. 每个框架

在以下部分中,我们将描述如何创建这些框架。

技术版本

一条消息被分解为以下组成部分:

  1. 8 字节编码消息中有多少帧(N)作为 uint64

  2. 8 * N 帧,每帧的长度编码为 uint64

  3. 管理消息的标题

  4. 管理消息,msgpack 编码,可能被压缩

  5. 所有有效载荷消息的标题

  6. 有效载荷消息

行政信息标题

管理消息是任意的 msgpack 编码数据。通常是一个字典。它可以选择性地被压缩。如果是这样,压缩类型将包含在头部。

有效载荷帧和头部

这些框架是可选的。

有效载荷帧用于发送大型或特定语言的数据。这些值在解码后将被插入到管理消息中。头部是msgpack编码的,并包含所有后续有效载荷消息的编码和压缩信息。

一个有效载荷可能分布在多个帧中。每个帧可以单独压缩。

简单示例

这个简单的例子展示了一个最小的消息。只有一个空的头部和一个小型的 msgpack 消息。没有额外的负载帧。

消息: {'status': 'OK'}

框架:

  • 标题: {}

  • 管理消息: {'status': 'OK'}

自定义数据的示例

这个示例包含一个由单个帧组成的单个有效载荷消息。它使用了一种特殊的序列化方法来处理NumPy数组。

消息: {'op': 'get-data', 'data': np.ones(5)}

框架:

  • 标题: {}

  • 管理消息: {'op': 'get-data'}

  • 有效载荷头:

    {'headers': [{'type': 'numpy.ndarray',
                  'compression': 'lz4',
                  'count': 1,
                  'lengths': [40],
                  'dtype': '<f8',
                  'strides': (8,),
                  'shape': (5,)}],
                 'keys': [('data',)]}
    
  • 有效载荷帧: b'(\x00\x00\x00\x11\x00\x01\x00!\xf0?\x07\x00\x0f\x08\x00\x03P\x00\x00\x00\xf0?'