from __future__ import annotations
import json
from datetime import timedelta
from typing import TYPE_CHECKING, Any, Optional
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import (
BaseMessage,
message_to_dict,
messages_from_dict,
)
from langchain_core.utils import get_from_env
if TYPE_CHECKING:
import momento
def _ensure_cache_exists(cache_client: momento.CacheClient, cache_name: str) -> None:
"""如果缓存不存在,则创建缓存。
抛出:
SdkException:Momento 服务或网络错误
Exception:意外响应
"""
from momento.responses import CreateCache
create_cache_response = cache_client.create_cache(cache_name)
if isinstance(create_cache_response, CreateCache.Success) or isinstance(
create_cache_response, CreateCache.CacheAlreadyExists
):
return None
elif isinstance(create_cache_response, CreateCache.Error):
raise create_cache_response.inner_exception
else:
raise Exception(f"Unexpected response cache creation: {create_cache_response}")
[docs]class MomentoChatMessageHistory(BaseChatMessageHistory):
"""聊天消息历史记录缓存,使用Momento作为后端。
查看https://gomomento.com/"""
[docs] def __init__(
self,
session_id: str,
cache_client: momento.CacheClient,
cache_name: str,
*,
key_prefix: str = "message_store:",
ttl: Optional[timedelta] = None,
ensure_cache_exists: bool = True,
):
"""实例化一个使用Momento作为后端的聊天消息历史记录缓存。
注意:要实例化传递给MomentoChatMessageHistory的缓存客户端,您必须在https://gomomento.com/拥有一个Momento账户。
参数:
session_id (str):用于此聊天会话的会话ID。
cache_client (CacheClient):Momento缓存客户端。
cache_name (str):用于存储消息的缓存的名称。
key_prefix (str, optional):要应用于缓存键的前缀。
默认为"message_store:"。
ttl (Optional[timedelta], optional):用于消息的TTL。
默认为None,即将使用缓存的默认TTL。
ensure_cache_exists (bool, optional):如果缓存不存在则创建缓存。
默认为True。
引发:
ImportError:Momento Python包未安装。
TypeError:cache_client不是momento.CacheClientObject类型。
"""
try:
from momento import CacheClient
from momento.requests import CollectionTtl
except ImportError:
raise ImportError(
"Could not import momento python package. "
"Please install it with `pip install momento`."
)
if not isinstance(cache_client, CacheClient):
raise TypeError("cache_client must be a momento.CacheClient object.")
if ensure_cache_exists:
_ensure_cache_exists(cache_client, cache_name)
self.key = key_prefix + session_id
self.cache_client = cache_client
self.cache_name = cache_name
if ttl is not None:
self.ttl = CollectionTtl.of(ttl)
else:
self.ttl = CollectionTtl.from_cache_ttl()
[docs] @classmethod
def from_client_params(
cls,
session_id: str,
cache_name: str,
ttl: timedelta,
*,
configuration: Optional[momento.config.Configuration] = None,
api_key: Optional[str] = None,
auth_token: Optional[str] = None, # for backwards compatibility
**kwargs: Any,
) -> MomentoChatMessageHistory:
"""从CacheClient参数构建缓存。"""
try:
from momento import CacheClient, Configurations, CredentialProvider
except ImportError:
raise ImportError(
"Could not import momento python package. "
"Please install it with `pip install momento`."
)
if configuration is None:
configuration = Configurations.Laptop.v1()
# Try checking `MOMENTO_AUTH_TOKEN` first for backwards compatibility
try:
api_key = auth_token or get_from_env("auth_token", "MOMENTO_AUTH_TOKEN")
except ValueError:
api_key = api_key or get_from_env("api_key", "MOMENTO_API_KEY")
credentials = CredentialProvider.from_string(api_key)
cache_client = CacheClient(configuration, credentials, default_ttl=ttl)
return cls(session_id, cache_client, cache_name, ttl=ttl, **kwargs)
@property
def messages(self) -> list[BaseMessage]: # type: ignore[override]
"""从Momento中检索消息。
抛出:
SdkException:Momento服务或网络错误
Exception:意外响应
返回:
list[BaseMessage]:缓存消息列表
"""
from momento.responses import CacheListFetch
fetch_response = self.cache_client.list_fetch(self.cache_name, self.key)
if isinstance(fetch_response, CacheListFetch.Hit):
items = [json.loads(m) for m in fetch_response.value_list_string]
return messages_from_dict(items)
elif isinstance(fetch_response, CacheListFetch.Miss):
return []
elif isinstance(fetch_response, CacheListFetch.Error):
raise fetch_response.inner_exception
else:
raise Exception(f"Unexpected response: {fetch_response}")
[docs] def add_message(self, message: BaseMessage) -> None:
"""在缓存中存储一条消息。
参数:
message(BaseMessage):要存储的消息对象。
引发:
SdkException:Momento服务或网络错误。
Exception:意外响应。
"""
from momento.responses import CacheListPushBack
item = json.dumps(message_to_dict(message))
push_response = self.cache_client.list_push_back(
self.cache_name, self.key, item, ttl=self.ttl
)
if isinstance(push_response, CacheListPushBack.Success):
return None
elif isinstance(push_response, CacheListPushBack.Error):
raise push_response.inner_exception
else:
raise Exception(f"Unexpected response: {push_response}")
[docs] def clear(self) -> None:
"""从缓存中删除会话的消息。
引发:
SdkException:Momento 服务或网络错误。
Exception:意外响应。
"""
from momento.responses import CacheDelete
delete_response = self.cache_client.delete(self.cache_name, self.key)
if isinstance(delete_response, CacheDelete.Success):
return None
elif isinstance(delete_response, CacheDelete.Error):
raise delete_response.inner_exception
else:
raise Exception(f"Unexpected response: {delete_response}")