Skip to main content
在Colab中打开

Websockets: 使用Websockets进行流式输入和输出

在GitHub上打开

本笔记演示了如何使用IOStream类来使用Websockets进行流式输入和输出。使用Websockets可以构建比使用Web方法更具响应性和互动性的Web客户端。主要区别在于,Websockets允许您在需要时推送数据,而使用Web方法需要轮询服务器以获取新的响应。

在本指南中,我们探索了IOStream类的功能。它专门设计用于增强使用Websockets进行流式输入和输出的客户端(如Web客户端)的开发。IOStream类通过为Web应用程序提供更动态和互动的用户体验而脱颖而出。

Websockets技术是此功能的核心,通过允许数据实时“推送”到客户端,相比传统的Web方法,提供了显著的进步。这与传统方法不同,传统方法中客户端必须反复“轮询”服务器以检查是否有新的响应。通过使用底层的websockets库,IOStream类实现了服务器和客户端之间的持续的双向通信通道。这确保更新能够即时接收,无需不断轮询,从而使Web客户端更高效和响应更快。

通过IOStream类,Websockets的真正威力在于创建高度响应的Web客户端。这种响应性对于需要实时数据更新的应用程序(如聊天应用程序)至关重要。通过将IOStream类集成到您的Web应用程序中,不仅可以增强用户体验,还可以提高应用程序的效率。 通过即时数据传输,不仅可以减轻服务器的负载,还可以通过消除不必要的轮询来提高效率。

从本质上讲,通过IOStream类使用websockets标志着Web客户端开发的重大改进。这种方法不仅简化了客户端和服务器之间的数据交换过程,还为创建更具交互性和吸引力的Web应用程序打开了新的可能性。通过遵循本指南,开发人员可以充分利用websockets和IOStream类的潜力,推动Web客户端响应性和交互性的界限。

要求

要求

此笔记本需要一些额外的依赖项,可以通过pip安装:

pip install pyautogen[websockets] fastapi uvicorn

有关更多信息,请参阅安装指南

设置API端点

config_list_from_json函数从环境变量或json文件中加载配置列表。

from datetime import datetime
from tempfile import TemporaryDirectory

from websockets.sync.client import connect as ws_connect

import autogen
from autogen.io.websockets import IOWebsockets

config_list = autogen.config_list_from_json(
env_or_file="OAI_CONFIG_LIST",
filter_dict={
"model": ["gpt-4", "gpt-3.5-turbo", "gpt-3.5-turbo-16k"],
},
)
tip

了解有关为代理配置LLM的更多信息,请点击这里

定义on_connect函数

on_connect函数是利用websockets的应用程序的关键部分,它充当一个事件处理程序,每当建立新的客户端连接时就会调用。该函数旨在初始化任何必要的设置、通信协议或特定于新连接的数据交换过程。本质上,它为随后的交互会话奠定了基础,配置了服务器和客户端之间的通信方式,并在建立连接后采取了哪些初始操作。现在,让我们深入了解如何定义这个函数,特别是在使用AutoGen框架与websockets的上下文中。

当客户端连接到websocket服务器时,服务器会自动初始化一个新的IOWebsockets类的实例。这个实例对于管理服务器和客户端之间的数据流至关重要。on_connect函数利用这个实例来设置通信协议、定义交互规则和

def on_connect(iostream: IOWebsockets) -> None:
print(f" - on_connect(): 已连接到使用IOWebsockets的客户端 {iostream}", flush=True)

print(" - on_connect(): 从客户端接收消息。", flush=True)

# 1. 接收初始消息
initial_msg = iostream.input()

# 2. 实例化ConversableAgent
agent = autogen.ConversableAgent(
name="chatbot",
system_message="完成分配给你的任务并在完成任务后回复TERMINATE。如果被问到天气,请使用工具'weather_forecast(city)'获取城市的天气预报。",
llm_config={
"config_list": autogen.config_list_from_json(
env_or_file="OAI_CONFIG_LIST",
filter_dict={
"model": ["gpt-4", "gpt-3.5-turbo", "gpt-3.5-turbo-16k"],
},
),
"stream": True,
},
)

# 3. 定义UserProxyAgent
user_proxy = autogen.UserProxyAgent(
name="user_proxy",
system_message="用户的代理。",
is_termination_msg=lambda x: x.get("content", "") and x.get("content", "").rstrip().endswith("TERMINATE"),
human_input_mode="NEVER",
max_consecutive_auto_reply=10,
code_execution_config=False,
)

# 4. 定义特定于Agent的函数
def weather_forecast(city: str) -> str:
return f"{datetime.now()}{city}的天气预报是晴朗。"

autogen.register_function(
weather_forecast, caller=agent, executor=user_proxy, description="城市的天气预报"
)

# 5. 启动对话
print(
f" - on_connect(): 使用消息'{initial_msg}'与代理{agent}启动对话",
flush=True,
)
user_proxy.initiate_chat( # noqa: F704
agent,
message=initial_msg,
)

下面是对上述示例中的on_connect函数的解释:

  1. 接收初始消息:在建立连接后立即从客户端接收初始消息。这一步对于理解客户端的请求或启动对话流程至关重要。

  2. 实例化ConversableAgent:使用特定的系统消息和LLM配置创建ConversableAgent的实例。如果你需要多个代理,请确保它们的llm_config不相同,因为向其中一个代理添加函数也会尝试将其添加到另一个代理中。

  3. 实例化UserProxyAgent:类似地,创建UserProxyAgent实例,定义其终止条件、人工输入模式和其他相关参数。不需要定义llm_config,因为UserProxyAgent不使用LLM。

  4. 定义特定于Agent的函数:如果你的对话代理需要特定的函数,可以在这一步定义。例如,定义一个获取城市天气预报的函数。

  5. 启动对话:使用初始消息启动对话,将用户代理和对话代理传入。

使用 Python 客户端测试 WebSocket 服务器

使用 Python 客户端测试 on_connect 函数涉及模拟客户端和服务器的交互,以确保设置、数据交换和通信协议按预期工作。下面是使用 Python 客户端进行此测试的简要说明:

  1. 启动 WebSocket 服务器:使用 IOWebsockets.run_server_in_thread 方法在单独的线程中启动服务器,指定 on_connect 函数和端口。该方法返回正在运行的 WebSocket 服务器的 URI。

  2. 连接到服务器:使用返回的 URI 打开与服务器的连接。这模拟了客户端发起与 WebSocket 服务器的连接。

  3. 向服务器发送消息:一旦连接成功,从客户端向服务器发送一条消息。这测试了服务器通过已建立的 WebSocket 连接接收消息的能力。

  4. 接收并处理消息:实现一个循环来持续接收来自服务器的消息。如果需要,解码消息,并相应地处理它们。这一步验证了服务器对客户端请求的能力以作出响应。

这个测试场景通过模拟实际的消息交换有效地评估了客户端和服务器之间使用 on_connect 函数的交互。它确保服务器能够处理传入的连接、处理消息并向客户端发送响应,这些都是强大的基于 WebSocket 的应用程序的关键功能。

with IOWebsockets.run_server_in_thread(on_connect=on_connect, port=8765) as uri:
print(f" - 在 {uri} 上运行 WebSocket 服务器的 test_setup()。", flush=True)

with ws_connect(uri) as websocket:
print(f" - 已连接到服务器 {uri}", flush=True)

print(" - 向服务器发送消息。", flush=True)
# websocket.send("2+2=?")
websocket.send("查看巴黎的天气并写一首关于它的诗。")

while True:
message = websocket.recv()
message = message.decode("utf-8") if isinstance(message, bytes) else message

print(message, end="", flush=True)

if "TERMINATE" in message:
print()
print(" - 收到 TERMINATE 消息。退出。", flush=True)
break
- test_setup() 使用 ws://127.0.0.1:8765 运行的 WebSocket 服务器。
- on_connect(): 使用 IOWebsockets <autogen.io.websockets.IOWebsockets object at 0x7b8fd65b3c10> 连接到客户端。
- on_connect(): 从客户端接收到消息。
- 连接到 ws://127.0.0.1:8765 的服务器。
- 发送消息给服务器。
- on_connect(): 使用消息“查看巴黎的天气并写一首关于它的诗。”与代理 <autogen.agentchat.conversable_agent.ConversableAgent object at 0x7b909c086290> 开始聊天。
user_proxy (to chatbot):

查看巴黎的天气并写一首关于它的诗。

--------------------------------------------------------------------------------

>>>>>>>> 使用自动回复...
chatbot (to user_proxy):


*****建议的工具调用 (call_xFFWe52vwdpgZ8xTRV6adBdy): weather_forecast *****
参数:
{
"city": "Paris"
}
*********************************************************************************

--------------------------------------------------------------------------------

>>>>>>>> 执行函数 weather_forecast...
user_proxy (to chatbot):

user_proxy (to chatbot):

***** 调用工具 (call_xFFWe52vwdpgZ8xTRV6adBdy) 的响应 *****
2024-04-05 12:00:06.206125 时巴黎的天气预报是晴朗的。
**********************************************************************

--------------------------------------------------------------------------------

>>>>>>>> 使用自动回复...
在法国的心脏,阳光的温暖下,
躺着巴黎这座城市,塞纳河水流淌。
每条街道和尖塔都沐浴在阳光下,
照亮每一个细节,就像温顺的火焰。

曾经单调的城市景观,被阳光的明亮之光所吻,
如今是从早到晚的色彩万花筒。
这个被阳光洒满的城市,在蔚蓝的穹顶下闪耀,
她的居民找到了舒适,因为他们称这座城市为家。

在这个完美的天气日子里,你可以在阳光下漫步,
感受它带来的温暖,驱散你的忧伤。
因为巴黎的天气,不仅仅是一个预报,
它是居民和游客聚集的舞台。

结束

chatbot (to user_proxy):

在法国的心脏,阳光的温暖下,
躺着巴黎这座城市,塞纳河水流淌。
每条街道和尖塔都沐浴在阳光下,
照亮每一个细节,就像温顺的火焰。

曾经单调的城市景观,被阳光的明亮之光所吻,
如今是从早到晚的色彩万花筒。
这个被阳光洒满的城市,在蔚蓝的穹顶下闪耀,
她的居民找到了舒适,因为他们称这座城市为家。

在这个完美的天气日子里,你可以在阳光下漫步,
感受它带来的温暖,驱散你的忧伤。
因为巴黎的天气,不仅仅是一个预报,
它是居民和游客聚集的舞台。

结束

- 收到 TERMINATE 消息。退出。

使用HTML/JS客户端测试在FastAPI服务器内运行的Websockets服务器

下面的代码片段概述了一种使用FastAPI在Web环境中测试on_connect函数的方法,以提供一个简单的交互式HTML页面。这种方法允许用户通过Web界面发送消息,然后由运行AutoGen框架的服务器通过Websockets处理这些消息。以下是一个逐步解释:

  1. FastAPI应用程序设置:代码首先导入必要的库并设置一个FastAPI应用程序。FastAPI是一个现代化的、快速的Web框架,用于使用Python 3.7+构建API,基于标准的Python类型提示。

  2. 用户交互的HTML模板:将HTML模板定义为多行Python字符串,其中包括一个用于消息输入的基本表单和一个用于管理Websockets通信的脚本。该模板创建了一个用户界面,用户可以向服务器发送消息,并动态显示响应。

  3. 运行Websockets服务器run_websocket_server异步上下文管理器使用指定的on_connect函数和端口启动Websockets服务器。该服务器监听传入的Websockets连接。

  4. FastAPI路由用于提供HTML页面:定义了一个FastAPI路由(@app.get("/")),用于向用户提供HTML页面。当用户访问根URL时,将返回Websockets聊天的HTML内容,使他们能够与Websockets服务器进行交互。

  5. 启动FastAPI应用程序:最后,使用Uvicorn(一个ASGI服务器)启动FastAPI应用程序,配置了应用程序和其他必要的参数。然后启动服务器以提供FastAPI应用程序,使交互式HTML页面对用户可访问。

这种测试方法允许用户和服务器之间进行交互式通信,提供了一种实际的方式来实时演示和评估on_connect函数的行为。用户可以通过网页发送消息,服务器根据在on_connect函数中定义的逻辑处理这些消息,以用户友好的方式展示AutoGen框架的Websockets处理能力和响应性。

from contextlib import asynccontextmanager  # noqa: E402
from pathlib import Path # noqa: E402

from fastapi import FastAPI # noqa: E402
from fastapi.responses import HTMLResponse # noqa: E402

PORT = 8000

html = """
<!DOCTYPE html>
<html>
<head>
<title>自动生成的 WebSocket 测试</title>
</head>
<body>
<h1>WebSocket 聊天室</h1>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>发送</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = new WebSocket("ws://localhost:8080/ws");
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""


@asynccontextmanager
async def run_websocket_server(app):
with IOWebsockets.run_server_in_thread(on_connect=on_connect, port=8080) as uri:
print(f"WebSocket 服务器已启动,地址为 {uri}。", flush=True)

yield


app = FastAPI(lifespan=run_websocket_server)


@app.get("/")
async def get():
return HTMLResponse(html)
import uvicorn  # noqa: E402

config = uvicorn.Config(app)
server = uvicorn.Server(config)
await server.serve() # noqa: F704
INFO:     启动服务器进程 [5227]
INFO: 等待应用程序启动。
Websocket 服务器已启动,地址为 ws://127.0.0.1:8080。
INFO:     应用程序启动完成。
INFO: Uvicorn 运行在 http://127.0.0.1:8000 (按下 CTRL+C 退出)
INFO:     127.0.0.1:42548 - "GET / HTTP/1.1" 200 OK
INFO: 127.0.0.1:42548 - "GET /favicon.ico HTTP/1.1" 404 Not Found
- on_connect(): 使用 IOWebsockets <autogen.io.websockets.IOWebsockets object at 0x7b8fc6991420> 连接到客户端
- on_connect(): 从客户端接收到消息
- on_connect(): 使用消息 'write a poem about lundon' 启动与代理 <autogen.agentchat.conversable_agent.ConversableAgent object at 0x7b909c0cab00> 的聊天
INFO:     关闭服务器
INFO: 等待应用程序关闭。
INFO: 应用程序关闭完成。
INFO: 完成服务器进程 [5227]

上述测试设置利用 FastAPI 和 Websockets,不仅作为一个强大的测试框架来测试 on_connect 函数,还为开发真实世界应用奠定了基础。这种方法展示了基于 Web 的交互如何实现动态和实时,这是现代应用开发的关键方面。

例如,这个设置可以直接应用或调整来构建交互式聊天应用、实时数据仪表盘或在线支持系统。Websockets 的集成使得服务器能够即时向客户端推送更新,这对于依赖及时信息传递的应用非常重要。例如,基于这个框架构建的聊天应用可以支持用户之间的即时消息传递,提升用户参与度和满意度。

此外,用于测试的 HTML 页面的简单性和互动性展示了如何设计用户界面以提供无缝体验。开发人员可以在此基础上扩展,加入更复杂的元素,如用户认证、消息加密和自定义用户交互,进一步根据特定用例需求定制应用。

FastAPI 框架的灵活性,结合 Websockets 提供的实时通信能力,为开发人员构建可扩展、高效和高度互动的 Web 应用提供了强大的工具集。无论是创建协作平台、流媒体服务还是互动游戏体验,这个测试设置展示了这些技术可以开发的潜在应用。

使用 HTML/JS 客户端测试 Websockets 服务器

下面提供的代码片段是一个示例,展示了如何使用 Python 内置的 http.server 模块创建一个交互式的测试环境,用于测试 on_connect 函数的功能。这个设置允许实时的交互。 在Web浏览器中实现交互,使开发人员能够以更用户友好和实用的方式测试websocket功能。以下是该代码的操作方式和潜在应用的详细说明:

  1. 提供一个简单的HTML页面:代码首先定义了一个包含发送消息表单和显示传入消息的列表的HTML页面。使用JavaScript处理表单提交和websocket通信。

  2. HTML文件的临时目录:创建一个临时目录来存储HTML文件。这种方法确保测试环境干净且隔离,最大程度地减少与现有文件或配置的冲突。

  3. 自定义HTTP请求处理程序:定义了SimpleHTTPRequestHandler的自定义子类来提供HTML文件。该处理程序重写了do_GET方法,将根路径(/)重定向到chat.html页面,确保访问服务器根URL的访问者立即看到聊天界面。

  4. 启动Websocket服务器:同时,在不同的端口上使用IOWebsockets.run_server_in_thread方法启动一个websocket服务器,以先前定义的on_connect函数作为新连接的回调。

  5. HTML界面的HTTP服务器:实例化一个HTTP服务器来提供HTML聊天界面,使用户能够通过Web浏览器与websocket服务器进行交互。

这个设置展示了将websocket与简单的HTTP服务器集成以创建动态和交互式Web应用程序的实际应用。通过使用Python的标准库模块,它演示了开发实时应用程序(如聊天系统、实时通知或交互式仪表板)的低门槛入门方法。

从这个代码示例中可以得出的关键点是,Python的内置库可以轻松利用来原型设计和测试复杂的Web功能。对于希望构建真实世界应用程序的开发人员来说,这种方法提供了一种简单的方法来验证和完善websocket通信逻辑,然后再将其集成到更大的框架或系统中。这个测试设置的简单性和易用性使它成为开发各种交互式Web应用程序的绝佳起点。

from http.server import HTTPServer, SimpleHTTPRequestHandler  # noqa: E402
from tempfile import TemporaryDirectory
from pathlib import Path
from iowebsockets import IOWebsockets

PORT = 8000

html = """
<!DOCTYPE html>
<html>
<head>
<title>自动生成的 WebSocket 测试</title>
</head>
<body>
<h1>WebSocket 聊天室</h1>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>发送</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = new WebSocket("ws://localhost:8080/ws");
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""

with TemporaryDirectory() as temp_dir:
# 创建一个简单的 HTTP 网页
path = Path(temp_dir) / "chat.html"
with open(path, "w") as f:
f.write(html)

#
class MyRequestHandler(SimpleHTTPRequestHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, directory=temp_dir, **kwargs)

def do_GET(self):
if self.path == "/":
self.path = "/chat.html"
return SimpleHTTPRequestHandler.do_GET(self)

handler = MyRequestHandler

with IOWebsockets.run_server_in_thread(on_connect=on_connect, port=8080) as uri:
print(f"WebSocket 服务器已启动,地址为 {uri}。", flush=True)

with HTTPServer(("", PORT), handler) as httpd:
print("HTTP 服务器已启动,地址为 http://localhost:" + str(PORT))
try:
httpd.serve_forever()
except KeyboardInterrupt:
print(" - HTTP 服务器已停止。", flush=True)
Websocket服务器已启动,地址为ws://127.0.0.1:8080。
HTTP服务器已启动,地址为http://localhost:8000。
- on_connect(): 使用IOWebsockets <autogen.io.websockets.IOWebsockets object at 0x7b8fc69937f0> 连接到客户端。
- on_connect(): 收到客户端的消息。
127.0.0.1 - - [05/Apr/2024 12:01:51] "GET / HTTP/1.1" 200 -
127.0.0.1 - - [05/Apr/2024 12:01:51] "GET / HTTP/1.1" 200 -
 - on_connect(): 使用消息'write a poem about new york' 启动与代理 <autogen.agentchat.conversable_agent.ConversableAgent object at 0x7b8fc6990310> 的聊天。
- on_connect(): 使用IOWebsockets <autogen.io.websockets.IOWebsockets object at 0x7b8fc68bdc90> 连接到客户端。
- on_connect(): 收到客户端的消息。
127.0.0.1 - - [05/Apr/2024 12:02:27] "GET / HTTP/1.1" 304 -
 - on_connect(): 使用消息'check the weather in london and write a poem about it' 启动与代理 <autogen.agentchat.conversable_agent.ConversableAgent object at 0x7b8fc68be170> 的聊天。
- HTTP服务器已停止。