Source code for langchain_core.language_models.chat_models

from __future__ import annotations

import asyncio
import inspect
import uuid
import warnings
from abc import ABC, abstractmethod
from typing import (
    TYPE_CHECKING,
    Any,
    AsyncIterator,
    Callable,
    Dict,
    Iterator,
    List,
    Literal,
    Optional,
    Sequence,
    Type,
    Union,
    cast,
)

from typing_extensions import TypedDict

from langchain_core._api import deprecated
from langchain_core.caches import BaseCache
from langchain_core.callbacks import (
    AsyncCallbackManager,
    AsyncCallbackManagerForLLMRun,
    BaseCallbackManager,
    CallbackManager,
    CallbackManagerForLLMRun,
    Callbacks,
)
from langchain_core.globals import get_llm_cache
from langchain_core.language_models.base import BaseLanguageModel, LanguageModelInput
from langchain_core.load import dumpd, dumps
from langchain_core.messages import (
    AIMessage,
    AnyMessage,
    BaseMessage,
    BaseMessageChunk,
    HumanMessage,
    convert_to_messages,
    message_chunk_to_message,
)
from langchain_core.outputs import (
    ChatGeneration,
    ChatGenerationChunk,
    ChatResult,
    LLMResult,
    RunInfo,
)
from langchain_core.prompt_values import ChatPromptValue, PromptValue, StringPromptValue
from langchain_core.pydantic_v1 import Field, root_validator
from langchain_core.runnables.config import ensure_config, run_in_executor
from langchain_core.tracers._streaming import _StreamingCallbackHandler

if TYPE_CHECKING:
    from langchain_core.pydantic_v1 import BaseModel
    from langchain_core.runnables import Runnable, RunnableConfig
    from langchain_core.tools import BaseTool


class LangSmithParams(TypedDict, total=False):
    """Typed dictionary of ``ls_``-prefixed parameters.

    Declared with ``total=False``, so every key is optional.
    """

    # presumably the name of the model provider -- TODO confirm
    ls_provider: str
    # presumably the name of the model being invoked -- TODO confirm
    ls_model_name: str
    # the only permitted value is the literal "chat"
    ls_model_type: Literal["chat"]
    # presumably the sampling temperature -- TODO confirm
    ls_temperature: Optional[float]
    # presumably the maximum number of tokens to generate -- TODO confirm
    ls_max_tokens: Optional[int]
    # presumably the stop sequences for generation -- TODO confirm
    ls_stop: Optional[List[str]]
def generate_from_stream(stream: Iterator[ChatGenerationChunk]) -> ChatResult:
    """Generate a chat result from a stream of chunks.

    All chunks are accumulated with ``+=`` into a single chunk, which is then
    converted to a regular message and wrapped in a one-generation result.

    Args:
        stream: Iterator of generation chunks to merge.

    Returns:
        A ``ChatResult`` containing exactly one ``ChatGeneration`` built from
        the merged chunk's message and ``generation_info``.

    Raises:
        ValueError: If the stream yields no chunks.
    """
    generation: Optional[ChatGenerationChunk] = None
    for chunk in stream:
        if generation is None:
            generation = chunk
        else:
            generation += chunk
    # Explicit check instead of ``assert``: assertions are stripped under
    # ``python -O``, which would turn an empty stream into an AttributeError.
    if generation is None:
        raise ValueError("No generations found in stream.")
    return ChatResult(
        generations=[
            ChatGeneration(
                message=message_chunk_to_message(generation.message),
                generation_info=generation.generation_info,
            )
        ]
    )
async def agenerate_from_stream(
    stream: AsyncIterator[ChatGenerationChunk],
) -> ChatResult:
    """Asynchronously generate a chat result from a stream of chunks.

    All chunks are accumulated with ``+=`` into a single chunk, which is then
    converted to a regular message and wrapped in a one-generation result.

    Args:
        stream: Async iterator of generation chunks to merge.

    Returns:
        A ``ChatResult`` containing exactly one ``ChatGeneration`` built from
        the merged chunk's message and ``generation_info``.

    Raises:
        ValueError: If the stream yields no chunks.
    """
    generation: Optional[ChatGenerationChunk] = None
    async for chunk in stream:
        if generation is None:
            generation = chunk
        else:
            generation += chunk
    # Explicit check instead of ``assert``: assertions are stripped under
    # ``python -O``, which would turn an empty stream into an AttributeError.
    if generation is None:
        raise ValueError("No generations found in stream.")
    return ChatResult(
        generations=[
            ChatGeneration(
                message=message_chunk_to_message(generation.message),
                generation_info=generation.generation_info,
            )
        ]
    )
class BaseChatModel(BaseLanguageModel[BaseMessage], ABC):
    """Base class for chat models.

    Subclasses must implement ``_generate`` and ``_llm_type``. Overriding
    ``_stream`` and/or ``_astream`` enables chunk-by-chunk streaming;
    otherwise ``stream``/``astream`` fall back to a single ``invoke``/
    ``ainvoke`` result.
    """

    callback_manager: Optional[BaseCallbackManager] = Field(default=None, exclude=True)
    """[DEPRECATED] Callback manager to add to the run trace."""

    @root_validator()
    def raise_deprecation(cls, values: Dict) -> Dict:
        """Emit a deprecation warning if ``callback_manager`` is used.

        When a non-None ``callback_manager`` is supplied, it is moved to the
        ``callbacks`` key.
        """
        if values.get("callback_manager") is not None:
            warnings.warn(
                "callback_manager is deprecated. Please use callbacks instead.",
                DeprecationWarning,
            )
            values["callbacks"] = values.pop("callback_manager", None)
        return values

    class Config:
        """Configuration for this pydantic object."""

        arbitrary_types_allowed = True

    # --- Runnable methods ---

    @property
    def OutputType(self) -> Any:
        """Get the output type for this runnable."""
        return AnyMessage

    def _convert_input(self, input: LanguageModelInput) -> PromptValue:
        """Normalize a supported model input into a ``PromptValue``.

        Args:
            input: A ``PromptValue``, a string, or a sequence convertible to
                messages.

        Returns:
            The input itself if it is already a ``PromptValue``; a
            ``StringPromptValue`` for a string; a ``ChatPromptValue`` for a
            sequence.

        Raises:
            ValueError: If the input is of none of the supported types.
        """
        if isinstance(input, PromptValue):
            return input
        elif isinstance(input, str):
            return StringPromptValue(text=input)
        elif isinstance(input, Sequence):
            return ChatPromptValue(messages=convert_to_messages(input))
        else:
            raise ValueError(
                f"Invalid input type {type(input)}. "
                "Must be a PromptValue, str, or list of BaseMessages."
            )

    def invoke(
        self,
        input: LanguageModelInput,
        config: Optional[RunnableConfig] = None,
        *,
        stop: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> BaseMessage:
        """Run the model on a single input and return the resulting message.

        Args:
            input: The model input; see ``_convert_input`` for accepted types.
            config: Optional runnable config supplying callbacks, tags,
                metadata, run name and run id.
            stop: Optional stop words forwarded to generation.
            **kwargs: Extra keyword arguments forwarded to ``generate_prompt``.

        Returns:
            The message of the first generation for the single prompt.
        """
        config = ensure_config(config)
        return cast(
            ChatGeneration,
            self.generate_prompt(
                [self._convert_input(input)],
                stop=stop,
                callbacks=config.get("callbacks"),
                tags=config.get("tags"),
                metadata=config.get("metadata"),
                run_name=config.get("run_name"),
                run_id=config.pop("run_id", None),
                **kwargs,
            ).generations[0][0],
        ).message

    async def ainvoke(
        self,
        input: LanguageModelInput,
        config: Optional[RunnableConfig] = None,
        *,
        stop: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> BaseMessage:
        """Asynchronously run the model on a single input.

        Args:
            input: The model input; see ``_convert_input`` for accepted types.
            config: Optional runnable config supplying callbacks, tags,
                metadata, run name and run id.
            stop: Optional stop words forwarded to generation.
            **kwargs: Extra keyword arguments forwarded to
                ``agenerate_prompt``.

        Returns:
            The message of the first generation for the single prompt.
        """
        config = ensure_config(config)
        llm_result = await self.agenerate_prompt(
            [self._convert_input(input)],
            stop=stop,
            callbacks=config.get("callbacks"),
            tags=config.get("tags"),
            metadata=config.get("metadata"),
            run_name=config.get("run_name"),
            run_id=config.pop("run_id", None),
            **kwargs,
        )
        return cast(ChatGeneration, llm_result.generations[0][0]).message

    def stream(
        self,
        input: LanguageModelInput,
        config: Optional[RunnableConfig] = None,
        *,
        stop: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> Iterator[BaseMessageChunk]:
        """Stream the model output as message chunks.

        If the subclass does not override ``_stream``, the complete ``invoke``
        result is yielded as a single item.

        Args:
            input: The model input; see ``_convert_input`` for accepted types.
            config: Optional runnable config supplying callbacks, tags,
                metadata, run name and run id.
            stop: Optional stop words forwarded to ``_stream``.
            **kwargs: Extra keyword arguments forwarded to ``_stream``.

        Yields:
            The message of each chunk produced by ``_stream``.

        Raises:
            ValueError: If ``_stream`` produced no chunks at all.
        """
        if type(self)._stream is BaseChatModel._stream:
            # model doesn't implement streaming, so use default implementation
            yield cast(
                BaseMessageChunk, self.invoke(input, config=config, stop=stop, **kwargs)
            )
        else:
            config = ensure_config(config)
            messages = self._convert_input(input).to_messages()
            params = self._get_invocation_params(stop=stop, **kwargs)
            options = {"stop": stop, **kwargs}
            inheritable_metadata = {
                **(config.get("metadata") or {}),
                **self._get_ls_params(stop=stop, **kwargs),
            }
            callback_manager = CallbackManager.configure(
                config.get("callbacks"),
                self.callbacks,
                self.verbose,
                config.get("tags"),
                self.tags,
                inheritable_metadata,
                self.metadata,
            )
            (run_manager,) = callback_manager.on_chat_model_start(
                dumpd(self),
                [messages],
                invocation_params=params,
                options=options,
                name=config.get("run_name"),
                run_id=config.pop("run_id", None),
                batch_size=1,
            )
            generation: Optional[ChatGenerationChunk] = None
            try:
                for chunk in self._stream(messages, stop=stop, **kwargs):
                    if chunk.message.id is None:
                        chunk.message.id = f"run-{run_manager.run_id}"
                    chunk.message.response_metadata = _gen_info_and_msg_metadata(chunk)
                    run_manager.on_llm_new_token(
                        cast(str, chunk.message.content), chunk=chunk
                    )
                    yield chunk.message
                    if generation is None:
                        generation = chunk
                    else:
                        generation += chunk
                # Raised inside the ``try`` so the error callback below still
                # fires; an ``assert`` here would be stripped under ``-O``.
                if generation is None:
                    raise ValueError("No generation chunks were returned")
            except BaseException as e:
                run_manager.on_llm_error(
                    e,
                    response=LLMResult(
                        generations=[[generation]] if generation else []
                    ),
                )
                raise
            else:
                run_manager.on_llm_end(LLMResult(generations=[[generation]]))

    async def astream(
        self,
        input: LanguageModelInput,
        config: Optional[RunnableConfig] = None,
        *,
        stop: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> AsyncIterator[BaseMessageChunk]:
        """Asynchronously stream the model output as message chunks.

        If the subclass overrides neither ``_astream`` nor ``_stream``, the
        complete ``ainvoke`` result is yielded as a single item.

        Args:
            input: The model input; see ``_convert_input`` for accepted types.
            config: Optional runnable config supplying callbacks, tags,
                metadata, run name and run id.
            stop: Optional stop words forwarded to ``_astream``.
            **kwargs: Extra keyword arguments forwarded to ``_astream``.

        Yields:
            The message of each chunk produced by ``_astream``.

        Raises:
            ValueError: If ``_astream`` produced no chunks at all.
        """
        if (
            type(self)._astream is BaseChatModel._astream
            and type(self)._stream is BaseChatModel._stream
        ):
            # No async or sync stream is implemented, so fall back to ainvoke
            yield cast(
                BaseMessageChunk,
                await self.ainvoke(input, config=config, stop=stop, **kwargs),
            )
            return

        config = ensure_config(config)
        messages = self._convert_input(input).to_messages()
        params = self._get_invocation_params(stop=stop, **kwargs)
        options = {"stop": stop, **kwargs}
        inheritable_metadata = {
            **(config.get("metadata") or {}),
            **self._get_ls_params(stop=stop, **kwargs),
        }
        callback_manager = AsyncCallbackManager.configure(
            config.get("callbacks"),
            self.callbacks,
            self.verbose,
            config.get("tags"),
            self.tags,
            inheritable_metadata,
            self.metadata,
        )
        (run_manager,) = await callback_manager.on_chat_model_start(
            dumpd(self),
            [messages],
            invocation_params=params,
            options=options,
            name=config.get("run_name"),
            run_id=config.pop("run_id", None),
            batch_size=1,
        )

        generation: Optional[ChatGenerationChunk] = None
        try:
            async for chunk in self._astream(
                messages,
                stop=stop,
                **kwargs,
            ):
                if chunk.message.id is None:
                    chunk.message.id = f"run-{run_manager.run_id}"
                chunk.message.response_metadata = _gen_info_and_msg_metadata(chunk)
                await run_manager.on_llm_new_token(
                    cast(str, chunk.message.content), chunk=chunk
                )
                yield chunk.message
                if generation is None:
                    generation = chunk
                else:
                    generation += chunk
            # Raised inside the ``try`` so the error callback below still
            # fires; an ``assert`` here would be stripped under ``-O``.
            if generation is None:
                raise ValueError("No generation chunks were returned")
        except BaseException as e:
            await run_manager.on_llm_error(
                e,
                response=LLMResult(generations=[[generation]] if generation else []),
            )
            raise
        else:
            await run_manager.on_llm_end(
                LLMResult(generations=[[generation]]),
            )

    # --- Custom methods ---

    def _combine_llm_outputs(self, llm_outputs: List[Optional[dict]]) -> dict:
        """Combine per-prompt ``llm_output`` dicts into one.

        The base implementation ignores its input and returns an empty dict.
        """
        return {}

    def _get_invocation_params(
        self,
        stop: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> dict:
        """Return the parameters reported to callbacks for an invocation.

        Starts from ``self.dict()``, sets ``"stop"``, then overlays ``kwargs``
        (which take precedence on key collisions).
        """
        params = self.dict()
        params["stop"] = stop
        return {**params, **kwargs}

    def _get_ls_params(
        self,
        stop: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> LangSmithParams:
        """Get standard params for tracing.

        The base implementation sets ``ls_model_type`` to ``"chat"`` and adds
        ``ls_stop`` only when ``stop`` is non-empty.
        """
        ls_params = LangSmithParams(ls_model_type="chat")
        if stop:
            ls_params["ls_stop"] = stop
        return ls_params

    def _get_llm_string(self, stop: Optional[List[str]] = None, **kwargs: Any) -> str:
        """Build the string identifying this model + call params for caching.

        For serializable models the string is ``dumps(self)`` followed by
        ``"---"`` and the sorted call parameters; otherwise it is the sorted
        invocation parameters alone.
        """
        if self.is_lc_serializable():
            params = {**kwargs, "stop": stop}
            param_string = str(sorted(params.items()))
            llm_string = dumps(self)
            return llm_string + "---" + param_string
        else:
            params = self._get_invocation_params(stop=stop, **kwargs)
            # Overlay kwargs again in case a subclass override of
            # ``_get_invocation_params`` does not merge them itself.
            params = {**params, **kwargs}
            return str(sorted(params.items()))

    def generate(
        self,
        messages: List[List[BaseMessage]],
        stop: Optional[List[str]] = None,
        callbacks: Callbacks = None,
        *,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        run_name: Optional[str] = None,
        run_id: Optional[uuid.UUID] = None,
        **kwargs: Any,
    ) -> LLMResult:
        """Pass a sequence of prompts to the model and return model generations.

        This method should make use of batched calls for models that expose a
        batched API. The base implementation processes each message list in
        turn via ``_generate_with_cache``.

        Use this method when you want to:
            1. take advantage of batched calls,
            2. need more output from the model than just the top generated
               value,
            3. are building chains that are agnostic to the underlying
               language model type (e.g., pure text completion models vs chat
               models).

        Args:
            messages: List of list of messages.
            stop: Stop words to use when generating. Model output is cut off
                at the first occurrence of any of these substrings.
            callbacks: Callbacks to pass through. Used for executing
                additional functionality, such as logging or streaming,
                throughout generation.
            tags: Optional tags passed to the callback manager.
            metadata: Optional metadata merged with the tracing params.
            run_name: Optional name passed to ``on_chat_model_start``.
            run_id: Optional run id passed to ``on_chat_model_start``.
            **kwargs: Arbitrary additional keyword arguments. These are
                usually passed to the model provider API call.

        Returns:
            An LLMResult, which contains a list of candidate Generations for
            each input prompt and additional model provider-specific output.
        """
        params = self._get_invocation_params(stop=stop, **kwargs)
        options = {"stop": stop}
        inheritable_metadata = {
            **(metadata or {}),
            **self._get_ls_params(stop=stop, **kwargs),
        }

        callback_manager = CallbackManager.configure(
            callbacks,
            self.callbacks,
            self.verbose,
            tags,
            self.tags,
            inheritable_metadata,
            self.metadata,
        )
        run_managers = callback_manager.on_chat_model_start(
            dumpd(self),
            messages,
            invocation_params=params,
            options=options,
            name=run_name,
            run_id=run_id,
            batch_size=len(messages),
        )
        results = []
        for i, m in enumerate(messages):
            try:
                results.append(
                    self._generate_with_cache(
                        m,
                        stop=stop,
                        run_manager=run_managers[i] if run_managers else None,
                        **kwargs,
                    )
                )
            except BaseException as e:
                if run_managers:
                    run_managers[i].on_llm_error(e, response=LLMResult(generations=[]))
                raise
        flattened_outputs = [
            LLMResult(generations=[res.generations], llm_output=res.llm_output)  # type: ignore[list-item]
            for res in results
        ]
        llm_output = self._combine_llm_outputs([res.llm_output for res in results])
        generations = [res.generations for res in results]
        output = LLMResult(generations=generations, llm_output=llm_output)  # type: ignore[arg-type]
        if run_managers:
            run_infos = []
            for manager, flattened_output in zip(run_managers, flattened_outputs):
                manager.on_llm_end(flattened_output)
                run_infos.append(RunInfo(run_id=manager.run_id))
            output.run = run_infos
        return output

    async def agenerate(
        self,
        messages: List[List[BaseMessage]],
        stop: Optional[List[str]] = None,
        callbacks: Callbacks = None,
        *,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        run_name: Optional[str] = None,
        run_id: Optional[uuid.UUID] = None,
        **kwargs: Any,
    ) -> LLMResult:
        """Asynchronously pass a sequence of prompts to a model and return generations.

        This method should make use of batched calls for models that expose a
        batched API. The base implementation runs ``_agenerate_with_cache``
        for every message list concurrently with ``asyncio.gather``.

        Use this method when you want to:
            1. take advantage of batched calls,
            2. need more output from the model than just the top generated
               value,
            3. are building chains that are agnostic to the underlying
               language model type (e.g., pure text completion models vs chat
               models).

        Args:
            messages: List of list of messages.
            stop: Stop words to use when generating. Model output is cut off
                at the first occurrence of any of these substrings.
            callbacks: Callbacks to pass through. Used for executing
                additional functionality, such as logging or streaming,
                throughout generation.
            tags: Optional tags passed to the callback manager.
            metadata: Optional metadata merged with the tracing params.
            run_name: Optional name passed to ``on_chat_model_start``.
            run_id: Optional run id passed to ``on_chat_model_start``.
            **kwargs: Arbitrary additional keyword arguments. These are
                usually passed to the model provider API call.

        Returns:
            An LLMResult, which contains a list of candidate Generations for
            each input prompt and additional model provider-specific output.

        Raises:
            BaseException: The first exception raised by any of the
                concurrent generations, after callbacks have been notified.
        """
        params = self._get_invocation_params(stop=stop, **kwargs)
        options = {"stop": stop}
        inheritable_metadata = {
            **(metadata or {}),
            **self._get_ls_params(stop=stop, **kwargs),
        }

        callback_manager = AsyncCallbackManager.configure(
            callbacks,
            self.callbacks,
            self.verbose,
            tags,
            self.tags,
            inheritable_metadata,
            self.metadata,
        )

        run_managers = await callback_manager.on_chat_model_start(
            dumpd(self),
            messages,
            invocation_params=params,
            options=options,
            name=run_name,
            batch_size=len(messages),
            run_id=run_id,
        )

        results = await asyncio.gather(
            *[
                self._agenerate_with_cache(
                    m,
                    stop=stop,
                    run_manager=run_managers[i] if run_managers else None,
                    **kwargs,
                )
                for i, m in enumerate(messages)
            ],
            return_exceptions=True,
        )
        exceptions = []
        for i, res in enumerate(results):
            if isinstance(res, BaseException):
                if run_managers:
                    await run_managers[i].on_llm_error(
                        res, response=LLMResult(generations=[])
                    )
                exceptions.append(res)
        if exceptions:
            if run_managers:
                # Notify the runs that did succeed. The filter must mirror the
                # ``BaseException`` check above: a result such as
                # ``asyncio.CancelledError`` is not an ``Exception`` and has
                # no ``generations`` attribute.
                await asyncio.gather(
                    *[
                        run_manager.on_llm_end(
                            LLMResult(
                                generations=[res.generations],  # type: ignore[list-item, union-attr]
                                llm_output=res.llm_output,  # type: ignore[list-item, union-attr]
                            )
                        )
                        for run_manager, res in zip(run_managers, results)
                        if not isinstance(res, BaseException)
                    ]
                )
            raise exceptions[0]
        flattened_outputs = [
            LLMResult(generations=[res.generations], llm_output=res.llm_output)  # type: ignore[list-item, union-attr]
            for res in results
        ]
        llm_output = self._combine_llm_outputs([res.llm_output for res in results])  # type: ignore[union-attr]
        generations = [res.generations for res in results]  # type: ignore[union-attr]
        output = LLMResult(generations=generations, llm_output=llm_output)  # type: ignore[arg-type]
        await asyncio.gather(
            *[
                run_manager.on_llm_end(flattened_output)
                for run_manager, flattened_output in zip(
                    run_managers, flattened_outputs
                )
            ]
        )
        if run_managers:
            output.run = [
                RunInfo(run_id=run_manager.run_id) for run_manager in run_managers
            ]
        return output

    def generate_prompt(
        self,
        prompts: List[PromptValue],
        stop: Optional[List[str]] = None,
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> LLMResult:
        """Convert each prompt to messages and delegate to ``generate``."""
        prompt_messages = [p.to_messages() for p in prompts]
        return self.generate(prompt_messages, stop=stop, callbacks=callbacks, **kwargs)

    async def agenerate_prompt(
        self,
        prompts: List[PromptValue],
        stop: Optional[List[str]] = None,
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> LLMResult:
        """Convert each prompt to messages and delegate to ``agenerate``."""
        prompt_messages = [p.to_messages() for p in prompts]
        return await self.agenerate(
            prompt_messages, stop=stop, callbacks=callbacks, **kwargs
        )

    def _generate_with_cache(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        """Generate a result for one message list, consulting the cache first.

        Args:
            messages: The messages of a single prompt.
            stop: Optional stop words.
            run_manager: Optional callback manager for this run.
            **kwargs: Extra keyword arguments; a ``stream`` entry, if present
                and ``_stream`` is overridden, selects the streaming path and
                is removed before the model is called.

        Returns:
            The cached or freshly generated ``ChatResult``.

        Raises:
            ValueError: If ``self.cache`` is truthy but no cache object could
                be resolved.
        """
        if isinstance(self.cache, BaseCache):
            llm_cache = self.cache
        else:
            llm_cache = get_llm_cache()
        # We should check the cache unless it's explicitly set to False
        # A None cache means we should use the default global cache
        # if it's configured.
        check_cache = self.cache or self.cache is None
        if check_cache:
            if llm_cache:
                llm_string = self._get_llm_string(stop=stop, **kwargs)
                prompt = dumps(messages)
                cache_val = llm_cache.lookup(prompt, llm_string)
                if isinstance(cache_val, list):
                    return ChatResult(generations=cache_val)
            elif self.cache is None:
                pass
            else:
                raise ValueError(
                    "Asked to cache, but no cache found at `langchain.cache`."
                )
        # If stream is not explicitly set, check if implicitly requested by
        # astream_events() or astream_log(). Bail out if _stream not implemented
        if type(self)._stream is not BaseChatModel._stream and kwargs.pop(
            "stream",
            (
                any(
                    isinstance(h, _StreamingCallbackHandler)
                    for h in run_manager.handlers
                )
                if run_manager
                else False
            ),
        ):
            chunks: List[ChatGenerationChunk] = []
            for chunk in self._stream(messages, stop=stop, **kwargs):
                chunk.message.response_metadata = _gen_info_and_msg_metadata(chunk)
                if run_manager:
                    if chunk.message.id is None:
                        chunk.message.id = f"run-{run_manager.run_id}"
                    run_manager.on_llm_new_token(
                        cast(str, chunk.message.content), chunk=chunk
                    )
                chunks.append(chunk)
            result = generate_from_stream(iter(chunks))
        else:
            # Only pass run_manager to implementations that accept it.
            if inspect.signature(self._generate).parameters.get("run_manager"):
                result = self._generate(
                    messages, stop=stop, run_manager=run_manager, **kwargs
                )
            else:
                result = self._generate(messages, stop=stop, **kwargs)

        # Add response metadata to each generation
        for idx, generation in enumerate(result.generations):
            if run_manager and generation.message.id is None:
                generation.message.id = f"run-{run_manager.run_id}-{idx}"
            generation.message.response_metadata = _gen_info_and_msg_metadata(
                generation
            )
        if len(result.generations) == 1 and result.llm_output is not None:
            result.generations[0].message.response_metadata = {
                **result.llm_output,
                **result.generations[0].message.response_metadata,
            }
        if check_cache and llm_cache:
            llm_cache.update(prompt, llm_string, result.generations)
        return result

    async def _agenerate_with_cache(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        """Asynchronously generate a result for one message list, using the cache.

        Args:
            messages: The messages of a single prompt.
            stop: Optional stop words.
            run_manager: Optional async callback manager for this run.
            **kwargs: Extra keyword arguments; a ``stream`` entry, if present
                and ``_astream`` or ``_stream`` is overridden, selects the
                streaming path and is removed before the model is called.

        Returns:
            The cached or freshly generated ``ChatResult``.

        Raises:
            ValueError: If ``self.cache`` is truthy but no cache object could
                be resolved.
        """
        if isinstance(self.cache, BaseCache):
            llm_cache = self.cache
        else:
            llm_cache = get_llm_cache()
        # We should check the cache unless it's explicitly set to False
        # A None cache means we should use the default global cache
        # if it's configured.
        check_cache = self.cache or self.cache is None
        if check_cache:
            if llm_cache:
                llm_string = self._get_llm_string(stop=stop, **kwargs)
                prompt = dumps(messages)
                cache_val = await llm_cache.alookup(prompt, llm_string)
                if isinstance(cache_val, list):
                    return ChatResult(generations=cache_val)
            elif self.cache is None:
                pass
            else:
                raise ValueError(
                    "Asked to cache, but no cache found at `langchain.cache`."
                )
        # If stream is not explicitly set, check if implicitly requested by
        # astream_events() or astream_log(). Bail out if _astream not implemented
        if (
            type(self)._astream is not BaseChatModel._astream
            or type(self)._stream is not BaseChatModel._stream
        ) and kwargs.pop(
            "stream",
            (
                any(
                    isinstance(h, _StreamingCallbackHandler)
                    for h in run_manager.handlers
                )
                if run_manager
                else False
            ),
        ):
            chunks: List[ChatGenerationChunk] = []
            async for chunk in self._astream(messages, stop=stop, **kwargs):
                chunk.message.response_metadata = _gen_info_and_msg_metadata(chunk)
                if run_manager:
                    if chunk.message.id is None:
                        chunk.message.id = f"run-{run_manager.run_id}"
                    await run_manager.on_llm_new_token(
                        cast(str, chunk.message.content), chunk=chunk
                    )
                chunks.append(chunk)
            result = generate_from_stream(iter(chunks))
        else:
            # Only pass run_manager to implementations that accept it.
            if inspect.signature(self._agenerate).parameters.get("run_manager"):
                result = await self._agenerate(
                    messages, stop=stop, run_manager=run_manager, **kwargs
                )
            else:
                result = await self._agenerate(messages, stop=stop, **kwargs)

        # Add response metadata to each generation
        for idx, generation in enumerate(result.generations):
            if run_manager and generation.message.id is None:
                generation.message.id = f"run-{run_manager.run_id}-{idx}"
            generation.message.response_metadata = _gen_info_and_msg_metadata(
                generation
            )
        if len(result.generations) == 1 and result.llm_output is not None:
            result.generations[0].message.response_metadata = {
                **result.llm_output,
                **result.generations[0].message.response_metadata,
            }
        if check_cache and llm_cache:
            await llm_cache.aupdate(prompt, llm_string, result.generations)
        return result

    @abstractmethod
    def _generate(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        """Top-level call: produce a ``ChatResult`` for one message list."""

    async def _agenerate(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        """Top-level async call.

        The default implementation runs the synchronous ``_generate`` through
        ``run_in_executor``, handing it the sync counterpart of
        ``run_manager`` when one is provided.
        """
        return await run_in_executor(
            None,
            self._generate,
            messages,
            stop,
            run_manager.get_sync() if run_manager else None,
            **kwargs,
        )

    def _stream(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Iterator[ChatGenerationChunk]:
        """Stream generation chunks; not implemented in the base class."""
        raise NotImplementedError()

    async def _astream(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> AsyncIterator[ChatGenerationChunk]:
        """Asynchronously stream generation chunks.

        The default implementation obtains the synchronous ``_stream``
        iterator through ``run_in_executor`` and then pulls each item with
        ``next`` through ``run_in_executor`` as well.
        """
        iterator = await run_in_executor(
            None,
            self._stream,
            messages,
            stop,
            run_manager.get_sync() if run_manager else None,
            **kwargs,
        )
        # Unique sentinel returned by ``next`` once the iterator is exhausted.
        done = object()
        while True:
            item = await run_in_executor(
                None,
                next,
                iterator,
                done,  # type: ignore[call-arg, arg-type]
            )
            if item is done:
                break
            yield item  # type: ignore[misc]

    @deprecated("0.1.7", alternative="invoke", removal="0.3.0")
    def __call__(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> BaseMessage:
        """Generate for a single message list and return the first message.

        Raises:
            ValueError: If the first generation is not a ``ChatGeneration``.
        """
        generation = self.generate(
            [messages], stop=stop, callbacks=callbacks, **kwargs
        ).generations[0][0]
        if isinstance(generation, ChatGeneration):
            return generation.message
        else:
            raise ValueError("Unexpected generation type")

    async def _call_async(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        callbacks: Callbacks = None,
        **kwargs: Any,
    ) -> BaseMessage:
        """Async counterpart of ``__call__``.

        Raises:
            ValueError: If the first generation is not a ``ChatGeneration``.
        """
        result = await self.agenerate(
            [messages], stop=stop, callbacks=callbacks, **kwargs
        )
        generation = result.generations[0][0]
        if isinstance(generation, ChatGeneration):
            return generation.message
        else:
            raise ValueError("Unexpected generation type")

    @deprecated("0.1.7", alternative="invoke", removal="0.3.0")
    def call_as_llm(
        self, message: str, stop: Optional[List[str]] = None, **kwargs: Any
    ) -> str:
        """Call the model with a plain string; delegates to ``predict``."""
        return self.predict(message, stop=stop, **kwargs)

    @deprecated("0.1.7", alternative="invoke", removal="0.3.0")
    def predict(
        self, text: str, *, stop: Optional[Sequence[str]] = None, **kwargs: Any
    ) -> str:
        """Send ``text`` as a single human message and return the string reply.

        Raises:
            ValueError: If the reply content is not a string.
        """
        if stop is None:
            _stop = None
        else:
            _stop = list(stop)
        result = self([HumanMessage(content=text)], stop=_stop, **kwargs)
        if isinstance(result.content, str):
            return result.content
        else:
            raise ValueError("Cannot use predict when output is not a string.")

    @deprecated("0.1.7", alternative="invoke", removal="0.3.0")
    def predict_messages(
        self,
        messages: List[BaseMessage],
        *,
        stop: Optional[Sequence[str]] = None,
        **kwargs: Any,
    ) -> BaseMessage:
        """Call the model with a message list and return the reply message."""
        if stop is None:
            _stop = None
        else:
            _stop = list(stop)
        return self(messages, stop=_stop, **kwargs)

    @deprecated("0.1.7", alternative="ainvoke", removal="0.3.0")
    async def apredict(
        self, text: str, *, stop: Optional[Sequence[str]] = None, **kwargs: Any
    ) -> str:
        """Async version of ``predict``.

        Raises:
            ValueError: If the reply content is not a string.
        """
        if stop is None:
            _stop = None
        else:
            _stop = list(stop)
        result = await self._call_async(
            [HumanMessage(content=text)], stop=_stop, **kwargs
        )
        if isinstance(result.content, str):
            return result.content
        else:
            raise ValueError("Cannot use predict when output is not a string.")

    @deprecated("0.1.7", alternative="ainvoke", removal="0.3.0")
    async def apredict_messages(
        self,
        messages: List[BaseMessage],
        *,
        stop: Optional[Sequence[str]] = None,
        **kwargs: Any,
    ) -> BaseMessage:
        """Async version of ``predict_messages``."""
        if stop is None:
            _stop = None
        else:
            _stop = list(stop)
        return await self._call_async(messages, stop=_stop, **kwargs)

    @property
    @abstractmethod
    def _llm_type(self) -> str:
        """Return type of chat model."""

    def dict(self, **kwargs: Any) -> Dict:
        """Return a dictionary of the LLM.

        The result is a copy of ``_identifying_params`` with an added
        ``"_type"`` entry holding ``_llm_type``. ``kwargs`` are accepted but
        not used by this implementation.
        """
        starter_dict = dict(self._identifying_params)
        starter_dict["_type"] = self._llm_type
        return starter_dict

    def bind_tools(
        self,
        tools: Sequence[Union[Dict[str, Any], Type[BaseModel], Callable, BaseTool]],
        **kwargs: Any,
    ) -> Runnable[LanguageModelInput, BaseMessage]:
        """Bind tools to the model; not implemented in the base class."""
        raise NotImplementedError()
class SimpleChatModel(BaseChatModel):
    """Simplified implementation for a chat model to inherit from.

    Subclasses only implement ``_call``, which returns a plain string; this
    class wraps that string into the ``ChatResult`` structure.
    """

    def _generate(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        """Wrap the string returned by ``_call`` in a one-generation result."""
        text = self._call(messages, stop=stop, run_manager=run_manager, **kwargs)
        reply = AIMessage(content=text)
        return ChatResult(generations=[ChatGeneration(message=reply)])

    @abstractmethod
    def _call(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Simpler interface."""

    async def _agenerate(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        """Run the synchronous ``_generate`` through ``run_in_executor``."""
        # The sync implementation needs the sync flavour of the manager.
        sync_manager = run_manager.get_sync() if run_manager else None
        return await run_in_executor(
            None,
            self._generate,
            messages,
            stop=stop,
            run_manager=sync_manager,
            **kwargs,
        )
def _gen_info_and_msg_metadata(
    generation: Union[ChatGeneration, ChatGenerationChunk],
) -> dict:
    """Merge a generation's info with its message's response metadata.

    Returns a new dict that starts from ``generation.generation_info`` (or an
    empty dict when that is falsy) and is then overlaid with
    ``generation.message.response_metadata``, so the message's entries win on
    key collisions.
    """
    merged = dict(generation.generation_info or {})
    merged.update(generation.message.response_metadata)
    return merged