模式:使用一个角色来同步其他任务和角色#

当你有多个任务需要等待某些条件或需要在集群上的任务和角色之间进行同步时,你可以使用一个中心角色来协调它们。

示例用例#

您可以使用一个actor来实现一个分布式的 asyncio.Event,多个任务可以等待该事件。

代码示例#

import asyncio

import ray


# We set num_cpus to zero because this actor will mostly just block on I/O.
@ray.remote(num_cpus=0)
class SignalActor:
    def __init__(self):
        self.ready_event = asyncio.Event()

    def send(self, clear=False):
        self.ready_event.set()
        if clear:
            self.ready_event.clear()

    async def wait(self, should_wait=True):
        if should_wait:
            await self.ready_event.wait()


@ray.remote
def wait_and_go(signal):
    ray.get(signal.wait.remote())

    print("go!")


signal = SignalActor.remote()
tasks = [wait_and_go.remote(signal) for _ in range(4)]
print("ready...")
# Tasks will all be waiting for the signals.
print("set..")
ray.get(signal.send.remote())

# Tasks are unblocked.
ray.get(tasks)

# Output is:
# ready...
# set..

# (wait_and_go pid=77366) go!
# (wait_and_go pid=77372) go!
# (wait_and_go pid=77367) go!
# (wait_and_go pid=77358) go!