Source code for langchain_community.chat_message_histories.dynamodb

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Dict, List, Optional

from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import (
    BaseMessage,
    message_to_dict,
    messages_from_dict,
    messages_to_dict,
)

if TYPE_CHECKING:
    from boto3.session import Session

logger = logging.getLogger(__name__)


[docs]class DynamoDBChatMessageHistory(BaseChatMessageHistory): """聊天消息历史记录,将历史记录存储在AWS DynamoDB中。 该类期望存在一个名为`table_name`的DynamoDB表 参数: table_name: DynamoDB表的名称 session_id: 用于存储单个聊天会话消息的任意键。 endpoint_url: 用于连接的AWS端点的URL。此参数是可选的,对于测试目的很有用,比如使用Localstack。如果您计划使用AWS云服务,通常不必担心设置endpoint_url。 primary_key_name: DynamoDB表的主键名称。此参数是可选的,默认为"SessionId"。 key: 一个可选的带有自定义主键和次要键的字典。此参数是可选的,但在使用复合DynamoDB键或基于应用程序细节(如用户ID)隔离记录时很有用。这也可能包含全局和本地辅助索引键。 kms_key_id: 可选的AWS KMS密钥ID,AWS KMS密钥ARN或AWS KMS别名,用于客户端端加密 ttl: 可选的生存时间(TTL)(以秒为单位)。允许您定义每个项目的到期时间戳,指示何时可以从表中删除项目。DynamoDB处理过期项目的删除,而不会消耗写入吞吐量。要在表上启用此功能,请参阅[AWS DynamoDB文档](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/time-to-live-ttl-how-to.html) history_size: 要存储的最大消息数量。如果为None,则没有限制。如果不为None,则仅存储最新的`history_size`条消息。"""
[docs] def __init__( self, table_name: str, session_id: str, endpoint_url: Optional[str] = None, primary_key_name: str = "SessionId", key: Optional[Dict[str, str]] = None, boto3_session: Optional[Session] = None, kms_key_id: Optional[str] = None, ttl: Optional[int] = None, ttl_key_name: str = "expireAt", history_size: Optional[int] = None, ): if boto3_session: client = boto3_session.resource("dynamodb", endpoint_url=endpoint_url) else: try: import boto3 except ImportError as e: raise ImportError( "Unable to import boto3, please install with `pip install boto3`." ) from e if endpoint_url: client = boto3.resource("dynamodb", endpoint_url=endpoint_url) else: client = boto3.resource("dynamodb") self.table = client.Table(table_name) self.session_id = session_id self.key: Dict = key or {primary_key_name: session_id} self.ttl = ttl self.ttl_key_name = ttl_key_name self.history_size = history_size if kms_key_id: try: from dynamodb_encryption_sdk.encrypted.table import EncryptedTable from dynamodb_encryption_sdk.identifiers import CryptoAction from dynamodb_encryption_sdk.material_providers.aws_kms import ( AwsKmsCryptographicMaterialsProvider, ) from dynamodb_encryption_sdk.structures import AttributeActions except ImportError as e: raise ImportError( "Unable to import dynamodb_encryption_sdk, please install with " "`pip install dynamodb-encryption-sdk`." ) from e actions = AttributeActions( default_action=CryptoAction.DO_NOTHING, attribute_actions={"History": CryptoAction.ENCRYPT_AND_SIGN}, ) aws_kms_cmp = AwsKmsCryptographicMaterialsProvider(key_id=kms_key_id) self.table = EncryptedTable( table=self.table, materials_provider=aws_kms_cmp, attribute_actions=actions, auto_refresh_table_indexes=False, )
@property def messages(self) -> List[BaseMessage]: """从DynamoDB检索消息""" try: from botocore.exceptions import ClientError except ImportError as e: raise ImportError( "Unable to import botocore, please install with `pip install botocore`." ) from e response = None try: response = self.table.get_item(Key=self.key) except ClientError as error: if error.response["Error"]["Code"] == "ResourceNotFoundException": logger.warning("No record found with session id: %s", self.session_id) else: logger.error(error) if response and "Item" in response: items = response["Item"]["History"] else: items = [] messages = messages_from_dict(items) return messages @messages.setter def messages(self, messages: List[BaseMessage]) -> None: raise NotImplementedError( "Direct assignment to 'messages' is not allowed." " Use the 'add_messages' instead." )
[docs] def add_message(self, message: BaseMessage) -> None: """将消息附加到DynamoDB中的记录""" try: from botocore.exceptions import ClientError except ImportError as e: raise ImportError( "Unable to import botocore, please install with `pip install botocore`." ) from e messages = messages_to_dict(self.messages) _message = message_to_dict(message) messages.append(_message) if self.history_size: messages = messages[-self.history_size :] try: if self.ttl: import time expireAt = int(time.time()) + self.ttl self.table.put_item( Item={**self.key, "History": messages, self.ttl_key_name: expireAt} ) else: self.table.put_item(Item={**self.key, "History": messages}) except ClientError as err: logger.error(err)
[docs] def clear(self) -> None: """清除DynamoDB中的会话内存""" try: from botocore.exceptions import ClientError except ImportError as e: raise ImportError( "Unable to import botocore, please install with `pip install botocore`." ) from e try: self.table.delete_item(Key=self.key) except ClientError as err: logger.error(err)