Source code for langchain_community.document_loaders.tidb
from typing import Any, Dict, Iterator, List, Optional
from langchain_core.documents import Document
from langchain_community.document_loaders.base import BaseLoader
class TiDBLoader(BaseLoader):
    """Load documents from TiDB."""

    def __init__(
        self,
        connection_string: str,
        query: str,
        page_content_columns: Optional[List[str]] = None,
        metadata_columns: Optional[List[str]] = None,
        engine_args: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Initialize the TiDB document loader.

        Args:
            connection_string: Connection string for the TiDB database,
                e.g. "mysql+pymysql://root@127.0.0.1:4000/test".
            query: The query to run against TiDB.
            page_content_columns: Optional. Columns written to the document's
                `page_content`. Defaults to all columns of the result.
            metadata_columns: Optional. Columns written to the document's
                `metadata`. Defaults to no columns.
            engine_args: Optional. Extra keyword arguments passed to
                sqlalchemy's `create_engine`.
        """
        self.connection_string = connection_string
        self.query = query
        self.page_content_columns = page_content_columns
        # Normalise None to an empty list so lazy_load can iterate it directly.
        self.metadata_columns = metadata_columns if metadata_columns is not None else []
        self.engine_args = engine_args

    def lazy_load(self) -> Iterator[Document]:
        """Lazily load the rows returned by the query as Document objects.

        Each result row becomes one Document. Its `page_content` is one
        "column: value" line per selected column, joined with newlines, and
        its `metadata` maps each configured metadata column to the row's value.

        Yields:
            One Document per row of the query result.

        Raises:
            KeyError: If a name in `metadata_columns` is not a column of the
                query result.
        """
        # sqlalchemy is imported lazily so it is only required when loading.
        from sqlalchemy import create_engine
        from sqlalchemy.engine import Engine
        from sqlalchemy.sql import text

        # Use sqlalchemy to create the db connection.
        engine: Engine = create_engine(
            self.connection_string, **(self.engine_args or {})
        )
        # Loop-invariant: which columns go into page_content (None means all).
        content_columns = self.page_content_columns

        try:
            # Execute the query.
            with engine.connect() as conn:
                result = conn.execute(text(self.query))

                # Convert each result row to a Document.
                column_names = list(result.keys())
                for row in result:
                    # Map the row to {column: value}.
                    row_data = dict(zip(column_names, row))
                    page_content = "\n".join(
                        f"{name}: {value}"
                        for name, value in row_data.items()
                        if content_columns is None or name in content_columns
                    )
                    metadata = {col: row_data[col] for col in self.metadata_columns}
                    yield Document(page_content=page_content, metadata=metadata)
        finally:
            # A fresh engine (and connection pool) is created on every call;
            # dispose it so pooled connections are closed instead of leaking.
            # This also runs when the generator is closed before exhaustion.
            engine.dispose()