Translsis's picture
Update app.py
6f66838 verified
raw
history blame
16.8 kB
import gradio as gr
import torch
import os
import time
import copy
from pathlib import Path
from typing import Optional, Tuple
import spaces
from vibevoice.modular.modeling_vibevoice_streaming_inference import (
VibeVoiceStreamingForConditionalGenerationInference,
)
from vibevoice.processor.vibevoice_streaming_processor import (
VibeVoiceStreamingProcessor,
)
class VoiceMapper:
    """Maps speaker names to voice preset (``.pt``) file paths.

    Two dictionaries are maintained, both mapping a name to a file path:

    * ``voice_presets`` -- every preset found on disk keyed by its file stem,
      plus the shortened aliases registered in ``__init__``.
    * ``available_voices`` -- only the presets found on disk (no aliases),
      sorted by name.
    """

    def __init__(self):
        self.setup_voice_presets()
        # Register a short alias next to each full preset name: keep the part
        # before the first "_" and then the part after the last "-"
        # (e.g. "en-Alice_woman" -> "Alice"). Full names stay valid keys.
        aliases = {}
        for name, path in self.voice_presets.items():
            if "_" in name:
                name = name.split("_")[0]
            if "-" in name:
                name = name.split("-")[-1]
            aliases[name] = path
        self.voice_presets.update(aliases)

    def setup_voice_presets(self):
        """Scan the voices directory and populate the preset dictionaries.

        Sets ``self.voice_presets`` and ``self.available_voices``. Both are
        left empty (and a warning is printed) when the directory is missing.
        """
        voices_dir = os.path.join(os.path.dirname(__file__), "demo/voices/streaming_model")

        self.voice_presets = {}
        self.available_voices = {}

        # isdir rather than exists: a regular file at this path cannot be
        # listed and must be treated like a missing directory.
        if not os.path.isdir(voices_dir):
            print(f"Warning: Voices directory not found at {voices_dir}")
            return

        # Key each regular *.pt file (extension matched case-insensitively)
        # by its file name without the extension.
        found = {}
        for entry in os.listdir(voices_dir):
            full_path = os.path.join(voices_dir, entry)
            if entry.lower().endswith(".pt") and os.path.isfile(full_path):
                found[os.path.splitext(entry)[0]] = full_path

        # Sort the voice presets alphabetically by name for better UI
        self.voice_presets = dict(sorted(found.items()))

        # Separate dict on purpose: aliases added to voice_presets later must
        # not show up here. The existence filter is redundant but harmless.
        self.available_voices = {
            name: path for name, path in self.voice_presets.items() if os.path.exists(path)
        }

        print(f"Found {len(self.available_voices)} voice files in {voices_dir}")
        print(f"Available voices: {', '.join(self.available_voices.keys())}")

    def get_voice_path(self, speaker_name: str) -> str:
        """Get the voice file path for a given speaker name.

        Resolution order: exact key match, then case-insensitive substring
        match in either direction, then the first registered preset (with a
        printed warning). A ``None`` or empty ``speaker_name`` goes straight
        to the default.

        Raises:
            IndexError: if no voice presets are registered at all. The type
                matches what the previous implementation raised in this
                situation; only the message is new.
        """
        if not self.voice_presets:
            raise IndexError(
                f"No voice presets are available; cannot resolve speaker '{speaker_name}'"
            )

        # First try exact match
        if speaker_name in self.voice_presets:
            return self.voice_presets[speaker_name]

        # Try partial matching (case insensitive). Skipped for None/empty
        # names, which cannot be lowered or would match every preset.
        if speaker_name:
            speaker_lower = speaker_name.lower()
            for preset_name, path in self.voice_presets.items():
                preset_lower = preset_name.lower()
                if preset_lower in speaker_lower or speaker_lower in preset_lower:
                    return path

        # Default to first voice if no match found
        default_voice = next(iter(self.voice_presets.values()))
        print(
            f"Warning: No voice preset found for '{speaker_name}', using default voice: {default_voice}"
        )
        return default_voice
# Patch the _update_model_kwargs_for_generation method
def patched_update_model_kwargs_for_generation(
    self,
    outputs,
    model_kwargs,
    is_encoder_decoder=False,
    model_inputs=None,
    num_new_tokens=1,
):
    """Advance ``model_kwargs`` by one generation step.

    Replacement for ``_update_model_kwargs_for_generation`` that accepts
    ``outputs`` either as a dict or as an attribute-style object.

    Args:
        outputs: Step outputs; ``past_key_values`` is read from it by key
            (dict) or by attribute (anything else), defaulting to ``None``.
        model_kwargs: Dict of generation kwargs; updated in place.
        is_encoder_decoder: Selects which mask is extended:
            ``decoder_attention_mask`` when true, ``attention_mask`` otherwise.
        model_inputs: Optional mapping; when it contains ``cache_position``
            the next position is derived from its last element.
        num_new_tokens: Offset added to the last cache position.

    Returns:
        The same ``model_kwargs`` dict that was passed in.
    """
    # Carry the cache forward, whichever container shape `outputs` has.
    if isinstance(outputs, dict):
        cache = outputs.get("past_key_values")
    else:
        cache = getattr(outputs, "past_key_values", None)
    model_kwargs["past_key_values"] = cache

    # Repeat the last token type id for the newly generated position.
    if getattr(self, "config", None) is not None:
        type_ids = model_kwargs.get("token_type_ids")
        if type_ids is not None:
            model_kwargs["token_type_ids"] = torch.cat(
                [type_ids, type_ids[:, -1:]], dim=-1
            )

    # Grow the relevant attention mask by one column of ones.
    mask_key = "decoder_attention_mask" if is_encoder_decoder else "attention_mask"
    mask = model_kwargs.get(mask_key)
    if mask is not None:
        extra_column = mask.new_ones((mask.shape[0], 1))
        model_kwargs[mask_key] = torch.cat([mask, extra_column], dim=-1)

    # Next cache position = last known position + number of new tokens.
    if model_inputs is not None and "cache_position" in model_inputs:
        last_position = model_inputs["cache_position"][-1:]
        model_kwargs["cache_position"] = last_position + num_new_tokens

    return model_kwargs
# Check if CUDA is available
# These three values are captured once at import time. DEVICE is only printed
# here; generate_speech re-detects the device on every call.
CUDA_AVAILABLE = torch.cuda.is_available()
DEVICE = "cuda" if CUDA_AVAILABLE else "cpu"
# Half precision when CUDA is reported, full precision otherwise.
DTYPE = torch.float16 if CUDA_AVAILABLE else torch.float32
print(f"CUDA available: {CUDA_AVAILABLE}")
print(f"Using device: {DEVICE}")
# Load model and processor directly
# (runs at import time, so importing this module triggers the model load)
print("Loading VibeVoice-Realtime model...")
MODEL_PATH = "microsoft/VibeVoice-Realtime-0.5B"
# Load processor (CPU operation)
PROCESSOR = VibeVoiceStreamingProcessor.from_pretrained(MODEL_PATH)
# Load model - use appropriate dtype based on device
MODEL = VibeVoiceStreamingForConditionalGenerationInference.from_pretrained(
    MODEL_PATH,
    torch_dtype=DTYPE,
    device_map="cpu",  # Always start on CPU for ZeroGPU compatibility
    attn_implementation="sdpa",
)
# Apply the patch to the model instance
# (__get__ binds the plain function to MODEL, so only this instance is
# affected and `self` inside the patch is MODEL)
MODEL._update_model_kwargs_for_generation = patched_update_model_kwargs_for_generation.__get__(MODEL, type(MODEL))
MODEL.eval()
# NOTE(review): presumably the number of diffusion denoising steps used at
# inference -- confirm against the VibeVoice model API.
MODEL.set_ddpm_inference_steps(num_steps=5)
# Initialize voice mapper
VOICE_MAPPER = VoiceMapper()
print("Model loaded successfully!")
def move_to_device(obj, device):
    """Return a copy of ``obj`` with every contained tensor moved to ``device``.

    Tensors are moved with ``.to(device)``. Dicts, lists and tuples are
    rebuilt as plain ``dict`` / ``list`` / ``tuple`` with their elements
    processed recursively. Any other value is returned unchanged.
    """
    if torch.is_tensor(obj):
        return obj.to(device)

    if isinstance(obj, dict):
        moved = {}
        for key, value in obj.items():
            moved[key] = move_to_device(value, device)
        return moved

    if isinstance(obj, (list, tuple)):
        moved_items = [move_to_device(element, device) for element in obj]
        # Lists stay lists; everything else in this branch is a tuple.
        return moved_items if isinstance(obj, list) else tuple(moved_items)

    return obj
def _normalize_quotes(text: str) -> str:
    """Replace typographic (curly) quotes with their ASCII equivalents."""
    return (
        text.replace("\u2018", "'")
        .replace("\u2019", "'")
        .replace("\u201c", '"')
        .replace("\u201d", '"')
    )


def _release_gpu_memory() -> None:
    """Best-effort cleanup: move MODEL back to CPU and empty the CUDA cache."""
    try:
        MODEL.to("cpu")
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
    except Exception as cleanup_error:
        # Cleanup must never mask the real result or error; just report it.
        print(f"Warning: failed to release GPU memory: {cleanup_error}")


@spaces.GPU(duration=60)  # Request GPU for 60 seconds
def generate_speech(
    text: str,
    speaker_name: str,
    cfg_scale: float = 1.5,
    progress=gr.Progress(),
) -> Tuple[Optional[str], str]:
    """
    Generate speech from text using VibeVoice-Realtime with ZeroGPU

    Args:
        text: Input text to convert to speech
        speaker_name: Name of the speaker voice to use
        cfg_scale: Classifier-Free Guidance scale (higher = more faithful to text)
        progress: Gradio progress tracker

    Returns:
        Tuple of (audio_path, status_message). ``audio_path`` is ``None`` when
        the input is blank, when the model produced no audio, or when an
        exception occurred; in the last case the status message carries the
        error text and traceback. No exception propagates to the caller.
    """
    if not text or not text.strip():
        return None, "❌ Error: Please enter some text to convert to speech."

    # Local import keeps this block self-contained.
    from contextlib import nullcontext

    try:
        # Detect actual device inside the decorated function
        device = "cuda" if torch.cuda.is_available() else "cpu"

        progress(0, desc="Loading voice preset...")

        # Clean text
        full_script = _normalize_quotes(text.strip())

        # Get voice sample path
        voice_sample = VOICE_MAPPER.get_voice_path(speaker_name)

        # Load voice sample to CPU first
        # NOTE(security): weights_only=False unpickles arbitrary objects. The
        # path comes from VOICE_MAPPER (files under the app's voices
        # directory); never point this at user-supplied files.
        all_prefilled_outputs = torch.load(
            voice_sample, map_location="cpu", weights_only=False
        )

        # Move model to the appropriate device
        MODEL.to(device)

        # Move voice sample tensors to device
        all_prefilled_outputs = move_to_device(all_prefilled_outputs, device)

        progress(0.2, desc="Preparing inputs...")

        # Prepare inputs
        inputs = PROCESSOR.process_input_with_cached_prompt(
            text=full_script,
            cached_prompt=all_prefilled_outputs,
            padding=True,
            return_tensors="pt",
            return_attention_mask=True,
        )

        # Move input tensors to device
        inputs = move_to_device(inputs, device)

        progress(0.4, desc=f"Generating speech on {device.upper()}...")

        # Generate audio
        start_time = time.time()

        # Use autocast only if on CUDA. torch.autocast replaces the
        # deprecated torch.cuda.amp.autocast.
        autocast_context = (
            torch.autocast(device_type="cuda") if device == "cuda" else nullcontext()
        )
        with autocast_context:
            outputs = MODEL.generate(
                **inputs,
                max_new_tokens=None,
                cfg_scale=cfg_scale,
                tokenizer=PROCESSOR.tokenizer,
                generation_config={"do_sample": False},
                verbose=False,
                # Pass a copy so the loaded preset is not shared with generate().
                all_prefilled_outputs=copy.deepcopy(all_prefilled_outputs)
                if all_prefilled_outputs is not None
                else None,
            )

        generation_time = time.time() - start_time

        progress(0.8, desc="Saving audio...")

        if not outputs.speech_outputs or outputs.speech_outputs[0] is None:
            return None, "❌ Error: No audio output generated."

        # Calculate metrics
        speech = outputs.speech_outputs[0]
        sample_rate = 24000
        audio_samples = speech.shape[-1] if len(speech.shape) > 0 else len(speech)
        audio_duration = audio_samples / sample_rate
        rtf = generation_time / audio_duration if audio_duration > 0 else float("inf")

        # Save output. Nanosecond timestamp: requests finishing within the
        # same second must not overwrite each other's file.
        output_dir = "./outputs"
        os.makedirs(output_dir, exist_ok=True)
        output_path = os.path.join(output_dir, f"generated_{time.time_ns()}.wav")

        PROCESSOR.save_audio(
            speech.cpu(),  # Move to CPU for saving
            output_path=output_path,
        )

        progress(1.0, desc="Complete!")

        # Create status message
        device_info = "ZeroGPU (CUDA)" if device == "cuda" else "CPU"
        status = f"""✅ **Generation Complete!**
📊 **Metrics:**
- Audio Duration: {audio_duration:.2f}s
- Generation Time: {generation_time:.2f}s
- Real-Time Factor: {rtf:.2f}x
- Speaker: {speaker_name}
- CFG Scale: {cfg_scale}
- Device: {device_info}
"""
        return output_path, status

    except Exception as e:
        import traceback

        error_msg = f"❌ Error during generation:\n{str(e)}\n\n{traceback.format_exc()}"
        print(error_msg)
        return None, error_msg

    finally:
        # Move model back to CPU to free GPU memory on every exit path
        # (success, no-audio, and error).
        _release_gpu_memory()
# Create Gradio interface
# Layout: a header, then one row with an input column (text, voice, CFG
# scale, button) and an output column (audio, status), followed by examples
# and a notes footer. String bodies are kept flush-left so the Markdown text
# carries no leading whitespace.
with gr.Blocks(fill_height=True) as demo:
    # Header. The device line is fixed when the UI is built, from the
    # import-time CUDA_AVAILABLE flag.
    gr.Markdown(
        f"""
# 🎙️ VibeVoice-Realtime Text-to-Speech
Convert text to natural-sounding speech using Microsoft's VibeVoice-Realtime model.
**🚀 Device:** {"ZeroGPU - Efficient GPU allocation for fast inference!" if CUDA_AVAILABLE else "CPU Mode - GPU will be allocated when generating"}
<div style="text-align: center; margin-top: 10px;">
<a href="https://huggingface.co/spaces/akhaliq/anycoder" target="_blank" style="text-decoration: none; color: #4F46E5; font-weight: 600;">
Built with anycoder ✨
</a>
</div>
"""
    )
    with gr.Row():
        with gr.Column(scale=2):
            # Input section
            text_input = gr.Textbox(
                label="Text to Convert",
                placeholder="Enter the text you want to convert to speech...",
                lines=8,
                max_lines=20,
            )
            with gr.Row():
                # Choices are the preset names found on disk (no aliases);
                # the first one is pre-selected, or None when there are none.
                speaker_dropdown = gr.Dropdown(
                    choices=list(VOICE_MAPPER.available_voices.keys()),
                    value=list(VOICE_MAPPER.available_voices.keys())[0]
                    if VOICE_MAPPER.available_voices
                    else None,
                    label="Speaker Voice",
                    info="Select the voice to use for speech generation",
                )
                # Default matches generate_speech's cfg_scale default (1.5).
                cfg_slider = gr.Slider(
                    minimum=1.0,
                    maximum=3.0,
                    value=1.5,
                    step=0.1,
                    label="CFG Scale",
                    info="Higher values = more faithful to text (1.0-3.0)",
                )
            generate_btn = gr.Button("🎵 Generate Speech", variant="primary", size="lg")
        with gr.Column(scale=1):
            # Output section
            # type="filepath": generate_speech returns a path to the saved
            # .wav file (or None).
            audio_output = gr.Audio(
                label="Generated Speech",
                type="filepath",
                interactive=False,
            )
            status_output = gr.Markdown(
                """
**Status:** Ready to generate speech
Enter text and click "Generate Speech" to start.
⚡ GPU will be allocated dynamically for generation
"""
            )
    # Example inputs
    # Each example is [text, speaker, cfg_scale], matching `inputs` below.
    # The speaker is the first discovered voice; "Wayne" is the fallback when
    # none were found (the dropdown has no choices in that case).
    gr.Examples(
        examples=[
            [
                "VibeVoice is a novel framework designed for generating expressive, long-form, multi-speaker conversational audio.",
                list(VOICE_MAPPER.available_voices.keys())[0]
                if VOICE_MAPPER.available_voices
                else "Wayne",
                1.5,
            ],
            [
                "The quick brown fox jumps over the lazy dog. This is a test of the text-to-speech system.",
                list(VOICE_MAPPER.available_voices.keys())[0]
                if VOICE_MAPPER.available_voices
                else "Wayne",
                1.5,
            ],
        ],
        inputs=[text_input, speaker_dropdown, cfg_slider],
        label="Example Inputs",
    )
    # Event handlers
    # Inputs map positionally to generate_speech(text, speaker_name,
    # cfg_scale); its (audio_path, status) tuple fills the two outputs.
    generate_btn.click(
        fn=generate_speech,
        inputs=[text_input, speaker_dropdown, cfg_slider],
        outputs=[audio_output, status_output],
        api_name="generate",
    )
    # Footer
    gr.Markdown(
        """
---
### 📝 Notes:
- **Model**: Microsoft VibeVoice-Realtime-0.5B
- **Sample Rate**: 24kHz
- **Context Length**: 8K tokens
- **Generation Length**: ~10 minutes
- **Infrastructure**: ZeroGPU (Hugging Face Spaces)
### ⚠️ Important:
- The model is designed for English text only
- Very short inputs (< 3 words) may produce unstable results
- Code, formulas, and special symbols are not supported
- Please use responsibly and disclose AI-generated content
- GPU is allocated dynamically - generation may take a few seconds to start
"""
    )
# Launch the app with Gradio 6 syntax
# (theme and footer links are passed to launch(); nothing runs on import)
if __name__ == "__main__":
    soft_theme = gr.themes.Soft(
        primary_hue="blue",
        secondary_hue="indigo",
        neutral_hue="slate",
    )
    anycoder_link = {
        "label": "Built with anycoder",
        "url": "https://huggingface.co/spaces/akhaliq/anycoder",
    }
    demo.launch(theme=soft_theme, footer_links=[anycoder_link])