import gradio as gr
import cv2
import numpy as np
import os
import subprocess
import json
import tempfile
import shutil
from pathlib import Path
import torch


class NeRFReconstructor:
    def __init__(self):
        self.temp_dir = None
        self.frames_dir = None
        self.output_dir = None
        self.colmap_dir = None

    def setup_directories(self):
        """Create temporary directories for processing"""
        self.temp_dir = tempfile.mkdtemp(prefix="nerf_")
        self.frames_dir = os.path.join(self.temp_dir, "images")
        self.colmap_dir = os.path.join(self.temp_dir, "colmap")
        self.output_dir = os.path.join(self.temp_dir, "output")
        os.makedirs(self.frames_dir, exist_ok=True)
        os.makedirs(self.colmap_dir, exist_ok=True)
        os.makedirs(self.output_dir, exist_ok=True)
        return self.temp_dir

    def extract_frames(self, video_path, fps=2, max_frames=100, progress=gr.Progress()):
        """Extract frames from video"""
        progress(0, desc="📹 Opening video file...")

        if not os.path.exists(video_path):
            raise ValueError("Video file not found")

        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError("Could not open video file")

        video_fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        duration = total_frames / video_fps if video_fps > 0 else 0

        # Calculate frame interval
        frame_interval = max(1, int(video_fps / fps))

        frames_extracted = 0
        frame_count = 0
        frame_paths = []

        progress(0.05, desc=f"🎬 Extracting frames (0/{max_frames})...")

        while frames_extracted < max_frames:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_count % frame_interval == 0:
                # Save frame with high quality
                frame_path = os.path.join(self.frames_dir, f"frame_{frames_extracted:05d}.jpg")
                cv2.imwrite(frame_path, frame, [cv2.IMWRITE_JPEG_QUALITY, 95])
                frame_paths.append(frame_path)
                frames_extracted += 1

                # Update progress (5-25%)
                progress_val = 0.05 + (0.20 * frames_extracted / max_frames)
                progress(progress_val, desc=f"🎬 Extracting frames ({frames_extracted}/{max_frames})...")

            frame_count += 1

        cap.release()

        return frame_paths, {
            "frames_extracted": frames_extracted,
            "video_fps": video_fps,
            "video_duration": duration,
            "resolution": f"{width}x{height}",
            "total_video_frames": total_frames
        }

    def run_colmap(self, progress=gr.Progress()):
        """Run COLMAP for Structure from Motion"""
        database_path = os.path.join(self.colmap_dir, "database.db")
        sparse_dir = os.path.join(self.colmap_dir, "sparse")
        os.makedirs(sparse_dir, exist_ok=True)

        try:
            # Check if COLMAP is available
            result = subprocess.run(["which", "colmap"], capture_output=True, text=True)
            if result.returncode != 0:
                return {"status": "colmap_not_found", "message": "COLMAP not installed"}

            # Feature extraction (25-40%)
            progress(0.25, desc="🔍 COLMAP: Extracting image features...")
            result = subprocess.run([
                "colmap", "feature_extractor",
                "--database_path", database_path,
                "--image_path", self.frames_dir,
                "--ImageReader.single_camera", "1",
                "--ImageReader.camera_model", "OPENCV",
                "--SiftExtraction.use_gpu", "1"
            ], capture_output=True, text=True, timeout=600)

            if result.returncode != 0:
                return {"status": "error", "message": f"Feature extraction failed: {result.stderr}"}

            # Feature matching (40-55%)
            progress(0.40, desc="🔗 COLMAP: Matching features...")
            result = subprocess.run([
                "colmap", "exhaustive_matcher",
                "--database_path", database_path,
                "--SiftMatching.use_gpu", "1"
            ], capture_output=True, text=True, timeout=600)

            if result.returncode != 0:
                return {"status": "error", "message": f"Feature matching failed: {result.stderr}"}
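            # Exhaustive matching compares every image pair, so its cost grows roughly
            # quadratically with the number of frames. For ordered video frames, COLMAP's
            # sequential_matcher (which only matches temporally neighbouring images) is a
            # common, faster alternative; it is not used here.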
            # Sparse reconstruction (55-70%)
            progress(0.55, desc="📐 COLMAP: Building sparse 3D reconstruction...")
            result = subprocess.run([
                "colmap", "mapper",
                "--database_path", database_path,
                "--image_path", self.frames_dir,
                "--output_path", sparse_dir
            ], capture_output=True, text=True, timeout=900)

            if result.returncode != 0:
                return {"status": "error", "message": f"Mapping failed: {result.stderr}"}

            # Check if reconstruction was successful
            model_dir = os.path.join(sparse_dir, "0")
            if not os.path.exists(model_dir):
                return {"status": "error", "message": "No reconstruction created"}

            return {"status": "success", "sparse_dir": sparse_dir}

        except subprocess.TimeoutExpired:
            return {"status": "timeout", "message": "COLMAP processing timeout"}
        except Exception as e:
            return {"status": "error", "message": str(e)}

    def convert_colmap_to_nerf(self, progress=gr.Progress()):
        """Convert COLMAP output to NeRF format"""
        progress(0.70, desc="🔄 Converting COLMAP to NeRF format...")

        try:
            # Use nerfstudio's data processing
            result = subprocess.run([
                "ns-process-data", "images",
                "--data", self.frames_dir,
                "--output-dir", self.output_dir,
                "--skip-colmap"
            ], capture_output=True, text=True, timeout=300)

            if result.returncode != 0:
                return {"status": "error", "message": f"Conversion failed: {result.stderr}"}

            # Copy COLMAP results
            colmap_sparse = os.path.join(self.colmap_dir, "sparse", "0")
            output_colmap = os.path.join(self.output_dir, "colmap", "sparse", "0")
            os.makedirs(os.path.dirname(output_colmap), exist_ok=True)

            if os.path.exists(colmap_sparse):
                shutil.copytree(colmap_sparse, output_colmap, dirs_exist_ok=True)

            return {"status": "success"}

        except Exception as e:
            return {"status": "error", "message": str(e)}

    def train_nerf(self, num_iterations=1000, progress=gr.Progress()):
        """Train NeRF model using Nerfstudio"""
        progress(0.75, desc="🧠 Training NeRF model...")

        try:
            # Check for nerfstudio
            result = subprocess.run(["which", "ns-train"], capture_output=True, text=True)
            if result.returncode != 0:
                return {"status": "nerfstudio_not_found", "message": "Nerfstudio not installed"}

            # Check GPU availability
            gpu_available = torch.cuda.is_available()
            gpu_info = f"GPU: {torch.cuda.get_device_name(0)}" if gpu_available else "CPU mode"
            progress(0.75, desc=f"🧠 Training NeRF ({gpu_info})...")

            # Train with the nerfacto method (optimized for quality and speed)
            config_path = os.path.join(self.output_dir, "config.yml")
            result = subprocess.run([
                "ns-train", "nerfacto",
                "--data", self.output_dir,
                "--output-dir", os.path.join(self.output_dir, "models"),
                "--max-num-iterations", str(num_iterations),
                "--pipeline.model.predict-normals", "False",
                "--viewer.quit-on-train-completion", "True",
                "--vis", "viewer+tensorboard"
            ], capture_output=True, text=True, timeout=3600)

            # Nudge the progress bar once training has returned; subprocess.run blocks
            # until ns-train exits, so these incremental updates are cosmetic.
            for i in range(10):
                progress(0.75 + (i * 0.015), desc=f"🧠 Training NeRF... ({(i+1)*10}% complete)")
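            # A streaming alternative (sketch only, not wired up in this app) would launch
            # ns-train with subprocess.Popen and parse its stdout to drive real progress:
            #     proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True)
            #     for line in proc.stdout:
            #         ...  # parse the iteration counter from the line and call progress()
            #     proc.wait()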
            if result.returncode != 0:
                return {"status": "error", "message": f"Training failed: {result.stderr[:500]}"}

            return {"status": "success", "gpu_used": gpu_available}

        except subprocess.TimeoutExpired:
            return {"status": "timeout", "message": "Training timeout (>1 hour)"}
        except Exception as e:
            return {"status": "error", "message": str(e)}

    def export_model(self, progress=gr.Progress()):
        """Export point cloud and model"""
        progress(0.90, desc="💾 Exporting 3D model...")

        try:
            # Find the latest model
            models_dir = os.path.join(self.output_dir, "models")
            if not os.path.exists(models_dir):
                return {"status": "error", "message": "No trained model found"}

            # Get the most recent model directory
            model_dirs = [d for d in os.listdir(models_dir) if os.path.isdir(os.path.join(models_dir, d))]
            if not model_dirs:
                return {"status": "error", "message": "No model directories found"}

            latest_model = sorted(model_dirs)[-1]
            model_path = os.path.join(models_dir, latest_model)
            config_path = os.path.join(model_path, "config.yml")

            if not os.path.exists(config_path):
                return {"status": "error", "message": "Model config not found"}

            # Export point cloud
            output_ply = os.path.join(self.output_dir, "point_cloud.ply")
            result = subprocess.run([
                "ns-export", "pointcloud",
                "--load-config", config_path,
                "--output-dir", self.output_dir,
                "--num-points", "1000000",
                "--remove-outliers", "True",
                "--use-bounding-box", "True"
            ], capture_output=True, text=True, timeout=600)

            if result.returncode != 0:
                # Try COLMAP export as fallback
                colmap_sparse = os.path.join(self.colmap_dir, "sparse", "0")
                if os.path.exists(colmap_sparse):
                    result = subprocess.run([
                        "colmap", "model_converter",
                        "--input_path", colmap_sparse,
                        "--output_path", output_ply,
                        "--output_type", "PLY"
                    ], capture_output=True, text=True, timeout=300)

            # Check if PLY was created
            if os.path.exists(output_ply):
                file_size = os.path.getsize(output_ply) / (1024 * 1024)  # MB
                return {
                    "status": "success",
                    "ply_path": output_ply,
                    "file_size_mb": round(file_size, 2)
                }

            return {"status": "error", "message": "Export failed"}

        except Exception as e:
            return {"status": "error", "message": str(e)}

    def cleanup(self):
        """Clean up temporary files"""
        if self.temp_dir and os.path.exists(self.temp_dir):
            try:
                shutil.rmtree(self.temp_dir)
            except Exception:
                pass


def create_preview_grid(frame_paths, max_frames=9):
    """Create a grid preview of extracted frames"""
    preview_frames = frame_paths[:min(max_frames, len(frame_paths))]
    images = []

    for fp in preview_frames:
        img = cv2.imread(fp)
        if img is not None:
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            img = cv2.resize(img, (256, 256))
            images.append(img)

    if not images:
        return None

    # Create 3x3 grid
    rows = []
    for i in range(0, len(images), 3):
        row_imgs = images[i:i+3]
        # Pad if needed
        while len(row_imgs) < 3:
            row_imgs.append(np.zeros((256, 256, 3), dtype=np.uint8))
        rows.append(np.hstack(row_imgs))

    # Pad rows if needed
    while len(rows) < 3:
        rows.append(np.zeros((256, 768, 3), dtype=np.uint8))

    grid = np.vstack(rows)
    return grid


def process_video_full_pipeline(
    video_path,
    fps,
    max_frames,
    train_iterations,
    progress=gr.Progress()
):
    """Full NeRF reconstruction pipeline"""
    if video_path is None:
        return (
            "❌ Please upload a video file first.",
            None,
            None,
            json.dumps({"error": "No video uploaded"}, indent=2)
        )

    reconstructor = NeRFReconstructor()
    results = {"steps": []}

    try:
        # Step 1: Setup
        progress(0, desc="⚙️ Setting up workspace...")
        reconstructor.setup_directories()
        results["workspace"] = reconstructor.temp_dir

        # Step 2: Extract frames (0-25%)
        progress(0.05, desc="🎬 Extracting frames...")
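        # Caveat: frames are used exactly as exported by the camera. If the upload is a raw
        # equirectangular 360° video, COLMAP's OPENCV (perspective) camera model is a poor fit;
        # reframed "flat" exports, or perspective crops generated before this step, typically
        # reconstruct far more reliably.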
        frame_paths, video_info = reconstructor.extract_frames(
            video_path, fps, max_frames, progress
        )

        if len(frame_paths) < 3:
            return (
                "❌ Error: Not enough frames extracted (minimum 3 required).",
                None,
                None,
                json.dumps({"error": "Insufficient frames", "frames": len(frame_paths)}, indent=2)
            )

        results["steps"].append({"name": "Frame Extraction", "status": "✅ Complete"})
        results["video_info"] = video_info

        # Create preview
        preview_grid = create_preview_grid(frame_paths)

        # Step 3: COLMAP (25-70%)
        progress(0.25, desc="📐 Running COLMAP reconstruction...")
        colmap_result = reconstructor.run_colmap(progress)

        if colmap_result["status"] != "success":
            return (
                f"❌ COLMAP failed: {colmap_result.get('message', 'Unknown error')}",
                preview_grid,
                None,
                json.dumps(results, indent=2)
            )

        results["steps"].append({"name": "COLMAP SfM", "status": "✅ Complete"})

        # Step 4: Convert to NeRF format (70-75%)
        convert_result = reconstructor.convert_colmap_to_nerf(progress)

        if convert_result["status"] != "success":
            return (
                f"❌ Conversion failed: {convert_result.get('message', 'Unknown error')}",
                preview_grid,
                None,
                json.dumps(results, indent=2)
            )

        results["steps"].append({"name": "Data Conversion", "status": "✅ Complete"})

        # Step 5: Train NeRF (75-90%)
        progress(0.75, desc="🧠 Training NeRF model (this may take several minutes)...")
        train_result = reconstructor.train_nerf(train_iterations, progress)

        if train_result["status"] != "success":
            return (
                f"❌ Training failed: {train_result.get('message', 'Unknown error')}",
                preview_grid,
                None,
                json.dumps(results, indent=2)
            )

        results["steps"].append({"name": "NeRF Training", "status": "✅ Complete"})
        results["gpu_used"] = train_result.get("gpu_used", False)

        # Step 6: Export (90-100%)
        export_result = reconstructor.export_model(progress)

        if export_result["status"] != "success":
            return (
                f"⚠️ Training complete but export failed: {export_result.get('message', 'Unknown error')}",
                preview_grid,
                None,
                json.dumps(results, indent=2)
            )

        results["steps"].append({"name": "Model Export", "status": "✅ Complete"})
        results["output_file"] = export_result.get("ply_path")
        results["file_size_mb"] = export_result.get("file_size_mb")
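        # Downstream check (hypothetical; Open3D is not a dependency of this app): the exported
        # PLY could be opened to verify the point count, e.g.
        #     import open3d as o3d
        #     pcd = o3d.io.read_point_cloud(results["output_file"])
        #     print(len(pcd.points))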
        progress(1.0, desc="✅ Complete!")

        # Generate success message
        status_msg = f"""
# ✅ 3D Reconstruction Complete!

## 📊 Processing Results

**Video Information:**
- Frames extracted: {video_info['frames_extracted']}
- Video resolution: {video_info['resolution']}
- Video duration: {video_info['video_duration']:.2f} seconds
- Video FPS: {video_info['video_fps']:.1f}

**Processing Pipeline:**
{''.join([f"- {step['name']}: {step['status']}" + chr(10) for step in results['steps']])}

**Output:**
- 3D Point Cloud: {results['file_size_mb']:.2f} MB
- Format: PLY (compatible with CloudCompare, MeshLab, Blender)
- GPU Acceleration: {'✅ Yes' if results.get('gpu_used') else '❌ No'}

## 📥 Next Steps
1. Download the PLY file below
2. Import it into your GIS software or 3D viewer
3. Use it for spatial analysis and visualization

## 🔬 Suggested Tools
- **CloudCompare**: Point cloud analysis
- **MeshLab**: Mesh processing
- **Blender**: 3D visualization and animation
- **QGIS**: Geographic analysis
"""

        return (
            status_msg,
            preview_grid,
            export_result.get("ply_path"),
            json.dumps(results, indent=2)
        )

    except Exception as e:
        results["error"] = str(e)
        return (
            f"❌ Error during processing: {str(e)}",
            None,
            None,
            json.dumps(results, indent=2)
        )

    finally:
        # Note: keep temp files so the PLY stays downloadable; clean up later
        pass


# Create Gradio interface
def create_app():
    with gr.Blocks(theme=gr.themes.Soft(), title="NeRF 3D Reconstruction") as app:
        gr.Markdown("""
        # 🎥 NeRF 3D Reconstruction from Insta360 Video
        ### Neural Radiance Fields for Geographic Research

        Transform your 360° Insta360 videos into detailed 3D models using state-of-the-art NeRF technology.
        Powered by COLMAP and Nerfstudio with GPU acceleration.
        """)

        # Check GPU availability
        if torch.cuda.is_available():
            gpu_name = torch.cuda.get_device_name(0)
            gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1e9
            gpu_info = f"🚀 **GPU Detected:** {gpu_name} ({gpu_memory:.1f} GB)"
        else:
            gpu_info = "⚠️ **No GPU detected** - Processing will be slower"

        gr.Markdown(gpu_info)

        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### 📤 Input Configuration")

                video_input = gr.Video(
                    label="Upload Insta360 Video",
                    height=300
                )

                with gr.Group():
                    gr.Markdown("**Frame Extraction Settings**")
                    fps_slider = gr.Slider(
                        minimum=1,
                        maximum=10,
                        value=2,
                        step=1,
                        label="Extraction FPS",
                        info="Frames per second to extract (2-3 recommended)"
                    )
                    max_frames_slider = gr.Slider(
                        minimum=20,
                        maximum=300,
                        value=100,
                        step=10,
                        label="Maximum Frames",
                        info="More frames = better quality but longer processing"
                    )

                with gr.Group():
                    gr.Markdown("**NeRF Training Settings**")
                    iterations_slider = gr.Slider(
                        minimum=500,
                        maximum=5000,
                        value=2000,
                        step=500,
                        label="Training Iterations",
                        info="More iterations = better quality (2000 recommended)"
                    )

                process_btn = gr.Button(
                    "🚀 Start Full Reconstruction",
                    variant="primary",
                    size="lg"
                )

                gr.Markdown("""
                ### ⏱️ Expected Processing Time
                - Frame extraction: 1-2 minutes
                - COLMAP reconstruction: 5-10 minutes
                - NeRF training: 10-30 minutes
                - **Total: ~20-45 minutes**

                ### 💡 Tips for Best Results
                - Use well-lit outdoor scenes
                - Move the camera steadily so consecutive frames overlap
                - Avoid fast movements
                - 50-150 frames is optimal for most scenes
                """)

            with gr.Column(scale=1):
                gr.Markdown("### 📊 Results")

                status_output = gr.Markdown(
                    value="Ready to process. Upload a video and click 'Start Full Reconstruction'.",
                    label="Status"
                )

                preview_output = gr.Image(
                    label="Extracted Frames Preview",
                    height=400
                )

                model_output = gr.File(
                    label="📥 Download 3D Model (PLY)",
                    file_types=[".ply"]
                )

                with gr.Accordion("🔍 Technical Details", open=False):
                    json_output = gr.JSON(label="Processing Log")

        gr.Markdown("""
        ---
        ## 🎓 Geographic Research Applications

        ### Landscape Analysis
        - Terrain modeling and elevation analysis
        - Geomorphological feature detection
        - Erosion and landform studies

        ### Urban Geography
        - 3D city modeling
        - Building reconstruction
        - Urban planning visualization

        ### Environmental Monitoring
        - Vegetation structure analysis
        - Coastal erosion monitoring
        - Land use change detection

        ### Cultural Heritage
        - Archaeological site documentation
        - Historical structure preservation
        - Virtual field trips

        ## 📚 Technical Pipeline
        1. **Frame Extraction**: Extract key frames from 360° video
        2. **COLMAP SfM**: Structure from Motion for camera poses
        3. **NeRF Training**: Neural network learns a 3D scene representation
        4. **Export**: Generate point cloud in PLY format

        ## 🔗 Export Compatibility
        The generated PLY files work with:
        - CloudCompare (point cloud processing)
        - MeshLab (mesh editing)
        - Blender (3D modeling)
        - QGIS (geographic analysis)
        - ArcGIS (spatial analysis)

        ---
        **Powered by:** Nerfstudio • COLMAP • PyTorch • Gradio
        """)
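        # The click handler below maps the four inputs to the positional parameters of
        # process_video_full_pipeline; the outputs list must stay in the same order as the
        # tuple that function returns (status markdown, preview image, PLY file, JSON log).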
        # Event handler
        process_btn.click(
            fn=process_video_full_pipeline,
            inputs=[
                video_input,
                fps_slider,
                max_frames_slider,
                iterations_slider
            ],
            outputs=[
                status_output,
                preview_output,
                model_output,
                json_output
            ]
        )

    return app


# Launch app
if __name__ == "__main__":
    app = create_app()
    app.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False
    )
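# Note: depending on the installed Gradio version, gr.Progress updates may only stream to
# the browser when the request queue is enabled. If the progress bar does not update, a
# common fix (an assumption about the deployment, not required by every version) is:
#     app.queue().launch(server_name="0.0.0.0", server_port=7860, share=False)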