Source code for langchain_community.document_loaders.max_compute

from __future__ import annotations

from typing import Any, Iterator, Optional, Sequence

from langchain_core.documents import Document

from langchain_community.document_loaders.base import BaseLoader
from langchain_community.utilities.max_compute import MaxComputeAPIWrapper


class MaxComputeLoader(BaseLoader):
    """Load documents from an `Alibaba Cloud MaxCompute` table."""

    def __init__(
        self,
        query: str,
        api_wrapper: MaxComputeAPIWrapper,
        *,
        page_content_columns: Optional[Sequence[str]] = None,
        metadata_columns: Optional[Sequence[str]] = None,
    ):
        """Initialize the Alibaba Cloud MaxCompute document loader.

        Args:
            query: SQL query to execute.
            api_wrapper: MaxCompute API wrapper used to run the query.
            page_content_columns: Columns to write into the Document's
                `page_content`. If unspecified, all columns are written to
                `page_content`.
            metadata_columns: Columns to write into the Document's `metadata`.
                If unspecified, every column that was not added to
                `page_content` is written.
        """
        self.query = query
        self.api_wrapper = api_wrapper
        self.metadata_columns = metadata_columns
        self.page_content_columns = page_content_columns

    @classmethod
    def from_params(
        cls,
        query: str,
        endpoint: str,
        project: str,
        *,
        access_id: Optional[str] = None,
        secret_access_key: Optional[str] = None,
        **kwargs: Any,
    ) -> MaxComputeLoader:
        """Build a loader, creating the MaxCompute API wrapper from parameters.

        Args:
            query: SQL query to execute.
            endpoint: MaxCompute endpoint.
            project: A project is the basic organizational unit of MaxCompute,
                similar to a database.
            access_id: MaxCompute access ID. Should be passed in directly or
                set as the environment variable `MAX_COMPUTE_ACCESS_ID`.
            secret_access_key: MaxCompute secret access key. Should be passed
                in directly or set as the environment variable
                `MAX_COMPUTE_SECRET_ACCESS_KEY`.
            **kwargs: Extra keyword arguments forwarded unchanged to the
                loader constructor.

        Returns:
            A loader instance bound to the newly created API wrapper.
        """
        credentials = {
            "access_id": access_id,
            "secret_access_key": secret_access_key,
        }
        wrapper = MaxComputeAPIWrapper.from_params(endpoint, project, **credentials)
        return cls(query, wrapper, **kwargs)

    def lazy_load(self) -> Iterator[Document]:
        """Run the query and lazily yield one Document per returned row.

        Each row is rendered into `page_content` as newline-separated
        `column: value` pairs; the remaining (or explicitly requested)
        columns become the Document's `metadata`.
        """
        for record in self.api_wrapper.query(self.query):
            # Default to the full row; narrow only when columns were configured.
            content_fields = record
            if self.page_content_columns:
                content_fields = {
                    name: value
                    for name, value in record.items()
                    if name in self.page_content_columns
                }
            text = "\n".join(
                f"{name}: {value}" for name, value in content_fields.items()
            )

            if self.metadata_columns:
                meta = {
                    name: value
                    for name, value in record.items()
                    if name in self.metadata_columns
                }
            else:
                # Fallback: whatever did not end up in the page content.
                meta = {
                    name: value
                    for name, value in record.items()
                    if name not in content_fields
                }

            yield Document(page_content=text, metadata=meta)