Source code for langchain_community.chat_message_histories.momento

from __future__ import annotations

import json
from datetime import timedelta
from typing import TYPE_CHECKING, Any, Optional

from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import (
    BaseMessage,
    message_to_dict,
    messages_from_dict,
)
from langchain_core.utils import get_from_env

if TYPE_CHECKING:
    import momento


def _ensure_cache_exists(cache_client: momento.CacheClient, cache_name: str) -> None:
    """如果缓存不存在,则创建缓存。

抛出:
    SdkException:Momento 服务或网络错误
    Exception:意外响应
"""
    from momento.responses import CreateCache

    create_cache_response = cache_client.create_cache(cache_name)
    if isinstance(create_cache_response, CreateCache.Success) or isinstance(
        create_cache_response, CreateCache.CacheAlreadyExists
    ):
        return None
    elif isinstance(create_cache_response, CreateCache.Error):
        raise create_cache_response.inner_exception
    else:
        raise Exception(f"Unexpected response cache creation: {create_cache_response}")


[docs]class MomentoChatMessageHistory(BaseChatMessageHistory): """聊天消息历史记录缓存,使用Momento作为后端。 查看https://gomomento.com/"""
[docs] def __init__( self, session_id: str, cache_client: momento.CacheClient, cache_name: str, *, key_prefix: str = "message_store:", ttl: Optional[timedelta] = None, ensure_cache_exists: bool = True, ): """实例化一个使用Momento作为后端的聊天消息历史记录缓存。 注意:要实例化传递给MomentoChatMessageHistory的缓存客户端,您必须在https://gomomento.com/拥有一个Momento账户。 参数: session_id (str):用于此聊天会话的会话ID。 cache_client (CacheClient):Momento缓存客户端。 cache_name (str):用于存储消息的缓存的名称。 key_prefix (str, optional):要应用于缓存键的前缀。 默认为"message_store:"。 ttl (Optional[timedelta], optional):用于消息的TTL。 默认为None,即将使用缓存的默认TTL。 ensure_cache_exists (bool, optional):如果缓存不存在则创建缓存。 默认为True。 引发: ImportError:Momento Python包未安装。 TypeError:cache_client不是momento.CacheClientObject类型。 """ try: from momento import CacheClient from momento.requests import CollectionTtl except ImportError: raise ImportError( "Could not import momento python package. " "Please install it with `pip install momento`." ) if not isinstance(cache_client, CacheClient): raise TypeError("cache_client must be a momento.CacheClient object.") if ensure_cache_exists: _ensure_cache_exists(cache_client, cache_name) self.key = key_prefix + session_id self.cache_client = cache_client self.cache_name = cache_name if ttl is not None: self.ttl = CollectionTtl.of(ttl) else: self.ttl = CollectionTtl.from_cache_ttl()
[docs] @classmethod def from_client_params( cls, session_id: str, cache_name: str, ttl: timedelta, *, configuration: Optional[momento.config.Configuration] = None, api_key: Optional[str] = None, auth_token: Optional[str] = None, # for backwards compatibility **kwargs: Any, ) -> MomentoChatMessageHistory: """从CacheClient参数构建缓存。""" try: from momento import CacheClient, Configurations, CredentialProvider except ImportError: raise ImportError( "Could not import momento python package. " "Please install it with `pip install momento`." ) if configuration is None: configuration = Configurations.Laptop.v1() # Try checking `MOMENTO_AUTH_TOKEN` first for backwards compatibility try: api_key = auth_token or get_from_env("auth_token", "MOMENTO_AUTH_TOKEN") except ValueError: api_key = api_key or get_from_env("api_key", "MOMENTO_API_KEY") credentials = CredentialProvider.from_string(api_key) cache_client = CacheClient(configuration, credentials, default_ttl=ttl) return cls(session_id, cache_client, cache_name, ttl=ttl, **kwargs)
@property def messages(self) -> list[BaseMessage]: # type: ignore[override] """从Momento中检索消息。 抛出: SdkException:Momento服务或网络错误 Exception:意外响应 返回: list[BaseMessage]:缓存消息列表 """ from momento.responses import CacheListFetch fetch_response = self.cache_client.list_fetch(self.cache_name, self.key) if isinstance(fetch_response, CacheListFetch.Hit): items = [json.loads(m) for m in fetch_response.value_list_string] return messages_from_dict(items) elif isinstance(fetch_response, CacheListFetch.Miss): return [] elif isinstance(fetch_response, CacheListFetch.Error): raise fetch_response.inner_exception else: raise Exception(f"Unexpected response: {fetch_response}")
[docs] def add_message(self, message: BaseMessage) -> None: """在缓存中存储一条消息。 参数: message(BaseMessage):要存储的消息对象。 引发: SdkException:Momento服务或网络错误。 Exception:意外响应。 """ from momento.responses import CacheListPushBack item = json.dumps(message_to_dict(message)) push_response = self.cache_client.list_push_back( self.cache_name, self.key, item, ttl=self.ttl ) if isinstance(push_response, CacheListPushBack.Success): return None elif isinstance(push_response, CacheListPushBack.Error): raise push_response.inner_exception else: raise Exception(f"Unexpected response: {push_response}")
[docs] def clear(self) -> None: """从缓存中删除会话的消息。 引发: SdkException:Momento 服务或网络错误。 Exception:意外响应。 """ from momento.responses import CacheDelete delete_response = self.cache_client.delete(self.cache_name, self.key) if isinstance(delete_response, CacheDelete.Success): return None elif isinstance(delete_response, CacheDelete.Error): raise delete_response.inner_exception else: raise Exception(f"Unexpected response: {delete_response}")