from __future__ import annotations
import logging
import os
import warnings
from typing import (
Any,
Callable,
Dict,
List,
Literal,
Mapping,
Optional,
Sequence,
Set,
Tuple,
Union,
cast,
)
import numpy as np
from langchain_core._api.deprecation import deprecated
from langchain_core.embeddings import Embeddings
from langchain_core.pydantic_v1 import BaseModel, Extra, Field, root_validator
from langchain_core.utils import get_from_dict_or_env, get_pydantic_field_names
from tenacity import (
AsyncRetrying,
before_sleep_log,
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
from langchain_community.utils.openai import is_openai_v1
logger = logging.getLogger(__name__)
def _create_retry_decorator(embeddings: OpenAIEmbeddings) -> Callable[[Any], Any]:
    """Build a tenacity ``retry`` decorator configured from ``embeddings``.

    The decorator retries on the legacy ``openai.error`` exception classes
    listed below, stops after ``embeddings.max_retries`` attempts, logs a
    warning before each sleep, and re-raises the last exception when the
    attempts are exhausted (``reraise=True``).

    Args:
        embeddings: Object supplying ``max_retries``, ``retry_min_seconds``
            and ``retry_max_seconds``.

    Returns:
        A decorator to apply to the synchronous embedding call.
    """
    import openai

    # Error classes from the pre-1.0 ``openai.error`` namespace that trigger a
    # retry. A single predicate over a tuple matches the same exceptions as
    # OR-ing one predicate per class.
    retryable_errors = (
        openai.error.Timeout,
        openai.error.APIError,
        openai.error.APIConnectionError,
        openai.error.RateLimitError,
        openai.error.ServiceUnavailableError,
    )

    # Wait 2^x * 1 second between each retry starting with
    # retry_min_seconds seconds, then up to retry_max_seconds seconds,
    # then retry_max_seconds seconds afterwards.
    # retry_min_seconds and retry_max_seconds are optional arguments of
    # OpenAIEmbeddings.
    backoff = wait_exponential(
        multiplier=1,
        min=embeddings.retry_min_seconds,
        max=embeddings.retry_max_seconds,
    )

    return retry(
        reraise=True,
        stop=stop_after_attempt(embeddings.max_retries),
        wait=backoff,
        retry=retry_if_exception_type(retryable_errors),
        before_sleep=before_sleep_log(logger, logging.WARNING),
    )
def _async_retry_decorator(embeddings: OpenAIEmbeddings) -> Any:
    """Build an async retry decorator configured from ``embeddings``.

    The returned decorator wraps a coroutine function so that it is retried
    with tenacity's ``AsyncRetrying`` on the legacy ``openai.error`` exception
    classes listed below. It stops after ``embeddings.max_retries`` attempts,
    logs a warning before each sleep, and re-raises the last exception once
    the attempts are exhausted (``reraise=True``).

    Args:
        embeddings: Object supplying ``max_retries``, ``retry_min_seconds``
            and ``retry_max_seconds``.

    Returns:
        A decorator that takes a coroutine function and returns a retrying
        coroutine function with the same call signature.
    """
    import openai

    # Wait 2^x * 1 second between each retry starting with
    # retry_min_seconds seconds, then up to retry_max_seconds seconds,
    # then retry_max_seconds seconds afterwards.
    # retry_min_seconds and retry_max_seconds are optional arguments of
    # OpenAIEmbeddings.
    async_retrying = AsyncRetrying(
        reraise=True,
        stop=stop_after_attempt(embeddings.max_retries),
        wait=wait_exponential(
            multiplier=1,
            min=embeddings.retry_min_seconds,
            max=embeddings.retry_max_seconds,
        ),
        retry=(
            retry_if_exception_type(openai.error.Timeout)
            | retry_if_exception_type(openai.error.APIError)
            | retry_if_exception_type(openai.error.APIConnectionError)
            | retry_if_exception_type(openai.error.RateLimitError)
            | retry_if_exception_type(openai.error.ServiceUnavailableError)
        ),
        before_sleep=before_sleep_log(logger, logging.WARNING),
    )

    def wrap(func: Callable) -> Callable:
        async def wrapped_f(*args: Any, **kwargs: Any) -> Any:
            async for attempt in async_retrying:
                # The attempt context manager is what records an exception on
                # the retry state. Without it the exception escapes the loop
                # immediately and no retry ever takes place.
                with attempt:
                    return await func(*args, **kwargs)
            raise AssertionError("this is unreachable")

        return wrapped_f

    return wrap
# https://stackoverflow.com/questions/76469415/getting-embeddings-of-length-1-from-langchain-openaiembeddings
def _check_response(response: dict, skip_empty: bool = False) -> dict:
if any(len(d["embedding"]) == 1 for d in response["data"]) and not skip_empty:
import openai
raise openai.error.APIError("OpenAI API returned an empty embedding")
return response
def embed_with_retry(embeddings: OpenAIEmbeddings, **kwargs: Any) -> Any:
    """Call the embeddings endpoint, retrying with tenacity on legacy clients.

    With ``openai>=1.0`` (as reported by ``is_openai_v1``) the call is sent
    straight to ``embeddings.client.create``; no tenacity wrapper and no
    response check are applied on that path. Otherwise the call goes through
    the decorator from ``_create_retry_decorator`` and the response is
    validated with ``_check_response``.

    Args:
        embeddings: The embeddings object holding the client and retry
            settings.
        **kwargs: Forwarded verbatim to ``client.create``.

    Returns:
        Whatever the client's ``create`` call returns.
    """
    if is_openai_v1():
        return embeddings.client.create(**kwargs)

    @_create_retry_decorator(embeddings)
    def _embed_with_retry(**kwargs: Any) -> Any:
        response = embeddings.client.create(**kwargs)
        return _check_response(response, skip_empty=embeddings.skip_empty)

    return _embed_with_retry(**kwargs)
async def async_embed_with_retry(embeddings: OpenAIEmbeddings, **kwargs: Any) -> Any:
    """Asynchronously call the embeddings endpoint, retrying on legacy clients.

    With ``openai>=1.0`` (as reported by ``is_openai_v1``) the call is awaited
    directly on ``embeddings.async_client.create``; no tenacity wrapper and no
    response check are applied on that path. Otherwise
    ``embeddings.client.acreate`` is awaited through the decorator from
    ``_async_retry_decorator`` and the response is validated with
    ``_check_response``.

    Args:
        embeddings: The embeddings object holding the clients and retry
            settings.
        **kwargs: Forwarded verbatim to the client's create call.

    Returns:
        Whatever the client's create call returns.
    """
    if is_openai_v1():
        return await embeddings.async_client.create(**kwargs)

    @_async_retry_decorator(embeddings)
    async def _async_embed_with_retry(**kwargs: Any) -> Any:
        response = await embeddings.client.acreate(**kwargs)
        return _check_response(response, skip_empty=embeddings.skip_empty)

    return await _async_embed_with_retry(**kwargs)
@deprecated(
    since="0.0.9",
    removal="0.3.0",
    alternative_import="langchain_openai.OpenAIEmbeddings",
)
class OpenAIEmbeddings(BaseModel, Embeddings):
    """OpenAI embedding models.

    To use, you should have the ``openai`` python package installed, and the
    environment variable ``OPENAI_API_KEY`` set with your API key or pass it
    as a named parameter to the constructor.

    Example:
        .. code-block:: python

            from langchain_community.embeddings import OpenAIEmbeddings
            openai = OpenAIEmbeddings(openai_api_key="my-api-key")

    In order to use the library with Microsoft Azure endpoints, you need to set
    the OPENAI_API_TYPE, OPENAI_API_BASE, OPENAI_API_KEY and OPENAI_API_VERSION.
    The OPENAI_API_TYPE must be set to 'azure' and the others correspond to
    the properties of your endpoint.
    In addition, the deployment name must be passed as the model parameter.

    Example:
        .. code-block:: python

            import os

            os.environ["OPENAI_API_TYPE"] = "azure"
            os.environ["OPENAI_API_BASE"] = "https://<your-endpoint>.openai.azure.com/"
            os.environ["OPENAI_API_KEY"] = "your AzureOpenAI key"
            os.environ["OPENAI_API_VERSION"] = "2023-05-15"
            os.environ["OPENAI_PROXY"] = "http://your-corporate-proxy:8080"

            from langchain_community.embeddings.openai import OpenAIEmbeddings
            embeddings = OpenAIEmbeddings(
                deployment="your-embeddings-deployment-name",
                model="your-embeddings-model-name",
                openai_api_base="https://your-endpoint.openai.azure.com/",
                openai_api_type="azure",
            )
            text = "This is a test query."
            query_result = embeddings.embed_query(text)
    """

    client: Any = Field(default=None, exclude=True)  #: :meta private:
    async_client: Any = Field(default=None, exclude=True)  #: :meta private:
    model: str = "text-embedding-ada-002"
    # to support Azure OpenAI Service custom deployment names
    deployment: Optional[str] = model
    # TODO: Move to AzureOpenAIEmbeddings.
    openai_api_version: Optional[str] = Field(default=None, alias="api_version")
    """Automatically inferred from env var `OPENAI_API_VERSION` if not provided."""
    # to support Azure OpenAI Service custom endpoints
    openai_api_base: Optional[str] = Field(default=None, alias="base_url")
    """Base URL path for API requests, leave blank if not using a proxy or service
    emulator."""
    # to support Azure OpenAI Service custom endpoints
    openai_api_type: Optional[str] = None
    # to support explicit proxy for OpenAI
    openai_proxy: Optional[str] = None
    embedding_ctx_length: int = 8191
    """The maximum number of tokens to embed at once."""
    openai_api_key: Optional[str] = Field(default=None, alias="api_key")
    """Automatically inferred from env var `OPENAI_API_KEY` if not provided."""
    openai_organization: Optional[str] = Field(default=None, alias="organization")
    """Automatically inferred from env var `OPENAI_ORG_ID` if not provided."""
    allowed_special: Union[Literal["all"], Set[str]] = set()
    disallowed_special: Union[Literal["all"], Set[str], Sequence[str]] = "all"
    chunk_size: int = 1000
    """Maximum number of texts to embed in each batch."""
    max_retries: int = 2
    """Maximum number of retries to make when generating."""
    request_timeout: Optional[Union[float, Tuple[float, float], Any]] = Field(
        default=None, alias="timeout"
    )
    """Timeout for requests to the OpenAI completion API. Can be a float,
    httpx.Timeout or None."""
    headers: Any = None
    tiktoken_enabled: bool = True
    """Set this to False for non-OpenAI implementations of the embeddings API, e.g.
    the `--extensions openai` extension for `text-generation-webui`."""
    tiktoken_model_name: Optional[str] = None
    """The model name to pass to tiktoken when using this class.

    Tiktoken is used to count the number of tokens in documents to constrain
    them to be under a certain limit. By default, when set to None, this will
    be the same as the embedding model name. However, there are some cases
    where you may want to use this embedding class with a model name not
    supported by tiktoken. This can include when using Azure embeddings or
    when using one of the many model providers that expose an OpenAI-like
    API but with different models. In those cases, in order to avoid erroring
    when tiktoken is called, you can specify a model name to use here."""
    show_progress_bar: bool = False
    """Whether to show a progress bar when embedding."""
    model_kwargs: Dict[str, Any] = Field(default_factory=dict)
    """Holds any model parameters valid for the `create` call that are not
    explicitly specified."""
    skip_empty: bool = False
    """Whether to skip empty strings when embedding or raise an error.
    Defaults to not skipping."""
    default_headers: Union[Mapping[str, str], None] = None
    default_query: Union[Mapping[str, object], None] = None
    retry_min_seconds: int = 4
    """Minimum number of seconds to wait between retries."""
    retry_max_seconds: int = 20
    """Maximum number of seconds to wait between retries."""
    # Configure a custom httpx client. See the
    # [httpx documentation](https://www.python-httpx.org/api/#client) for more details.
    http_client: Union[Any, None] = None
    """Optional httpx.Client."""

    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.forbid
        allow_population_by_field_name = True

    @root_validator(pre=True)
    def build_extra(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Build extra kwargs from additional params that were passed in.

        Any key in ``values`` that is not a declared field is moved (with a
        warning) into ``values["model_kwargs"]``.

        Raises:
            ValueError: If a key is supplied both directly and inside
                ``model_kwargs``, or if ``model_kwargs`` contains the name of
                a declared field.
        """
        all_required_field_names = get_pydantic_field_names(cls)
        extra = values.get("model_kwargs", {})
        # Iterate over a snapshot of the keys because entries are popped below.
        for field_name in list(values):
            if field_name in extra:
                raise ValueError(f"Found {field_name} supplied twice.")
            if field_name not in all_required_field_names:
                warnings.warn(
                    f"WARNING! {field_name} is not default parameter.\n"
                    f"{field_name} was transferred to model_kwargs.\n"
                    f"Please confirm that {field_name} is what you intended."
                )
                extra[field_name] = values.pop(field_name)

        invalid_model_kwargs = all_required_field_names.intersection(extra.keys())
        if invalid_model_kwargs:
            raise ValueError(
                f"Parameters {invalid_model_kwargs} should be specified explicitly. "
                f"Instead they were passed in as part of `model_kwargs` parameter."
            )

        values["model_kwargs"] = extra
        return values

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that the API key and python package exist in the environment.

        Fills connection settings from ``values`` or the corresponding
        environment variables, caps ``chunk_size`` at 16 for Azure API types,
        and creates the sync/async clients when none were supplied.

        Raises:
            ImportError: If the ``openai`` package cannot be imported.
        """
        values["openai_api_key"] = get_from_dict_or_env(
            values, "openai_api_key", "OPENAI_API_KEY"
        )
        values["openai_api_base"] = values["openai_api_base"] or os.getenv(
            "OPENAI_API_BASE"
        )
        values["openai_api_type"] = get_from_dict_or_env(
            values,
            "openai_api_type",
            "OPENAI_API_TYPE",
            default="",
        )
        values["openai_proxy"] = get_from_dict_or_env(
            values,
            "openai_proxy",
            "OPENAI_PROXY",
            default="",
        )

        is_azure = values["openai_api_type"] in ("azure", "azure_ad", "azuread")
        if is_azure:
            default_api_version = "2023-05-15"
            # Azure OpenAI embedding models allow a maximum of 16 texts
            # at a time in each batch
            # See: https://learn.microsoft.com/en-us/azure/ai-services/openai/reference#embeddings
            values["chunk_size"] = min(values["chunk_size"], 16)
        else:
            default_api_version = ""
        values["openai_api_version"] = get_from_dict_or_env(
            values,
            "openai_api_version",
            "OPENAI_API_VERSION",
            default=default_api_version,
        )
        # Check OPENAI_ORGANIZATION for backwards compatibility.
        values["openai_organization"] = (
            values["openai_organization"]
            or os.getenv("OPENAI_ORG_ID")
            or os.getenv("OPENAI_ORGANIZATION")
        )

        try:
            import openai
        except ImportError:
            raise ImportError(
                "Could not import openai python package. "
                "Please install it with `pip install openai`."
            )

        if is_openai_v1():
            if is_azure:
                warnings.warn(
                    "If you have openai>=1.0.0 installed and are using Azure, "
                    "please use the `AzureOpenAIEmbeddings` class."
                )
            client_params = {
                "api_key": values["openai_api_key"],
                "organization": values["openai_organization"],
                "base_url": values["openai_api_base"],
                "timeout": values["request_timeout"],
                "max_retries": values["max_retries"],
                "default_headers": values["default_headers"],
                "default_query": values["default_query"],
                "http_client": values["http_client"],
            }
            # Only build clients the caller did not inject.
            if not values.get("client"):
                values["client"] = openai.OpenAI(**client_params).embeddings
            if not values.get("async_client"):
                values["async_client"] = openai.AsyncOpenAI(
                    **client_params
                ).embeddings
        elif not values.get("client"):
            values["client"] = openai.Embedding
        return values

    @property
    def _invocation_params(self) -> Dict[str, Any]:
        """Keyword arguments sent with every embeddings call.

        With ``openai>=1.0`` only ``model`` and ``model_kwargs`` are included.
        For the legacy client the connection settings are passed per call, and
        if ``openai_proxy`` is set the module-level ``openai.proxy`` is
        assigned as a side effect.
        """
        if is_openai_v1():
            openai_args: Dict = {"model": self.model, **self.model_kwargs}
        else:
            openai_args = {
                "model": self.model,
                "request_timeout": self.request_timeout,
                "headers": self.headers,
                "api_key": self.openai_api_key,
                "organization": self.openai_organization,
                "api_base": self.openai_api_base,
                "api_type": self.openai_api_type,
                "api_version": self.openai_api_version,
                **self.model_kwargs,
            }
            if self.openai_api_type in ("azure", "azure_ad", "azuread"):
                openai_args["engine"] = self.deployment
            # TODO: Look into proxy with openai v1.
            if self.openai_proxy:
                try:
                    import openai
                except ImportError:
                    raise ImportError(
                        "Could not import openai python package. "
                        "Please install it with `pip install openai`."
                    )

                openai.proxy = {
                    "http": self.openai_proxy,
                    "https": self.openai_proxy,
                }  # type: ignore[assignment]  # noqa: E501
        return openai_args

    def _tokenize(
        self, texts: List[str]
    ) -> Tuple[List[Union[List[int], str]], List[int]]:
        """Split each text into chunks of at most ``embedding_ctx_length`` tokens.

        Returns:
            A pair ``(chunks, indices)`` of equal length. ``chunks[k]`` is a
            list of token ids (tiktoken path) or a decoded string (HuggingFace
            path), and ``indices[k]`` is the position in ``texts`` that the
            chunk came from. A text that tokenizes to nothing yields no chunk.

        Raises:
            ImportError: If the tokenizer package required by the selected
                path cannot be imported.
        """
        chunks: List[Union[List[int], str]] = []
        indices: List[int] = []
        model_name = self.tiktoken_model_name or self.model
        ctx_length = self.embedding_ctx_length

        # If tiktoken flag set to False
        if not self.tiktoken_enabled:
            try:
                from transformers import AutoTokenizer
            except ImportError:
                raise ImportError(
                    "Could not import transformers python package. "
                    "This is needed in order to for OpenAIEmbeddings without "
                    "`tiktoken`. Please install it with `pip install transformers`. "
                )

            tokenizer = AutoTokenizer.from_pretrained(
                pretrained_model_name_or_path=model_name
            )
            for i, text in enumerate(texts):
                # Tokenize the text using HuggingFace transformers
                tokenized = tokenizer.encode(text, add_special_tokens=False)
                # Split tokens into chunks respecting the embedding_ctx_length
                for j in range(0, len(tokenized), ctx_length):
                    # Convert token IDs back to a string
                    chunks.append(tokenizer.decode(tokenized[j : j + ctx_length]))
                    indices.append(i)
            return chunks, indices

        try:
            import tiktoken
        except ImportError:
            raise ImportError(
                "Could not import tiktoken python package. "
                "This is needed in order to for OpenAIEmbeddings. "
                "Please install it with `pip install tiktoken`."
            )

        try:
            encoding = tiktoken.encoding_for_model(model_name)
        except KeyError:
            logger.warning("Warning: model not found. Using cl100k_base encoding.")
            encoding = tiktoken.get_encoding("cl100k_base")

        # See: https://github.com/openai/openai-python/
        #      issues/418#issuecomment-1525939500
        # replace newlines, which can negatively affect performance.
        strip_newlines = self.model.endswith("001")
        for i, text in enumerate(texts):
            if strip_newlines:
                text = text.replace("\n", " ")
            token = encoding.encode(
                text=text,
                allowed_special=self.allowed_special,
                disallowed_special=self.disallowed_special,
            )
            # Split tokens into chunks respecting the embedding_ctx_length
            for j in range(0, len(token), ctx_length):
                chunks.append(token[j : j + ctx_length])
                indices.append(i)
        return chunks, indices

    def _group_chunk_embeddings(
        self,
        num_texts: int,
        tokens: List[Union[List[int], str]],
        indices: List[int],
        batched_embeddings: List[List[float]],
    ) -> Tuple[List[List[List[float]]], List[List[int]]]:
        """Regroup per-chunk embeddings by the text each chunk belongs to.

        Returns:
            ``(results, weights)`` where ``results[t]`` holds the embeddings of
            the chunks of text ``t`` and ``weights[t]`` the matching
            ``len(chunk)`` values used for the weighted average. When
            ``skip_empty`` is set, length-1 embeddings are dropped.
        """
        results: List[List[List[float]]] = [[] for _ in range(num_texts)]
        weights: List[List[int]] = [[] for _ in range(num_texts)]
        for i, text_index in enumerate(indices):
            embedding = batched_embeddings[i]
            if self.skip_empty and len(embedding) == 1:
                continue
            results[text_index].append(embedding)
            # NOTE: on the HuggingFace path chunks are decoded strings, so
            # this weight is a character count rather than a token count.
            weights[text_index].append(len(tokens[i]))
        return results, weights

    # please refer to
    # https://github.com/openai/openai-cookbook/blob/main/examples/Embedding_long_inputs.ipynb
    def _get_len_safe_embeddings(
        self, texts: List[str], *, engine: str, chunk_size: Optional[int] = None
    ) -> List[List[float]]:
        """Generate length-safe embeddings for a list of texts.

        Texts are tokenized and split into chunks no longer than
        ``embedding_ctx_length``, the chunks are embedded in batches, and the
        chunk embeddings of each text are combined by a length-weighted
        average that is then L2-normalized. Tokenization uses tiktoken or a
        HuggingFace tokenizer depending on ``tiktoken_enabled``.

        Args:
            texts: The texts to embed.
            engine: Accepted for interface compatibility; not read here.
            chunk_size: Number of chunks per request. Falls back to
                ``self.chunk_size`` when ``None`` or ``0``.

        Returns:
            One embedding per input text, in input order.
        """
        tokens, indices = self._tokenize(texts)
        _chunk_size = chunk_size or self.chunk_size
        params = self._invocation_params

        _iter: Any = range(0, len(tokens), _chunk_size)
        if self.show_progress_bar:
            try:
                from tqdm.auto import tqdm

                _iter = tqdm(_iter)
            except ImportError:
                # tqdm is optional: keep iterating over the plain range.
                pass

        batched_embeddings: List[List[float]] = []
        for start in _iter:
            response = embed_with_retry(
                self,
                input=tokens[start : start + _chunk_size],
                **params,
            )
            if not isinstance(response, dict):
                response = response.dict()
            batched_embeddings.extend(r["embedding"] for r in response["data"])

        results, weights = self._group_chunk_embeddings(
            len(texts), tokens, indices, batched_embeddings
        )

        embeddings: List[List[float]] = []
        for chunk_embeddings, chunk_weights in zip(results, weights):
            if not chunk_embeddings:
                # Nothing was embedded for this text (e.g. it produced no
                # tokens): use the embedding of the empty string instead.
                average_embedded = embed_with_retry(self, input="", **params)
                if not isinstance(average_embedded, dict):
                    average_embedded = average_embedded.dict()
                average = np.asarray(average_embedded["data"][0]["embedding"])
            else:
                average = np.average(chunk_embeddings, axis=0, weights=chunk_weights)
            embeddings.append((average / np.linalg.norm(average)).tolist())
        return embeddings

    # please refer to
    # https://github.com/openai/openai-cookbook/blob/main/examples/Embedding_long_inputs.ipynb
    async def _aget_len_safe_embeddings(
        self, texts: List[str], *, engine: str, chunk_size: Optional[int] = None
    ) -> List[List[float]]:
        """Asynchronously generate length-safe embeddings for a list of texts.

        Async counterpart of ``_get_len_safe_embeddings``: same tokenization,
        chunking, ``skip_empty`` filtering, weighted averaging and
        normalization, with the embedding requests awaited. No progress bar
        is shown on this path.

        Args:
            texts: The texts to embed.
            engine: Accepted for interface compatibility; not read here.
            chunk_size: Number of chunks per request. Falls back to
                ``self.chunk_size`` when ``None`` or ``0``.

        Returns:
            One embedding per input text, in input order.
        """
        tokens, indices = self._tokenize(texts)
        _chunk_size = chunk_size or self.chunk_size
        params = self._invocation_params

        batched_embeddings: List[List[float]] = []
        for start in range(0, len(tokens), _chunk_size):
            response = await async_embed_with_retry(
                self,
                input=tokens[start : start + _chunk_size],
                **params,
            )
            if not isinstance(response, dict):
                response = response.dict()
            batched_embeddings.extend(r["embedding"] for r in response["data"])

        # Shares the grouping helper with the sync path so that ``skip_empty``
        # is honored here as well.
        results, weights = self._group_chunk_embeddings(
            len(texts), tokens, indices, batched_embeddings
        )

        embeddings: List[List[float]] = []
        for chunk_embeddings, chunk_weights in zip(results, weights):
            if not chunk_embeddings:
                # Nothing was embedded for this text (e.g. it produced no
                # tokens): use the embedding of the empty string instead.
                average_embedded = await async_embed_with_retry(
                    self, input="", **params
                )
                if not isinstance(average_embedded, dict):
                    average_embedded = average_embedded.dict()
                average = np.asarray(average_embedded["data"][0]["embedding"])
            else:
                average = np.average(chunk_embeddings, axis=0, weights=chunk_weights)
            embeddings.append((average / np.linalg.norm(average)).tolist())
        return embeddings

    def embed_documents(
        self, texts: List[str], chunk_size: Optional[int] = 0
    ) -> List[List[float]]:
        """Call out to OpenAI's embedding endpoint for embedding search docs.

        Args:
            texts: The list of texts to embed.
            chunk_size: The chunk size of embeddings. If ``None`` or ``0``,
                the chunk size specified by the class is used. An explicit
                value is used as given; the Azure cap applied during
                validation only adjusts the class-level ``chunk_size``.

        Returns:
            List of embeddings, one for each text.
        """
        # NOTE: to keep things simple, we assume the list may contain texts longer
        #       than the maximum context and use length-safe embedding function.
        engine = cast(str, self.deployment)
        return self._get_len_safe_embeddings(
            texts, engine=engine, chunk_size=chunk_size
        )

    async def aembed_documents(
        self, texts: List[str], chunk_size: Optional[int] = 0
    ) -> List[List[float]]:
        """Call out to OpenAI's embedding endpoint async for embedding search docs.

        Args:
            texts: The list of texts to embed.
            chunk_size: The chunk size of embeddings. If ``None`` or ``0``,
                the chunk size specified by the class is used. An explicit
                value is used as given; the Azure cap applied during
                validation only adjusts the class-level ``chunk_size``.

        Returns:
            List of embeddings, one for each text.
        """
        # NOTE: to keep things simple, we assume the list may contain texts longer
        #       than the maximum context and use length-safe embedding function.
        engine = cast(str, self.deployment)
        return await self._aget_len_safe_embeddings(
            texts, engine=engine, chunk_size=chunk_size
        )

    def embed_query(self, text: str) -> List[float]:
        """Call out to OpenAI's embedding endpoint for embedding query text.

        Args:
            text: The text to embed.

        Returns:
            Embedding for the text.
        """
        return self.embed_documents([text])[0]

    async def aembed_query(self, text: str) -> List[float]:
        """Call out to OpenAI's embedding endpoint async for embedding query text.

        Args:
            text: The text to embed.

        Returns:
            Embedding for the text.
        """
        embeddings = await self.aembed_documents([text])
        return embeddings[0]