序列化

当我们在计算机之间传输数据时,我们首先将这些数据转换为可以通过网络传输的字节序列。序列化过程中的选择会影响性能和安全性。

标准的Python解决方案,Pickle,通常但并非总是正确的解决方案。Dask在不同情况下使用多种不同的序列化方案。这些方案是可扩展的,允许用户在敏感情况下进行控制,同时也使库开发者能够插入性能更高的序列化解决方案。

本文档首先描述了 Dask 的默认序列化解决方案,然后讨论了控制和扩展该序列化的方法。

默认值

Dask 网络中传递的消息有三种类型:

  1. 像“工作者A已完成任务X”或“我内存不足”这样的小型管理消息。这些消息总是使用msgpack序列化。

  2. 程序数据的移动,例如 Numpy 数组和 Pandas 数据框。这使用了 pickle 和自定义序列化器的组合,是下一节的内容。

  3. f(x) 这样的计算任务是在客户端进程上定义和序列化的,并在工作进程上反序列化和运行。这些任务使用由这些库决定的固定方案进行序列化。目前,这是 pickle 和 cloudpickle 的组合。

序列化家族

使用

对于程序数据的移动(上述第2项),我们可以使用几种不同的序列化器系列。默认情况下,以下系列是内置的:

  1. Pickle 和 cloudpickle

  2. Msgpack

  3. Dask 自带的自定义每类型序列化器,用于重要数据类(如 Numpy 数组)的特殊序列化

在创建客户端时,您可以选择要用于序列化数据和反序列化数据的系列。

from dask.distributed import Client
client = Client('tcp://scheduler-address:8786',
                serializers=['dask', 'pickle'],
                deserializers=['dask', 'msgpack'])

如果你出于安全原因对接收Pickle序列化数据敏感,这可能会很有用。

Dask 默认使用序列化器 ['dask', 'pickle'],如果 dask 自定义序列化器(如下所述)可用,则尝试使用它们,否则回退到 pickle/cloudpickle。

扩展

这些家族可以通过创建两个函数,dumps 和 loads,来扩展,这两个函数返回并消费一个 msgpack 可编码的头部,以及一个类似字节对象的列表。然后这些函数必须以适当的名称包含在 distributed.protocol.serialize 字典中。以下是 pickle_dumpspickle_loads 的定义,作为示例。

import pickle

def pickle_dumps(x):
    header = {'serializer': 'pickle'}
    frames = [pickle.dumps(x)]
    return header, frames

def pickle_loads(header, frames):
    if len(frames) > 1:  # this may be cut up for network reasons
        frame = ''.join(frames)
    else:
        frame = frames[0]
    return pickle.loads(frame)

from distributed.protocol.serialize import register_serialization_family
register_serialization_family('pickle', pickle_dumps, pickle_loads)

在此之后,名称 'pickle' 可以在 Client 和其他 Dask 部分的 serializers=deserializers= 关键字中使用。

通信上下文

备注

这是一个实验性功能,可能会在没有通知的情况下发生变化

Dask 通信 如果提供 context= 关键字,可以为序列化家族函数提供额外的上下文。这使得序列化可以根据其使用方式表现出不同的行为。

def my_dumps(x, context=None):
    if context and 'recipient' in context:
        # check if we're sending to the same host or not

上下文取决于通信的类型。例如,当通过TCP发送时,发送方(我们)和接收方的地址可以在字典中找到。

>>> context
{'sender': 'tcp://127.0.0.1:1234', 'recipient': 'tcp://127.0.0.1:5678'}

其他通信可能提供其他信息。

Dask 序列化家族

使用

Dask 维护了自己的自定义序列化系列,专门处理一些重要类型,如 Numpy 数组。这些序列化器要么比 Pickle 更高效,要么序列化 Pickle 无法处理的类型。

使用这一系列的序列化器,你不需要做任何特别的事情。它默认是启用的(与 pickle 一起)。请注意,Dask 自定义序列化器在某些情况下可能会在内部使用 pickle。它不应被认为更安全。

扩展

dask_serialize

dask_serialize 的单一调度

dask_deserialize

dask_deserialize 的单调度

与一般的序列化家族类似,Dask家族*也*是可扩展的。这是支持单一类型对象自定义序列化的一个好方法。方法类似,你创建序列化和反序列化函数,这些函数生成和消费头部和帧,然后将它们注册到Dask中。

class Human:
    def __init__(self, name):
        self.name = name

from distributed.protocol import dask_serialize, dask_deserialize

@dask_serialize.register(Human)
def serialize(human: Human) -> Tuple[Dict, List[bytes]]:
    header = {}
    frames = [human.name.encode()]
    return header, frames

@dask_deserialize.register(Human)
def deserialize(header: Dict, frames: List[bytes]) -> Human:
    return Human(frames[0].decode())

遍历属性

register_generic(cls[, serializer_name, ...])

注册(反)序列化以遍历 __dict__

一个常见的情况是,你的对象只是包装了Numpy数组或其他Dask已经很好地序列化的对象。例如,Scikit-Learn估计器主要围绕Numpy数组添加了一些额外的元数据。在这些情况下,你可以使用``register_generic``函数为你的类注册自定义的Dask序列化。

API

serialize(x[, serializers, on_error, ...])

将对象转换为标题和字节串列表

deserialize(header, frames[, deserializers])

将序列化的头部和字节字符串列表转换回Python对象

dask_serialize

dask_serialize 的单一调度

dask_deserialize

dask_deserialize 的单调度

register_generic(cls[, serializer_name, ...])

注册(反)序列化以遍历 __dict__

distributed.protocol.serialize.serialize(x: object, serializers=None, on_error: Literal['message' | 'raise'] = 'message', context=None, iterate_collection: bool | None = None) tuple[dict[str, Any], list[bytes | memoryview]][源代码]

将对象转换为标题和字节串列表

这接受一个任意的 Python 对象,并返回一个 msgpack 可序列化的头部和一个字节或内存视图对象的列表。

要使用的序列化协议是可配置的:名称列表定义了要使用的序列化器集合,按顺序排列。这些名称是 serializer_registry 字典中的键(例如,’pickle’,’msgpack’),它们映射到解/序列化函数。名称 ‘dask’ 是特殊的,并将使用每个类的序列化方法。None 给出默认列表 ['dask', 'pickle']

关于 iterate_collection 参数的说明(仅在 x 是集合时相关): - iterate_collection=True:分别序列化集合元素。 - iterate_collection=False:一起序列化集合元素。 - ``iterate_collection=None``(默认):推断最佳设置。

返回
header: 包含任何 msgpack 可序列化元数据的字典
frames: 字节或内存视图的列表,通常长度为1

参见

deserialize

将头部和框架转换回对象

to_serialize

标记消息中的数据应被序列化

register_serialization

注册自定义序列化函数

示例

>>> serialize(1)
({}, [b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00K\x01.'])
>>> serialize(b'123')  # some special types get custom treatment
({'type': 'builtins.bytes'}, [b'123'])
>>> deserialize(*serialize(1))
1
distributed.protocol.serialize.deserialize(header, frames, deserializers=None)[源代码]

将序列化的头部和字节字符串列表转换回Python对象

参数
标题dict
框架字节列表
反序列化器dict[str, tuple[Callable, Callable, bool]] | None

一个可选的字典,将名称映射到一个(反)序列化器。更多信息请参见 dask_serializedask_deserialize

参见

serialize
distributed.protocol.serialize.dask_serialize(arg, *args, **kwargs)

dask_serialize 的单一调度

distributed.protocol.serialize.dask_deserialize(arg, *args, **kwargs)

dask_deserialize 的单调度

distributed.protocol.serialize.register_generic(cls, serializer_name='dask', serialize_func=<dask.utils.Dispatch object>, deserialize_func=<dask.utils.Dispatch object>)[源代码]

注册(反)序列化以遍历 __dict__

通常在为 Dask 的自定义序列化注册新类时,你需要管理头部和帧,这可能会很繁琐。如果你只想遍历你的对象并将其所有属性的序列化应用,那么这个函数可能会提供一个更简单的路径。

这为自定义的 Dask 序列化家族注册了一个类。它通过遍历其 __dict__ 属性并递归应用 serializedeserialize 来进行序列化。它会收集一组帧并将小属性保留在头部。反序列化则逆转这一过程。

如果以下条件成立,这是一个好主意:

  1. 你的对象的大部分字节由 Dask 的自定义序列化已经处理得很好的数据类型组成,比如 Numpy 数组。

  2. 你的对象不需要任何特殊的构造逻辑,除了 object.__new__(cls)

示例

>>> import sklearn.base
>>> from distributed.protocol import register_generic
>>> register_generic(sklearn.base.BaseEstimator)