Kafka聊天消息历史#

class langchain_community.chat_message_histories.kafka.KafkaChatMessageHistory(session_id: str, bootstrap_servers: str, ttl_ms: int = 604800000, replication_factor: int = 1, partition: int = 3)[source]#

聊天消息历史存储在Kafka中。

Setup:

安装 confluent-kafka-python

pip install confluent_kafka
Instantiate:
from langchain_community.chat_message_histories import KafkaChatMessageHistory

history = KafkaChatMessageHistory(
    session_id="your_session_id",
    bootstrap_servers="host:port",
)
Add and retrieve messages:
# Add messages
history.add_messages([message1, message2, message3, ...])

# Retrieve messages
message_batch_0 = history.messages

# retrieve messages after message_batch_0
message_batch_1 = history.messages

# Reset to beginning and retrieve messages
messages_from_beginning = history.messages_from_beginning()

检索消息是有状态的。在内部,它使用Kafka消费者来读取。消耗的偏移量是持久维护的。

要检索消息,您可以使用以下方法: - messages:

继续从最后一个开始消费聊天消息。

  • messages_from_beginning:

    将消费者重置到聊天记录的开头并返回消息。 可选参数: 1. max_message_count: 返回的最大消息数量。 2. max_time_sec: 等待消息的最大时间(秒)。

  • messages_from_latest:

    重置到聊天记录的末尾并尝试消费消息。 可选参数同上。

  • messages_from_last_consumed:

    从最后消费的消息继续,类似于messages。 可选参数同上。

max_message_count and max_time_sec are used to avoid blocking indefinitely

在检索消息时。因此,检索消息的方法可能不会返回所有消息。更改max_message_countmax_time_sec以检索所有历史消息。

Parameters:

属性

messages

从Kafka主题中持续检索会话的消息,从最后消费的消息开始。

方法

__init__(session_id, bootstrap_servers[, ...])

aadd_messages(messages)

异步添加消息列表。

aclear()

异步从存储中移除所有消息

add_ai_message(message)

用于将AI消息字符串添加到存储中的便捷方法。

add_message(message)

将消息对象添加到存储中。

add_messages(messages[, flush_timeout_seconds])

通过向Kafka主题生产消息来将消息添加到聊天历史记录中。

add_user_message(message)

方便的方法,用于将人类消息字符串添加到存储中。

aget_messages()

获取消息的异步版本。

clear()

通过删除Kafka主题来清除聊天历史记录。

close()

释放资源。

messages_from_beginning([max_message_count, ...])

从Kafka主题的开头检索消息。

messages_from_last_consumed([...])

从Kafka主题中检索自上次消费的消息。

messages_from_latest([max_message_count, ...])

重置到结束偏移量。

__init__(session_id: str, bootstrap_servers: str, ttl_ms: int = 604800000, replication_factor: int = 1, partition: int = 3)[来源]#
Parameters:
async aadd_messages(messages: Sequence[BaseMessage]) None#

异步添加消息列表。

Parameters:

messages (Sequence[BaseMessage]) – 要存储的BaseMessage对象序列。

Return type:

async aclear() None#

异步从存储中移除所有消息

Return type:

add_ai_message(message: AIMessage | str) None#

向存储中添加AI消息字符串的便捷方法。

请注意,这是一个便捷方法。代码应优先使用批量 add_messages 接口,以减少与底层持久层的往返次数。

此方法可能在未来的版本中被弃用。

Parameters:

message (AIMessage | str) – 要添加的AI消息。

Return type:

add_message(message: BaseMessage) None#

向存储中添加一个消息对象。

Parameters:

消息 (BaseMessage) – 一个用于存储的BaseMessage对象。

Raises:

NotImplementedError – 如果子类没有实现一个有效的 add_messages 方法。

Return type:

add_messages(messages: Sequence[BaseMessage], flush_timeout_seconds: float = 5.0) None[source]#

通过向Kafka主题生产消息来将消息添加到聊天历史记录中。

Parameters:
  • messages (Sequence[BaseMessage])

  • flush_timeout_seconds (float)

Return type:

add_user_message(message: HumanMessage | str) None#

用于向存储中添加人类消息字符串的便捷方法。

请注意,这是一个便捷方法。代码应优先使用批量 add_messages 接口,以减少对底层持久层的往返次数。

此方法可能在未来的版本中被弃用。

Parameters:

message (HumanMessage | str) – 要添加到存储中的人类消息。

Return type:

async aget_messages() list[BaseMessage]#

获取消息的异步版本。

可以重写此方法以提供高效的异步实现。

通常,获取消息可能涉及到底层持久层的IO操作。

Return type:

列表[BaseMessage]

clear() None[source]#

通过删除Kafka主题来清除聊天记录。

Return type:

close() None[来源]#

释放资源。 目前没有需要释放的资源。

Return type:

messages_from_beginning(max_message_count: int | None = 5, max_time_sec: float | None = 5.0) List[BaseMessage][source]#

从Kafka主题的开头检索消息。 此方法将消费者重置到开头并消费消息。

Args:

max_message_count: 要消费的最大消息数量。 max_time_sec: 消费消息的时间限制(以秒为单位)。

Returns:

消息列表。

Parameters:
  • max_message_count (int | None)

  • max_time_sec (float | None)

Return type:

列表[BaseMessage]

messages_from_last_consumed(max_message_count: int | None = 5, max_time_sec: float | None = 5.0) List[BaseMessage][source]#

从Kafka主题中检索自上次消费的消息。 请注意,此方法是有状态的。在内部,它使用Kafka消费者 来消费消息,并维护提交的偏移量。

Args:

max_message_count: 要消费的最大消息数量。 max_time_sec: 消费消息的时间限制(以秒为单位)。

Returns:

消息列表。

Parameters:
  • max_message_count (int | None)

  • max_time_sec (float | None)

Return type:

列表[BaseMessage]

messages_from_latest(max_message_count: int | None = 5, max_time_sec: float | None = 5.0) List[BaseMessage][source]#

重置到结束偏移量。如果有可用的消息,尝试消费它们。

Parameters:
  • max_message_count (int | None) – 要消费的最大消息数量。

  • max_time_sec (float | None) – 消费消息的时间限制,单位为秒。

Returns:

消息列表。

Return type:

列表[BaseMessage]

使用 KafkaChatMessageHistory 的示例