import importlib.util
import logging
import pickle
from typing import Any, Callable, List, Mapping, Optional
from langchain_core.callbacks import CallbackManagerForLLMRun
from langchain_core.language_models.llms import LLM
from langchain_core.pydantic_v1 import Extra
from langchain_community.llms.utils import enforce_stop_tokens
# Module-level logger named after this module; used for the warnings emitted
# by the helpers and the class defined below.
logger = logging.getLogger(__name__)
def _generate_text(
    pipeline: Any,
    prompt: str,
    *args: Any,
    stop: Optional[List[str]] = None,
    **kwargs: Any,
) -> str:
    """Run ``pipeline`` on ``prompt`` and optionally cut the result at a stop sequence.

    This is the default inference function of ``SelfHostedPipeline``; it is the
    callable that gets shipped to the remote hardware.

    Args:
        pipeline: The pipeline callable. It is invoked as
            ``pipeline(prompt, *args, **kwargs)``.
        prompt: The prompt passed as the first positional argument.
        *args: Extra positional arguments forwarded to ``pipeline``.
        stop: Optional stop sequences. When given and non-empty, the output is
            post-processed with ``enforce_stop_tokens``.
        **kwargs: Extra keyword arguments forwarded to ``pipeline``.

    Returns:
        Whatever ``pipeline`` returned, truncated by ``enforce_stop_tokens``
        when stop sequences were supplied.
    """
    text = pipeline(prompt, *args, **kwargs)
    # Skip the helper when ``stop`` is None *or empty*: with no stop sequences
    # there is nothing to enforce, so the text is returned untouched.
    if stop:
        text = enforce_stop_tokens(text, stop)
    return text
def _send_pipeline_to_device(pipeline: Any, device: int) -> Any:
    """Load a pipeline if necessary and move its model onto ``device``.

    Args:
        pipeline: Either a pipeline object, or a filesystem path (``str``) to a
            pickled pipeline, which is then loaded with ``pickle``.
        device: ``-1`` selects the CPU; a non-negative value is a CUDA device
            index.

    Returns:
        The pipeline. When ``torch`` is importable, its ``device`` attribute is
        set and its ``model`` has been moved to that device; otherwise the
        pipeline is returned as loaded.

    Raises:
        ValueError: If ``torch`` is importable and ``device`` is outside
            ``[-1, torch.cuda.device_count())``.
    """
    if isinstance(pipeline, str):
        # SECURITY: pickle.load can execute arbitrary code while deserializing.
        # Only paths to trusted files must ever reach this point.
        with open(pipeline, "rb") as f:
            pipeline = pickle.load(f)

    # Without torch there is no device to move to; hand the pipeline back as is.
    if importlib.util.find_spec("torch") is None:
        return pipeline

    import torch

    cuda_device_count = torch.cuda.device_count()
    if device < -1 or device >= cuda_device_count:
        raise ValueError(
            f"Got device=={device}, "
            f"device is required to be within [-1, {cuda_device_count})"
        )
    if device < 0 and cuda_device_count > 0:
        logger.warning(
            "Device has %d GPUs available. "
            "Provide device={deviceId} to `from_model_id` to use available "
            "GPUs for execution. deviceId is -1 for CPU and "
            "can be a positive integer associated with CUDA device id.",
            cuda_device_count,
        )

    # torch.device() rejects negative indices, so the "-1 means CPU" convention
    # has to be translated explicitly instead of passing the raw integer.
    if device < 0:
        pipeline.device = torch.device("cpu")
    else:
        pipeline.device = torch.device("cuda", device)
    pipeline.model = pipeline.model.to(pipeline.device)
    return pipeline
class SelfHostedPipeline(LLM):
    """Model inference on self-hosted remote hardware.

    Supported hardware includes auto-launched instances on AWS, GCP, Azure,
    and Lambda, as well as servers specified by IP address and SSH credentials
    (such as on-prem, or another cloud like Paperspace, Coreweave, etc.).

    To use, you should have the ``runhouse`` python package installed.

    The constructor raises ``ValueError`` unless
    ``allow_dangerous_deserialization=True`` is passed, because this class
    relies on ``pickle``.

    Example for custom pipeline and inference functions:
        .. code-block:: python

            from langchain_community.llms import SelfHostedPipeline
            from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
            import runhouse as rh

            def load_pipeline():
                tokenizer = AutoTokenizer.from_pretrained("gpt2")
                model = AutoModelForCausalLM.from_pretrained("gpt2")
                return pipeline(
                    "text-generation", model=model, tokenizer=tokenizer,
                    max_new_tokens=10
                )

            def inference_fn(pipeline, prompt, stop=None):
                return pipeline(prompt)[0]["generated_text"]

            gpu = rh.cluster(name="rh-a10x", instance_type="A100:1")
            llm = SelfHostedPipeline(
                model_load_fn=load_pipeline,
                hardware=gpu,
                model_reqs=["./", "torch", "transformers"],
                inference_fn=inference_fn,
                allow_dangerous_deserialization=True,
            )

    Example for <2GB model (can be serialized and sent directly to the server):
        .. code-block:: python

            from langchain_community.llms import SelfHostedPipeline
            import runhouse as rh

            gpu = rh.cluster(name="rh-a10x", instance_type="A100:1")
            my_model = ...
            llm = SelfHostedPipeline.from_pipeline(
                pipeline=my_model,
                hardware=gpu,
                model_reqs=["./", "torch", "transformers"],
                allow_dangerous_deserialization=True,
            )

    Example passing the path to a larger model:
        .. code-block:: python

            from langchain_community.llms import SelfHostedPipeline
            import runhouse as rh
            import pickle
            from transformers import pipeline

            gpu = rh.cluster(name="rh-a10x", instance_type="A100:1")
            generator = pipeline(model="gpt2")
            rh.blob(pickle.dumps(generator), path="models/pipeline.pkl"
                ).save().to(gpu, path="models")
            llm = SelfHostedPipeline.from_pipeline(
                pipeline="models/pipeline.pkl",
                hardware=gpu,
                model_reqs=["./", "torch", "transformers"],
                allow_dangerous_deserialization=True,
            )
    """

    pipeline_ref: Any  #: :meta private:
    client: Any  #: :meta private:
    inference_fn: Callable = _generate_text  #: :meta private:
    """Inference function to send to the remote hardware."""
    hardware: Any
    """Remote hardware to send the inference function to."""
    model_load_fn: Callable
    """Function to load the model remotely on the server."""
    load_fn_kwargs: Optional[dict] = None
    """Keyword arguments to pass to the model load function."""
    model_reqs: List[str] = ["./", "torch"]
    """Requirements to install on the hardware to run inference on the model."""
    allow_dangerous_deserialization: bool = False
    """Allow deserialization using pickle, which can be dangerous when
    compromised data is loaded."""

    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.forbid

    def __init__(self, **kwargs: Any):
        """Initialize the pipeline with the helper functions.

        The load function must be importable at global scope so that it can be
        run on the server, i.e. it must live in a module and not in a REPL or
        a closure. After the model is loaded remotely, the remote inference
        function is set up.

        Args:
            **kwargs: Field values for this model. Must include
                ``allow_dangerous_deserialization=True``.

        Raises:
            ValueError: If ``allow_dangerous_deserialization`` is missing or
                falsy.
            ImportError: If the ``runhouse`` package cannot be imported.
        """
        # Explicit opt-in is checked before anything else happens, so nothing
        # is sent to the hardware without the caller acknowledging the risk.
        if not kwargs.get("allow_dangerous_deserialization"):
            raise ValueError(
                "SelfHostedPipeline relies on the pickle module. "
                "You will need to set allow_dangerous_deserialization=True "
                "if you want to opt-in to allow deserialization of data using pickle. "
                "Data can be compromised by a malicious actor if "
                "not handled properly to include "
                "a malicious payload that when deserialized with "
                "pickle can execute arbitrary code. "
            )
        super().__init__(**kwargs)
        try:
            import runhouse as rh
        except ImportError as err:
            raise ImportError(
                "Could not import runhouse python package. "
                "Please install it with `pip install runhouse`."
            ) from err

        # Ship the load function to the hardware and call it there; what comes
        # back is kept as the reference handed to every inference call.
        remote_load_fn = rh.function(fn=self.model_load_fn).to(
            self.hardware, reqs=self.model_reqs
        )
        _load_fn_kwargs = self.load_fn_kwargs or {}
        self.pipeline_ref = remote_load_fn.remote(**_load_fn_kwargs)

        # The inference function is sent to the same hardware with the same
        # requirements and becomes the callable used by ``_call``.
        self.client = rh.function(fn=self.inference_fn).to(
            self.hardware, reqs=self.model_reqs
        )

    @classmethod
    def from_pipeline(
        cls,
        pipeline: Any,
        hardware: Any,
        model_reqs: Optional[List[str]] = None,
        device: int = 0,
        **kwargs: Any,
    ) -> LLM:
        """Init the SelfHostedPipeline from a pipeline object or string.

        Args:
            pipeline: A pipeline object, or a string path to a pipeline. It is
                forwarded to ``_send_pipeline_to_device`` on the hardware.
            hardware: Remote hardware to run on.
            model_reqs: Extra requirements, appended to
                ``["transformers", "torch"]``.
            device: Device id forwarded to ``_send_pipeline_to_device``.
            **kwargs: Further field values passed to the constructor.

        Returns:
            A new instance of ``cls``.
        """
        if not isinstance(pipeline, str):
            logger.warning(
                "Serializing pipeline to send to remote hardware. "
                "Note, it can be quite slow "
                "to serialize and send large models with each execution. "
                "Consider sending the pipeline "
                "to the cluster and passing the path to the pipeline instead."
            )

        load_fn_kwargs = {"pipeline": pipeline, "device": device}
        return cls(
            load_fn_kwargs=load_fn_kwargs,
            model_load_fn=_send_pipeline_to_device,
            hardware=hardware,
            model_reqs=["transformers", "torch"] + (model_reqs or []),
            **kwargs,
        )

    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        """Get the identifying parameters."""
        return {"hardware": self.hardware}

    @property
    def _llm_type(self) -> str:
        """Return the type identifier of this LLM."""
        return "self_hosted_llm"

    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Run the remote inference function on ``prompt``.

        Args:
            prompt: The prompt to generate from.
            stop: Optional stop sequences, forwarded to the inference function.
            run_manager: Accepted for interface compatibility; not used here.
            **kwargs: Extra keyword arguments forwarded to the inference
                function.

        Returns:
            The value returned by the remote inference function.
        """
        return self.client(
            pipeline=self.pipeline_ref, prompt=prompt, stop=stop, **kwargs
        )