tggtg commited on
Commit
ceb0a08
·
verified ·
1 Parent(s): c918f3b

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +293 -0
app.py ADDED
@@ -0,0 +1,293 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # app.py
2
+ # AI Video Enhancer 4K - Gradio app for Hugging Face Spaces
3
+ # NOTE: This app attempts to use GFPGAN and Real-ESRGAN if installed.
4
+ # If they're not available (common on CPU-only environments), it falls back to ffmpeg-based upscale.
5
+
6
+ import os
7
+ import shutil
8
+ import subprocess
9
+ import tempfile
10
+ import time
11
+ from pathlib import Path
12
+ from typing import Tuple
13
+
14
+ import gradio as gr
15
+ from PIL import Image
16
+
17
+ # Try to import optional enhancement libs
18
+ try:
19
+ import torch
20
+ from gfpgan import GFPGANer # type: ignore
21
+ from realesrgan import RealESRGAN # type: ignore
22
+ HAVE_ENHANCERS = True
23
+ except Exception:
24
+ HAVE_ENHANCERS = False
25
+
26
+ # Config
27
+ MAX_SECONDS = 30 # maximum video length
28
+ TEMP_DIR = Path(tempfile.gettempdir()) / "hf_video_enhancer"
29
+ TEMP_DIR.mkdir(parents=True, exist_ok=True)
30
+
31
+ def run_cmd(cmd):
32
+ """Run shell command, raise if failed."""
33
+ p = subprocess.run(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
34
+ if p.returncode != 0:
35
+ raise RuntimeError(f"Command failed: {' '.join(cmd)}\nSTDOUT:{p.stdout.decode()}\nSTDERR:{p.stderr.decode()}")
36
+ return p.stdout.decode()
37
+
38
+ def probe_video(video_path: str) -> Tuple[float, int, int]:
39
+ """Return (duration_seconds, width, height) using ffprobe."""
40
+ cmd = [
41
+ "ffprobe", "-v", "error",
42
+ "-select_streams", "v:0",
43
+ "-show_entries", "stream=width,height,duration",
44
+ "-of", "default=noprint_wrappers=1:nokey=0",
45
+ video_path
46
+ ]
47
+ p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
48
+ out = p.stdout.decode()
49
+ # parse
50
+ width = height = 0
51
+ duration = 0.0
52
+ for line in out.splitlines():
53
+ if line.startswith("width="):
54
+ width = int(line.split("=", 1)[1])
55
+ if line.startswith("height="):
56
+ height = int(line.split("=", 1)[1])
57
+ if line.startswith("duration="):
58
+ try:
59
+ duration = float(line.split("=", 1)[1])
60
+ except:
61
+ duration = 0.0
62
+ return duration, width, height
63
+
64
+ def extract_frames(video_path: str, frames_dir: Path):
65
+ """Extract frames as PNG to frames_dir."""
66
+ frames_dir.mkdir(parents=True, exist_ok=True)
67
+ # %06d.png
68
+ cmd = [
69
+ "ffmpeg", "-y", "-i", video_path,
70
+ "-vsync", "0",
71
+ str(frames_dir / "%06d.png")
72
+ ]
73
+ run_cmd(cmd)
74
+
75
+ def reassemble_video(frames_dir: Path, audio_src: str, out_path: str, fps: float = 30.0):
76
+ """Reassemble frames to video and add original audio (if exists)."""
77
+ # Determine fps if possible by probing source
78
+ cmd_probe = [
79
+ "ffprobe", "-v", "error", "-select_streams", "v:0",
80
+ "-show_entries", "stream=r_frame_rate",
81
+ "-of", "default=noprint_wrappers=1:nokey=1", audio_src
82
+ ]
83
+ try:
84
+ p = subprocess.run(cmd_probe, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
85
+ r = p.stdout.decode().strip()
86
+ if "/" in r:
87
+ a, b = r.split("/")
88
+ fps = float(a) / float(b)
89
+ except Exception:
90
+ pass
91
+
92
+ # Create video from frames
93
+ tmp_video = str(frames_dir.parent / "tmp_video_no_audio.mp4")
94
+ cmd_encode = [
95
+ "ffmpeg", "-y",
96
+ "-framerate", str(fps),
97
+ "-i", str(frames_dir / "%06d.png"),
98
+ "-c:v", "libx264", "-preset", "veryfast", "-pix_fmt", "yuv420p",
99
+ tmp_video
100
+ ]
101
+ run_cmd(cmd_encode)
102
+
103
+ # Add audio if present
104
+ # First check if the source has an audio stream
105
+ p = subprocess.run(["ffprobe", "-v", "error", "-select_streams", "a", "-show_entries", "stream=codec_type", "-of", "default=noprint_wrappers=1", audio_src], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
106
+ has_audio = bool(p.stdout.decode().strip())
107
+ if has_audio:
108
+ cmd_mux = [
109
+ "ffmpeg", "-y",
110
+ "-i", tmp_video,
111
+ "-i", audio_src,
112
+ "-c:v", "copy",
113
+ "-c:a", "aac",
114
+ "-map", "0:v:0",
115
+ "-map", "1:a:0",
116
+ out_path
117
+ ]
118
+ run_cmd(cmd_mux)
119
+ os.remove(tmp_video)
120
+ else:
121
+ # just rename
122
+ shutil.move(tmp_video, out_path)
123
+
124
+ def simple_upscale_with_ffmpeg(frames_dir: Path, scale_factor: int = 2):
125
+ """Basic ffmpeg-based upscale (nearest/linear) as a fallback."""
126
+ # iterate frames and upscale in-place
127
+ for p in sorted(frames_dir.glob("*.png")):
128
+ tmp = str(p) + ".tmp.png"
129
+ cmd = [
130
+ "ffmpeg", "-y", "-i", str(p),
131
+ "-vf", f"scale=iw*{scale_factor}:ih*{scale_factor}:flags=lanczos",
132
+ tmp
133
+ ]
134
+ run_cmd(cmd)
135
+ os.replace(tmp, p)
136
+
137
+ def load_enhancers(device="cuda"):
138
+ """Initialize GFPGAN and Real-ESRGAN if available. Returns (gfpganer, realesrgan) or (None, None)."""
139
+ if not HAVE_ENHANCERS:
140
+ return None, None
141
+ # GFPGAN
142
+ try:
143
+ # GFPGANer parameters depend on package version; this is best-effort
144
+ gfpganer = GFPGANer(
145
+ model_path=None, # let package find default
146
+ upscale=1,
147
+ arch='clean',
148
+ channel_multiplier=2,
149
+ bg_upsampler=None
150
+ )
151
+ except Exception:
152
+ gfpganer = None
153
+
154
+ try:
155
+ # RealESRGAN uses a model name typically; attempt to load default x2 model
156
+ realesrgan = RealESRGAN(device, scale=4)
157
+ realesrgan.load_weights('RealESRGAN_x4plus') # may raise if not present
158
+ except Exception:
159
+ realesrgan = None
160
+
161
+ return gfpganer, realesrgan
162
+
163
+ def enhance_frames(frames_dir: Path, progress= None):
164
+ """Apply GFPGAN (face restore) then Real-ESRGAN upscaling if available. Otherwise fallback to ffmpeg upscale."""
165
+ if HAVE_ENHANCERS:
166
+ device = "cuda" if torch.cuda.is_available() else "cpu"
167
+ gfpganer, realesrgan = load_enhancers(device)
168
+ frame_paths = sorted(frames_dir.glob("*.png"))
169
+ n = len(frame_paths)
170
+ for i, fp in enumerate(frame_paths, start=1):
171
+ img = Image.open(fp).convert("RGB")
172
+ # face restore
173
+ try:
174
+ if gfpganer is not None:
175
+ _, restored = gfpganer.enhance(np.array(img), has_aligned=False, only_center_face=False, paste_back=True)
176
+ img = Image.fromarray(restored)
177
+ except Exception:
178
+ pass
179
+ # upscale
180
+ try:
181
+ if realesrgan is not None:
182
+ with torch.no_grad():
183
+ out = realesrgan.predict(img)
184
+ out.save(fp)
185
+ else:
186
+ # fallback to ffmpeg scaling for this frame
187
+ tmp = str(fp) + ".tmp.png"
188
+ cmd = [
189
+ "ffmpeg", "-y", "-i", str(fp),
190
+ "-vf", "scale=iw*2:ih*2:flags=lanczos",
191
+ tmp
192
+ ]
193
+ run_cmd(cmd)
194
+ os.replace(tmp, str(fp))
195
+ except Exception:
196
+ # final fallback: leave the frame as-is
197
+ pass
198
+ if progress:
199
+ progress(i / n)
200
+ else:
201
+ # simple ffmpeg upscale 2x
202
+ simple_upscale_with_ffmpeg(frames_dir, scale_factor=2)
203
+
204
+ # The main processing pipeline
205
+ def process_video(video_file) -> Tuple[str, str]:
206
+ """
207
+ Accepts an uploaded video file from Gradio,
208
+ processes it and returns (message, path_to_result_video)
209
+ """
210
+ # Save upload to temp
211
+ ts = int(time.time() * 1000)
212
+ base_dir = TEMP_DIR / f"job_{ts}"
213
+ base_dir.mkdir(parents=True, exist_ok=True)
214
+ in_path = base_dir / "input_video"
215
+ with open(in_path, "wb") as f:
216
+ f.write(video_file.read())
217
+
218
+ # probe
219
+ try:
220
+ duration, w, h = probe_video(str(in_path))
221
+ except Exception as e:
222
+ shutil.rmtree(base_dir, ignore_errors=True)
223
+ return f"Error probing video: {e}", ""
224
+
225
+ if duration > MAX_SECONDS:
226
+ shutil.rmtree(base_dir, ignore_errors=True)
227
+ return f"Video too long: {duration:.1f}s (limit {MAX_SECONDS}s). Trim it and try again.", ""
228
+
229
+ # extract frames
230
+ frames_dir = base_dir / "frames"
231
+ try:
232
+ extract_frames(str(in_path), frames_dir)
233
+ except Exception as e:
234
+ shutil.rmtree(base_dir, ignore_errors=True)
235
+ return f"Failed extracting frames: {e}", ""
236
+
237
+ # enhancement step
238
+ # Using a progress hook from Gradio is a bit awkward; we'll just run and hope for the best
239
+ try:
240
+ enhance_frames(frames_dir)
241
+ except Exception as e:
242
+ # continue, fallback allowed
243
+ print(f"Enhancement failed: {e}")
244
+
245
+ # reassemble and add audio
246
+ out_video = base_dir / "enhanced_output.mp4"
247
+ try:
248
+ reassemble_video(frames_dir, str(in_path), str(out_video))
249
+ except Exception as e:
250
+ shutil.rmtree(base_dir, ignore_errors=True)
251
+ return f"Failed to reassemble video: {e}", ""
252
+
253
+ # Optionally: cleanup frames to save space
254
+ try:
255
+ shutil.rmtree(frames_dir)
256
+ except Exception:
257
+ pass
258
+
259
+ # Serve out_video path as string (Gradio will handle file serving)
260
+ return "Processing complete. Download below.", str(out_video)
261
+
262
+ # Gradio UI
263
+ with gr.Blocks(title="AI Video Enhancer 4K") as demo:
264
+ gr.Markdown("# AI Video Enhancer 4K")
265
+ gr.Markdown("Upload a short video (<= 30s). The app will attempt to enhance faces and upscale frames. Heavy models may not run on CPU-only free environments.")
266
+
267
+ with gr.Row():
268
+ with gr.Column(scale=2):
269
+ video_in = gr.File(label="Upload video (mp4/avi/mov)", file_count="single")
270
+ btn = gr.Button("Enhance Video")
271
+ status = gr.Textbox(label="Status", interactive=False)
272
+ with gr.Column(scale=1):
273
+ out_video = gr.Video(label="Enhanced video")
274
+ download_btn = gr.Button("Download enhanced video")
275
+
276
+ def on_click_process(file_obj):
277
+ if not file_obj:
278
+ return "Please upload a video file.", None
279
+ try:
280
+ msg, path = process_video(file_obj)
281
+ if path:
282
+ return msg, path
283
+ else:
284
+ return msg, None
285
+ except Exception as e:
286
+ return f"Unexpected error: {e}", None
287
+
288
+ btn.click(fn=on_click_process, inputs=[video_in], outputs=[status, out_video])
289
+
290
+ gr.Markdown("**Notes:** If the Space is CPU-only or lacks the heavy model weights, the app will run a simpler ffmpeg upscale. For realistic Real-ESRGAN/GFPGAN results, enable GPU on the Space and ensure model weights are installed in `./weights` or accessible to the packages used.")
291
+
292
+ if __name__ == "__main__":
293
+ demo.launch()