# ==================================
# File: video_processing_refactor.py
# (drop-in replacement for process_video_pipeline in video_processing.py)
# ==================================
from __future__ import annotations

import json
import logging
from pathlib import Path
from typing import Any, Dict

import chromadb
import cv2
import yaml
from chromadb.config import Settings

from audio_tools import process_audio_for_video
from background_descriptor import build_keyframes_and_per_second, describe_keyframes_with_llm
from identity_manager import IdentityManager

log = logging.getLogger("video_processing")
if not log.handlers:
    h = logging.StreamHandler()
    h.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
    log.addHandler(h)
    log.setLevel(logging.INFO)


def _ensure_dir(p: Path) -> Path:
    p.mkdir(parents=True, exist_ok=True)
    return p


def _ensure_chroma(db_dir: str | Path):
    _ensure_dir(Path(db_dir))
    # Legacy chromadb (<0.4) persistent-client configuration.
    return chromadb.Client(Settings(
        persist_directory=str(db_dir),
        chroma_db_impl="duckdb+parquet",
        anonymized_telemetry=False,
    ))
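
# Note: `chroma_db_impl="duckdb+parquet"` only exists in the legacy chromadb
# client; on chromadb >= 0.4 the equivalent (a sketch, not used by this
# module) would be:
#   client = chromadb.PersistentClient(path=str(db_dir))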


def load_config(path: str) -> Dict[str, Any]:
    p = Path(path)
    if not p.exists():
        return {}
    return yaml.safe_load(p.read_text(encoding="utf-8")) or {}


def process_video_pipeline(
    video_path: str,
    *,
    config_path: str = "config_veureu.yaml",
    out_root: str = "results",
    db_dir: str = "chroma_db",
) -> Dict[str, Any]:
    cfg = load_config(config_path)
    out_dir = _ensure_dir(Path(out_root) / Path(video_path).stem)
    # Video metadata
    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        raise RuntimeError(f"Cannot open video: {video_path}")
    # cv2 reports 0.0 fps for some containers; fall back to 25 fps.
    fps = float(cap.get(cv2.CAP_PROP_FPS)) or 25.0
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or 0
    duration = (total_frames / fps) if total_frames > 0 else 0.0
    cap.release()
    # Optional Chroma DB
    face_col = voice_col = None
    db_cfg = cfg.get("database", {})
    if db_cfg.get("enabled", True):
        client = _ensure_chroma(db_cfg.get("persist_directory", db_dir))
        if db_cfg.get("enable_face_recognition", True):
            try:
                face_col = client.get_collection(db_cfg.get("face_collection", "index_faces"))
            except Exception:
                face_col = None
        if db_cfg.get("enable_voice_recognition", True):
            try:
                voice_col = client.get_collection(db_cfg.get("voice_collection", "index_voices"))
            except Exception:
                voice_col = None
    # 1) Background descriptor (frames, OCR, descriptions)
    keyframes, per_second, _ = build_keyframes_and_per_second(video_path, out_dir, cfg, face_collection=face_col)
    # Close each keyframe's `end` at the next keyframe's start; the last one
    # ends at the total duration.
    for i in range(len(keyframes)):
        if i < len(keyframes) - 1:
            keyframes[i]["end"] = keyframes[i + 1]["start"]
        else:
            keyframes[i]["end"] = round(duration, 2)
    # 2) LLM descriptions
    face_identities = {f.get("identity") for fr in per_second for f in (fr.get("faces") or []) if f.get("identity")}
    keyframes, montage_path = describe_keyframes_with_llm(
        keyframes, out_dir, face_identities=face_identities, config_path=config_path
    )
    # 3) Audio pipeline
    audio_segments, srt_unmodified_path, full_transcription = process_audio_for_video(
        video_path=str(video_path), out_dir=out_dir, cfg=cfg, voice_collection=voice_col
    )
    # 4) Identity manager: enrich frames and clips
    im = IdentityManager(face_collection=face_col, voice_collection=voice_col)
    per_second = im.assign_faces_to_frames(per_second)
    keyframes = im.assign_faces_to_frames(keyframes)
    audio_segments = im.assign_voices_to_segments(
        audio_segments,
        distance_threshold=cfg.get("voice_processing", {}).get("speaker_identification", {}).get("distance_threshold"),
    )
    # 5) Map identities onto time ranges
    keyframes = im.map_identities_over_ranges(per_second, keyframes, key="faces", out_key="persona")
    audio_segments = im.map_identities_over_ranges(per_second, audio_segments, key="faces", out_key="persona")
    # 6) Export analysis.json
    frames_analysis = [{
        "frame_number": fr.get("id"),
        "start": fr.get("start"),
        "end": fr.get("end"),
        "ocr": fr.get("ocr", ""),
        "persona": fr.get("persona", []),
        "description": fr.get("description", ""),
    } for fr in keyframes]
    analysis = {
        "frames": frames_analysis,
        # Omit raw voice embeddings from the export.
        "audio_segments": [{k: v for k, v in seg.items() if k != "voice_embedding"} for seg in audio_segments],
        "full_transcription": full_transcription,
    }
    analysis_path = out_dir / f"{Path(video_path).stem}_analysis.json"
    analysis_path.write_text(json.dumps(analysis, indent=2, ensure_ascii=False), encoding="utf-8")
    return {
        "output_dir": str(out_dir),
        "files": {
            "montage_path": montage_path,
            "srt_path": srt_unmodified_path,
            "analysis_path": str(analysis_path),
        },
        "stats": {
            "duration_seconds": duration,
            "total_frames": total_frames,
            "frames_processed": len(keyframes),
            "audio_segments_processed": len(audio_segments),
        },
    }
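

# ==================================
# Minimal usage sketch (assumption: run as a script; "sample.mp4" is a
# hypothetical input path, not part of the original module).
# ==================================
if __name__ == "__main__":
    result = process_video_pipeline("sample.mp4")
    log.info("Analysis written to %s", result["files"]["analysis_path"])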