Source code for langchain_community.document_loaders.cassandra

from __future__ import annotations

from typing import (
    TYPE_CHECKING,
    Any,
    AsyncIterator,
    Callable,
    Iterator,
    Optional,
    Sequence,
    Union,
)

from langchain_core.documents import Document

from langchain_community.document_loaders.base import BaseLoader
from langchain_community.utilities.cassandra import wrapped_response_future

# Sentinel default for CassandraLoader's ``query_timeout`` and
# ``query_execution_profile`` parameters: it lets ``__init__`` tell
# "argument omitted" apart from an explicit ``None``, so the option is only
# forwarded to the driver when the caller actually supplied it.
_NOT_SET = object()

if TYPE_CHECKING:
    from cassandra.cluster import Session
    from cassandra.pool import Host
    from cassandra.query import Statement


class CassandraLoader(BaseLoader):
    """Load the rows of an Apache Cassandra table or query as Documents."""

    def __init__(
        self,
        table: Optional[str] = None,
        session: Optional[Session] = None,
        keyspace: Optional[str] = None,
        query: Union[str, Statement, None] = None,
        page_content_mapper: Callable[[Any], str] = str,
        metadata_mapper: Callable[[Any], dict] = lambda _: {},
        *,
        query_parameters: Union[dict, Sequence, None] = None,
        query_timeout: Optional[float] = _NOT_SET,  # type: ignore[assignment]
        query_trace: bool = False,
        query_custom_payload: Optional[dict] = None,
        query_execution_profile: Any = _NOT_SET,
        query_paging_state: Any = None,
        query_host: Optional[Host] = None,
        query_execute_as: Optional[str] = None,
    ) -> None:
        """Document loader for Apache Cassandra.

        Exactly one of ``table`` and ``query`` must be given.

        Args:
            table: The table to load the data from.
                (Do not use together with the ``query`` parameter.)
            session: The Cassandra driver session.
                If not provided, the cassio-resolved session is used.
            keyspace: The keyspace of the table.
                If not provided, the cassio-resolved keyspace is used.
            query: The query used to load the data.
                (Do not use together with the ``table`` parameter.)
            page_content_mapper: A function converting a row to the string
                page content. Defaults to the ``str`` representation of the
                row.
            metadata_mapper: A function converting a row to a document
                metadata dict. Its result is merged over the loader-level
                metadata. Defaults to a function returning an empty dict.
            query_parameters: The query parameters used when calling
                ``session.execute``.
            query_timeout: The query timeout used when calling
                ``session.execute``. Only forwarded when explicitly supplied.
            query_trace: Whether to use tracing when calling
                ``session.execute``.
            query_custom_payload: The query custom payload used when calling
                ``session.execute``.
            query_execution_profile: The query execution profile used when
                calling ``session.execute``. Only forwarded when explicitly
                supplied.
            query_paging_state: The paging state passed to
                ``session.execute``.
            query_host: The query host used when calling ``session.execute``.
            query_execute_as: The ``execute_as`` value used when calling
                ``session.execute``.

        Raises:
            ValueError: If both or neither of ``table`` and ``query`` are
                given.
            ImportError: If cassio is needed to resolve the session or the
                keyspace but cannot be imported.
        """
        if query and table:
            raise ValueError("Cannot specify both query and table.")

        if not query and not table:
            raise ValueError("Must specify query or table.")

        # cassio is only required when something has to be resolved from its
        # global configuration: a missing session, or a missing keyspace for
        # a table-based load.
        if not session or (table and not keyspace):
            try:
                from cassio.config import (
                    check_resolve_keyspace,
                    check_resolve_session,
                )
            except ImportError as err:
                raise ImportError(
                    "Could not import a recent cassio package. "
                    "Please install it with `pip install --upgrade cassio`."
                ) from err

        if table:
            _keyspace = keyspace or check_resolve_keyspace(keyspace)
            # NOTE: keyspace and table are interpolated verbatim into the CQL
            # text, so callers must pass trusted identifiers.
            self.query = f"SELECT * FROM {_keyspace}.{table};"
            self.metadata = {"table": table, "keyspace": _keyspace}
        else:
            self.query = query  # type: ignore[assignment]
            self.metadata = {}

        self.session = session or check_resolve_session(session)
        self.page_content_mapper = page_content_mapper
        self.metadata_mapper = metadata_mapper

        self.query_kwargs = {
            "parameters": query_parameters,
            "trace": query_trace,
            "custom_payload": query_custom_payload,
            "paging_state": query_paging_state,
            "host": query_host,
            "execute_as": query_execute_as,
        }
        # These two use a sentinel default so that an explicit ``None`` from
        # the caller is still forwarded, while an omitted argument is not.
        if query_timeout is not _NOT_SET:
            self.query_kwargs["timeout"] = query_timeout

        if query_execution_profile is not _NOT_SET:
            self.query_kwargs["execution_profile"] = query_execution_profile

    def _row_to_document(self, row: Any) -> Document:
        """Build a Document from one result row using the configured mappers."""
        # Copy so per-row metadata never leaks into the shared base metadata.
        metadata = self.metadata.copy()
        metadata.update(self.metadata_mapper(row))
        return Document(
            page_content=self.page_content_mapper(row), metadata=metadata
        )

    def lazy_load(self) -> Iterator[Document]:
        """Run the query with ``session.execute`` and yield a Document per row."""
        for row in self.session.execute(self.query, **self.query_kwargs):
            yield self._row_to_document(row)

    async def alazy_load(self) -> AsyncIterator[Document]:
        """Run the query with ``session.execute_async`` and yield Documents.

        The call is awaited through ``wrapped_response_future``; once it
        completes, one Document is yielded for each row of its result.
        """
        rows = await wrapped_response_future(
            self.session.execute_async,
            self.query,
            **self.query_kwargs,
        )
        for row in rows:
            yield self._row_to_document(row)