# Source code for langchain_community.document_loaders.tidb

from typing import Any, Dict, Iterator, List, Optional

from langchain_core.documents import Document

from langchain_community.document_loaders.base import BaseLoader


class TiDBLoader(BaseLoader):
    """Load documents from TiDB.

    Runs a SQL query through SQLAlchemy and turns every result row into one
    ``Document``.
    """

    def __init__(
        self,
        connection_string: str,
        query: str,
        page_content_columns: Optional[List[str]] = None,
        metadata_columns: Optional[List[str]] = None,
        engine_args: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Initialize the TiDB document loader.

        Args:
            connection_string: Connection string for the TiDB database,
                in the format "mysql+pymysql://root@127.0.0.1:4000/test".
            query: The query to run in TiDB.
            page_content_columns: Optional. Columns written to the
                document's `page_content`. Defaults to all columns.
            metadata_columns: Optional. Columns written to the document's
                `metadata`. Defaults to none.
            engine_args: Optional. Extra keyword arguments passed to the
                sqlalchemy engine.
        """
        self.connection_string = connection_string
        self.query = query
        self.page_content_columns = page_content_columns
        self.metadata_columns = metadata_columns if metadata_columns is not None else []
        self.engine_args = engine_args

    def lazy_load(self) -> Iterator[Document]:
        """Lazily load TiDB query results as Document objects.

        Yields:
            One ``Document`` per result row. ``page_content`` is the
            newline-joined ``"column: value"`` pairs of the selected content
            columns (all columns when ``page_content_columns`` is None), in
            result-column order. ``metadata`` maps each name in
            ``metadata_columns`` to that row's value.

        Raises:
            KeyError: If a name in ``metadata_columns`` is not one of the
                columns returned by the query.
        """
        # Imported here so sqlalchemy is only required when loading.
        from sqlalchemy import create_engine
        from sqlalchemy.engine import Engine
        from sqlalchemy.sql import text

        # A fresh engine (and connection pool) is created for each call.
        engine: Engine = create_engine(
            self.connection_string, **(self.engine_args or {})
        )
        try:
            with engine.connect() as conn:
                result = conn.execute(text(self.query))
                column_names = list(result.keys())
                for row in result:
                    # Map column name -> value for this row.
                    row_data = dict(zip(column_names, row))
                    page_content = "\n".join(
                        f"{k}: {v}"
                        for k, v in row_data.items()
                        if self.page_content_columns is None
                        or k in self.page_content_columns
                    )
                    metadata = {col: row_data[col] for col in self.metadata_columns}
                    yield Document(page_content=page_content, metadata=metadata)
        finally:
            # The engine is private to this call; dispose of it so its pooled
            # connections are closed instead of lingering until garbage
            # collection. This also runs when the generator is closed before
            # it is exhausted.
            engine.dispose()