ray.workflow.获取元数据#

ray.workflow.get_metadata(workflow_id: str, task_id: str | None = None) Dict[str, Any][源代码]#

获取工作流的元数据。

这将返回一个元数据字典,如果只提供了 workflow_id,则返回工作流的元数据;如果同时提供了 workflow_id 和 task id,则返回特定工作流任务的元数据。如果提供的 workflow id 或 task id 不存在,则会引发异常。

如果只提供了工作流ID,这将返回工作流级别的元数据,其中包括运行状态、工作流级别的用户元数据和工作流级别的运行统计信息(例如工作流的开始时间和结束时间)。

如果同时提供了工作流ID和任务ID,这将返回工作流任务级别的元数据,其中包括任务输入、任务级别的用户元数据和任务级别的运行统计信息(例如任务的开始时间和结束时间)。

参数:
  • workflow_id – 获取元数据的流程。

  • task_id – 如果设置,则获取特定任务的元数据,而不是工作流的元数据。

示例

from ray import workflow

@ray.remote
def trip():
   pass

workflow_task = trip.options(
    **workflow.options(task_id="trip", metadata={"k1": "v1"})).bind()
workflow.run(workflow_task,
    workflow_id="trip1", metadata={"k2": "v2"})
workflow_metadata = workflow.get_metadata("trip1")
print(workflow_metadata)

task_metadata = workflow.get_metadata("trip1", "trip")
print(task_metadata)
{'status': 'SUCCESSFUL', 'user_metadata': {'k2': 'v2'}, 'stats': {'start_time': ..., 'end_time': ...}}
{'task_id': 'trip', 'task_options': {'task_type': 'FUNCTION', 'max_retries': 3, 'catch_exceptions': False, 'retry_exceptions': False, 'checkpoint': True, 'ray_options': {'_metadata': {'workflow.io/options': {'task_id': 'trip', 'metadata': {'k1': 'v1'}}}}}, 'user_metadata': {'k1': 'v1'}, 'workflow_refs': [], 'stats': {'start_time': ..., 'end_time': ...}}
返回:

包含工作流元数据的字典。

抛出:

ValueError – 如果给定的工作流或工作流任务不存在。

PublicAPI (alpha): 此API处于alpha阶段,可能在稳定之前发生变化。