import whisperx
import gradio as gr
import gc
import time
import os
from datetime import datetime
import warnings

# Silence the noisy torch "std(): degrees of freedom is <= 0" warning.
warnings.filterwarnings("ignore", message=r"std\(\): degrees of freedom is <= 0")

from src.config.settings import Config
from whisperx.diarize import DiarizationPipeline


class TranscriptionService:
    def __init__(self):
        self.config = Config
        self.models_loaded = False
        self.whisper_model = None
        self.diarize_model = None
        self.batch_size = 16  # Whisper inference batch size; lower it if GPU memory is tight

    def load_models(self):
        """Load the Whisper and diarization models once and cache them for reuse."""
        if not self.models_loaded:
            print("📥 Loading transcription models...")

            self.whisper_model = whisperx.load_model(
                self.config.WHISPER_MODEL,
                self.config.DEVICE,
                compute_type=self.config.COMPUTE_TYPE,
                language="en"
            )

            self.diarize_model = DiarizationPipeline(
                use_auth_token=self.config.HUGGINGFACE_TOKEN,
                device=self.config.DEVICE
            )

            self.models_loaded = True
            print("✅ Models loaded successfully")

    def transcribe_video(self, video_file_path, progress_callback=None):
        """Transcription pipeline with no Gradio dependencies.

        Accepts an optional progress callback for reporting status to a UI.
        """
        try:
            if not self.models_loaded:
                self.load_models()

            start_time = time.time()
            print(f"🎬 Processing video: {os.path.basename(video_file_path)}")

            if progress_callback:
                print(f"DEBUG: Calling progress callback 0.1. Type: {type(progress_callback)}")
                try:
                    progress_callback(0.1, desc="🎬 Loading audio from video...")
                    print("DEBUG: Progress callback 0.1 called successfully")
                except Exception as e:
                    print(f"DEBUG: Error calling progress callback: {e}")
                time.sleep(0.5)
            print("1️⃣ Loading audio directly from video...")
            audio = whisperx.load_audio(video_file_path)
            print(f"✅ Audio loaded: {len(audio)} samples")

            print("2️⃣ Loading Whisper model...")
            if progress_callback:
                progress_callback(0.3, desc="🎤 Loading Whisper model...")
                time.sleep(0.5)

            if progress_callback:
                progress_callback(0.4, desc="📝 Transcribing audio...")
                time.sleep(0.5)
            print("3️⃣ Transcribing audio...")

            result = self.whisper_model.transcribe(audio, batch_size=self.batch_size)
            detected_language = result["language"]
            print(f"✅ Transcription complete ({detected_language} detected)")

            if progress_callback:
                progress_callback(0.5, desc="⏱️ Aligning timestamps...")
                time.sleep(0.5)
            print("4️⃣ Aligning word-level timestamps...")

            model_a, metadata = whisperx.load_align_model(
                language_code=detected_language,
                device=self.config.DEVICE
            )
            result = whisperx.align(
                result["segments"],
                model_a,
                metadata,
                audio,
                self.config.DEVICE,
                return_char_alignments=False
            )

            result["language"] = detected_language
            print("✅ Timestamps aligned")

            if progress_callback:
                progress_callback(0.7, desc="👥 Identifying speakers...")
                time.sleep(0.5)
            print("5️⃣ Loading speaker diarization model...")
            diarize_segments = self.diarize_model(audio)

            if progress_callback:
                progress_callback(0.9, desc="🔗 Assigning speakers to text...")
                time.sleep(0.5)
            print("6️⃣ Assigning speakers to transcript...")
            result = whisperx.assign_word_speakers(diarize_segments, result)
            print("✅ Speaker assignment complete")

            if progress_callback:
                progress_callback(1.0, desc="✅ Complete!")
                time.sleep(0.5)

            processing_time = time.time() - start_time
            transcription = self._format_results(result, video_file_path)
            timing_info = self._get_timing_info(result, processing_time, video_file_path)

            return {
                "success": True,
                "transcription": transcription,
                "timing_info": timing_info,
                "raw_data": result,
                "processing_time": processing_time,
                "speakers_count": len(set(seg.get("speaker", "UNKNOWN") for seg in result["segments"]))
            }

        except Exception as e:
            error_msg = f"Transcription failed: {str(e)}"
            print(f"❌ ERROR: {error_msg}")
            return {
                "success": False,
                "error": error_msg
            }

    def _format_results(self, result, video_file_path):
        """Format transcription with speaker labels and comprehensive meeting metadata"""
        if not result["segments"]:
            return "No transcription segments found"

        segments = result["segments"]
        speakers = set(segment.get("speaker", "UNKNOWN") for segment in segments)
        total_duration = segments[-1]["end"] if segments else 0
        language = result.get("language", "unknown")

        total_words = sum(len(seg.get("text", "").split()) for seg in segments)
        avg_segment_length = total_words / len(segments) if segments else 0

        output = "# 🎯 Meeting Transcription\n\n"
        output += "## 📋 Meeting Information\n\n"
        output += f"**📁 File:** `{os.path.basename(video_file_path)}`\n"
        output += f"**📅 Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}\n"
        output += f"**⏱️ Duration:** {self._format_timestamp(total_duration)}\n"
        output += f"**👥 Speakers:** {len(speakers)}\n"
        output += f"**🌐 Language:** {language.upper()}\n"
        output += f"**🎤 Model:** {self.config.WHISPER_MODEL}\n\n"
        output += "---\n\n"
        output += "## 💬 Transcript\n\n"

        current_speaker = None
        for segment in segments:
            speaker = segment.get("speaker", "UNKNOWN")
            start_time = self._format_timestamp(segment["start"])

            if speaker != current_speaker:
                output += f"\n**👤 {speaker}:**\n"
                current_speaker = speaker

            output += f"[{start_time}] {segment['text'].strip()}\n"

        output += "\n---\n\n"
        output += "## 📊 Transcript Statistics\n\n"
        output += f"**Total Segments:** {len(segments)}\n"
        output += f"**Total Words:** {total_words:,}\n"
        output += f"**Avg Words/Segment:** {avg_segment_length:.1f}\n"
        output += f"**Unique Speakers:** {len(speakers)}\n"
        output += f"**Speaker IDs:** {', '.join(sorted(speakers))}\n"

        return output

    def _get_timing_info(self, result, processing_time, video_file_path):
        """Generate timing information"""
        if not result["segments"]:
            return "No timing information available"

        total_duration = result["segments"][-1]["end"]
        speed_ratio = total_duration / processing_time if processing_time > 0 else 0
        video_name = os.path.basename(video_file_path)

        return f"""
## ⏱️ Processing Statistics

**File:** {video_name}
**Duration:** {self._format_timestamp(total_duration)}
**Processing Time:** {processing_time:.1f}s
**Speed:** {speed_ratio:.1f}x ({'Faster' if speed_ratio > 1 else 'Slower'} than real-time)
**Completed:** {datetime.now().strftime('%H:%M:%S')}
"""

    def _format_timestamp(self, seconds):
        """Convert seconds to MM:SS format"""
        if seconds is None:
            return "00:00"
        minutes = int(seconds // 60)
        seconds = int(seconds % 60)
        return f"{minutes:02d}:{seconds:02d}"
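

# Example usage (a minimal sketch, not part of the service itself): it shows how
# the class above might be driven from a script. The sample path "meeting.mp4"
# and the print-based progress callback are illustrative assumptions; Config must
# supply WHISPER_MODEL, DEVICE, COMPUTE_TYPE, and HUGGINGFACE_TOKEN as the class
# expects.
if __name__ == "__main__":
    def print_progress(fraction, desc=""):
        # Stand-in for a UI progress callback (e.g. a Gradio progress object);
        # simply logs the reported fraction and description to stdout.
        print(f"[{fraction:.0%}] {desc}")

    service = TranscriptionService()
    outcome = service.transcribe_video("meeting.mp4", progress_callback=print_progress)

    if outcome["success"]:
        print(outcome["timing_info"])
        print(outcome["transcription"])
    else:
        print(f"Failed: {outcome['error']}")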