Ray 客户端#

警告

Ray Client 需要 pip 包 ray[client]。如果你安装了最小的 Ray(例如 pip install ray),请通过执行 pip install ray[client] 重新安装。

什么是 Ray 客户端?

Ray Client 是一个连接 Python 脚本到 远程 Ray 集群的 API。实际上,它允许你像使用本地运行的 Ray 一样利用远程 Ray 集群。

通过将 ray.init() 改为 ray.init("ray://<head_node_host>:<port>"),你可以从你的笔记本电脑(或任何地方)直接连接到一个远程集群,并扩展你的 Ray 代码,同时保持在一个 Python shell 中进行交互式开发的能力。这仅适用于 Ray 1.5+ 版本。

# You can run this code outside of the Ray cluster!
import ray

# Starting the Ray client. This connects to a remote Ray cluster.
ray.init("ray://<head_node_host>:10001")

# Normal Ray code follows
@ray.remote
def do_work(x):
    return x ** x

do_work.remote(2)
#....

何时使用 Ray Client#

备注

Ray Client 存在架构上的限制,在使用 Ray 进行 ML 工作负载(如 Ray Tune 或 Ray Train)时可能无法按预期工作。请使用 Ray Jobs API 进行 ML 项目的交互式开发。

当你想将交互式 Python shell 连接到 远程 集群时,可以使用 Ray Client。

  • 使用 ray.init("ray://<head_node_host>:10001")``(Ray 客户端),如果你在 ``<head_node_host> 上设置了一个远程集群,并且想要进行交互式工作。这将把你的 shell 连接到集群。有关设置集群的更多详细信息,请参阅 使用 Ray 客户端 部分。

  • 如果你在本地开发并希望连接到一个已有的集群(即已经运行了 ray start --head),或者自动创建一个本地集群并直接连接到它,请使用 ``ray.init()``(非客户端连接,未指定地址)。这也可以用于 Ray 作业 提交。

Ray Client 对于在本地 Python shell 中进行交互式开发非常有用。然而,它需要与远程集群保持稳定的连接,如果连接丢失超过 30 秒,将会终止工作负载。如果你有一个长时间运行的工作负载希望在集群上运行,我们建议使用 Ray Jobs 代替。

客户端参数#

当传递给 ray.init 的地址以 ray:// 为前缀时,使用 Ray Client。除了地址外,客户端模式目前还接受另外两个参数:

  • namespace (可选): 设置会话的命名空间。

  • runtime_env (可选): 设置会话的 运行时环境 ,允许您动态指定环境变量、包、本地文件等。

# Connects to an existing cluster at 1.2.3.4 listening on port 10001, using
# the namespace "my_namespace". The Ray workers will run inside a cluster-side
# copy of the local directory "files/my_project", in a Python environment with
# `toolz` and `requests` installed.
ray.init(
    "ray://1.2.3.4:10001",
    namespace="my_namespace",
    runtime_env={"working_dir": "files/my_project", "pip": ["toolz", "requests"]},
)
#....

如何使用 Ray 客户端?#

步骤 1:设置您的 Ray 集群#

如果你有一个正在运行的 Ray 集群(版本 >= 1.5),Ray Client 服务器可能已经在头节点的 10001 端口上默认运行了。否则,你需要创建一个 Ray 集群。要在本地启动 Ray 集群,你可以运行

ray start --head

要远程启动 Ray 集群,您可以按照 入门指南 中的说明进行操作。

如有必要,您可以通过在 ray start 命令 中指定 --ray-client-server-port=... 来修改 Ray Client 服务器端口,使其不同于 10001

步骤 2:配置访问#

确保您的本地机器可以访问头节点上的 Ray Client 端口。

完成此操作的最简单方法是使用 SSH 端口转发或 K8s 端口转发。这允许你通过 localhost 连接到头节点上的 Ray Client 服务器。

首先,使用 SSH 连接到您的 Ray 集群并转发监听端口(10001)。对于使用 Ray 集群启动器启动的集群,这看起来像:

$ ray up cluster.yaml
$ ray attach cluster.yaml -p 10001

然后从 另一个终端 连接到 Ray 集群,使用 localhost 作为 head_node_host

import ray

# This will connect to the cluster via the open SSH session.
ray.init("ray://localhost:10001")

# Normal Ray code follows
@ray.remote
def do_work(x):
    return x ** x

do_work.remote(2)

#....

步骤 3:运行 Ray 代码#

现在,使用以下方式连接到 Ray 集群,然后像往常一样使用 Ray:

import ray

# replace with the appropriate host and port
ray.init("ray://<head_node_host>:10001")

# Normal Ray code follows
@ray.remote
def do_work(x):
    return x ** x

do_work.remote(2)

#....

替代连接方法:#

除了端口转发,如果你的计算机能够访问头节点,你可以直接连接到头节点上的 Ray Client 服务器。如果你的计算机与集群在同一网络中,或者你的计算机可以通过 VPN 连接到集群,这是一个可选项。

如果你的计算机没有直接访问权限,你可以修改网络配置以授予访问权限。在 EC2 上,这可以通过修改安全组以允许从你的本地IP地址到Ray客户端服务器端口(默认为``10001``)的入站访问来完成。

通过 Ray 集群启动器,您可以通过在 cluster.yaml 中定义 集群配置-安全组 来配置安全组以允许入站访问。

# An unique identifier for the head node and workers of this cluster.
cluster_name: minimal_security_group

# Cloud-provider specific configuration.
provider:
    type: aws
    region: us-west-2
    security_group:
        GroupName: ray_client_security_group
        IpPermissions:
              - FromPort: 10001
                ToPort: 10001
                IpProtocol: TCP
                IpRanges:
                    # Allow traffic only from your local IP address.
                    - CidrIp: <YOUR_IP_ADDRESS>/32

警告

Anyone with Ray Client access can execute arbitrary code on the Ray Cluster.n Do not expose this to `0.0.0.0/0`.

连接到多个 Ray 集群(实验性)#

Ray Client 允许在一个 Python 进程中连接到多个 Ray 集群。要做到这一点,只需将 allow_multiple=True 传递给 ray.init

import ray
# Create a default client.
ray.init("ray://<head_node_host_cluster>:10001")

# Connect to other clusters.
cli1 = ray.init("ray://<head_node_host_cluster_1>:10001", allow_multiple=True)
cli2 = ray.init("ray://<head_node_host_cluster_2>:10001", allow_multiple=True)

# Data is put into the default cluster.
obj = ray.put("obj")

with cli1:
    obj1 = ray.put("obj1")

with cli2:
    obj2 = ray.put("obj2")

with cli1:
    assert ray.get(obj1) == "obj1"
    try:
        ray.get(obj2)  # Cross-cluster ops not allowed.
    except:
        print("Failed to get object which doesn't belong to this cluster")

with cli2:
    assert ray.get(obj2) == "obj2"
    try:
        ray.get(obj1)  # Cross-cluster ops not allowed.
    except:
        print("Failed to get object which doesn't belong to this cluster")
assert "obj" == ray.get(obj)
cli1.disconnect()
cli2.disconnect()

在使用 Ray 多客户端时,需要注意一些不同的行为:

  • 客户端不会自动断开连接。调用 disconnect 显式关闭连接。

  • 对象引用只能由获取它的客户端使用。

  • ray.init 在没有 allow_multiple 的情况下将创建一个默认的全局 Ray 客户端。

需要了解的事项#

客户端断开连接#

当客户端断开连接时,服务器代表客户端持有的任何对象或角色引用都会被丢弃,就像直接从集群断开连接一样。

如果客户端意外断开连接,例如由于网络故障,客户端将尝试在所有引用被丢弃之前重新连接到服务器30秒。您可以通过设置环境变量 RAY_CLIENT_RECONNECT_GRACE_PERIOD=N 来增加此时间,其中 N 是客户端在放弃之前应尝试重新连接的秒数。

版本控制要求#

通常,客户端 Ray 版本必须与服务器 Ray 版本匹配。如果使用不兼容的版本,将会引发错误。

同样地,客户端和服务器之间的小版本Python(例如,3.6 对 3.7)必须匹配。如果不匹配,将会引发错误。

在旧版 Ray 上启动连接#

如果你在使用 ray.init("ray://...") 时遇到 socket.gaierror: [Errno -2] Name or service not known,那么你可能使用的是 1.5 之前的 Ray 版本,该版本不支持通过 ray.init 启动客户端连接。

通过 Ingress 连接#

如果在使用 Ingress 连接 Ray Cluster 时遇到以下错误信息,可能是由 Ingress 的配置引起的。

grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.INVALID_ARGUMENT
    details = ""
    debug_error_string = "{"created":"@1628668820.164591000","description":"Error received from peer ipv4:10.233.120.107:443","file":"src/core/lib/surface/call.cc","file_line":1062,"grpc_message":"","grpc_status":3}"
>
Got Error from logger channel -- shutting down: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.INVALID_ARGUMENT
    details = ""
    debug_error_string = "{"created":"@1628668820.164713000","description":"Error received from peer ipv4:10.233.120.107:443","file":"src/core/lib/surface/call.cc","file_line":1062,"grpc_message":"","grpc_status":3}"
>

如果你正在使用 nginx-ingress-controller,你可能可以通过添加以下 Ingress 配置来解决问题。

metadata:
  annotations:
     nginx.ingress.kubernetes.io/server-snippet: |
       underscores_in_headers on;
       ignore_invalid_headers on;

Ray 客户端日志#

Ray 客户端日志可以在头节点上的 /tmp/ray/session_latest/logs 找到。

上传#

如果在运行时环境中指定了 working_dir,当运行 ray.init() 时,Ray 客户端会将笔记本上的 working_dir 上传到 /tmp/ray/session_latest/runtime_resources/_ray_pkg_<目录内容的哈希值>

Ray workers 在集群的 /tmp/ray/session_latest/runtime_resources/_ray_pkg_<目录内容的哈希> 目录中启动。这意味着代码中的远程任务和角色的相对路径在笔记本电脑和集群上都能正常工作,无需任何代码更改。例如,如果笔记本电脑上的 working_dir 包含 data.txtrun.py,在 run.py 中的远程任务定义中,可以直接使用相对路径 "data.txt"。然后 python run.py 将在我的笔记本电脑和集群上都能工作。顺便提一下,由于代码中可以使用相对路径,绝对路径仅用于调试目的。

故障排除#

错误:尝试重新连接一个已经被清理的会话#

当 Ray Client 重新连接到一个不识别该客户端的头节点时,会发生此错误。如果头节点意外重启并丢失状态,可能会发生这种情况。在 Kubernetes 上,如果头节点在因驱逐或崩溃后重启,也可能发生这种情况。