ray.workflow.获取元数据#
- ray.workflow.get_metadata(workflow_id: str, task_id: str | None = None) Dict[str, Any] [源代码]#
获取工作流的元数据。
这将返回一个元数据字典,如果只提供了 workflow_id,则返回工作流的元数据;如果同时提供了 workflow_id 和 task id,则返回特定工作流任务的元数据。如果提供的 workflow id 或 task id 不存在,则会引发异常。
如果只提供了工作流ID,这将返回工作流级别的元数据,其中包括运行状态、工作流级别的用户元数据和工作流级别的运行统计信息(例如工作流的开始时间和结束时间)。
如果同时提供了工作流ID和任务ID,这将返回工作流任务级别的元数据,其中包括任务输入、任务级别的用户元数据和任务级别的运行统计信息(例如任务的开始时间和结束时间)。
- 参数:
workflow_id – 获取元数据的流程。
task_id – 如果设置,则获取特定任务的元数据,而不是工作流的元数据。
示例
from ray import workflow @ray.remote def trip(): pass workflow_task = trip.options( **workflow.options(task_id="trip", metadata={"k1": "v1"})).bind() workflow.run(workflow_task, workflow_id="trip1", metadata={"k2": "v2"}) workflow_metadata = workflow.get_metadata("trip1") print(workflow_metadata) task_metadata = workflow.get_metadata("trip1", "trip") print(task_metadata)
{'status': 'SUCCESSFUL', 'user_metadata': {'k2': 'v2'}, 'stats': {'start_time': ..., 'end_time': ...}} {'task_id': 'trip', 'task_options': {'task_type': 'FUNCTION', 'max_retries': 3, 'catch_exceptions': False, 'retry_exceptions': False, 'checkpoint': True, 'ray_options': {'_metadata': {'workflow.io/options': {'task_id': 'trip', 'metadata': {'k1': 'v1'}}}}}, 'user_metadata': {'k1': 'v1'}, 'workflow_refs': [], 'stats': {'start_time': ..., 'end_time': ...}}
- 返回:
包含工作流元数据的字典。
- 抛出:
ValueError – 如果给定的工作流或工作流任务不存在。
PublicAPI (alpha): 此API处于alpha阶段,可能在稳定之前发生变化。