from enum import Enum
from typing import Any, Iterator, List, Optional
from langchain_core.callbacks import CallbackManagerForLLMRun
from langchain_core.language_models.llms import LLM
from langchain_core.outputs import GenerationChunk
from langchain_core.pydantic_v1 import BaseModel
from langchain_community.llms.utils import enforce_stop_tokens
[docs]class Device(str, Enum):
"""用于推理的设备,cuda或cpu"""
cuda = "cuda"
cpu = "cpu"
[docs]class ReaderConfig(BaseModel):
"""用于部署在Titan Takeoff API中的阅读器配置。"""
class Config:
protected_namespaces = ()
model_name: str
"""要使用的模型名称"""
device: Device = Device.cuda
"""用于推理的设备,cuda或cpu"""
consumer_group: str = "primary"
"""将读者放入的消费者组"""
tensor_parallel: Optional[int] = None
"""您希望您的模型跨越的GPU数量"""
max_seq_length: int = 512
"""推理时要使用的最大序列长度,默认为512"""
max_batch_size: int = 4
"""连续批处理请求的最大批处理大小"""
[docs]class TitanTakeoff(LLM):
"""Titan Takeoff API LLMs。
Titan Takeoff是一个用于与Takeoff推理API进行交互的包装器,用于生成文本到文本的语言模型。
您可以使用这个包装器向生成式语言模型发送请求,并部署读者到Takeoff。
示例:
这是一个部署生成式语言模型并发送请求的示例。
.. code-block:: python
# 从社区包中导入TitanTakeoff类
import time
from langchain_community.llms import TitanTakeoff
# 指定要部署的嵌入式读者
reader_1 = {
"model_name": "TheBloke/Llama-2-7b-Chat-AWQ",
"device": "cuda",
"tensor_parallel": 1,
"consumer_group": "llama"
}
# 对于传递到models参数的每个读者,Takeoff将根据您提供的规格启动一个读者。如果您不指定该参数,将不会启动任何模型,它会假定您已经单独完成了这一步。
llm = TitanTakeoff(models=[reader_1])
# 等待读者部署完成,所需时间取决于模型大小和您的互联网速度
time.sleep(60)
# 返回发送到`llama`消费者组的查询,即一个List[float],在这里我们刚刚启动了Llama 7B模型
print(embed.invoke(
"Where can I see football?", consumer_group="llama"
))
# 您还可以向模型发送生成参数,任何以下参数都可以作为kwargs传递:
# https://docs.titanml.co/docs/next/apis/Takeoff%20inference_REST_API/generate#request
# 例如:
print(embed.invoke(
"Where can I see football?", consumer_group="llama", max_new_tokens=100
))"""
base_url: str = "http://localhost"
"""Titan Takeoff (Pro) 服务器的基本URL。默认值为 "http://localhost"。"""
port: int = 3000
"""Titan Takeoff(Pro)服务器的端口。默认值为3000。"""
mgmt_port: int = 3001
"""Titan Takeoff(Pro)服务器的管理端口。默认值为3001。"""
streaming: bool = False
"""是否流式输出。默认值为False。"""
client: Any = None
"""Takeoff客户端Python SDK用于与Takeoff API进行交互"""
def __init__(
self,
base_url: str = "http://localhost",
port: int = 3000,
mgmt_port: int = 3001,
streaming: bool = False,
models: List[ReaderConfig] = [],
):
"""初始化Titan Takeoff语言包装器。
参数:
base_url (str, 可选): Takeoff 推理服务器正在监听的基本URL。默认为 `http://localhost`。
port (int, 可选): Takeoff 推理API正在监听的端口。默认为 3000。
mgmt_port (int, 可选): Takeoff 管理API正在监听的端口。默认为 3001。
streaming (bool, 可选): 是否默认使用 generate_stream 端点而不是 generate 来流式传输响应。默认为 False。实际上,这并没有太大区别,因为流式传输的响应会被缓冲并类似于非流式传输的响应返回,但是运行管理器会应用于每个生成的标记。
models (List[ReaderConfig], 可选): 您想要启动的任何阅读器。默认为 []。
抛出:
ImportError: 如果您尚未安装 takeoff-client,则会出现 ImportError。要解决此问题,请运行 `pip install 'takeoff-client==0.4.0'`。
"""
super().__init__( # type: ignore[call-arg]
base_url=base_url, port=port, mgmt_port=mgmt_port, streaming=streaming
)
try:
from takeoff_client import TakeoffClient
except ImportError:
raise ImportError(
"takeoff-client is required for TitanTakeoff. "
"Please install it with `pip install 'takeoff-client>=0.4.0'`."
)
self.client = TakeoffClient(
self.base_url, port=self.port, mgmt_port=self.mgmt_port
)
for model in models:
self.client.create_reader(model)
@property
def _llm_type(self) -> str:
"""llm的返回类型。"""
return "titan_takeoff"
def _call(
self,
prompt: str,
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> str:
"""调用Titan Takeoff (Pro)生成端点。
参数:
prompt: 传递到模型中的提示。
stop: 生成时可选的停止词列表。
run_manager: 在流式传输时使用的可选回调管理器。
返回:
模型生成的字符串。
示例:
.. code-block:: python
model = TitanTakeoff()
prompt = "英国的首都是哪里?"
# 在LangChain 0.1.7中,使用model(prompt)即`__call__`已被弃用,
# 请改用model.invoke(prompt)。
response = model.invoke(prompt)
"""
if self.streaming:
text_output = ""
for chunk in self._stream(
prompt=prompt,
stop=stop,
run_manager=run_manager,
):
text_output += chunk.text
return text_output
response = self.client.generate(prompt, **kwargs)
text = response["text"]
if stop is not None:
text = enforce_stop_tokens(text, stop)
return text
def _stream(
self,
prompt: str,
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> Iterator[GenerationChunk]:
"""调用Titan Takeoff (Pro)流端点。
参数:
prompt: 传递给模型的提示。
stop: 生成时可选的停止词列表。
run_manager: 在流式传输时使用的可选回调管理器。
产生:
包含字符串标记的类似字典的对象。
示例:
.. code-block:: python
model = TitanTakeoff()
prompt = "英国的首都是哪里?"
response = model.stream(prompt)
# 或者
model = TitanTakeoff(streaming=True)
response = model.invoke(prompt)
"""
response = self.client.generate_stream(prompt, **kwargs)
buffer = ""
for text in response:
buffer += text.data
if "data:" in buffer:
# Remove the first instance of "data:" from the buffer.
if buffer.startswith("data:"):
buffer = ""
if len(buffer.split("data:", 1)) == 2:
content, _ = buffer.split("data:", 1)
buffer = content.rstrip("\n")
# Trim the buffer to only have content after the "data:" part.
if buffer: # Ensure that there's content to process.
chunk = GenerationChunk(text=buffer)
buffer = "" # Reset buffer for the next set of data.
yield chunk
if run_manager:
run_manager.on_llm_new_token(token=chunk.text)
# Yield any remaining content in the buffer.
if buffer:
chunk = GenerationChunk(text=buffer.replace("</s>", ""))
yield chunk
if run_manager:
run_manager.on_llm_new_token(token=chunk.text)