Source code for langchain_community.utilities.requests

"""Lightweight wrapper around the requests library, with async support."""
from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator, Dict, Literal, Optional, Union

import aiohttp
import requests
from langchain_core.pydantic_v1 import BaseModel, Extra
from requests import Response


class Requests(BaseModel):
    """Thin wrapper over ``requests`` and ``aiohttp`` that carries shared settings.

    The model stores ``headers`` and ``auth`` once and attaches them to every
    request it issues. Synchronous and asynchronous variants of each HTTP verb
    are exposed on the same object.
    """

    headers: Optional[Dict[str, str]] = None
    aiosession: Optional[aiohttp.ClientSession] = None
    auth: Optional[Any] = None

    class Config:
        """Pydantic configuration for this model."""

        extra = Extra.forbid
        arbitrary_types_allowed = True

    def _shared_kwargs(self) -> Dict[str, Any]:
        """Return the keyword arguments attached to every outgoing request."""
        return {"headers": self.headers, "auth": self.auth}

    def get(self, url: str, **kwargs: Any) -> requests.Response:
        """Send a GET request to ``url`` and return the response object."""
        return requests.get(url, **self._shared_kwargs(), **kwargs)

    def post(self, url: str, data: Dict[str, Any], **kwargs: Any) -> requests.Response:
        """Send ``data`` as a JSON body in a POST request and return the response."""
        return requests.post(url, json=data, **self._shared_kwargs(), **kwargs)

    def patch(self, url: str, data: Dict[str, Any], **kwargs: Any) -> requests.Response:
        """Send ``data`` as a JSON body in a PATCH request and return the response."""
        return requests.patch(url, json=data, **self._shared_kwargs(), **kwargs)

    def put(self, url: str, data: Dict[str, Any], **kwargs: Any) -> requests.Response:
        """Send ``data`` as a JSON body in a PUT request and return the response."""
        return requests.put(url, json=data, **self._shared_kwargs(), **kwargs)

    def delete(self, url: str, **kwargs: Any) -> requests.Response:
        """Send a DELETE request to ``url`` and return the response object."""
        return requests.delete(url, **self._shared_kwargs(), **kwargs)

    @asynccontextmanager
    async def _arequest(
        self, method: str, url: str, **kwargs: Any
    ) -> AsyncGenerator[aiohttp.ClientResponse, None]:
        """Issue an asynchronous request and yield the open response.

        When ``self.aiosession`` is set it is used for the request; otherwise a
        new ``aiohttp.ClientSession`` is opened just for this call.
        """
        if self.aiosession:
            async with self.aiosession.request(
                method, url, **self._shared_kwargs(), **kwargs
            ) as resp:
                yield resp
            return

        # No session was supplied: scope a fresh one to this single request.
        async with aiohttp.ClientSession() as temp_session:
            async with temp_session.request(
                method, url, **self._shared_kwargs(), **kwargs
            ) as resp:
                yield resp

    @asynccontextmanager
    async def aget(
        self, url: str, **kwargs: Any
    ) -> AsyncGenerator[aiohttp.ClientResponse, None]:
        """Asynchronously GET ``url`` and yield the response."""
        async with self._arequest("GET", url, **kwargs) as resp:
            yield resp

    @asynccontextmanager
    async def apost(
        self, url: str, data: Dict[str, Any], **kwargs: Any
    ) -> AsyncGenerator[aiohttp.ClientResponse, None]:
        """Asynchronously POST ``data`` as JSON to ``url`` and yield the response."""
        async with self._arequest("POST", url, json=data, **kwargs) as resp:
            yield resp

    @asynccontextmanager
    async def apatch(
        self, url: str, data: Dict[str, Any], **kwargs: Any
    ) -> AsyncGenerator[aiohttp.ClientResponse, None]:
        """Asynchronously PATCH ``url`` with ``data`` as JSON and yield the response."""
        async with self._arequest("PATCH", url, json=data, **kwargs) as resp:
            yield resp

    @asynccontextmanager
    async def aput(
        self, url: str, data: Dict[str, Any], **kwargs: Any
    ) -> AsyncGenerator[aiohttp.ClientResponse, None]:
        """Asynchronously PUT ``data`` as JSON to ``url`` and yield the response."""
        async with self._arequest("PUT", url, json=data, **kwargs) as resp:
            yield resp

    @asynccontextmanager
    async def adelete(
        self, url: str, **kwargs: Any
    ) -> AsyncGenerator[aiohttp.ClientResponse, None]:
        """Asynchronously DELETE ``url`` and yield the response."""
        async with self._arequest("DELETE", url, **kwargs) as resp:
            yield resp
class GenericRequestsWrapper(BaseModel):
    """Lightweight wrapper around the requests library that returns response bodies.

    Every verb method performs the request through a ``Requests`` helper and
    returns the body either as text or as parsed JSON, depending on
    ``response_content_type``.
    """

    headers: Optional[Dict[str, str]] = None
    aiosession: Optional[aiohttp.ClientSession] = None
    auth: Optional[Any] = None
    response_content_type: Literal["text", "json"] = "text"

    class Config:
        """Pydantic configuration for this model."""

        extra = Extra.forbid
        arbitrary_types_allowed = True

    @property
    def requests(self) -> Requests:
        """Build a ``Requests`` helper with this wrapper's headers, session and auth."""
        return Requests(
            headers=self.headers,
            aiosession=self.aiosession,
            auth=self.auth,
        )

    def _get_resp_content(self, response: Response) -> Union[str, Dict[str, Any]]:
        """Extract the body of a synchronous response in the configured format.

        Raises:
            ValueError: If ``response_content_type`` is neither "text" nor "json".
        """
        content_type = self.response_content_type
        if content_type == "text":
            return response.text
        if content_type == "json":
            return response.json()
        raise ValueError(f"Invalid return type: {self.response_content_type}")

    async def _aget_resp_content(
        self, response: aiohttp.ClientResponse
    ) -> Union[str, Dict[str, Any]]:
        """Extract the body of an asynchronous response in the configured format.

        Raises:
            ValueError: If ``response_content_type`` is neither "text" nor "json".
        """
        content_type = self.response_content_type
        if content_type == "text":
            return await response.text()
        if content_type == "json":
            return await response.json()
        raise ValueError(f"Invalid return type: {self.response_content_type}")

    def get(self, url: str, **kwargs: Any) -> Union[str, Dict[str, Any]]:
        """GET ``url`` and return the response body."""
        reply = self.requests.get(url, **kwargs)
        return self._get_resp_content(reply)

    def post(
        self, url: str, data: Dict[str, Any], **kwargs: Any
    ) -> Union[str, Dict[str, Any]]:
        """POST ``data`` to ``url`` and return the response body."""
        reply = self.requests.post(url, data, **kwargs)
        return self._get_resp_content(reply)

    def patch(
        self, url: str, data: Dict[str, Any], **kwargs: Any
    ) -> Union[str, Dict[str, Any]]:
        """PATCH ``url`` with ``data`` and return the response body."""
        reply = self.requests.patch(url, data, **kwargs)
        return self._get_resp_content(reply)

    def put(
        self, url: str, data: Dict[str, Any], **kwargs: Any
    ) -> Union[str, Dict[str, Any]]:
        """PUT ``data`` to ``url`` and return the response body."""
        reply = self.requests.put(url, data, **kwargs)
        return self._get_resp_content(reply)

    def delete(self, url: str, **kwargs: Any) -> Union[str, Dict[str, Any]]:
        """DELETE ``url`` and return the response body."""
        reply = self.requests.delete(url, **kwargs)
        return self._get_resp_content(reply)

    async def aget(self, url: str, **kwargs: Any) -> Union[str, Dict[str, Any]]:
        """Asynchronously GET ``url`` and return the response body."""
        async with self.requests.aget(url, **kwargs) as reply:
            return await self._aget_resp_content(reply)

    async def apost(
        self, url: str, data: Dict[str, Any], **kwargs: Any
    ) -> Union[str, Dict[str, Any]]:
        """Asynchronously POST ``data`` to ``url`` and return the response body."""
        async with self.requests.apost(url, data, **kwargs) as reply:
            return await self._aget_resp_content(reply)

    async def apatch(
        self, url: str, data: Dict[str, Any], **kwargs: Any
    ) -> Union[str, Dict[str, Any]]:
        """Asynchronously PATCH ``url`` with ``data`` and return the response body."""
        async with self.requests.apatch(url, data, **kwargs) as reply:
            return await self._aget_resp_content(reply)

    async def aput(
        self, url: str, data: Dict[str, Any], **kwargs: Any
    ) -> Union[str, Dict[str, Any]]:
        """Asynchronously PUT ``data`` to ``url`` and return the response body."""
        async with self.requests.aput(url, data, **kwargs) as reply:
            return await self._aget_resp_content(reply)

    async def adelete(self, url: str, **kwargs: Any) -> Union[str, Dict[str, Any]]:
        """Asynchronously DELETE ``url`` and return the response body."""
        async with self.requests.adelete(url, **kwargs) as reply:
            return await self._aget_resp_content(reply)
class JsonRequestsWrapper(GenericRequestsWrapper):
    """Lightweight requests wrapper, with async support, that returns JSON.

    Identical to ``GenericRequestsWrapper`` except that
    ``response_content_type`` defaults to "json".
    """

    response_content_type: Literal["text", "json"] = "json"
class TextRequestsWrapper(GenericRequestsWrapper):
    """Lightweight requests wrapper, with async support, that returns text.

    Identical to ``GenericRequestsWrapper`` except that
    ``response_content_type`` is pinned to its "text" default.
    """

    response_content_type: Literal["text", "json"] = "text"
# Legacy alias: keeps the older ``RequestsWrapper`` name usable.
RequestsWrapper = TextRequestsWrapper