# Source code for langchain_community.document_loaders.couchbase

import logging
from typing import Iterator, List, Optional

from langchain_core.documents import Document

from langchain_community.document_loaders.base import BaseLoader

logger = logging.getLogger(__name__)


class CouchbaseLoader(BaseLoader):
    """Load documents from `Couchbase`.

    Each document represents one row of the query result. The columns named
    in `page_content_fields` are written into the document's `page_content`,
    and the columns named in `metadata_fields` are written into its
    `metadata`. By default, every column is written into `page_content` and
    no column is written into `metadata`.
    """

    def __init__(
        self,
        connection_string: str,
        db_username: str,
        db_password: str,
        query: str,
        *,
        page_content_fields: Optional[List[str]] = None,
        metadata_fields: Optional[List[str]] = None,
    ) -> None:
        """Initialize the Couchbase document loader.

        Args:
            connection_string: Connection string used to connect to the
                Couchbase cluster.
            db_username: Username used to connect to the Couchbase cluster.
            db_password: Password used to connect to the Couchbase cluster.
            query: The SQL++ query to execute.
            page_content_fields: Columns to write into the document's
                `page_content`. By default, all columns are written.
            metadata_fields: Columns to write into the document's
                `metadata`. By default, no columns are written.

        Raises:
            ImportError: If the `couchbase` package is not installed.
            ValueError: If `connection_string`, `db_username` or
                `db_password` is empty.
        """
        # Imported lazily so the module can be imported without the optional
        # `couchbase` dependency installed.
        try:
            from couchbase.auth import PasswordAuthenticator
            from couchbase.cluster import Cluster
            from couchbase.options import ClusterOptions
        except ImportError as e:
            # The two literals are concatenated implicitly; the trailing
            # space keeps the sentences from running together.
            raise ImportError(
                "Could not import couchbase package. "
                "Please install couchbase SDK with `pip install couchbase`."
            ) from e

        if not connection_string:
            raise ValueError("connection_string must be provided.")

        if not db_username:
            raise ValueError("db_username must be provided.")

        if not db_password:
            raise ValueError("db_password must be provided.")

        auth = PasswordAuthenticator(
            db_username,
            db_password,
        )

        self.cluster: Cluster = Cluster(connection_string, ClusterOptions(auth))
        self.query = query
        self.page_content_fields = page_content_fields
        self.metadata_fields = metadata_fields

    def lazy_load(self) -> Iterator[Document]:
        """Lazily load the Couchbase query result as Document objects.

        Waits up to 5 seconds for the cluster to become ready, runs the
        configured SQL++ query, and yields one `Document` per result row.

        Yields:
            A `Document` whose `page_content` is the selected columns
            rendered as ``"<column>: <value>"`` lines joined by newlines, and
            whose `metadata` maps each metadata field to its value in the row.

        Note:
            Metadata values are read with ``row[field]``, so a metadata field
            that is absent from a row propagates the lookup error
            (presumably ``KeyError``; rows are assumed to be dict-like).
        """
        from datetime import timedelta

        # Ensure connection to Couchbase cluster
        self.cluster.wait_until_ready(timedelta(seconds=5))

        # Run SQL++ Query
        result = self.cluster.query(self.query)

        # The field selections do not vary per row, so resolve them once.
        metadata_fields = self.metadata_fields or []
        content_fields = self.page_content_fields

        for row in result:
            metadata = {field: row[field] for field in metadata_fields}

            # With no explicit selection, every column goes into the content.
            page_content = "\n".join(
                f"{key}: {value}"
                for key, value in row.items()
                if not content_fields or key in content_fields
            )

            yield Document(page_content=page_content, metadata=metadata)