序列化
内容
序列化¶
当我们在计算机之间传输数据时,我们首先将这些数据转换为可以通过网络传输的字节序列。序列化过程中的选择会影响性能和安全性。
标准的Python解决方案,Pickle,通常但并非总是正确的解决方案。Dask在不同情况下使用多种不同的序列化方案。这些方案是可扩展的,允许用户在敏感情况下进行控制,同时也使库开发者能够插入性能更高的序列化解决方案。
本文档首先描述了 Dask 的默认序列化解决方案,然后讨论了控制和扩展该序列化的方法。
默认值¶
Dask 网络中传递的消息有三种类型:
像“工作者A已完成任务X”或“我内存不足”这样的小型管理消息。这些消息总是使用msgpack序列化。
程序数据的移动,例如 Numpy 数组和 Pandas 数据框。这使用了 pickle 和自定义序列化器的组合,是下一节的内容。
像
f(x)
这样的计算任务是在客户端进程上定义和序列化的,并在工作进程上反序列化和运行。这些任务使用由这些库决定的固定方案进行序列化。目前,这是 pickle 和 cloudpickle 的组合。
序列化家族¶
使用¶
对于程序数据的移动(上述第2项),我们可以使用几种不同的序列化器系列。默认情况下,以下系列是内置的:
Pickle 和 cloudpickle
Msgpack
Dask 自带的自定义每类型序列化器,用于重要数据类(如 Numpy 数组)的特殊序列化
在创建客户端时,您可以选择要用于序列化数据和反序列化数据的系列。
from dask.distributed import Client
client = Client('tcp://scheduler-address:8786',
serializers=['dask', 'pickle'],
deserializers=['dask', 'msgpack'])
如果你出于安全原因对接收Pickle序列化数据敏感,这可能会很有用。
Dask 默认使用序列化器 ['dask', 'pickle']
,如果 dask 自定义序列化器(如下所述)可用,则尝试使用它们,否则回退到 pickle/cloudpickle。
扩展¶
这些家族可以通过创建两个函数,dumps 和 loads,来扩展,这两个函数返回并消费一个 msgpack 可编码的头部,以及一个类似字节对象的列表。然后这些函数必须以适当的名称包含在 distributed.protocol.serialize
字典中。以下是 pickle_dumps
和 pickle_loads
的定义,作为示例。
import pickle
def pickle_dumps(x):
header = {'serializer': 'pickle'}
frames = [pickle.dumps(x)]
return header, frames
def pickle_loads(header, frames):
if len(frames) > 1: # this may be cut up for network reasons
frame = ''.join(frames)
else:
frame = frames[0]
return pickle.loads(frame)
from distributed.protocol.serialize import register_serialization_family
register_serialization_family('pickle', pickle_dumps, pickle_loads)
在此之后,名称 'pickle'
可以在 Client
和其他 Dask 部分的 serializers=
和 deserializers=
关键字中使用。
通信上下文¶
备注
这是一个实验性功能,可能会在没有通知的情况下发生变化
Dask 通信 如果提供 context=
关键字,可以为序列化家族函数提供额外的上下文。这使得序列化可以根据其使用方式表现出不同的行为。
def my_dumps(x, context=None):
if context and 'recipient' in context:
# check if we're sending to the same host or not
上下文取决于通信的类型。例如,当通过TCP发送时,发送方(我们)和接收方的地址可以在字典中找到。
>>> context
{'sender': 'tcp://127.0.0.1:1234', 'recipient': 'tcp://127.0.0.1:5678'}
其他通信可能提供其他信息。
Dask 序列化家族¶
使用¶
Dask 维护了自己的自定义序列化系列,专门处理一些重要类型,如 Numpy 数组。这些序列化器要么比 Pickle 更高效,要么序列化 Pickle 无法处理的类型。
使用这一系列的序列化器,你不需要做任何特别的事情。它默认是启用的(与 pickle 一起)。请注意,Dask 自定义序列化器在某些情况下可能会在内部使用 pickle。它不应被认为更安全。
扩展¶
dask_serialize 的单一调度 |
|
dask_deserialize 的单调度 |
与一般的序列化家族类似,Dask家族*也*是可扩展的。这是支持单一类型对象自定义序列化的一个好方法。方法类似,你创建序列化和反序列化函数,这些函数生成和消费头部和帧,然后将它们注册到Dask中。
class Human:
def __init__(self, name):
self.name = name
from distributed.protocol import dask_serialize, dask_deserialize
@dask_serialize.register(Human)
def serialize(human: Human) -> Tuple[Dict, List[bytes]]:
header = {}
frames = [human.name.encode()]
return header, frames
@dask_deserialize.register(Human)
def deserialize(header: Dict, frames: List[bytes]) -> Human:
return Human(frames[0].decode())
遍历属性¶
|
注册(反)序列化以遍历 __dict__ |
一个常见的情况是,你的对象只是包装了Numpy数组或其他Dask已经很好地序列化的对象。例如,Scikit-Learn估计器主要围绕Numpy数组添加了一些额外的元数据。在这些情况下,你可以使用``register_generic``函数为你的类注册自定义的Dask序列化。
API¶
|
将对象转换为标题和字节串列表 |
|
将序列化的头部和字节字符串列表转换回Python对象 |
dask_serialize 的单一调度 |
|
dask_deserialize 的单调度 |
|
|
注册(反)序列化以遍历 __dict__ |
- distributed.protocol.serialize.serialize(x: object, serializers=None, on_error: Literal['message' | 'raise'] = 'message', context=None, iterate_collection: bool | None = None) tuple[dict[str, Any], list[bytes | memoryview]] [源代码]¶
将对象转换为标题和字节串列表
这接受一个任意的 Python 对象,并返回一个 msgpack 可序列化的头部和一个字节或内存视图对象的列表。
要使用的序列化协议是可配置的:名称列表定义了要使用的序列化器集合,按顺序排列。这些名称是
serializer_registry
字典中的键(例如,’pickle’,’msgpack’),它们映射到解/序列化函数。名称 ‘dask’ 是特殊的,并将使用每个类的序列化方法。None
给出默认列表['dask', 'pickle']
。关于
iterate_collection
参数的说明(仅在x
是集合时相关): -iterate_collection=True
:分别序列化集合元素。 -iterate_collection=False
:一起序列化集合元素。 - ``iterate_collection=None``(默认):推断最佳设置。- 返回
- header: 包含任何 msgpack 可序列化元数据的字典
- frames: 字节或内存视图的列表,通常长度为1
参见
deserialize
将头部和框架转换回对象
to_serialize
标记消息中的数据应被序列化
register_serialization
注册自定义序列化函数
示例
>>> serialize(1) ({}, [b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00K\x01.'])
>>> serialize(b'123') # some special types get custom treatment ({'type': 'builtins.bytes'}, [b'123'])
>>> deserialize(*serialize(1)) 1
- distributed.protocol.serialize.deserialize(header, frames, deserializers=None)[源代码]¶
将序列化的头部和字节字符串列表转换回Python对象
- 参数
- 标题dict
- 框架字节列表
- 反序列化器dict[str, tuple[Callable, Callable, bool]] | None
一个可选的字典,将名称映射到一个(反)序列化器。更多信息请参见 dask_serialize 和 dask_deserialize。
参见
- distributed.protocol.serialize.dask_serialize(arg, *args, **kwargs)¶
dask_serialize 的单一调度
- distributed.protocol.serialize.dask_deserialize(arg, *args, **kwargs)¶
dask_deserialize 的单调度
- distributed.protocol.serialize.register_generic(cls, serializer_name='dask', serialize_func=<dask.utils.Dispatch object>, deserialize_func=<dask.utils.Dispatch object>)[源代码]¶
注册(反)序列化以遍历 __dict__
通常在为 Dask 的自定义序列化注册新类时,你需要管理头部和帧,这可能会很繁琐。如果你只想遍历你的对象并将其所有属性的序列化应用,那么这个函数可能会提供一个更简单的路径。
这为自定义的 Dask 序列化家族注册了一个类。它通过遍历其 __dict__ 属性并递归应用
serialize
和deserialize
来进行序列化。它会收集一组帧并将小属性保留在头部。反序列化则逆转这一过程。如果以下条件成立,这是一个好主意:
你的对象的大部分字节由 Dask 的自定义序列化已经处理得很好的数据类型组成,比如 Numpy 数组。
你的对象不需要任何特殊的构造逻辑,除了 object.__new__(cls)
示例
>>> import sklearn.base >>> from distributed.protocol import register_generic >>> register_generic(sklearn.base.BaseEstimator)