杂项主题#
本页将涵盖Ray中的一些杂项主题。
动态远程参数#
您可以在执行期间使用 .options
动态调整 ray.remote
的资源需求或返回值。
例如,这里我们实例化了多个相同角色的副本,但资源需求各不相同。请注意,为了成功创建这些角色,Ray 需要启动时具备足够的 CPU 资源和相关的自定义资源:
import ray
@ray.remote(num_cpus=4)
class Counter(object):
def __init__(self):
self.value = 0
def increment(self):
self.value += 1
return self.value
a1 = Counter.options(num_cpus=1, resources={"Custom1": 1}).remote()
a2 = Counter.options(num_cpus=2, resources={"Custom2": 1}).remote()
a3 = Counter.options(num_cpus=3, resources={"Custom3": 1}).remote()
你可以为任务指定不同的资源需求(但不适用于actor方法):
ray.init(num_cpus=1, num_gpus=1)
@ray.remote
def g():
return ray.get_gpu_ids()
object_gpu_ids = g.remote()
assert ray.get(object_gpu_ids) == []
dynamic_object_gpu_ids = g.options(num_cpus=1, num_gpus=1).remote()
assert ray.get(dynamic_object_gpu_ids) == [0]
并且可以改变任务(以及参与者方法)的返回值数量:
@ray.remote
def f(n):
return list(range(n))
id1, id2 = f.options(num_returns=2).remote(2)
assert ray.get(id1) == 0
assert ray.get(id2) == 1
并在任务提交时为任务(以及参与者方法)指定一个名称:
import setproctitle
@ray.remote
def f(x):
assert setproctitle.getproctitle() == "ray::special_f"
return x + 1
obj = f.options(name="special_f").remote(3)
assert ray.get(obj) == 4
此名称将在仪表板的机器视图中显示为任务名称,在执行此任务时(如果是Python任务)将显示为工作进程名称,并在日志中显示为任务名称。
重载函数#
Ray Java API 支持远程调用重载的 Java 函数。然而,由于 Java 编译器类型推断的限制,必须将方法引用显式转换为正确的函数类型。例如,考虑以下内容。
重载的普通任务调用:
public static class MyRayApp {
public static int overloadFunction() {
return 1;
}
public static int overloadFunction(int x) {
return x;
}
}
// Invoke overloaded functions.
Assert.assertEquals((int) Ray.task((RayFunc0<Integer>) MyRayApp::overloadFunction).remote().get(), 1);
Assert.assertEquals((int) Ray.task((RayFunc1<Integer, Integer>) MyRayApp::overloadFunction, 2).remote().get(), 2);
重载的actor任务调用:
public static class Counter {
protected int value = 0;
public int increment() {
this.value += 1;
return this.value;
}
}
public static class CounterOverloaded extends Counter {
public int increment(int diff) {
super.value += diff;
return super.value;
}
public int increment(int diff1, int diff2) {
super.value += diff1 + diff2;
return super.value;
}
}
ActorHandle<CounterOverloaded> a = Ray.actor(CounterOverloaded::new).remote();
// Call an overloaded actor method by super class method reference.
Assert.assertEquals((int) a.task(Counter::increment).remote().get(), 1);
// Call an overloaded actor method, cast method reference first.
a.task((RayFunc1<CounterOverloaded, Integer>) CounterOverloaded::increment).remote();
a.task((RayFunc2<CounterOverloaded, Integer, Integer>) CounterOverloaded::increment, 10).remote();
a.task((RayFunc3<CounterOverloaded, Integer, Integer, Integer>) CounterOverloaded::increment, 10, 10).remote();
Assert.assertEquals((int) a.task(Counter::increment).remote().get(), 33);
检查集群状态#
基于Ray编写的应用程序通常希望获取有关集群的一些信息或诊断。一些常见的问题包括:
我的自动扩展集群中有多少个节点?
我的集群中当前有哪些资源可用,包括已使用和总量?
我集群中当前有哪些对象?
为此,您可以使用全局状态 API。
节点信息#
要获取集群中当前节点的信息,可以使用 ray.nodes()
:
- ray.nodes()[源代码]
获取集群中的节点列表(仅用于调试)。
- 返回:
关于集群中 Ray 客户端的信息。
开发者API: 此API可能会在Ray的次要版本之间发生变化。
import ray
ray.init()
print(ray.nodes())
[{'NodeID': '2691a0c1aed6f45e262b2372baf58871734332d7',
'Alive': True,
'NodeManagerAddress': '192.168.1.82',
'NodeManagerHostname': 'host-MBP.attlocal.net',
'NodeManagerPort': 58472,
'ObjectManagerPort': 52383,
'ObjectStoreSocketName': '/tmp/ray/session_2020-08-04_11-00-17_114725_17883/sockets/plasma_store',
'RayletSocketName': '/tmp/ray/session_2020-08-04_11-00-17_114725_17883/sockets/raylet',
'MetricsExportPort': 64860,
'alive': True,
'Resources': {'CPU': 16.0, 'memory': 100.0, 'object_store_memory': 34.0, 'node:192.168.1.82': 1.0}}]
上述信息包括:
NodeID
: 用于 raylet 的唯一标识符。
alive
: 节点是否仍然存活。
NodeManagerAddress
: raylet 所在节点的私有IP。
资源
:节点上的总资源容量。
MetricsExportPort
: 指标通过 Prometheus 端点 暴露的端口号。
资源信息#
要获取集群当前的总资源容量信息,可以使用 ray.cluster_resources()
。
- ray.cluster_resources()[源代码]
获取当前集群资源总量。
请注意,随着节点被添加到集群或从集群中移除,这些信息可能会变得过时。
- 返回:
一个将资源名称映射到集群中该资源总量的字典。
开发者API: 此API可能会在Ray的次要版本之间发生变化。
要获取集群当前可用的资源容量信息,可以使用 ray.available_resources()
。
- ray.available_resources()[源代码]
获取当前可用的集群资源。
这与
cluster_resources
不同,因为它将返回空闲(可用)资源,而不是总资源。请注意,随着任务的开始和结束,这些信息可能会变得过时。
- 返回:
一个将资源名称映射到集群中该资源总量的字典。注意,如果一个资源(例如,“CPU”)当前不可用(即,数量为0),它将不会包含在这个字典中。
开发者API: 此API可能会在Ray的次要版本之间发生变化。
运行大型 Ray 集群#
以下是一些在超过1000个节点上运行Ray的技巧。当使用如此大量的节点运行Ray时,可能需要调整几个系统设置,以实现如此大量机器之间的通信。
调整操作系统设置#
因为所有节点和工作器都连接到GCS,将会创建许多网络连接,操作系统必须支持这些连接的数量。
最大打开文件数#
操作系统需要配置为支持打开许多TCP连接,因为每个worker和raylet都连接到GCS。在POSIX系统中,可以通过``ulimit -n``检查当前限制,如果它很小,应根据操作系统手册增加它。
ARP 缓存#
另一项需要配置的是ARP缓存。在一个大型集群中,所有的工作节点都连接到头节点,这会在ARP表中添加大量条目。确保ARP缓存的大小足够大以处理这么多节点。未能做到这一点将导致头节点挂起。当这种情况发生时,dmesg
会显示类似 neighbor table overflow message
的错误。
在Ubuntu中,可以通过增加 /etc/sysctl.conf
文件中的 net.ipv4.neigh.default.gc_thresh1
- net.ipv4.neigh.default.gc_thresh3
的值来调整ARP缓存大小。更多详情,请参考操作系统手册。
调整 Ray 设置#
备注
有一个正在进行中的 项目 专注于提升 Ray 的可扩展性和稳定性。欢迎分享您的想法和使用案例。
要运行一个大型集群,需要在 Ray 中调整几个参数。
基准测试#
机器设置:
1 主节点: m5.4xlarge (16 vCPUs/64GB 内存)
2000 个工作节点:m5.large(2 个 vCPU/8GB 内存)
操作系统设置:
将最大打开文件数设置为 1048576
- 增加 ARP 缓存大小:
net.ipv4.neigh.default.gc_thresh1=2048
net.ipv4.neigh.default.gc_thresh2=4096
net.ipv4.neigh.default.gc_thresh3=8192
Ray 设置:
RAY_event_stats=false
测试工作负载:
测试脚本: 代码
演员数量 |
Actor 启动时间 |
演员准备时间 |
总时间 |
---|---|---|---|
20k (每节点10个角色) |
14.5秒 |
136.1秒 |
150.7秒 |