Source code for langchain_community.document_loaders.glue_catalog

from __future__ import annotations

from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional

from langchain_core.documents import Document

from langchain_community.document_loaders.base import BaseLoader

if TYPE_CHECKING:
    from boto3.session import Session


[docs]class GlueCatalogLoader(BaseLoader): """从AWS Glue加载表模式。 此加载程序获取指定AWS Glue数据库中每个表的模式。模式详细信息包括列名及其数据类型,类似于pandas的dtype表示。 AWS凭证会使用boto3自动加载,遵循标准的AWS方法: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html 如果需要特定的AWS配置文件,则可以指定并将用于建立会话。"""
[docs] def __init__( self, database: str, *, session: Optional[Session] = None, profile_name: Optional[str] = None, table_filter: Optional[List[str]] = None, ): """初始化Glue数据库加载器。 参数: database:要加载表模式的Glue数据库的名称。 session:可选。一个boto3 Session对象。如果未提供,将创建一个新的会话。 profile_name:可选。用于凭据的AWS配置文件的名称。 table_filter:可选。要获取模式的表名称列表,如果为None,则获取所有表。 """ self.database = database self.profile_name = profile_name self.table_filter = table_filter if session: self.glue_client = session.client("glue") else: self.glue_client = self._initialize_glue_client()
def _initialize_glue_client(self) -> Any: """初始化 AWS Glue 客户端。 返回: 初始化后的 AWS Glue 客户端。 引发: ValueError: 如果 AWS 会话/客户端初始化存在问题。 """ try: import boto3 except ImportError as e: raise ImportError( "boto3 is required to use the GlueCatalogLoader. " "Please install it with `pip install boto3`." ) from e try: session = ( boto3.Session(profile_name=self.profile_name) if self.profile_name else boto3.Session() ) return session.client("glue") except Exception as e: raise ValueError("Issue with AWS session/client initialization.") from e def _fetch_tables(self) -> List[str]: """获取指定Glue数据库中的所有表名。 返回: 一个表名列表。 """ paginator = self.glue_client.get_paginator("get_tables") table_names = [] for page in paginator.paginate(DatabaseName=self.database): for table in page["TableList"]: if self.table_filter is None or table["Name"] in self.table_filter: table_names.append(table["Name"]) return table_names def _fetch_table_schema(self, table_name: str) -> Dict[str, str]: """获取指定表的模式。 参数: table_name:要获取模式的表的名称。 返回: 将列名映射到它们的数据类型的字典。 """ response = self.glue_client.get_table( DatabaseName=self.database, Name=table_name ) columns = response["Table"]["StorageDescriptor"]["Columns"] return {col["Name"]: col["Type"] for col in columns}
[docs] def lazy_load(self) -> Iterator[Document]: """惰性加载表模式作为文档对象。 产生: 每个代表表模式的文档对象。 """ table_names = self._fetch_tables() for table_name in table_names: schema = self._fetch_table_schema(table_name) page_content = ( f"Database: {self.database}\nTable: {table_name}\nSchema:\n" + "\n".join(f"{col}: {dtype}" for col, dtype in schema.items()) ) doc = Document( page_content=page_content, metadata={"table_name": table_name} ) yield doc