"""Module contains logic for indexing documents into vector stores."""
from __future__ import annotations
import hashlib
import json
import uuid
from itertools import islice
from typing import (
Any,
AsyncIterable,
AsyncIterator,
Callable,
Dict,
Iterable,
Iterator,
List,
Literal,
Optional,
Sequence,
Set,
TypedDict,
TypeVar,
Union,
cast,
)
from langchain_core.document_loaders.base import BaseLoader
from langchain_core.documents import Document
from langchain_core.indexing.base import RecordManager
from langchain_core.pydantic_v1 import root_validator
from langchain_core.vectorstores import VectorStore
# Magic UUID to use as a namespace for hashing.
# Used to try and generate a unique UUID for each document
# from hashing the document content and metadata.
# It is the namespace argument of every uuid.uuid5() call in this module.
NAMESPACE_UUID = uuid.UUID(int=1984)
# Generic element type used by the batching / iterator helper signatures.
T = TypeVar("T")
def _hash_string_to_uuid(input_string: str) -> uuid.UUID:
    """Derive a deterministic UUID from a string.

    The UTF-8 bytes of ``input_string`` are digested with SHA-1, and the hex
    digest is used as the name of a version-5 UUID under ``NAMESPACE_UUID``.

    Args:
        input_string: The text to hash.

    Returns:
        The UUID derived from the text; equal inputs give equal UUIDs.
    """
    sha1 = hashlib.sha1(input_string.encode("utf-8"))
    return uuid.uuid5(NAMESPACE_UUID, sha1.hexdigest())
def _hash_nested_dict_to_uuid(data: dict[Any, Any]) -> uuid.UUID:
    """Derive a deterministic UUID from a (possibly nested) dictionary.

    The dictionary is serialized to JSON with sorted keys so that key order
    does not influence the result, the UTF-8 bytes of that JSON are digested
    with SHA-1, and the hex digest names a version-5 UUID under
    ``NAMESPACE_UUID``.

    Args:
        data: A dictionary that ``json.dumps`` is able to serialize.

    Returns:
        The UUID derived from the dictionary contents.
    """
    canonical_json = json.dumps(data, sort_keys=True)
    sha1_hex = hashlib.sha1(canonical_json.encode("utf-8")).hexdigest()
    return uuid.uuid5(NAMESPACE_UUID, sha1_hex)
class _HashedDocument(Document):
    """A hashed document with a unique ID.

    The hash fields are filled in by the ``calculate_hashes`` pre-validator
    from ``page_content`` and ``metadata``; callers do not supply them.
    """

    uid: str
    """Identifier of the document; defaults to ``hash_`` when not provided."""
    hash_: str
    """The hash of the document including content and metadata."""
    content_hash: str
    """The hash of the document content."""
    metadata_hash: str
    """The hash of the document metadata."""

    @classmethod
    def is_lc_serializable(cls) -> bool:
        """Report that this class is not serializable."""
        return False

    @root_validator(pre=True)
    def calculate_hashes(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Root validator to calculate content and metadata hash.

        Args:
            values: The raw field values passed to the constructor.

        Returns:
            ``values`` with ``content_hash``, ``metadata_hash`` and ``hash_``
            set, and ``uid`` set to ``hash_`` when it was missing or None.

        Raises:
            ValueError: If the metadata contains one of the reserved hash
                keys, or if the metadata could not be hashed.
        """
        content = values.get("page_content", "")
        metadata = values.get("metadata", {})

        # These names are the hash fields of this class, so they are rejected
        # as metadata keys.
        forbidden_keys = ("hash_", "content_hash", "metadata_hash")

        for key in forbidden_keys:
            if key in metadata:
                raise ValueError(
                    f"Metadata cannot contain key {key} as it "
                    f"is reserved for internal use."
                )

        content_hash = str(_hash_string_to_uuid(content))

        try:
            metadata_hash = str(_hash_nested_dict_to_uuid(metadata))
        except Exception as e:
            # Chain the original error so the root cause stays in the traceback.
            raise ValueError(
                f"Failed to hash metadata: {e}. "
                f"Please use a dict that can be serialized using json."
            ) from e

        values["content_hash"] = content_hash
        values["metadata_hash"] = metadata_hash
        # The combined hash is derived from the two partial hashes.
        values["hash_"] = str(_hash_string_to_uuid(content_hash + metadata_hash))

        _uid = values.get("uid", None)

        if _uid is None:
            values["uid"] = values["hash_"]
        return values

    def to_document(self) -> Document:
        """Return a plain Document with this document's content and metadata."""
        return Document(
            page_content=self.page_content,
            metadata=self.metadata,
        )

    @classmethod
    def from_document(
        cls, document: Document, *, uid: Optional[str] = None
    ) -> _HashedDocument:
        """Create a hashed document from a document.

        Args:
            document: The document whose content and metadata are copied.
            uid: Optional explicit identifier; when None the validator falls
                back to the combined hash.

        Returns:
            The hashed document.
        """
        return cls(  # type: ignore[call-arg]
            uid=uid,  # type: ignore[arg-type]
            page_content=document.page_content,
            metadata=document.metadata,
        )
def _batch(size: int, iterable: Iterable[T]) -> Iterator[List[T]]:
"""实用的批处理函数。"""
it = iter(iterable)
while True:
chunk = list(islice(it, size))
if not chunk:
return
yield chunk
async def _abatch(size: int, iterable: AsyncIterable[T]) -> AsyncIterator[List[T]]:
"""实用的批处理函数。"""
batch: List[T] = []
async for element in iterable:
if len(batch) < size:
batch.append(element)
if len(batch) >= size:
yield batch
batch = []
if batch:
yield batch
def _get_source_id_assigner(
source_id_key: Union[str, Callable[[Document], str], None],
) -> Callable[[Document], Union[str, None]]:
"""从文档中获取源ID。"""
if source_id_key is None:
return lambda doc: None
elif isinstance(source_id_key, str):
return lambda doc: doc.metadata[source_id_key]
elif callable(source_id_key):
return source_id_key
else:
raise ValueError(
f"source_id_key should be either None, a string or a callable. "
f"Got {source_id_key} of type {type(source_id_key)}."
)
def _deduplicate_in_order(
hashed_documents: Iterable[_HashedDocument],
) -> Iterator[_HashedDocument]:
"""对哈希文档列表进行去重,同时保持顺序。"""
seen: Set[str] = set()
for hashed_doc in hashed_documents:
if hashed_doc.hash_ not in seen:
seen.add(hashed_doc.hash_)
yield hashed_doc
# PUBLIC API
class IndexingResult(TypedDict):
    """Return a detailed breakdown of the result of the indexing operation."""

    num_added: int
    """Number of added documents."""
    num_updated: int
    """Number of updated documents because they were not up to date."""
    num_deleted: int
    """Number of deleted documents."""
    num_skipped: int
    """Number of skipped documents because they were already up to date."""
def index(
    docs_source: Union[BaseLoader, Iterable[Document]],
    record_manager: RecordManager,
    vector_store: VectorStore,
    *,
    batch_size: int = 100,
    cleanup: Literal["incremental", "full", None] = None,
    source_id_key: Union[str, Callable[[Document], str], None] = None,
    cleanup_batch_size: int = 1_000,
    force_update: bool = False,
) -> IndexingResult:
    """Index data from the loader into the vector store.

    Indexing functionality uses a record manager to keep track of which
    documents are in the vector store. This allows us to keep track of which
    documents were updated, which documents were deleted, and which documents
    should be skipped.

    Currently, documents are indexed using their hashes, and users are not
    able to specify the uid of the document.

    IMPORTANT:
        If ``cleanup`` is "full", the loader should return the entire
        dataset, and not just a subset of it. Every record that was not
        refreshed during this run is deleted at the end.

    Args:
        docs_source: Data loader or iterable of documents to index.
        record_manager: Timestamped set used to keep track of which documents
            were updated.
        vector_store: Vector store to index the documents into.
        batch_size: Batch size to use when indexing.
        cleanup: How to handle clean up of documents.
            - "incremental": Cleans up all documents that haven't been
              updated AND that are associated with source ids that were seen
              during indexing. Clean up is done continuously during indexing,
              helping to minimize the probability of users seeing duplicated
              content.
            - "full": Deletes all documents that were not returned by the
              loader during this run of indexing. Clean up runs after all
              documents have been indexed. This means that users may see
              duplicated content during indexing.
            - None: Does not delete any documents.
        source_id_key: Optional key (or callable) that helps identify the
            original source of the document.
        cleanup_batch_size: Batch size to use when cleaning up documents.
        force_update: Force update documents even if they are present in the
            record manager. Useful if you are re-indexing with updated
            embeddings.

    Returns:
        Indexing result which contains information about how many documents
        were added, updated, deleted, or skipped.

    Raises:
        ValueError: If ``cleanup`` is not a supported mode; if ``cleanup`` is
            "incremental" and ``source_id_key`` is None or a document gets no
            source id; or if the vector store lacks ``delete`` /
            ``add_documents`` or has not overridden ``delete``.
    """
    if cleanup not in {"incremental", "full", None}:
        raise ValueError(
            f"cleanup should be one of 'incremental', 'full' or None. "
            f"Got {cleanup}."
        )

    if cleanup == "incremental" and source_id_key is None:
        raise ValueError("Source id key is required when cleanup mode is incremental.")

    # Check that the Vectorstore has required methods implemented
    methods = ["delete", "add_documents"]

    for method in methods:
        if not hasattr(vector_store, method):
            raise ValueError(
                f"Vectorstore {vector_store} does not have required method {method}"
            )

    if type(vector_store).delete == VectorStore.delete:
        # Checking if the vectorstore has overridden the default delete method
        # implementation which just raises a NotImplementedError
        raise ValueError("Vectorstore has not implemented the delete method")

    if isinstance(docs_source, BaseLoader):
        try:
            doc_iterator = docs_source.lazy_load()
        except NotImplementedError:
            # Fall back to eager loading when lazy loading is unavailable.
            doc_iterator = iter(docs_source.load())
    else:
        doc_iterator = iter(docs_source)

    source_id_assigner = _get_source_id_assigner(source_id_key)

    # Mark when the update started.
    index_start_dt = record_manager.get_time()
    num_added = 0
    num_skipped = 0
    num_updated = 0
    num_deleted = 0

    for doc_batch in _batch(batch_size, doc_iterator):
        # De-duplication only happens within a single batch.
        hashed_docs = list(
            _deduplicate_in_order(
                [_HashedDocument.from_document(doc) for doc in doc_batch]
            )
        )

        source_ids: Sequence[Optional[str]] = [
            source_id_assigner(doc) for doc in hashed_docs
        ]

        if cleanup == "incremental":
            # If the cleanup mode is incremental, source ids are required.
            for source_id, hashed_doc in zip(source_ids, hashed_docs):
                if source_id is None:
                    raise ValueError(
                        "Source ids are required when cleanup mode is incremental. "
                        f"Document that starts with "
                        f"content: {hashed_doc.page_content[:100]} was not assigned "
                        f"as source id."
                    )
            # source ids cannot be None after for loop above.
            source_ids = cast(Sequence[str], source_ids)  # type: ignore[assignment]

        exists_batch = record_manager.exists([doc.uid for doc in hashed_docs])

        # Filter out documents that already exist in the record store.
        uids = []
        docs_to_index = []
        uids_to_refresh = []
        # Uids of already-recorded documents that are re-written because of
        # force_update; they are counted as updates rather than additions.
        seen_docs: Set[str] = set()
        for hashed_doc, doc_exists in zip(hashed_docs, exists_batch):
            if doc_exists:
                if force_update:
                    seen_docs.add(hashed_doc.uid)
                else:
                    uids_to_refresh.append(hashed_doc.uid)
                    continue
            uids.append(hashed_doc.uid)
            docs_to_index.append(hashed_doc.to_document())

        # Update refresh timestamp
        if uids_to_refresh:
            record_manager.update(uids_to_refresh, time_at_least=index_start_dt)
            num_skipped += len(uids_to_refresh)

        # Be pessimistic and assume that all vector store write will fail.
        # First write to vector store
        if docs_to_index:
            vector_store.add_documents(docs_to_index, ids=uids, batch_size=batch_size)
            num_added += len(docs_to_index) - len(seen_docs)
            num_updated += len(seen_docs)

        # And only then update the record store.
        # Update ALL records, even if they already exist since we want to refresh
        # their timestamp.
        record_manager.update(
            [doc.uid for doc in hashed_docs],
            group_ids=source_ids,
            time_at_least=index_start_dt,
        )

        # If source IDs are provided, we can do the deletion incrementally!
        if cleanup == "incremental":
            # Get the uids of the documents that were not returned by the loader.

            # mypy isn't good enough to determine that source ids cannot be None
            # here due to a check that's happening above, so we check again.
            for source_id in source_ids:
                if source_id is None:
                    raise AssertionError("Source ids cannot be None here.")

            _source_ids = cast(Sequence[str], source_ids)

            # Records of these sources that were not refreshed in this run.
            uids_to_delete = record_manager.list_keys(
                group_ids=_source_ids, before=index_start_dt
            )
            if uids_to_delete:
                # First delete from the vector store.
                vector_store.delete(uids_to_delete)
                # Then delete from the record manager.
                record_manager.delete_keys(uids_to_delete)
                num_deleted += len(uids_to_delete)

    if cleanup == "full":
        # Repeatedly fetch and delete records not refreshed in this run until
        # none are left.
        while uids_to_delete := record_manager.list_keys(
            before=index_start_dt, limit=cleanup_batch_size
        ):
            # First delete from the vector store.
            vector_store.delete(uids_to_delete)
            # Then delete from record manager.
            record_manager.delete_keys(uids_to_delete)
            num_deleted += len(uids_to_delete)

    return {
        "num_added": num_added,
        "num_updated": num_updated,
        "num_skipped": num_skipped,
        "num_deleted": num_deleted,
    }
# Define an asynchronous generator function
async def _to_async_iterator(iterator: Iterable[T]) -> AsyncIterator[T]:
"""将可迭代对象转换为异步迭代器。"""
for item in iterator:
yield item
async def aindex(
    docs_source: Union[BaseLoader, Iterable[Document], AsyncIterator[Document]],
    record_manager: RecordManager,
    vector_store: VectorStore,
    *,
    batch_size: int = 100,
    cleanup: Literal["incremental", "full", None] = None,
    source_id_key: Union[str, Callable[[Document], str], None] = None,
    cleanup_batch_size: int = 1_000,
    force_update: bool = False,
) -> IndexingResult:
    """Asynchronously index data from the loader into the vector store.

    Indexing functionality uses a record manager to keep track of which
    documents are in the vector store. This allows us to keep track of which
    documents were updated, which documents were deleted, and which documents
    should be skipped.

    Currently, documents are indexed using their hashes, and users are not
    able to specify the uid of the document.

    IMPORTANT:
        If ``cleanup`` is "full", the loader should return the entire
        dataset, and not just a subset of it. Every record that was not
        refreshed during this run is deleted at the end.

    Args:
        docs_source: Data loader, iterable of documents, or async iterator of
            documents to index.
        record_manager: Timestamped set used to keep track of which documents
            were updated.
        vector_store: Vector store to index the documents into.
        batch_size: Batch size to use when indexing.
        cleanup: How to handle clean up of documents.
            - "incremental": Cleans up all documents that haven't been
              updated AND that are associated with source ids that were seen
              during indexing. Clean up is done continuously during indexing,
              helping to minimize the probability of users seeing duplicated
              content.
            - "full": Deletes all documents that were not returned by the
              loader. Clean up runs after all documents have been indexed.
              This means that users may see duplicated content during
              indexing.
            - None: Does not delete any documents.
        source_id_key: Optional key (or callable) that helps identify the
            original source of the document.
        cleanup_batch_size: Batch size to use when cleaning up documents.
        force_update: Force update documents even if they are present in the
            record manager. Useful if you are re-indexing with updated
            embeddings.

    Returns:
        Indexing result which contains information about how many documents
        were added, updated, deleted, or skipped.

    Raises:
        ValueError: If ``cleanup`` is not a supported mode; if ``cleanup`` is
            "incremental" and ``source_id_key`` is None or a document gets no
            source id; or if the vector store lacks ``adelete`` /
            ``aadd_documents`` or has not overridden ``adelete``.
    """
    if cleanup not in {"incremental", "full", None}:
        raise ValueError(
            f"cleanup should be one of 'incremental', 'full' or None. "
            f"Got {cleanup}."
        )

    if cleanup == "incremental" and source_id_key is None:
        raise ValueError("Source id key is required when cleanup mode is incremental.")

    # Check that the Vectorstore has required methods implemented
    methods = ["adelete", "aadd_documents"]

    for method in methods:
        if not hasattr(vector_store, method):
            raise ValueError(
                f"Vectorstore {vector_store} does not have required method {method}"
            )

    if type(vector_store).adelete == VectorStore.adelete:
        # Checking if the vectorstore has overridden the default delete method
        # implementation which just raises a NotImplementedError
        raise ValueError("Vectorstore has not implemented the delete method")

    async_doc_iterator: AsyncIterator[Document]
    if isinstance(docs_source, BaseLoader):
        try:
            async_doc_iterator = docs_source.alazy_load()
        except NotImplementedError:
            # Exception triggered when neither lazy_load nor alazy_load are implemented.
            # * The default implementation of alazy_load uses lazy_load.
            # * The default implementation of lazy_load raises NotImplementedError.
            # In such a case, we use the load method and convert it to an async
            # iterator.
            async_doc_iterator = _to_async_iterator(docs_source.load())
    else:
        if hasattr(docs_source, "__aiter__"):
            # Already asynchronous: use it directly.
            async_doc_iterator = docs_source  # type: ignore[assignment]
        else:
            async_doc_iterator = _to_async_iterator(docs_source)

    source_id_assigner = _get_source_id_assigner(source_id_key)

    # Mark when the update started.
    index_start_dt = await record_manager.aget_time()
    num_added = 0
    num_skipped = 0
    num_updated = 0
    num_deleted = 0

    async for doc_batch in _abatch(batch_size, async_doc_iterator):
        # De-duplication only happens within a single batch.
        hashed_docs = list(
            _deduplicate_in_order(
                [_HashedDocument.from_document(doc) for doc in doc_batch]
            )
        )

        source_ids: Sequence[Optional[str]] = [
            source_id_assigner(doc) for doc in hashed_docs
        ]

        if cleanup == "incremental":
            # If the cleanup mode is incremental, source ids are required.
            for source_id, hashed_doc in zip(source_ids, hashed_docs):
                if source_id is None:
                    raise ValueError(
                        "Source ids are required when cleanup mode is incremental. "
                        f"Document that starts with "
                        f"content: {hashed_doc.page_content[:100]} was not assigned "
                        f"as source id."
                    )
            # source ids cannot be None after for loop above.
            source_ids = cast(Sequence[str], source_ids)

        exists_batch = await record_manager.aexists([doc.uid for doc in hashed_docs])

        # Filter out documents that already exist in the record store.
        uids: list[str] = []
        docs_to_index: list[Document] = []
        uids_to_refresh = []
        # Uids of already-recorded documents that are re-written because of
        # force_update; they are counted as updates rather than additions.
        seen_docs: Set[str] = set()
        for hashed_doc, doc_exists in zip(hashed_docs, exists_batch):
            if doc_exists:
                if force_update:
                    seen_docs.add(hashed_doc.uid)
                else:
                    uids_to_refresh.append(hashed_doc.uid)
                    continue
            uids.append(hashed_doc.uid)
            docs_to_index.append(hashed_doc.to_document())

        if uids_to_refresh:
            # Must be updated to refresh timestamp.
            await record_manager.aupdate(uids_to_refresh, time_at_least=index_start_dt)
            num_skipped += len(uids_to_refresh)

        # Be pessimistic and assume that all vector store write will fail.
        # First write to vector store
        if docs_to_index:
            await vector_store.aadd_documents(
                docs_to_index, ids=uids, batch_size=batch_size
            )
            num_added += len(docs_to_index) - len(seen_docs)
            num_updated += len(seen_docs)

        # And only then update the record store.
        # Update ALL records, even if they already exist since we want to refresh
        # their timestamp.
        await record_manager.aupdate(
            [doc.uid for doc in hashed_docs],
            group_ids=source_ids,
            time_at_least=index_start_dt,
        )

        # If source IDs are provided, we can do the deletion incrementally!
        if cleanup == "incremental":
            # Get the uids of the documents that were not returned by the loader.

            # mypy isn't good enough to determine that source ids cannot be None
            # here due to a check that's happening above, so we check again.
            for source_id in source_ids:
                if source_id is None:
                    raise AssertionError("Source ids cannot be None here.")

            _source_ids = cast(Sequence[str], source_ids)

            # Records of these sources that were not refreshed in this run.
            uids_to_delete = await record_manager.alist_keys(
                group_ids=_source_ids, before=index_start_dt
            )
            if uids_to_delete:
                # First delete from the vector store.
                await vector_store.adelete(uids_to_delete)
                # Then delete from the record manager.
                await record_manager.adelete_keys(uids_to_delete)
                num_deleted += len(uids_to_delete)

    if cleanup == "full":
        # Repeatedly fetch and delete records not refreshed in this run until
        # none are left.
        while uids_to_delete := await record_manager.alist_keys(
            before=index_start_dt, limit=cleanup_batch_size
        ):
            # First delete from the vector store.
            await vector_store.adelete(uids_to_delete)
            # Then delete from record manager.
            await record_manager.adelete_keys(uids_to_delete)
            num_deleted += len(uids_to_delete)

    return {
        "num_added": num_added,
        "num_updated": num_updated,
        "num_skipped": num_skipped,
        "num_deleted": num_deleted,
    }