Source code for langchain_community.utilities.pebblo

from __future__ import annotations

import logging
import os
import pathlib
import platform
from typing import Optional, Tuple

from langchain_core.documents import Document
from langchain_core.env import get_runtime_environment
from langchain_core.pydantic_v1 import BaseModel

from langchain_community.document_loaders.base import BaseLoader

# Module-level logger shared by the helpers in this file.
logger = logging.getLogger(__name__)

# Version of this plugin, reported alongside application metadata.
PLUGIN_VERSION = "0.1.1"

# Service base URLs; each can be overridden through an environment variable
# of the same name as the lookup key.
CLASSIFIER_URL = os.environ.get("PEBBLO_CLASSIFIER_URL", "http://localhost:8000")
PEBBLO_CLOUD_URL = os.environ.get("PEBBLO_CLOUD_URL", "https://api.daxa.ai")

# Endpoint paths (relative to a base URL).
LOADER_DOC_URL = "/v1/loader/doc"
APP_DISCOVER_URL = "/v1/app/discover"

# Supported loaders for Pebblo safe data loading, grouped by the kind of
# source they read from. Entries are loader class names.
file_loader = [
    "JSONLoader",
    "S3FileLoader",
    "UnstructuredMarkdownLoader",
    "UnstructuredPDFLoader",
    "UnstructuredFileLoader",
    "UnstructuredJsonLoader",
    "PyPDFLoader",
    "GCSFileLoader",
    "AmazonTextractPDFLoader",
    "CSVLoader",
    "UnstructuredExcelLoader",
    "UnstructuredEmailLoader",
]
dir_loader = [
    "DirectoryLoader",
    "S3DirLoader",
    "SlackDirectoryLoader",
    "PyPDFDirectoryLoader",
    "NotionDirectoryLoader",
]

in_memory = ["DataFrameLoader"]
remote_db = [
    "NotionDBLoader",
    "GoogleDriveLoader",
]

# Maps a loader-type label to the loader class names of that type.
LOADER_TYPE_MAPPING = {
    "file": file_loader,
    "dir": dir_loader,
    "in-memory": in_memory,
    "remote_db": remote_db,
}

# NOTE(review): the remote_db loaders have a type in LOADER_TYPE_MAPPING but
# are not part of SUPPORTED_LOADERS - confirm that this is intended.
SUPPORTED_LOADERS = (*file_loader, *dir_loader, *in_memory)


class IndexedDocument(Document):
    """A ``Document`` that additionally carries a string ``id`` field."""

    id: str
class Runtime(BaseModel):
    """Pebblo runtime.

    Args:
        type (str): Runtime type. Defaults to "local".
        host (str): Hostname of the runtime.
        path (str): Current working directory path.
        ip (Optional[str]): IP of the current runtime. Defaults to "".
        platform (str): Platform details of the current runtime.
        os (str): OS name.
        os_version (str): OS version.
        language (str): Runtime kernel.
        language_version (str): Version of the current runtime kernel.
        runtime (str): More runtime details. Defaults to "local".
    """

    type: str = "local"
    host: str
    path: str
    ip: Optional[str] = ""
    platform: str
    os: str
    os_version: str
    language: str
    language_version: str
    runtime: str = "local"
class Framework(BaseModel):
    """Pebblo framework instance.

    Args:
        name (str): Name of the framework.
        version (str): Version of the framework.
    """

    name: str
    version: str
class App(BaseModel):
    """Pebblo AI application.

    Args:
        name (str): Name of the app.
        owner (str): Owner of the app.
        description (Optional[str]): Description of the app.
        load_id (str): Unique load_id of the app instance.
        runtime (Runtime): Runtime details of the app.
        framework (Framework): Framework details of the app.
        plugin_version (str): Plugin version used for the app.
    """

    name: str
    owner: str
    description: Optional[str]
    load_id: str
    runtime: Runtime
    framework: Framework
    plugin_version: str
class Doc(BaseModel):
    """Pebblo document.

    Args:
        name (str): Name of the app that originated this document.
        owner (str): Owner of the app.
        docs (list): List of documents with their metadata.
        plugin_version (str): Pebblo plugin version.
        load_id (str): Unique load_id of the app instance.
        loader_details (dict): Loader details with their metadata.
        loading_end (bool): Whether loading of the source has finished.
        source_owner (str): Owner of the loader's source.
    """

    name: str
    owner: str
    docs: list
    plugin_version: str
    load_id: str
    loader_details: dict
    loading_end: bool
    source_owner: str
def get_full_path(path: str) -> str:
    """Return the absolute local path for a local file or directory.

    Network-style locations and placeholder values are returned unchanged.

    Args:
        path (str): Relative path to be resolved.

    Returns:
        str: The input itself when it is empty, contains "://", starts with
        "/", or is one of the placeholders "unknown", "-" or "in-memory".
        Otherwise the path rendered through ``pathlib.Path``, resolved to an
        absolute path only when it exists on disk.
    """
    # Empty input has no first character to inspect, so handle it first.
    if not path:
        return path

    placeholders = ["unknown", "-", "in-memory"]
    if "://" in path or path[0] == "/" or path in placeholders:
        return path

    candidate = pathlib.Path(path)
    # Paths that do not exist are left unresolved.
    return str(candidate.resolve() if candidate.exists() else candidate)
def get_loader_type(loader: str) -> str:
    """Return the loader type registered for a loader class name.

    Args:
        loader (str): Name of the loader whose type is to be resolved.

    Returns:
        str: The first key of ``LOADER_TYPE_MAPPING`` whose loader list
        contains ``loader``, or "unsupported" when no list contains it.
    """
    matching_types = (
        kind for kind, names in LOADER_TYPE_MAPPING.items() if loader in names
    )
    return next(matching_types, "unsupported")
def get_loader_full_path(loader: BaseLoader) -> str:
    """Return the absolute source path of a loader's source.

    The location is derived from whichever well-known attribute is present
    in the loader's instance ``__dict__``; the first matching case wins.
    Resolution is best-effort: if anything raises while inspecting the
    loader, a warning is logged and the location resolved so far is used
    (the placeholder "-" if nothing was resolved).

    Args:
        loader (BaseLoader): Langchain document loader, derived from
            BaseLoader.

    Returns:
        str: The location passed through ``get_full_path``, or "-" when
        ``loader`` is not a ``BaseLoader`` instance.
    """
    # Imported lazily, inside the function, as in the original code.
    from langchain_community.document_loaders import (
        DataFrameLoader,
        GCSFileLoader,
        NotionDBLoader,
        S3FileLoader,
    )

    location = "-"
    if not isinstance(loader, BaseLoader):
        logger.error(
            "loader is not derived from BaseLoader, source location will be unknown!"
        )
        return location

    loader_dict = loader.__dict__
    try:
        if "bucket" in loader_dict:
            # Bucket-based loaders; any other loader exposing "bucket" keeps
            # the "-" placeholder.
            if isinstance(loader, GCSFileLoader):
                location = f"gc://{loader.bucket}/{loader.blob}"
            elif isinstance(loader, S3FileLoader):
                location = f"s3://{loader.bucket}/{loader.key}"
        elif "source" in loader_dict:
            location = loader_dict["source"]
            # Append the channel, when one is set, to a non-empty source.
            if location and "channel" in loader_dict:
                channel = loader_dict["channel"]
                if channel:
                    location = f"{location}/{channel}"
        elif "path" in loader_dict:
            location = loader_dict["path"]
        elif "file_path" in loader_dict:
            location = loader_dict["file_path"]
        elif "web_paths" in loader_dict:
            web_paths = loader_dict["web_paths"]
            # Only the first entry of a non-empty list is reported.
            if web_paths and isinstance(web_paths, list):
                location = web_paths[0]
        # For in-memory types:
        elif isinstance(loader, DataFrameLoader):
            location = "in-memory"
        elif isinstance(loader, NotionDBLoader):
            location = f"notiondb://{loader.database_id}"
        elif loader.__class__.__name__ == "GoogleDriveLoader":
            # Matched by class name rather than isinstance, so the class
            # itself does not have to be imported here.
            folder_id = loader_dict.get("folder_id")
            file_ids = loader_dict.get("file_ids")
            document_ids = loader_dict.get("document_ids")
            if folder_id:
                location = f"https://drive.google.com/drive/u/2/folders/{folder_id}"
            elif file_ids:
                location = ", ".join(
                    f"https://drive.google.com/file/d/{file_id}/view"
                    for file_id in file_ids
                )
            elif document_ids:
                location = ", ".join(
                    f"https://docs.google.com/document/d/{doc_id}/edit"
                    for doc_id in document_ids
                )
    except Exception as exc:
        # Best-effort: never fail the caller over a missing attribute, but do
        # not hide the failure either.
        logger.warning(
            "Could not resolve the source location of loader %s: %s",
            loader.__class__.__name__,
            exc,
        )
    return get_full_path(str(location))
def get_runtime() -> Tuple[Framework, Runtime]:
    """Fetch the current Framework and Runtime details.

    Returns:
        Tuple[Framework, Runtime]: Framework and Runtime for the current app
        instance.
    """
    runtime_env = get_runtime_environment()
    framework = Framework(
        name="langchain", version=runtime_env.get("library_version", None)
    )
    uname = platform.uname()
    # PWD is not set in every environment; indexing os.environ directly
    # raised KeyError there. Prefer PWD when present, otherwise ask the OS
    # for the working directory.
    working_dir = os.environ.get("PWD") or os.getcwd()
    runtime = Runtime(
        host=uname.node,
        path=working_dir,
        platform=runtime_env.get("platform", "unknown"),
        os=uname.system,
        os_version=uname.version,
        ip=get_ip(),
        language=runtime_env.get("runtime", "unknown"),
        language_version=runtime_env.get("runtime_version", "unknown"),
    )

    # A Darwin system name is reported as a desktop Mac runtime.
    if "Darwin" in runtime.os:
        runtime.type = "desktop"
        runtime.runtime = "Mac OSX"

    logger.debug("framework %s", framework)
    logger.debug("runtime %s", runtime)
    return framework, runtime
def get_ip() -> str:
    """Fetch the IP address of the local runtime.

    Looks up the address of the local hostname and, if that lookup raises,
    the address of "localhost" instead.

    Returns:
        str: IP address.
    """
    import socket  # lazy imports

    # The hostname query stays outside the try block: only a failed address
    # lookup triggers the "localhost" fallback.
    hostname = socket.gethostname()
    try:
        return socket.gethostbyname(hostname)
    except Exception:
        return socket.gethostbyname("localhost")