Source code for langchain_core.indexing.api

"""模块包含将文档索引到向量存储中的逻辑。"""
from __future__ import annotations

import hashlib
import json
import uuid
from itertools import islice
from typing import (
    Any,
    AsyncIterable,
    AsyncIterator,
    Callable,
    Dict,
    Iterable,
    Iterator,
    List,
    Literal,
    Optional,
    Sequence,
    Set,
    TypedDict,
    TypeVar,
    Union,
    cast,
)

from langchain_core.document_loaders.base import BaseLoader
from langchain_core.documents import Document
from langchain_core.indexing.base import RecordManager
from langchain_core.pydantic_v1 import root_validator
from langchain_core.vectorstores import VectorStore

# Magic UUID to use as a namespace for hashing.
# Used to try and generate a unique UUID for each document
# from hashing the document content and metadata.
NAMESPACE_UUID = uuid.UUID(int=1984)


T = TypeVar("T")


def _hash_string_to_uuid(input_string: str) -> uuid.UUID:
    """对字符串进行哈希处理,并返回相应的UUID。"""
    hash_value = hashlib.sha1(input_string.encode("utf-8")).hexdigest()
    return uuid.uuid5(NAMESPACE_UUID, hash_value)


def _hash_nested_dict_to_uuid(data: dict[Any, Any]) -> uuid.UUID:
    """对嵌套字典进行哈希处理,并返回相应的UUID。"""
    serialized_data = json.dumps(data, sort_keys=True)
    hash_value = hashlib.sha1(serialized_data.encode("utf-8")).hexdigest()
    return uuid.uuid5(NAMESPACE_UUID, hash_value)


class _HashedDocument(Document):
    """具有唯一ID的哈希文档。"""

    uid: str
    hash_: str
    """文档的哈希值,包括内容和元数据。"""
    content_hash: str
    """文档内容的哈希值。"""
    metadata_hash: str
    """文档元数据的哈希值。"""

    @classmethod
    def is_lc_serializable(cls) -> bool:
        return False

    @root_validator(pre=True)
    def calculate_hashes(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """根验证器,用于计算内容和元数据哈希值。"""
        content = values.get("page_content", "")
        metadata = values.get("metadata", {})

        forbidden_keys = ("hash_", "content_hash", "metadata_hash")

        for key in forbidden_keys:
            if key in metadata:
                raise ValueError(
                    f"Metadata cannot contain key {key} as it "
                    f"is reserved for internal use."
                )

        content_hash = str(_hash_string_to_uuid(content))

        try:
            metadata_hash = str(_hash_nested_dict_to_uuid(metadata))
        except Exception as e:
            raise ValueError(
                f"Failed to hash metadata: {e}. "
                f"Please use a dict that can be serialized using json."
            )

        values["content_hash"] = content_hash
        values["metadata_hash"] = metadata_hash
        values["hash_"] = str(_hash_string_to_uuid(content_hash + metadata_hash))

        _uid = values.get("uid", None)

        if _uid is None:
            values["uid"] = values["hash_"]
        return values

    def to_document(self) -> Document:
        """返回一个文档对象。"""
        return Document(
            page_content=self.page_content,
            metadata=self.metadata,
        )

    @classmethod
    def from_document(
        cls, document: Document, *, uid: Optional[str] = None
    ) -> _HashedDocument:
        """从一个文档创建一个哈希文档。"""
        return cls(  # type: ignore[call-arg]
            uid=uid,  # type: ignore[arg-type]
            page_content=document.page_content,
            metadata=document.metadata,
        )
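

# Illustrative example (added for this listing, not part of the original
# module): hashing is deterministic, so two documents with identical content
# and metadata produce the same uid, and the uid defaults to the combined
# content/metadata hash.
def _example_hashed_document() -> None:  # pragma: no cover
    doc = Document(page_content="hello", metadata={"source": "a.txt"})
    first = _HashedDocument.from_document(doc)
    second = _HashedDocument.from_document(doc)
    # Identical input -> identical hashes, which is what lets the indexing
    # logic deduplicate documents and detect unchanged content.
    assert first.uid == second.uid
    assert first.uid == first.hash_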


def _batch(size: int, iterable: Iterable[T]) -> Iterator[List[T]]:
    """实用的批处理函数。"""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk
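

# Illustrative example (added for this listing, not part of the original
# module): _batch chunks any iterable into lists of at most ``size`` items,
# with the final chunk holding the remainder.
def _example_batch() -> None:  # pragma: no cover
    assert list(_batch(2, [1, 2, 3, 4, 5])) == [[1, 2], [3, 4], [5]]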


async def _abatch(size: int, iterable: AsyncIterable[T]) -> AsyncIterator[List[T]]:
    """实用的批处理函数。"""
    batch: List[T] = []
    async for element in iterable:
        if len(batch) < size:
            batch.append(element)

        if len(batch) >= size:
            yield batch
            batch = []

    if batch:
        yield batch


def _get_source_id_assigner(
    source_id_key: Union[str, Callable[[Document], str], None],
) -> Callable[[Document], Union[str, None]]:
    """从文档中获取源ID。"""
    if source_id_key is None:
        return lambda doc: None
    elif isinstance(source_id_key, str):
        return lambda doc: doc.metadata[source_id_key]
    elif callable(source_id_key):
        return source_id_key
    else:
        raise ValueError(
            f"source_id_key should be either None, a string or a callable. "
            f"Got {source_id_key} of type {type(source_id_key)}."
        )
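

# Illustrative example (added for this listing, not part of the original
# module): the assigner accepts a metadata key, a callable, or None.
def _example_source_id_assigner() -> None:  # pragma: no cover
    doc = Document(page_content="x", metadata={"source": "a.txt"})
    assert _get_source_id_assigner("source")(doc) == "a.txt"
    assert _get_source_id_assigner(lambda d: d.metadata["source"])(doc) == "a.txt"
    assert _get_source_id_assigner(None)(doc) is None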


def _deduplicate_in_order(
    hashed_documents: Iterable[_HashedDocument],
) -> Iterator[_HashedDocument]:
    """对哈希文档列表进行去重,同时保持顺序。"""
    seen: Set[str] = set()

    for hashed_doc in hashed_documents:
        if hashed_doc.hash_ not in seen:
            seen.add(hashed_doc.hash_)
            yield hashed_doc


# PUBLIC API


class IndexingResult(TypedDict):
    """Return a detailed breakdown of the result of the indexing operation."""

    num_added: int
    """Number of added documents."""
    num_updated: int
    """Number of updated documents because they were not up to date."""
    num_deleted: int
    """Number of deleted documents."""
    num_skipped: int
    """Number of skipped documents because they were already up to date."""


def index(
    docs_source: Union[BaseLoader, Iterable[Document]],
    record_manager: RecordManager,
    vector_store: VectorStore,
    *,
    batch_size: int = 100,
    cleanup: Literal["incremental", "full", None] = None,
    source_id_key: Union[str, Callable[[Document], str], None] = None,
    cleanup_batch_size: int = 1_000,
    force_update: bool = False,
) -> IndexingResult:
    """Index data from the loader into the vector store.

    Indexing functionality uses a manager to keep track of which documents
    are in the vector store.

    This allows us to keep track of which documents were updated, which
    documents were deleted, and which documents should be skipped.

    For the time being, documents are indexed using their hashes, and users
    are not able to specify the uid of the document.

    IMPORTANT:
        If auto_cleanup is set to True, the loader should be returning
        the entire dataset, and not just a subset of the dataset.
        Otherwise, the auto_cleanup will remove documents that it is not
        supposed to.

    Args:
        docs_source: Data loader or iterable of documents to index.
        record_manager: Timestamped set to keep track of which documents were
            updated.
        vector_store: Vector store to index the documents into.
        batch_size: Batch size to use when indexing.
        cleanup: How to handle clean up of documents.
            - incremental: Cleans up all documents that haven't been updated AND
              that are associated with source ids that were seen during indexing.
              Clean up is done continuously during indexing, helping to minimize
              the probability of users seeing duplicated content.
            - full: Delete all documents that have not been returned by the loader
              during this run of indexing.
              Clean up runs after all documents have been indexed.
              This means that users may see duplicated content during indexing.
            - None: Do not delete any documents.
        source_id_key: Optional key that helps identify the original source
            of the document.
        cleanup_batch_size: Batch size to use when cleaning up documents.
        force_update: Force update documents even if they are present in the
            record manager. Useful if you are re-indexing with updated embeddings.

    Returns:
        Indexing result which contains information about how many documents
        were added, updated, deleted, or skipped.
    """
    if cleanup not in {"incremental", "full", None}:
        raise ValueError(
            f"cleanup should be one of 'incremental', 'full' or None. "
            f"Got {cleanup}."
        )

    if cleanup == "incremental" and source_id_key is None:
        raise ValueError("Source id key is required when cleanup mode is incremental.")

    # Check that the Vectorstore has required methods implemented
    methods = ["delete", "add_documents"]

    for method in methods:
        if not hasattr(vector_store, method):
            raise ValueError(
                f"Vectorstore {vector_store} does not have required method {method}"
            )

    if type(vector_store).delete == VectorStore.delete:
        # Checking if the vectorstore has overridden the default delete method
        # implementation which just raises a NotImplementedError
        raise ValueError("Vectorstore has not implemented the delete method")

    if isinstance(docs_source, BaseLoader):
        try:
            doc_iterator = docs_source.lazy_load()
        except NotImplementedError:
            doc_iterator = iter(docs_source.load())
    else:
        doc_iterator = iter(docs_source)

    source_id_assigner = _get_source_id_assigner(source_id_key)

    # Mark when the update started.
    index_start_dt = record_manager.get_time()
    num_added = 0
    num_skipped = 0
    num_updated = 0
    num_deleted = 0

    for doc_batch in _batch(batch_size, doc_iterator):
        hashed_docs = list(
            _deduplicate_in_order(
                [_HashedDocument.from_document(doc) for doc in doc_batch]
            )
        )

        source_ids: Sequence[Optional[str]] = [
            source_id_assigner(doc) for doc in hashed_docs
        ]

        if cleanup == "incremental":
            # If the cleanup mode is incremental, source ids are required.
            for source_id, hashed_doc in zip(source_ids, hashed_docs):
                if source_id is None:
                    raise ValueError(
                        "Source ids are required when cleanup mode is incremental. "
                        f"Document that starts with "
                        f"content: {hashed_doc.page_content[:100]} was not assigned "
                        f"as source id."
                    )
            # source ids cannot be None after for loop above.
            source_ids = cast(Sequence[str], source_ids)  # type: ignore[assignment]

        exists_batch = record_manager.exists([doc.uid for doc in hashed_docs])

        # Filter out documents that already exist in the record store.
        uids = []
        docs_to_index = []
        uids_to_refresh = []
        seen_docs: Set[str] = set()
        for hashed_doc, doc_exists in zip(hashed_docs, exists_batch):
            if doc_exists:
                if force_update:
                    seen_docs.add(hashed_doc.uid)
                else:
                    uids_to_refresh.append(hashed_doc.uid)
                    continue
            uids.append(hashed_doc.uid)
            docs_to_index.append(hashed_doc.to_document())

        # Update refresh timestamp of documents that were skipped.
        if uids_to_refresh:
            record_manager.update(uids_to_refresh, time_at_least=index_start_dt)
            num_skipped += len(uids_to_refresh)

        # Be pessimistic and assume that all vector store writes will fail.
        # First write to vector store
        if docs_to_index:
            vector_store.add_documents(docs_to_index, ids=uids, batch_size=batch_size)
            num_added += len(docs_to_index) - len(seen_docs)
            num_updated += len(seen_docs)

        # And only then update the record store.
        # Update ALL records, even if they already exist, since we want to refresh
        # their timestamp.
        record_manager.update(
            [doc.uid for doc in hashed_docs],
            group_ids=source_ids,
            time_at_least=index_start_dt,
        )

        # If source IDs are provided, we can do the deletion incrementally!
        if cleanup == "incremental":
            # Get the uids of the documents that were not returned by the loader.

            # mypy isn't good enough to determine that source ids cannot be None
            # here due to a check that's happening above, so we check again.
            for source_id in source_ids:
                if source_id is None:
                    raise AssertionError("Source ids cannot be None here.")

            _source_ids = cast(Sequence[str], source_ids)

            uids_to_delete = record_manager.list_keys(
                group_ids=_source_ids, before=index_start_dt
            )
            if uids_to_delete:
                # First delete from the vector store,
                vector_store.delete(uids_to_delete)
                # then delete from the record store.
                record_manager.delete_keys(uids_to_delete)
                num_deleted += len(uids_to_delete)

    if cleanup == "full":
        while uids_to_delete := record_manager.list_keys(
            before=index_start_dt, limit=cleanup_batch_size
        ):
            # First delete from the vector store,
            vector_store.delete(uids_to_delete)
            # then delete from the record manager.
            record_manager.delete_keys(uids_to_delete)
            num_deleted += len(uids_to_delete)

    return {
        "num_added": num_added,
        "num_updated": num_updated,
        "num_skipped": num_skipped,
        "num_deleted": num_deleted,
    }
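

# Illustrative usage sketch (added for this listing, not part of the original
# module).  It assumes that ``InMemoryRecordManager``, ``InMemoryVectorStore``
# and ``DeterministicFakeEmbedding`` are available in your langchain_core
# version; any RecordManager and any VectorStore that implements ``delete``
# and ``add_documents`` can be substituted.
def _example_index_usage() -> IndexingResult:  # pragma: no cover
    from langchain_core.embeddings import DeterministicFakeEmbedding
    from langchain_core.indexing import InMemoryRecordManager
    from langchain_core.vectorstores import InMemoryVectorStore

    record_manager = InMemoryRecordManager(namespace="example")
    record_manager.create_schema()
    vector_store = InMemoryVectorStore(embedding=DeterministicFakeEmbedding(size=8))
    docs = [
        Document(page_content="hello", metadata={"source": "a.txt"}),
        Document(page_content="world", metadata={"source": "b.txt"}),
    ]
    # Incremental cleanup requires a source_id_key so that stale documents
    # from the same source can be removed on re-indexing.
    return index(
        docs,
        record_manager,
        vector_store,
        cleanup="incremental",
        source_id_key="source",
    )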


# Define an asynchronous generator function
async def _to_async_iterator(iterator: Iterable[T]) -> AsyncIterator[T]:
    """Convert an iterable to an async iterator."""
    for item in iterator:
        yield item


async def aindex(
    docs_source: Union[BaseLoader, Iterable[Document], AsyncIterator[Document]],
    record_manager: RecordManager,
    vector_store: VectorStore,
    *,
    batch_size: int = 100,
    cleanup: Literal["incremental", "full", None] = None,
    source_id_key: Union[str, Callable[[Document], str], None] = None,
    cleanup_batch_size: int = 1_000,
    force_update: bool = False,
) -> IndexingResult:
    """Async index data from the loader into the vector store.

    Indexing functionality uses a manager to keep track of which documents
    are in the vector store.

    This allows us to keep track of which documents were updated, which
    documents were deleted, and which documents should be skipped.

    For the time being, documents are indexed using their hashes, and users
    are not able to specify the uid of the document.

    IMPORTANT:
        If auto_cleanup is set to True, the loader should be returning
        the entire dataset, and not just a subset of the dataset.
        Otherwise, the auto_cleanup will remove documents that it is not
        supposed to.

    Args:
        docs_source: Data loader or iterable of documents to index.
        record_manager: Timestamped set to keep track of which documents were
            updated.
        vector_store: Vector store to index the documents into.
        batch_size: Batch size to use when indexing.
        cleanup: How to handle clean up of documents.
            - incremental: Cleans up all documents that haven't been updated AND
              that are associated with source ids that were seen during indexing.
              Clean up is done continuously during indexing, helping to minimize
              the probability of users seeing duplicated content.
            - full: Delete all documents that have not been returned by the loader.
              Clean up runs after all documents have been indexed.
              This means that users may see duplicated content during indexing.
            - None: Do not delete any documents.
        source_id_key: Optional key that helps identify the original source
            of the document.
        cleanup_batch_size: Batch size to use when cleaning up documents.
        force_update: Force update documents even if they are present in the
            record manager. Useful if you are re-indexing with updated embeddings.

    Returns:
        Indexing result which contains information about how many documents
        were added, updated, deleted, or skipped.
    """
    if cleanup not in {"incremental", "full", None}:
        raise ValueError(
            f"cleanup should be one of 'incremental', 'full' or None. "
            f"Got {cleanup}."
        )

    if cleanup == "incremental" and source_id_key is None:
        raise ValueError("Source id key is required when cleanup mode is incremental.")

    # Check that the Vectorstore has required methods implemented
    methods = ["adelete", "aadd_documents"]

    for method in methods:
        if not hasattr(vector_store, method):
            raise ValueError(
                f"Vectorstore {vector_store} does not have required method {method}"
            )

    if type(vector_store).adelete == VectorStore.adelete:
        # Checking if the vectorstore has overridden the default delete method
        # implementation which just raises a NotImplementedError
        raise ValueError("Vectorstore has not implemented the delete method")

    async_doc_iterator: AsyncIterator[Document]
    if isinstance(docs_source, BaseLoader):
        try:
            async_doc_iterator = docs_source.alazy_load()
        except NotImplementedError:
            # Exception triggered when neither lazy_load nor alazy_load are
            # implemented.
            # * The default implementation of alazy_load uses lazy_load.
            # * The default implementation of lazy_load raises NotImplementedError.
            # In such a case, we use the load method and convert it to an async
            # iterator.
            async_doc_iterator = _to_async_iterator(docs_source.load())
    else:
        if hasattr(docs_source, "__aiter__"):
            async_doc_iterator = docs_source  # type: ignore[assignment]
        else:
            async_doc_iterator = _to_async_iterator(docs_source)

    source_id_assigner = _get_source_id_assigner(source_id_key)

    # Mark when the update started.
    index_start_dt = await record_manager.aget_time()
    num_added = 0
    num_skipped = 0
    num_updated = 0
    num_deleted = 0

    async for doc_batch in _abatch(batch_size, async_doc_iterator):
        hashed_docs = list(
            _deduplicate_in_order(
                [_HashedDocument.from_document(doc) for doc in doc_batch]
            )
        )

        source_ids: Sequence[Optional[str]] = [
            source_id_assigner(doc) for doc in hashed_docs
        ]

        if cleanup == "incremental":
            # If the cleanup mode is incremental, source ids are required.
            for source_id, hashed_doc in zip(source_ids, hashed_docs):
                if source_id is None:
                    raise ValueError(
                        "Source ids are required when cleanup mode is incremental. "
                        f"Document that starts with "
                        f"content: {hashed_doc.page_content[:100]} was not assigned "
                        f"as source id."
                    )
            # source ids cannot be None after for loop above.
            source_ids = cast(Sequence[str], source_ids)

        exists_batch = await record_manager.aexists([doc.uid for doc in hashed_docs])

        # Filter out documents that already exist in the record store.
        uids: list[str] = []
        docs_to_index: list[Document] = []
        uids_to_refresh = []
        seen_docs: Set[str] = set()
        for hashed_doc, doc_exists in zip(hashed_docs, exists_batch):
            if doc_exists:
                if force_update:
                    seen_docs.add(hashed_doc.uid)
                else:
                    uids_to_refresh.append(hashed_doc.uid)
                    continue
            uids.append(hashed_doc.uid)
            docs_to_index.append(hashed_doc.to_document())

        if uids_to_refresh:
            # Must be updated to refresh timestamp.
            await record_manager.aupdate(uids_to_refresh, time_at_least=index_start_dt)
            num_skipped += len(uids_to_refresh)

        # Be pessimistic and assume that all vector store writes will fail.
        # First write to vector store
        if docs_to_index:
            await vector_store.aadd_documents(
                docs_to_index, ids=uids, batch_size=batch_size
            )
            num_added += len(docs_to_index) - len(seen_docs)
            num_updated += len(seen_docs)

        # And only then update the record store.
        # Update ALL records, even if they already exist, since we want to refresh
        # their timestamp.
        await record_manager.aupdate(
            [doc.uid for doc in hashed_docs],
            group_ids=source_ids,
            time_at_least=index_start_dt,
        )

        # If source IDs are provided, we can do the deletion incrementally!
        if cleanup == "incremental":
            # Get the uids of the documents that were not returned by the loader.

            # mypy isn't good enough to determine that source ids cannot be None
            # here due to a check that's happening above, so we check again.
            for source_id in source_ids:
                if source_id is None:
                    raise AssertionError("Source ids cannot be None here.")

            _source_ids = cast(Sequence[str], source_ids)

            uids_to_delete = await record_manager.alist_keys(
                group_ids=_source_ids, before=index_start_dt
            )
            if uids_to_delete:
                # First delete from the vector store,
                await vector_store.adelete(uids_to_delete)
                # then delete from the record store.
                await record_manager.adelete_keys(uids_to_delete)
                num_deleted += len(uids_to_delete)

    if cleanup == "full":
        while uids_to_delete := await record_manager.alist_keys(
            before=index_start_dt, limit=cleanup_batch_size
        ):
            # First delete from the vector store,
            await vector_store.adelete(uids_to_delete)
            # then delete from the record manager.
            await record_manager.adelete_keys(uids_to_delete)
            num_deleted += len(uids_to_delete)

    return {
        "num_added": num_added,
        "num_updated": num_updated,
        "num_skipped": num_skipped,
        "num_deleted": num_deleted,
    }
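

# Illustrative usage sketch (added for this listing, not part of the original
# module): the async counterpart of the example above.  The same assumptions
# apply: ``InMemoryRecordManager`` and ``InMemoryVectorStore`` are assumed to
# be available and to implement the async ``aadd_documents`` / ``adelete``
# hooks; any async-capable RecordManager and VectorStore can be substituted.
async def _example_aindex_usage() -> IndexingResult:  # pragma: no cover
    from langchain_core.embeddings import DeterministicFakeEmbedding
    from langchain_core.indexing import InMemoryRecordManager
    from langchain_core.vectorstores import InMemoryVectorStore

    record_manager = InMemoryRecordManager(namespace="example")
    await record_manager.acreate_schema()
    vector_store = InMemoryVectorStore(embedding=DeterministicFakeEmbedding(size=8))

    async def doc_stream() -> AsyncIterator[Document]:
        yield Document(page_content="hello", metadata={"source": "a.txt"})
        yield Document(page_content="world", metadata={"source": "b.txt"})

    # "full" cleanup deletes every record not returned by this run once all
    # documents have been indexed.
    return await aindex(
        doc_stream(),
        record_manager,
        vector_store,
        cleanup="full",
    )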