Spaces:
Running
on
Zero
Running
on
Zero
| import gradio as gr | |
| import numpy as np | |
| import random | |
| import spaces | |
| import torch | |
| from diffusers import DiffusionPipeline, FlowMatchEulerDiscreteScheduler, AutoencoderTiny, AutoencoderKL | |
| # from para_attn.first_block_cache.diffusers_adapters import apply_cache_on_pipe | |
| from transformers import CLIPTextModel, CLIPTokenizer,T5EncoderModel, T5TokenizerFast | |
| from live_preview_helpers import calculate_shift, retrieve_timesteps, flux_pipe_call_that_returns_an_iterable_of_images | |
| import multiprocessing as mp | |
| import os | |
| import requests | |
| import tempfile | |
| import shutil | |
| from urllib.parse import urlparse | |
| dtype = torch.bfloat16 | |
| device = "cuda" if torch.cuda.is_available() else "cpu" | |
| #black-forest-labs/FLUX.1-Krea-dev | |
| taef1 = AutoencoderTiny.from_pretrained("madebyollin/taef1", torch_dtype=dtype).to(device) | |
| good_vae = AutoencoderKL.from_pretrained("black-forest-labs/FLUX.1-dev", subfolder="vae", torch_dtype=dtype).to(device) | |
| pipe = DiffusionPipeline.from_pretrained("black-forest-labs/FLUX.1-dev", torch_dtype=dtype, vae=taef1).to(device) | |
| # srpo_128_base_oficial_model_fp16.safetensors | |
| # pipe.load_lora_weights('Alissonerdx/flux.1-dev-SRPO-LoRas', weight_name='srpo_16_base_oficial_model_fp16.safetensors') | |
| # pipe.fuse_lora() | |
| torch.cuda.empty_cache() | |
| MAX_SEED = np.iinfo(np.int32).max | |
| MAX_IMAGE_SIZE = 2048 | |
| pipe.flux_pipe_call_that_returns_an_iterable_of_images = flux_pipe_call_that_returns_an_iterable_of_images.__get__(pipe) | |
| def load_lora_auto(pipe, lora_input): | |
| lora_input = lora_input.strip() | |
| if not lora_input: | |
| return | |
| # If it's just an ID like "author/model" | |
| if "/" in lora_input and not lora_input.startswith("http"): | |
| pipe.load_lora_weights(lora_input) | |
| return | |
| if lora_input.startswith("http"): | |
| url = lora_input | |
| # Repo page (no blob/resolve) | |
| if "huggingface.co" in url and "/blob/" not in url and "/resolve/" not in url: | |
| repo_id = urlparse(url).path.strip("/") | |
| pipe.load_lora_weights(repo_id) | |
| return | |
| # Blob link → convert to resolve link | |
| if "/blob/" in url: | |
| url = url.replace("/blob/", "/resolve/") | |
| # Download direct file | |
| tmp_dir = tempfile.mkdtemp() | |
| local_path = os.path.join(tmp_dir, os.path.basename(urlparse(url).path)) | |
| try: | |
| print(f"Downloading LoRA from {url}...") | |
| resp = requests.get(url, stream=True) | |
| resp.raise_for_status() | |
| with open(local_path, "wb") as f: | |
| for chunk in resp.iter_content(chunk_size=8192): | |
| f.write(chunk) | |
| print(f"Saved LoRA to {local_path}") | |
| pipe.load_lora_weights(local_path) | |
| finally: | |
| shutil.rmtree(tmp_dir, ignore_errors=True) | |
| def infer(prompt, seed=42, randomize_seed=False, width=1024, height=1024, guidance_scale=3.5, num_inference_steps=28, lora_id=None, lora_scale=0.95, progress=gr.Progress(track_tqdm=True)): | |
| if randomize_seed: | |
| seed = random.randint(0, MAX_SEED) | |
| generator = torch.Generator().manual_seed(seed) | |
| # for img in pipe.flux_pipe_call_that_returns_an_iterable_of_images( | |
| # prompt=prompt, | |
| # guidance_scale=guidance_scale, | |
| # num_inference_steps=num_inference_steps, | |
| # width=width, | |
| # height=height, | |
| # generator=generator, | |
| # output_type="pil", | |
| # good_vae=good_vae, | |
| # ): | |
| # yield img, seed | |
| # Handle LoRA loading | |
| # Load LoRA weights and prepare joint_attention_kwargs | |
| if lora_id and lora_id.strip() != "": | |
| pipe.unload_lora_weights() | |
| # pipe.load_lora_weights(lora_id.strip()) | |
| load_lora_auto(pipe, lora_id.strip()) | |
| joint_attention_kwargs = {"scale": lora_scale} | |
| else: | |
| joint_attention_kwargs = None | |
| # apply_cache_on_pipe( | |
| # pipe, | |
| # # residual_diff_threshold=0.2, | |
| # ) | |
| try: | |
| # Call the custom pipeline function with the correct keyword argument | |
| for img in pipe.flux_pipe_call_that_returns_an_iterable_of_images( | |
| prompt=prompt, | |
| guidance_scale=guidance_scale, | |
| num_inference_steps=num_inference_steps, | |
| width=width, | |
| height=height, | |
| generator=generator, | |
| output_type="pil", | |
| good_vae=good_vae, # Assuming good_vae is defined elsewhere | |
| joint_attention_kwargs=joint_attention_kwargs, # Fixed parameter name | |
| ): | |
| yield img, seed | |
| finally: | |
| # Unload LoRA weights if they were loaded | |
| if lora_id: | |
| pipe.unload_lora_weights() | |
| examples = [ | |
| "a tiny astronaut hatching from an egg on the moon", | |
| "a cat holding a sign that says hello world", | |
| "an anime illustration of a wiener schnitzel", | |
| ] | |
| css = """ | |
| #col-container { | |
| margin: 0 auto; | |
| max-width: 960px; | |
| } | |
| .generate-btn { | |
| background: linear-gradient(90deg, #4B79A1 0%, #283E51 100%) !important; | |
| border: none !important; | |
| color: white !important; | |
| } | |
| .generate-btn:hover { | |
| transform: translateY(-2px); | |
| box-shadow: 0 5px 15px rgba(0,0,0,0.2); | |
| } | |
| """ | |
| with gr.Blocks(css=css) as app: | |
| gr.HTML("<center><h1>FLUX.1-Dev with LoRA support</h1></center>") | |
| with gr.Column(elem_id="col-container"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| with gr.Row(): | |
| text_prompt = gr.Textbox(label="Prompt", placeholder="Enter a prompt here", lines=3, elem_id="prompt-text-input") | |
| with gr.Row(): | |
| custom_lora = gr.Textbox(label="Custom LoRA (optional)", info="LoRA Hugging Face path", placeholder="multimodalart/vintage-ads-flux") | |
| with gr.Row(): | |
| with gr.Accordion("Advanced Settings", open=False): | |
| lora_scale = gr.Slider( | |
| label="LoRA Scale", | |
| minimum=0, | |
| maximum=2, | |
| step=0.01, | |
| value=0.95, | |
| ) | |
| with gr.Row(): | |
| width = gr.Slider(label="Width", value=1024, minimum=64, maximum=2048, step=8) | |
| height = gr.Slider(label="Height", value=1024, minimum=64, maximum=2048, step=8) | |
| seed = gr.Slider(label="Seed", value=-1, minimum=-1, maximum=4294967296, step=1) | |
| randomize_seed = gr.Checkbox(label="Randomize seed", value=True) | |
| with gr.Row(): | |
| steps = gr.Slider(label="Inference steps steps", value=28, minimum=1, maximum=100, step=1) | |
| cfg = gr.Slider(label="Guidance Scale", value=3.5, minimum=1, maximum=20, step=0.5) | |
| # method = gr.Radio(label="Sampling method", value="DPM++ 2M Karras", choices=["DPM++ 2M Karras", "DPM++ SDE Karras", "Euler", "Euler a", "Heun", "DDIM"]) | |
| with gr.Row(): | |
| # text_button = gr.Button("Run", variant='primary', elem_id="gen-button") | |
| text_button = gr.Button("✨ Generate Image", variant='primary', elem_classes=["generate-btn"]) | |
| with gr.Column(): | |
| with gr.Row(): | |
| image_output = gr.Image(type="pil", label="Image Output", elem_id="gallery") | |
| # gr.Markdown(article_text) | |
| with gr.Column(): | |
| gr.Examples( | |
| examples = examples, | |
| inputs = [text_prompt], | |
| ) | |
| gr.on( | |
| triggers=[text_button.click, text_prompt.submit], | |
| fn = infer, | |
| inputs=[text_prompt, seed, randomize_seed, width, height, cfg, steps, custom_lora, lora_scale], | |
| outputs=[image_output, seed] | |
| ) | |
| # text_button.click(query, inputs=[custom_lora, text_prompt, steps, cfg, randomize_seed, seed, width, height], outputs=[image_output,seed_output, seed]) | |
| # text_button.click(infer, inputs=[text_prompt, seed, randomize_seed, width, height, cfg, steps, custom_lora, lora_scale], outputs=[image_output,seed_output, seed]) | |
| app.launch(mcp_server=True, share=True) | |