# Source code for langchain_community.document_loaders.snowflake_loader
from __future__ import annotations
from typing import Any, Dict, Iterator, List, Optional, Tuple
from langchain_core.documents import Document
from langchain_community.document_loaders.base import BaseLoader
class SnowflakeLoader(BaseLoader):
    """Load documents from `Snowflake` by running a SQL query.

    Each document represents one row of the result. The columns named in
    `page_content_columns` are written into the document's `page_content`;
    the columns named in `metadata_columns` are written into its `metadata`.
    By default, all columns are written into `page_content` and no columns
    are written into `metadata`.
    """

    def __init__(
        self,
        query: str,
        user: str,
        password: str,
        account: str,
        warehouse: str,
        role: str,
        database: str,
        schema: str,
        parameters: Optional[Dict[str, Any]] = None,
        page_content_columns: Optional[List[str]] = None,
        metadata_columns: Optional[List[str]] = None,
    ):
        """Initialize the Snowflake document loader.

        Args:
            query: The query to run in Snowflake.
            user: Snowflake user.
            password: Snowflake password.
            account: Snowflake account.
            warehouse: Snowflake warehouse.
            role: Snowflake role.
            database: Snowflake database.
            schema: Snowflake schema.
            parameters: Optional. Parameters to pass to the query.
            page_content_columns: Optional. Columns written to the document's
                `page_content`. Defaults to ``["*"]``, meaning every column.
            metadata_columns: Optional. Columns written to the document's
                `metadata`. Defaults to no columns.
        """
        self.query = query
        self.user = user
        self.password = password
        self.account = account
        self.warehouse = warehouse
        self.role = role
        self.database = database
        self.schema = schema
        self.parameters = parameters
        self.page_content_columns = (
            page_content_columns if page_content_columns is not None else ["*"]
        )
        self.metadata_columns = metadata_columns if metadata_columns is not None else []

    def _execute_query(self) -> List[Dict[str, Any]]:
        """Run the configured query and return its rows as dictionaries.

        Returns:
            One dict per result row, keyed by column name. An empty list is
            returned if anything fails after the connection was opened; the
            error is printed rather than raised.

        Raises:
            ImportError: If `snowflake-connector-python` is not installed.
        """
        try:
            import snowflake.connector
        except ImportError as ex:
            raise ImportError(
                "Could not import snowflake-connector-python package. "
                "Please install it with `pip install snowflake-connector-python`."
            ) from ex

        # Connection errors are intentionally outside the best-effort `try`
        # below, so they propagate to the caller.
        # NOTE(review): `parameters` is forwarded to `connect()` as well as to
        # `execute()`; confirm the connector expects it in both places.
        conn = snowflake.connector.connect(
            user=self.user,
            password=self.password,
            account=self.account,
            warehouse=self.warehouse,
            role=self.role,
            database=self.database,
            schema=self.schema,
            parameters=self.parameters,
        )
        # Pre-bind so the cleanup below is safe even if `conn.cursor()` raises.
        cur = None
        try:
            cur = conn.cursor()
            # NOTE: database/schema are concatenated into the statement
            # unescaped, so they must come from trusted configuration.
            cur.execute("USE DATABASE " + self.database)
            cur.execute("USE SCHEMA " + self.schema)
            cur.execute(self.query, self.parameters)
            rows = cur.fetchall()
            column_names = [column[0] for column in cur.description]
            query_result = [dict(zip(column_names, row)) for row in rows]
        except Exception as e:
            print(f"An error occurred: {e}")  # noqa: T201
            query_result = []
        finally:
            # Release the cursor and the connection; the nested `finally`
            # guarantees the connection is closed even if closing the cursor
            # fails.
            try:
                if cur is not None:
                    cur.close()
            finally:
                conn.close()
        return query_result

    def _get_columns(
        self, query_result: List[Dict[str, Any]]
    ) -> Tuple[List[str], List[str]]:
        """Resolve which columns feed `page_content` and which feed `metadata`.

        Args:
            query_result: Rows returned by `_execute_query`.

        Returns:
            A ``(page_content_columns, metadata_columns)`` tuple. If
            `self.page_content_columns` is ``None``, every column of the first
            row is used (or no column when there are no rows). The ``"*"``
            wildcard is returned untouched; `lazy_load` expands it.
        """
        page_content_columns = self.page_content_columns
        if page_content_columns is None:
            page_content_columns = list(query_result[0].keys()) if query_result else []
        metadata_columns = self.metadata_columns or []
        return page_content_columns or [], metadata_columns

    def lazy_load(self) -> Iterator[Document]:
        """Run the query and lazily yield one `Document` per result row.

        Yields:
            A `Document` whose `page_content` is the newline-joined
            ``"column: value"`` pairs of the selected content columns (in row
            order) and whose `metadata` holds the selected metadata columns.
            Nothing is yielded when the query returns no rows or fails.
        """
        query_result = self._execute_query()
        if isinstance(query_result, Exception):
            # Defensive: `_execute_query` above never returns an exception,
            # but the guard is kept for compatibility with overrides.
            print(f"An error occurred during the query: {query_result}")  # noqa: T201
            return
        if not query_result:
            # No rows: nothing to yield, and the "*" expansion below would
            # otherwise index into an empty list.
            return

        page_content_columns, metadata_columns = self._get_columns(query_result)
        if "*" in page_content_columns:
            page_content_columns = list(query_result[0].keys())

        # Build the lookup sets once instead of scanning lists for every cell.
        content_keys = set(page_content_columns)
        metadata_keys = set(metadata_columns)
        for row in query_result:
            page_content = "\n".join(
                f"{k}: {v}" for k, v in row.items() if k in content_keys
            )
            metadata = {k: v for k, v in row.items() if k in metadata_keys}
            yield Document(page_content=page_content, metadata=metadata)