import logging
import os
import time
from typing import Any, Dict, Iterator, Literal, Optional, Tuple, Union
from langchain_core.documents import Document
from langchain_community.document_loaders.base import BaseBlobParser
from langchain_community.document_loaders.blob_loaders import Blob
from langchain_community.utils.openai import is_openai_v1
logger = logging.getLogger(__name__)
class OpenAIWhisperParser(BaseBlobParser):
    """Transcribe and parse audio files.

    Audio transcription is done with the OpenAI Whisper model.

    Args:
        api_key: OpenAI API key.
        chunk_duration_threshold: Minimum duration of a chunk in seconds.
            NOTE: According to the OpenAI API, the chunk duration should be
            at least 0.1 seconds. A chunk whose duration is less than or
            equal to the threshold is skipped.
        base_url: Base URL of the API endpoint. Falls back to the
            ``OPENAI_API_BASE`` environment variable when not given.
        language: Optional ``language`` value forwarded to the
            transcription request.
        prompt: Optional ``prompt`` value forwarded to the transcription
            request.
        response_format: Optional ``response_format`` value forwarded to the
            transcription request.
        temperature: Optional ``temperature`` value forwarded to the
            transcription request.
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        *,
        chunk_duration_threshold: float = 0.1,
        base_url: Optional[str] = None,
        language: Union[str, None] = None,
        prompt: Union[str, None] = None,
        response_format: Union[
            Literal["json", "text", "srt", "verbose_json", "vtt"], None
        ] = None,
        temperature: Union[float, None] = None,
    ):
        self.api_key = api_key
        self.chunk_duration_threshold = chunk_duration_threshold
        self.base_url = (
            base_url if base_url is not None else os.environ.get("OPENAI_API_BASE")
        )
        self.language = language
        self.prompt = prompt
        self.response_format = response_format
        self.temperature = temperature

    @property
    def _create_params(self) -> Dict[str, Any]:
        """Return the optional request parameters that were actually set."""
        params = {
            "language": self.language,
            "prompt": self.prompt,
            "response_format": self.response_format,
            "temperature": self.temperature,
        }
        # Unset (None) options are omitted from the request entirely.
        return {k: v for k, v in params.items() if v is not None}

    def lazy_parse(self, blob: Blob) -> Iterator[Document]:
        """Lazily parse the blob.

        The audio at ``blob.path`` is split into 20-minute chunks; each chunk
        is exported as MP3 and transcribed (up to three attempts). One
        ``Document`` is yielded per successfully transcribed chunk; chunks
        that are too short or that fail all attempts are skipped.

        Raises:
            ImportError: If ``openai`` or ``pydub`` is not installed.
        """
        import io

        try:
            import openai
        except ImportError:
            raise ImportError(
                "openai package not found, please install it with "
                "`pip install openai`"
            )
        try:
            from pydub import AudioSegment
        except ImportError:
            raise ImportError(
                "pydub package not found, please install it with "
                "`pip install pydub`"
            )

        use_v1 = is_openai_v1()
        if use_v1:
            # api_key optional, defaults to `os.environ['OPENAI_API_KEY']`
            client = openai.OpenAI(api_key=self.api_key, base_url=self.base_url)
        else:
            # Set the API key if provided
            if self.api_key:
                openai.api_key = self.api_key
            if self.base_url:
                # The pre-1.0 SDK reads its endpoint from `api_base`;
                # `base_url` only exists on the 1.x module.
                openai.api_base = self.base_url

        # Audio file from disk
        audio = AudioSegment.from_file(blob.path)

        # Define the duration of each chunk in minutes
        # Need to meet 25MB size limit for Whisper API
        chunk_duration = 20
        chunk_duration_ms = chunk_duration * 60 * 1000

        max_attempts = 3
        retry_delay_s = 5

        # Split the audio into chunk_duration_ms chunks
        for split_number, i in enumerate(range(0, len(audio), chunk_duration_ms)):
            # Audio chunk
            chunk = audio[i : i + chunk_duration_ms]
            # Skip chunks that are too short to transcribe
            if chunk.duration_seconds <= self.chunk_duration_threshold:
                continue
            file_obj = io.BytesIO(chunk.export(format="mp3").read())
            if blob.source is not None:
                file_obj.name = blob.source + f"_part_{split_number}.mp3"
            else:
                file_obj.name = f"part_{split_number}.mp3"

            # Transcribe
            print(f"Transcribing part {split_number + 1}!")  # noqa: T201
            for attempt in range(1, max_attempts + 1):
                # A failed attempt may have consumed the buffer; rewind so
                # every attempt uploads the chunk from its first byte.
                file_obj.seek(0)
                try:
                    if use_v1:
                        transcript = client.audio.transcriptions.create(
                            model="whisper-1", file=file_obj, **self._create_params
                        )
                    else:
                        transcript = openai.Audio.transcribe(
                            "whisper-1", file_obj, **self._create_params
                        )
                    break
                except Exception as e:
                    print(  # noqa: T201
                        f"Attempt {attempt} failed. Exception: {str(e)}"
                    )
                    # No point in waiting after the last attempt.
                    if attempt < max_attempts:
                        time.sleep(retry_delay_s)
            else:
                print("Failed to transcribe after 3 attempts.")  # noqa: T201
                continue

            # Plain-text response formats ("text", "srt", "vtt") come back as
            # a bare string rather than an object with a `.text` attribute.
            text = transcript if isinstance(transcript, str) else transcript.text
            yield Document(
                page_content=text,
                metadata={"source": blob.source, "chunk": split_number},
            )
class OpenAIWhisperParserLocal(BaseBlobParser):
    """Transcribe and parse audio files with the OpenAI Whisper model.

    Audio transcription runs locally through the ``transformers`` pipeline.

    Parameters:
        device - device to use.
            NOTE: By default a GPU is used if available;
            to use the CPU, set device = "cpu".
        lang_model - whisper model to use, for example "openai/whisper-medium"
        forced_decoder_ids - id states for the decoder of a multilingual model.
            Usage example:
                from transformers import WhisperProcessor
                processor = WhisperProcessor.from_pretrained("openai/whisper-medium")
                forced_decoder_ids = processor.get_decoder_prompt_ids(
                    language="french", task="transcribe"
                )
                forced_decoder_ids = processor.get_decoder_prompt_ids(
                    language="french", task="translate"
                )
    """

    def __init__(
        self,
        device: str = "0",
        lang_model: Optional[str] = None,
        batch_size: int = 8,
        chunk_length: int = 30,
        forced_decoder_ids: Optional[Tuple[Dict]] = None,
    ):
        """Initialize the parser.

        Args:
            device: Device to use. Only the value "cpu" forces the CPU; any
                other value selects "cuda:0" when CUDA is available and the
                CPU otherwise.
            lang_model: Whisper model to use, for example
                "openai/whisper-medium". Defaults to None, in which case a
                model is picked from the device and its total memory.
            forced_decoder_ids: Id states for the decoder of a multilingual
                model. Defaults to None.
            batch_size: Batch size used for decoding. Defaults to 8.
            chunk_length: Chunk length used during inference, in seconds.
                Defaults to 30.

        Raises:
            ImportError: If ``transformers`` or ``torch`` is not installed.
        """
        try:
            from transformers import pipeline
        except ImportError:
            raise ImportError(
                "transformers package not found, please install it with "
                "`pip install transformers`"
            )
        try:
            import torch
        except ImportError:
            raise ImportError(
                "torch package not found, please install it with "
                "`pip install torch`"
            )

        # Determine the device to use
        if device == "cpu":
            self.device = "cpu"
        else:
            self.device = "cuda:0" if torch.cuda.is_available() else "cpu"

        if self.device == "cpu":
            default_model = "openai/whisper-base"
            self.lang_model = lang_model if lang_model else default_model
        else:
            # Set the language model based on the device and available memory
            # (total device memory converted from bytes to MiB).
            mem = torch.cuda.get_device_properties(self.device).total_memory / (
                1024**2
            )
            if mem < 5000:
                rec_model = "openai/whisper-base"
            elif mem < 7000:
                rec_model = "openai/whisper-small"
            elif mem < 12000:
                rec_model = "openai/whisper-medium"
            else:
                rec_model = "openai/whisper-large"
            self.lang_model = lang_model if lang_model else rec_model

        print("Using the following model: ", self.lang_model)  # noqa: T201

        self.batch_size = batch_size

        # load model for inference
        self.pipe = pipeline(
            "automatic-speech-recognition",
            model=self.lang_model,
            chunk_length_s=chunk_length,
            device=self.device,
        )
        if forced_decoder_ids is not None:
            try:
                self.pipe.model.config.forced_decoder_ids = forced_decoder_ids
            except Exception as exception_text:
                # Best effort: fall back to the model's default decoder mode.
                # The message parts are explicitly separated so the log line
                # stays readable.
                logger.info(
                    "Unable to set forced_decoder_ids parameter for whisper "
                    "model. Text of exception: %s. "
                    "Therefore whisper model will use default mode for decoder",
                    exception_text,
                )

    def lazy_parse(self, blob: Blob) -> Iterator[Document]:
        """Lazily parse the blob.

        Loads the audio at ``blob.path``, re-encodes it as MP3 in memory,
        resamples it to 16 kHz with librosa and runs the pipeline on it,
        yielding a single ``Document`` with the transcribed text.

        Raises:
            ImportError: If ``pydub`` or ``librosa`` is not installed.
        """
        import io

        try:
            from pydub import AudioSegment
        except ImportError:
            raise ImportError(
                "pydub package not found, please install it with `pip install pydub`"
            )
        try:
            import librosa
        except ImportError:
            raise ImportError(
                "librosa package not found, please install it with "
                "`pip install librosa`"
            )

        # Audio file from disk
        audio = AudioSegment.from_file(blob.path)
        file_obj = io.BytesIO(audio.export(format="mp3").read())

        # Transcribe
        print(f"Transcribing part {blob.path}!")  # noqa: T201
        y, sr = librosa.load(file_obj, sr=16000)
        prediction = self.pipe(y.copy(), batch_size=self.batch_size)["text"]

        yield Document(
            page_content=prediction,
            metadata={"source": blob.source},
        )
class YandexSTTParser(BaseBlobParser):
    """Transcribe and parse audio files.

    Audio transcription is performed through the ``speechkit`` package
    (installed as ``yandex-speechkit``).
    """

    def __init__(
        self,
        *,
        api_key: Optional[str] = None,
        iam_token: Optional[str] = None,
        model: str = "general",
        language: str = "auto",
    ):
        """Initialize the parser.

        Args:
            api_key: API key for a service account
                with the `ai.speechkit-stt.user` role.
            iam_token: IAM token for a service account
                with the `ai.speechkit-stt.user` role.
            model: Recognition model name.
                Defaults to the general model.
            language: Language in ISO 639-1 format.
                Defaults to automatic language recognition.

        Exactly one of `api_key` or `iam_token` must be provided.

        Raises:
            ValueError: If both credentials or neither credential is given.
        """
        has_key = api_key is not None
        has_token = iam_token is not None
        # Both present or both absent are equally invalid.
        if has_key == has_token:
            raise ValueError(
                "Either 'api_key' or 'iam_token' must be provided, but not both."
            )

        self.model = model
        self.language = language
        self.api_key = api_key
        self.iam_token = iam_token

    def lazy_parse(self, blob: Blob) -> Iterator[Document]:
        """Lazily parse the blob.

        Configures the credentials, loads the audio at ``blob.path`` and
        yields one ``Document`` per recognition result, using the result's
        normalized text as page content.

        Raises:
            ImportError: If ``yandex-speechkit`` or ``pydub`` is not installed.
        """
        try:
            from speechkit import configure_credentials, creds, model_repository
            from speechkit.stt import AudioProcessingType
        except ImportError:
            raise ImportError(
                "yandex-speechkit package not found, please install it with "
                "`pip install yandex-speechkit`"
            )
        try:
            from pydub import AudioSegment
        except ImportError:
            raise ImportError(
                "pydub package not found, please install it with `pip install pydub`"
            )

        # Pick whichever credential is usable; a truthy api_key wins.
        if self.api_key:
            credential_kwargs = {"api_key": self.api_key}
        else:
            credential_kwargs = {"iam_token": self.iam_token}
        configure_credentials(
            yandex_credentials=creds.YandexCredentials(**credential_kwargs)
        )

        segment = AudioSegment.from_file(blob.path)

        recognizer = model_repository.recognition_model()
        recognizer.model = self.model
        recognizer.language = self.language
        recognizer.audio_processing_type = AudioProcessingType.Full

        yield from (
            Document(
                page_content=item.normalized_text,
                metadata={"source": blob.source},
            )
            for item in recognizer.transcribe(segment)
        )
class FasterWhisperParser(BaseBlobParser):
    """Transcribe and parse audio files with faster-whisper.

    faster-whisper is a reimplementation of OpenAI's Whisper model using
    CTranslate2, which is up to 4 times faster than openai/whisper for the
    same accuracy while using less memory. The efficiency can be further
    improved with 8-bit quantization on both CPU and GPU.

    It can automatically detect the following 14 languages and transcribe the
    text into their respective languages: en, zh, fr, de, ja, ko, ru, es, th,
    it, pt, vi, ar, tr.

    The GitHub repository for faster-whisper is:
    https://github.com/SYSTRAN/faster-whisper

    Example: Load a YouTube video and transcribe the video speech into a document.
        .. code-block:: python

            from langchain.document_loaders.generic import GenericLoader
            from langchain_community.document_loaders.parsers.audio import (
                FasterWhisperParser,
            )
            from langchain.document_loaders.blob_loaders.youtube_audio import (
                YoutubeAudioLoader,
            )


            url="https://www.youtube.com/watch?v=your_video"
            save_dir="your_dir/"
            loader = GenericLoader(
                YoutubeAudioLoader([url],save_dir),
                FasterWhisperParser()
            )
            docs = loader.load()
    """

    def __init__(
        self,
        *,
        device: Optional[str] = "cuda",
        model_size: Optional[str] = None,
    ):
        """Initialize the parser.

        Args:
            device: Either "cuda" or "cpu". Only "cpu" forces the CPU; any
                other value selects "cuda" when CUDA is available and the
                CPU otherwise.
            model_size: One of "base", "small", "medium" or "large-v3". When
                omitted (or not one of these values) the size is chosen from
                the device and its total memory.

        Raises:
            ImportError: If ``torch`` is not installed.
        """
        try:
            import torch
        except ImportError:
            raise ImportError(
                "torch package not found, please install it with `pip install torch`"
            )

        # Determine the device to use
        if device == "cpu":
            self.device = "cpu"
        else:
            self.device = "cuda" if torch.cuda.is_available() else "cpu"

        # Determine the model_size
        if self.device == "cpu":
            self.model_size = "base"
        else:
            # Set the model_size based on the available memory
            # (total device memory converted from bytes to MiB).
            mem = torch.cuda.get_device_properties(self.device).total_memory / (
                1024**2
            )
            if mem < 1000:
                self.model_size = "base"
            elif mem < 3000:
                self.model_size = "small"
            elif mem < 5000:
                self.model_size = "medium"
            else:
                self.model_size = "large-v3"

        # If the user has assigned a model size, then use the assigned size
        if model_size is not None:
            if model_size in ["base", "small", "medium", "large-v3"]:
                self.model_size = model_size
            else:
                # Keep the automatic choice, but do not ignore the request
                # silently.
                logger.warning(
                    "Unsupported model_size %r; using %r instead.",
                    model_size,
                    self.model_size,
                )

    def lazy_parse(self, blob: Blob) -> Iterator[Document]:
        """Lazily parse the blob.

        Reads the audio from ``blob.data`` (bytes) or from ``blob.path``,
        re-encodes it as MP3 in memory and transcribes it, yielding one
        ``Document`` per transcribed segment. Each document's metadata holds
        the source, the segment timestamps, the detected language and its
        probability, plus everything in ``blob.metadata``.

        Raises:
            ImportError: If ``pydub`` or ``faster_whisper`` is not installed.
            ValueError: If the blob provides neither bytes nor a path.
        """
        import io

        try:
            from pydub import AudioSegment
        except ImportError:
            raise ImportError(
                "pydub package not found, please install it with `pip install pydub`"
            )
        try:
            from faster_whisper import WhisperModel
        except ImportError:
            raise ImportError(
                "faster_whisper package not found, please install it with "
                "`pip install faster-whisper`"
            )

        # get the audio
        if isinstance(blob.data, bytes):
            # blob contains the audio
            audio = AudioSegment.from_file(io.BytesIO(blob.data))
        elif blob.data is None and blob.path:
            # Audio file from disk
            audio = AudioSegment.from_file(blob.path)
        else:
            raise ValueError("Unable to get audio from blob")

        file_obj = io.BytesIO(audio.export(format="mp3").read())

        # float16 is only usable on the GPU; on the CPU fall back to the
        # 8-bit quantization mentioned in the class docstring.
        compute_type = "float16" if self.device == "cuda" else "int8"

        # Transcribe
        model = WhisperModel(
            self.model_size, device=self.device, compute_type=compute_type
        )
        segments, info = model.transcribe(file_obj, beam_size=5)

        for segment in segments:
            yield Document(
                page_content=segment.text,
                metadata={
                    "source": blob.source,
                    "timestamps": "[%.2fs -> %.2fs]" % (segment.start, segment.end),
                    "language": info.language,
                    "probability": "%d%%" % round(info.language_probability * 100),
                    # Blob metadata comes last, so its keys override the above.
                    **blob.metadata,
                },
            )