Source code for langchain_community.chat_message_histories.mongodb

import json
import logging
from typing import List

from langchain_core._api.deprecation import deprecated
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import (
    BaseMessage,
    message_to_dict,
    messages_from_dict,
)

logger = logging.getLogger(__name__)

DEFAULT_DBNAME = "chat_history"
DEFAULT_COLLECTION_NAME = "message_store"


[docs]@deprecated( since="0.0.25", removal="0.3.0", alternative_import="langchain_mongodb.MongoDBChatMessageHistory", ) class MongoDBChatMessageHistory(BaseChatMessageHistory): """在MongoDB中存储聊天消息历史记录。 参数: connection_string: 连接到MongoDB的连接字符串 session_id: 用于存储单个聊天会话消息的任意键。 database_name: 要使用的数据库名称 collection_name: 要使用的集合名称 create_index: 是否创建一个名为SessionId的索引。如果这样的索引已经存在,则设置为False。"""
[docs] def __init__( self, connection_string: str, session_id: str, database_name: str = DEFAULT_DBNAME, collection_name: str = DEFAULT_COLLECTION_NAME, create_index: bool = True, ): from pymongo import MongoClient, errors self.connection_string = connection_string self.session_id = session_id self.database_name = database_name self.collection_name = collection_name try: self.client: MongoClient = MongoClient(connection_string) except errors.ConnectionFailure as error: logger.error(error) self.db = self.client[database_name] self.collection = self.db[collection_name] if create_index: self.collection.create_index("SessionId")
@property def messages(self) -> List[BaseMessage]: # type: ignore """从MongoDB中检索消息""" from pymongo import errors try: cursor = self.collection.find({"SessionId": self.session_id}) except errors.OperationFailure as error: logger.error(error) if cursor: items = [json.loads(document["History"]) for document in cursor] else: items = [] messages = messages_from_dict(items) return messages
[docs] def add_message(self, message: BaseMessage) -> None: """将消息附加到MongoDB中的记录""" from pymongo import errors try: self.collection.insert_one( { "SessionId": self.session_id, "History": json.dumps(message_to_dict(message)), } ) except errors.WriteError as err: logger.error(err)
[docs] def clear(self) -> None: """清除MongoDB中的会话内存""" from pymongo import errors try: self.collection.delete_many({"SessionId": self.session_id}) except errors.WriteError as err: logger.error(err)