使用并发组限制每方法的并发性#
除了为actor设置总体的最大并发数外,Ray还允许将方法分离到*并发组*中,每个组都有自己的线程。这使你可以为每个方法限制并发数,例如,允许健康检查方法拥有自己的并发配额,与请求服务方法分开。
小技巧
并发组同时适用于 asyncio 和线程化角色。语法是相同的。
定义并发组#
这定义了两个并发组,”io” 的最大并发数为 2,”compute” 的最大并发数为 4。方法 f1
和 f2
被放置在 “io” 组中,方法 f3
和 f4
被放置在 “compute” 组中。请注意,始终存在一个默认的并发组,其默认并发数为 1000 个 AsyncIO 角色,否则为 1。
你可以使用 concurrency_group
装饰器参数为演员定义并发组:
import ray
@ray.remote(concurrency_groups={"io": 2, "compute": 4})
class AsyncIOActor:
def __init__(self):
pass
@ray.method(concurrency_group="io")
async def f1(self):
pass
@ray.method(concurrency_group="io")
async def f2(self):
pass
@ray.method(concurrency_group="compute")
async def f3(self):
pass
@ray.method(concurrency_group="compute")
async def f4(self):
pass
async def f5(self):
pass
a = AsyncIOActor.remote()
a.f1.remote() # executed in the "io" group.
a.f2.remote() # executed in the "io" group.
a.f3.remote() # executed in the "compute" group.
a.f4.remote() # executed in the "compute" group.
a.f5.remote() # executed in the default group.
您可以使用 API setConcurrencyGroups()
参数为并发执行者定义并发组:
class ConcurrentActor {
public long f1() {
return Thread.currentThread().getId();
}
public long f2() {
return Thread.currentThread().getId();
}
public long f3(int a, int b) {
return Thread.currentThread().getId();
}
public long f4() {
return Thread.currentThread().getId();
}
public long f5() {
return Thread.currentThread().getId();
}
}
ConcurrencyGroup group1 =
new ConcurrencyGroupBuilder<ConcurrentActor>()
.setName("io")
.setMaxConcurrency(1)
.addMethod(ConcurrentActor::f1)
.addMethod(ConcurrentActor::f2)
.build();
ConcurrencyGroup group2 =
new ConcurrencyGroupBuilder<ConcurrentActor>()
.setName("compute")
.setMaxConcurrency(1)
.addMethod(ConcurrentActor::f3)
.addMethod(ConcurrentActor::f4)
.build();
ActorHandle<ConcurrentActor> myActor = Ray.actor(ConcurrentActor::new)
.setConcurrencyGroups(group1, group2)
.remote();
myActor.task(ConcurrentActor::f1).remote(); // executed in the "io" group.
myActor.task(ConcurrentActor::f2).remote(); // executed in the "io" group.
myActor.task(ConcurrentActor::f3, 3, 5).remote(); // executed in the "compute" group.
myActor.task(ConcurrentActor::f4).remote(); // executed in the "compute" group.
myActor.task(ConcurrentActor::f5).remote(); // executed in the "default" group.
默认并发组#
默认情况下,方法被放置在一个默认的并发组中,该组的并发限制为 AsyncIO 角色的 1000 和 其他情况下的 1。可以通过设置 max_concurrency
角色选项来更改默认组的并发性。
以下演员有2个并发组:“io”和“default”。“io”的最大并发数是2,“default”的最大并发数是10。
@ray.remote(concurrency_groups={"io": 2})
class AsyncIOActor:
async def f1(self):
pass
actor = AsyncIOActor.options(max_concurrency=10).remote()
以下并发角色有2个并发组:“io”和“default”。“io”的最大并发数为2,“default”的最大并发数为10。
class ConcurrentActor:
public long f1() {
return Thread.currentThread().getId();
}
ConcurrencyGroup group =
new ConcurrencyGroupBuilder<ConcurrentActor>()
.setName("io")
.setMaxConcurrency(2)
.addMethod(ConcurrentActor::f1)
.build();
ActorHandle<ConcurrentActor> myActor = Ray.actor(ConcurrentActor::new)
.setConcurrencyGroups(group1)
.setMaxConcurrency(10)
.remote();
在运行时设置并发组#
你也可以在运行时将演员方法分派到特定的并发组中。
以下代码片段展示了在运行时动态设置 f2
方法的并发组。
你可以使用 .options
方法。
# Executed in the "io" group (as defined in the actor class).
a.f2.options().remote()
# Executed in the "compute" group.
a.f2.options(concurrency_group="compute").remote()
你可以使用 setConcurrencyGroup
方法。
// Executed in the "io" group (as defined in the actor creation).
myActor.task(ConcurrentActor::f2).remote();
// Executed in the "compute" group.
myActor.task(ConcurrentActor::f2).setConcurrencyGroup("compute").remote();