from __future__ import annotations
import json
import logging
from typing import (
TYPE_CHECKING,
Any,
AsyncIterator,
Callable,
Dict,
Iterator,
List,
Optional,
)
from langchain_core._api.deprecation import deprecated
from langchain_core.documents import Document
from langchain_community.document_loaders.base import BaseLoader
from langchain_community.utilities.astradb import _AstraDBEnvironment
if TYPE_CHECKING:
    # Imported for annotations only; guarded so that astrapy is not imported
    # at module import time.
    from astrapy.db import AstraDB, AsyncAstraDB

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
@deprecated(
    since="0.0.29",
    removal="0.3.0",
    alternative_import="langchain_astradb.AstraDBLoader",
)
class AstraDBLoader(BaseLoader):
    """Document loader backed by a DataStax Astra DB collection.

    Marked deprecated since 0.0.29 (removal announced for 0.3.0); the
    suggested replacement is ``langchain_astradb.AstraDBLoader``.
    """

    def __init__(
        self,
        collection_name: str,
        *,
        token: Optional[str] = None,
        api_endpoint: Optional[str] = None,
        astra_db_client: Optional[AstraDB] = None,
        async_astra_db_client: Optional[AsyncAstraDB] = None,
        namespace: Optional[str] = None,
        filter_criteria: Optional[Dict[str, Any]] = None,
        projection: Optional[Dict[str, Any]] = None,
        find_options: Optional[Dict[str, Any]] = None,
        nb_prefetched: int = 1000,
        extraction_function: Callable[[Dict], str] = json.dumps,
    ) -> None:
        """Load DataStax Astra DB documents.

        Args:
            collection_name: Name of the Astra DB collection to use.
            token: API token for Astra DB usage.
            api_endpoint: Full URL of the API endpoint, such as
                `https://<DB-ID>-us-east1.apps.astra.datastax.com`.
            astra_db_client: *Alternative to token+api_endpoint*: an
                already-created 'astrapy.db.AstraDB' instance.
            async_astra_db_client: *Alternative to token+api_endpoint*: an
                already-created 'astrapy.db.AsyncAstraDB' instance.
            namespace: Namespace (aka keyspace) where the collection lives.
                Defaults to the database's "default namespace".
            filter_criteria: Criteria used to filter documents.
            projection: Specifies the fields to return.
            find_options: Additional options for the query.
            nb_prefetched: Maximum number of documents to pre-fetch.
                Defaults to 1000.
            extraction_function: Function applied to each collection document
                to produce the `page_content` of the LangChain Document.
                Defaults to `json.dumps`.
        """
        astra_env = _AstraDBEnvironment(
            token=token,
            api_endpoint=api_endpoint,
            astra_db_client=astra_db_client,
            async_astra_db_client=async_astra_db_client,
            namespace=namespace,
        )
        self.astra_env = astra_env
        # The sync collection handle is created eagerly; the async one is
        # obtained on demand inside ``alazy_load``.
        self.collection = astra_env.astra_db.collection(collection_name)
        self.collection_name = collection_name
        self.filter = filter_criteria
        self.projection = projection
        # Normalise a missing/empty value to a fresh empty dict.
        self.find_options = find_options or {}
        self.nb_prefetched = nb_prefetched
        self.extraction_function = extraction_function

    def _to_document(self, raw_doc: Dict[str, Any], astra_db: Any) -> Document:
        """Build a LangChain Document from one raw collection document.

        Args:
            raw_doc: A document as yielded by the collection's
                ``paginated_find``.
            astra_db: The database object attached to the collection the
                document came from; its ``namespace`` and ``base_url``
                attributes are copied into the metadata.

        Returns:
            A Document whose ``page_content`` is
            ``self.extraction_function(raw_doc)`` and whose metadata holds the
            ``namespace``, ``api_endpoint`` and ``collection`` keys.
        """
        return Document(
            page_content=self.extraction_function(raw_doc),
            metadata={
                "namespace": astra_db.namespace,
                "api_endpoint": astra_db.base_url,
                "collection": self.collection_name,
            },
        )

    def lazy_load(self) -> Iterator[Document]:
        """Lazily yield a Document for each item found in the collection.

        Iterates ``paginated_find`` on the synchronous collection using the
        filter, projection, find options and prefetch size given at
        construction time.
        """
        for doc in self.collection.paginated_find(
            filter=self.filter,
            options=self.find_options,
            projection=self.projection,
            sort=None,
            prefetched=self.nb_prefetched,
        ):
            yield self._to_document(doc, self.collection.astra_db)

    async def aload(self) -> List[Document]:
        """Load data into Document objects."""
        return [doc async for doc in self.alazy_load()]

    async def alazy_load(self) -> AsyncIterator[Document]:
        """Asynchronously yield a Document for each item in the collection.

        Awaits the async collection handle from the environment, then iterates
        its ``paginated_find`` with the same query settings as ``lazy_load``.
        """
        async_collection = await self.astra_env.async_astra_db.collection(
            self.collection_name
        )
        async for doc in async_collection.paginated_find(
            filter=self.filter,
            options=self.find_options,
            projection=self.projection,
            sort=None,
            prefetched=self.nb_prefetched,
        ):
            yield self._to_document(doc, async_collection.astra_db)