Source code for langchain_community.indexes.base

from __future__ import annotations

import uuid
from abc import ABC, abstractmethod
from typing import List, Optional, Sequence

NAMESPACE_UUID = uuid.UUID(int=1984)


[docs]class RecordManager(ABC): """用于记录管理器的抽象基类。"""
[docs] def __init__( self, namespace: str, ) -> None: """初始化记录管理器。 参数: namespace(str):记录管理器的命名空间。 """ self.namespace = namespace
[docs] @abstractmethod def create_schema(self) -> None: """为记录管理器创建数据库模式。"""
[docs] @abstractmethod async def acreate_schema(self) -> None: """为记录管理器创建数据库模式。"""
[docs] @abstractmethod def get_time(self) -> float: """获取当前服务器时间作为高分辨率时间戳! 重要的是从服务器获取此时间,以确保时钟单调增加, 否则在清理旧文档时可能会丢失数据! 返回: 作为浮点时间戳的当前服务器时间。 """
[docs] @abstractmethod async def aget_time(self) -> float: """获取当前服务器时间作为高分辨率时间戳! 重要的是从服务器获取此时间,以确保时钟单调增加, 否则在清理旧文档时可能会丢失数据! 返回: 作为浮点时间戳的当前服务器时间。 """
[docs] @abstractmethod def update( self, keys: Sequence[str], *, group_ids: Optional[Sequence[Optional[str]]] = None, time_at_least: Optional[float] = None, ) -> None: """将记录插入到数据库中。 参数: keys:要插入的记录键的列表。 group_ids:与键对应的组ID列表。 time_at_least:如果提供,只有当updated_at字段至少是这个时间时才会发生更新。 引发: ValueError:如果keys的长度与group_ids的长度不匹配。 """
[docs] @abstractmethod async def aupdate( self, keys: Sequence[str], *, group_ids: Optional[Sequence[Optional[str]]] = None, time_at_least: Optional[float] = None, ) -> None: """将记录插入到数据库中。 参数: keys:要插入的记录键的列表。 group_ids:与键对应的组ID列表。 time_at_least:如果提供,只有当updated_at字段至少是这个时间时才会发生更新。 引发: ValueError:如果keys的长度与group_ids的长度不匹配。 """
[docs] @abstractmethod def exists(self, keys: Sequence[str]) -> List[bool]: """检查提供的键是否存在于数据库中。 参数: keys:要检查的键列表。 返回: 一个布尔值列表,指示每个键的存在情况。 """
[docs] @abstractmethod async def aexists(self, keys: Sequence[str]) -> List[bool]: """检查提供的键是否存在于数据库中。 参数: keys:要检查的键列表。 返回: 一个布尔值列表,指示每个键的存在情况。 """
[docs] @abstractmethod def list_keys( self, *, before: Optional[float] = None, after: Optional[float] = None, group_ids: Optional[Sequence[str]] = None, limit: Optional[int] = None, ) -> List[str]: """根据提供的过滤器列出数据库中的记录。 参数: before:筛选出在此时间之前更新的记录。 after:筛选出在此时间之后更新的记录。 group_ids:筛选出具有特定组ID的记录。 limit:可选,限制要返回的记录数。 返回: 匹配记录的键列表。 """
[docs] @abstractmethod async def alist_keys( self, *, before: Optional[float] = None, after: Optional[float] = None, group_ids: Optional[Sequence[str]] = None, limit: Optional[int] = None, ) -> List[str]: """根据提供的过滤器列出数据库中的记录。 参数: before:筛选出在此时间之前更新的记录。 after:筛选出在此时间之后更新的记录。 group_ids:筛选出具有特定组ID的记录。 limit:可选,限制要返回的记录数。 返回: 匹配记录的键列表。 """
[docs] @abstractmethod def delete_keys(self, keys: Sequence[str]) -> None: """从数据库中删除指定的记录。 参数: keys:要删除的键的列表。 """
[docs] @abstractmethod async def adelete_keys(self, keys: Sequence[str]) -> None: """从数据库中删除指定的记录。 参数: keys:要删除的键的列表。 """