# ==================================
# File: video_processing_refactor.py
# (drop-in replacement for process_video_pipeline in video_processing.py)
# ==================================
from __future__ import annotations

from typing import Any, Dict, List, Optional
from pathlib import Path
import json
import logging

import cv2
import yaml
import chromadb
from chromadb.config import Settings

from audio_tools import process_audio_for_video
from background_descriptor import build_keyframes_and_per_second, describe_keyframes_with_llm
from identity_manager import IdentityManager

log = logging.getLogger("video_processing")
if not log.handlers:
    h = logging.StreamHandler()
    h.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
    log.addHandler(h)
    log.setLevel(logging.INFO)


def _ensure_dir(p: Path) -> Path:
    p.mkdir(parents=True, exist_ok=True)
    return p


def _ensure_chroma(db_dir: str | Path):
    # Persistent client backed by the configured directory.
    _ensure_dir(Path(db_dir))
    return chromadb.Client(Settings(
        persist_directory=str(db_dir),
        chroma_db_impl="duckdb+parquet",
        anonymized_telemetry=False,
    ))


def load_config(path: str) -> Dict[str, Any]:
    p = Path(path)
    if not p.exists():
        return {}
    return yaml.safe_load(p.read_text(encoding="utf-8")) or {}


def process_video_pipeline(
    video_path: str,
    *,
    config_path: str = "config_veureu.yaml",
    out_root: str = "results",
    db_dir: str = "chroma_db",
) -> Dict[str, Any]:
    cfg = load_config(config_path)
    out_dir = _ensure_dir(Path(out_root) / Path(video_path).stem)

    # Video metadata
    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        raise RuntimeError(f"Cannot open video: {video_path}")
    fps = float(cap.get(cv2.CAP_PROP_FPS)) or 25.0
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or 0
    duration = (total_frames / fps) if total_frames > 0 else 0.0
    cap.release()

    # Optional Chroma DB
    face_col = voice_col = None
    if cfg.get("database", {}).get("enabled", True):
        client = _ensure_chroma(cfg.get("database", {}).get("persist_directory", db_dir))
        if cfg.get("database", {}).get("enable_face_recognition", True):
            try:
                face_col = client.get_collection(cfg.get("database", {}).get("face_collection", "index_faces"))
            except Exception:
                face_col = None
        if cfg.get("database", {}).get("enable_voice_recognition", True):
            try:
                voice_col = client.get_collection(cfg.get("database", {}).get("voice_collection", "index_voices"))
            except Exception:
                voice_col = None

    # 1) Background descriptor (keyframes, OCR, descriptions)
    keyframes, per_second, _ = build_keyframes_and_per_second(video_path, out_dir, cfg, face_collection=face_col)

    # Adjust each keyframe's `end` using the total duration
    for i in range(len(keyframes)):
        if i < len(keyframes) - 1:
            keyframes[i]["end"] = keyframes[i + 1]["start"]
        else:
            keyframes[i]["end"] = round(duration, 2)

    # 2) LLM-based keyframe descriptions
    face_identities = {f.get("identity") for fr in per_second for f in (fr.get("faces") or []) if f.get("identity")}
    keyframes, montage_path = describe_keyframes_with_llm(keyframes, out_dir, face_identities=face_identities, config_path=config_path)

    # 3) Audio pipeline
    audio_segments, srt_unmodified_path, full_transcription = process_audio_for_video(
        video_path=str(video_path), out_dir=out_dir, cfg=cfg, voice_collection=voice_col
    )

    # 4) Identity manager: enrich frames and clips
    im = IdentityManager(face_collection=face_col, voice_collection=voice_col)
    per_second = im.assign_faces_to_frames(per_second)
    keyframes = im.assign_faces_to_frames(keyframes)
    audio_segments = im.assign_voices_to_segments(
        audio_segments,
        distance_threshold=cfg.get("voice_processing", {}).get("speaker_identification", {}).get("distance_threshold"),
    )

    # 5) Map identities onto time ranges
    keyframes = im.map_identities_over_ranges(per_second, keyframes, key="faces", out_key="persona")
    audio_segments = im.map_identities_over_ranges(per_second, audio_segments, key="faces", out_key="persona")

    # 6) Export analysis.json
    frames_analysis = [{
        "frame_number": fr.get("id"),
        "start": fr.get("start"),
        "end": fr.get("end"),
        "ocr": fr.get("ocr", ""),
        "persona": fr.get("persona", []),
        "description": fr.get("description", ""),
    } for fr in keyframes]
    analysis = {
        "frames": frames_analysis,
        "audio_segments": [{k: v for k, v in seg.items() if k != "voice_embedding"} for seg in audio_segments],
        "full_transcription": full_transcription,
    }
    analysis_path = out_dir / f"{Path(video_path).stem}_analysis.json"
    analysis_path.write_text(json.dumps(analysis, indent=2, ensure_ascii=False), encoding="utf-8")

    return {
        "output_dir": str(out_dir),
        "files": {
            "montage_path": montage_path,
            "srt_path": srt_unmodified_path,
            "analysis_path": str(analysis_path),
        },
        "stats": {
            "duration_seconds": duration,
            "total_frames": total_frames,
            "frames_processed": len(keyframes),
            "audio_segments_processed": len(audio_segments),
        },
    }
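# ----------------------------------
# Minimal usage sketch (not part of the original module): shows how
# process_video_pipeline could be invoked from the command line. The
# argparse wrapper and its flag names are assumptions for illustration;
# only the keyword arguments mirror the actual function signature.
# ----------------------------------
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Run the video processing pipeline on a single file.")
    parser.add_argument("video_path", help="Path to the input video")
    parser.add_argument("--config", default="config_veureu.yaml", help="YAML config file")
    parser.add_argument("--out-root", default="results", help="Root directory for outputs")
    parser.add_argument("--db-dir", default="chroma_db", help="Chroma persistence directory")
    args = parser.parse_args()

    result = process_video_pipeline(
        args.video_path,
        config_path=args.config,
        out_root=args.out_root,
        db_dir=args.db_dir,
    )
    log.info("Analysis written to %s", result["files"]["analysis_path"])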