事件#

为了允许事件触发工作流,Ray Workflows 支持可插拔的事件系统。使用事件框架提供了一些特性。

  1. 高效等待事件(等待期间不需要运行工作流任务)。

  2. 支持精确一次的事件传递语义,同时提供容错能力。

与其他工作流任务类似,事件通过检查点支持容错。当事件发生时,事件会被检查点记录,然后可以选择提交。

使用事件#

工作流事件是一种特殊类型的工作流任务。当事件发生时,它们“完成”。workflow.wait_for_event(EventListenerType) 可用于创建事件任务。

import time
import ray
from ray import workflow

# Create an event which finishes after 2 seconds.
event1_task = workflow.wait_for_event(workflow.event_listener.TimerListener, time.time() + 2)

# Create another event which finishes after 1 seconds.
event2_task = workflow.wait_for_event(workflow.event_listener.TimerListener, time.time() + 1)

@ray.remote
def gather(*args):
    return args

# Gather will run after 2 seconds when both event1 and event2 are done.
workflow.run(gather.bind(event1_task, event2_task))

HTTP 事件#

工作流支持通过HTTP发送外部事件。工作流中的HTTP事件监听器用于连接到HTTP端点。以下是使用HTTP事件的工作流端到端示例。

HTTPListener 用于在工作流中监听HTTP事件。每个 HTTPListener 订阅一个唯一的 workflow_idevent_key 对。要从外部客户端向监听器发送事件,HTTP请求应将 workflow_id 作为请求URL的一部分,并在JSON请求体中指定 event_keyevent_payload 键(见下文)。

# File name: wait_for_event_http.py
# Create a task waiting for an http event with a JSON message.
# The JSON message is expected to have an event_key field
# and an event_payload field.

event_task = workflow.wait_for_event(HTTPListener, event_key="my_event_key")

obj_ref = workflow.run_async(event_task, workflow_id="workflow_receive_event_by_http")

一个位于 http://hostname:port/event/send_event/<workflow_id> 的 HTTP 端点可以用来发送事件。在本地,该端点可以通过 http://127.0.0.1:8000/event/send_event/<workflow_id> 访问。请注意,HTTP 请求必须包含相同的 workflow_id。每个请求还应包含一个 JSON 主体,其中包含两个字段:event_keyevent_payload,如下例所示。event_key 字段应与监听端 workflow.wait_for_event() 传递的参数匹配。在工作流中,一旦接收到 HTTP 事件,事件任务将返回 event_payload 字段的值。

总之,要在工作流中触发HTTP事件,外部客户端应具备:

  • HTTP 端点地址(例如 http://127.0.0.1:8000/event/send_event

  • workflow_id (例如:”workflow_receive_event_by_http”)

  • 一个包含字段 event_keyevent_payload 的有效 JSON 格式消息,其中 event_key 与工作流中使用的键匹配。

HTTP 请求将在事件被工作流接收后收到回复。返回的状态码可以是:

  1. 200: 事件已成功处理。

  2. 500: 事件处理失败。

  3. 404: 找不到 workflow_idevent_key,可能是由于事件在目标工作流任务准备好之前被接收。

下面的代码片段展示了一个外部客户端发送HTTP请求的示例。

# File name: wait_for_event_http.py
res = requests.post(
        "http://127.0.0.1:8000/event/send_event/"
        + "workflow_receive_event_by_http",
        json={"event_key": "my_event_key", "event_payload": "my_event_message"},
    )
if res.status_code == 200:
    print("event processed successfully")
elif res.status_code == 500:
    print("request sent but workflow event processing failed")
elif res.status_code == 404:
    print("request sent but either workflow_id or event_key is not found")

自定义事件监听器#

可以通过继承 EventListener 接口来编写自定义事件监听器。

from ray.workflow.common import Event

class EventListener:
    def __init__(self):
        """Optional constructor. Only the constructor with no arguments will be
          called."""
        pass

    async def poll_for_event(self, *args, **kwargs) -> Event:
        """Should return only when the event is received."""
        raise NotImplementedError

    async def event_checkpointed(self, event: Event) -> None:
        """Optional. Called after an event has been checkpointed and a transaction can
          be safely committed."""
        pass

listener.poll_for_events() 协程应在事件完成时结束。workflow.wait_for_event 的参数会传递给 poll_for_events()。例如,一个在时间戳之前休眠的事件监听器可以写成:

class TimerListener(EventListener):
    async def poll_for_event(self, timestamp):
        await asyncio.sleep(timestamp - time.time())

event_checkpointed 例程可以被重写以支持具有恰好一次传递语义的系统,这些系统通常遵循以下模式:

  1. 等待一个事件。

  2. 处理事件。

  3. 提交事件。

在工作流程完成事件的检查点后,事件监听器将被调用并可以释放事件。例如,为了保证从 kafkaesque<https://docs.confluent.io/clients-confluent-kafka-python/current/overview.html#synchronous-commits> 队列中消费事件:

KafkaEventType = ...

class QueueEventListener:
    def __init__(self):
        # Initialize the poll consumer.
        self.consumer = Consumer({'enable.auto.commit': False})

    async def poll_for_event(self, topic) -> KafkaEventType:
        self.consumer.subscribe(topic)

        message = await self.consumer.poll()
        return message

    async def event_checkpointed(self, event: KafkaEventType) -> None:
         self.consumer.commit(event, asynchronous=False)

(高级) 事件监听器语义#

在编写复杂的事件监听器时,作者应注意几个属性。

  • 事件监听器 定义 必须是可序列化的

  • 事件监听器实例 _不_ 被序列化。

  • 事件监听器应该是 无状态的