Source code for langchain_community.llms.self_hosted

import importlib.util
import logging
import pickle
from typing import Any, Callable, List, Mapping, Optional

from langchain_core.callbacks import CallbackManagerForLLMRun
from langchain_core.language_models.llms import LLM
from langchain_core.pydantic_v1 import Extra

from langchain_community.llms.utils import enforce_stop_tokens

logger = logging.getLogger(__name__)


def _generate_text(
    pipeline: Any,
    prompt: str,
    *args: Any,
    stop: Optional[List[str]] = None,
    **kwargs: Any,
) -> str:
    """将推理函数发送到远程硬件。

接受一个流水线可调用对象(或更可能是指向集群对象存储中模型的键),
并为批处理中的每个文档返回文本预测。
"""
    text = pipeline(prompt, *args, **kwargs)
    if stop is not None:
        text = enforce_stop_tokens(text, stop)
    return text


def _send_pipeline_to_device(pipeline: Any, device: int) -> Any:
    """将一个管道发送到集群上的设备。"""
    if isinstance(pipeline, str):
        with open(pipeline, "rb") as f:
            pipeline = pickle.load(f)

    if importlib.util.find_spec("torch") is not None:
        import torch

        cuda_device_count = torch.cuda.device_count()
        if device < -1 or (device >= cuda_device_count):
            raise ValueError(
                f"Got device=={device}, "
                f"device is required to be within [-1, {cuda_device_count})"
            )
        if device < 0 and cuda_device_count > 0:
            logger.warning(
                "Device has %d GPUs available. "
                "Provide device={deviceId} to `from_model_id` to use available"
                "GPUs for execution. deviceId is -1 for CPU and "
                "can be a positive integer associated with CUDA device id.",
                cuda_device_count,
            )

        pipeline.device = torch.device(device)
        pipeline.model = pipeline.model.to(pipeline.device)
    return pipeline


[docs]class SelfHostedPipeline(LLM): """在自托管远程硬件上进行模型推理。 支持的硬件包括在AWS、GCP、Azure和Lambda上自动启动的实例,以及通过IP地址和SSH凭据指定的服务器(例如在本地,或者其他云平台如Paperspace、Coreweave等)。 要使用,您应该已安装``runhouse`` python包。 自定义管道和推理函数示例: .. code-block:: python from langchain_community.llms import SelfHostedPipeline from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline import runhouse as rh def load_pipeline(): tokenizer = AutoTokenizer.from_pretrained("gpt2") model = AutoModelForCausalLM.from_pretrained("gpt2") return pipeline( "text-generation", model=model, tokenizer=tokenizer, max_new_tokens=10 ) def inference_fn(pipeline, prompt, stop = None): return pipeline(prompt)[0]["generated_text"] gpu = rh.cluster(name="rh-a10x", instance_type="A100:1") llm = SelfHostedPipeline( model_load_fn=load_pipeline, hardware=gpu, model_reqs=model_reqs, inference_fn=inference_fn ) <2GB模型示例(可以序列化并直接发送到服务器): .. code-block:: python from langchain_community.llms import SelfHostedPipeline import runhouse as rh gpu = rh.cluster(name="rh-a10x", instance_type="A100:1") my_model = ... llm = SelfHostedPipeline.from_pipeline( pipeline=my_model, hardware=gpu, model_reqs=["./", "torch", "transformers"], ) 传递较大模型路径的示例: .. code-block:: python from langchain_community.llms import SelfHostedPipeline import runhouse as rh import pickle from transformers import pipeline generator = pipeline(model="gpt2") rh.blob(pickle.dumps(generator), path="models/pipeline.pkl" ).save().to(gpu, path="models") llm = SelfHostedPipeline.from_pipeline( pipeline="models/pipeline.pkl", hardware=gpu, model_reqs=["./", "torch", "transformers"], )""" pipeline_ref: Any #: :meta private: client: Any #: :meta private: inference_fn: Callable = _generate_text #: :meta private: """发送到远程硬件的推理函数。""" hardware: Any """远程硬件发送推理函数。""" model_load_fn: Callable """在服务器上远程加载模型的函数。""" load_fn_kwargs: Optional[dict] = None """传递给模型加载函数的关键字参数。""" model_reqs: List[str] = ["./", "torch"] """在硬件上安装推断模型所需的要求。""" allow_dangerous_deserialization: bool = False """允许使用pickle进行反序列化,如果加载了受损数据可能会很危险。""" class Config: """此pydantic对象的配置。""" extra = Extra.forbid def __init__(self, **kwargs: Any): """使用辅助函数初始化流水线。 加载函数必须在全局范围内才能被导入并在服务器上运行,即在一个模块中而不是在REPL或闭包中。然后,初始化远程推断函数。 """ if not kwargs.get("allow_dangerous_deserialization"): raise ValueError( "SelfHostedPipeline relies on the pickle module. " "You will need to set allow_dangerous_deserialization=True " "if you want to opt-in to allow deserialization of data using pickle." "Data can be compromised by a malicious actor if " "not handled properly to include " "a malicious payload that when deserialized with " "pickle can execute arbitrary code. " ) super().__init__(**kwargs) try: import runhouse as rh except ImportError: raise ImportError( "Could not import runhouse python package. " "Please install it with `pip install runhouse`." ) remote_load_fn = rh.function(fn=self.model_load_fn).to( self.hardware, reqs=self.model_reqs ) _load_fn_kwargs = self.load_fn_kwargs or {} self.pipeline_ref = remote_load_fn.remote(**_load_fn_kwargs) self.client = rh.function(fn=self.inference_fn).to( self.hardware, reqs=self.model_reqs )
[docs] @classmethod def from_pipeline( cls, pipeline: Any, hardware: Any, model_reqs: Optional[List[str]] = None, device: int = 0, **kwargs: Any, ) -> LLM: """从管道对象或字符串初始化SelfHostedPipeline。""" if not isinstance(pipeline, str): logger.warning( "Serializing pipeline to send to remote hardware. " "Note, it can be quite slow" "to serialize and send large models with each execution. " "Consider sending the pipeline" "to the cluster and passing the path to the pipeline instead." ) load_fn_kwargs = {"pipeline": pipeline, "device": device} return cls( load_fn_kwargs=load_fn_kwargs, model_load_fn=_send_pipeline_to_device, hardware=hardware, model_reqs=["transformers", "torch"] + (model_reqs or []), **kwargs, )
@property def _identifying_params(self) -> Mapping[str, Any]: """获取识别参数。""" return { **{"hardware": self.hardware}, } @property def _llm_type(self) -> str: return "self_hosted_llm" def _call( self, prompt: str, stop: Optional[List[str]] = None, run_manager: Optional[CallbackManagerForLLMRun] = None, **kwargs: Any, ) -> str: return self.client( pipeline=self.pipeline_ref, prompt=prompt, stop=stop, **kwargs )