from __future__ import annotations
import logging
from typing import Any, Callable, Dict, List, Optional, Sequence
from langchain_core.callbacks import (
AsyncCallbackManagerForLLMRun,
CallbackManagerForLLMRun,
)
from langchain_core.language_models.llms import LLM
from langchain_core.load.serializable import Serializable
from langchain_core.pydantic_v1 import SecretStr, root_validator
from langchain_core.utils import convert_to_secret_str, get_from_dict_or_env
from tenacity import (
before_sleep_log,
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
from langchain_community.llms.utils import enforce_stop_tokens
logger = logging.getLogger(__name__)
class _BaseYandexGPT(Serializable):
iam_token: SecretStr = "" # type: ignore[assignment]
"""Yandex Cloud IAM令牌适用于服务或用户帐户,具有`ai.languageModels.user`角色。"""
api_key: SecretStr = "" # type: ignore[assignment]
"""Yandex云服务帐户的API密钥
具有`ai.languageModels.user`角色"""
folder_id: str = ""
"""Yandex云文件夹ID"""
model_uri: str = ""
"""要使用的模型URI。"""
model_name: str = "yandexgpt-lite"
"""要使用的模型名称。"""
model_version: str = "latest"
"""要使用的模型版本。"""
temperature: float = 0.6
"""使用什么抽样温度。
应该是一个介于0(包括)和1(包括)之间的双精度数。"""
max_tokens: int = 7400
"""设置用于输入提示和生成响应的令牌总数的最大限制。
必须大于零且不超过7400个令牌。"""
stop: Optional[List[str]] = None
"""完成生成时序列将停止。"""
url: str = "llm.api.cloud.yandex.net:443"
"""API的URL。"""
max_retries: int = 6
"""生成时最大的重试次数。"""
sleep_interval: float = 1.0
"""API请求之间的延迟"""
disable_request_logging: bool = False
"""YandexGPT API默认记录所有请求数据。
如果您提供个人数据、机密信息,请禁用日志记录。"""
_grpc_metadata: Sequence
@property
def _llm_type(self) -> str:
return "yandex_gpt"
@property
def _identifying_params(self) -> Dict[str, Any]:
"""获取识别参数。"""
return {
"model_uri": self.model_uri,
"temperature": self.temperature,
"max_tokens": self.max_tokens,
"stop": self.stop,
"max_retries": self.max_retries,
}
@root_validator()
def validate_environment(cls, values: Dict) -> Dict:
"""验证环境中是否存在IAM令牌。"""
iam_token = convert_to_secret_str(
get_from_dict_or_env(values, "iam_token", "YC_IAM_TOKEN", "")
)
values["iam_token"] = iam_token
api_key = convert_to_secret_str(
get_from_dict_or_env(values, "api_key", "YC_API_KEY", "")
)
values["api_key"] = api_key
folder_id = get_from_dict_or_env(values, "folder_id", "YC_FOLDER_ID", "")
values["folder_id"] = folder_id
if api_key.get_secret_value() == "" and iam_token.get_secret_value() == "":
raise ValueError("Either 'YC_API_KEY' or 'YC_IAM_TOKEN' must be provided.")
if values["iam_token"]:
values["_grpc_metadata"] = [
("authorization", f"Bearer {values['iam_token'].get_secret_value()}")
]
if values["folder_id"]:
values["_grpc_metadata"].append(("x-folder-id", values["folder_id"]))
else:
values["_grpc_metadata"] = (
("authorization", f"Api-Key {values['api_key'].get_secret_value()}"),
)
if values["model_uri"] == "" and values["folder_id"] == "":
raise ValueError("Either 'model_uri' or 'folder_id' must be provided.")
if not values["model_uri"]:
values[
"model_uri"
] = f"gpt://{values['folder_id']}/{values['model_name']}/{values['model_version']}"
if values["disable_request_logging"]:
values["_grpc_metadata"].append(
(
"x-data-logging-enabled",
"false",
)
)
return values
[docs]class YandexGPT(_BaseYandexGPT, LLM):
"""Yandex大型语言模型。
要使用,您应该安装``yandexcloud`` python包。
有两种身份验证选项适用于具有``ai.languageModels.user``角色的服务帐户:
- 您可以在构造函数参数`iam_token`中指定令牌,或者在环境变量`YC_IAM_TOKEN`中指定。
- 您可以在构造函数参数`api_key`中指定密钥,或者在环境变量`YC_API_KEY`中指定。
要使用默认模型,请在参数`folder_id`中指定文件夹ID,或在环境变量`YC_FOLDER_ID`中指定。
或在构造函数参数`model_uri`中指定模型URI。
示例:
.. code-block:: python
from langchain_community.llms import YandexGPT
yandex_gpt = YandexGPT(iam_token="t1.9eu...", folder_id="b1g...")"""
def _call(
self,
prompt: str,
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> str:
"""调用Yandex GPT模型并返回输出。
参数:
prompt: 传递给模型的提示。
stop: 生成时可选的停止词列表。
返回:
模型生成的字符串。
示例:
.. code-block:: python
response = YandexGPT("Tell me a joke.")
"""
text = completion_with_retry(self, prompt=prompt)
if stop is not None:
text = enforce_stop_tokens(text, stop)
return text
async def _acall(
self,
prompt: str,
stop: Optional[List[str]] = None,
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> str:
"""异步调用Yandex GPT模型并返回输出。
参数:
prompt: 传递给模型的提示。
stop: 生成时可选的停止词列表。
返回:
模型生成的字符串。
"""
text = await acompletion_with_retry(self, prompt=prompt)
if stop is not None:
text = enforce_stop_tokens(text, stop)
return text
def _make_request(
self: YandexGPT,
prompt: str,
) -> str:
try:
import grpc
from google.protobuf.wrappers_pb2 import DoubleValue, Int64Value
try:
from yandex.cloud.ai.foundation_models.v1.text_common_pb2 import (
CompletionOptions,
Message,
)
from yandex.cloud.ai.foundation_models.v1.text_generation.text_generation_service_pb2 import ( # noqa: E501
CompletionRequest,
)
from yandex.cloud.ai.foundation_models.v1.text_generation.text_generation_service_pb2_grpc import ( # noqa: E501
TextGenerationServiceStub,
)
except ModuleNotFoundError:
from yandex.cloud.ai.foundation_models.v1.foundation_models_pb2 import (
CompletionOptions,
Message,
)
from yandex.cloud.ai.foundation_models.v1.foundation_models_service_pb2 import ( # noqa: E501
CompletionRequest,
)
from yandex.cloud.ai.foundation_models.v1.foundation_models_service_pb2_grpc import ( # noqa: E501
TextGenerationServiceStub,
)
except ImportError as e:
raise ImportError(
"Please install YandexCloud SDK with `pip install yandexcloud` \
or upgrade it to recent version."
) from e
channel_credentials = grpc.ssl_channel_credentials()
channel = grpc.secure_channel(self.url, channel_credentials)
request = CompletionRequest(
model_uri=self.model_uri,
completion_options=CompletionOptions(
temperature=DoubleValue(value=self.temperature),
max_tokens=Int64Value(value=self.max_tokens),
),
messages=[Message(role="user", text=prompt)],
)
stub = TextGenerationServiceStub(channel)
res = stub.Completion(request, metadata=self._grpc_metadata) # type: ignore[attr-defined]
return list(res)[0].alternatives[0].message.text
async def _amake_request(self: YandexGPT, prompt: str) -> str:
try:
import asyncio
import grpc
from google.protobuf.wrappers_pb2 import DoubleValue, Int64Value
try:
from yandex.cloud.ai.foundation_models.v1.text_common_pb2 import (
CompletionOptions,
Message,
)
from yandex.cloud.ai.foundation_models.v1.text_generation.text_generation_service_pb2 import ( # noqa: E501
CompletionRequest,
CompletionResponse,
)
from yandex.cloud.ai.foundation_models.v1.text_generation.text_generation_service_pb2_grpc import ( # noqa: E501
TextGenerationAsyncServiceStub,
)
except ModuleNotFoundError:
from yandex.cloud.ai.foundation_models.v1.foundation_models_pb2 import (
CompletionOptions,
Message,
)
from yandex.cloud.ai.foundation_models.v1.foundation_models_service_pb2 import ( # noqa: E501
CompletionRequest,
CompletionResponse,
)
from yandex.cloud.ai.foundation_models.v1.foundation_models_service_pb2_grpc import ( # noqa: E501
TextGenerationAsyncServiceStub,
)
from yandex.cloud.operation.operation_service_pb2 import GetOperationRequest
from yandex.cloud.operation.operation_service_pb2_grpc import (
OperationServiceStub,
)
except ImportError as e:
raise ImportError(
"Please install YandexCloud SDK with `pip install yandexcloud` \
or upgrade it to recent version."
) from e
operation_api_url = "operation.api.cloud.yandex.net:443"
channel_credentials = grpc.ssl_channel_credentials()
async with grpc.aio.secure_channel(self.url, channel_credentials) as channel:
request = CompletionRequest(
model_uri=self.model_uri,
completion_options=CompletionOptions(
temperature=DoubleValue(value=self.temperature),
max_tokens=Int64Value(value=self.max_tokens),
),
messages=[Message(role="user", text=prompt)],
)
stub = TextGenerationAsyncServiceStub(channel)
operation = await stub.Completion(request, metadata=self._grpc_metadata) # type: ignore[attr-defined]
async with grpc.aio.secure_channel(
operation_api_url, channel_credentials
) as operation_channel:
operation_stub = OperationServiceStub(operation_channel)
while not operation.done:
await asyncio.sleep(1)
operation_request = GetOperationRequest(operation_id=operation.id)
operation = await operation_stub.Get(
operation_request,
metadata=self._grpc_metadata, # type: ignore[attr-defined]
)
completion_response = CompletionResponse()
operation.response.Unpack(completion_response)
return completion_response.alternatives[0].message.text
def _create_retry_decorator(llm: YandexGPT) -> Callable[[Any], Any]:
from grpc import RpcError
min_seconds = llm.sleep_interval
max_seconds = 60
return retry(
reraise=True,
stop=stop_after_attempt(llm.max_retries),
wait=wait_exponential(multiplier=1, min=min_seconds, max=max_seconds),
retry=(retry_if_exception_type((RpcError))),
before_sleep=before_sleep_log(logger, logging.WARNING),
)
[docs]def completion_with_retry(llm: YandexGPT, **kwargs: Any) -> Any:
"""使用tenacity来重试完成调用。"""
retry_decorator = _create_retry_decorator(llm)
@retry_decorator
def _completion_with_retry(**_kwargs: Any) -> Any:
return _make_request(llm, **_kwargs)
return _completion_with_retry(**kwargs)
[docs]async def acompletion_with_retry(llm: YandexGPT, **kwargs: Any) -> Any:
"""使用tenacity来重试异步完成调用。"""
retry_decorator = _create_retry_decorator(llm)
@retry_decorator
async def _completion_with_retry(**_kwargs: Any) -> Any:
return await _amake_request(llm, **_kwargs)
return await _completion_with_retry(**kwargs)