from __future__ import annotations
import asyncio
import inspect
import uuid
import warnings
from abc import ABC, abstractmethod
from typing import (
TYPE_CHECKING,
Any,
AsyncIterator,
Callable,
Dict,
Iterator,
List,
Literal,
Optional,
Sequence,
Type,
Union,
cast,
)
from typing_extensions import TypedDict
from langchain_core._api import deprecated
from langchain_core.caches import BaseCache
from langchain_core.callbacks import (
AsyncCallbackManager,
AsyncCallbackManagerForLLMRun,
BaseCallbackManager,
CallbackManager,
CallbackManagerForLLMRun,
Callbacks,
)
from langchain_core.globals import get_llm_cache
from langchain_core.language_models.base import BaseLanguageModel, LanguageModelInput
from langchain_core.load import dumpd, dumps
from langchain_core.messages import (
AIMessage,
AnyMessage,
BaseMessage,
BaseMessageChunk,
HumanMessage,
convert_to_messages,
message_chunk_to_message,
)
from langchain_core.outputs import (
ChatGeneration,
ChatGenerationChunk,
ChatResult,
LLMResult,
RunInfo,
)
from langchain_core.prompt_values import ChatPromptValue, PromptValue, StringPromptValue
from langchain_core.pydantic_v1 import Field, root_validator
from langchain_core.runnables.config import ensure_config, run_in_executor
from langchain_core.tracers._streaming import _StreamingCallbackHandler
if TYPE_CHECKING:
from langchain_core.pydantic_v1 import BaseModel
from langchain_core.runnables import Runnable, RunnableConfig
from langchain_core.tools import BaseTool
[docs]class LangSmithParams(TypedDict, total=False):
ls_provider: str
ls_model_name: str
ls_model_type: Literal["chat"]
ls_temperature: Optional[float]
ls_max_tokens: Optional[int]
ls_stop: Optional[List[str]]
[docs]def generate_from_stream(stream: Iterator[ChatGenerationChunk]) -> ChatResult:
"""从流中生成。"""
generation: Optional[ChatGenerationChunk] = None
for chunk in stream:
if generation is None:
generation = chunk
else:
generation += chunk
assert generation is not None
return ChatResult(
generations=[
ChatGeneration(
message=message_chunk_to_message(generation.message),
generation_info=generation.generation_info,
)
]
)
[docs]async def agenerate_from_stream(
stream: AsyncIterator[ChatGenerationChunk],
) -> ChatResult:
"""异步从流中生成数据。"""
generation: Optional[ChatGenerationChunk] = None
async for chunk in stream:
if generation is None:
generation = chunk
else:
generation += chunk
assert generation is not None
return ChatResult(
generations=[
ChatGeneration(
message=message_chunk_to_message(generation.message),
generation_info=generation.generation_info,
)
]
)
[docs]class BaseChatModel(BaseLanguageModel[BaseMessage], ABC):
"""用于聊天模型的基类。"""
callback_manager: Optional[BaseCallbackManager] = Field(default=None, exclude=True)
"""[已弃用] 添加到运行跟踪的回调管理器。"""
@root_validator()
def raise_deprecation(cls, values: Dict) -> Dict:
"""如果使用callback_manager,则发出弃用警告。"""
if values.get("callback_manager") is not None:
warnings.warn(
"callback_manager is deprecated. Please use callbacks instead.",
DeprecationWarning,
)
values["callbacks"] = values.pop("callback_manager", None)
return values
class Config:
"""这个pydantic对象的配置。"""
arbitrary_types_allowed = True
# --- Runnable methods ---
@property
def OutputType(self) -> Any:
"""获取此可运行程序的输出类型。"""
return AnyMessage
def _convert_input(self, input: LanguageModelInput) -> PromptValue:
if isinstance(input, PromptValue):
return input
elif isinstance(input, str):
return StringPromptValue(text=input)
elif isinstance(input, Sequence):
return ChatPromptValue(messages=convert_to_messages(input))
else:
raise ValueError(
f"Invalid input type {type(input)}. "
"Must be a PromptValue, str, or list of BaseMessages."
)
[docs] def invoke(
self,
input: LanguageModelInput,
config: Optional[RunnableConfig] = None,
*,
stop: Optional[List[str]] = None,
**kwargs: Any,
) -> BaseMessage:
config = ensure_config(config)
return cast(
ChatGeneration,
self.generate_prompt(
[self._convert_input(input)],
stop=stop,
callbacks=config.get("callbacks"),
tags=config.get("tags"),
metadata=config.get("metadata"),
run_name=config.get("run_name"),
run_id=config.pop("run_id", None),
**kwargs,
).generations[0][0],
).message
[docs] async def ainvoke(
self,
input: LanguageModelInput,
config: Optional[RunnableConfig] = None,
*,
stop: Optional[List[str]] = None,
**kwargs: Any,
) -> BaseMessage:
config = ensure_config(config)
llm_result = await self.agenerate_prompt(
[self._convert_input(input)],
stop=stop,
callbacks=config.get("callbacks"),
tags=config.get("tags"),
metadata=config.get("metadata"),
run_name=config.get("run_name"),
run_id=config.pop("run_id", None),
**kwargs,
)
return cast(ChatGeneration, llm_result.generations[0][0]).message
[docs] def stream(
self,
input: LanguageModelInput,
config: Optional[RunnableConfig] = None,
*,
stop: Optional[List[str]] = None,
**kwargs: Any,
) -> Iterator[BaseMessageChunk]:
if type(self)._stream == BaseChatModel._stream:
# model doesn't implement streaming, so use default implementation
yield cast(
BaseMessageChunk, self.invoke(input, config=config, stop=stop, **kwargs)
)
else:
config = ensure_config(config)
messages = self._convert_input(input).to_messages()
params = self._get_invocation_params(stop=stop, **kwargs)
options = {"stop": stop, **kwargs}
inheritable_metadata = {
**(config.get("metadata") or {}),
**self._get_ls_params(stop=stop, **kwargs),
}
callback_manager = CallbackManager.configure(
config.get("callbacks"),
self.callbacks,
self.verbose,
config.get("tags"),
self.tags,
inheritable_metadata,
self.metadata,
)
(run_manager,) = callback_manager.on_chat_model_start(
dumpd(self),
[messages],
invocation_params=params,
options=options,
name=config.get("run_name"),
run_id=config.pop("run_id", None),
batch_size=1,
)
generation: Optional[ChatGenerationChunk] = None
try:
for chunk in self._stream(messages, stop=stop, **kwargs):
if chunk.message.id is None:
chunk.message.id = f"run-{run_manager.run_id}"
chunk.message.response_metadata = _gen_info_and_msg_metadata(chunk)
run_manager.on_llm_new_token(
cast(str, chunk.message.content), chunk=chunk
)
yield chunk.message
if generation is None:
generation = chunk
else:
generation += chunk
assert generation is not None
except BaseException as e:
run_manager.on_llm_error(
e,
response=LLMResult(
generations=[[generation]] if generation else []
),
)
raise e
else:
run_manager.on_llm_end(LLMResult(generations=[[generation]]))
[docs] async def astream(
self,
input: LanguageModelInput,
config: Optional[RunnableConfig] = None,
*,
stop: Optional[List[str]] = None,
**kwargs: Any,
) -> AsyncIterator[BaseMessageChunk]:
if (
type(self)._astream is BaseChatModel._astream
and type(self)._stream is BaseChatModel._stream
):
# No async or sync stream is implemented, so fall back to ainvoke
yield cast(
BaseMessageChunk,
await self.ainvoke(input, config=config, stop=stop, **kwargs),
)
return
config = ensure_config(config)
messages = self._convert_input(input).to_messages()
params = self._get_invocation_params(stop=stop, **kwargs)
options = {"stop": stop, **kwargs}
inheritable_metadata = {
**(config.get("metadata") or {}),
**self._get_ls_params(stop=stop, **kwargs),
}
callback_manager = AsyncCallbackManager.configure(
config.get("callbacks"),
self.callbacks,
self.verbose,
config.get("tags"),
self.tags,
inheritable_metadata,
self.metadata,
)
(run_manager,) = await callback_manager.on_chat_model_start(
dumpd(self),
[messages],
invocation_params=params,
options=options,
name=config.get("run_name"),
run_id=config.pop("run_id", None),
batch_size=1,
)
generation: Optional[ChatGenerationChunk] = None
try:
async for chunk in self._astream(
messages,
stop=stop,
**kwargs,
):
if chunk.message.id is None:
chunk.message.id = f"run-{run_manager.run_id}"
chunk.message.response_metadata = _gen_info_and_msg_metadata(chunk)
await run_manager.on_llm_new_token(
cast(str, chunk.message.content), chunk=chunk
)
yield chunk.message
if generation is None:
generation = chunk
else:
generation += chunk
assert generation is not None
except BaseException as e:
await run_manager.on_llm_error(
e,
response=LLMResult(generations=[[generation]] if generation else []),
)
raise e
else:
await run_manager.on_llm_end(
LLMResult(generations=[[generation]]),
)
# --- Custom methods ---
def _combine_llm_outputs(self, llm_outputs: List[Optional[dict]]) -> dict:
return {}
def _get_invocation_params(
self,
stop: Optional[List[str]] = None,
**kwargs: Any,
) -> dict:
params = self.dict()
params["stop"] = stop
return {**params, **kwargs}
def _get_ls_params(
self,
stop: Optional[List[str]] = None,
**kwargs: Any,
) -> LangSmithParams:
"""获取用于跟踪的标准参数。"""
ls_params = LangSmithParams(ls_model_type="chat")
if stop:
ls_params["ls_stop"] = stop
return ls_params
def _get_llm_string(self, stop: Optional[List[str]] = None, **kwargs: Any) -> str:
if self.is_lc_serializable():
params = {**kwargs, **{"stop": stop}}
param_string = str(sorted([(k, v) for k, v in params.items()]))
llm_string = dumps(self)
return llm_string + "---" + param_string
else:
params = self._get_invocation_params(stop=stop, **kwargs)
params = {**params, **kwargs}
return str(sorted([(k, v) for k, v in params.items()]))
[docs] def generate(
self,
messages: List[List[BaseMessage]],
stop: Optional[List[str]] = None,
callbacks: Callbacks = None,
*,
tags: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None,
run_name: Optional[str] = None,
run_id: Optional[uuid.UUID] = None,
**kwargs: Any,
) -> LLMResult:
"""将一系列提示传递给模型,并返回模型生成的结果。
对于公开批处理API的模型,此方法应该使用批处理调用。
在以下情况下使用此方法:
1. 利用批处理调用,
2. 需要从模型中获取更多输出,而不仅仅是顶部生成的值,
3. 正在构建对基础语言模型类型不可知的链式结构(例如,纯文本完成模型与聊天模型)。
参数:
messages:消息列表的列表。
stop:在生成时要使用的停止词。模型输出在出现这些子字符串的第一次出现时被截断。
callbacks:要传递的回调。用于在生成过程中执行额外功能,如日志记录或流式传输。
**kwargs:任意额外的关键字参数。通常这些参数会传递给模型提供者的API调用。
返回:
一个LLMResult,其中包含每个输入提示的候选生成列表以及额外的模型提供者特定输出。
"""
params = self._get_invocation_params(stop=stop, **kwargs)
options = {"stop": stop}
inheritable_metadata = {
**(metadata or {}),
**self._get_ls_params(stop=stop, **kwargs),
}
callback_manager = CallbackManager.configure(
callbacks,
self.callbacks,
self.verbose,
tags,
self.tags,
inheritable_metadata,
self.metadata,
)
run_managers = callback_manager.on_chat_model_start(
dumpd(self),
messages,
invocation_params=params,
options=options,
name=run_name,
run_id=run_id,
batch_size=len(messages),
)
results = []
for i, m in enumerate(messages):
try:
results.append(
self._generate_with_cache(
m,
stop=stop,
run_manager=run_managers[i] if run_managers else None,
**kwargs,
)
)
except BaseException as e:
if run_managers:
run_managers[i].on_llm_error(e, response=LLMResult(generations=[]))
raise e
flattened_outputs = [
LLMResult(generations=[res.generations], llm_output=res.llm_output) # type: ignore[list-item]
for res in results
]
llm_output = self._combine_llm_outputs([res.llm_output for res in results])
generations = [res.generations for res in results]
output = LLMResult(generations=generations, llm_output=llm_output) # type: ignore[arg-type]
if run_managers:
run_infos = []
for manager, flattened_output in zip(run_managers, flattened_outputs):
manager.on_llm_end(flattened_output)
run_infos.append(RunInfo(run_id=manager.run_id))
output.run = run_infos
return output
[docs] async def agenerate(
self,
messages: List[List[BaseMessage]],
stop: Optional[List[str]] = None,
callbacks: Callbacks = None,
*,
tags: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None,
run_name: Optional[str] = None,
run_id: Optional[uuid.UUID] = None,
**kwargs: Any,
) -> LLMResult:
"""将一个序列的提示异步传递给模型,并返回生成的结果。
该方法应该利用暴露批处理API的模型进行批处理调用。
在以下情况下使用此方法:
1. 利用批处理调用,
2. 需要模型生成的输出不仅仅是顶部生成的值,
3. 构建对底层语言模型类型不可知的链式结构(例如,纯文本完成模型与聊天模型)。
参数:
messages: 消息列表的列表。
stop: 生成时使用的停止词。模型输出在出现这些子字符串的第一次发生时被截断。
callbacks: 要传递的回调函数。用于在生成过程中执行额外的功能,如日志记录或流式传输。
**kwargs: 任意额外的关键字参数。这些通常传递给模型提供者的API调用。
返回:
一个LLMResult,其中包含每个输入提示的候选生成列表和额外的模型提供者特定输出。
"""
params = self._get_invocation_params(stop=stop, **kwargs)
options = {"stop": stop}
inheritable_metadata = {
**(metadata or {}),
**self._get_ls_params(stop=stop, **kwargs),
}
callback_manager = AsyncCallbackManager.configure(
callbacks,
self.callbacks,
self.verbose,
tags,
self.tags,
inheritable_metadata,
self.metadata,
)
run_managers = await callback_manager.on_chat_model_start(
dumpd(self),
messages,
invocation_params=params,
options=options,
name=run_name,
batch_size=len(messages),
run_id=run_id,
)
results = await asyncio.gather(
*[
self._agenerate_with_cache(
m,
stop=stop,
run_manager=run_managers[i] if run_managers else None,
**kwargs,
)
for i, m in enumerate(messages)
],
return_exceptions=True,
)
exceptions = []
for i, res in enumerate(results):
if isinstance(res, BaseException):
if run_managers:
await run_managers[i].on_llm_error(
res, response=LLMResult(generations=[])
)
exceptions.append(res)
if exceptions:
if run_managers:
await asyncio.gather(
*[
run_manager.on_llm_end(
LLMResult(
generations=[res.generations], # type: ignore[list-item, union-attr]
llm_output=res.llm_output, # type: ignore[list-item, union-attr]
)
)
for run_manager, res in zip(run_managers, results)
if not isinstance(res, Exception)
]
)
raise exceptions[0]
flattened_outputs = [
LLMResult(generations=[res.generations], llm_output=res.llm_output) # type: ignore[list-item, union-attr]
for res in results
]
llm_output = self._combine_llm_outputs([res.llm_output for res in results]) # type: ignore[union-attr]
generations = [res.generations for res in results] # type: ignore[union-attr]
output = LLMResult(generations=generations, llm_output=llm_output) # type: ignore[arg-type]
await asyncio.gather(
*[
run_manager.on_llm_end(flattened_output)
for run_manager, flattened_output in zip(
run_managers, flattened_outputs
)
]
)
if run_managers:
output.run = [
RunInfo(run_id=run_manager.run_id) for run_manager in run_managers
]
return output
[docs] def generate_prompt(
self,
prompts: List[PromptValue],
stop: Optional[List[str]] = None,
callbacks: Callbacks = None,
**kwargs: Any,
) -> LLMResult:
prompt_messages = [p.to_messages() for p in prompts]
return self.generate(prompt_messages, stop=stop, callbacks=callbacks, **kwargs)
[docs] async def agenerate_prompt(
self,
prompts: List[PromptValue],
stop: Optional[List[str]] = None,
callbacks: Callbacks = None,
**kwargs: Any,
) -> LLMResult:
prompt_messages = [p.to_messages() for p in prompts]
return await self.agenerate(
prompt_messages, stop=stop, callbacks=callbacks, **kwargs
)
def _generate_with_cache(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> ChatResult:
if isinstance(self.cache, BaseCache):
llm_cache = self.cache
else:
llm_cache = get_llm_cache()
# We should check the cache unless it's explicitly set to False
# A None cache means we should use the default global cache
# if it's configured.
check_cache = self.cache or self.cache is None
if check_cache:
if llm_cache:
llm_string = self._get_llm_string(stop=stop, **kwargs)
prompt = dumps(messages)
cache_val = llm_cache.lookup(prompt, llm_string)
if isinstance(cache_val, list):
return ChatResult(generations=cache_val)
elif self.cache is None:
pass
else:
raise ValueError(
"Asked to cache, but no cache found at `langchain.cache`."
)
# If stream is not explicitly set, check if implicitly requested by
# astream_events() or astream_log(). Bail out if _stream not implemented
if type(self)._stream != BaseChatModel._stream and kwargs.pop(
"stream",
(
next(
(
True
for h in run_manager.handlers
if isinstance(h, _StreamingCallbackHandler)
),
False,
)
if run_manager
else False
),
):
chunks: List[ChatGenerationChunk] = []
for chunk in self._stream(messages, stop=stop, **kwargs):
chunk.message.response_metadata = _gen_info_and_msg_metadata(chunk)
if run_manager:
if chunk.message.id is None:
chunk.message.id = f"run-{run_manager.run_id}"
run_manager.on_llm_new_token(
cast(str, chunk.message.content), chunk=chunk
)
chunks.append(chunk)
result = generate_from_stream(iter(chunks))
else:
if inspect.signature(self._generate).parameters.get("run_manager"):
result = self._generate(
messages, stop=stop, run_manager=run_manager, **kwargs
)
else:
result = self._generate(messages, stop=stop, **kwargs)
# Add response metadata to each generation
for idx, generation in enumerate(result.generations):
if run_manager and generation.message.id is None:
generation.message.id = f"run-{run_manager.run_id}-{idx}"
generation.message.response_metadata = _gen_info_and_msg_metadata(
generation
)
if len(result.generations) == 1 and result.llm_output is not None:
result.generations[0].message.response_metadata = {
**result.llm_output,
**result.generations[0].message.response_metadata,
}
if check_cache and llm_cache:
llm_cache.update(prompt, llm_string, result.generations)
return result
async def _agenerate_with_cache(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> ChatResult:
if isinstance(self.cache, BaseCache):
llm_cache = self.cache
else:
llm_cache = get_llm_cache()
# We should check the cache unless it's explicitly set to False
# A None cache means we should use the default global cache
# if it's configured.
check_cache = self.cache or self.cache is None
if check_cache:
if llm_cache:
llm_string = self._get_llm_string(stop=stop, **kwargs)
prompt = dumps(messages)
cache_val = await llm_cache.alookup(prompt, llm_string)
if isinstance(cache_val, list):
return ChatResult(generations=cache_val)
elif self.cache is None:
pass
else:
raise ValueError(
"Asked to cache, but no cache found at `langchain.cache`."
)
# If stream is not explicitly set, check if implicitly requested by
# astream_events() or astream_log(). Bail out if _astream not implemented
if (
type(self)._astream != BaseChatModel._astream
or type(self)._stream != BaseChatModel._stream
) and kwargs.pop(
"stream",
(
next(
(
True
for h in run_manager.handlers
if isinstance(h, _StreamingCallbackHandler)
),
False,
)
if run_manager
else False
),
):
chunks: List[ChatGenerationChunk] = []
async for chunk in self._astream(messages, stop=stop, **kwargs):
chunk.message.response_metadata = _gen_info_and_msg_metadata(chunk)
if run_manager:
if chunk.message.id is None:
chunk.message.id = f"run-{run_manager.run_id}"
await run_manager.on_llm_new_token(
cast(str, chunk.message.content), chunk=chunk
)
chunks.append(chunk)
result = generate_from_stream(iter(chunks))
else:
if inspect.signature(self._agenerate).parameters.get("run_manager"):
result = await self._agenerate(
messages, stop=stop, run_manager=run_manager, **kwargs
)
else:
result = await self._agenerate(messages, stop=stop, **kwargs)
# Add response metadata to each generation
for idx, generation in enumerate(result.generations):
if run_manager and generation.message.id is None:
generation.message.id = f"run-{run_manager.run_id}-{idx}"
generation.message.response_metadata = _gen_info_and_msg_metadata(
generation
)
if len(result.generations) == 1 and result.llm_output is not None:
result.generations[0].message.response_metadata = {
**result.llm_output,
**result.generations[0].message.response_metadata,
}
if check_cache and llm_cache:
await llm_cache.aupdate(prompt, llm_string, result.generations)
return result
@abstractmethod
def _generate(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> ChatResult:
"""顶层调用"""
async def _agenerate(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> ChatResult:
"""顶层调用"""
return await run_in_executor(
None,
self._generate,
messages,
stop,
run_manager.get_sync() if run_manager else None,
**kwargs,
)
def _stream(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> Iterator[ChatGenerationChunk]:
raise NotImplementedError()
async def _astream(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> AsyncIterator[ChatGenerationChunk]:
iterator = await run_in_executor(
None,
self._stream,
messages,
stop,
run_manager.get_sync() if run_manager else None,
**kwargs,
)
done = object()
while True:
item = await run_in_executor(
None,
next,
iterator,
done, # type: ignore[call-arg, arg-type]
)
if item is done:
break
yield item # type: ignore[misc]
[docs] @deprecated("0.1.7", alternative="invoke", removal="0.3.0")
def __call__(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
callbacks: Callbacks = None,
**kwargs: Any,
) -> BaseMessage:
generation = self.generate(
[messages], stop=stop, callbacks=callbacks, **kwargs
).generations[0][0]
if isinstance(generation, ChatGeneration):
return generation.message
else:
raise ValueError("Unexpected generation type")
async def _call_async(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
callbacks: Callbacks = None,
**kwargs: Any,
) -> BaseMessage:
result = await self.agenerate(
[messages], stop=stop, callbacks=callbacks, **kwargs
)
generation = result.generations[0][0]
if isinstance(generation, ChatGeneration):
return generation.message
else:
raise ValueError("Unexpected generation type")
[docs] @deprecated("0.1.7", alternative="invoke", removal="0.3.0")
def call_as_llm(
self, message: str, stop: Optional[List[str]] = None, **kwargs: Any
) -> str:
return self.predict(message, stop=stop, **kwargs)
[docs] @deprecated("0.1.7", alternative="invoke", removal="0.3.0")
def predict(
self, text: str, *, stop: Optional[Sequence[str]] = None, **kwargs: Any
) -> str:
if stop is None:
_stop = None
else:
_stop = list(stop)
result = self([HumanMessage(content=text)], stop=_stop, **kwargs)
if isinstance(result.content, str):
return result.content
else:
raise ValueError("Cannot use predict when output is not a string.")
[docs] @deprecated("0.1.7", alternative="invoke", removal="0.3.0")
def predict_messages(
self,
messages: List[BaseMessage],
*,
stop: Optional[Sequence[str]] = None,
**kwargs: Any,
) -> BaseMessage:
if stop is None:
_stop = None
else:
_stop = list(stop)
return self(messages, stop=_stop, **kwargs)
[docs] @deprecated("0.1.7", alternative="ainvoke", removal="0.3.0")
async def apredict(
self, text: str, *, stop: Optional[Sequence[str]] = None, **kwargs: Any
) -> str:
if stop is None:
_stop = None
else:
_stop = list(stop)
result = await self._call_async(
[HumanMessage(content=text)], stop=_stop, **kwargs
)
if isinstance(result.content, str):
return result.content
else:
raise ValueError("Cannot use predict when output is not a string.")
[docs] @deprecated("0.1.7", alternative="ainvoke", removal="0.3.0")
async def apredict_messages(
self,
messages: List[BaseMessage],
*,
stop: Optional[Sequence[str]] = None,
**kwargs: Any,
) -> BaseMessage:
if stop is None:
_stop = None
else:
_stop = list(stop)
return await self._call_async(messages, stop=_stop, **kwargs)
@property
@abstractmethod
def _llm_type(self) -> str:
"""聊天模型的返回类型。"""
[docs] def dict(self, **kwargs: Any) -> Dict:
"""返回LLM的字典。"""
starter_dict = dict(self._identifying_params)
starter_dict["_type"] = self._llm_type
return starter_dict
[docs]class SimpleChatModel(BaseChatModel):
"""简化的聊天模型实现,供继承使用。"""
def _generate(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> ChatResult:
output_str = self._call(messages, stop=stop, run_manager=run_manager, **kwargs)
message = AIMessage(content=output_str)
generation = ChatGeneration(message=message)
return ChatResult(generations=[generation])
@abstractmethod
def _call(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> str:
"""更简单的接口。"""
async def _agenerate(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> ChatResult:
return await run_in_executor(
None,
self._generate,
messages,
stop=stop,
run_manager=run_manager.get_sync() if run_manager else None,
**kwargs,
)
def _gen_info_and_msg_metadata(
generation: Union[ChatGeneration, ChatGenerationChunk],
) -> dict:
return {
**(generation.generation_info or {}),
**generation.message.response_metadata,
}