from __future__ import annotations
import logging
import os
import pathlib
import platform
from typing import Optional, Tuple
from langchain_core.documents import Document
from langchain_core.env import get_runtime_environment
from langchain_core.pydantic_v1 import BaseModel
from langchain_community.document_loaders.base import BaseLoader
# Module-level logger used by the helpers in this file.
logger = logging.getLogger(__name__)

# Pebblo plugin version string.
PLUGIN_VERSION = "0.1.1"

# Base URLs. Each is read from the environment once, at import time, and
# falls back to the literal default when the variable is not set.
CLASSIFIER_URL = os.getenv("PEBBLO_CLASSIFIER_URL", "http://localhost:8000")
PEBBLO_CLOUD_URL = os.getenv("PEBBLO_CLOUD_URL", "https://api.daxa.ai")

# URL paths; presumably appended to one of the base URLs above by callers
# outside this file -- TODO confirm.
LOADER_DOC_URL = "/v1/loader/doc"
APP_DISCOVER_URL = "/v1/app/discover"

# Supported loaders for Pebblo safe data loading, grouped by the kind of
# source they read. Entries are loader class names, matched as strings.
file_loader = [
    "JSONLoader",
    "S3FileLoader",
    "UnstructuredMarkdownLoader",
    "UnstructuredPDFLoader",
    "UnstructuredFileLoader",
    "UnstructuredJsonLoader",
    "PyPDFLoader",
    "GCSFileLoader",
    "AmazonTextractPDFLoader",
    "CSVLoader",
    "UnstructuredExcelLoader",
    "UnstructuredEmailLoader",
]
dir_loader = [
    "DirectoryLoader",
    "S3DirLoader",
    "SlackDirectoryLoader",
    "PyPDFDirectoryLoader",
    "NotionDirectoryLoader",
]
in_memory = ["DataFrameLoader"]
remote_db = [
    "NotionDBLoader",
    "GoogleDriveLoader",
]

# Loader-type label -> list of loader class names carrying that label.
LOADER_TYPE_MAPPING = {
    "file": file_loader,
    "dir": dir_loader,
    "in-memory": in_memory,
    "remote_db": remote_db,
}

# Flat tuple of supported loader names.
# NOTE(review): `remote_db` loaders appear in LOADER_TYPE_MAPPING but are not
# included here -- confirm whether that omission is intentional.
SUPPORTED_LOADERS = (*file_loader, *dir_loader, *in_memory)
class IndexedDocument(Document):
    """A ``Document`` that additionally carries a string identifier."""

    # Identifier of the document; declared without a default value.
    id: str
class Runtime(BaseModel):
    """Pebblo runtime.

    Args:
        type (str): Runtime type. Defaults to "local".
        host (str): Hostname of the runtime.
        path (str): Current working directory path.
        ip (Optional[str]): IP of the current runtime. Defaults to "".
        platform (str): Platform details of the current runtime.
        os (str): OS name.
        os_version (str): OS version.
        language (str): Runtime kernel.
        language_version (str): Version of the current runtime kernel.
        runtime (str): More runtime details. Defaults to "local".
    """

    # Field order is kept exactly as declared originally.
    type: str = "local"
    host: str
    path: str
    ip: Optional[str] = ""
    platform: str
    os: str
    os_version: str
    language: str
    language_version: str
    runtime: str = "local"
class Framework(BaseModel):
    """Pebblo framework instance.

    Args:
        name (str): Name of the framework.
        version (str): Version of the framework.
    """

    name: str
    version: str
class App(BaseModel):
    """Pebblo AI application.

    Args:
        name (str): Name of the app.
        owner (str): Owner of the app.
        description (Optional[str]): Description of the app.
        load_id (str): Unique load_id of the app instance.
        runtime (Runtime): Runtime details of the app.
        framework (Framework): Framework details of the app.
        plugin_version (str): Plugin version used for the app.
    """

    name: str
    owner: str
    description: Optional[str]
    load_id: str
    runtime: Runtime
    framework: Framework
    plugin_version: str
class Doc(BaseModel):
    """Pebblo document.

    Args:
        name (str): Name of the app that created this document.
        owner (str): Owner of the app.
        docs (list): List of documents with their metadata.
        plugin_version (str): Pebblo plugin version.
        load_id (str): Unique load_id of the app instance.
        loader_details (dict): Loader details with their metadata.
        loading_end (bool): Boolean marking the end of source loading.
        source_owner (str): Owner of the loader's source.
    """

    name: str
    owner: str
    docs: list
    plugin_version: str
    load_id: str
    loader_details: dict
    loading_end: bool
    source_owner: str
def get_full_path(path: str) -> str:
    """Return the absolute local path for a local file or directory.

    Network-style paths (anything containing ``://``), paths that already
    start with ``/``, empty values and the placeholder names ``unknown``,
    ``-`` and ``in-memory`` are returned unchanged.

    Args:
        path (str): Relative path to be resolved.

    Returns:
        str: The resolved absolute path when ``path`` exists on the local
        filesystem; otherwise the path as rendered by ``pathlib.Path``.
    """
    # Empty input is handled first so indexing path[0] below is always safe.
    if not path:
        return path

    placeholders = ["unknown", "-", "in-memory"]
    if "://" in path or path[0] == "/" or path in placeholders:
        return path

    candidate = pathlib.Path(path)
    # Only paths that actually exist are resolved to an absolute location.
    return str(candidate.resolve() if candidate.exists() else candidate)
def get_loader_type(loader: str) -> str:
    """Return the type label registered for a loader name.

    Args:
        loader (str): Name of the loader whose type is to be resolved.

    Returns:
        str: The first key of ``LOADER_TYPE_MAPPING`` whose list contains
        ``loader``, or ``"unsupported"`` when no list contains it.
    """
    matching_types = (
        type_label
        for type_label, loader_names in LOADER_TYPE_MAPPING.items()
        if loader in loader_names
    )
    # The first match wins, following the mapping's insertion order.
    return next(matching_types, "unsupported")
def get_loader_full_path(loader: BaseLoader) -> str:
    """Return the absolute source path of a loader, based on its attributes.

    The loader's instance ``__dict__`` is inspected and the first matching
    rule decides the location, in this order: ``bucket`` (GCS / S3 file
    loaders), ``source`` (optionally suffixed with ``channel``), ``path``,
    ``file_path``, ``web_paths`` (first entry), ``DataFrameLoader``
    (``"in-memory"``), ``NotionDBLoader`` and a class named
    ``GoogleDriveLoader``. When nothing matches the location stays ``"-"``.

    Args:
        loader (BaseLoader): Langchain document loader, derived from
            ``BaseLoader``.

    Returns:
        str: The detected location passed through ``get_full_path``, or
        ``"-"`` when ``loader`` is not a ``BaseLoader``.
    """
    # Imported lazily, inside the function, as in the original code.
    from langchain_community.document_loaders import (
        DataFrameLoader,
        GCSFileLoader,
        NotionDBLoader,
        S3FileLoader,
    )

    location = "-"
    if not isinstance(loader, BaseLoader):
        logger.error(
            "loader is not derived from BaseLoader, source location will be unknown!"
        )
        return location

    loader_dict = loader.__dict__
    try:
        if "bucket" in loader_dict:
            # Other loaders that expose `bucket` keep the "-" placeholder.
            if isinstance(loader, GCSFileLoader):
                location = f"gc://{loader.bucket}/{loader.blob}"
            elif isinstance(loader, S3FileLoader):
                location = f"s3://{loader.bucket}/{loader.key}"
        elif "source" in loader_dict:
            location = loader_dict["source"]
            if location and "channel" in loader_dict:
                channel = loader_dict["channel"]
                if channel:
                    location = f"{location}/{channel}"
        elif "path" in loader_dict:
            location = loader_dict["path"]
        elif "file_path" in loader_dict:
            location = loader_dict["file_path"]
        elif "web_paths" in loader_dict:
            web_paths = loader_dict["web_paths"]
            # Only the first web path is reported.
            if isinstance(web_paths, list) and web_paths:
                location = web_paths[0]
        # For in-memory types:
        elif isinstance(loader, DataFrameLoader):
            location = "in-memory"
        elif isinstance(loader, NotionDBLoader):
            location = f"notiondb://{loader.database_id}"
        elif loader.__class__.__name__ == "GoogleDriveLoader":
            location = _google_drive_location(loader_dict, location)
    except Exception as exc:
        # Location detection is best-effort: keep whatever was determined so
        # far, but report the failure instead of hiding it.
        logger.warning(
            "Could not determine the source location of %s: %s",
            loader.__class__.__name__,
            exc,
        )
    return get_full_path(str(location))


def _google_drive_location(loader_dict: dict, default: str) -> str:
    """Build the Google Drive URL(s) described by a loader's attributes.

    ``folder_id`` takes precedence over ``file_ids``, which takes precedence
    over ``document_ids``; multiple ids are joined with ``", "``. ``default``
    is returned when none of the three is set.
    """
    folder_id = loader_dict.get("folder_id")
    if folder_id:
        return f"https://drive.google.com/drive/u/2/folders/{folder_id}"

    file_ids = loader_dict.get("file_ids")
    if file_ids:
        return ", ".join(
            f"https://drive.google.com/file/d/{file_id}/view" for file_id in file_ids
        )

    document_ids = loader_dict.get("document_ids")
    if document_ids:
        return ", ".join(
            f"https://docs.google.com/document/d/{doc_id}/edit"
            for doc_id in document_ids
        )

    return default
def get_runtime() -> Tuple[Framework, Runtime]:
    """Fetch the current framework and runtime details.

    Returns:
        Tuple[Framework, Runtime]: Framework and runtime for the current app
        instance.
    """
    runtime_env = get_runtime_environment()
    framework = Framework(
        name="langchain",
        # `Framework.version` is declared as `str`, so fall back to the same
        # "unknown" placeholder used for the other fields rather than None.
        version=runtime_env.get("library_version", "unknown"),
    )

    uname = platform.uname()
    runtime = Runtime(
        host=uname.node,
        # PWD is not guaranteed to be set (e.g. on Windows); fall back to the
        # process working directory instead of raising KeyError.
        path=os.environ.get("PWD") or os.getcwd(),
        platform=runtime_env.get("platform", "unknown"),
        os=uname.system,
        os_version=uname.version,
        ip=get_ip(),
        language=runtime_env.get("runtime", "unknown"),
        language_version=runtime_env.get("runtime_version", "unknown"),
    )

    # On macOS the runtime is labelled as a desktop environment.
    if "Darwin" in runtime.os:
        runtime.type = "desktop"
        runtime.runtime = "Mac OSX"

    # Lazy %-style arguments: formatting happens only if DEBUG is enabled.
    logger.debug("framework %s", framework)
    logger.debug("runtime %s", runtime)
    return framework, runtime
def get_ip() -> str:
    """Fetch the IP address of the local runtime.

    The local hostname is resolved first. If that fails, ``localhost`` is
    resolved instead, and if that also fails the literal ``"127.0.0.1"`` is
    returned, so this function does not raise on lookup errors.

    Returns:
        str: IP address
    """
    import socket  # lazy imports

    # socket.gaierror / socket.herror are OSError subclasses; UnicodeError
    # (a ValueError) can be raised for names the idna codec cannot encode.
    lookup_errors = (OSError, ValueError)
    try:
        return socket.gethostbyname(socket.gethostname())
    except lookup_errors:
        try:
            return socket.gethostbyname("localhost")
        except lookup_errors:
            # Last resort when even "localhost" cannot be resolved.
            return "127.0.0.1"