Source code for langchain_community.document_loaders.max_compute
from __future__ import annotations
from typing import Any, Iterator, Optional, Sequence
from langchain_core.documents import Document
from langchain_community.document_loaders.base import BaseLoader
from langchain_community.utilities.max_compute import MaxComputeAPIWrapper
class MaxComputeLoader(BaseLoader):
    """Load documents from an `Alibaba Cloud MaxCompute` table."""

    def __init__(
        self,
        query: str,
        api_wrapper: MaxComputeAPIWrapper,
        *,
        page_content_columns: Optional[Sequence[str]] = None,
        metadata_columns: Optional[Sequence[str]] = None,
    ):
        """Initialize the Alibaba Cloud MaxCompute document loader.

        Args:
            query: SQL query to execute.
            api_wrapper: MaxCompute API wrapper used to run the query.
            page_content_columns: Columns written to the Document
                `page_content`. If unspecified, all columns are written
                to `page_content`.
            metadata_columns: Columns written to the Document `metadata`.
                If unspecified, all columns not added to `page_content`
                are written.
        """
        self.query = query
        self.api_wrapper = api_wrapper
        self.page_content_columns = page_content_columns
        self.metadata_columns = metadata_columns

    @classmethod
    def from_params(
        cls,
        query: str,
        endpoint: str,
        project: str,
        *,
        access_id: Optional[str] = None,
        secret_access_key: Optional[str] = None,
        **kwargs: Any,
    ) -> MaxComputeLoader:
        """Convenience constructor that builds the MaxCompute API wrapper
        from the given parameters.

        Args:
            query: SQL query to execute.
            endpoint: MaxCompute endpoint.
            project: A project is the basic organizational unit of
                MaxCompute, similar to a database.
            access_id: MaxCompute access ID. Should be passed in directly
                or set as the environment variable `MAX_COMPUTE_ACCESS_ID`.
            secret_access_key: MaxCompute secret access key. Should be
                passed in directly or set as the environment variable
                `MAX_COMPUTE_SECRET_ACCESS_KEY`.
            **kwargs: Extra keyword arguments forwarded unchanged to the
                loader constructor.

        Returns:
            A loader instance built via ``cls(query, wrapper, **kwargs)``.
        """
        wrapper = MaxComputeAPIWrapper.from_params(
            endpoint,
            project,
            access_id=access_id,
            secret_access_key=secret_access_key,
        )
        return cls(query, wrapper, **kwargs)

    def lazy_load(self) -> Iterator[Document]:
        """Run the query and yield one Document per returned row.

        Each Document's `page_content` is the selected columns rendered as
        ``"<column>: <value>"`` lines joined by newlines; its `metadata` is
        a dict of the selected metadata columns.

        Yields:
            One Document for every row produced by ``api_wrapper.query``.
        """
        for record in self.api_wrapper.query(self.query):
            # A falsy column selection (None or empty) means the whole row
            # goes into the page content.
            content_fields = (
                {
                    name: value
                    for name, value in record.items()
                    if name in self.page_content_columns
                }
                if self.page_content_columns
                else record
            )
            text = "\n".join(
                f"{name}: {value}" for name, value in content_fields.items()
            )

            if self.metadata_columns:
                meta = {
                    name: value
                    for name, value in record.items()
                    if name in self.metadata_columns
                }
            else:
                # Without an explicit selection, metadata receives every
                # column that was not used for the page content.
                meta = {
                    name: value
                    for name, value in record.items()
                    if name not in content_fields
                }

            yield Document(page_content=text, metadata=meta)