# Source code for langchain_community.document_loaders.parsers.audio

import logging
import os
import time
from typing import Any, Dict, Iterator, Literal, Optional, Tuple, Union

from langchain_core.documents import Document

from langchain_community.document_loaders.base import BaseBlobParser
from langchain_community.document_loaders.blob_loaders import Blob
from langchain_community.utils.openai import is_openai_v1

logger = logging.getLogger(__name__)

class OpenAIWhisperParser(BaseBlobParser):
    """Transcribe and parse audio files with the OpenAI Whisper API.

    The audio is split into 20-minute chunks, each chunk is exported as MP3
    and sent to the ``whisper-1`` model. One ``Document`` is yielded per chunk
    that was transcribed successfully.

    Args:
        api_key: OpenAI API key. Optional; with the v1 client it defaults to
            ``os.environ['OPENAI_API_KEY']``.
        chunk_duration_threshold: Minimum duration of a chunk in seconds.
            NOTE: According to the OpenAI API, the chunk duration should be
            at least 0.1 seconds. A chunk whose duration is less than or
            equal to the threshold is skipped.
        base_url: API base URL. Falls back to the ``OPENAI_API_BASE``
            environment variable when not given.
        language: Optional ``language`` request parameter.
        prompt: Optional ``prompt`` request parameter.
        response_format: Optional ``response_format`` request parameter.
        temperature: Optional ``temperature`` request parameter.

    The four optional request parameters are forwarded only when they are not
    ``None`` and only when the v1 OpenAI client is in use.
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        *,
        chunk_duration_threshold: float = 0.1,
        base_url: Optional[str] = None,
        language: Union[str, None] = None,
        prompt: Union[str, None] = None,
        response_format: Union[
            Literal["json", "text", "srt", "verbose_json", "vtt"], None
        ] = None,
        temperature: Union[float, None] = None,
    ):
        self.api_key = api_key
        self.chunk_duration_threshold = chunk_duration_threshold
        self.base_url = (
            base_url if base_url is not None else os.environ.get("OPENAI_API_BASE")
        )
        self.language = language
        self.prompt = prompt
        self.response_format = response_format
        self.temperature = temperature

    @property
    def _create_params(self) -> Dict[str, Any]:
        """Return the optional request parameters that were actually set."""
        params = {
            "language": self.language,
            "prompt": self.prompt,
            "response_format": self.response_format,
            "temperature": self.temperature,
        }
        return {k: v for k, v in params.items() if v is not None}

    def lazy_parse(self, blob: Blob) -> Iterator[Document]:
        """Lazily parse the blob, yielding one document per transcribed chunk.

        Args:
            blob: Blob whose ``path`` points at the audio file to transcribe.

        Yields:
            Documents holding the transcript of each chunk, with the blob
            source and the chunk index in the metadata.

        Raises:
            ImportError: If ``openai`` or ``pydub`` is not installed.
        """
        import io

        try:
            import openai
        except ImportError:
            raise ImportError(
                "openai package not found, please install it with "
                "`pip install openai`"
            )
        try:
            from pydub import AudioSegment
        except ImportError:
            raise ImportError(
                "pydub package not found, please install it with "
                "`pip install pydub`"
            )

        # Resolve the SDK flavour once instead of on every retry.
        use_v1_client = is_openai_v1()
        if use_v1_client:
            # api_key optional, defaults to `os.environ['OPENAI_API_KEY']`
            client = openai.OpenAI(api_key=self.api_key, base_url=self.base_url)
        else:
            # Set the API key if provided
            if self.api_key:
                openai.api_key = self.api_key
            if self.base_url:
                # The pre-1.0 SDK reads its endpoint from `api_base`;
                # assigning `base_url` there has no effect.
                openai.api_base = self.base_url

        # Audio file from disk
        audio = AudioSegment.from_file(blob.path)

        # Define the duration of each chunk in minutes
        # Need to meet 25MB size limit for Whisper API
        chunk_duration = 20
        chunk_duration_ms = chunk_duration * 60 * 1000

        # Split the audio into chunk_duration_ms chunks
        for split_number, start_ms in enumerate(
            range(0, len(audio), chunk_duration_ms)
        ):
            # Audio chunk
            chunk = audio[start_ms : start_ms + chunk_duration_ms]
            # Skip chunks that are too short to transcribe
            if chunk.duration_seconds <= self.chunk_duration_threshold:
                continue

            file_obj = io.BytesIO(chunk.export(format="mp3").read())
            # Give the in-memory buffer a file name ending in ".mp3".
            if blob.source is not None:
                file_obj.name = blob.source + f"_part_{split_number}.mp3"
            else:
                file_obj.name = f"part_{split_number}.mp3"

            # Transcribe
            print(f"Transcribing part {split_number + 1}!")  # noqa: T201
            attempts = 0
            while attempts < 3:
                try:
                    # A failed attempt may already have consumed the buffer;
                    # rewind so every attempt uploads the full chunk.
                    file_obj.seek(0)
                    if use_v1_client:
                        transcript = client.audio.transcriptions.create(
                            model="whisper-1", file=file_obj, **self._create_params
                        )
                    else:
                        transcript = openai.Audio.transcribe("whisper-1", file_obj)
                    break
                except Exception as e:
                    attempts += 1
                    print(f"Attempt {attempts} failed. Exception: {str(e)}")  # noqa: T201
                    time.sleep(5)
            else:
                # All attempts failed: give up on this chunk, keep going.
                print("Failed to transcribe after 3 attempts.")  # noqa: T201
                continue

            # Plain-text response formats ("text", "srt", "vtt") come back as a
            # bare string rather than an object exposing `.text`.
            text = transcript if isinstance(transcript, str) else transcript.text

            yield Document(
                page_content=text,
                metadata={"source": blob.source, "chunk": split_number},
            )
class OpenAIWhisperParserLocal(BaseBlobParser):
    """Transcribe and parse audio files with a locally run OpenAI Whisper model.

    Audio transcription is done locally with the OpenAI Whisper model through
    the ``transformers`` pipeline.

    Parameters:
        device - device to use
            NOTE: By default uses the gpu if available,
            if you want to use cpu, please set device = "cpu"
        lang_model - whisper model to use, for example "openai/whisper-medium"
        forced_decoder_ids - id states for decoder in multilanguage model,
            usage example:
            from transformers import WhisperProcessor
            processor = WhisperProcessor.from_pretrained("openai/whisper-medium")
            forced_decoder_ids = processor.get_decoder_prompt_ids(
                language="french", task="transcribe")
            forced_decoder_ids = processor.get_decoder_prompt_ids(
                language="french", task="translate")
    """

    def __init__(
        self,
        device: str = "0",
        lang_model: Optional[str] = None,
        batch_size: int = 8,
        chunk_length: int = 30,
        forced_decoder_ids: Optional[Tuple[Dict]] = None,
    ):
        """Initialize the parser.

        Args:
            device: Device to use. ``"cpu"`` forces the CPU; any other value
                selects ``"cuda:0"`` when CUDA is available and falls back to
                the CPU otherwise.
            lang_model: Whisper model to use, for example
                "openai/whisper-medium". Defaults to None, in which case a
                model is picked from the device and its total memory.
            forced_decoder_ids: Id states for the decoder in a multilanguage
                model. Defaults to None.
            batch_size: Batch size used for decoding. Defaults to 8.
            chunk_length: Chunk length used during inference, in seconds.
                Defaults to 30.

        Raises:
            ImportError: If ``transformers`` or ``torch`` is not installed.
        """
        try:
            from transformers import pipeline
        except ImportError:
            raise ImportError(
                "transformers package not found, please install it with "
                "`pip install transformers`"
            )
        try:
            import torch
        except ImportError:
            raise ImportError(
                "torch package not found, please install it with "
                "`pip install torch`"
            )

        # Determine the device to use
        if device == "cpu":
            self.device = "cpu"
        else:
            self.device = "cuda:0" if torch.cuda.is_available() else "cpu"

        if self.device == "cpu":
            default_model = "openai/whisper-base"
            self.lang_model = lang_model if lang_model else default_model
        else:
            # Set the language model based on the device and available memory
            # (total device memory converted from bytes to MiB).
            mem = torch.cuda.get_device_properties(self.device).total_memory / (1024**2)
            if mem < 5000:
                rec_model = "openai/whisper-base"
            elif mem < 7000:
                rec_model = "openai/whisper-small"
            elif mem < 12000:
                rec_model = "openai/whisper-medium"
            else:
                rec_model = "openai/whisper-large"
            self.lang_model = lang_model if lang_model else rec_model

        print("Using the following model: ", self.lang_model)  # noqa: T201

        self.batch_size = batch_size

        # load model for inference
        self.pipe = pipeline(
            "automatic-speech-recognition",
            model=self.lang_model,
            chunk_length_s=chunk_length,
            device=self.device,
        )
        if forced_decoder_ids is not None:
            try:
                self.pipe.model.config.forced_decoder_ids = forced_decoder_ids
            except Exception as exception_text:
                # Best effort: keep the pipeline usable with its default
                # decoder mode when the override cannot be applied.
                logger.info(
                    "Unable to set forced_decoder_ids parameter for whisper model. "
                    "Text of exception: %s. "
                    "Therefore whisper model will use default mode for decoder",
                    exception_text,
                )

    def lazy_parse(self, blob: Blob) -> Iterator[Document]:
        """Lazily parse the blob, yielding a single transcript document.

        Args:
            blob: Blob whose ``path`` points at the audio file to transcribe.

        Yields:
            One document holding the transcript of the whole file, with the
            blob source in the metadata.

        Raises:
            ImportError: If ``pydub`` or ``librosa`` is not installed.
        """
        import io

        try:
            from pydub import AudioSegment
        except ImportError:
            raise ImportError(
                "pydub package not found, please install it with `pip install pydub`"
            )

        try:
            import librosa
        except ImportError:
            raise ImportError(
                "librosa package not found, please install it with "
                "`pip install librosa`"
            )

        # Audio file from disk
        audio = AudioSegment.from_file(blob.path)

        file_obj = io.BytesIO(audio.export(format="mp3").read())

        # Transcribe
        print(f"Transcribing part {blob.path}!")  # noqa: T201

        # Load at a 16 kHz sampling rate; the returned rate is not needed.
        y, _ = librosa.load(file_obj, sr=16000)

        prediction = self.pipe(y.copy(), batch_size=self.batch_size)["text"]

        yield Document(
            page_content=prediction,
            metadata={"source": blob.source},
        )
class YandexSTTParser(BaseBlobParser):
    """Transcribe and parse audio files.

    Audio transcription is done with the Yandex SpeechKit speech-to-text
    models from the ``yandex-speechkit`` package.
    """

    def __init__(
        self,
        *,
        api_key: Optional[str] = None,
        iam_token: Optional[str] = None,
        model: str = "general",
        language: str = "auto",
    ):
        """Initialize the parser.

        Args:
            api_key: API key for a service account
                with the `ai.speechkit-stt.user` role.
            iam_token: IAM token for a service account
                with the `ai.speechkit-stt.user` role.
            model: Recognition model name.
                Defaults to the general model.
            language: The language in ISO 639-1 format.
                Defaults to automatic language recognition.

        Either `api_key` or `iam_token` must be provided, but not both.

        Raises:
            ValueError: If both or neither of `api_key` and `iam_token`
                are provided.
        """
        if (api_key is None) == (iam_token is None):
            raise ValueError(
                "Either 'api_key' or 'iam_token' must be provided, but not both."
            )
        self.api_key = api_key
        self.iam_token = iam_token
        self.model = model
        self.language = language

    def lazy_parse(self, blob: Blob) -> Iterator[Document]:
        """Lazily parse the blob, yielding one document per recognition result.

        Args:
            blob: Blob whose ``path`` points at the audio file to transcribe.

        Yields:
            Documents holding the normalized text of each result, with the
            blob source in the metadata.

        Raises:
            ImportError: If ``yandex-speechkit`` or ``pydub`` is not installed.
        """
        try:
            from speechkit import configure_credentials, creds, model_repository
            from speechkit.stt import AudioProcessingType
        except ImportError:
            raise ImportError(
                "yandex-speechkit package not found, please install it with "
                "`pip install yandex-speechkit`"
            )
        try:
            from pydub import AudioSegment
        except ImportError:
            raise ImportError(
                "pydub package not found, please install it with "
                "`pip install pydub`"
            )

        # Select the credential with the same `is None` test the constructor
        # validates with, so the one that was provided is the one used.
        if self.api_key is not None:
            configure_credentials(
                yandex_credentials=creds.YandexCredentials(api_key=self.api_key)
            )
        else:
            configure_credentials(
                yandex_credentials=creds.YandexCredentials(iam_token=self.iam_token)
            )

        audio = AudioSegment.from_file(blob.path)

        model = model_repository.recognition_model()

        model.model = self.model
        model.language = self.language
        model.audio_processing_type = AudioProcessingType.Full

        result = model.transcribe(audio)

        for res in result:
            yield Document(
                page_content=res.normalized_text,
                metadata={"source": blob.source},
            )
class FasterWhisperParser(BaseBlobParser):
    """Transcribe and parse audio files with faster-whisper.

    faster-whisper is a reimplementation of OpenAI's Whisper model using
    CTranslate2, which is up to 4 times faster than openai/whisper for the
    same accuracy while using less memory. The efficiency can be further
    improved with 8-bit quantization on both CPU and GPU.

    It can automatically detect the following 14 languages and transcribe the
    text into their respective languages: en, zh, fr, de, ja, ko, ru, es, th,
    it, pt, vi, ar, tr.

    The GitHub repository for faster-whisper is:
    https://github.com/SYSTRAN/faster-whisper

    Example: Load a YouTube video and transcribe the video speech into a document.
        .. code-block:: python

            from langchain.document_loaders.generic import GenericLoader
            from langchain_community.document_loaders.parsers.audio import (
                FasterWhisperParser,
            )
            from langchain.document_loaders.blob_loaders.youtube_audio import (
                YoutubeAudioLoader,
            )


            url = "<youtube-video-url>"
            save_dir = "your_dir/"
            loader = GenericLoader(
                YoutubeAudioLoader([url], save_dir),
                FasterWhisperParser()
            )
            docs = loader.load()
    """

    def __init__(
        self,
        *,
        device: Optional[str] = "cuda",
        model_size: Optional[str] = None,
    ):
        """Initialize the parser.

        Args:
            device: Either "cuda" or "cpu", depending on the available
                device. Any value other than "cpu" selects "cuda" when CUDA
                is available and falls back to "cpu" otherwise.
            model_size: One of the four supported model sizes: "base",
                "small", "medium" and "large-v3". When omitted, or when the
                value is not one of those, the size is chosen from the
                device and its total memory.

        Raises:
            ImportError: If ``torch`` is not installed.
        """
        try:
            import torch
        except ImportError:
            raise ImportError(
                "torch package not found, please install it with `pip install torch`"
            )

        # Determine the device to use
        if device == "cpu":
            self.device = "cpu"
        else:
            self.device = "cuda" if torch.cuda.is_available() else "cpu"

        # Determine the model_size
        if self.device == "cpu":
            self.model_size = "base"
        else:
            # Set the model_size based on the available memory
            # (total device memory converted from bytes to MiB).
            mem = torch.cuda.get_device_properties(self.device).total_memory / (1024**2)
            if mem < 1000:
                self.model_size = "base"
            elif mem < 3000:
                self.model_size = "small"
            elif mem < 5000:
                self.model_size = "medium"
            else:
                self.model_size = "large-v3"

        # If the user has assigned a model size, then use the assigned size
        if model_size is not None:
            if model_size in ("base", "small", "medium", "large-v3"):
                self.model_size = model_size
            else:
                # Keep the automatic choice, but say so instead of ignoring
                # the argument silently.
                logger.warning(
                    "Unsupported model_size %r ignored; using %r instead.",
                    model_size,
                    self.model_size,
                )

    def lazy_parse(self, blob: Blob) -> Iterator[Document]:
        """Lazily parse the blob, yielding one document per transcribed segment.

        Args:
            blob: Blob carrying the audio either as in-memory bytes in
                ``data`` or as a file referenced by ``path``.

        Yields:
            Documents holding the text of each segment. The metadata carries
            the blob source, the segment timestamps, the detected language
            and its probability, followed by the blob's own metadata.

        Raises:
            ImportError: If ``pydub`` or ``faster-whisper`` is not installed.
            ValueError: If the audio can be obtained from neither
                ``blob.data`` nor ``blob.path``.
        """
        import io

        try:
            from pydub import AudioSegment
        except ImportError:
            raise ImportError(
                "pydub package not found, please install it with `pip install pydub`"
            )

        try:
            from faster_whisper import WhisperModel
        except ImportError:
            raise ImportError(
                "faster_whisper package not found, please install it with "
                "`pip install faster-whisper`"
            )

        # get the audio
        if isinstance(blob.data, bytes):
            # blob contains the audio
            audio = AudioSegment.from_file(io.BytesIO(blob.data))
        elif blob.data is None and blob.path:
            # Audio file from disk
            audio = AudioSegment.from_file(blob.path)
        else:
            raise ValueError("Unable to get audio from blob")

        file_obj = io.BytesIO(audio.export(format="mp3").read())

        # float16 is kept for CUDA only; when the device resolved to "cpu"
        # the model is loaded with int8 so it can still be constructed.
        compute_type = "float16" if self.device == "cuda" else "int8"

        # Transcribe
        model = WhisperModel(
            self.model_size, device=self.device, compute_type=compute_type
        )

        segments, info = model.transcribe(file_obj, beam_size=5)

        for segment in segments:
            yield Document(
                page_content=segment.text,
                metadata={
                    "source": blob.source,
                    "timestamps": "[%.2fs -> %.2fs]" % (segment.start, segment.end),
                    "language": info.language,
                    "probability": "%d%%" % round(info.language_probability * 100),
                    # Blob metadata is spread last, so its keys take
                    # precedence over the computed ones above.
                    **blob.metadata,
                },
            )