"""End-to-end video analysis pipeline.

Extracts keyframes, describes them with an LLM, transcribes the audio track
and identifies speakers, resolves face/voice identities against ChromaDB
collections, and writes a per-video analysis JSON under the results directory.
"""

from __future__ import annotations

import json
import logging
from pathlib import Path
from typing import Any, Dict

import chromadb
import cv2
import yaml
from chromadb.config import Settings

from audio_tools import process_audio_for_video
from background_descriptor import build_keyframes_and_per_second, describe_keyframes_with_llm
from identity_manager import IdentityManager

log = logging.getLogger("video_processing")
if not log.handlers:
    h = logging.StreamHandler()
    h.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
    log.addHandler(h)
    log.setLevel(logging.INFO)


def _ensure_dir(p: Path) -> Path:
    """Create directory p (including parents) if needed and return it."""
    p.mkdir(parents=True, exist_ok=True)
    return p


def _ensure_chroma(db_dir: str | Path):
    """Return a persistent ChromaDB client rooted at db_dir.

    Uses the legacy (pre-0.4) chromadb Settings API; see the note below for
    newer releases.
    """
    _ensure_dir(Path(db_dir))
    return chromadb.Client(Settings(
        persist_directory=str(db_dir),
        chroma_db_impl="duckdb+parquet",
        anonymized_telemetry=False,
    ))

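# On chromadb >= 0.4 the Settings keys above were removed and the call raises;
# assuming nothing else depends on the duckdb+parquet backend, the drop-in
# equivalent would be:
#
#   def _ensure_chroma(db_dir: str | Path):
#       _ensure_dir(Path(db_dir))
#       return chromadb.PersistentClient(path=str(db_dir))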
|
def load_config(path: str) -> Dict[str, Any]:
    """Load a YAML config file; return {} if it is missing or empty."""
    p = Path(path)
    if not p.exists():
        return {}
    return yaml.safe_load(p.read_text(encoding="utf-8")) or {}

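# For reference, the subset of config_veureu.yaml that this module reads. Key
# names are taken from the lookups in process_video_pipeline below; the values
# shown are hypothetical defaults, not a schema:
#
#   database:
#     enabled: true
#     persist_directory: chroma_db
#     enable_face_recognition: true
#     face_collection: index_faces
#     enable_voice_recognition: true
#     voice_collection: index_voices
#   voice_processing:
#     speaker_identification:
#       distance_threshold: 0.35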
|
def process_video_pipeline(
    video_path: str,
    *,
    config_path: str = "config_veureu.yaml",
    out_root: str = "results",
    db_dir: str = "chroma_db",
) -> Dict[str, Any]:
    """Run the full analysis pipeline on one video; return output paths and stats."""
    cfg = load_config(config_path)
    out_dir = _ensure_dir(Path(out_root) / Path(video_path).stem)

    # Probe basic video properties, falling back to 25 fps when the container
    # does not report a frame rate.
    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        raise RuntimeError(f"Cannot open video: {video_path}")
    fps = float(cap.get(cv2.CAP_PROP_FPS)) or 25.0
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or 0
    duration = (total_frames / fps) if total_frames > 0 else 0.0
    cap.release()

    # Open the face/voice ChromaDB collections when enabled; a missing
    # collection degrades to None and the corresponding recognition is skipped.
    face_col = voice_col = None
    db_cfg = cfg.get("database", {})
    if db_cfg.get("enabled", True):
        client = _ensure_chroma(db_cfg.get("persist_directory", db_dir))
        if db_cfg.get("enable_face_recognition", True):
            try:
                face_col = client.get_collection(db_cfg.get("face_collection", "index_faces"))
            except Exception:
                face_col = None
        if db_cfg.get("enable_voice_recognition", True):
            try:
                voice_col = client.get_collection(db_cfg.get("voice_collection", "index_voices"))
            except Exception:
                voice_col = None

    # Keyframe extraction plus the per-second frame analysis used later for
    # identity mapping.
    keyframes, per_second, _ = build_keyframes_and_per_second(video_path, out_dir, cfg, face_collection=face_col)

    # Each keyframe's segment ends where the next one starts; the last segment
    # runs to the end of the video.
    for i in range(len(keyframes)):
        if i < len(keyframes) - 1:
            keyframes[i]["end"] = keyframes[i + 1]["start"]
        else:
            keyframes[i]["end"] = round(duration, 2)

    # Gather every recognized face identity in the video and pass the set to
    # the LLM describer alongside the keyframes.
    face_identities = {f.get("identity") for fr in per_second for f in (fr.get("faces") or []) if f.get("identity")}
    keyframes, montage_path = describe_keyframes_with_llm(
        keyframes, out_dir, face_identities=face_identities, config_path=config_path
    )

    # Transcribe and segment the audio track, producing voice embeddings per
    # segment and an unmodified SRT file.
    audio_segments, srt_unmodified_path, full_transcription = process_audio_for_video(
        video_path=str(video_path), out_dir=out_dir, cfg=cfg, voice_collection=voice_col
    )

    # Resolve identities: match faces and voices against the index
    # collections, then project the per-second face identities onto the time
    # ranges of keyframes and audio segments as a "persona" list.
    im = IdentityManager(face_collection=face_col, voice_collection=voice_col)
    per_second = im.assign_faces_to_frames(per_second)
    keyframes = im.assign_faces_to_frames(keyframes)
    audio_segments = im.assign_voices_to_segments(
        audio_segments,
        distance_threshold=cfg.get("voice_processing", {}).get("speaker_identification", {}).get("distance_threshold"),
    )

    keyframes = im.map_identities_over_ranges(per_second, keyframes, key="faces", out_key="persona")
    audio_segments = im.map_identities_over_ranges(per_second, audio_segments, key="faces", out_key="persona")

    # Assemble the analysis document; voice embeddings are stripped from the
    # serialized audio segments to keep the JSON compact.
    frames_analysis = [{
        "frame_number": fr.get("id"),
        "start": fr.get("start"),
        "end": fr.get("end"),
        "ocr": fr.get("ocr", ""),
        "persona": fr.get("persona", []),
        "description": fr.get("description", ""),
    } for fr in keyframes]

    analysis = {
        "frames": frames_analysis,
        "audio_segments": [{k: v for k, v in seg.items() if k != "voice_embedding"} for seg in audio_segments],
        "full_transcription": full_transcription,
    }
    analysis_path = out_dir / f"{Path(video_path).stem}_analysis.json"
    analysis_path.write_text(json.dumps(analysis, indent=2, ensure_ascii=False), encoding="utf-8")

    return {
        "output_dir": str(out_dir),
        "files": {
            "montage_path": montage_path,
            "srt_path": srt_unmodified_path,
            "analysis_path": str(analysis_path),
        },
        "stats": {
            "duration_seconds": duration,
            "total_frames": total_frames,
            "frames_processed": len(keyframes),
            "audio_segments_processed": len(audio_segments),
        },
    }
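

# Minimal CLI sketch for running the pipeline on one file; the argument
# handling is illustrative and not part of the module's API.
if __name__ == "__main__":
    import sys

    if len(sys.argv) < 2:
        raise SystemExit(f"usage: {sys.argv[0]} <video_path>")
    result = process_video_pipeline(sys.argv[1])
    log.info("Analysis written to %s", result["files"]["analysis_path"])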