事件#
为了允许事件触发工作流,Ray Workflows 支持可插拔的事件系统。使用事件框架提供了一些特性。
高效等待事件(等待期间不需要运行工作流任务)。
支持精确一次的事件传递语义,同时提供容错能力。
与其他工作流任务类似,事件通过检查点支持容错。当事件发生时,事件会被检查点记录,然后可以选择提交。
使用事件#
工作流事件是一种特殊类型的工作流任务。当事件发生时,它们“完成”。workflow.wait_for_event(EventListenerType)
可用于创建事件任务。
import time
import ray
from ray import workflow
# Create an event which finishes after 2 seconds.
event1_task = workflow.wait_for_event(workflow.event_listener.TimerListener, time.time() + 2)
# Create another event which finishes after 1 seconds.
event2_task = workflow.wait_for_event(workflow.event_listener.TimerListener, time.time() + 1)
@ray.remote
def gather(*args):
return args
# Gather will run after 2 seconds when both event1 and event2 are done.
workflow.run(gather.bind(event1_task, event2_task))
HTTP 事件#
工作流支持通过HTTP发送外部事件。工作流中的HTTP事件监听器用于连接到HTTP端点。以下是使用HTTP事件的工作流端到端示例。
HTTPListener
用于在工作流中监听HTTP事件。每个 HTTPListener
订阅一个唯一的 workflow_id
和 event_key
对。要从外部客户端向监听器发送事件,HTTP请求应将 workflow_id
作为请求URL的一部分,并在JSON请求体中指定 event_key
和 event_payload
键(见下文)。
# File name: wait_for_event_http.py
# Create a task waiting for an http event with a JSON message.
# The JSON message is expected to have an event_key field
# and an event_payload field.
event_task = workflow.wait_for_event(HTTPListener, event_key="my_event_key")
obj_ref = workflow.run_async(event_task, workflow_id="workflow_receive_event_by_http")
一个位于 http://hostname:port/event/send_event/<workflow_id>
的 HTTP 端点可以用来发送事件。在本地,该端点可以通过 http://127.0.0.1:8000/event/send_event/<workflow_id>
访问。请注意,HTTP 请求必须包含相同的 workflow_id
。每个请求还应包含一个 JSON 主体,其中包含两个字段:event_key
和 event_payload
,如下例所示。event_key
字段应与监听端 workflow.wait_for_event()
传递的参数匹配。在工作流中,一旦接收到 HTTP 事件,事件任务将返回 event_payload
字段的值。
总之,要在工作流中触发HTTP事件,外部客户端应具备:
HTTP 端点地址(例如
http://127.0.0.1:8000/event/send_event
)workflow_id
(例如:”workflow_receive_event_by_http”)一个包含字段
event_key
和event_payload
的有效 JSON 格式消息,其中event_key
与工作流中使用的键匹配。
HTTP 请求将在事件被工作流接收后收到回复。返回的状态码可以是:
200: 事件已成功处理。
500: 事件处理失败。
404: 找不到
workflow_id
或event_key
,可能是由于事件在目标工作流任务准备好之前被接收。
下面的代码片段展示了一个外部客户端发送HTTP请求的示例。
# File name: wait_for_event_http.py
res = requests.post(
"http://127.0.0.1:8000/event/send_event/"
+ "workflow_receive_event_by_http",
json={"event_key": "my_event_key", "event_payload": "my_event_message"},
)
if res.status_code == 200:
print("event processed successfully")
elif res.status_code == 500:
print("request sent but workflow event processing failed")
elif res.status_code == 404:
print("request sent but either workflow_id or event_key is not found")
自定义事件监听器#
可以通过继承 EventListener 接口来编写自定义事件监听器。
from ray.workflow.common import Event
class EventListener:
def __init__(self):
"""Optional constructor. Only the constructor with no arguments will be
called."""
pass
async def poll_for_event(self, *args, **kwargs) -> Event:
"""Should return only when the event is received."""
raise NotImplementedError
async def event_checkpointed(self, event: Event) -> None:
"""Optional. Called after an event has been checkpointed and a transaction can
be safely committed."""
pass
listener.poll_for_events()
协程应在事件完成时结束。workflow.wait_for_event
的参数会传递给 poll_for_events()
。例如,一个在时间戳之前休眠的事件监听器可以写成:
class TimerListener(EventListener):
async def poll_for_event(self, timestamp):
await asyncio.sleep(timestamp - time.time())
event_checkpointed
例程可以被重写以支持具有恰好一次传递语义的系统,这些系统通常遵循以下模式:
等待一个事件。
处理事件。
提交事件。
在工作流程完成事件的检查点后,事件监听器将被调用并可以释放事件。例如,为了保证从 kafkaesque<https://docs.confluent.io/clients-confluent-kafka-python/current/overview.html#synchronous-commits>
队列中消费事件:
KafkaEventType = ...
class QueueEventListener:
def __init__(self):
# Initialize the poll consumer.
self.consumer = Consumer({'enable.auto.commit': False})
async def poll_for_event(self, topic) -> KafkaEventType:
self.consumer.subscribe(topic)
message = await self.consumer.poll()
return message
async def event_checkpointed(self, event: KafkaEventType) -> None:
self.consumer.commit(event, asynchronous=False)
(高级) 事件监听器语义#
在编写复杂的事件监听器时,作者应注意几个属性。
事件监听器 定义 必须是可序列化的
事件监听器实例 _不_ 被序列化。
事件监听器应该是 无状态的。