Source code for langchain_community.chat_message_histories.dynamodb
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Dict, List, Optional
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import (
BaseMessage,
message_to_dict,
messages_from_dict,
messages_to_dict,
)
if TYPE_CHECKING:
from boto3.session import Session
logger = logging.getLogger(__name__)
[docs]class DynamoDBChatMessageHistory(BaseChatMessageHistory):
"""聊天消息历史记录,将历史记录存储在AWS DynamoDB中。
该类期望存在一个名为`table_name`的DynamoDB表
参数:
table_name: DynamoDB表的名称
session_id: 用于存储单个聊天会话消息的任意键。
endpoint_url: 用于连接的AWS端点的URL。此参数是可选的,对于测试目的很有用,比如使用Localstack。如果您计划使用AWS云服务,通常不必担心设置endpoint_url。
primary_key_name: DynamoDB表的主键名称。此参数是可选的,默认为"SessionId"。
key: 一个可选的带有自定义主键和次要键的字典。此参数是可选的,但在使用复合DynamoDB键或基于应用程序细节(如用户ID)隔离记录时很有用。这也可能包含全局和本地辅助索引键。
kms_key_id: 可选的AWS KMS密钥ID,AWS KMS密钥ARN或AWS KMS别名,用于客户端端加密
ttl: 可选的生存时间(TTL)(以秒为单位)。允许您定义每个项目的到期时间戳,指示何时可以从表中删除项目。DynamoDB处理过期项目的删除,而不会消耗写入吞吐量。要在表上启用此功能,请参阅[AWS DynamoDB文档](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/time-to-live-ttl-how-to.html)
history_size: 要存储的最大消息数量。如果为None,则没有限制。如果不为None,则仅存储最新的`history_size`条消息。"""
[docs] def __init__(
self,
table_name: str,
session_id: str,
endpoint_url: Optional[str] = None,
primary_key_name: str = "SessionId",
key: Optional[Dict[str, str]] = None,
boto3_session: Optional[Session] = None,
kms_key_id: Optional[str] = None,
ttl: Optional[int] = None,
ttl_key_name: str = "expireAt",
history_size: Optional[int] = None,
):
if boto3_session:
client = boto3_session.resource("dynamodb", endpoint_url=endpoint_url)
else:
try:
import boto3
except ImportError as e:
raise ImportError(
"Unable to import boto3, please install with `pip install boto3`."
) from e
if endpoint_url:
client = boto3.resource("dynamodb", endpoint_url=endpoint_url)
else:
client = boto3.resource("dynamodb")
self.table = client.Table(table_name)
self.session_id = session_id
self.key: Dict = key or {primary_key_name: session_id}
self.ttl = ttl
self.ttl_key_name = ttl_key_name
self.history_size = history_size
if kms_key_id:
try:
from dynamodb_encryption_sdk.encrypted.table import EncryptedTable
from dynamodb_encryption_sdk.identifiers import CryptoAction
from dynamodb_encryption_sdk.material_providers.aws_kms import (
AwsKmsCryptographicMaterialsProvider,
)
from dynamodb_encryption_sdk.structures import AttributeActions
except ImportError as e:
raise ImportError(
"Unable to import dynamodb_encryption_sdk, please install with "
"`pip install dynamodb-encryption-sdk`."
) from e
actions = AttributeActions(
default_action=CryptoAction.DO_NOTHING,
attribute_actions={"History": CryptoAction.ENCRYPT_AND_SIGN},
)
aws_kms_cmp = AwsKmsCryptographicMaterialsProvider(key_id=kms_key_id)
self.table = EncryptedTable(
table=self.table,
materials_provider=aws_kms_cmp,
attribute_actions=actions,
auto_refresh_table_indexes=False,
)
@property
def messages(self) -> List[BaseMessage]:
"""从DynamoDB检索消息"""
try:
from botocore.exceptions import ClientError
except ImportError as e:
raise ImportError(
"Unable to import botocore, please install with `pip install botocore`."
) from e
response = None
try:
response = self.table.get_item(Key=self.key)
except ClientError as error:
if error.response["Error"]["Code"] == "ResourceNotFoundException":
logger.warning("No record found with session id: %s", self.session_id)
else:
logger.error(error)
if response and "Item" in response:
items = response["Item"]["History"]
else:
items = []
messages = messages_from_dict(items)
return messages
@messages.setter
def messages(self, messages: List[BaseMessage]) -> None:
raise NotImplementedError(
"Direct assignment to 'messages' is not allowed."
" Use the 'add_messages' instead."
)
[docs] def add_message(self, message: BaseMessage) -> None:
"""将消息附加到DynamoDB中的记录"""
try:
from botocore.exceptions import ClientError
except ImportError as e:
raise ImportError(
"Unable to import botocore, please install with `pip install botocore`."
) from e
messages = messages_to_dict(self.messages)
_message = message_to_dict(message)
messages.append(_message)
if self.history_size:
messages = messages[-self.history_size :]
try:
if self.ttl:
import time
expireAt = int(time.time()) + self.ttl
self.table.put_item(
Item={**self.key, "History": messages, self.ttl_key_name: expireAt}
)
else:
self.table.put_item(Item={**self.key, "History": messages})
except ClientError as err:
logger.error(err)
[docs] def clear(self) -> None:
"""清除DynamoDB中的会话内存"""
try:
from botocore.exceptions import ClientError
except ImportError as e:
raise ImportError(
"Unable to import botocore, please install with `pip install botocore`."
) from e
try:
self.table.delete_item(Key=self.key)
except ClientError as err:
logger.error(err)