# Source code for langchain_community.utilities.max_compute

from __future__ import annotations

from typing import TYPE_CHECKING, Iterator, List, Optional

from langchain_core.utils import get_from_env

if TYPE_CHECKING:
    from odps import ODPS


class MaxComputeAPIWrapper:
    """Interface for querying Alibaba Cloud MaxCompute tables."""

    def __init__(self, client: "ODPS"):
        """Wrap an existing MaxCompute client.

        Args:
            client: odps.ODPS MaxCompute client object used to run queries.
        """
        self.client = client

    @classmethod
    def from_params(
        cls,
        endpoint: str,
        project: str,
        *,
        access_id: Optional[str] = None,
        secret_access_key: Optional[str] = None,
    ) -> "MaxComputeAPIWrapper":
        """Build the odps.ODPS MaxCompute client from the given parameters.

        Args:
            endpoint: MaxCompute endpoint.
            project: A project is the basic organizational unit of
                MaxCompute, similar to a database.
            access_id: MaxCompute access ID. Should be passed in directly or
                set as the environment variable `MAX_COMPUTE_ACCESS_ID`.
            secret_access_key: MaxCompute secret access key. Should be passed
                in directly or set as the environment variable
                `MAX_COMPUTE_SECRET_ACCESS_KEY`.

        Returns:
            A wrapper holding the newly created client.

        Raises:
            ImportError: If the `odps` package cannot be imported.
            ValueError: If the client reports that `project` does not exist.
        """
        # Imported lazily so that pyodps is only required when this
        # constructor is actually used.
        try:
            from odps import ODPS
        except ImportError as ex:
            raise ImportError(
                "Could not import pyodps python package. "
                "Please install it with `pip install pyodps` or refer to "
                "https://pyodps.readthedocs.io/."
            ) from ex

        # Explicit arguments win; otherwise defer to the environment lookup.
        access_id = access_id or get_from_env("access_id", "MAX_COMPUTE_ACCESS_ID")
        secret_access_key = secret_access_key or get_from_env(
            "secret_access_key", "MAX_COMPUTE_SECRET_ACCESS_KEY"
        )

        client = ODPS(
            access_id=access_id,
            secret_access_key=secret_access_key,
            project=project,
            endpoint=endpoint,
        )
        if not client.exist_project(project):
            raise ValueError(f'The project "{project}" does not exist.')

        return cls(client)

    def lazy_query(self, query: str) -> Iterator[dict]:
        """Run a SQL query and yield its records one at a time as dicts.

        This is a generator: the query is not executed, and no error is
        raised, until the returned iterator is first advanced. The reader
        stays open while records are being consumed.

        Args:
            query: SQL statement passed to the client's `execute_sql`.

        Yields:
            One dict per record, built from the (key, value) pairs obtained
            by iterating that record.

        Raises:
            ValueError: If the reader reports a record count of zero.
        """
        # Execute SQL query.
        with self.client.execute_sql(query).open_reader() as reader:
            if reader.count == 0:
                raise ValueError("Table contains no data.")
            for record in reader:
                yield {k: v for k, v in record}

    def query(self, query: str) -> List[dict]:
        """Run a SQL query and return all of its records as a list of dicts.

        Args:
            query: SQL statement to execute.

        Returns:
            Every record produced by `lazy_query`, collected into a list.

        Raises:
            ValueError: If the reader reports a record count of zero.
        """
        return list(self.lazy_query(query))