from __future__ import annotations

import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

import base64
import contextlib
import json
import logging
import math
import shlex
import subprocess
import time
from collections import Counter, defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import cv2
import easyocr
import numpy as np
import requests
import torch
import torchaudio
import torchaudio.transforms as T
from PIL import Image
from pyannote.audio import Pipeline as PyannotePipeline
from pydub import AudioSegment
from scenedetect import VideoManager, SceneManager
from scenedetect.detectors import ContentDetector
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
from speechbrain.inference.speaker import SpeakerRecognition
from transformers import (
    AutoProcessor,
    LlavaForConditionalGeneration,
    WhisperForConditionalGeneration,
    WhisperProcessor,
)

from audio_tools import process_audio_for_video
from llm_router import load_yaml, LLMRouter

# Optional face backends: prefer face_recognition, fall back to DeepFace.
try:
    import face_recognition
except Exception:
    face_recognition = None

try:
    from deepface import DeepFace
except ImportError:
    DeepFace = None


class DFRecognizer:
    """Thin wrapper around DeepFace used as a face-embedding backend."""

    def __init__(self, model_name: str = 'Facenet512'):
        self.model_name = model_name
        if DeepFace is None:
            raise ImportError("DeepFace not available")

    def get_face_embedding_from_path(self, image_path: str) -> Optional[np.ndarray]:
        """Extract a face embedding with DeepFace; return None on failure."""
        try:
            embedding = DeepFace.represent(
                img_path=image_path,
                model_name=self.model_name,
                enforce_detection=False,
                detector_backend='skip'
            )
            if isinstance(embedding, list) and len(embedding) > 0:
                emb = embedding[0].get('embedding')
                if emb:
                    return np.array(emb, dtype=float)
            return None
        except Exception as e:
            log.debug("DeepFace embedding failed for %s: %s", image_path, e)
            return None


log = logging.getLogger("audio_tools")
if not log.handlers:
    h = logging.StreamHandler()
    h.setFormatter(logging.Formatter("[%(levelname)s] %(message)s"))
    log.addHandler(h)
log.setLevel(logging.INFO)


def load_config(path: str = "configs/config_veureu.yaml") -> Dict[str, Any]:
    p = Path(path)
    if not p.exists():
        log.warning("Config file not found: %s (using defaults)", path)
        return {}
    try:
        import yaml
        cfg = yaml.safe_load(p.read_text(encoding="utf-8")) or {}
        cfg["__path__"] = str(p)
        return cfg
    except Exception as e:
        log.error("Failed to read YAML config: %s", e)
        return {}
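

# The exact schema of configs/config_veureu.yaml is not shown in this module;
# the dict below is an inferred sketch based only on the keys this file reads
# (`vision_describer.montage.*` in _split_montage and `models.vision` in
# describe_montage_sequence). The real config may contain more.
_EXAMPLE_CONFIG: Dict[str, Any] = {
    "models": {"vision": "salamandra-vision"},
    "vision_describer": {
        "montage": {"split_mode": "grid", "rows": 2, "cols": 5},
    },
}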


class FaceOfImageEmbedding:
    """Preferred backend: `face_recognition`; fallback: DeepFace via DFRecognizer."""

    def __init__(self, deepface_model: str = 'Facenet512'):
        self.use_fr = face_recognition is not None
        self.df = None
        if not self.use_fr and DeepFace is not None:
            try:
                self.df = DFRecognizer(model_name=deepface_model)
                log.info("Using DeepFace (%s) as face embedding backend.", deepface_model)
            except Exception as e:
                log.warning("Failed to initialize DeepFace: %s", e)
        elif self.use_fr:
            log.info("Using face_recognition as face embedding backend.")
        else:
            log.error("No face embedding backend available.")

    def encode_image(self, image_path: Path) -> Optional[List]:
        """Return face embeddings for one image.

        face_recognition backend: a list of L2-normalized embeddings, one per face.
        DeepFace backend: a single L2-normalized embedding as a flat list.
        Returns None when no face is found or the backend fails.
        """
        try:
            if self.use_fr:
                img = face_recognition.load_image_file(str(image_path))
                encs = face_recognition.face_encodings(img)
                if encs:
                    return [(e / np.linalg.norm(e)).astype(float).tolist() for e in encs]
                return None

            if self.df is not None:
                emb = self.df.get_face_embedding_from_path(str(image_path))
                if emb is None:
                    return None
                emb = np.array(emb, dtype=float)
                emb = emb / np.linalg.norm(emb)
                return emb.tolist()

        except Exception as e:
            log.debug("Face embedding failed for %s: %s", image_path, e)

        return None
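

# Hedged helper sketch: encode_image deliberately returns two shapes (see its
# docstring). _faces_as_list is not part of the original API; it only
# illustrates the normalization that process_frames performs inline below.
def _faces_as_list(raw: Optional[List]) -> List[List[float]]:
    """Normalize encode_image output to a list of embedding lists."""
    if not raw:
        return []
    return raw if isinstance(raw[0], list) else [raw]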


class FaceAnalyzer:
    """Simple DeepFace wrapper that estimates age and gender from an image."""

    def __init__(self, actions=None):
        if actions is None:
            actions = ["age", "gender"]
        self.actions = actions
        if DeepFace is None:
            log.warning("DeepFace not available - FaceAnalyzer will return None")

    def analyze_image(self, img_path: str) -> Optional[Dict[str, Any]]:
        if DeepFace is None:
            return None
        try:
            result = DeepFace.analyze(img_path=img_path, actions=self.actions)

            # DeepFace.analyze may return a list (one dict per detected face).
            if isinstance(result, list) and len(result) > 0:
                result = result[0]

            return {
                "age": result.get("age", "unknown"),
                "gender": result.get("dominant_gender", "unknown")
            }
        except Exception as e:
            log.warning("Could not analyze image %s: %s", img_path, e)
            return None


def map_identities_per_second(frames_per_second, intervals):
    """Attach face-identity counts to each interval from per-second frame results."""
    for seg in intervals:
        seg_start = seg["start"]
        seg_end = seg["end"]

        # Gather every identity seen in frames whose start falls in this interval.
        identities = []
        for f in frames_per_second:
            if seg_start <= f["start"] <= seg_end:
                for face in f.get("faces", []):
                    identities.append(face)

        seg["counts"] = dict(Counter(identities))

    return intervals
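

# Illustrative shapes (made-up values): given per-second frame results and
# scene intervals,
#   frames = [{"start": 0.0, "faces": ["ana", "Unknown"]},
#             {"start": 1.0, "faces": ["ana"]}]
#   intervals = [{"start": 0.0, "end": 2.0}]
# map_identities_per_second(frames, intervals) yields
#   [{"start": 0.0, "end": 2.0, "counts": {"ana": 2, "Unknown": 1}}]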


def _split_montage(img: np.ndarray, n: int, cfg: Dict[str, Any]) -> List[np.ndarray]:
    """Split a montage image into n tiles (horizontal, vertical, or grid mode)."""
    vd = cfg.get('vision_describer', {})
    montage_cfg = vd.get('montage', {})
    mode = montage_cfg.get('split_mode', 'horizontal')

    h, w = img.shape[:2]
    tiles: List[np.ndarray] = []

    if mode == 'vertical':
        tile_h = h // n
        for i in range(n):
            y0 = i * tile_h
            y1 = h if i == n - 1 else (i + 1) * tile_h
            tiles.append(img[y0:y1, 0:w])
        return tiles

    if mode == 'grid':
        rows = int(montage_cfg.get('rows', 1) or 1)
        cols = int(montage_cfg.get('cols', n) or n)
        assert rows * cols >= n, "grid rows*cols must be >= n"
        tile_h = h // rows
        tile_w = w // cols
        k = 0
        for r in range(rows):
            for c in range(cols):
                if k >= n:
                    break
                y0, y1 = r * tile_h, h if (r == rows - 1) else (r + 1) * tile_h
                x0, x1 = c * tile_w, w if (c == cols - 1) else (c + 1) * tile_w
                tiles.append(img[y0:y1, x0:x1])
                k += 1
        return tiles

    # Default: horizontal strips.
    tile_w = w // n
    for i in range(n):
        x0 = i * tile_w
        x1 = w if i == n - 1 else (i + 1) * tile_w
        tiles.append(img[0:h, x0:x1])
    return tiles
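

# Sketch under assumptions: a synthetic 200x500 "montage" split as a 2x5 grid.
# The config keys mirror the ones _split_montage reads above; the image is a
# stand-in, not real pipeline output.
def _demo_split_montage() -> None:
    img = np.zeros((200, 500, 3), dtype=np.uint8)  # fake montage image
    cfg = {"vision_describer": {"montage": {"split_mode": "grid", "rows": 2, "cols": 5}}}
    tiles = _split_montage(img, 10, cfg)
    assert len(tiles) == 10 and tiles[0].shape == (100, 100, 3)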


def generar_montage(frame_paths: List[str], output_dir: str) -> str:
    """Concatenate keyframes horizontally into one montage image; return its path."""
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)
    montage_path = ""

    if frame_paths:
        imgs = [cv2.imread(kf) for kf in frame_paths if os.path.exists(kf)]
        imgs = [img for img in imgs if img is not None]
        print(f"Found {len(imgs)} images for the montage.")

        if imgs:
            # Resize every image to the tallest height, keeping aspect ratio.
            h = max(img.shape[0] for img in imgs)
            imgs_resized = [cv2.resize(img, (int(img.shape[1] * h / img.shape[0]), h)) for img in imgs]
            montage = cv2.hconcat(imgs_resized)
            montage_path = os.path.join(output_dir, "keyframes_montage.jpg")
            print(f"Saving montage to: {montage_path}")
            cv2.imwrite(montage_path, montage)
            print("Montage saved.")
        else:
            print("No valid images found for the montage.")

    return montage_path


def describe_montage_sequence(
    montage_path: str,
    n: int,
    informacion,
    face_identities,
    *,
    config_path: str = 'config.yaml'
) -> List[Any]:
    """Describe each sub-image of a montage using remote Space (svision) via LLMRouter.

    Returns a list of descriptions, one per tile.
    """
    img = cv2.imread(montage_path, cv2.IMREAD_COLOR)
    if img is None:
        raise RuntimeError(f"Cannot read image: {montage_path}")

    cfg = load_yaml(config_path)
    tiles = _split_montage(img, n, cfg)
    if len(tiles) < n:
        raise RuntimeError(f"Produced {len(tiles)} tiles, expected {n}")

    # Persist each tile so the vision model can be called per image.
    out_dir = Path(montage_path).parent
    frame_paths: List[str] = []
    for i, t in enumerate(tiles):
        p = out_dir / f"tile_{i:03d}.jpg"
        cv2.imwrite(str(p), t)
        frame_paths.append(str(p))

    context = {
        "informacion": informacion,
        "face_identities": sorted(list(face_identities or set())),
    }
    model_name = (cfg.get("models", {}).get("vision") or "salamandra-vision")
    router = LLMRouter(cfg)
    descs = router.vision_describe(frame_paths, context=context, model=model_name)
    return descs
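

# Hypothetical call (paths and context values are assumptions, not fixtures):
# montage = generar_montage([kf["path"] for kf in keyframes], "results/demo")
# descs = describe_montage_sequence(montage, n=len(keyframes),
#                                   informacion={"title": "demo"},
#                                   face_identities={"ana"})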


def keyframe_conditional_extraction_ana(
    video_path,
    output_dir,
    threshold=30.0,
    offset_frames=10
):
    """
    Detect scene changes in a video, save one frame per change, and return
    intervals whose start/end come from the keyframe timestamps. The montage
    itself is built separately (see generar_montage).
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    video_manager = VideoManager([video_path])
    scene_manager = SceneManager()
    scene_manager.add_detector(ContentDetector(threshold=threshold))

    video_manager.start()
    scene_manager.detect_scenes(video_manager)

    scene_list = scene_manager.get_scene_list()

    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
    video_duration = total_frames / fps

    keyframes = []
    for i, (start_time, end_time) in enumerate(scene_list):
        # Grab a frame slightly after the cut to avoid transition artifacts.
        frame_number = int(start_time.get_frames()) + offset_frames
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
        ret, frame = cap.read()
        if ret:
            ts = frame_number / fps
            frame_path = os.path.join(output_dir, f"scene_{i+1:03d}.jpg")
            cv2.imwrite(frame_path, frame)
            keyframes.append({
                "index": i + 1,
                "time": round(ts, 2),
                "path": frame_path
            })

    cap.release()
    video_manager.release()

    # Each interval runs from one keyframe to the next (or to the video end).
    intervals = []
    for i, kf in enumerate(keyframes):
        start = kf["time"]
        if i < len(keyframes) - 1:
            end = keyframes[i + 1]["time"]
        else:
            end = video_duration
        intervals.append({
            "index": kf["index"],
            "start": start,
            "end": round(end, 2),
            "path": kf["path"]
        })

    return intervals
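

# Usage sketch (the clip path is an assumption):
# intervals = keyframe_conditional_extraction_ana("clips/demo.mp4", "results/demo/keyframes")
# for seg in intervals:
#     print(seg["index"], seg["start"], "->", seg["end"], seg["path"])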


def keyframe_every_second(
    video_path: str,
    output_dir: str = ".",
    max_frames: Optional[int] = 10000,
) -> List[dict]:
    """
    Extract one frame for every second of the video.

    Returns:
        List[dict]: each element is {"index", "start", "end", "path"}
    """
    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    cap = cv2.VideoCapture(str(video_path))
    fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    duration = total_frames / fps

    frames: List[dict] = []
    idx = 0
    sec = 0.0

    while sec <= duration:
        frame_number = int(sec * fps)
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
        ret, frame = cap.read()
        if not ret:
            break

        timestamp = frame_number / fps
        frame_path = out_dir / f"frame_per_second{idx:03d}.jpg"
        cv2.imwrite(str(frame_path), frame)

        frames.append({
            "index": idx + 1,
            "start": round(timestamp, 2),
            "end": None,
            "path": str(frame_path),
        })

        idx += 1
        sec += 1.0

        if max_frames and idx >= max_frames:
            break

    cap.release()

    # Fill "end" with the next frame's start (video duration for the last one).
    for i in range(len(frames)):
        if i < len(frames) - 1:
            frames[i]["end"] = frames[i + 1]["start"]
        else:
            frames[i]["end"] = round(duration, 2)

    return frames


def process_frames(
    frames: List[dict],
    config: dict,
    face_col=None,
    embedding_model=None,
) -> List[dict]:
    """
    Process keyframes:
    - Detect faces
    - Generate embeddings with FaceOfImageEmbedding
    - Optionally match against face_col (KNN top-3)
    - Optionally run OCR
    """
    frame_results = []

    if embedding_model is None:
        embedding_model = FaceOfImageEmbedding()

    # Build the OCR reader once; constructing it per frame is very slow.
    try:
        reader = easyocr.Reader(['en', 'es'], gpu=True)
    except Exception as e:
        print(f"EasyOCR init failed: {e}")
        reader = None

    for idx, frame in enumerate(frames):
        frame_path = frame["path"]

        try:
            raw_faces = embedding_model.encode_image(Path(frame_path))
        except Exception as e:
            print(f"Error processing {frame_path}: {e}")
            raw_faces = None

        # encode_image returns either a list of embeddings or one flat embedding.
        faces = []
        if raw_faces:
            if isinstance(raw_faces[0], list):
                for e in raw_faces:
                    faces.append({"embedding": e})
            else:
                faces.append({"embedding": raw_faces})

        faces_detected = []
        for f in faces:
            embedding = f.get("embedding")
            identity = "Unknown"
            knn = []

            if face_col is not None and embedding is not None:
                try:
                    num_embeddings = face_col.count()
                    if num_embeddings < 1:
                        knn = []
                        identity = "Unknown"
                    else:
                        n_results = min(3, num_embeddings)
                        q = face_col.query(
                            query_embeddings=[embedding],
                            n_results=n_results,
                            include=["metadatas", "distances"]
                        )

                        knn = []
                        metas = q.get("metadatas", [[]])[0]
                        dists = q.get("distances", [[]])[0]
                        for meta, dist in zip(metas, dists):
                            person_id = meta.get("identity", "Unknown") if isinstance(meta, dict) else "Unknown"
                            knn.append({"identity": person_id, "distance": float(dist)})

                        # Accept the nearest neighbour only under a distance threshold.
                        if knn and knn[0]["distance"] < 0.6:
                            identity = knn[0]["identity"]
                        else:
                            identity = "Unknown"

                except Exception as e:
                    print(f"Face KNN failed: {e}")
                    knn = []
                    identity = "Unknown"

            faces_detected.append(identity)

        # Default to empty text so the result dict is always well-formed.
        ocr_text_easyocr = ""
        if reader is not None:
            try:
                results = reader.readtext(frame_path)
                ocr_text_easyocr = " ".join([text for _, text, _ in results]).strip()
            except Exception as e:
                print(f"OCR error: {e}")

        frame_results.append({
            "id": frame["index"],
            "start": frame["start"],
            "end": frame["end"],
            "image_path": frame_path,
            "faces": faces_detected,
            "ocr": ocr_text_easyocr,
        })

    return frame_results
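

# End-to-end sketch under assumptions (demo paths; no face collection, so all
# identities come back "Unknown"): extract one frame per second, run face/OCR
# processing, then aggregate identities over the detected scene intervals.
def _demo_process_video(video_path: str = "clips/demo.mp4") -> List[dict]:
    frames = keyframe_every_second(video_path, output_dir="results/demo/frames")
    per_second = process_frames(frames, config={}, face_col=None)
    intervals = keyframe_conditional_extraction_ana(video_path, "results/demo/keyframes")
    return map_identities_per_second(per_second, intervals)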


if __name__ == "__main__":
    import argparse
    ap = argparse.ArgumentParser(description="Veureu — Audio tools (self-contained)")
    ap.add_argument("--video", required=True)
    ap.add_argument("--out", default="results")
    ap.add_argument("--config", default="configs/config_veureu.yaml")
    args = ap.parse_args()

    import yaml
    cfg = {}
    p = Path(args.config)
    if p.exists():
        cfg = yaml.safe_load(p.read_text(encoding="utf-8")) or {}

    out_dir = Path(args.out) / Path(args.video).stem
    out_dir.mkdir(parents=True, exist_ok=True)

    segs, srt = process_audio_for_video(args.video, out_dir, cfg, voice_collection=None)
    print(json.dumps({
        "segments": len(segs),
        "srt": srt
    }, indent=2, ensure_ascii=False))