Source code for langchain_community.llms.baidu_qianfan_endpoint

from __future__ import annotations

import logging
from typing import (
    Any,
    AsyncIterator,
    Dict,
    Iterator,
    List,
    Optional,
)

from langchain_core.callbacks import (
    AsyncCallbackManagerForLLMRun,
    CallbackManagerForLLMRun,
)
from langchain_core.language_models.llms import LLM
from langchain_core.outputs import GenerationChunk
from langchain_core.pydantic_v1 import Field, root_validator
from langchain_core.utils import convert_to_secret_str, get_from_dict_or_env

logger = logging.getLogger(__name__)


class QianfanLLMEndpoint(LLM):
    """Baidu Qianfan hosted open source or customized models.

    To use, you should have the ``qianfan`` python package installed, and
    the environment variables ``qianfan_ak`` and ``qianfan_sk`` set with your
    API key and Secret Key.

    ak and sk are required parameters, which you could get from
    https://cloud.baidu.com/product/wenxinworkshop

    Example:
        .. code-block:: python

            from langchain_community.llms import QianfanLLMEndpoint

            qianfan_model = QianfanLLMEndpoint(
                model="ERNIE-Bot",
                endpoint="your_endpoint",
                qianfan_ak="your_ak",
                qianfan_sk="your_sk",
            )
    """

    init_kwargs: Dict[str, Any] = Field(default_factory=dict)
    """Init kwargs for the qianfan client, such as `query_per_second`, which is
    associated with the qianfan resource object and used to limit QPS."""

    model_kwargs: Dict[str, Any] = Field(default_factory=dict)
    """Extra params for model invocation via `do`."""

    client: Any

    qianfan_ak: Optional[str] = None
    qianfan_sk: Optional[str] = None

    streaming: Optional[bool] = False
    """Whether to stream the results or not."""

    model: str = "ERNIE-Bot-turbo"
    """Model name.
    You could get it from https://cloud.baidu.com/doc/WENXINWORKSHOP/s/Nlks5zkzu

    Preset models are mapped to an endpoint.
    `model` will be ignored if `endpoint` is set.
    """

    endpoint: Optional[str] = None
    """Endpoint of the Qianfan LLM, required if a custom model is used."""

    request_timeout: Optional[int] = 60
    """Request timeout for chat http requests."""

    top_p: Optional[float] = 0.8
    temperature: Optional[float] = 0.95
    penalty_score: Optional[float] = 1
    """Model params, only supported in ERNIE-Bot and ERNIE-Bot-turbo.
    For other models, passing these params will not affect the result.
    """

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        values["qianfan_ak"] = convert_to_secret_str(
            get_from_dict_or_env(
                values,
                "qianfan_ak",
                "QIANFAN_AK",
                default="",
            )
        )
        values["qianfan_sk"] = convert_to_secret_str(
            get_from_dict_or_env(
                values,
                "qianfan_sk",
                "QIANFAN_SK",
                default="",
            )
        )

        params = {
            **values.get("init_kwargs", {}),
            "model": values["model"],
        }
        # Only forward credentials and endpoint to the client when they are set.
        if values["qianfan_ak"].get_secret_value() != "":
            params["ak"] = values["qianfan_ak"].get_secret_value()
        if values["qianfan_sk"].get_secret_value() != "":
            params["sk"] = values["qianfan_sk"].get_secret_value()
        if values["endpoint"] is not None and values["endpoint"] != "":
            params["endpoint"] = values["endpoint"]
        try:
            import qianfan

            values["client"] = qianfan.Completion(**params)
        except ImportError:
            raise ImportError(
                "qianfan package not found, please install it with "
                "`pip install qianfan`"
            )
        return values

    @property
    def _identifying_params(self) -> Dict[str, Any]:
        return {
            **{"endpoint": self.endpoint, "model": self.model},
            **super()._identifying_params,
        }

    @property
    def _llm_type(self) -> str:
        """Return type of llm."""
        return "baidu-qianfan-endpoint"

    @property
    def _default_params(self) -> Dict[str, Any]:
        """Get the default parameters for calling the Qianfan API."""
        normal_params = {
            "model": self.model,
            "endpoint": self.endpoint,
            "stream": self.streaming,
            "request_timeout": self.request_timeout,
            "top_p": self.top_p,
            "temperature": self.temperature,
            "penalty_score": self.penalty_score,
        }
        return {**normal_params, **self.model_kwargs}

    def _convert_prompt_msg_params(
        self,
        prompt: str,
        **kwargs: Any,
    ) -> dict:
        if "streaming" in kwargs:
            kwargs["stream"] = kwargs.pop("streaming")
        return {
            **{"prompt": prompt, "model": self.model},
            **self._default_params,
            **kwargs,
        }
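    # Illustrative sketch (not executed): with the default field values, a call
    # like _convert_prompt_msg_params("Hi", streaming=True) merges the prompt,
    # the defaults from _default_params, and the caller's kwargs (which win on
    # conflicts), yielding roughly:
    #
    #     {
    #         "prompt": "Hi",
    #         "model": "ERNIE-Bot-turbo",
    #         "endpoint": None,
    #         "stream": True,  # "streaming" was renamed to "stream" above
    #         "request_timeout": 60,
    #         "top_p": 0.8,
    #         "temperature": 0.95,
    #         "penalty_score": 1,
    #     }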
    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Call out to a Qianfan models endpoint for each generation with a prompt.

        Args:
            prompt: The prompt to pass into the model.
            stop: Optional list of stop words to use when generating.

        Returns:
            The string generated by the model.

        Example:
            .. code-block:: python

                response = qianfan_model.invoke("Tell me a joke.")
        """
        if self.streaming:
            # Aggregate the streamed chunks into a single completion string.
            completion = ""
            for chunk in self._stream(prompt, stop, run_manager, **kwargs):
                completion += chunk.text
            return completion
        params = self._convert_prompt_msg_params(prompt, **kwargs)
        params["stop"] = stop
        response_payload = self.client.do(**params)

        return response_payload["result"]

    async def _acall(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        if self.streaming:
            completion = ""
            async for chunk in self._astream(prompt, stop, run_manager, **kwargs):
                completion += chunk.text
            return completion

        params = self._convert_prompt_msg_params(prompt, **kwargs)
        params["stop"] = stop
        response_payload = await self.client.ado(**params)

        return response_payload["result"]

    def _stream(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Iterator[GenerationChunk]:
        params = self._convert_prompt_msg_params(prompt, **{**kwargs, "stream": True})
        params["stop"] = stop
        for res in self.client.do(**params):
            if res:
                chunk = GenerationChunk(text=res["result"])
                if run_manager:
                    run_manager.on_llm_new_token(chunk.text)
                yield chunk

    async def _astream(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> AsyncIterator[GenerationChunk]:
        params = self._convert_prompt_msg_params(prompt, **{**kwargs, "stream": True})
        params["stop"] = stop
        async for res in await self.client.ado(**params):
            if res:
                chunk = GenerationChunk(text=res["result"])
                if run_manager:
                    await run_manager.on_llm_new_token(chunk.text)
                yield chunk
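
# A minimal usage sketch (illustrative, not part of the library module): it
# assumes valid QIANFAN_AK / QIANFAN_SK credentials are set in the environment
# and that the default "ERNIE-Bot-turbo" model is available to your account.
if __name__ == "__main__":
    llm = QianfanLLMEndpoint(temperature=0.7)
    # Blocking call: goes through client.do(...) and returns payload["result"].
    print(llm.invoke("Tell me a joke."))
    # Streaming call: tokens are produced by _stream(...) as they arrive.
    for token in llm.stream("Write a haiku about the sea."):
        print(token, end="", flush=True)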