Kafka聊天消息历史#
- class langchain_community.chat_message_histories.kafka.KafkaChatMessageHistory(session_id: str, bootstrap_servers: str, ttl_ms: int = 604800000, replication_factor: int = 1, partition: int = 3)[source]#
聊天消息历史存储在Kafka中。
- Setup:
安装
confluent-kafka-python
。pip install confluent_kafka
- Instantiate:
from langchain_community.chat_message_histories import KafkaChatMessageHistory history = KafkaChatMessageHistory( session_id="your_session_id", bootstrap_servers="host:port", )
- Add and retrieve messages:
# Add messages history.add_messages([message1, message2, message3, ...]) # Retrieve messages message_batch_0 = history.messages # retrieve messages after message_batch_0 message_batch_1 = history.messages # Reset to beginning and retrieve messages messages_from_beginning = history.messages_from_beginning()
检索消息是有状态的。在内部,它使用Kafka消费者来读取。消耗的偏移量是持久维护的。
要检索消息,您可以使用以下方法: - messages:
继续从最后一个开始消费聊天消息。
- messages_from_beginning:
将消费者重置到聊天记录的开头并返回消息。 可选参数: 1. max_message_count: 返回的最大消息数量。 2. max_time_sec: 等待消息的最大时间(秒)。
- messages_from_latest:
重置到聊天记录的末尾并尝试消费消息。 可选参数同上。
- messages_from_last_consumed:
从最后消费的消息继续,类似于messages。 可选参数同上。
- max_message_count and max_time_sec are used to avoid blocking indefinitely
在检索消息时。因此,检索消息的方法可能不会返回所有消息。更改max_message_count和max_time_sec以检索所有历史消息。
- Parameters:
session_id (str) – 单次聊天会话的ID。它被用作Kafka主题名称。
bootstrap_servers (str) – 用于建立与Kafka集群连接的逗号分隔的主机/端口对 https://kafka.apache.org/documentation.html#adminclientconfigs_bootstrap.servers
ttl_ms (int) – 条目的自动过期时间(毫秒)。 默认7天。-1表示永不过期。 它对应于 https://kafka.apache.org/documentation.html#topicconfigs_retention.ms
replication_factor (int) – 主题的复制因子。默认值为1。
partition (int) – 主题的分区数量。默认值为3。
属性
messages
从Kafka主题中持续检索会话的消息,从最后消费的消息开始。
方法
__init__
(session_id, bootstrap_servers[, ...])aadd_messages
(messages)异步添加消息列表。
aclear
()异步从存储中移除所有消息
add_ai_message
(message)用于将AI消息字符串添加到存储中的便捷方法。
add_message
(message)将消息对象添加到存储中。
add_messages
(messages[, flush_timeout_seconds])通过向Kafka主题生产消息来将消息添加到聊天历史记录中。
add_user_message
(message)方便的方法,用于将人类消息字符串添加到存储中。
获取消息的异步版本。
clear
()通过删除Kafka主题来清除聊天历史记录。
close
()释放资源。
messages_from_beginning
([max_message_count, ...])从Kafka主题的开头检索消息。
messages_from_last_consumed
([...])从Kafka主题中检索自上次消费的消息。
messages_from_latest
([max_message_count, ...])重置到结束偏移量。
- __init__(session_id: str, bootstrap_servers: str, ttl_ms: int = 604800000, replication_factor: int = 1, partition: int = 3)[来源]#
- Parameters:
session_id (str) – 单次聊天会话的ID。它被用作Kafka主题名称。
bootstrap_servers (str) – 用于建立与Kafka集群连接的逗号分隔的主机/端口对 https://kafka.apache.org/documentation.html#adminclientconfigs_bootstrap.servers
ttl_ms (int) – 条目的自动过期时间(毫秒)。 默认7天。-1表示永不过期。 它对应于 https://kafka.apache.org/documentation.html#topicconfigs_retention.ms
replication_factor (int) – 主题的复制因子。默认值为1。
partition (int) – 主题的分区数量。默认值为3。
- async aadd_messages(messages: Sequence[BaseMessage]) None #
异步添加消息列表。
- Parameters:
messages (Sequence[BaseMessage]) – 要存储的BaseMessage对象序列。
- Return type:
无
- async aclear() None #
异步从存储中移除所有消息
- Return type:
无
- add_ai_message(message: AIMessage | str) None #
向存储中添加AI消息字符串的便捷方法。
请注意,这是一个便捷方法。代码应优先使用批量 add_messages 接口,以减少与底层持久层的往返次数。
此方法可能在未来的版本中被弃用。
- Parameters:
message (AIMessage | str) – 要添加的AI消息。
- Return type:
无
- add_message(message: BaseMessage) None #
向存储中添加一个消息对象。
- Parameters:
消息 (BaseMessage) – 一个用于存储的BaseMessage对象。
- Raises:
NotImplementedError – 如果子类没有实现一个有效的 add_messages 方法。
- Return type:
无
- add_messages(messages: Sequence[BaseMessage], flush_timeout_seconds: float = 5.0) None [source]#
通过向Kafka主题生产消息来将消息添加到聊天历史记录中。
- Parameters:
messages (Sequence[BaseMessage])
flush_timeout_seconds (float)
- Return type:
无
- add_user_message(message: HumanMessage | str) None #
用于向存储中添加人类消息字符串的便捷方法。
请注意,这是一个便捷方法。代码应优先使用批量 add_messages 接口,以减少对底层持久层的往返次数。
此方法可能在未来的版本中被弃用。
- Parameters:
message (HumanMessage | str) – 要添加到存储中的人类消息。
- Return type:
无
- async aget_messages() list[BaseMessage] #
获取消息的异步版本。
可以重写此方法以提供高效的异步实现。
通常,获取消息可能涉及到底层持久层的IO操作。
- Return type:
列表[BaseMessage]
- messages_from_beginning(max_message_count: int | None = 5, max_time_sec: float | None = 5.0) List[BaseMessage] [source]#
从Kafka主题的开头检索消息。 此方法将消费者重置到开头并消费消息。
- Args:
max_message_count: 要消费的最大消息数量。 max_time_sec: 消费消息的时间限制(以秒为单位)。
- Returns:
消息列表。
- Parameters:
max_message_count (int | None)
max_time_sec (float | None)
- Return type:
列表[BaseMessage]
- messages_from_last_consumed(max_message_count: int | None = 5, max_time_sec: float | None = 5.0) List[BaseMessage] [source]#
从Kafka主题中检索自上次消费的消息。 请注意,此方法是有状态的。在内部,它使用Kafka消费者 来消费消息,并维护提交的偏移量。
- Args:
max_message_count: 要消费的最大消息数量。 max_time_sec: 消费消息的时间限制(以秒为单位)。
- Returns:
消息列表。
- Parameters:
max_message_count (int | None)
max_time_sec (float | None)
- Return type:
列表[BaseMessage]
- messages_from_latest(max_message_count: int | None = 5, max_time_sec: float | None = 5.0) List[BaseMessage] [source]#
重置到结束偏移量。如果有可用的消息,尝试消费它们。
- Parameters:
max_message_count (int | None) – 要消费的最大消息数量。
max_time_sec (float | None) – 消费消息的时间限制,单位为秒。
- Returns:
消息列表。
- Return type:
列表[BaseMessage]
使用 KafkaChatMessageHistory 的示例