from __future__ import annotations
import logging
from typing import Any, Callable, Iterator, List, Mapping, Optional
from langchain_core.callbacks import CallbackManagerForLLMRun
from langchain_core.language_models.llms import LLM
from langchain_core.outputs import GenerationChunk
from langchain_core.pydantic_v1 import Extra
DEFAULT_MODEL_ID = "mlx-community/quantized-gemma-2b"
logger = logging.getLogger(__name__)
class MLXPipeline(LLM):
    """MLX Pipeline API.

    To use, you should have the ``mlx-lm`` python package installed.

    Example using from_model_id:
        .. code-block:: python

            from langchain_community.llms import MLXPipeline

            pipe = MLXPipeline.from_model_id(
                model_id="mlx-community/quantized-gemma-2b",
                pipeline_kwargs={"max_tokens": 10, "temp": 0.7},
            )

    Example passing model and tokenizer in directly:
        .. code-block:: python

            from langchain_community.llms import MLXPipeline
            from mlx_lm import load

            model_id = "mlx-community/quantized-gemma-2b"
            model, tokenizer = load(model_id)
            pipe = MLXPipeline(model=model, tokenizer=tokenizer)
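
    Example of streaming generation (a minimal sketch; ``stream`` yields text
    segments as they are produced):
        .. code-block:: python

            from langchain_community.llms import MLXPipeline

            pipe = MLXPipeline.from_model_id("mlx-community/quantized-gemma-2b")
            for text in pipe.stream("The sky is"):
                print(text, end="", flush=True)
    """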
    model_id: str = DEFAULT_MODEL_ID
    """Name of the model to use."""
    model: Any  #: :meta private:
    """The model."""
    tokenizer: Any  #: :meta private:
    """The tokenizer."""
    tokenizer_config: Optional[dict] = None
    """Configuration parameters for the tokenizer.
    Defaults to an empty dictionary."""
    adapter_file: Optional[str] = None
    """Path to an adapter file. If provided, LoRA layers are applied to the
    model. Defaults to None."""
    lazy: bool = False
    """If False, evaluate the model parameters to make sure they are loaded
    into memory before returning; otherwise they are loaded when needed.
    Defaults to ``False``."""
    pipeline_kwargs: Optional[dict] = None
    """Keyword arguments passed to the pipeline. Defaults include:
        - temp (float): Temperature for generation. Defaults to 0.0.
        - max_tokens (int): Maximum number of tokens to generate. Defaults to 100.
        - verbose (bool): Whether to print verbose logs. Defaults to False.
        - formatter (Optional[Callable]): A callable used to format the output.
          Defaults to None.
        - repetition_penalty (Optional[float]): Penalty factor for repeated
          sequences. Defaults to None.
        - repetition_context_size (Optional[int]): Context size over which the
          repetition penalty is applied. Defaults to None.
        - top_p (float): Cumulative probability threshold for top-p filtering.
          Defaults to 1.0."""
    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.forbid
    @classmethod
    def from_model_id(
        cls,
        model_id: str,
        tokenizer_config: Optional[dict] = None,
        adapter_file: Optional[str] = None,
        lazy: bool = False,
        pipeline_kwargs: Optional[dict] = None,
        **kwargs: Any,
    ) -> MLXPipeline:
        """Construct the pipeline object from the given model_id."""
        try:
            from mlx_lm import load
        except ImportError:
            raise ImportError(
                "Could not import mlx_lm python package. "
                "Please install it with `pip install mlx_lm`."
            )
        tokenizer_config = tokenizer_config or {}
        if adapter_file:
            model, tokenizer = load(model_id, tokenizer_config, adapter_file, lazy)
        else:
            model, tokenizer = load(model_id, tokenizer_config, lazy=lazy)
        _pipeline_kwargs = pipeline_kwargs or {}
        return cls(
            model_id=model_id,
            model=model,
            tokenizer=tokenizer,
            tokenizer_config=tokenizer_config,
            adapter_file=adapter_file,
            lazy=lazy,
            pipeline_kwargs=_pipeline_kwargs,
            **kwargs,
        )
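
    # Minimal sketch: applying LoRA weights via ``adapter_file`` (the file
    # path below is hypothetical, not part of this module):
    #
    #     pipe = MLXPipeline.from_model_id(
    #         "mlx-community/quantized-gemma-2b",
    #         adapter_file="adapters.npz",  # hypothetical LoRA adapter path
    #     )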
    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        """Get the identifying parameters."""
        return {
            "model_id": self.model_id,
            "tokenizer_config": self.tokenizer_config,
            "adapter_file": self.adapter_file,
            "lazy": self.lazy,
            "pipeline_kwargs": self.pipeline_kwargs,
        }

    @property
    def _llm_type(self) -> str:
        return "mlx_pipeline"
    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        try:
            from mlx_lm import generate
        except ImportError:
            raise ImportError(
                "Could not import mlx_lm python package. "
                "Please install it with `pip install mlx_lm`."
            )
        # Per-call kwargs take precedence; guard against a None instance default.
        pipeline_kwargs = kwargs.get("pipeline_kwargs", self.pipeline_kwargs) or {}
        temp: float = pipeline_kwargs.get("temp", 0.0)
        max_tokens: int = pipeline_kwargs.get("max_tokens", 100)
        verbose: bool = pipeline_kwargs.get("verbose", False)
        formatter: Optional[Callable] = pipeline_kwargs.get("formatter", None)
        repetition_penalty: Optional[float] = pipeline_kwargs.get(
            "repetition_penalty", None
        )
        repetition_context_size: Optional[int] = pipeline_kwargs.get(
            "repetition_context_size", None
        )
        top_p: float = pipeline_kwargs.get("top_p", 1.0)
        return generate(
            model=self.model,
            tokenizer=self.tokenizer,
            prompt=prompt,
            temp=temp,
            max_tokens=max_tokens,
            verbose=verbose,
            formatter=formatter,
            repetition_penalty=repetition_penalty,
            repetition_context_size=repetition_context_size,
            top_p=top_p,
        )
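
    # Minimal sketch: per-call ``pipeline_kwargs`` override the instance-level
    # defaults read above (values here are illustrative):
    #
    #     pipe.invoke(
    #         "Summarize MLX in one line.",
    #         pipeline_kwargs={"max_tokens": 32, "temp": 0.0},
    #     )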
    def _stream(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Iterator[GenerationChunk]:
        try:
            import mlx.core as mx
            from mlx_lm.utils import generate_step
        except ImportError:
            raise ImportError(
                "Could not import mlx_lm python package. "
                "Please install it with `pip install mlx_lm`."
            )
        # Per-call kwargs take precedence; guard against a None instance default.
        pipeline_kwargs = kwargs.get("pipeline_kwargs", self.pipeline_kwargs) or {}
        temp: float = pipeline_kwargs.get("temp", 0.0)
        max_new_tokens: int = pipeline_kwargs.get("max_tokens", 100)
        repetition_penalty: Optional[float] = pipeline_kwargs.get(
            "repetition_penalty", None
        )
        repetition_context_size: Optional[int] = pipeline_kwargs.get(
            "repetition_context_size", None
        )
        top_p: float = pipeline_kwargs.get("top_p", 1.0)
        # Encode the prompt without shadowing the ``prompt`` string argument.
        prompt_ids = self.tokenizer.encode(prompt, return_tensors="np")
        prompt_tokens = mx.array(prompt_ids[0])
        eos_token_id = self.tokenizer.eos_token_id
        detokenizer = self.tokenizer.detokenizer
        detokenizer.reset()
        for (token, prob), n in zip(
            generate_step(
                prompt=prompt_tokens,
                model=self.model,
                temp=temp,
                repetition_penalty=repetition_penalty,
                repetition_context_size=repetition_context_size,
                top_p=top_p,
            ),
            range(max_new_tokens),
        ):
            # identify text to yield
            text: Optional[str] = None
            detokenizer.add_token(token)
            detokenizer.finalize()
            text = detokenizer.last_segment

            # yield text, if any
            if text:
                chunk = GenerationChunk(text=text)
                yield chunk
                if run_manager:
                    run_manager.on_llm_new_token(chunk.text)

            # break if stop sequence found
            if token == eos_token_id or (stop is not None and text in stop):
                break
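
    # Minimal sketch: streaming with a stop sequence. The loop above breaks
    # once a yielded segment exactly matches an entry in ``stop``:
    #
    #     for text in pipe.stream("Count: 1, 2, 3", stop=["\n"]):
    #         print(text, end="", flush=True)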