序列化#

由于 Ray 进程不共享内存空间,工作人员和节点之间传输的数据将需要 序列化反序列化。Ray 使用 Plasma 对象存储 来高效地在不同进程和不同节点之间传输对象。对象存储中的 Numpy 数组在同一节点上的工作人员之间共享(零拷贝反序列化)。

概述#

Ray 决定使用自定义的 Pickle 协议版本 5 回溯版本来替换原始的 PyArrow 序列化器。这解决了之前的一些限制(例如,无法序列化递归对象)。

Ray 目前兼容 Pickle 协议版本 5,而 Ray 在 cloudpickle 的帮助下支持更广泛的对象序列化(例如 lambda 函数和嵌套函数、动态类)。

等离子体对象存储#

Plasma 是一个内存对象存储。它最初作为 Apache Arrow 的一部分开发。在 Ray 1.0.0 版本发布之前,Ray 将 Arrow 的 Plasma 代码分叉到 Ray 的代码库中,以便根据 Ray 的架构和性能需求解耦并继续开发。

Plasma 用于在不同进程和不同节点之间高效传输对象。Plasma 对象存储中的所有对象都是 不可变的,并且保存在共享内存中。这样,它们可以被同一节点上的许多工作者高效访问。

每个节点都有自己的对象存储。当数据被放入对象存储时,它不会自动广播到其他节点。数据保持在本地的写入者,直到被另一个节点上的任务或角色请求。

序列化 ObjectRefs#

显式地使用 ray.cloudpickle 序列化 ObjectRefs 应作为最后的手段。通过 Ray 任务参数和返回值传递 ObjectRefs 是推荐的方法。

Ray ObjectRefs 可以使用 ray.cloudpickle 进行序列化。ObjectRef 随后可以被反序列化并通过 ray.get() 访问。请注意,必须使用 ray.cloudpickle;其他 pickle 工具不能保证正常工作。此外,反序列化 ObjectRef 的过程必须是同一个 Ray 集群的一部分,该集群对其进行了序列化。

当序列化时,ObjectRef 的值将保持在 Ray 的共享内存对象存储中。必须通过调用 ray._private.internal_api.free(obj_ref) 来显式释放对象。

警告

ray._private.internal_api.free(obj_ref) 是一个私有API,可能在未来的Ray版本中有所改变。

这个代码示例展示了如何序列化一个 ObjectRef,将其存储在外部存储中,反序列化并使用它,最后释放其对象。

import ray
from ray import cloudpickle

FILE = "external_store.pickle"

ray.init()

my_dict = {"hello": "world"}

obj_ref = ray.put(my_dict)
with open(FILE, "wb+") as f:
    cloudpickle.dump(obj_ref, f)

# ObjectRef remains pinned in memory because
# it was serialized with ray.cloudpickle.
del obj_ref

with open(FILE, "rb") as f:
    new_obj_ref = cloudpickle.load(f)

# The deserialized ObjectRef works as expected.
assert ray.get(new_obj_ref) == my_dict

# Explicitly free the object.
ray._private.internal_api.free(new_obj_ref)

Numpy 数组#

Ray 通过使用带外数据的 Pickle 协议 5 来优化 numpy 数组。numpy 数组被存储为一个只读对象,同一节点上的所有 Ray 工作进程都可以在不复制的情况下(零拷贝读取)从对象存储中读取该 numpy 数组。工作进程中的每个 numpy 数组对象都持有指向共享内存中相关数组的指针。任何对只读对象的写操作都需要用户首先将其复制到本地进程内存中。

小技巧

通过仅使用原生类型(例如,numpy 数组或 numpy 数组和其他基本类型的列表/字典),或者通过使用 Actors 持有无法序列化的对象,您通常可以避免序列化问题。

修复“赋值目标为只读”#

因为 Ray 将 numpy 数组放入对象存储中,当它们作为远程函数的参数反序列化时,它们将成为只读的。例如,以下代码片段将会崩溃:

import ray
import numpy as np


@ray.remote
def f(arr):
    # arr = arr.copy()  # Adding a copy will fix the error.
    arr[0] = 1


try:
    ray.get(f.remote(np.zeros(100)))
except ray.exceptions.RayTaskError as e:
    print(e)
# ray.exceptions.RayTaskError(ValueError): ray::f()
#   File "test.py", line 6, in f
#     arr[0] = 1
# ValueError: assignment destination is read-only

为了避免这个问题,如果你需要修改数组,可以在目标位置手动复制数组(arr = arr.copy())。注意,这实际上相当于禁用了Ray提供的零拷贝反序列化功能。

序列化笔记#

  • Ray 目前使用的是 Pickle 协议版本 5。大多数 Python 发行版使用的默认 pickle 协议是协议 3。对于较大的对象,协议 4 和 5 比协议 3 更高效。

  • 对于非本地对象,Ray 将始终保持单个副本,即使它在对象中被多次引用:

    import ray
    import numpy as np
    
    obj = [np.zeros(42)] * 99
    l = ray.get(ray.put(obj))
    assert l[0] is l[1]  # no problem!
    
  • 尽可能使用 numpy 数组或 numpy 数组的 Python 集合以获得最佳性能。

  • 锁对象大多不可序列化,因为复制一个锁是无意义的,并且可能导致严重的并发问题。如果你的对象包含一个锁,你可能需要想出一个变通方法。

自定义序列化#

有时你可能希望自定义序列化过程,因为 Ray 使用的默认序列化器(pickle5 + cloudpickle)可能不适合你(无法序列化某些对象,对某些对象来说太慢等)。

至少有三种方式来定义你的自定义序列化过程:

  1. 如果你想自定义某种对象的序列化方式,并且你可以访问代码,你可以在相应的类中定义 __reduce__ 函数。大多数Python库通常都会这样做。示例代码:

    import ray
    import sqlite3
    
    class DBConnection:
        def __init__(self, path):
            self.path = path
            self.conn = sqlite3.connect(path)
    
        # without '__reduce__', the instance is unserializable.
        def __reduce__(self):
            deserializer = DBConnection
            serialized_data = (self.path,)
            return deserializer, serialized_data
    
    original = DBConnection("/tmp/db")
    print(original.conn)
    
    copied = ray.get(ray.put(original))
    print(copied.conn)
    
<sqlite3.Connection object at ...>
<sqlite3.Connection object at ...>
  1. 如果你想自定义某种对象的序列化方式,但你无法访问或修改相应的类,你可以使用你使用的序列化器注册该类:

    import ray
    import threading
    
    class A:
        def __init__(self, x):
            self.x = x
            self.lock = threading.Lock()  # could not be serialized!
    
    try:
      ray.get(ray.put(A(1)))  # fail!
    except TypeError:
      pass
    
    def custom_serializer(a):
        return a.x
    
    def custom_deserializer(b):
        return A(b)
    
    # Register serializer and deserializer for class A:
    ray.util.register_serializer(
      A, serializer=custom_serializer, deserializer=custom_deserializer)
    ray.get(ray.put(A(1)))  # success!
    
    # You can deregister the serializer at any time.
    ray.util.deregister_serializer(A)
    try:
      ray.get(ray.put(A(1)))  # fail!
    except TypeError:
      pass
    
    # Nothing happens when deregister an unavailable serializer.
    ray.util.deregister_serializer(A)
    

    注意:序列化器在每个 Ray 工作器本地管理。因此,对于每个 Ray 工作器,如果你想使用序列化器,你需要注册该序列化器。取消注册序列化器也仅在本地生效。

    如果你为一个类注册了一个新的序列化器,新的序列化器会立即在worker中替换旧的序列化器。这个API也是幂等的,重复注册相同的序列化器不会产生副作用。

  2. 我们还为您提供了一个示例,如果您想自定义特定对象的序列化:

    import threading
    
    class A:
        def __init__(self, x):
            self.x = x
            self.lock = threading.Lock()  # could not serialize!
    
    try:
       ray.get(ray.put(A(1)))  # fail!
    except TypeError:
       pass
    
    class SerializationHelperForA:
        """A helper class for serialization."""
        def __init__(self, a):
            self.a = a
    
        def __reduce__(self):
            return A, (self.a.x,)
    
    ray.get(ray.put(SerializationHelperForA(A(1))))  # success!
    # the serializer only works for a specific object, not all A
    # instances, so we still expect failure here.
    try:
       ray.get(ray.put(A(1)))  # still fail!
    except TypeError:
       pass
    

故障排除#

使用 ray.util.inspect_serializability 来识别棘手的序列化问题。此函数可用于追踪任何 Python 对象中的潜在不可序列化对象——无论是函数、类还是对象实例。

下面,我们演示了在一个包含不可序列化对象(线程锁)的函数上的这种行为:

from ray.util import inspect_serializability
import threading

lock = threading.Lock()

def test():
    print(lock)

inspect_serializability(test, name="test")

生成的输出是:

  =============================================================
  Checking Serializability of <function test at 0x7ff130697e50>
  =============================================================
  !!! FAIL serialization: cannot pickle '_thread.lock' object
  Detected 1 global variables. Checking serializability...
      Serializing 'lock' <unlocked _thread.lock object at 0x7ff1306a9f30>...
      !!! FAIL serialization: cannot pickle '_thread.lock' object
      WARNING: Did not find non-serializable object in <unlocked _thread.lock object at 0x7ff1306a9f30>. This may be an oversight.
  =============================================================
  Variable:

      FailTuple(lock [obj=<unlocked _thread.lock object at 0x7ff1306a9f30>, parent=<function test at 0x7ff130697e50>])

  was found to be non-serializable. There may be multiple other undetected variables that were non-serializable.
  Consider either removing the instantiation/imports of these variables or moving the instantiation into the scope of the function/class.
  =============================================================
  Check https://docs.ray.io/en/master/ray-core/objects/serialization.html#troubleshooting for more information.
  If you have any suggestions on how to improve this error message, please reach out to the Ray developers on github.com/ray-project/ray/issues/
  =============================================================

如需更详细的信息,请在导入 Ray 之前设置环境变量 RAY_PICKLE_VERBOSE_DEBUG='2'。这将启用基于 Python 的序列化后端,而不是 C-Pickle,因此您可以在序列化过程中调试 Python 代码。然而,这会大大降低序列化的速度。

已知问题#

在使用某些 Python 3.8 和 3.9 版本时,用户可能会遇到内存泄漏问题。这是由于 Python 的 pickle 模块中的一个错误 导致的。

此问题已在 Python 3.8.2rc1、Python 3.9.0 alpha 4 或更晚版本中得到解决。