# Source code for langchain_community.document_loaders.couchbase
import logging
from typing import Iterator, List, Optional
from langchain_core.documents import Document
from langchain_community.document_loaders.base import BaseLoader
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class CouchbaseLoader(BaseLoader):
    """Load documents from `Couchbase`.

    Each document represents one row of the query result. The columns named
    in `page_content_fields` are written into the document's `page_content`,
    and the columns named in `metadata_fields` are written into its
    `metadata`. By default, all columns are written into `page_content` and
    none into `metadata`.
    """

    def __init__(
        self,
        connection_string: str,
        db_username: str,
        db_password: str,
        query: str,
        *,
        page_content_fields: Optional[List[str]] = None,
        metadata_fields: Optional[List[str]] = None,
    ) -> None:
        """Initialize the Couchbase document loader.

        Args:
            connection_string: Connection string used to connect to the
                Couchbase cluster.
            db_username: Username used to connect to the Couchbase cluster.
            db_password: Password used to connect to the Couchbase cluster.
            query: SQL++ query to execute.
            page_content_fields: Columns to write into the document's
                `page_content`. Defaults to all columns.
            metadata_fields: Columns to write into the document's
                `metadata`. Defaults to no columns.

        Raises:
            ImportError: If the `couchbase` package cannot be imported.
            ValueError: If `connection_string`, `db_username` or
                `db_password` is empty.
        """
        # Imported lazily so the SDK is only required when the loader is
        # actually instantiated.
        try:
            from couchbase.auth import PasswordAuthenticator
            from couchbase.cluster import Cluster
            from couchbase.options import ClusterOptions
        except ImportError as e:
            raise ImportError(
                "Could not import couchbase package. "
                "Please install couchbase SDK with `pip install couchbase`."
            ) from e

        if not connection_string:
            raise ValueError("connection_string must be provided.")

        if not db_username:
            raise ValueError("db_username must be provided.")

        if not db_password:
            raise ValueError("db_password must be provided.")

        auth = PasswordAuthenticator(db_username, db_password)

        self.cluster: Cluster = Cluster(connection_string, ClusterOptions(auth))
        self.query = query
        self.page_content_fields = page_content_fields
        self.metadata_fields = metadata_fields

    def lazy_load(self) -> Iterator[Document]:
        """Lazily load Couchbase query rows as Document objects.

        Waits up to 5 seconds for the cluster to become ready, runs
        `self.query`, and yields one document per result row.

        Yields:
            A `Document` whose `page_content` is the selected columns rendered
            as newline-separated ``"<column>: <value>"`` lines (in the row's
            own column order) and whose `metadata` maps each metadata field
            to its value in the row.

        Raises:
            KeyError: If a name in `metadata_fields` is missing from a row
                (assuming rows are dict-like, which the `.items()` access
                below relies on).
        """
        from datetime import timedelta

        # Ensure connection to Couchbase cluster
        self.cluster.wait_until_ready(timedelta(seconds=5))

        # The field selections do not depend on the row, so resolve the
        # defaults once instead of on every iteration. An unset or empty
        # `page_content_fields` means "every column".
        metadata_fields = self.metadata_fields or []
        content_fields = self.page_content_fields or None

        # Run SQL++ Query
        result = self.cluster.query(self.query)
        for row in result:
            metadata = {field: row[field] for field in metadata_fields}

            page_content = "\n".join(
                f"{key}: {value}"
                for key, value in row.items()
                if content_fields is None or key in content_fields
            )

            yield Document(page_content=page_content, metadata=metadata)