from __future__ import annotations
import logging
import os
import sys
import warnings
from typing import (
AbstractSet,
Any,
AsyncIterator,
Callable,
Collection,
Dict,
Iterator,
List,
Literal,
Mapping,
Optional,
Set,
Tuple,
Union,
)
from langchain_core._api.deprecation import deprecated
from langchain_core.callbacks import (
AsyncCallbackManagerForLLMRun,
CallbackManagerForLLMRun,
)
from langchain_core.language_models.llms import BaseLLM, create_base_retry_decorator
from langchain_core.outputs import Generation, GenerationChunk, LLMResult
from langchain_core.pydantic_v1 import Field, root_validator
from langchain_core.utils import get_from_dict_or_env, get_pydantic_field_names
from langchain_core.utils.utils import build_extra_kwargs
from langchain_community.utils.openai import is_openai_v1
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
def update_token_usage(
    keys: Set[str], response: Dict[str, Any], token_usage: Dict[str, Any]
) -> None:
    """Accumulate the usage counters of ``response`` into ``token_usage``.

    Args:
        keys: Names of the usage counters to track.
        response: API response; its ``"usage"`` entry (if any) maps counter
            names to values.
        token_usage: Running totals, updated in place. A counter seen for the
            first time is stored as-is; later values are added with ``+=``.

    A response whose ``"usage"`` entry is missing or ``None`` contributes
    nothing instead of raising.
    """
    usage = response.get("usage") or {}
    # Only counters that are both requested and present in this response.
    for key in keys.intersection(usage):
        if key in token_usage:
            token_usage[key] += usage[key]
        else:
            token_usage[key] = usage[key]
def _stream_response_to_generation_chunk(
    stream_response: Dict[str, Any],
) -> GenerationChunk:
    """Build a ``GenerationChunk`` from one streamed completion payload.

    When ``stream_response["choices"]`` is empty the chunk has empty text and
    no generation info. Otherwise the first choice supplies the text, and its
    ``finish_reason`` / ``logprobs`` (``None`` when absent) become the
    generation info.
    """
    choices = stream_response["choices"]
    if choices:
        first_choice = choices[0]
        text = first_choice["text"]
        info = {
            "finish_reason": first_choice.get("finish_reason", None),
            "logprobs": first_choice.get("logprobs", None),
        }
        return GenerationChunk(text=text, generation_info=info)
    return GenerationChunk(text="")
def _update_response(response: Dict[str, Any], stream_response: Dict[str, Any]) -> None:
"""更新来自流响应的响应。"""
response["choices"][0]["text"] += stream_response["choices"][0]["text"]
response["choices"][0]["finish_reason"] = stream_response["choices"][0].get(
"finish_reason", None
)
response["choices"][0]["logprobs"] = stream_response["choices"][0]["logprobs"]
def _streaming_response_template() -> Dict[str, Any]:
return {
"choices": [
{
"text": "",
"finish_reason": None,
"logprobs": None,
}
]
}
def _create_retry_decorator(
    llm: Union[BaseOpenAI, OpenAIChat],
    run_manager: Optional[
        Union[AsyncCallbackManagerForLLMRun, CallbackManagerForLLMRun]
    ] = None,
) -> Callable[[Any], Any]:
    """Build the retry decorator used around completion calls.

    The decorator comes from ``create_base_retry_decorator`` and is configured
    with ``llm.max_retries``, the given ``run_manager`` and a fixed list of
    exception classes taken from ``openai.error``.
    """
    import openai

    error_module = openai.error
    retryable_errors = [
        error_module.Timeout,
        error_module.APIError,
        error_module.APIConnectionError,
        error_module.RateLimitError,
        error_module.ServiceUnavailableError,
    ]
    return create_base_retry_decorator(
        error_types=retryable_errors,
        max_retries=llm.max_retries,
        run_manager=run_manager,
    )
def completion_with_retry(
    llm: Union[BaseOpenAI, OpenAIChat],
    run_manager: Optional[CallbackManagerForLLMRun] = None,
    **kwargs: Any,
) -> Any:
    """Call ``llm.client.create``, retrying the call on pre-v1 ``openai``.

    When ``is_openai_v1()`` is true the client is called directly; otherwise
    the call is wrapped with the decorator from ``_create_retry_decorator``.
    """
    if not is_openai_v1():
        with_retry = _create_retry_decorator(llm, run_manager=run_manager)

        def _completion_with_retry(**call_kwargs: Any) -> Any:
            return llm.client.create(**call_kwargs)

        return with_retry(_completion_with_retry)(**kwargs)
    return llm.client.create(**kwargs)
async def acompletion_with_retry(
    llm: Union[BaseOpenAI, OpenAIChat],
    run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
    **kwargs: Any,
) -> Any:
    """Asynchronous counterpart of ``completion_with_retry``.

    When ``is_openai_v1()`` is true ``llm.async_client.create`` is awaited
    directly; otherwise ``llm.client.acreate`` is awaited under the decorator
    from ``_create_retry_decorator``.
    """
    if not is_openai_v1():
        with_retry = _create_retry_decorator(llm, run_manager=run_manager)

        async def _completion_with_retry(**call_kwargs: Any) -> Any:
            # Use OpenAI's async api https://github.com/openai/openai-python#async-api
            return await llm.client.acreate(**call_kwargs)

        return await with_retry(_completion_with_retry)(**kwargs)
    return await llm.async_client.create(**kwargs)
class BaseOpenAI(BaseLLM):
    """Base OpenAI large language model class."""

    @property
    def lc_secrets(self) -> Dict[str, str]:
        """Return the secret field name paired with its environment variable."""
        return {"openai_api_key": "OPENAI_API_KEY"}

    @classmethod
    def get_lc_namespace(cls) -> List[str]:
        """Get the namespace of the langchain object."""
        return ["langchain", "llms", "openai"]

    @property
    def lc_attributes(self) -> Dict[str, Any]:
        """Return the non-empty API base, organization and proxy settings."""
        attributes: Dict[str, Any] = {}
        if self.openai_api_base:
            attributes["openai_api_base"] = self.openai_api_base

        if self.openai_organization:
            attributes["openai_organization"] = self.openai_organization

        if self.openai_proxy:
            attributes["openai_proxy"] = self.openai_proxy

        return attributes

    @classmethod
    def is_lc_serializable(cls) -> bool:
        """Report that this class is serializable."""
        return True

    client: Any = Field(default=None, exclude=True)  #: :meta private:
    async_client: Any = Field(default=None, exclude=True)  #: :meta private:
    model_name: str = Field(default="gpt-3.5-turbo-instruct", alias="model")
    """Model name to use."""
    temperature: float = 0.7
    """What sampling temperature to use."""
    max_tokens: int = 256
    """The maximum number of tokens to generate in the completion.
    -1 returns as many tokens as possible given the prompt and
    the model's maximal context size."""
    top_p: float = 1
    """Total probability mass of tokens to consider at each step."""
    frequency_penalty: float = 0
    """Penalizes repeated tokens according to frequency."""
    presence_penalty: float = 0
    """Penalizes repeated tokens."""
    n: int = 1
    """How many completions to generate for each prompt."""
    best_of: int = 1
    """Generates best_of completions server-side and returns the "best"."""
    model_kwargs: Dict[str, Any] = Field(default_factory=dict)
    """Holds any model parameters valid for `create` call not explicitly specified."""
    # When updating this to use a SecretStr
    # Check for classes that derive from this class (as some of them
    # may assume openai_api_key is a str)
    openai_api_key: Optional[str] = Field(default=None, alias="api_key")
    """Automatically inferred from env var `OPENAI_API_KEY` if not provided."""
    openai_api_base: Optional[str] = Field(default=None, alias="base_url")
    """Base URL path for API requests, leave blank if not using a proxy or service
        emulator."""
    openai_organization: Optional[str] = Field(default=None, alias="organization")
    """Automatically inferred from env var `OPENAI_ORG_ID` if not provided."""
    # to support explicit proxy for OpenAI
    openai_proxy: Optional[str] = None
    batch_size: int = 20
    """Batch size to use when passing multiple documents to generate."""
    request_timeout: Union[float, Tuple[float, float], Any, None] = Field(
        default=None, alias="timeout"
    )
    """Timeout for requests to OpenAI completion API. Can be float, httpx.Timeout or
        None."""
    logit_bias: Optional[Dict[str, float]] = Field(default_factory=dict)
    """Adjust the probability of specific tokens being generated."""
    max_retries: int = 2
    """Maximum number of retries to make when generating."""
    streaming: bool = False
    """Whether to stream the results or not."""
    allowed_special: Union[Literal["all"], AbstractSet[str]] = set()
    """Set of special tokens that are allowed."""
    disallowed_special: Union[Literal["all"], Collection[str]] = "all"
    """Set of special tokens that are not allowed."""
    tiktoken_model_name: Optional[str] = None
    """The model name to pass to tiktoken when using this class.
    Tiktoken is used to count the number of tokens in documents to constrain
    them to be under a certain limit. When set to None, `model_name` is used
    instead. Set it when `model_name` is not known to tiktoken (for example
    with Azure or another provider exposing an OpenAI-like API with different
    models) to avoid errors when tiktoken is called."""
    default_headers: Union[Mapping[str, str], None] = None
    default_query: Union[Mapping[str, object], None] = None
    # Configure a custom httpx client. See the
    # [httpx documentation](https://www.python-httpx.org/api/#client) for more details.
    http_client: Union[Any, None] = None
    """Optional httpx.Client."""

    def __new__(cls, **data: Any) -> Union[OpenAIChat, BaseOpenAI]:  # type: ignore
        """Initialize the OpenAI object.

        If ``model_name`` names a chat model (``gpt-3.5-turbo*`` or ``gpt-4*``
        without ``-instruct``), a warning is emitted and an ``OpenAIChat``
        built from the same arguments is returned instead.
        """
        model_name = data.get("model_name", "")
        if (
            model_name.startswith("gpt-3.5-turbo") or model_name.startswith("gpt-4")
        ) and "-instruct" not in model_name:
            warnings.warn(
                "You are trying to use a chat model. This way of initializing it is "
                "no longer supported. Instead, please use: "
                "`from langchain_community.chat_models import ChatOpenAI`"
            )
            return OpenAIChat(**data)
        return super().__new__(cls)

    class Config:
        """Configuration for this pydantic object."""

        allow_population_by_field_name = True

    @root_validator(pre=True)
    def build_extra(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Build extra kwargs from additional params that were passed in."""
        all_required_field_names = get_pydantic_field_names(cls)
        extra = values.get("model_kwargs", {})
        values["model_kwargs"] = build_extra_kwargs(
            extra, values, all_required_field_names
        )
        return values

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that api key and python package exists in environment.

        Raises:
            ValueError: If ``n < 1``, or if streaming is combined with
                ``n > 1`` or ``best_of > 1``.
            ImportError: If the ``openai`` package cannot be imported.
        """
        if values["n"] < 1:
            raise ValueError("n must be at least 1.")
        if values["streaming"] and values["n"] > 1:
            raise ValueError("Cannot stream results when n > 1.")
        if values["streaming"] and values["best_of"] > 1:
            raise ValueError("Cannot stream results when best_of > 1.")

        values["openai_api_key"] = get_from_dict_or_env(
            values, "openai_api_key", "OPENAI_API_KEY"
        )
        values["openai_api_base"] = values["openai_api_base"] or os.getenv(
            "OPENAI_API_BASE"
        )
        values["openai_proxy"] = get_from_dict_or_env(
            values,
            "openai_proxy",
            "OPENAI_PROXY",
            default="",
        )
        values["openai_organization"] = (
            values["openai_organization"]
            or os.getenv("OPENAI_ORG_ID")
            or os.getenv("OPENAI_ORGANIZATION")
        )
        try:
            import openai
        except ImportError:
            raise ImportError(
                "Could not import openai python package. "
                "Please install it with `pip install openai`."
            )

        if is_openai_v1():
            client_params = {
                "api_key": values["openai_api_key"],
                "organization": values["openai_organization"],
                "base_url": values["openai_api_base"],
                "timeout": values["request_timeout"],
                "max_retries": values["max_retries"],
                "default_headers": values["default_headers"],
                "default_query": values["default_query"],
                "http_client": values["http_client"],
            }
            # Clients supplied by the caller are kept as-is.
            if not values.get("client"):
                values["client"] = openai.OpenAI(**client_params).completions
            if not values.get("async_client"):
                values["async_client"] = openai.AsyncOpenAI(**client_params).completions
        elif not values.get("client"):
            values["client"] = openai.Completion

        return values

    @property
    def _default_params(self) -> Dict[str, Any]:
        """Get the default parameters for calling OpenAI API."""
        normal_params: Dict[str, Any] = {
            "temperature": self.temperature,
            "top_p": self.top_p,
            "frequency_penalty": self.frequency_penalty,
            "presence_penalty": self.presence_penalty,
            "n": self.n,
            "logit_bias": self.logit_bias,
        }

        if self.max_tokens is not None:
            normal_params["max_tokens"] = self.max_tokens
        if self.request_timeout is not None and not is_openai_v1():
            normal_params["request_timeout"] = self.request_timeout

        # Azure gpt-35-turbo doesn't support best_of
        # don't specify best_of if it is 1
        if self.best_of > 1:
            normal_params["best_of"] = self.best_of

        return {**normal_params, **self.model_kwargs}

    def _stream(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Iterator[GenerationChunk]:
        """Stream the completion of ``prompt`` as ``GenerationChunk`` objects.

        Each chunk is also reported to ``run_manager`` (when given) through
        ``on_llm_new_token``.
        """
        params = {**self._invocation_params, **kwargs, "stream": True}
        self.get_sub_prompts(params, [prompt], stop)  # this mutates params
        for stream_resp in completion_with_retry(
            self, prompt=prompt, run_manager=run_manager, **params
        ):
            if not isinstance(stream_resp, dict):
                stream_resp = stream_resp.dict()
            chunk = _stream_response_to_generation_chunk(stream_resp)
            if run_manager:
                run_manager.on_llm_new_token(
                    chunk.text,
                    chunk=chunk,
                    verbose=self.verbose,
                    logprobs=chunk.generation_info["logprobs"]
                    if chunk.generation_info
                    else None,
                )
            yield chunk

    async def _astream(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> AsyncIterator[GenerationChunk]:
        """Asynchronously stream the completion of ``prompt`` chunk by chunk.

        Each chunk is also reported to ``run_manager`` (when given) through
        ``on_llm_new_token``.
        """
        params = {**self._invocation_params, **kwargs, "stream": True}
        self.get_sub_prompts(params, [prompt], stop)  # this mutates params
        async for stream_resp in await acompletion_with_retry(
            self, prompt=prompt, run_manager=run_manager, **params
        ):
            if not isinstance(stream_resp, dict):
                stream_resp = stream_resp.dict()
            chunk = _stream_response_to_generation_chunk(stream_resp)
            if run_manager:
                await run_manager.on_llm_new_token(
                    chunk.text,
                    chunk=chunk,
                    verbose=self.verbose,
                    logprobs=chunk.generation_info["logprobs"]
                    if chunk.generation_info
                    else None,
                )
            yield chunk

    def _generate(
        self,
        prompts: List[str],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> LLMResult:
        """Call out to OpenAI's endpoint with k unique prompts.

        Args:
            prompts: The prompts to pass into the model.
            stop: Optional list of stop words to use when generating.

        Returns:
            The full LLM output.

        Raises:
            ValueError: If streaming is enabled and a batch holds more than
                one prompt.

        Example:
            .. code-block:: python

                response = openai.generate(["Tell me a joke."])
        """
        # TODO: write a unit test for this
        params = self._invocation_params
        params = {**params, **kwargs}
        sub_prompts = self.get_sub_prompts(params, prompts, stop)
        choices = []
        token_usage: Dict[str, int] = {}
        # Get the token usage from the response.
        # Includes prompt, completion, and total tokens used.
        _keys = {"completion_tokens", "prompt_tokens", "total_tokens"}
        system_fingerprint: Optional[str] = None
        for _prompts in sub_prompts:
            if self.streaming:
                if len(_prompts) > 1:
                    raise ValueError("Cannot stream results with multiple prompts.")

                generation: Optional[GenerationChunk] = None
                for chunk in self._stream(_prompts[0], stop, run_manager, **kwargs):
                    if generation is None:
                        generation = chunk
                    else:
                        generation += chunk
                assert generation is not None
                choices.append(
                    {
                        "text": generation.text,
                        "finish_reason": generation.generation_info.get("finish_reason")
                        if generation.generation_info
                        else None,
                        "logprobs": generation.generation_info.get("logprobs")
                        if generation.generation_info
                        else None,
                    }
                )
            else:
                response = completion_with_retry(
                    self, prompt=_prompts, run_manager=run_manager, **params
                )
                if not isinstance(response, dict):
                    # V1 client returns the response in an PyDantic object instead of
                    # dict. For the transition period, we deep convert it to dict.
                    response = response.dict()

                choices.extend(response["choices"])
                update_token_usage(_keys, response, token_usage)
                # Keep the first fingerprint reported by any batch.
                if not system_fingerprint:
                    system_fingerprint = response.get("system_fingerprint")
        return self.create_llm_result(
            choices,
            prompts,
            params,
            token_usage,
            system_fingerprint=system_fingerprint,
        )

    async def _agenerate(
        self,
        prompts: List[str],
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> LLMResult:
        """Call out to OpenAI's endpoint async with k unique prompts.

        Args:
            prompts: The prompts to pass into the model.
            stop: Optional list of stop words to use when generating.

        Returns:
            The full LLM output.

        Raises:
            ValueError: If streaming is enabled and a batch holds more than
                one prompt.
        """
        params = self._invocation_params
        params = {**params, **kwargs}
        sub_prompts = self.get_sub_prompts(params, prompts, stop)
        choices = []
        token_usage: Dict[str, int] = {}
        # Get the token usage from the response.
        # Includes prompt, completion, and total tokens used.
        _keys = {"completion_tokens", "prompt_tokens", "total_tokens"}
        system_fingerprint: Optional[str] = None
        for _prompts in sub_prompts:
            if self.streaming:
                if len(_prompts) > 1:
                    raise ValueError("Cannot stream results with multiple prompts.")

                generation: Optional[GenerationChunk] = None
                async for chunk in self._astream(
                    _prompts[0], stop, run_manager, **kwargs
                ):
                    if generation is None:
                        generation = chunk
                    else:
                        generation += chunk
                assert generation is not None
                choices.append(
                    {
                        "text": generation.text,
                        "finish_reason": generation.generation_info.get("finish_reason")
                        if generation.generation_info
                        else None,
                        "logprobs": generation.generation_info.get("logprobs")
                        if generation.generation_info
                        else None,
                    }
                )
            else:
                response = await acompletion_with_retry(
                    self, prompt=_prompts, run_manager=run_manager, **params
                )
                if not isinstance(response, dict):
                    response = response.dict()
                choices.extend(response["choices"])
                update_token_usage(_keys, response, token_usage)
                # Mirror _generate: keep the first fingerprint reported.
                if not system_fingerprint:
                    system_fingerprint = response.get("system_fingerprint")
        return self.create_llm_result(
            choices,
            prompts,
            params,
            token_usage,
            system_fingerprint=system_fingerprint,
        )

    def get_sub_prompts(
        self,
        params: Dict[str, Any],
        prompts: List[str],
        stop: Optional[List[str]] = None,
    ) -> List[List[str]]:
        """Get the sub prompts for llm call.

        Mutates ``params``: ``stop`` is stored under ``"stop"``, and a
        ``max_tokens`` of -1 is replaced by ``max_tokens_for_prompt`` of the
        single prompt.

        Args:
            params: Call parameters, updated in place.
            prompts: All prompts of the request.
            stop: Optional stop words.

        Returns:
            ``prompts`` split into consecutive batches of ``batch_size``.

        Raises:
            ValueError: If ``stop`` is given while ``params`` already has one,
                or if ``max_tokens`` is -1 with anything but one prompt.
        """
        if stop is not None:
            if "stop" in params:
                raise ValueError("`stop` found in both the input and default params.")
            params["stop"] = stop
        # `.get` so that params without a max_tokens entry are simply left alone.
        if params.get("max_tokens") == -1:
            if len(prompts) != 1:
                raise ValueError(
                    "max_tokens set to -1 not supported for multiple inputs."
                )
            params["max_tokens"] = self.max_tokens_for_prompt(prompts[0])
        sub_prompts = [
            prompts[i : i + self.batch_size]
            for i in range(0, len(prompts), self.batch_size)
        ]
        return sub_prompts

    def create_llm_result(
        self,
        choices: Any,
        prompts: List[str],
        params: Dict[str, Any],
        token_usage: Dict[str, int],
        *,
        system_fingerprint: Optional[str] = None,
    ) -> LLMResult:
        """Create the LLMResult from the choices and prompts.

        ``choices`` is consumed in consecutive groups of ``n`` (from
        ``params``, falling back to ``self.n``), one group per prompt.
        """
        generations = []
        n = params.get("n", self.n)
        for i, _ in enumerate(prompts):
            sub_choices = choices[i * n : (i + 1) * n]
            generations.append(
                [
                    Generation(
                        text=choice["text"],
                        generation_info=dict(
                            finish_reason=choice.get("finish_reason"),
                            logprobs=choice.get("logprobs"),
                        ),
                    )
                    for choice in sub_choices
                ]
            )
        llm_output = {"token_usage": token_usage, "model_name": self.model_name}
        if system_fingerprint:
            llm_output["system_fingerprint"] = system_fingerprint
        return LLMResult(generations=generations, llm_output=llm_output)

    @property
    def _invocation_params(self) -> Dict[str, Any]:
        """Get the parameters used to invoke the model.

        On pre-v1 ``openai`` the credentials are included and, when a proxy
        is configured, the module-level ``openai.proxy`` is set.
        """
        openai_creds: Dict[str, Any] = {}
        if not is_openai_v1():
            openai_creds.update(
                {
                    "api_key": self.openai_api_key,
                    "api_base": self.openai_api_base,
                    "organization": self.openai_organization,
                }
            )
        if self.openai_proxy:
            import openai

            openai.proxy = {"http": self.openai_proxy, "https": self.openai_proxy}  # type: ignore[assignment]  # noqa: E501
        return {**openai_creds, **self._default_params}

    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        """Get the identifying parameters."""
        return {**{"model_name": self.model_name}, **self._default_params}

    @property
    def _llm_type(self) -> str:
        """Return type of llm."""
        return "openai"

    def get_token_ids(self, text: str) -> List[int]:
        """Get the token IDs using the tiktoken package.

        Falls back to the parent implementation on Python < 3.8 and to the
        ``cl100k_base`` encoding when tiktoken does not know the model.

        Raises:
            ImportError: If ``tiktoken`` is not installed.
        """
        # tiktoken NOT supported for Python < 3.8
        if sys.version_info < (3, 8):
            # Must return token ids (a list), not a token count.
            return super().get_token_ids(text)
        try:
            import tiktoken
        except ImportError:
            raise ImportError(
                "Could not import tiktoken python package. "
                "This is needed in order to calculate get_num_tokens. "
                "Please install it with `pip install tiktoken`."
            )

        model_name = self.tiktoken_model_name or self.model_name
        try:
            enc = tiktoken.encoding_for_model(model_name)
        except KeyError:
            logger.warning("Warning: model not found. Using cl100k_base encoding.")
            model = "cl100k_base"
            enc = tiktoken.get_encoding(model)

        return enc.encode(
            text,
            allowed_special=self.allowed_special,
            disallowed_special=self.disallowed_special,
        )

    @staticmethod
    def modelname_to_contextsize(modelname: str) -> int:
        """Calculate the maximum number of tokens possible to generate for a model.

        Args:
            modelname: The modelname we want to know the context size for.

        Returns:
            The maximum context size

        Raises:
            ValueError: If the model name is not in the built-in table.

        Example:
            .. code-block:: python

                max_tokens = openai.modelname_to_contextsize("gpt-3.5-turbo-instruct")
        """
        model_token_mapping = {
            "gpt-4o": 128_000,
            "gpt-4o-2024-05-13": 128_000,
            "gpt-4": 8192,
            "gpt-4-0314": 8192,
            "gpt-4-0613": 8192,
            "gpt-4-32k": 32768,
            "gpt-4-32k-0314": 32768,
            "gpt-4-32k-0613": 32768,
            "gpt-3.5-turbo": 4096,
            "gpt-3.5-turbo-0301": 4096,
            "gpt-3.5-turbo-0613": 4096,
            "gpt-3.5-turbo-16k": 16385,
            "gpt-3.5-turbo-16k-0613": 16385,
            "gpt-3.5-turbo-instruct": 4096,
            "text-ada-001": 2049,
            "ada": 2049,
            # NOTE(review): 2040 differs from the 2049 of the sibling models;
            # possibly a typo - confirm against OpenAI's model table.
            "text-babbage-001": 2040,
            "babbage": 2049,
            "text-curie-001": 2049,
            "curie": 2049,
            "davinci": 2049,
            "text-davinci-003": 4097,
            "text-davinci-002": 4097,
            "code-davinci-002": 8001,
            "code-davinci-001": 8001,
            "code-cushman-002": 2048,
            "code-cushman-001": 2048,
        }

        # handling finetuned models
        if "ft-" in modelname:
            modelname = modelname.split(":")[0]

        context_size = model_token_mapping.get(modelname, None)

        if context_size is None:
            raise ValueError(
                f"Unknown model: {modelname}. Please provide a valid OpenAI model name. "
                "Known models are: " + ", ".join(model_token_mapping.keys())
            )

        return context_size

    @property
    def max_context_size(self) -> int:
        """Get max context size for this model."""
        return self.modelname_to_contextsize(self.model_name)

    def max_tokens_for_prompt(self, prompt: str) -> int:
        """Calculate the maximum number of tokens possible to generate for a prompt.

        Args:
            prompt: The prompt to pass into the model.

        Returns:
            The maximum number of tokens to generate for a prompt.

        Example:
            .. code-block:: python

                max_tokens = openai.max_tokens_for_prompt("Tell me a joke.")
        """
        num_tokens = self.get_num_tokens(prompt)
        return self.max_context_size - num_tokens
@deprecated(
    since="0.0.10", removal="0.3.0", alternative_import="langchain_openai.OpenAI"
)
class OpenAI(BaseOpenAI):
    """OpenAI large language models.

    To use, you should have the ``openai`` python package installed, and the
    environment variable ``OPENAI_API_KEY`` set with your API key.

    Any parameters that are valid to be passed to the openai.create call can be
    passed in, even if not explicitly saved on this class.

    Example:
        .. code-block:: python

            from langchain_community.llms import OpenAI
            openai = OpenAI(model_name="gpt-3.5-turbo-instruct")
    """

    @classmethod
    def get_lc_namespace(cls) -> List[str]:
        """Get the namespace of the langchain object."""
        namespace = ["langchain", "llms", "openai"]
        return namespace

    @property
    def _invocation_params(self) -> Dict[str, Any]:
        """Return the parent's invocation parameters plus the ``model`` entry.

        ``model`` defaults to ``model_name``; a ``model`` key coming from the
        parent's parameters takes precedence.
        """
        merged: Dict[str, Any] = {"model": self.model_name}
        merged.update(super()._invocation_params)
        return merged
@deprecated(
    since="0.0.10", removal="0.3.0", alternative_import="langchain_openai.AzureOpenAI"
)
class AzureOpenAI(BaseOpenAI):
    """Azure-specific OpenAI large language models.

    To use, you should have the ``openai`` python package installed, and the
    environment variable ``OPENAI_API_KEY`` set with your API key.

    Any parameters that are valid to be passed to the openai.create call can be
    passed in, even if not explicitly saved on this class.

    Example:
        .. code-block:: python

            from langchain_community.llms import AzureOpenAI
            openai = AzureOpenAI(model_name="gpt-3.5-turbo-instruct")
    """

    azure_endpoint: Union[str, None] = None
    """Your Azure endpoint, including the resource.

        Automatically inferred from env var `AZURE_OPENAI_ENDPOINT` if not provided.

        Example: `https://example-resource.azure.openai.com/`
    """
    deployment_name: Union[str, None] = Field(default=None, alias="azure_deployment")
    """A model deployment.

        If given sets the base client URL to include `/deployments/{azure_deployment}`.
        Note: this means you won't be able to use non-deployment endpoints.
    """
    openai_api_version: str = Field(default="", alias="api_version")
    """Automatically inferred from env var `OPENAI_API_VERSION` if not provided."""
    openai_api_key: Union[str, None] = Field(default=None, alias="api_key")
    """Automatically inferred from env var `AZURE_OPENAI_API_KEY` if not provided."""
    azure_ad_token: Union[str, None] = None
    """Your Azure Active Directory token.

        Automatically inferred from env var `AZURE_OPENAI_AD_TOKEN` if not provided.

        For more:
        https://www.microsoft.com/en-us/security/business/identity-access/microsoft-entra-id.
    """  # noqa: E501
    azure_ad_token_provider: Union[Callable[[], str], None] = None
    """A function that returns an Azure Active Directory token.

        Will be invoked on every request.
    """
    openai_api_type: str = ""
    """Legacy, for openai<1.0.0 support."""
    validate_base_url: bool = True
    """For backwards compatibility. If legacy val openai_api_base is passed in, try to
        infer if it is a base_url or azure_endpoint and update accordingly.
    """

    @classmethod
    def get_lc_namespace(cls) -> List[str]:
        """Get the namespace of the langchain object."""
        return ["langchain", "llms", "openai"]

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that api key and python package exists in environment.

        Raises:
            ValueError: If ``n < 1``, or if streaming is combined with
                ``n > 1`` or ``best_of > 1``.
            ImportError: If the ``openai`` package cannot be imported.
        """
        if values["n"] < 1:
            raise ValueError("n must be at least 1.")
        if values["streaming"] and values["n"] > 1:
            raise ValueError("Cannot stream results when n > 1.")
        if values["streaming"] and values["best_of"] > 1:
            raise ValueError("Cannot stream results when best_of > 1.")

        # Check OPENAI_KEY for backwards compatibility.
        # TODO: Remove OPENAI_API_KEY support to avoid possible conflict when using
        # other forms of azure credentials.
        values["openai_api_key"] = (
            values["openai_api_key"]
            or os.getenv("AZURE_OPENAI_API_KEY")
            or os.getenv("OPENAI_API_KEY")
        )

        values["azure_endpoint"] = values["azure_endpoint"] or os.getenv(
            "AZURE_OPENAI_ENDPOINT"
        )
        values["azure_ad_token"] = values["azure_ad_token"] or os.getenv(
            "AZURE_OPENAI_AD_TOKEN"
        )
        values["openai_api_base"] = values["openai_api_base"] or os.getenv(
            "OPENAI_API_BASE"
        )
        values["openai_proxy"] = get_from_dict_or_env(
            values,
            "openai_proxy",
            "OPENAI_PROXY",
            default="",
        )
        values["openai_organization"] = (
            values["openai_organization"]
            or os.getenv("OPENAI_ORG_ID")
            or os.getenv("OPENAI_ORGANIZATION")
        )
        values["openai_api_version"] = values["openai_api_version"] or os.getenv(
            "OPENAI_API_VERSION"
        )
        values["openai_api_type"] = get_from_dict_or_env(
            values, "openai_api_type", "OPENAI_API_TYPE", default="azure"
        )
        try:
            import openai
        except ImportError:
            raise ImportError(
                "Could not import openai python package. "
                "Please install it with `pip install openai`."
            )
        if is_openai_v1():
            # For backwards compatibility. Before openai v1, no distinction was made
            # between azure_endpoint and base_url (openai_api_base).
            openai_api_base = values["openai_api_base"]
            if openai_api_base and values["validate_base_url"]:
                if "/openai" not in openai_api_base:
                    values["openai_api_base"] = (
                        values["openai_api_base"].rstrip("/") + "/openai"
                    )
                    warnings.warn(
                        "As of openai>=1.0.0, Azure endpoints should be specified via "
                        f"the `azure_endpoint` param not `openai_api_base` "
                        f"(or alias `base_url`). Updating `openai_api_base` from "
                        f"{openai_api_base} to {values['openai_api_base']}."
                    )
                if values["deployment_name"]:
                    warnings.warn(
                        "As of openai>=1.0.0, if `deployment_name` (or alias "
                        "`azure_deployment`) is specified then "
                        "`openai_api_base` (or alias `base_url`) should not be. "
                        "Instead use `deployment_name` (or alias `azure_deployment`) "
                        "and `azure_endpoint`."
                    )
                    if values["deployment_name"] not in values["openai_api_base"]:
                        # Extend the URL first so the warning reports the value
                        # that is actually used from here on.
                        values["openai_api_base"] += (
                            "/deployments/" + values["deployment_name"]
                        )
                        warnings.warn(
                            "As of openai>=1.0.0, if `openai_api_base` "
                            "(or alias `base_url`) is specified it is expected to be "
                            "of the form "
                            "https://example-resource.azure.openai.com/openai/deployments/example-deployment. "  # noqa: E501
                            f"Updating {openai_api_base} to "
                            f"{values['openai_api_base']}."
                        )
                    # The deployment now lives in the base URL only.
                    values["deployment_name"] = None
            client_params = {
                "api_version": values["openai_api_version"],
                "azure_endpoint": values["azure_endpoint"],
                "azure_deployment": values["deployment_name"],
                "api_key": values["openai_api_key"],
                "azure_ad_token": values["azure_ad_token"],
                "azure_ad_token_provider": values["azure_ad_token_provider"],
                "organization": values["openai_organization"],
                "base_url": values["openai_api_base"],
                "timeout": values["request_timeout"],
                "max_retries": values["max_retries"],
                "default_headers": values["default_headers"],
                "default_query": values["default_query"],
                "http_client": values["http_client"],
            }
            values["client"] = openai.AzureOpenAI(**client_params).completions
            values["async_client"] = openai.AsyncAzureOpenAI(
                **client_params
            ).completions
        else:
            values["client"] = openai.Completion

        return values

    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        """Return the parent's identifying parameters plus ``deployment_name``."""
        return {
            **{"deployment_name": self.deployment_name},
            **super()._identifying_params,
        }

    @property
    def _invocation_params(self) -> Dict[str, Any]:
        """Return the parent's invocation parameters plus Azure-specific ones.

        With openai v1 the deployment is sent as ``model``; otherwise it is
        sent as ``engine`` together with the API type and version.
        """
        if is_openai_v1():
            openai_params = {"model": self.deployment_name}
        else:
            openai_params = {
                "engine": self.deployment_name,
                "api_type": self.openai_api_type,
                "api_version": self.openai_api_version,
            }
        return {**openai_params, **super()._invocation_params}

    @property
    def _llm_type(self) -> str:
        """Return type of llm."""
        return "azure"

    @property
    def lc_attributes(self) -> Dict[str, Any]:
        """Return the API type and API version settings."""
        return {
            "openai_api_type": self.openai_api_type,
            "openai_api_version": self.openai_api_version,
        }
@deprecated(
    since="0.0.1",
    removal="0.3.0",
    alternative_import="langchain_openai.ChatOpenAI",
)
class OpenAIChat(BaseLLM):
    """OpenAI Chat large language models.

    To use, you should have the ``openai`` python package installed, and the
    environment variable ``OPENAI_API_KEY`` set with your API key.

    Any parameters that are valid to be passed to the openai.create call can be
    passed in, even if not explicitly saved on this class.

    Example:
        .. code-block:: python

            from langchain_community.llms import OpenAIChat
            openaichat = OpenAIChat(model_name="gpt-3.5-turbo")
    """

    client: Any = Field(default=None, exclude=True)  #: :meta private:
    async_client: Any = Field(default=None, exclude=True)  #: :meta private:
    model_name: str = "gpt-3.5-turbo"
    """Model name to use."""
    model_kwargs: Dict[str, Any] = Field(default_factory=dict)
    """Holds any model parameters valid for `create` call not explicitly specified."""
    # When updating this to use a SecretStr
    # Check for classes that derive from this class (as some of them
    # may assume openai_api_key is a str)
    openai_api_key: Optional[str] = Field(default=None, alias="api_key")
    """Automatically inferred from env var `OPENAI_API_KEY` if not provided."""
    openai_api_base: Optional[str] = Field(default=None, alias="base_url")
    """Base URL path for API requests, leave blank if not using a proxy or service
        emulator."""
    # to support explicit proxy for OpenAI
    openai_proxy: Optional[str] = None
    max_retries: int = 6
    """Maximum number of retries to make when generating."""
    prefix_messages: List = Field(default_factory=list)
    """Series of messages for Chat input."""
    streaming: bool = False
    """Whether to stream the results or not."""
    allowed_special: Union[Literal["all"], AbstractSet[str]] = set()
    """Set of special tokens that are allowed."""
    disallowed_special: Union[Literal["all"], Collection[str]] = "all"
    """Set of special tokens that are not allowed."""

    @root_validator(pre=True)
    def build_extra(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Build extra kwargs from additional params that were passed in.

        Every key that is not a field alias is moved into ``model_kwargs``.

        Raises:
            ValueError: If such a key is already present in ``model_kwargs``.
        """
        all_required_field_names = {field.alias for field in cls.__fields__.values()}

        extra = values.get("model_kwargs", {})
        for field_name in list(values):
            if field_name not in all_required_field_names:
                if field_name in extra:
                    raise ValueError(f"Found {field_name} supplied twice.")
                extra[field_name] = values.pop(field_name)
        values["model_kwargs"] = extra
        return values

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that api key and python package exists in environment.

        Sets the module-level ``openai`` credentials and stores
        ``openai.ChatCompletion`` as the client.

        Raises:
            ImportError: If the ``openai`` package cannot be imported.
            ValueError: If ``openai`` has no ``ChatCompletion`` attribute.
        """
        openai_api_key = get_from_dict_or_env(
            values, "openai_api_key", "OPENAI_API_KEY"
        )
        openai_api_base = get_from_dict_or_env(
            values,
            "openai_api_base",
            "OPENAI_API_BASE",
            default="",
        )
        openai_proxy = get_from_dict_or_env(
            values,
            "openai_proxy",
            "OPENAI_PROXY",
            default="",
        )
        openai_organization = get_from_dict_or_env(
            values, "openai_organization", "OPENAI_ORGANIZATION", default=""
        )
        try:
            import openai

            openai.api_key = openai_api_key
            if openai_api_base:
                openai.api_base = openai_api_base
            if openai_organization:
                openai.organization = openai_organization
            if openai_proxy:
                openai.proxy = {"http": openai_proxy, "https": openai_proxy}  # type: ignore[assignment]  # noqa: E501
        except ImportError:
            raise ImportError(
                "Could not import openai python package. "
                "Please install it with `pip install openai`."
            )
        try:
            values["client"] = openai.ChatCompletion
        except AttributeError:
            raise ValueError(
                "`openai` has no `ChatCompletion` attribute, this is likely "
                "due to an old version of the openai package. Try upgrading it "
                "with `pip install --upgrade openai`."
            )
        warnings.warn(
            "You are trying to use a chat model. This way of initializing it is "
            "no longer supported. Instead, please use: "
            "`from langchain_community.chat_models import ChatOpenAI`"
        )
        return values

    @property
    def _default_params(self) -> Dict[str, Any]:
        """Get the default parameters for calling OpenAI API."""
        return self.model_kwargs

    def _get_chat_params(
        self, prompts: List[str], stop: Optional[List[str]] = None
    ) -> Tuple:
        """Build the ``(messages, params)`` pair for a chat completion call.

        Raises:
            ValueError: If more than one prompt is given, or if ``stop`` is
                given while the default params already contain one.
        """
        if len(prompts) > 1:
            raise ValueError(
                f"OpenAIChat currently only supports single prompt, got {prompts}"
            )
        messages = self.prefix_messages + [{"role": "user", "content": prompts[0]}]
        params: Dict[str, Any] = {**{"model": self.model_name}, **self._default_params}
        if stop is not None:
            if "stop" in params:
                raise ValueError("`stop` found in both the input and default params.")
            params["stop"] = stop
        if params.get("max_tokens") == -1:
            # for ChatGPT api, omitting max_tokens is equivalent to having no limit
            del params["max_tokens"]
        return messages, params

    def _stream(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Iterator[GenerationChunk]:
        """Stream the chat completion of ``prompt`` as ``GenerationChunk`` objects.

        Payloads with an empty ``choices`` list are skipped, and a missing or
        ``None`` delta content is treated as an empty string.
        """
        messages, params = self._get_chat_params([prompt], stop)
        params = {**params, **kwargs, "stream": True}
        for stream_resp in completion_with_retry(
            self, messages=messages, run_manager=run_manager, **params
        ):
            if not isinstance(stream_resp, dict):
                stream_resp = stream_resp.dict()
            if not stream_resp["choices"]:
                # Nothing to emit for a payload without choices.
                continue
            # `or ""`: content may be present but None once converted to a dict.
            token = stream_resp["choices"][0]["delta"].get("content") or ""
            chunk = GenerationChunk(text=token)
            if run_manager:
                run_manager.on_llm_new_token(token, chunk=chunk)
            yield chunk

    async def _astream(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> AsyncIterator[GenerationChunk]:
        """Asynchronously stream the chat completion of ``prompt``.

        Payloads with an empty ``choices`` list are skipped, and a missing or
        ``None`` delta content is treated as an empty string.
        """
        messages, params = self._get_chat_params([prompt], stop)
        params = {**params, **kwargs, "stream": True}
        async for stream_resp in await acompletion_with_retry(
            self, messages=messages, run_manager=run_manager, **params
        ):
            if not isinstance(stream_resp, dict):
                stream_resp = stream_resp.dict()
            if not stream_resp["choices"]:
                # Nothing to emit for a payload without choices.
                continue
            # `or ""`: content may be present but None once converted to a dict.
            token = stream_resp["choices"][0]["delta"].get("content") or ""
            chunk = GenerationChunk(text=token)
            if run_manager:
                await run_manager.on_llm_new_token(token, chunk=chunk)
            yield chunk

    def _generate(
        self,
        prompts: List[str],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> LLMResult:
        """Generate a completion for a single prompt.

        With ``streaming`` enabled the chunks of ``_stream`` for the first
        prompt are concatenated; otherwise one chat completion call is made
        and its token usage is reported in ``llm_output``.
        """
        if self.streaming:
            generation: Optional[GenerationChunk] = None
            for chunk in self._stream(prompts[0], stop, run_manager, **kwargs):
                if generation is None:
                    generation = chunk
                else:
                    generation += chunk
            assert generation is not None
            return LLMResult(generations=[[generation]])

        messages, params = self._get_chat_params(prompts, stop)
        params = {**params, **kwargs}
        full_response = completion_with_retry(
            self, messages=messages, run_manager=run_manager, **params
        )
        if not isinstance(full_response, dict):
            full_response = full_response.dict()
        llm_output = {
            "token_usage": full_response["usage"],
            "model_name": self.model_name,
        }
        return LLMResult(
            generations=[
                [Generation(text=full_response["choices"][0]["message"]["content"])]
            ],
            llm_output=llm_output,
        )

    async def _agenerate(
        self,
        prompts: List[str],
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> LLMResult:
        """Asynchronously generate a completion for a single prompt.

        With ``streaming`` enabled the chunks of ``_astream`` for the first
        prompt are concatenated; otherwise one chat completion call is made
        and its token usage is reported in ``llm_output``.
        """
        if self.streaming:
            generation: Optional[GenerationChunk] = None
            async for chunk in self._astream(prompts[0], stop, run_manager, **kwargs):
                if generation is None:
                    generation = chunk
                else:
                    generation += chunk
            assert generation is not None
            return LLMResult(generations=[[generation]])

        messages, params = self._get_chat_params(prompts, stop)
        params = {**params, **kwargs}
        full_response = await acompletion_with_retry(
            self, messages=messages, run_manager=run_manager, **params
        )
        if not isinstance(full_response, dict):
            full_response = full_response.dict()
        llm_output = {
            "token_usage": full_response["usage"],
            "model_name": self.model_name,
        }
        return LLMResult(
            generations=[
                [Generation(text=full_response["choices"][0]["message"]["content"])]
            ],
            llm_output=llm_output,
        )

    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        """Get the identifying parameters."""
        return {**{"model_name": self.model_name}, **self._default_params}

    @property
    def _llm_type(self) -> str:
        """Return type of llm."""
        return "openai-chat"

    def get_token_ids(self, text: str) -> List[int]:
        """Get the token IDs using the tiktoken package.

        Falls back to the parent implementation on Python < 3.8 and to the
        ``cl100k_base`` encoding when tiktoken does not know the model.

        Raises:
            ImportError: If ``tiktoken`` is not installed.
        """
        # tiktoken NOT supported for Python < 3.8
        if sys.version_info < (3, 8):
            return super().get_token_ids(text)
        try:
            import tiktoken
        except ImportError:
            raise ImportError(
                "Could not import tiktoken python package. "
                "This is needed in order to calculate get_num_tokens. "
                "Please install it with `pip install tiktoken`."
            )

        try:
            enc = tiktoken.encoding_for_model(self.model_name)
        except KeyError:
            # Same fallback as BaseOpenAI.get_token_ids for unknown models.
            logger.warning("Warning: model not found. Using cl100k_base encoding.")
            enc = tiktoken.get_encoding("cl100k_base")
        return enc.encode(
            text,
            allowed_special=self.allowed_special,
            disallowed_special=self.disallowed_special,
        )