演员
内容
演员¶
Actor 在 Dask 工作流中启用有状态计算。它们对于某些需要额外性能并愿意牺牲弹性的罕见算法非常有用。
actor 是指向一个存在于远程工作节点上的用户定义对象的指针。任何拥有该 actor 的人都可以调用该远程对象的方法。
示例¶
在这里,我们创建一个简单的 Counter
类,在一个工作节点上实例化该类,然后远程调用该类的方法。
class Counter:
""" A simple class to manage an incrementing counter """
n = 0
def __init__(self):
self.n = 0
def increment(self):
self.n += 1
return self.n
def add(self, x):
self.n += x
return self.n
from dask.distributed import Client # Start a Dask Client
client = Client()
future = client.submit(Counter, actor=True) # Create a Counter on a worker
counter = future.result() # Get back a pointer to that object
counter
# <Actor: Counter, key=Counter-1234abcd>
future = counter.increment() # Call remote method
future.result() # Get back result
# 1
future = counter.add(10) # Call remote method
future.result() # Get back result
# 11
动机¶
演员们受到使用纯任务图的一些挑战的激励。
普通的 Dask 计算由一个函数图组成。这种方法有一些有利于弹性的限制,但可能会对性能产生负面影响:
状态: 函数不应就地改变其输入或依赖全局状态。它们应以纯函数的方式运行,消耗输入并产生独立的输出。
中央开销:执行位置和顺序由中央调度器决定。由于调度器参与每个决策,有时可能会造成中央瓶颈。
某些工作负载可能需要直接更新状态,或者可能涉及比调度器能够处理的更多微小任务(调度器每秒可以协调大约4000个任务)。
演员们巧妙地避开了这两个限制:
状态: 角色可以持有并改变状态。它们被允许就地更新它们的状态。
开销: 对参与者的操作不会通知中央调度器,因此不会增加每秒4000任务的开销。它们还避免了额外的网络跳跃,因此延迟更低。
创建一个角色¶
你可以通过使用正常的 Dask 计算函数如 submit
、map
、compute
或 persist
,并向 worker 提交一个类来创建一个 actor,使用 actors=
关键字(或在 submit
上使用 actor=
)。
future = client.submit(Counter, actors=True)
你可以使用所有其他关键字来控制这个角色的最终位置,例如 workers=
、resources=
等等。
这会在一个普通的 Dask 未来对象上创建,你可以调用 .result()
来获取 Actor,一旦它在 worker 上成功运行。
>>> counter = future.result()
>>> counter
<Actor: Counter, key=...>
在一个工作节点上实例化了一个 Counter
对象,这个 Actor
对象作为我们访问该远程对象的代理。它具有相同的方法和属性。
>>> dir(counter)
['add', 'increment', 'n']
调用远程方法¶
然而,访问属性或调用方法将触发与远程工作者的通信,在远程工作者的单独线程池中运行该方法,然后将结果传回调用方。对于属性访问,这些操作会阻塞并在完成后返回,对于方法调用,它们会立即返回一个 BaseActorFuture
。
>>> future = counter.increment() # Immediately returns a BaseActorFuture
>>> future.result() # Block until finished and result arrives
1
BaseActorFuture
类似于普通的 Dask Future
对象,但功能并不全面。它们目前 仅 支持 result
方法,不支持其他任何功能。它们目前无法与任何其他期望 futures 的 Dask 函数一起工作,例如 as_completed
、wait
或 client.gather
。它们不能被放入额外的提交或映射调用中以形成依赖关系。它们会立即传递其结果(而不是等待调用结果),并将结果缓存在 future 本身上。
访问属性¶
如果在类级别定义了一个属性,那么该属性将可被角色访问。
class Counter:
n = 0 # Recall that we defined our class with `n` as a class variable
...
>>> counter.n # Blocks until finished
1
属性访问块自动关闭。就像你调用了 .result()
一样。
在Worker上执行¶
当你在一个actor上调用一个方法时,你的参数会被序列化并发送到拥有该actor对象的工作者。如果你从工作者执行此操作,这种通信是直接的。如果你从客户端执行此操作,那么如果客户端有直接访问工作者的权限(如果可能直接连接,请使用 Client(..., direct_to_workers=True)
创建客户端),这种通信将是直接的;如果客户端无法直接连接到工作者,则通过调度器进行代理。
然后,在单独的线程中调用 Actor 对象的适当方法,捕获结果,并将其发送回调用方。目前,工作线程仅为 Actor 提供一个线程,但这在未来可能会改变。
结果会立即发送回调用方,并且不会存储在带有actor的worker上。它被缓存在 BaseActorFuture
对象中。
从协程和 async/await 调用¶
如果在协程或 async/await 函数中使用actor,那么actor方法和属性访问将返回Tornado的future。
async def f():
counter = await client.submit(Counter, actor=True)
await counter.increment()
n = await counter.n
Actor 上的协程和 async/await¶
如果你在actor类中定义了一个 async def
函数,那么该方法将在Worker的事件循环线程上运行,而不是在单独的线程上运行。
def Waiter:
def __init__(self):
self.event = asyncio.Event()
async def set(self):
self.event.set()
async def wait(self):
await self.event.wait()
waiter = client.submit(Waiter, actor=True).result()
waiter.wait().result() # waits until set, without consuming a worker thread
性能¶
工作线程操作目前大约有1毫秒的延迟,再加上可能存在的任何网络延迟。然而,如果存在足够多的其他活动,工作线程中的其他活动很容易增加这些延迟。
限制¶
角色提供高级功能,但也有一些成本:
无弹性: 未采取任何措施使参与者工作负载对工作节点故障具有弹性。如果工作节点在持有参与者时死亡,该参与者将永远丢失。
无诊断信息: 由于调度器未被告知关于参与者计算的信息,因此无法获取这些计算的诊断信息。
无负载均衡: 角色被均匀地分配到工作节点上,没有特别考虑避免通信。
无动态集群: 角色不能迁移到其他工作者。持有角色的工作者不能被退役,无论是通过
retire_workers()
还是通过Adaptive
。