Source code for langchain_community.document_loaders.cube_semantic

import json
import logging
import time
from typing import Iterator, List

import requests
from langchain_core.documents import Document

from langchain_community.document_loaders.base import BaseLoader

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


class CubeSemanticLoader(BaseLoader):
    """Load `Cube semantic layer` metadata.

    Args:
        cube_api_url: REST API endpoint.
            Use the REST API of your Cube deployment.
            Find more information here:
            https://cube.dev/docs/http-api/rest#configuration-base-path
        cube_api_token: Cube API token.
            The authentication token is generated from your Cube API
            secret key. Find more information here:
            https://cube.dev/docs/security#generating-json-web-tokens-jwt
        load_dimension_values: Whether to load the dimension values for
            every string dimension.
        dimension_values_limit: Maximum number of dimension values to load.
        dimension_values_max_retries: Maximum number of retries when loading
            dimension values.
        dimension_values_retry_delay: Delay (passed to ``time.sleep``, i.e.
            seconds) between retries when loading dimension values.
    """

    def __init__(
        self,
        cube_api_url: str,
        cube_api_token: str,
        load_dimension_values: bool = True,
        dimension_values_limit: int = 10_000,
        dimension_values_max_retries: int = 10,
        dimension_values_retry_delay: int = 3,
    ):
        self.cube_api_url = cube_api_url
        self.cube_api_token = cube_api_token
        self.load_dimension_values = load_dimension_values
        self.dimension_values_limit = dimension_values_limit
        self.dimension_values_max_retries = dimension_values_max_retries
        self.dimension_values_retry_delay = dimension_values_retry_delay

    def _request_headers(self) -> dict:
        """Build the HTTP headers sent with every Cube REST API request."""
        return {
            "Content-Type": "application/json",
            "Authorization": self.cube_api_token,
        }

    def _get_dimension_values(self, dimension_name: str) -> List[str]:
        """Call Cube's REST API ``/load`` endpoint to fetch a dimension's values.

        These values can be used to achieve more accurate filtering.

        The request is repeated, up to ``dimension_values_max_retries`` times,
        for as long as the API answers with a ``"Continue wait"`` error.

        Args:
            dimension_name: Name of the dimension to query.

        Returns:
            The values found for the dimension, or an empty list when the
            request fails, the response has no ``"data"`` entry, or the retry
            budget is exhausted.
        """
        logger.info("Loading dimension values for: %s...", dimension_name)

        # The payload never changes between attempts, so serialize it once.
        payload = json.dumps(
            {
                "query": {
                    "dimensions": [dimension_name],
                    "limit": self.dimension_values_limit,
                }
            }
        )
        headers = self._request_headers()

        for _ in range(self.dimension_values_max_retries):
            response = requests.post(
                f"{self.cube_api_url}/load",
                headers=headers,
                data=payload,
            )

            if response.status_code != 200:
                logger.error(
                    "Request failed with status code: %s", response.status_code
                )
                break

            response_data = response.json()

            # The API answered 200 but the result is not ready yet: wait and
            # ask again.
            if response_data.get("error") == "Continue wait":
                logger.info("Retrying...")
                time.sleep(self.dimension_values_retry_delay)
                continue

            rows = response_data.get("data")
            if rows is None:
                # Stay best-effort, like every other failure path here,
                # instead of raising KeyError and aborting the whole load.
                logger.error(
                    "Response for dimension %s has no 'data' entry.",
                    dimension_name,
                )
                break

            return [row[dimension_name] for row in rows]
        else:
            # Only reached when the loop ran out of attempts without a break.
            logger.info("Maximum retries reached.")

        return []

    def lazy_load(self) -> Iterator[Document]:
        """Call Cube's REST API ``/meta`` endpoint and yield one document
        per measure/dimension of every public cube.

        Yields:
            Documents with the following attributes:
                - page_content=column_title + column_description
                - metadata
                    - table_name
                    - column_name
                    - column_data_type
                    - column_member_type
                    - column_title
                    - column_description
                    - column_values
                    - cube_data_obj_type

        Raises:
            requests.HTTPError: If the metadata request returns an error
                status (via ``raise_for_status``).
            ValueError: If the metadata contains no cubes.

        Because this is a generator, the request is sent and the exceptions
        above are raised when iteration starts, not when it is called.
        """
        headers = self._request_headers()

        logger.info("Loading metadata from %s...", self.cube_api_url)
        response = requests.get(f"{self.cube_api_url}/meta", headers=headers)
        response.raise_for_status()

        cube_data_objects = response.json().get("cubes", [])
        logger.info(
            "Found %d cube data objects in metadata.", len(cube_data_objects)
        )

        if not cube_data_objects:
            raise ValueError("No cubes found in metadata.")

        for cube_data_obj in cube_data_objects:
            cube_data_obj_name = cube_data_obj.get("name")
            cube_data_obj_type = cube_data_obj.get("type")

            logger.info("Processing %s...", cube_data_obj_name)

            if not cube_data_obj.get("public"):
                logger.info(
                    "Skipping %s because it is not public.", cube_data_obj_name
                )
                continue

            # Tag each member with its kind up front. Looking the item up in
            # the measures list afterwards is linear per item and mislabels a
            # dimension whose dict compares equal to a measure's dict.
            # `or []` also tolerates keys that are present but null.
            members = [
                ("measure", member)
                for member in cube_data_obj.get("measures") or []
            ]
            members += [
                ("dimension", member)
                for member in cube_data_obj.get("dimensions") or []
            ]

            for column_member_type, item in members:
                item_name = str(item.get("name"))
                item_type = str(item.get("type"))
                item_title = str(item.get("title"))
                item_description = str(item.get("description"))

                # Values are only fetched for string dimensions, and only
                # when the loader was configured to do so.
                dimension_values: List[str] = []
                if (
                    self.load_dimension_values
                    and column_member_type == "dimension"
                    and item_type == "string"
                ):
                    dimension_values = self._get_dimension_values(item_name)

                metadata = dict(
                    table_name=str(cube_data_obj_name),
                    column_name=item_name,
                    column_data_type=item_type,
                    column_title=item_title,
                    column_description=item_description,
                    column_member_type=column_member_type,
                    column_values=dimension_values,
                    cube_data_obj_type=cube_data_obj_type,
                )

                page_content = f"{item_title}, {item_description}"

                yield Document(page_content=page_content, metadata=metadata)