# ==================================
# File: video_processing_refactor.py
# (drop-in replacement for process_video_pipeline in video_processing.py)
# ==================================
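"""End-to-end video analysis pipeline (drop-in refactor of process_video_pipeline).

Extracts keyframes and per-second frame data, describes keyframes with an LLM,
runs the audio pipeline (transcription and speaker segments), assigns face and
voice identities via IdentityManager, and writes <video-stem>_analysis.json.
"""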
from __future__ import annotations
from typing import Any, Dict, List, Optional
from pathlib import Path
import json
import cv2
import yaml
import logging
from chromadb.config import Settings
import chromadb
from audio_tools import process_audio_for_video
from background_descriptor import build_keyframes_and_per_second, describe_keyframes_with_llm
from identity_manager import IdentityManager
log = logging.getLogger("video_processing")
if not log.handlers:
    h = logging.StreamHandler()
    h.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
    log.addHandler(h)
    log.setLevel(logging.INFO)
def _ensure_dir(p: Path) -> Path:
    p.mkdir(parents=True, exist_ok=True)
    return p
def _ensure_chroma(db_dir: str | Path):
    # NOTE: Settings(chroma_db_impl="duckdb+parquet", ...) targets the legacy
    # (pre-0.4) chromadb client API; newer releases replaced it with
    # chromadb.PersistentClient(path=...).
    _ensure_dir(Path(db_dir))
    return chromadb.Client(Settings(
        persist_directory=str(db_dir),
        chroma_db_impl="duckdb+parquet",
        anonymized_telemetry=False,
    ))
def load_config(path: str) -> Dict[str, Any]:
    p = Path(path)
    if not p.exists():
        return {}
    return yaml.safe_load(p.read_text(encoding="utf-8")) or {}
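# The pipeline only reads the config keys sketched below; this mirrors the
# cfg.get(...) calls in process_video_pipeline. Values shown are illustrative,
# not taken from the real config_veureu.yaml:
#
#   database:
#     enabled: true
#     persist_directory: chroma_db
#     enable_face_recognition: true
#     face_collection: index_faces
#     enable_voice_recognition: true
#     voice_collection: index_voices
#   voice_processing:
#     speaker_identification:
#       distance_threshold: 0.6   # hypothetical value; the code tolerates None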
def process_video_pipeline(
    video_path: str,
    *,
    config_path: str = "config_veureu.yaml",
    out_root: str = "results",
    db_dir: str = "chroma_db",
) -> Dict[str, Any]:
    """Run the full analysis pipeline for one video; return output paths and stats."""
    cfg = load_config(config_path)
    out_dir = _ensure_dir(Path(out_root) / Path(video_path).stem)
    # Video metadata
    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        raise RuntimeError(f"Cannot open video: {video_path}")
    fps = float(cap.get(cv2.CAP_PROP_FPS)) or 25.0  # fall back to 25 fps if the container reports 0
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or 0
    duration = (total_frames / fps) if total_frames > 0 else 0.0
    cap.release()
    # Optional Chroma database for face/voice identification
    face_col = voice_col = None
    db_cfg = cfg.get("database", {})
    if db_cfg.get("enabled", True):
        client = _ensure_chroma(db_cfg.get("persist_directory", db_dir))
        if db_cfg.get("enable_face_recognition", True):
            try:
                face_col = client.get_collection(db_cfg.get("face_collection", "index_faces"))
            except Exception:
                face_col = None
        if db_cfg.get("enable_voice_recognition", True):
            try:
                voice_col = client.get_collection(db_cfg.get("voice_collection", "index_voices"))
            except Exception:
                voice_col = None
    # 1) Background descriptor (frames, OCR, descriptions)
    keyframes, per_second, _ = build_keyframes_and_per_second(video_path, out_dir, cfg, face_collection=face_col)
    # Set each keyframe's `end` to the next keyframe's start; the last one ends at the total duration
    for i in range(len(keyframes)):
        if i < len(keyframes) - 1:
            keyframes[i]["end"] = keyframes[i + 1]["start"]
        else:
            keyframes[i]["end"] = round(duration, 2)
    # 2) LLM description of keyframes
    face_identities = {f.get("identity") for fr in per_second for f in (fr.get("faces") or []) if f.get("identity")}
    keyframes, montage_path = describe_keyframes_with_llm(keyframes, out_dir, face_identities=face_identities, config_path=config_path)
    # 3) Audio pipeline (transcription, speaker segments, SRT)
    audio_segments, srt_unmodified_path, full_transcription = process_audio_for_video(
        video_path=str(video_path), out_dir=out_dir, cfg=cfg, voice_collection=voice_col
    )
    # 4) Identity manager: enrich frames and clips with face/voice identities
    im = IdentityManager(face_collection=face_col, voice_collection=voice_col)
    per_second = im.assign_faces_to_frames(per_second)
    keyframes = im.assign_faces_to_frames(keyframes)
    audio_segments = im.assign_voices_to_segments(
        audio_segments,
        distance_threshold=cfg.get("voice_processing", {}).get("speaker_identification", {}).get("distance_threshold"),
    )
    # 5) Map per-second identities onto keyframe and audio-segment time ranges
    keyframes = im.map_identities_over_ranges(per_second, keyframes, key="faces", out_key="persona")
    audio_segments = im.map_identities_over_ranges(per_second, audio_segments, key="faces", out_key="persona")
    # 6) Export analysis.json
    frames_analysis = [{
        "frame_number": fr.get("id"),
        "start": fr.get("start"),
        "end": fr.get("end"),
        "ocr": fr.get("ocr", ""),
        "persona": fr.get("persona", []),
        "description": fr.get("description", ""),
    } for fr in keyframes]
    analysis = {
        "frames": frames_analysis,
        # Strip raw voice embeddings from the exported segments
        "audio_segments": [{k: v for k, v in seg.items() if k != "voice_embedding"} for seg in audio_segments],
        "full_transcription": full_transcription,
    }
    analysis_path = out_dir / f"{Path(video_path).stem}_analysis.json"
    analysis_path.write_text(json.dumps(analysis, indent=2, ensure_ascii=False), encoding="utf-8")
    return {
        "output_dir": str(out_dir),
        "files": {
            "montage_path": montage_path,
            "srt_path": srt_unmodified_path,
            "analysis_path": str(analysis_path),
        },
        "stats": {
            "duration_seconds": duration,
            "total_frames": total_frames,
            "frames_processed": len(keyframes),
            "audio_segments_processed": len(audio_segments),
        },
    }
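
if __name__ == "__main__":
    # Minimal CLI sketch for running the pipeline end to end. The argument
    # names below are illustrative; only process_video_pipeline's signature
    # and defaults come from this module.
    import argparse

    parser = argparse.ArgumentParser(description="Run the VeuReu video analysis pipeline.")
    parser.add_argument("video_path", help="Path to the input video file")
    parser.add_argument("--config", default="config_veureu.yaml")
    parser.add_argument("--out-root", default="results")
    parser.add_argument("--db-dir", default="chroma_db")
    args = parser.parse_args()

    result = process_video_pipeline(
        args.video_path,
        config_path=args.config,
        out_root=args.out_root,
        db_dir=args.db_dir,
    )
    log.info("Analysis written to %s", result["files"]["analysis_path"])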