Source code for langchain_community.tools.audio.huggingface_text_to_speech_inference

import logging
import os
import uuid
from datetime import datetime
from typing import Callable, Literal, Optional

import requests
from langchain_core.callbacks import CallbackManagerForToolRun
from langchain_core.pydantic_v1 import SecretStr
from langchain_core.tools import BaseTool

logger = logging.getLogger(__name__)


class HuggingFaceTextToSpeechModelInference(BaseTool):
    """HuggingFace Text-to-Speech Model Inference.

    Posts the query text to the HuggingFace Inference API endpoint of the
    configured model and writes the returned bytes to a newly created file
    inside ``destination_dir``.

    Requirements:
        - Environment variable ``HUGGINGFACE_API_KEY`` must be set,
          or passed as a named parameter to the constructor.
    """

    # NOTE(review): the default name still refers to OpenAI (copy-paste from
    # the OpenAI tool). It is intentionally left unchanged so that callers
    # which look the tool up by name keep working.
    name: str = "openai_text_to_speech"
    """Name of the tool."""
    description: str = (
        "A wrapper around HuggingFace Text-to-Speech Inference API. "
    )
    """Description of the tool."""

    model: str
    """Model name."""
    file_extension: str
    """File extension of the output audio file."""
    destination_dir: str
    """Directory to save the output audio file."""
    file_namer: Callable[[], str]
    """Function to generate unique file names."""

    api_url: str
    """Full inference endpoint URL (API root joined with the model name)."""
    huggingface_api_key: SecretStr
    """API key sent as a bearer token with every request."""

    _HUGGINGFACE_API_KEY_ENV_NAME = "HUGGINGFACE_API_KEY"
    _HUGGINGFACE_API_URL_ROOT = "https://api-inference.huggingface.co/models"

    def __init__(
        self,
        model: str,
        file_extension: str,
        *,
        destination_dir: str = "./tts",
        file_naming_func: Literal["uuid", "timestamp"] = "uuid",
        huggingface_api_key: Optional[SecretStr] = None,
    ) -> None:
        """Configure the tool.

        Args:
            model: Model name; appended to the inference API root URL.
            file_extension: Extension (without the dot) of the output file.
            destination_dir: Directory in which output files are created.
            file_naming_func: ``"uuid"`` names each file with a random UUID4;
                ``"timestamp"`` names it with the current Unix time truncated
                to whole seconds.
            huggingface_api_key: API key. When omitted or empty, the
                ``HUGGINGFACE_API_KEY`` environment variable is used.

        Raises:
            ValueError: If no non-empty API key is available, or if
                ``file_naming_func`` is not one of the supported values.
        """
        if not huggingface_api_key:
            # Fall back to the environment; a missing variable becomes "".
            huggingface_api_key = SecretStr(
                os.getenv(self._HUGGINGFACE_API_KEY_ENV_NAME, "")
            )

        if not huggingface_api_key.get_secret_value():
            raise ValueError(
                f"'{self._HUGGINGFACE_API_KEY_ENV_NAME}' must be set or passed"
            )

        if file_naming_func == "uuid":
            file_namer = lambda: str(uuid.uuid4())  # noqa: E731
        elif file_naming_func == "timestamp":
            # One-second resolution: two runs within the same second produce
            # the same name, which `_run` rejects with a ValueError.
            file_namer = lambda: str(int(datetime.now().timestamp()))  # noqa: E731
        else:
            raise ValueError(
                f"Invalid value for 'file_naming_func': {file_naming_func}"
            )

        super().__init__(  # type: ignore[call-arg]
            model=model,
            file_extension=file_extension,
            api_url=f"{self._HUGGINGFACE_API_URL_ROOT}/{model}",
            destination_dir=destination_dir,
            file_namer=file_namer,
            huggingface_api_key=huggingface_api_key,
        )

    def _run(
        self,
        query: str,
        run_manager: Optional[CallbackManagerForToolRun] = None,
    ) -> str:
        """Synthesize ``query`` and save the result to a new file.

        Args:
            query: Text sent to the model as the ``inputs`` payload.
            run_manager: Accepted for the tool interface; not used here.

        Returns:
            Path of the file the response body was written to.

        Raises:
            requests.HTTPError: If the API answers with a 4xx/5xx status.
            ValueError: If the generated output file name already exists.
        """
        # NOTE(review): no timeout is passed, so a stalled connection blocks
        # indefinitely. Left as-is to preserve existing behaviour.
        response = requests.post(
            self.api_url,
            headers={
                "Authorization": f"Bearer {self.huggingface_api_key.get_secret_value()}"
            },
            json={"inputs": query},
        )
        # Without this check an error payload (e.g. a JSON error message)
        # would be silently written to disk as if it were audio.
        response.raise_for_status()
        audio_bytes = response.content

        try:
            os.makedirs(self.destination_dir, exist_ok=True)
        except Exception as e:
            logger.error("Error creating directory '%s': %s", self.destination_dir, e)
            raise

        output_file = os.path.join(
            self.destination_dir,
            f"{self.file_namer()}.{self.file_extension}",
        )

        try:
            # Mode "xb" refuses to overwrite: an existing file is an error.
            with open(output_file, mode="xb") as f:
                f.write(audio_bytes)
        except FileExistsError as err:
            raise ValueError("Output name must be unique") from err
        except Exception as e:
            logger.error("Error occurred while creating file: %s", e)
            raise

        return output_file