# Source code for langchain_community.document_loaders.astradb

from __future__ import annotations

import json
import logging
from typing import (
    TYPE_CHECKING,
    Any,
    AsyncIterator,
    Callable,
    Dict,
    Iterator,
    List,
    Optional,
)

from langchain_core._api.deprecation import deprecated
from langchain_core.documents import Document

from langchain_community.document_loaders.base import BaseLoader
from langchain_community.utilities.astradb import _AstraDBEnvironment

if TYPE_CHECKING:
    from astrapy.db import AstraDB, AsyncAstraDB

logger = logging.getLogger(__name__)


@deprecated(
    since="0.0.29",
    removal="0.3.0",
    alternative_import="langchain_astradb.AstraDBLoader",
)
class AstraDBLoader(BaseLoader):
    """Load the documents of a DataStax Astra DB collection as LangChain Documents.

    Deprecated since 0.0.29 (removal planned for 0.3.0); the replacement is
    ``langchain_astradb.AstraDBLoader``.
    """

    def __init__(
        self,
        collection_name: str,
        *,
        token: Optional[str] = None,
        api_endpoint: Optional[str] = None,
        astra_db_client: Optional[AstraDB] = None,
        async_astra_db_client: Optional[AsyncAstraDB] = None,
        namespace: Optional[str] = None,
        filter_criteria: Optional[Dict[str, Any]] = None,
        projection: Optional[Dict[str, Any]] = None,
        find_options: Optional[Dict[str, Any]] = None,
        nb_prefetched: int = 1000,
        extraction_function: Callable[[Dict], str] = json.dumps,
    ) -> None:
        """Load DataStax Astra DB documents.

        Args:
            collection_name: Name of the Astra DB collection to use.
            token: API token for Astra DB usage.
            api_endpoint: Full URL to the API endpoint, such as
                `https://<DB-ID>-us-east1.apps.astra.datastax.com`.
            astra_db_client: *Alternative to token+api_endpoint*; an
                already-created 'astrapy.db.AstraDB' instance can be passed.
            async_astra_db_client: *Alternative to token+api_endpoint*; an
                already-created 'astrapy.db.AsyncAstraDB' instance can be
                passed.
            namespace: Namespace (aka keyspace) where the collection lives.
                Defaults to the database's "default namespace".
            filter_criteria: Criteria used to filter documents.
            projection: Specifies the fields to return.
            find_options: Additional options for the query.
            nb_prefetched: Maximum number of documents to pre-fetch.
                Defaults to 1000.
            extraction_function: Function applied to each collection document
                to create the `page_content` of the LangChain Document.
                Defaults to `json.dumps`.
        """
        astra_env = _AstraDBEnvironment(
            token=token,
            api_endpoint=api_endpoint,
            astra_db_client=astra_db_client,
            async_astra_db_client=async_astra_db_client,
            namespace=namespace,
        )
        # The environment is kept so that alazy_load can obtain the async
        # client later; the sync collection handle is resolved right away.
        self.astra_env = astra_env
        self.collection = astra_env.astra_db.collection(collection_name)
        self.collection_name = collection_name
        self.filter = filter_criteria
        self.projection = projection
        # Normalise a missing options mapping to an empty dict.
        self.find_options = find_options or {}
        self.nb_prefetched = nb_prefetched
        self.extraction_function = extraction_function

    def lazy_load(self) -> Iterator[Document]:
        """Lazily yield one Document per item returned by the collection query.

        The query is issued through the sync collection's ``paginated_find``
        using the stored filter, options, projection and prefetch size.

        Yields:
            A Document whose ``page_content`` is ``extraction_function(doc)``
            and whose metadata holds the ``namespace``, ``api_endpoint`` and
            ``collection`` the item was read from.
        """
        for doc in self.collection.paginated_find(
            filter=self.filter,
            options=self.find_options,
            projection=self.projection,
            sort=None,
            prefetched=self.nb_prefetched,
        ):
            yield Document(
                page_content=self.extraction_function(doc),
                # A fresh dict per Document so that metadata is never shared.
                metadata={
                    "namespace": self.collection.astra_db.namespace,
                    "api_endpoint": self.collection.astra_db.base_url,
                    "collection": self.collection_name,
                },
            )

    async def aload(self) -> List[Document]:
        """Load data into Document objects.

        Returns:
            The list of every Document produced by ``alazy_load``.
        """
        return [doc async for doc in self.alazy_load()]

    async def alazy_load(self) -> AsyncIterator[Document]:
        """Asynchronously yield one Document per item returned by the query.

        The async collection handle is obtained from the stored environment's
        async client on each call, then queried through ``paginated_find``
        with the stored filter, options, projection and prefetch size.

        Yields:
            A Document whose ``page_content`` is ``extraction_function(doc)``
            and whose metadata holds the ``namespace``, ``api_endpoint`` and
            ``collection`` the item was read from.
        """
        async_collection = await self.astra_env.async_astra_db.collection(
            self.collection_name
        )
        async for doc in async_collection.paginated_find(
            filter=self.filter,
            options=self.find_options,
            projection=self.projection,
            sort=None,
            prefetched=self.nb_prefetched,
        ):
            yield Document(
                page_content=self.extraction_function(doc),
                # A fresh dict per Document so that metadata is never shared.
                metadata={
                    "namespace": async_collection.astra_db.namespace,
                    "api_endpoint": async_collection.astra_db.base_url,
                    "collection": self.collection_name,
                },
            )