Python SDK 概述#

Ray Jobs Python SDK 是通过编程方式提交作业的推荐方式。跳转到 API 参考,或继续阅读以获取快速概览。

设置#

Ray Jobs 在 1.9 及以上版本中可用,并且需要完整安装 Ray。你可以通过运行以下命令来完成安装:

pip install "ray[default]"

有关安装 Ray 的更多详细信息,请参阅 安装指南

要运行一个 Ray 作业,我们还需要能够向 Ray 集群发送 HTTP 请求。为了方便,本指南将假设您使用的是本地 Ray 集群,我们可以通过运行以下命令来启动它:

ray start --head
# ...
# 2022-08-10 09:54:57,664   INFO services.py:1476 -- View the Ray dashboard at http://127.0.0.1:8265
# ...

这将在我们的本地机器上创建一个 Ray 头节点,用于开发目的。注意在启动或连接到 Ray 集群时打印的 Ray 仪表板 URL;我们稍后将使用此 URL 提交 Ray 作业。如果使用远程集群,请参阅 使用远程集群 以获取端口转发的提示。有关生产部署场景的更多详细信息,请查看在 虚拟机Kubernetes 上部署 Ray 的指南。

提交一个 Ray 作业#

让我们从一个可以在本地运行的示例脚本开始。以下脚本使用 Ray API 提交任务并打印其返回值:

# script.py
import ray

@ray.remote
def hello_world():
    return "hello world"

ray.init()
print(ray.get(hello_world.remote()))

SDK 调用通过 JobSubmissionClient 对象进行。要初始化客户端,请提供 Ray 集群头节点地址和 Ray Dashboard 使用的端口(默认为 8265)。在此示例中,我们将使用本地 Ray 集群,但相同的示例也适用于远程 Ray 集群地址;有关设置端口转发的详细信息,请参阅 使用远程集群

from ray.job_submission import JobSubmissionClient

# If using a remote cluster, replace 127.0.0.1 with the head node's IP address or set up port forwarding.
client = JobSubmissionClient("http://127.0.0.1:8265")
job_id = client.submit_job(
    # Entrypoint shell command to execute
    entrypoint="python script.py",
    # Path to the local directory that contains the script.py file
    runtime_env={"working_dir": "./"}
)
print(job_id)

小技巧

默认情况下,Ray 作业服务器会生成一个新的 job_id 并返回它,但您可以先选择一个唯一的 job_id 字符串,然后将其传递给 submit_job。在这种情况下,作业将使用您提供的 ID 执行,如果相同的 job_id 在同一 Ray 集群中提交多次,则会抛出错误。

因为作业提交是异步的,上述调用将立即返回,输出如下:

raysubmit_g8tDzJ6GqrCy7pd6

现在我们可以编写一个简单的轮询循环,检查作业状态直到它达到终端状态(即 JobStatus.SUCCEEDEDJobStatus.STOPPEDJobStatus.FAILED)。我们还可以通过调用 client.get_job_logs 来获取作业的输出。

from ray.job_submission import JobSubmissionClient, JobStatus
import time

# If using a remote cluster, replace 127.0.0.1 with the head node's IP address.
client = JobSubmissionClient("http://127.0.0.1:8265")
job_id = client.submit_job(
    # Entrypoint shell command to execute
    entrypoint="python script.py",
    # Path to the local directory that contains the script.py file
    runtime_env={"working_dir": "./"}
)
print(job_id)

def wait_until_status(job_id, status_to_wait_for, timeout_seconds=5):
    start = time.time()
    while time.time() - start <= timeout_seconds:
        status = client.get_job_status(job_id)
        print(f"status: {status}")
        if status in status_to_wait_for:
            break
        time.sleep(1)


wait_until_status(job_id, {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED})
logs = client.get_job_logs(job_id)
print(logs)

输出应该看起来像这样:

raysubmit_pBwfn5jqRE1E7Wmc
status: PENDING
status: PENDING
status: RUNNING
status: RUNNING
status: RUNNING
2022-08-22 15:05:55,652 INFO worker.py:1203 -- Using address 127.0.0.1:6379 set in the environment variable RAY_ADDRESS
2022-08-22 15:05:55,652 INFO worker.py:1312 -- Connecting to existing Ray cluster at address: 127.0.0.1:6379...
2022-08-22 15:05:55,660 INFO worker.py:1487 -- Connected to Ray cluster. View the dashboard at http://127.0.0.1:8265.
hello world

与长时间运行的作业交互#

除了获取作业的当前状态和输出外,用户还可以在作业执行完成之前停止提交的作业。

job_id = client.submit_job(
    # Entrypoint shell command to execute
    entrypoint="python -c 'import time; print(\"Sleeping...\"); time.sleep(60)'"
)
wait_until_status(job_id, {JobStatus.RUNNING})
print(f'Stopping job {job_id}')
client.stop_job(job_id)
wait_until_status(job_id, {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED})
logs = client.get_job_logs(job_id)
print(logs)

输出应该看起来像下面这样:

status: PENDING
status: PENDING
status: RUNNING
Stopping job raysubmit_VYCZZ2BQb4tfeCjq
status: STOPPED
Sleeping...

要获取所有作业的信息,请调用 client.list_jobs()。这将返回一个 Dict[str, JobInfo] 对象,该对象将作业ID映射到它们的信息。

作业信息(状态和相关元数据)会无限期地存储在集群中。要删除这些信息,可以对任何已处于终止状态的作业调用 client.delete_job(job_id)。更多详情请参阅 SDK API 参考

依赖管理#

类似于 Jobs CLI,我们也可以通过使用 Ray 运行时环境 来打包我们应用程序的依赖项。使用 Python SDK,语法看起来像这样:

job_id = client.submit_job(
    # Entrypoint shell command to execute
    entrypoint="python script.py",
    # Runtime environment for the job, specifying a working directory and pip package
    runtime_env={
        "working_dir": "./",
        "pip": ["requests==2.26.0"]
    }
)

小技巧

除了本地目录(本例中为 "./"),您还可以为作业的工作目录指定远程URI,例如S3存储桶或Git仓库。详情请参见 远程URI

有关详细信息,请参阅 API 参考

指定 CPU 和 GPU 资源#

默认情况下,作业入口脚本总是在头节点上运行。我们建议在 Ray 任务、角色或 Ray 库中进行繁重的计算,而不是直接在入口脚本的顶层进行。无需额外配置即可实现这一点。

然而,如果你需要在入口脚本中直接进行计算,并且希望为入口脚本保留CPU和GPU资源,你可以指定 submit_jobentrypoint_num_cpusentrypoint_num_gpusentrypoint_memoryentrypoint_resources 参数。这些参数的功能与任务和角色中 @ray.remote() 装饰器的 num_cpusnum_gpusresources_memory 参数相同,如 指定任务或角色资源需求 中所述。

如果指定了这些参数中的任何一个,入口脚本将被调度到一个至少具有指定资源的节点上,而不是默认的头节点。例如,以下代码将在至少有1个GPU的节点上调度入口脚本:

job_id = client.submit_job(
    entrypoint="python script.py",
    runtime_env={
        "working_dir": "./",
    }
    # Reserve 1 GPU for the entrypoint script
    entrypoint_num_gpus=1
)

同样的参数也可以作为选项 --entrypoint-num-cpus--entrypoint-num-gpus--entrypoint-memory--entrypoint-resources 在 Jobs CLI 中提交给 ray job submit;参见 Ray Job Submission CLI Reference

如果未指定 num_gpus,GPU 仍然可用于入口点脚本,但 Ray 不会在可见设备方面提供隔离。确切地说,环境变量 CUDA_VISIBLE_DEVICES 不会在入口点脚本中设置;它只会在其 @ray.remote() 装饰器中指定了 num_gpus 的任务和参与者内部设置。

备注

entrypoint_num_cpusentrypoint_num_gpusentrypoint-memoryentrypoint_resources 指定的资源与作业中任务和角色指定的任何资源是分开的。

例如,如果你指定 entrypoint_num_gpus=1 ,那么入口脚本将被调度到一个至少有1个GPU的节点上,但如果你的脚本还包含一个使用 @ray.remote(num_gpus=1) 定义的Ray任务,那么该任务将被调度使用一个不同的GPU(如果节点至少有2个GPU,则在同一节点上,否则在不同的节点上)。

备注

num_cpusnum_gpusresources_memory 参数在 资源需求 中描述的 @ray.remote() 一样,这些参数仅指用于调度目的的逻辑资源。实际的 CPU 和 GPU 利用率不由 Ray 控制或限制。

备注

默认情况下,为入口点脚本保留 0 个 CPU 和 0 个 GPU。

客户端配置#

额外的客户端连接选项,如自定义HTTP头和cookies,可以传递给 JobSubmissionClient 类。完整的选项列表可以在 API参考 中找到。

TLS 验证#

默认情况下,任何 HTTPS 客户端连接都将使用底层 requestsaiohttp 库找到的系统证书进行验证。可以通过设置 verify 参数来覆盖此行为。例如:

client = JobSubmissionClient("https://<job-server-url>", verify="/path/to/cert.pem")

将使用位于 /path/to/cert.pem 的证书来验证作业服务器的证书。可以通过将 verify 参数设置为 False 来禁用证书验证。