# Source code for langchain_community.document_loaders.rocksetdb
from typing import Any, Callable, Iterator, List, Optional, Tuple
from langchain_core.documents import Document
from langchain_community.document_loaders.base import BaseLoader
def default_joiner(docs: List[Tuple[str, Any]]) -> str:
    """Default joiner for content columns.

    Joins the value of every ``(column name, column value)`` pair with a
    newline; the column names themselves are not included in the output.

    Args:
        docs: List of ``(column name, column value)`` tuples.

    Returns:
        The column values joined by ``"\\n"``. An empty list yields ``""``.
    """
    # Values are typed as ``Any``; ``str.join`` only accepts strings, so
    # coerce each value to avoid a TypeError on non-string columns.
    return "\n".join(str(value) for _, value in docs)
class ColumnNotFoundError(Exception):
    """Error signalling that a column was not selected in a query.

    Args:
        missing_key: Name of the column that could not be found.
        query: The query, interpolated into the error message.
    """

    def __init__(self, missing_key: str, query: str):
        message = 'Column "{}" not selected in query:\n{}'.format(
            missing_key, query
        )
        super().__init__(message)
class RocksetLoader(BaseLoader):
    """Load from a `Rockset` database.

    To use, you should have the `rockset` python package installed.

    Example:
        .. code-block:: python

            # This code will load 3 records from the "langchain_demo"
            # collection as Documents, with the `text` column used as
            # the content

            from langchain_community.document_loaders import RocksetLoader
            from rockset import RocksetClient, Regions, models

            loader = RocksetLoader(
                RocksetClient(Regions.usw2a1, "<api key>"),
                models.QueryRequestSql(
                    query="select * from langchain_demo limit 3"
                ),
                ["text"]
            )
    """

    def __init__(
        self,
        client: Any,
        query: Any,
        content_keys: List[str],
        metadata_keys: Optional[List[str]] = None,
        content_columns_joiner: Callable[[List[Tuple[str, Any]]], str] = default_joiner,
    ):
        """Initialize with a Rockset client.

        Args:
            client: Rockset client object.
            query: Rockset query object.
            content_keys: The collection columns to be written into the
                `page_content` of the Documents.
            metadata_keys: The collection columns to be written into the
                `metadata` of the Documents. By default, this is all the
                keys in the document.
            content_columns_joiner: Method that joins content_keys and their
                values into a string. It takes a List[Tuple[str, Any]]
                representing a list of (column name, column value) tuples.
                By default, this is a method that joins each column value
                with a new line. This method is only relevant if there are
                multiple content_keys.

        Raises:
            ImportError: If the `rockset` package cannot be imported.
            ValueError: If `client` is not a `RocksetClient` or `query` is
                not a `QueryRequestSql`.
        """
        # Imported lazily so the module can be imported without `rockset`.
        try:
            from rockset import QueryPaginator, RocksetClient
            from rockset.models import QueryRequestSql
        except ImportError as e:
            raise ImportError(
                "Could not import rockset client python package. "
                "Please install it with `pip install rockset`."
            ) from e

        if not isinstance(client, RocksetClient):
            raise ValueError(
                "client should be an instance of rockset.RocksetClient, "
                f"got {type(client)}"
            )

        if not isinstance(query, QueryRequestSql):
            raise ValueError(
                "query should be an instance of rockset.models.QueryRequestSql, "
                f"got {type(query)}"
            )

        self.client = client
        self.query = query
        self.content_keys = content_keys
        self.content_columns_joiner = content_columns_joiner
        self.metadata_keys = metadata_keys
        self.paginator = QueryPaginator
        self.request_model = QueryRequestSql

        # Best-effort tagging of the client: a client object without a
        # `set_application` attribute is deliberately tolerated.
        try:
            self.client.set_application("langchain")
        except AttributeError:
            pass

    def lazy_load(self) -> Iterator[Document]:
        """Execute the query and lazily yield one Document per result row.

        Yields:
            A Document whose `page_content` is produced by
            `content_columns_joiner` from the `content_keys` columns, and
            whose `metadata` holds the `metadata_keys` columns (or the whole
            row when `metadata_keys` is None).

        Raises:
            ColumnNotFoundError: If a column named in `content_keys` or
                `metadata_keys` is missing from a result row.
        """
        # Execute the SQL query.
        query_results = self.client.Queries.query(sql=self.query).results

        for doc in query_results:
            # Only the column lookups are guarded, so a KeyError raised by a
            # custom joiner (or by the consumer of this generator) is not
            # misreported as a missing column.
            try:
                content_pairs = [(col, doc[col]) for col in self.content_keys]
                if self.metadata_keys is not None:
                    metadata = {col: doc[col] for col in self.metadata_keys}
                else:
                    metadata = doc
            except KeyError as e:
                # Either content_keys or metadata_keys names a column that
                # is not present in this row.
                raise ColumnNotFoundError(e.args[0], self.query) from e

            yield Document(
                page_content=self.content_columns_joiner(content_pairs),
                metadata=metadata,
            )