Source code for langchain_community.document_loaders.pebblo

"""Pebblo的安全数据加载器是文档加载器的包装器"""

import json
import logging
import os
import uuid
from http import HTTPStatus
from typing import Any, Dict, Iterator, List, Optional, Union

import requests  # type: ignore
from langchain_core.documents import Document

from langchain_community.document_loaders.base import BaseLoader
from langchain_community.utilities.pebblo import (
    APP_DISCOVER_URL,
    CLASSIFIER_URL,
    LOADER_DOC_URL,
    PEBBLO_CLOUD_URL,
    PLUGIN_VERSION,
    App,
    Doc,
    IndexedDocument,
    get_full_path,
    get_loader_full_path,
    get_loader_type,
    get_runtime,
)

logger = logging.getLogger(__name__)


class PebbloSafeLoader(BaseLoader):
    """Pebblo Safe Loader class is a wrapper around document loaders enabling the
    data to be scrutinized.
    """

    _discover_sent: bool = False
    _loader_sent: bool = False
    def __init__(
        self,
        langchain_loader: BaseLoader,
        name: str,
        owner: str = "",
        description: str = "",
        api_key: Optional[str] = None,
        load_semantic: bool = False,
        classifier_url: Optional[str] = None,
    ):
        if not name or not isinstance(name, str):
            raise NameError("Must specify a valid name.")
        self.app_name = name
        self.api_key = os.environ.get("PEBBLO_API_KEY") or api_key
        self.load_id = str(uuid.uuid4())
        self.loader = langchain_loader
        self.load_semantic = os.environ.get("PEBBLO_LOAD_SEMANTIC") or load_semantic
        self.owner = owner
        self.description = description
        self.source_path = get_loader_full_path(self.loader)
        self.source_owner = PebbloSafeLoader.get_file_owner_from_path(self.source_path)
        self.docs: List[Document] = []
        self.docs_with_id: Union[List[IndexedDocument], List[Document], List] = []
        loader_name = str(type(self.loader)).split(".")[-1].split("'")[0]
        self.source_type = get_loader_type(loader_name)
        self.source_path_size = self.get_source_size(self.source_path)
        self.source_aggregate_size = 0
        self.classifier_url = classifier_url or CLASSIFIER_URL
        self.loader_details = {
            "loader": loader_name,
            "source_path": self.source_path,
            "source_type": self.source_type,
            **(
                {"source_path_size": str(self.source_path_size)}
                if self.source_path_size > 0
                else {}
            ),
        }
        # generate app
        self.app = self._get_app_details()
        self._send_discover()
    def load(self) -> List[Document]:
        """Load Documents.

        Returns:
            list: Documents fetched from the load method of the wrapped `loader`.
        """
        self.docs = self.loader.load()
        if not self.load_semantic:
            self._classify_doc(self.docs, loading_end=True)
            return self.docs
        self.docs_with_id = self._index_docs()
        classified_docs = self._classify_doc(self.docs_with_id, loading_end=True)
        self.docs_with_id = self._add_semantic_to_docs(
            self.docs_with_id, classified_docs
        )
        self.docs = self._unindex_docs(self.docs_with_id)  # type: ignore
        return self.docs
    def lazy_load(self) -> Iterator[Document]:
        """Load documents in lazy fashion.

        Raises:
            NotImplementedError: raised when lazy loading is not implemented
                within the wrapped loader.

        Yields:
            list: Documents from the loader's lazy loading.
        """
        try:
            doc_iterator = self.loader.lazy_load()
        except NotImplementedError as exc:
            err_str = (
                f"{self.loader.__class__.__name__} does not implement lazy_load()"
            )
            logger.error(err_str)
            raise NotImplementedError(err_str) from exc
        while True:
            try:
                doc = next(doc_iterator)
            except StopIteration:
                self.docs = []
                break
            self.docs = list((doc,))
            if not self.load_semantic:
                self._classify_doc(self.docs, loading_end=True)
                yield self.docs[0]
            else:
                self.docs_with_id = self._index_docs()
                classified_doc = self._classify_doc(self.docs)
                self.docs_with_id = self._add_semantic_to_docs(
                    self.docs_with_id, classified_doc
                )
                self.docs = self._unindex_docs(self.docs_with_id)  # type: ignore
                yield self.docs[0]
    @classmethod
    def set_discover_sent(cls) -> None:
        cls._discover_sent = True
    @classmethod
    def set_loader_sent(cls) -> None:
        cls._loader_sent = True
    def _classify_doc(self, loaded_docs: list, loading_end: bool = False) -> list:
        """Send documents fetched from the loader to pebblo-server. Then send the
        classified documents to Daxa cloud (if an api_key is present). Internal method.

        Args:
            loaded_docs (list): List of documents fetched from the loader's load
                operation.
            loading_end (bool, optional): Flag indicating the halt of data loading
                by the loader. Defaults to False.
        """
        headers = {
            "Accept": "application/json",
            "Content-Type": "application/json",
        }
        if loading_end is True:
            PebbloSafeLoader.set_loader_sent()
        doc_content = [doc.dict() for doc in loaded_docs]
        docs = []
        for doc in doc_content:
            doc_metadata = doc.get("metadata", {})
            doc_authorized_identities = doc_metadata.get("authorized_identities", [])
            doc_source_path = get_full_path(
                doc_metadata.get(
                    "full_path", doc_metadata.get("source", self.source_path)
                )
            )
            doc_source_owner = doc_metadata.get(
                "owner", PebbloSafeLoader.get_file_owner_from_path(doc_source_path)
            )
            doc_source_size = doc_metadata.get(
                "size", self.get_source_size(doc_source_path)
            )
            page_content = str(doc.get("page_content"))
            page_content_size = self.calculate_content_size(page_content)
            self.source_aggregate_size += page_content_size
            doc_id = doc.get("id", None) or 0
            docs.append(
                {
                    "doc": page_content,
                    "source_path": doc_source_path,
                    "id": doc_id,
                    "last_modified": doc.get("metadata", {}).get("last_modified"),
                    "file_owner": doc_source_owner,
                    **(
                        {"authorized_identities": doc_authorized_identities}
                        if doc_authorized_identities
                        else {}
                    ),
                    **(
                        {"source_path_size": doc_source_size}
                        if doc_source_size is not None
                        else {}
                    ),
                }
            )
        payload: Dict[str, Any] = {
            "name": self.app_name,
            "owner": self.owner,
            "docs": docs,
            "plugin_version": PLUGIN_VERSION,
            "load_id": self.load_id,
            "loader_details": self.loader_details,
            "loading_end": "false",
            "source_owner": self.source_owner,
        }
        if loading_end is True:
            payload["loading_end"] = "true"
            if "loader_details" in payload:
                payload["loader_details"]["source_aggregate_size"] = (  # noqa
                    self.source_aggregate_size
                )
        payload = Doc(**payload).dict(exclude_unset=True)
        load_doc_url = f"{self.classifier_url}{LOADER_DOC_URL}"
        classified_docs = []
        try:
            pebblo_resp = requests.post(
                load_doc_url, headers=headers, json=payload, timeout=300
            )
            classified_docs = json.loads(pebblo_resp.text).get("docs", None)
            if pebblo_resp.status_code not in [HTTPStatus.OK, HTTPStatus.BAD_GATEWAY]:
                logger.warning(
                    "Received unexpected HTTP response code: %s",
                    pebblo_resp.status_code,
                )
            logger.debug(
                "send_loader_doc[local]: request url %s, body %s len %s\
                    response status %s body %s",
                pebblo_resp.request.url,
                str(pebblo_resp.request.body),
                str(len(pebblo_resp.request.body if pebblo_resp.request.body else [])),
                str(pebblo_resp.status_code),
                pebblo_resp.json(),
            )
        except requests.exceptions.RequestException:
            logger.warning("Unable to reach pebblo server.")
        except Exception as e:
            logger.warning("An Exception caught in _send_loader_doc: local %s", e)
        if self.api_key:
            if not classified_docs:
                return classified_docs
            try:
                payload["docs"] = classified_docs
                payload["classified"] = True
                headers.update({"x-api-key": self.api_key})
                pebblo_cloud_url = f"{PEBBLO_CLOUD_URL}{LOADER_DOC_URL}"
                pebblo_cloud_response = requests.post(
                    pebblo_cloud_url, headers=headers, json=payload, timeout=20
                )
                logger.debug(
                    "send_loader_doc[cloud]: request url %s, body %s len %s\
                        response status %s body %s",
                    pebblo_cloud_response.request.url,
                    str(pebblo_cloud_response.request.body),
                    str(
                        len(
                            pebblo_cloud_response.request.body
                            if pebblo_cloud_response.request.body
                            else []
                        )
                    ),
                    str(pebblo_cloud_response.status_code),
                    pebblo_cloud_response.json(),
                )
            except requests.exceptions.RequestException:
                logger.warning("Unable to reach Pebblo cloud server.")
            except Exception as e:
                logger.warning("An Exception caught in _send_loader_doc: cloud %s", e)
        if loading_end is True:
            PebbloSafeLoader.set_loader_sent()
        return classified_docs
    @staticmethod
    def calculate_content_size(page_content: str) -> int:
        """Calculate the content size in bytes:
        - Encode the string to bytes using a specific encoding (e.g., UTF-8).
        - Get the length of the encoded bytes.

        Args:
            page_content (str): Data string.

        Returns:
            int: Size of the string in bytes.
        """
        # Encode the content to bytes using UTF-8
        encoded_content = page_content.encode("utf-8")
        size = len(encoded_content)
        return size
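    # Illustrative note (not part of the original implementation): the byte size
    # can differ from the character count for non-ASCII text. For example,
    # calculate_content_size("héllo") returns 6, because "é" occupies two bytes
    # in UTF-8 while each ASCII character occupies one.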
    def _send_discover(self) -> None:
        """Send the app discovery payload to pebblo-server. Internal method."""
        pebblo_resp = None
        headers = {
            "Accept": "application/json",
            "Content-Type": "application/json",
        }
        payload = self.app.dict(exclude_unset=True)
        app_discover_url = f"{self.classifier_url}{APP_DISCOVER_URL}"
        try:
            pebblo_resp = requests.post(
                app_discover_url, headers=headers, json=payload, timeout=20
            )
            logger.debug(
                "send_discover[local]: request url %s, body %s len %s\
                    response status %s body %s",
                pebblo_resp.request.url,
                str(pebblo_resp.request.body),
                str(len(pebblo_resp.request.body if pebblo_resp.request.body else [])),
                str(pebblo_resp.status_code),
                pebblo_resp.json(),
            )
            if pebblo_resp.status_code in [HTTPStatus.OK, HTTPStatus.BAD_GATEWAY]:
                PebbloSafeLoader.set_discover_sent()
            else:
                logger.warning(
                    f"Received unexpected HTTP response code: {pebblo_resp.status_code}"
                )
        except requests.exceptions.RequestException:
            logger.warning("Unable to reach pebblo server.")
        except Exception as e:
            logger.warning("An Exception caught in _send_discover: local %s", e)

        if self.api_key:
            try:
                headers.update({"x-api-key": self.api_key})
                if pebblo_resp:
                    pebblo_resp_docs = json.loads(pebblo_resp.text).get("ai_apps_data")
                    payload.update(
                        {
                            "pebblo_server_version": pebblo_resp_docs.get(
                                "pebbloServerVersion"
                            ),
                            "pebblo_client_version": pebblo_resp_docs.get(
                                "pebbloClientVersion"
                            ),
                        }
                    )
                pebblo_cloud_url = f"{PEBBLO_CLOUD_URL}{APP_DISCOVER_URL}"
                pebblo_cloud_response = requests.post(
                    pebblo_cloud_url, headers=headers, json=payload, timeout=20
                )
                logger.debug(
                    "send_discover[cloud]: request url %s, body %s len %s\
                        response status %s body %s",
                    pebblo_cloud_response.request.url,
                    str(pebblo_cloud_response.request.body),
                    str(
                        len(
                            pebblo_cloud_response.request.body
                            if pebblo_cloud_response.request.body
                            else []
                        )
                    ),
                    str(pebblo_cloud_response.status_code),
                    pebblo_cloud_response.json(),
                )
            except requests.exceptions.RequestException:
                logger.warning("Unable to reach Pebblo cloud server.")
            except Exception as e:
                logger.warning("An Exception caught in _send_discover: cloud %s", e)

    def _get_app_details(self) -> App:
        """Fetch app details. Internal method.

        Returns:
            App: App details.
        """
        framework, runtime = get_runtime()
        app = App(
            name=self.app_name,
            owner=self.owner,
            description=self.description,
            load_id=self.load_id,
            runtime=runtime,
            framework=framework,
            plugin_version=PLUGIN_VERSION,
        )
        return app
    @staticmethod
    def get_file_owner_from_path(file_path: str) -> str:
        """Fetch the owner of a local file path.

        Args:
            file_path (str): Local file path.

        Returns:
            str: Name of the owner.
        """
        try:
            import pwd

            file_owner_uid = os.stat(file_path).st_uid
            file_owner_name = pwd.getpwuid(file_owner_uid).pw_name
        except Exception:
            file_owner_name = "unknown"
        return file_owner_name
    def get_source_size(self, source_path: str) -> int:
        """Fetch the size of the source path. The source can be a directory or a file.

        Args:
            source_path (str): Local path of the data source.

        Returns:
            int: Source size in bytes.
        """
        if not source_path:
            return 0
        size = 0
        if os.path.isfile(source_path):
            size = os.path.getsize(source_path)
        elif os.path.isdir(source_path):
            total_size = 0
            for dirpath, _, filenames in os.walk(source_path):
                for f in filenames:
                    fp = os.path.join(dirpath, f)
                    if not os.path.islink(fp):
                        total_size += os.path.getsize(fp)
            size = total_size
        return size
    def _index_docs(self) -> List[IndexedDocument]:
        """Index the documents and return a list of IndexedDocument objects.

        Returns:
            List[IndexedDocument]: A list of IndexedDocument objects with unique IDs.
        """
        docs_with_id = [
            IndexedDocument(id=hex(i)[2:], **doc.dict())
            for i, doc in enumerate(self.docs)
        ]
        return docs_with_id

    def _add_semantic_to_docs(
        self, docs_with_id: List[IndexedDocument], classified_docs: List[dict]
    ) -> List[Document]:
        """Add semantic metadata to the given list of documents.

        Args:
            docs_with_id (List[IndexedDocument]): A list of IndexedDocument objects
                containing the documents with their IDs.
            classified_docs (List[dict]): A list of dictionaries containing the
                classified documents.

        Returns:
            List[Document]: A list of Document objects with added semantic metadata.
        """
        indexed_docs = {
            doc.id: Document(page_content=doc.page_content, metadata=doc.metadata)
            for doc in docs_with_id
        }
        for classified_doc in classified_docs:
            doc_id = classified_doc.get("id")
            if doc_id in indexed_docs:
                self._add_semantic_to_doc(indexed_docs[doc_id], classified_doc)
        semantic_metadata_docs = [doc for doc in indexed_docs.values()]
        return semantic_metadata_docs

    def _unindex_docs(self, docs_with_id: List[IndexedDocument]) -> List[Document]:
        """Convert a list of IndexedDocument objects to a list of Document objects.

        Args:
            docs_with_id (List[IndexedDocument]): A list of IndexedDocument objects.

        Returns:
            List[Document]: A list of Document objects.
        """
        docs = [
            Document(page_content=doc.page_content, metadata=doc.metadata)
            for i, doc in enumerate(docs_with_id)
        ]
        return docs

    def _add_semantic_to_doc(self, doc: Document, classified_doc: dict) -> Document:
        """Add semantic metadata to the given document.

        Args:
            doc (Document): A Document object.
            classified_doc (dict): A dictionary containing the classified document.

        Returns:
            Document: The Document object with added semantic metadata.
        """
        doc.metadata["pebblo_semantic_entities"] = list(
            classified_doc.get("entities", {}).keys()
        )
        doc.metadata["pebblo_semantic_topics"] = list(
            classified_doc.get("topics", {}).keys()
        )
        return doc
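
# Usage sketch (illustrative, not part of the original module). PebbloSafeLoader
# wraps any existing document loader; the CSV path, app name, and owner below are
# hypothetical placeholders, and running this requires a reachable pebblo-server
# plus an existing CSV file. Real callers would import PebbloSafeLoader from
# langchain_community.document_loaders instead of running this file directly.
if __name__ == "__main__":
    from langchain_community.document_loaders.csv_loader import CSVLoader

    loader = PebbloSafeLoader(
        CSVLoader("data/corp_sens_data.csv"),  # any BaseLoader works here
        name="acme-corp-rag-app",  # application name reported to Pebblo
        owner="Joe Smith",  # application owner
        description="Support productivity RAG application",
    )
    # load() classifies documents via the local pebblo-server before returning them
    documents = loader.load()
    print(f"Loaded {len(documents)} documents")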