Source code for langchain_community.llms.titan_takeoff

from enum import Enum
from typing import Any, Iterator, List, Optional

from langchain_core.callbacks import CallbackManagerForLLMRun
from langchain_core.language_models.llms import LLM
from langchain_core.outputs import GenerationChunk
from langchain_core.pydantic_v1 import BaseModel

from langchain_community.llms.utils import enforce_stop_tokens


[docs]class Device(str, Enum): """用于推理的设备,cuda或cpu""" cuda = "cuda" cpu = "cpu"
[docs]class ReaderConfig(BaseModel): """用于部署在Titan Takeoff API中的阅读器配置。""" class Config: protected_namespaces = () model_name: str """要使用的模型名称""" device: Device = Device.cuda """用于推理的设备,cuda或cpu""" consumer_group: str = "primary" """将读者放入的消费者组""" tensor_parallel: Optional[int] = None """您希望您的模型跨越的GPU数量""" max_seq_length: int = 512 """推理时要使用的最大序列长度,默认为512""" max_batch_size: int = 4 """连续批处理请求的最大批处理大小"""
[docs]class TitanTakeoff(LLM): """Titan Takeoff API LLMs。 Titan Takeoff是一个用于与Takeoff推理API进行交互的包装器,用于生成文本到文本的语言模型。 您可以使用这个包装器向生成式语言模型发送请求,并部署读者到Takeoff。 示例: 这是一个部署生成式语言模型并发送请求的示例。 .. code-block:: python # 从社区包中导入TitanTakeoff类 import time from langchain_community.llms import TitanTakeoff # 指定要部署的嵌入式读者 reader_1 = { "model_name": "TheBloke/Llama-2-7b-Chat-AWQ", "device": "cuda", "tensor_parallel": 1, "consumer_group": "llama" } # 对于传递到models参数的每个读者,Takeoff将根据您提供的规格启动一个读者。如果您不指定该参数,将不会启动任何模型,它会假定您已经单独完成了这一步。 llm = TitanTakeoff(models=[reader_1]) # 等待读者部署完成,所需时间取决于模型大小和您的互联网速度 time.sleep(60) # 返回发送到`llama`消费者组的查询,即一个List[float],在这里我们刚刚启动了Llama 7B模型 print(embed.invoke( "Where can I see football?", consumer_group="llama" )) # 您还可以向模型发送生成参数,任何以下参数都可以作为kwargs传递: # https://docs.titanml.co/docs/next/apis/Takeoff%20inference_REST_API/generate#request # 例如: print(embed.invoke( "Where can I see football?", consumer_group="llama", max_new_tokens=100 ))""" base_url: str = "http://localhost" """Titan Takeoff (Pro) 服务器的基本URL。默认值为 "http://localhost"。""" port: int = 3000 """Titan Takeoff(Pro)服务器的端口。默认值为3000。""" mgmt_port: int = 3001 """Titan Takeoff(Pro)服务器的管理端口。默认值为3001。""" streaming: bool = False """是否流式输出。默认值为False。""" client: Any = None """Takeoff客户端Python SDK用于与Takeoff API进行交互""" def __init__( self, base_url: str = "http://localhost", port: int = 3000, mgmt_port: int = 3001, streaming: bool = False, models: List[ReaderConfig] = [], ): """初始化Titan Takeoff语言包装器。 参数: base_url (str, 可选): Takeoff 推理服务器正在监听的基本URL。默认为 `http://localhost`。 port (int, 可选): Takeoff 推理API正在监听的端口。默认为 3000。 mgmt_port (int, 可选): Takeoff 管理API正在监听的端口。默认为 3001。 streaming (bool, 可选): 是否默认使用 generate_stream 端点而不是 generate 来流式传输响应。默认为 False。实际上,这并没有太大区别,因为流式传输的响应会被缓冲并类似于非流式传输的响应返回,但是运行管理器会应用于每个生成的标记。 models (List[ReaderConfig], 可选): 您想要启动的任何阅读器。默认为 []。 抛出: ImportError: 如果您尚未安装 takeoff-client,则会出现 ImportError。要解决此问题,请运行 `pip install 'takeoff-client==0.4.0'`。 """ super().__init__( # type: ignore[call-arg] base_url=base_url, port=port, mgmt_port=mgmt_port, streaming=streaming ) try: from takeoff_client import TakeoffClient except ImportError: raise ImportError( "takeoff-client is required for TitanTakeoff. " "Please install it with `pip install 'takeoff-client>=0.4.0'`." ) self.client = TakeoffClient( self.base_url, port=self.port, mgmt_port=self.mgmt_port ) for model in models: self.client.create_reader(model) @property def _llm_type(self) -> str: """llm的返回类型。""" return "titan_takeoff" def _call( self, prompt: str, stop: Optional[List[str]] = None, run_manager: Optional[CallbackManagerForLLMRun] = None, **kwargs: Any, ) -> str: """调用Titan Takeoff (Pro)生成端点。 参数: prompt: 传递到模型中的提示。 stop: 生成时可选的停止词列表。 run_manager: 在流式传输时使用的可选回调管理器。 返回: 模型生成的字符串。 示例: .. code-block:: python model = TitanTakeoff() prompt = "英国的首都是哪里?" # 在LangChain 0.1.7中,使用model(prompt)即`__call__`已被弃用, # 请改用model.invoke(prompt)。 response = model.invoke(prompt) """ if self.streaming: text_output = "" for chunk in self._stream( prompt=prompt, stop=stop, run_manager=run_manager, ): text_output += chunk.text return text_output response = self.client.generate(prompt, **kwargs) text = response["text"] if stop is not None: text = enforce_stop_tokens(text, stop) return text def _stream( self, prompt: str, stop: Optional[List[str]] = None, run_manager: Optional[CallbackManagerForLLMRun] = None, **kwargs: Any, ) -> Iterator[GenerationChunk]: """调用Titan Takeoff (Pro)流端点。 参数: prompt: 传递给模型的提示。 stop: 生成时可选的停止词列表。 run_manager: 在流式传输时使用的可选回调管理器。 产生: 包含字符串标记的类似字典的对象。 示例: .. code-block:: python model = TitanTakeoff() prompt = "英国的首都是哪里?" response = model.stream(prompt) # 或者 model = TitanTakeoff(streaming=True) response = model.invoke(prompt) """ response = self.client.generate_stream(prompt, **kwargs) buffer = "" for text in response: buffer += text.data if "data:" in buffer: # Remove the first instance of "data:" from the buffer. if buffer.startswith("data:"): buffer = "" if len(buffer.split("data:", 1)) == 2: content, _ = buffer.split("data:", 1) buffer = content.rstrip("\n") # Trim the buffer to only have content after the "data:" part. if buffer: # Ensure that there's content to process. chunk = GenerationChunk(text=buffer) buffer = "" # Reset buffer for the next set of data. yield chunk if run_manager: run_manager.on_llm_new_token(token=chunk.text) # Yield any remaining content in the buffer. if buffer: chunk = GenerationChunk(text=buffer.replace("</s>", "")) yield chunk if run_manager: run_manager.on_llm_new_token(token=chunk.text)