Source code for langchain_community.llms.beam

import base64
import json
import logging
import subprocess
import textwrap
import time
from typing import Any, Dict, List, Mapping, Optional

import requests
from langchain_core.callbacks import CallbackManagerForLLMRun
from langchain_core.language_models.llms import LLM
from langchain_core.pydantic_v1 import Extra, Field, root_validator
from langchain_core.utils import get_from_dict_or_env

# Module-level logger, named after this module.
logger: logging.Logger = logging.getLogger(__name__)

# Upper bound on the number of POST attempts made per model call.
DEFAULT_NUM_TRIES: int = 10
# Seconds to sleep after an unsuccessful request before retrying.
DEFAULT_SLEEP_TIME: int = 4


class Beam(LLM):
    """Beam API for the gpt2 large language model.

    To use, you should have the ``beam-sdk`` python package installed, and the
    environment variable ``BEAM_CLIENT_ID`` set with your client id and
    ``BEAM_CLIENT_SECRET`` set with your client secret. Information on how
    to get this is available here: https://docs.beam.cloud/account/api-keys.

    The wrapper can then be called as follows, where the name, cpu, memory,
    gpu, python version, and python packages can be updated accordingly. Once
    deployed, the instance can be called.

    Example:
        .. code-block:: python

            llm = Beam(model_name="gpt2",
                name="langchain-gpt2",
                cpu=8,
                memory="32Gi",
                gpu="A10G",
                python_version="python3.8",
                python_packages=[
                    "diffusers[torch]>=0.10",
                    "transformers",
                    "torch",
                    "pillow",
                    "accelerate",
                    "safetensors",
                    "xformers",],
                max_length=50)
            llm._deploy()
            call_result = llm._call(input)
    """

    model_name: str = ""
    name: str = ""
    cpu: str = ""
    memory: str = ""
    gpu: str = ""
    python_version: str = ""
    python_packages: List[str] = []
    max_length: str = ""
    url: str = ""
    """Model endpoint to use."""

    model_kwargs: Dict[str, Any] = Field(default_factory=dict)
    """Holds any model parameters valid for the `create` call that are not
    explicitly specified."""

    beam_client_id: str = ""
    beam_client_secret: str = ""
    app_id: Optional[str] = None

    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.forbid

    @root_validator(pre=True)
    def build_extra(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Move unrecognised keyword arguments into ``model_kwargs``.

        Args:
            values: Raw keyword arguments passed to the constructor.

        Returns:
            ``values`` with every key that is not a declared field alias
            relocated under ``values["model_kwargs"]``.

        Raises:
            ValueError: If an unrecognised key is also present inside the
                supplied ``model_kwargs``.
        """
        all_required_field_names = {field.alias for field in cls.__fields__.values()}

        extra = values.get("model_kwargs", {})
        # Iterate over a snapshot of the keys because entries are popped
        # from ``values`` inside the loop.
        for field_name in list(values):
            if field_name not in all_required_field_names:
                if field_name in extra:
                    raise ValueError(f"Found {field_name} supplied twice.")
                logger.warning(
                    f"{field_name} was transferred to model_kwargs. \n"
                    f"Please confirm that {field_name} is what you intended."
                )
                extra[field_name] = values.pop(field_name)
        values["model_kwargs"] = extra
        return values

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Resolve the Beam client id and client secret.

        Each credential is looked up through ``get_from_dict_or_env`` using
        the field name (``beam_client_id`` / ``beam_client_secret``) and the
        matching environment variable name (``BEAM_CLIENT_ID`` /
        ``BEAM_CLIENT_SECRET``), then written back into ``values``.
        """
        beam_client_id = get_from_dict_or_env(
            values, "beam_client_id", "BEAM_CLIENT_ID"
        )
        beam_client_secret = get_from_dict_or_env(
            values, "beam_client_secret", "BEAM_CLIENT_SECRET"
        )
        values["beam_client_id"] = beam_client_id
        values["beam_client_secret"] = beam_client_secret
        return values

    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        """Get the identifying parameters."""
        return {
            "model_name": self.model_name,
            "name": self.name,
            "cpu": self.cpu,
            "memory": self.memory,
            "gpu": self.gpu,
            "python_version": self.python_version,
            "python_packages": self.python_packages,
            "max_length": self.max_length,
            "model_kwargs": self.model_kwargs,
        }

    @property
    def _llm_type(self) -> str:
        """Return the type of llm."""
        return "beam"

    def app_creation(self) -> None:
        """Write ``app.py``, the Beam app definition, to the working directory.

        The template is filled from ``name``, ``cpu``, ``memory``, ``gpu``,
        ``python_version`` and ``python_packages``. Note that ``cpu`` and
        ``python_packages`` are inserted unquoted into the generated source.
        """
        script = textwrap.dedent(
            """\
            import beam

            # The environment your code will run on
            app = beam.App(
                name="{name}",
                cpu={cpu},
                memory="{memory}",
                gpu="{gpu}",
                python_version="{python_version}",
                python_packages={python_packages},
            )

            app.Trigger.RestAPI(
                inputs={{"prompt": beam.Types.String(), "max_length": beam.Types.String()}},
                outputs={{"text": beam.Types.String()}},
                handler="run.py:beam_langchain",
            )

            """
        )

        script_name = "app.py"
        # Explicit encoding so the generated file does not depend on the
        # platform's default locale encoding.
        with open(script_name, "w", encoding="utf-8") as file:
            file.write(
                script.format(
                    name=self.name,
                    cpu=self.cpu,
                    memory=self.memory,
                    gpu=self.gpu,
                    python_version=self.python_version,
                    python_packages=self.python_packages,
                )
            )

    def run_creation(self) -> None:
        """Write ``run.py``, the handler script that is deployed on Beam.

        The generated script defines ``beam_langchain(**inputs)``, which is
        the handler referenced by the ``app.py`` template, and embeds
        ``model_name`` as the model to load.
        """
        script = textwrap.dedent(
            """
            import os
            import transformers
            from transformers import GPT2LMHeadModel, GPT2Tokenizer

            model_name = "{model_name}"

            def beam_langchain(**inputs):
                prompt = inputs["prompt"]
                length = inputs["max_length"]

                tokenizer = GPT2Tokenizer.from_pretrained(model_name)
                model = GPT2LMHeadModel.from_pretrained(model_name)
                encodedPrompt = tokenizer.encode(prompt, return_tensors='pt')
                outputs = model.generate(encodedPrompt, max_length=int(length),
                  do_sample=True, pad_token_id=tokenizer.eos_token_id)
                output = tokenizer.decode(outputs[0], skip_special_tokens=True)

                print(output)  # noqa: T201
                return {{"text": output}}

            """
        )

        script_name = "run.py"
        with open(script_name, "w", encoding="utf-8") as file:
            file.write(script.format(model_name=self.model_name))

    def _deploy(self) -> str:
        """Generate the app files and deploy them with the ``beam`` CLI.

        On success, ``app_id`` and ``url`` are set on the instance from the
        "Send requests to:" line of the CLI output.

        Returns:
            The app id parsed from the deployment output.

        Raises:
            ImportError: If the ``beam`` python package cannot be imported.
            ValueError: If the deployment command exits with a non-zero
                status, or if its output contains no endpoint line.
        """
        try:
            import beam  # type: ignore

            if beam.__path__ == "":
                raise ImportError
        except ImportError as err:
            raise ImportError(
                "Could not import beam python package. "
                "Please install it with `curl "
                "https://raw.githubusercontent.com/slai-labs"
                "/get-beam/main/get-beam.sh -sSfL | sh`."
            ) from err
        self.app_creation()
        self.run_creation()

        # The command is a constant string, so no caller-supplied text
        # reaches the shell.
        process = subprocess.run(
            "beam deploy app.py", shell=True, capture_output=True, text=True
        )
        if process.returncode != 0:
            raise ValueError(f"Deployment failed. Error: {process.stderr}")

        output = process.stdout
        logger.info(output)

        marker = "Send requests to:"
        endpoint_prefix = "https://apps.beam.cloud/"
        for line in output.splitlines():
            # Partition on the marker rather than on ":" so that the colon
            # inside "https://" does not truncate the URL.
            _, found, remainder = line.partition(marker)
            endpoint = remainder.strip()
            if found and endpoint.startswith(endpoint_prefix):
                self.app_id = endpoint.split("/")[-1]
                self.url = endpoint
                return self.app_id

        raise ValueError(
            "Failed to retrieve the appID from the deployment output. "
            f"Deployment output: {output}"
        )

    @property
    def authorization(self) -> str:
        """Base64-encoded credential string for the ``Authorization`` header.

        The encoded value is ``"<client_id>:<client_secret>"`` when a client
        id is set, otherwise the client secret alone.
        """
        if self.beam_client_id:
            credential_str = self.beam_client_id + ":" + self.beam_client_secret
        else:
            credential_str = self.beam_client_secret
        return base64.b64encode(credential_str.encode()).decode()

    def _call(
        self,
        prompt: str,
        stop: Optional[list] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Call the deployed Beam app and return the generated text.

        Args:
            prompt: The prompt sent as the ``"prompt"`` field of the payload.
            stop: Accepted for interface compatibility; not used here.
            run_manager: Accepted for interface compatibility; not used here.
            **kwargs: Extra entries merged into the JSON payload (they
                override ``prompt`` / ``max_length`` on key collision).

        Returns:
            The ``"text"`` field of the first HTTP 200 response, or ``""``
            if no attempt out of ``DEFAULT_NUM_TRIES`` succeeds.
        """
        # Prefer the endpoint derived from the app id; fall back to the
        # explicitly configured url.
        url = ("https://apps.beam.cloud/" + self.app_id) if self.app_id else self.url
        payload = {"prompt": prompt, "max_length": self.max_length}
        payload.update(kwargs)
        headers = {
            "Accept": "*/*",
            "Accept-Encoding": "gzip, deflate",
            "Authorization": "Basic " + self.authorization,
            "Connection": "keep-alive",
            "Content-Type": "application/json",
        }

        # The payload does not change between attempts, so serialize it once.
        body = json.dumps(payload)
        # NOTE(review): no timeout is passed to requests.post, so a stalled
        # connection blocks indefinitely -- consider adding one.
        for attempt in range(DEFAULT_NUM_TRIES):
            response = requests.post(url, headers=headers, data=body)
            if response.status_code == 200:
                return response.json()["text"]
            # Only wait when another attempt will follow.
            if attempt < DEFAULT_NUM_TRIES - 1:
                time.sleep(DEFAULT_SLEEP_TIME)
        logger.warning("Unable to successfully call model.")
        return ""