Skip to main content

websockets

ServerConnection

class ServerConnection(Protocol)

send

def send(message: Union[Data, Iterable[Data]]) -> None

向客户端发送消息。

参数

  • message Union[Data, Iterable[Data]] - 要发送的消息。

recv

def recv(timeout: Optional[float] = None) -> Data

从客户端接收消息。

参数

  • timeout Optional[float], optional - 接收操作的超时时间。默认为 None。

返回值

  • Data - 从客户端接收到的消息。

close

def close() -> None

关闭连接。

WebSocketServer

class WebSocketServer(Protocol)

serve_forever

def serve_forever() -> None

持续运行服务器。

shutdown

def shutdown() -> None

关闭服务器。

__enter__

def __enter__() -> "WebSocketServer"

进入服务器上下文。

__exit__

def __exit__(exc_type: Any, exc_value: Any, traceback: Any) -> None

退出服务器上下文。

IOWebsockets

class IOWebsockets(IOStream)

一个 websocket 输入/输出流。

__init__

def __init__(websocket: ServerConnection) -> None

初始化 websocket 输入/输出流。

参数

  • websocket ServerConnection - websocket 服务器。

抛出

  • ImportError - 如果 websockets 模块不可用。

run_server_in_thread

@staticmethod
@contextmanager
def run_server_in_thread(*,
host: str = "127.0.0.1",
port: int = 8765,

- `end` _str, 可选_ - 输出的结尾。默认为 "
"。
- `flush` _bool, 可选_ - 是否刷新输出。默认为 False

### input

```python
def input(prompt: str = "", *, password: bool = False) -> str

从输入流中读取一行。

参数

  • prompt str, 可选 - 要显示的提示。默认为 ""。
  • password bool, 可选 - 是否读取密码。默认为 False。

返回值

  • str - 从输入流中读取的行。