"""轻量级的requests库包装器,支持异步。"""
from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator, Dict, Literal, Optional, Union
import aiohttp
import requests
from langchain_core.pydantic_v1 import BaseModel, Extra
from requests import Response
[docs]class Requests(BaseModel):
"""对requests的包装,用于处理身份验证和异步操作。
这个包装的主要目的是处理身份验证(通过保存头信息)并在同一个基本对象上启用简单的异步方法。"""
headers: Optional[Dict[str, str]] = None
aiosession: Optional[aiohttp.ClientSession] = None
auth: Optional[Any] = None
class Config:
"""此pydantic对象的配置。"""
extra = Extra.forbid
arbitrary_types_allowed = True
[docs] def get(self, url: str, **kwargs: Any) -> requests.Response:
"""获取URL并返回文本。"""
return requests.get(url, headers=self.headers, auth=self.auth, **kwargs)
[docs] def post(self, url: str, data: Dict[str, Any], **kwargs: Any) -> requests.Response:
"""向URL发送POST请求并返回文本。"""
return requests.post(
url, json=data, headers=self.headers, auth=self.auth, **kwargs
)
[docs] def patch(self, url: str, data: Dict[str, Any], **kwargs: Any) -> requests.Response:
"""修补URL并返回文本。"""
return requests.patch(
url, json=data, headers=self.headers, auth=self.auth, **kwargs
)
[docs] def put(self, url: str, data: Dict[str, Any], **kwargs: Any) -> requests.Response:
"""将URL放入并返回文本。"""
return requests.put(
url, json=data, headers=self.headers, auth=self.auth, **kwargs
)
[docs] def delete(self, url: str, **kwargs: Any) -> requests.Response:
"""删除URL并返回文本。"""
return requests.delete(url, headers=self.headers, auth=self.auth, **kwargs)
@asynccontextmanager
async def _arequest(
self, method: str, url: str, **kwargs: Any
) -> AsyncGenerator[aiohttp.ClientResponse, None]:
"""发起一个异步请求。"""
if not self.aiosession:
async with aiohttp.ClientSession() as session:
async with session.request(
method, url, headers=self.headers, auth=self.auth, **kwargs
) as response:
yield response
else:
async with self.aiosession.request(
method, url, headers=self.headers, auth=self.auth, **kwargs
) as response:
yield response
[docs] @asynccontextmanager
async def aget(
self, url: str, **kwargs: Any
) -> AsyncGenerator[aiohttp.ClientResponse, None]:
"""获取URL并异步返回文本。"""
async with self._arequest("GET", url, **kwargs) as response:
yield response
[docs] @asynccontextmanager
async def apost(
self, url: str, data: Dict[str, Any], **kwargs: Any
) -> AsyncGenerator[aiohttp.ClientResponse, None]:
"""将文本异步地POST到URL并返回。"""
async with self._arequest("POST", url, json=data, **kwargs) as response:
yield response
[docs] @asynccontextmanager
async def apatch(
self, url: str, data: Dict[str, Any], **kwargs: Any
) -> AsyncGenerator[aiohttp.ClientResponse, None]:
"""异步地PATCH URL并返回文本。"""
async with self._arequest("PATCH", url, json=data, **kwargs) as response:
yield response
[docs] @asynccontextmanager
async def aput(
self, url: str, data: Dict[str, Any], **kwargs: Any
) -> AsyncGenerator[aiohttp.ClientResponse, None]:
"""将URL放入并异步返回文本。"""
async with self._arequest("PUT", url, json=data, **kwargs) as response:
yield response
[docs] @asynccontextmanager
async def adelete(
self, url: str, **kwargs: Any
) -> AsyncGenerator[aiohttp.ClientResponse, None]:
"""删除URL并异步返回文本。"""
async with self._arequest("DELETE", url, **kwargs) as response:
yield response
[docs]class GenericRequestsWrapper(BaseModel):
"""轻量级的requests库封装。"""
headers: Optional[Dict[str, str]] = None
aiosession: Optional[aiohttp.ClientSession] = None
auth: Optional[Any] = None
response_content_type: Literal["text", "json"] = "text"
class Config:
"""此pydantic对象的配置。"""
extra = Extra.forbid
arbitrary_types_allowed = True
@property
def requests(self) -> Requests:
return Requests(
headers=self.headers, aiosession=self.aiosession, auth=self.auth
)
def _get_resp_content(self, response: Response) -> Union[str, Dict[str, Any]]:
if self.response_content_type == "text":
return response.text
elif self.response_content_type == "json":
return response.json()
else:
raise ValueError(f"Invalid return type: {self.response_content_type}")
async def _aget_resp_content(
self, response: aiohttp.ClientResponse
) -> Union[str, Dict[str, Any]]:
if self.response_content_type == "text":
return await response.text()
elif self.response_content_type == "json":
return await response.json()
else:
raise ValueError(f"Invalid return type: {self.response_content_type}")
[docs] def get(self, url: str, **kwargs: Any) -> Union[str, Dict[str, Any]]:
"""获取URL并返回文本。"""
return self._get_resp_content(self.requests.get(url, **kwargs))
[docs] def post(
self, url: str, data: Dict[str, Any], **kwargs: Any
) -> Union[str, Dict[str, Any]]:
"""向URL发送POST请求并返回文本。"""
return self._get_resp_content(self.requests.post(url, data, **kwargs))
[docs] def patch(
self, url: str, data: Dict[str, Any], **kwargs: Any
) -> Union[str, Dict[str, Any]]:
"""修补URL并返回文本。"""
return self._get_resp_content(self.requests.patch(url, data, **kwargs))
[docs] def put(
self, url: str, data: Dict[str, Any], **kwargs: Any
) -> Union[str, Dict[str, Any]]:
"""将URL放入并返回文本。"""
return self._get_resp_content(self.requests.put(url, data, **kwargs))
[docs] def delete(self, url: str, **kwargs: Any) -> Union[str, Dict[str, Any]]:
"""删除URL并返回文本。"""
return self._get_resp_content(self.requests.delete(url, **kwargs))
[docs] async def aget(self, url: str, **kwargs: Any) -> Union[str, Dict[str, Any]]:
"""获取URL并异步返回文本。"""
async with self.requests.aget(url, **kwargs) as response:
return await self._aget_resp_content(response)
[docs] async def apost(
self, url: str, data: Dict[str, Any], **kwargs: Any
) -> Union[str, Dict[str, Any]]:
"""将文本异步地POST到URL并返回。"""
async with self.requests.apost(url, data, **kwargs) as response:
return await self._aget_resp_content(response)
[docs] async def apatch(
self, url: str, data: Dict[str, Any], **kwargs: Any
) -> Union[str, Dict[str, Any]]:
"""异步地PATCH URL并返回文本。"""
async with self.requests.apatch(url, data, **kwargs) as response:
return await self._aget_resp_content(response)
[docs] async def aput(
self, url: str, data: Dict[str, Any], **kwargs: Any
) -> Union[str, Dict[str, Any]]:
"""将URL放入并异步返回文本。"""
async with self.requests.aput(url, data, **kwargs) as response:
return await self._aget_resp_content(response)
[docs] async def adelete(self, url: str, **kwargs: Any) -> Union[str, Dict[str, Any]]:
"""删除URL并异步返回文本。"""
async with self.requests.adelete(url, **kwargs) as response:
return await self._aget_resp_content(response)
[docs]class JsonRequestsWrapper(GenericRequestsWrapper):
"""轻量级的requests库包装器,支持异步操作。
这个包装器的主要目的是始终返回一个json输出。"""
response_content_type: Literal["text", "json"] = "json"
[docs]class TextRequestsWrapper(GenericRequestsWrapper):
"""轻量级的requests库包装器,支持异步。
这个包装器的主要目的是始终返回文本输出。"""
response_content_type: Literal["text", "json"] = "text"
# For backwards compatibility
RequestsWrapper = TextRequestsWrapper