Source code for langchain_community.document_loaders.rocksetdb

from typing import Any, Callable, Iterator, List, Optional, Tuple

from langchain_core.documents import Document

from langchain_community.document_loaders.base import BaseLoader


def default_joiner(docs: List[Tuple[str, Any]]) -> str:
    """Join the values of the content columns into a single string.

    Args:
        docs: ``(column name, column value)`` tuples, one per content column.

    Returns:
        The column values joined with newlines, in the order given. The
        column names are ignored. Values that are not already strings are
        converted with ``str()``, because ``str.join`` raises ``TypeError``
        on non-string items and column values are typed as ``Any``.
    """
    return "\n".join(str(pair[1]) for pair in docs)
class ColumnNotFoundError(Exception):
    """Error reporting that a column was not selected by a query."""

    def __init__(self, missing_key: str, query: str):
        """Build the exception message.

        Args:
            missing_key: Name of the column that could not be found.
            query: The query that did not select the column; it is
                interpolated into the message after a newline.
        """
        message = f'Column "{missing_key}" not selected in query:\n{query}'
        super().__init__(message)
class RocksetLoader(BaseLoader):
    """Load documents from a `Rockset` database.

    To use, you should have the `rockset` python package installed.

    Example:
        .. code-block:: python

            # This code will load 3 records from the "langchain_demo"
            # collection as Documents, with the `text` column used as
            # the content

            from langchain_community.document_loaders import RocksetLoader
            from rockset import RocksetClient, Regions, models

            loader = RocksetLoader(
                RocksetClient(Regions.usw2a1, "<api key>"),
                models.QueryRequestSql(
                    query="select * from langchain_demo limit 3"
                ),
                ["text"]
            )
    """

    def __init__(
        self,
        client: Any,
        query: Any,
        content_keys: List[str],
        metadata_keys: Optional[List[str]] = None,
        content_columns_joiner: Callable[[List[Tuple[str, Any]]], str] = default_joiner,
    ):
        """Initialize with a Rockset client and query.

        Args:
            client: Rockset client object.
            query: Rockset query object.
            content_keys: The collection columns to be written into the
                `page_content` of the Documents.
            metadata_keys: The collection columns to be written into the
                `metadata` of the Documents. By default, this is all the keys
                in the document.
            content_columns_joiner: Method that joins content_keys and their
                values into a string. It takes a List[Tuple[str, Any]]
                representing a list of (column name, column value) tuples.
                By default, this is a method that joins each column value
                with a new line. This method is only relevant if there are
                multiple content_keys.

        Raises:
            ImportError: If the `rockset` package cannot be imported.
            ValueError: If `client` is not a `rockset.RocksetClient` or
                `query` is not a `rockset.models.QueryRequestSql`.
        """
        try:
            from rockset import QueryPaginator, RocksetClient
            from rockset.models import QueryRequestSql
        except ImportError as e:
            # Chain the original error so the real import failure stays
            # visible in the traceback.
            raise ImportError(
                "Could not import rockset client python package. "
                "Please install it with `pip install rockset`."
            ) from e

        if not isinstance(client, RocksetClient):
            raise ValueError(
                "client should be an instance of rockset.RocksetClient, "
                f"got {type(client)}"
            )

        if not isinstance(query, QueryRequestSql):
            raise ValueError(
                "query should be an instance of rockset.models.QueryRequestSql, "
                f"got {type(query)}"
            )

        self.client = client
        self.query = query
        self.content_keys = content_keys
        self.content_columns_joiner = content_columns_joiner
        self.metadata_keys = metadata_keys
        self.paginator = QueryPaginator
        self.request_model = QueryRequestSql

        try:
            self.client.set_application("langchain")
        except AttributeError:
            # Best effort: clients without `set_application` are used as-is.
            pass

    def lazy_load(self) -> Iterator[Document]:
        """Run the query and yield one Document per result row.

        Because this is a generator, the query is only executed once
        iteration starts.

        Yields:
            A Document whose `page_content` is the joined values of
            `content_keys` and whose `metadata` holds the `metadata_keys`
            columns, or the whole row when `metadata_keys` is None.

        Raises:
            ColumnNotFoundError: If a content or metadata key is missing
                from a result row.
        """
        query_results = self.client.Queries.query(
            sql=self.query
        ).results  # execute the SQL query
        for doc in query_results:  # for each row in the response
            try:
                document = Document(
                    page_content=self.content_columns_joiner(
                        [(col, doc[col]) for col in self.content_keys]
                    ),
                    metadata={col: doc[col] for col in self.metadata_keys}
                    if self.metadata_keys is not None
                    else doc,
                )
            except KeyError as e:
                # either content_keys or metadata_keys names a column that
                # the query did not select
                raise ColumnNotFoundError(e.args[0], self.query) from e
            # Yield outside the try block so that a KeyError thrown into the
            # generator by the consumer is not reported as a missing column.
            yield document