import gradio as gr
import numpy as np
import random, json, spaces, torch, time
from diffusers import AutoencoderKL, FlowMatchEulerDiscreteScheduler
from transformers import AutoTokenizer, Qwen3ForCausalLM
from safetensors.torch import load_file
from utils import repo_utils, prompt_utils, image_utils
from omegaconf import OmegaConf
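# VideoX-Fun is pulled straight from GitHub rather than installed as a package;
# its `videox_fun` package and `config` folder are moved next to this script so
# the imports below resolve.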
# clone and move videox_fun
repo_utils.clone_repo_if_not_exists("https://github.com/aigc-apps/VideoX-Fun.git", "repos")
repo_utils.move_folder("repos/VideoX-Fun/videox_fun", "videox_fun")
repo_utils.move_folder("repos/VideoX-Fun/config", "config")

from videox_fun.pipeline import ZImageControlPipeline
from videox_fun.models import ZImageControlTransformer2DModel
from videox_fun.utils.utils import get_image_latent
from controlnet_aux.processor import Processor
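# Download the base Z-Image-Turbo weights and the Fun-Controlnet-Union 2.0 control
# checkpoint from the Hugging Face Hub into ./models.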
# clone models
repo_utils.clone_repo_if_not_exists("https://huggingface.co/Tongyi-MAI/Z-Image-Turbo", "models")
repo_utils.clone_repo_if_not_exists("https://huggingface.co/alibaba-pai/Z-Image-Turbo-Fun-Controlnet-Union-2.0", "models")

MODEL_LOCAL = "models/Z-Image-Turbo/"
TRANSFORMER_LOCAL = "models/Z-Image-Turbo-Fun-Controlnet-Union-2.0/Z-Image-Turbo-Fun-Controlnet-Union-2.0.safetensors"
TRANSFORMER_CONFIG = "config/z_image/z_image_control_2.0.yaml"
TRANSFORMER_MERGED = "models/ZIT-Merged"

MAX_SEED = np.iinfo(np.int32).max
MAX_IMAGE_SIZE = 1280
DTYPE = torch.bfloat16

has_merged = repo_utils.check_dir_exist(TRANSFORMER_MERGED)
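# Build the control transformer from the base Z-Image-Turbo transformer config, then
# overlay the ControlNet-Union state dict on top (strict=False keeps base keys the
# control checkpoint does not provide). The merged weights are saved to
# TRANSFORMER_MERGED so the commented-out branch below could reload them directly.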
# load transformer
config = OmegaConf.load(TRANSFORMER_CONFIG)

# if not has_merged:
print('load transformer from base')
transformer = ZImageControlTransformer2DModel.from_pretrained(
    MODEL_LOCAL,
    subfolder="transformer",
    transformer_additional_kwargs=OmegaConf.to_container(config['transformer_additional_kwargs']),
).to("cuda", torch.bfloat16)

print('load state_dict')
state_dict = load_file(TRANSFORMER_LOCAL)
state_dict = state_dict["state_dict"] if "state_dict" in state_dict else state_dict
m, u = transformer.load_state_dict(state_dict, strict=False)
print(f"missing keys: {len(m)}, unexpected keys: {len(u)}")
transformer.save_pretrained(TRANSFORMER_MERGED)
# else:
#     print('load transformer from merged to bypass calculation')
#     transformer = ZImageControlTransformer2DModel.from_pretrained(
#         TRANSFORMER_MERGED,
#         transformer_additional_kwargs=OmegaConf.to_container(config['transformer_additional_kwargs']),
#     ).to("cuda", torch.bfloat16)
print("transformer ready.")
# load ZImageControlPipeline
vae = AutoencoderKL.from_pretrained(
    MODEL_LOCAL,
    subfolder="vae",
    device_map="cuda",
    torch_dtype=DTYPE,
)
print("vae ready.")

tokenizer = AutoTokenizer.from_pretrained(
    MODEL_LOCAL,
    subfolder="tokenizer"
)
print("tokenizer ready.")

text_encoder = Qwen3ForCausalLM.from_pretrained(
    MODEL_LOCAL,
    subfolder="text_encoder",
    torch_dtype=DTYPE,
)

scheduler = FlowMatchEulerDiscreteScheduler(num_train_timesteps=1000, shift=7)
# scheduler = FlowMatchEulerDiscreteScheduler.from_pretrained(
#     MODEL_LOCAL,
#     subfolder="scheduler"
# )
print("scheduler ready.")

pipe = ZImageControlPipeline(
    vae=vae,
    tokenizer=tokenizer,
    text_encoder=text_encoder,
    transformer=transformer,
    scheduler=scheduler,
)
pipe.to("cuda", torch.bfloat16)
print("pipe ready.")
def prepare(edit_dict, prompt):
    # return edit_dict['background']
    if not prompt: prompt = "Ultra HD, 4K"
    output_image = image_utils.replace_transparent(edit_dict['layers'][0], (0, 0, 0))
    return output_image, prompt
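# inference() rescales the edited background to the working resolution (the local
# rescale_image helper appears to snap dimensions to multiples of `upscale_nearest`
# and cap them at `upscale_max_size` to avoid OOM), converts the image and the
# single-channel mask into the tensors the pipeline expects, then runs the control
# pipeline for a few turbo steps.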
@spaces.GPU  # ZeroGPU: allocate a GPU per call (assumed intended, since `spaces` is imported but otherwise unused)
def inference(
    prompt,
    negative_prompt,
    edit_dict,
    mask_image,
    control_context_scale=0.75,
    seed=42,
    randomize_seed=False,
    guidance_scale=1,
    num_inference_steps=8,
    progress=gr.Progress(track_tqdm=True),
):
    # guidance_scale=1
    timestamp = time.time()
    print(f"timestamp: {timestamp}")
    # process image
    print("DEBUG: process image")
    if edit_dict is None or mask_image is None:
        print("Error: edit_dict or mask_image is empty.")
        return None, seed

    upscale_target = 2
    upscale_nearest = 16
    upscale_max_size = 1440

    # rescale to prevent OOM
    input_image = edit_dict['background']
    input_image, width, height = image_utils.rescale_image(input_image, upscale_target, upscale_nearest, max_size=upscale_max_size)
    sample_size = [height, width]

    print("DEBUG: inpaint_image")
    if input_image is not None:
        inpaint_image = get_image_latent(input_image, sample_size=sample_size)[:, :, 0]
    else:
        inpaint_image = torch.zeros([1, 3, sample_size[0], sample_size[1]])
| print("DEBUG: mask_image") | |
| if mask_image is not None: | |
| mask_image, w, h = image_utils.rescale_image(mask_image, upscale_target, upscale_nearest, max_size=upscale_max_size) | |
| mask_image = get_image_latent(mask_image, sample_size=sample_size)[:, :1, 0] | |
| else: | |
| mask_image = torch.ones([1, 1, sample_size[0], sample_size[1]]) * 255 | |
| # print("DEBUG: control_image_torch") | |
| # processor = Processor('openpose_full') | |
| # control_image, w, h = image_utils.rescale_image(input_image, upscale_target, upscale_nearest, max_size=1280) | |
| # control_image = control_image.resize((1024, 1024)) | |
| # control_image = processor(control_image, to_pil=True) | |
| # control_image = control_image.resize((width, height)) | |
| # control_image_torch = get_image_latent(control_image, sample_size=sample_size)[:, :, 0] | |
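    # NOTE: the openpose preprocessing branch above is left disabled, so no explicit
    # control_image is passed; generation is driven by the inpaint image and mask only.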
    # generation
    if randomize_seed: seed = random.randint(0, MAX_SEED)
    generator = torch.Generator().manual_seed(seed)
    output_image = pipe(
        prompt=prompt,
        negative_prompt=negative_prompt,
        height=height,
        width=width,
        generator=generator,
        guidance_scale=guidance_scale,
        image=inpaint_image,
        mask_image=mask_image,
        # control_image=control_image_torch,
        num_inference_steps=num_inference_steps,
        control_context_scale=control_context_scale,
    ).images[0]
    return output_image, seed
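# read_file() loads the static header/footer fragments rendered in the UI; the css
# block constrains the main column width.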
def read_file(path: str) -> str:
    with open(path, 'r', encoding='utf-8') as f:
        content = f.read()
    return content

css = """
#col-container {
    margin: 0 auto;
    max-width: 960px;
}
"""

with open('examples/0data.json', 'r') as file: examples = json.load(file)
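# examples/0data.json is expected to hold (edit image, prompt) pairs matching the
# gr.Examples inputs wired up below.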
with gr.Blocks(css=css) as demo:
    with gr.Column(elem_id="col-container"):
        with gr.Column():
            gr.HTML(read_file("static/header.html"))
        with gr.Row():
            with gr.Column():
                edit_dict = gr.ImageMask(
                    height=600,
                    sources=['upload', 'clipboard'],
                    type="pil",
                    brush=gr.Brush(
                        colors=["#FFFFFF"],
                        color_mode="fixed",
                        # default_size=75
                    ),
                    label="Edit Image"
                )
                prompt = gr.Textbox(
                    label="Prompt",
                    show_label=False,
                    lines=2,
                    placeholder="Enter your prompt",
                    # container=False,
                )
                run_button = gr.Button("Generate", variant="primary")
                with gr.Accordion("Advanced Settings", open=False):
                    negative_prompt = gr.Textbox(
                        label="Negative prompt",
                        lines=2,
                        container=False,
                        placeholder="Enter your negative prompt",
                        value="blurry ugly bad"
                    )
                    # with gr.Row():
                    num_inference_steps = gr.Slider(
                        label="Steps",
                        minimum=1,
                        maximum=30,
                        step=1,
                        value=9,
                    )
                    control_context_scale = gr.Slider(
                        label="Context scale",
                        minimum=0.0,
                        maximum=1.0,
                        step=0.01,
                        value=0.40,
                    )
                    guidance_scale = gr.Slider(
                        label="Guidance scale",
                        minimum=0.0,
                        maximum=5.0,
                        step=0.1,
                        value=1.0,
                    )
                    seed = gr.Slider(
                        label="Seed",
                        minimum=0,
                        maximum=MAX_SEED,
                        step=1,
                        value=42,
                    )
                    randomize_seed = gr.Checkbox(label="Randomize seed", value=False)
            with gr.Column():
                output_image = gr.Image(label="Generated image", show_label=False)
                # polished_prompt = gr.Textbox(label="Polished prompt", interactive=False)
                with gr.Accordion("Preprocessor data", open=False):
                    mask_image = gr.Image(
                        label="Generated Mask",
                        interactive=False,
                        type="pil",
                    )
                    # control_image = gr.Image(
                    #     label="Generated Control Image",
                    #     interactive=False,
                    #     type="pil",
                    # )
        gr.Examples(examples=examples, inputs=[edit_dict, prompt])
        gr.Markdown(read_file("static/footer.md"))
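    # Clicking Generate chains two steps: prepare() derives the mask and final prompt
    # (the mask is shown in the "Preprocessor data" accordion), then inference()
    # consumes them along with the sliders and returns the image and the seed used.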
    # edit_dict.upload(fn=lambda x: x, inputs=[mask_image], outputs=[input_image])
    run_button.click(
        fn=prepare,
        inputs=[edit_dict, prompt],
        outputs=[mask_image, prompt]
    ).then(
        fn=inference,
        inputs=[
            prompt,
            negative_prompt,
            edit_dict,
            mask_image,
            control_context_scale,
            seed,
            randomize_seed,
            guidance_scale,
            num_inference_steps,
        ],
        outputs=[output_image, seed],
    )
if __name__ == "__main__":
    demo.launch(mcp_server=True)