演员

Actor 在 Dask 工作流中启用有状态计算。它们对于某些需要额外性能并愿意牺牲弹性的罕见算法非常有用。

actor 是指向一个存在于远程工作节点上的用户定义对象的指针。任何拥有该 actor 的人都可以调用该远程对象的方法。

示例

在这里,我们创建一个简单的 Counter 类,在一个工作节点上实例化该类,然后远程调用该类的方法。

class Counter:
    """ A simple class to manage an incrementing counter """
    n = 0

    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1
        return self.n

    def add(self, x):
        self.n += x
        return self.n

from dask.distributed import Client          # Start a Dask Client
client = Client()

future = client.submit(Counter, actor=True)  # Create a Counter on a worker
counter = future.result()                    # Get back a pointer to that object

counter
# <Actor: Counter, key=Counter-1234abcd>

future = counter.increment()                 # Call remote method
future.result()                              # Get back result
# 1

future = counter.add(10)                     # Call remote method
future.result()                              # Get back result
# 11

动机

演员们受到使用纯任务图的一些挑战的激励。

普通的 Dask 计算由一个函数图组成。这种方法有一些有利于弹性的限制,但可能会对性能产生负面影响:

  1. 状态: 函数不应就地改变其输入或依赖全局状态。它们应以纯函数的方式运行,消耗输入并产生独立的输出。

  2. 中央开销:执行位置和顺序由中央调度器决定。由于调度器参与每个决策,有时可能会造成中央瓶颈。

某些工作负载可能需要直接更新状态,或者可能涉及比调度器能够处理的更多微小任务(调度器每秒可以协调大约4000个任务)。

演员们巧妙地避开了这两个限制:

  1. 状态: 角色可以持有并改变状态。它们被允许就地更新它们的状态。

  2. 开销: 对参与者的操作不会通知中央调度器,因此不会增加每秒4000任务的开销。它们还避免了额外的网络跳跃,因此延迟更低。

创建一个角色

你可以通过使用正常的 Dask 计算函数如 submitmapcomputepersist,并向 worker 提交一个类来创建一个 actor,使用 actors= 关键字(或在 submit 上使用 actor=)。

future = client.submit(Counter, actors=True)

你可以使用所有其他关键字来控制这个角色的最终位置,例如 workers=resources= 等等。

这会在一个普通的 Dask 未来对象上创建,你可以调用 .result() 来获取 Actor,一旦它在 worker 上成功运行。

>>> counter = future.result()
>>> counter
<Actor: Counter, key=...>

在一个工作节点上实例化了一个 Counter 对象,这个 Actor 对象作为我们访问该远程对象的代理。它具有相同的方法和属性。

>>> dir(counter)
['add', 'increment', 'n']

调用远程方法

然而,访问属性或调用方法将触发与远程工作者的通信,在远程工作者的单独线程池中运行该方法,然后将结果传回调用方。对于属性访问,这些操作会阻塞并在完成后返回,对于方法调用,它们会立即返回一个 BaseActorFuture

>>> future = counter.increment()  # Immediately returns a BaseActorFuture
>>> future.result()               # Block until finished and result arrives
1

BaseActorFuture 类似于普通的 Dask Future 对象,但功能并不全面。它们目前 支持 result 方法,不支持其他任何功能。它们目前无法与任何其他期望 futures 的 Dask 函数一起工作,例如 as_completedwaitclient.gather。它们不能被放入额外的提交或映射调用中以形成依赖关系。它们会立即传递其结果(而不是等待调用结果),并将结果缓存在 future 本身上。

访问属性

如果在类级别定义了一个属性,那么该属性将可被角色访问。

class Counter:
    n = 0   # Recall that we defined our class with `n` as a class variable

    ...

>>> counter.n                     # Blocks until finished
1

属性访问块自动关闭。就像你调用了 .result() 一样。

在Worker上执行

当你在一个actor上调用一个方法时,你的参数会被序列化并发送到拥有该actor对象的工作者。如果你从工作者执行此操作,这种通信是直接的。如果你从客户端执行此操作,那么如果客户端有直接访问工作者的权限(如果可能直接连接,请使用 Client(..., direct_to_workers=True) 创建客户端),这种通信将是直接的;如果客户端无法直接连接到工作者,则通过调度器进行代理。

然后,在单独的线程中调用 Actor 对象的适当方法,捕获结果,并将其发送回调用方。目前,工作线程仅为 Actor 提供一个线程,但这在未来可能会改变。

结果会立即发送回调用方,并且不会存储在带有actor的worker上。它被缓存在 BaseActorFuture 对象中。

从协程和 async/await 调用

如果在协程或 async/await 函数中使用actor,那么actor方法和属性访问将返回Tornado的future。

async def f():
    counter = await client.submit(Counter, actor=True)

    await counter.increment()
    n = await counter.n

Actor 上的协程和 async/await

如果你在actor类中定义了一个 async def 函数,那么该方法将在Worker的事件循环线程上运行,而不是在单独的线程上运行。

def Waiter:
    def __init__(self):
        self.event = asyncio.Event()

    async def set(self):
        self.event.set()

    async def wait(self):
        await self.event.wait()

waiter = client.submit(Waiter, actor=True).result()
waiter.wait().result()  # waits until set, without consuming a worker thread

性能

工作线程操作目前大约有1毫秒的延迟,再加上可能存在的任何网络延迟。然而,如果存在足够多的其他活动,工作线程中的其他活动很容易增加这些延迟。

限制

角色提供高级功能,但也有一些成本:

  1. 无弹性: 未采取任何措施使参与者工作负载对工作节点故障具有弹性。如果工作节点在持有参与者时死亡,该参与者将永远丢失。

  2. 无诊断信息: 由于调度器未被告知关于参与者计算的信息,因此无法获取这些计算的诊断信息。

  3. 无负载均衡: 角色被均匀地分配到工作节点上,没有特别考虑避免通信。

  4. 无动态集群: 角色不能迁移到其他工作者。持有角色的工作者不能被退役,无论是通过 retire_workers() 还是通过 Adaptive