# Source code for langchain_community.document_loaders.kinetica_loader

from __future__ import annotations

from typing import Any, Dict, Iterator, List, Optional, Tuple

from langchain_core.documents import Document

from langchain_community.document_loaders.base import BaseLoader


class KineticaLoader(BaseLoader):
    """Load documents from the `Kinetica` API.

    Each document represents one row of the query result. The columns named
    in `page_content_columns` are written into the document's `page_content`;
    the columns named in `metadata_columns` are written into the document's
    `metadata`. By default, all columns are written into `page_content` and
    none into `metadata`.
    """

    def __init__(
        self,
        query: str,
        host: str,
        username: str,
        password: str,
        parameters: Optional[Dict[str, Any]] = None,
        page_content_columns: Optional[List[str]] = None,
        metadata_columns: Optional[List[str]] = None,
    ):
        """Initialize the Kinetica document loader.

        Args:
            query: The query to run in Kinetica.
            host: Host passed to the `GPUdb` connection.
            username: Username set on the connection options.
            password: Password set on the connection options.
            parameters: Optional. Parameters to pass to the query.
                NOTE(review): the value is stored on the instance but is not
                forwarded to the query execution anywhere in this class.
            page_content_columns: Optional. Columns written to the Document
                `page_content`. ``None`` (the default) or a list containing
                ``"*"`` selects every column of the result.
            metadata_columns: Optional. Columns written to the Document
                `metadata`. Defaults to no columns.
        """
        self.query = query
        self.host = host
        self.username = username
        self.password = password
        self.parameters = parameters
        self.page_content_columns = page_content_columns
        self.metadata_columns = metadata_columns if metadata_columns is not None else []

    def _execute_query(self) -> List[Dict[str, Any]]:
        """Run ``self.query`` and return the rows as column-name -> value dicts.

        Returns:
            One dict per record. An empty list is returned when the query
            fails: the error is printed and swallowed (best effort).

        Raises:
            ImportError: If the `gpudb` package is not installed.
        """
        try:
            from gpudb import GPUdb, GPUdbSqlIterator
        except ImportError as e:
            raise ImportError(
                "Could not import Kinetica python API. "
                "Please install it with `pip install gpudb==7.2.0.1`."
            ) from e

        try:
            options = GPUdb.Options()
            options.username = self.username
            options.password = self.password

            conn = GPUdb(host=self.host, options=options)

            with GPUdbSqlIterator(conn, self.query) as records:
                # Snapshot the keys once so every row is zipped against the
                # same, stable sequence of column names.
                column_names = list(records.type_map.keys())
                query_result = [dict(zip(column_names, record)) for record in records]
        except Exception as e:
            # Deliberately best-effort: report the failure and yield no rows
            # instead of propagating the error to the caller.
            print(f"An error occurred: {e}")  # noqa: T201
            query_result = []

        return query_result

    def _get_columns(
        self, query_result: List[Dict[str, Any]]
    ) -> Tuple[List[str], List[str]]:
        """Resolve which columns feed `page_content` and which feed `metadata`.

        Args:
            query_result: Rows as returned by `_execute_query`.

        Returns:
            A ``(page_content_columns, metadata_columns)`` tuple. When
            `page_content_columns` was not configured (``None``), every column
            of the first row is used, or an empty list if there are no rows.
            An explicitly configured list (including an empty one) is kept
            as is.
        """
        if self.page_content_columns is None:
            # The ``None`` check must run on the raw attribute; coercing it to
            # ``[]`` first would make the documented default unreachable.
            page_content_columns = list(query_result[0].keys()) if query_result else []
        else:
            page_content_columns = list(self.page_content_columns)

        metadata_columns = list(self.metadata_columns) if self.metadata_columns else []
        return page_content_columns, metadata_columns

    def lazy_load(self) -> Iterator[Document]:
        """Lazily yield one `Document` per row of the query result.

        `page_content` is built from the selected columns as newline-joined
        ``"<column>: <value>"`` pairs, in the row's own column order;
        `metadata` is a dict restricted to the metadata columns.

        Yields:
            A `Document` for every row. Nothing is yielded when the query
            returned no rows or failed.
        """
        query_result = self._execute_query()
        if not query_result:
            # Nothing to emit; this also guards the ``query_result[0]`` access
            # below against an IndexError.
            return

        page_content_columns, metadata_columns = self._get_columns(query_result)
        if "*" in page_content_columns:
            page_content_columns = list(query_result[0].keys())

        # Sets give O(1) membership tests; output order follows the row.
        content_keys = set(page_content_columns)
        metadata_keys = set(metadata_columns)

        for row in query_result:
            page_content = "\n".join(
                f"{k}: {v}" for k, v in row.items() if k in content_keys
            )
            metadata = {k: v for k, v in row.items() if k in metadata_keys}
            yield Document(page_content=page_content, metadata=metadata)

    def load(self) -> List[Document]:
        """Load all rows eagerly and return them as a list of `Document` objects."""
        return list(self.lazy_load())