Source code for langchain_community.document_loaders.glue_catalog
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional
from langchain_core.documents import Document
from langchain_community.document_loaders.base import BaseLoader
if TYPE_CHECKING:
from boto3.session import Session
[docs]class GlueCatalogLoader(BaseLoader):
"""从AWS Glue加载表模式。
此加载程序获取指定AWS Glue数据库中每个表的模式。模式详细信息包括列名及其数据类型,类似于pandas的dtype表示。
AWS凭证会使用boto3自动加载,遵循标准的AWS方法:
https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html
如果需要特定的AWS配置文件,则可以指定并将用于建立会话。"""
[docs] def __init__(
self,
database: str,
*,
session: Optional[Session] = None,
profile_name: Optional[str] = None,
table_filter: Optional[List[str]] = None,
):
"""初始化Glue数据库加载器。
参数:
database:要加载表模式的Glue数据库的名称。
session:可选。一个boto3 Session对象。如果未提供,将创建一个新的会话。
profile_name:可选。用于凭据的AWS配置文件的名称。
table_filter:可选。要获取模式的表名称列表,如果为None,则获取所有表。
"""
self.database = database
self.profile_name = profile_name
self.table_filter = table_filter
if session:
self.glue_client = session.client("glue")
else:
self.glue_client = self._initialize_glue_client()
def _initialize_glue_client(self) -> Any:
"""初始化 AWS Glue 客户端。
返回:
初始化后的 AWS Glue 客户端。
引发:
ValueError: 如果 AWS 会话/客户端初始化存在问题。
"""
try:
import boto3
except ImportError as e:
raise ImportError(
"boto3 is required to use the GlueCatalogLoader. "
"Please install it with `pip install boto3`."
) from e
try:
session = (
boto3.Session(profile_name=self.profile_name)
if self.profile_name
else boto3.Session()
)
return session.client("glue")
except Exception as e:
raise ValueError("Issue with AWS session/client initialization.") from e
def _fetch_tables(self) -> List[str]:
"""获取指定Glue数据库中的所有表名。
返回:
一个表名列表。
"""
paginator = self.glue_client.get_paginator("get_tables")
table_names = []
for page in paginator.paginate(DatabaseName=self.database):
for table in page["TableList"]:
if self.table_filter is None or table["Name"] in self.table_filter:
table_names.append(table["Name"])
return table_names
def _fetch_table_schema(self, table_name: str) -> Dict[str, str]:
"""获取指定表的模式。
参数:
table_name:要获取模式的表的名称。
返回:
将列名映射到它们的数据类型的字典。
"""
response = self.glue_client.get_table(
DatabaseName=self.database, Name=table_name
)
columns = response["Table"]["StorageDescriptor"]["Columns"]
return {col["Name"]: col["Type"] for col in columns}
[docs] def lazy_load(self) -> Iterator[Document]:
"""惰性加载表模式作为文档对象。
产生:
每个代表表模式的文档对象。
"""
table_names = self._fetch_tables()
for table_name in table_names:
schema = self._fetch_table_schema(table_name)
page_content = (
f"Database: {self.database}\nTable: {table_name}\nSchema:\n"
+ "\n".join(f"{col}: {dtype}" for col, dtype in schema.items())
)
doc = Document(
page_content=page_content, metadata={"table_name": table_name}
)
yield doc