| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | from __future__ import annotations
|
| |
|
| | import json
|
| | import logging
|
| | import math
|
| | import os
|
| | import shlex
|
| | import subprocess
|
| | from pathlib import Path
|
| | from typing import List, Dict, Any, Tuple, Optional
|
| |
|
| | import numpy as np
|
| |
|
| |
|
| | try:
|
| | import torch
|
| | import torchaudio as ta
|
| | import torchaudio.transforms as T
|
| | HAS_TORCHAUDIO = True
|
| |
|
| | except Exception:
|
| | HAS_TORCHAUDIO = False
|
| | ta = None
|
| |
|
| | import soundfile as sf
|
| |
|
| |
|
| | try:
|
| | from pyannote.audio import Pipeline
|
| | HAS_PYANNOTE = True
|
| | except Exception:
|
| | Pipeline = None
|
| | HAS_PYANNOTE = False
|
| |
|
| |
|
| | from speechbrain.inference.speaker import SpeakerRecognition
|
| |
|
| |
|
| | from sklearn.cluster import KMeans
|
| | from sklearn.metrics import silhouette_score
|
| |
|
| |
|
| | from llm_router import load_yaml, LLMRouter
|
| |
|
| |
|
# Module logger; configured once so repeated imports don't stack handlers.
log = logging.getLogger("audio_tools")
if not log.handlers:
    _handler = logging.StreamHandler()
    _handler.setFormatter(logging.Formatter("[%(levelname)s] %(message)s"))
    log.addHandler(_handler)
    log.setLevel(logging.INFO)
|
| |
|
| |
|
| |
|
def load_wav(path: str | Path, sr: int = 16000):
    """Load audio as mono float32 at the requested sample rate."""
    if HAS_TORCHAUDIO:
        tensor, native_sr = ta.load(str(path))
        if native_sr != sr:
            tensor = ta.functional.resample(tensor, native_sr, sr)
        if tensor.dim() > 1:
            # Downmix multi-channel audio by averaging channels.
            tensor = tensor.mean(dim=0, keepdim=True)
        return tensor.squeeze(0).numpy(), sr

    # Fallback path: soundfile for decoding, librosa for resampling.
    import librosa
    samples, native_sr = sf.read(str(path), dtype="float32", always_2d=False)
    if samples.ndim > 1:
        samples = samples.mean(axis=1)
    if native_sr != sr:
        samples = librosa.resample(samples, orig_sr=native_sr, target_sr=sr)
    return samples.astype(np.float32), sr
|
| |
|
def save_wav(path: str | Path, y, sr: int = 16000):
    """Save mono float32 wav."""
    data = np.asarray(y, dtype=np.float32)
    if HAS_TORCHAUDIO:
        import torch
        # torchaudio expects a (channels, samples) tensor.
        ta.save(str(path), torch.from_numpy(data).unsqueeze(0), sr)
    else:
        sf.write(str(path), data, sr)
|
| |
|
def extract_audio_ffmpeg(
    video_path: str,
    audio_out: Path,
    sr: int = 16000,
    mono: bool = True,
) -> str:
    """Extract audio from video to WAV using ffmpeg.

    Args:
        video_path: Input video file path.
        audio_out: Destination WAV path; parent directories are created.
        sr: Output sample rate in Hz.
        mono: Downmix to a single channel when True.

    Returns:
        The output path as a string.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits non-zero.
    """
    audio_out.parent.mkdir(parents=True, exist_ok=True)
    # Build the argv list directly instead of formatting a shell-style string
    # and re-splitting it with shlex: paths containing double quotes or other
    # shell metacharacters broke the previous approach.
    cmd = ["ffmpeg", "-y", "-i", str(video_path), "-vn"]
    if mono:
        cmd += ["-ac", "1"]
    cmd += ["-ar", str(sr), "-f", "wav", str(audio_out)]
    subprocess.run(
        cmd,
        check=True,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    return str(audio_out)
|
| |
|
| |
|
| |
|
def transcribe_audio_remote(audio_path: str | Path, cfg: Dict[str, Any]) -> Dict[str, Any]:
    """
    Send the audio file to the remote ASR Space `veureu/asr` (Gradio or HTTP).
    The remote model is 'faster-whisper-large-v3-ca-3catparla' (Aina).
    Returns standardized dict: {'text': str, 'segments': list?}
    """
    if not cfg:
        cfg = load_yaml("config.yaml")
    router = LLMRouter(cfg)
    model_name = cfg.get("models", {}).get("asr") or "whisper-catalan"
    asr_params = {
        "language": "ca",
        "timestamps": True,
        "diarization": False,
    }

    try:
        raw = router.asr_transcribe(str(audio_path), model=model_name, **asr_params)
    except Exception as exc:
        # Tag the failure as a timeout when httpx is importable and agrees;
        # any error resolves to an empty transcription.
        is_timeout = False
        try:
            import httpx
            is_timeout = isinstance(exc, httpx.ReadTimeout)
        except Exception:
            is_timeout = False
        kind = "timeout" if is_timeout else "error"
        log.warning(f"ASR {kind} for {audio_path}: {exc}")
        return {"text": "", "segments": []}

    # Normalize the remote payload to {'text': ..., 'segments': [...]}.
    if isinstance(raw, dict):
        if "text" not in raw and "transcription" in raw:
            raw["text"] = raw["transcription"]
        raw.setdefault("segments", [])
        return raw
    if isinstance(raw, str):
        return {"text": raw, "segments": []}
    return {"text": str(raw), "segments": []}
|
| |
|
| |
|
| |
|
def diarize_audio_silence_based(
    wav_path: str,
    base_dir: Path,
    clips_folder: str = "clips",
    min_segment_duration: float = 20.0,
    max_segment_duration: float = 50.0,
    silence_thresh: int = -40,
    min_silence_len: int = 500,
) -> Tuple[List[str], List[Dict[str, Any]], Dict[str, Any], List[Dict[str, Any]]]:
    """Segmentation based on silence detection (alternative to pyannote).
    Returns (clip_paths, segments, info, connection_logs) in same format as diarize_audio.
    """
    from pydub import AudioSegment
    from pydub.silence import detect_nonsilent

    audio = AudioSegment.from_wav(wav_path)
    total_duration = len(audio) / 1000.0

    speech_ranges = detect_nonsilent(
        audio,
        min_silence_len=min_silence_len,
        silence_thresh=silence_thresh,
    )

    clips_dir = base_dir / clips_folder
    clips_dir.mkdir(parents=True, exist_ok=True)
    clip_paths: List[str] = []
    segments: List[Dict[str, Any]] = []

    def _export(piece, out_path, seg_start, seg_end):
        # Write one clip to disk and record its bookkeeping entry.
        piece.export(out_path, format="wav")
        segments.append({"start": seg_start, "end": seg_end, "speaker": "UNKNOWN"})
        clip_paths.append(str(out_path))

    for idx, (start_ms, end_ms) in enumerate(speech_ranges):
        seg_start = start_ms / 1000.0
        seg_end = end_ms / 1000.0
        seg_dur = seg_end - seg_start

        # Segments shorter than the minimum are dropped entirely.
        if seg_dur < min_segment_duration:
            continue

        if seg_dur <= max_segment_duration:
            _export(audio[start_ms:end_ms], clips_dir / f"segment_{idx:03d}.wav",
                    seg_start, seg_end)
            continue

        # Over-long segment: split into equal-duration sub-clips.
        n_parts = int(math.ceil(seg_dur / max_segment_duration))
        part_dur = seg_dur / n_parts
        for j in range(n_parts):
            s = seg_start + j * part_dur
            e = min(seg_end, seg_start + (j + 1) * part_dur)
            if e <= s:
                continue
            _export(audio[int(s * 1000):int(e * 1000)],
                    clips_dir / f"segment_{idx:03d}_{j:02d}.wav", s, e)

    if not segments:
        # Nothing survived the duration filter: return one clip covering everything.
        cp = clips_dir / "segment_000.wav"
        audio.export(cp, format="wav")
        return (
            [str(cp)],
            [{"start": 0.0, "end": total_duration, "speaker": "UNKNOWN"}],
            {"diarization_ok": False, "error": "no_segments_after_silence_filter", "token_source": "silence-based"},
            [{"service": "silence-detection", "phase": "done", "message": "Segmentation by silence completed"}],
        )

    info = {
        "diarization_ok": True,
        "error": "",
        "token_source": "silence-based",
        "method": "silence-detection",
        "num_segments": len(segments),
    }
    logs = [{
        "service": "silence-detection",
        "phase": "done",
        "message": f"Segmented audio into {len(segments)} clips based on silence",
    }]

    return clip_paths, segments, info, logs
|
| |
|
| |
|
def diarize_audio(
    wav_path: str,
    base_dir: Path,
    clips_folder: str = "clips",
    min_segment_duration: float = 20.0,
    max_segment_duration: float = 50.0,
    hf_token_env: str | None = None,
    use_silence_fallback: bool = True,
    force_silence_only: bool = False,
    silence_thresh: int = -40,
    min_silence_len: int = 500,
) -> Tuple[List[str], List[Dict[str, Any]], Dict[str, Any], List[Dict[str, Any]]]:
    """Diarization with pyannote (or silence-based fallback) and clip export with pydub.

    Args:
        force_silence_only: If True, skip pyannote and use silence-based segmentation directly.
        use_silence_fallback: If True and pyannote fails, use silence-based segmentation.
        silence_thresh: dBFS threshold for silence detection (default -40).
        min_silence_len: Minimum silence length in milliseconds (default 500).

    Returns (clip_paths, segments, info) where info includes diarization_ok and optional error.
    """

    # Bypass pyannote entirely when forced or when it isn't importable.
    if force_silence_only or not HAS_PYANNOTE:
        if not HAS_PYANNOTE:
            log.info("pyannote not available, using silence-based segmentation")
        else:
            log.info("Using silence-based segmentation (forced)")
        return diarize_audio_silence_based(
            wav_path, base_dir, clips_folder,
            min_segment_duration, max_segment_duration,
            silence_thresh, min_silence_len
        )

    from pydub import AudioSegment
    audio = AudioSegment.from_wav(wav_path)
    duration = len(audio) / 1000.0  # pydub lengths are in milliseconds

    diarization = None
    connection_logs: List[Dict[str, Any]] = []
    diar_info: Dict[str, Any] = {"diarization_ok": True, "error": "", "token_source": ""}
    try:
        # Explicit token argument wins over the PYANNOTE_TOKEN env var;
        # token_source records which one was actually used.
        _env_token = os.getenv("PYANNOTE_TOKEN")
        _token = hf_token_env or _env_token
        diar_info["token_source"] = "hf_token_env" if hf_token_env else ("PYANNOTE_TOKEN" if _env_token else "none")
        import time as _t
        t0 = _t.time()
        pipeline = Pipeline.from_pretrained(
            "pyannote/speaker-diarization-3.1",
            use_auth_token=_token
        )
        connection_logs.append({"service": "pyannote", "phase": "connect", "message": "Connecting to pyannote server..."})
        diarization = pipeline(wav_path)
        dt = _t.time() - t0
        connection_logs.append({"service": "pyannote", "phase": "done", "message": f"Response from pyannote received in {dt:.2f} s"})
    except Exception as e:
        # Any pyannote failure (auth, download, inference) lands here.
        log.warning(f"Diarization unavailable: {e}")
        diar_info.update({"diarization_ok": False, "error": str(e)})
        connection_logs.append({"service": "pyannote", "phase": "error", "message": f"pyannote error: {str(e)}"})

        if use_silence_fallback:
            log.info("Attempting silence-based segmentation as fallback...")
            return diarize_audio_silence_based(
                wav_path, base_dir, clips_folder,
                min_segment_duration, max_segment_duration,
                silence_thresh, min_silence_len
            )

    clips_dir = (base_dir / clips_folder)
    clips_dir.mkdir(parents=True, exist_ok=True)
    clip_paths: List[str] = []
    segments: List[Dict[str, Any]] = []
    spk_map: Dict[str, int] = {}  # pyannote label -> stable 0-based speaker index
    prev_end = 0.0  # end of the last exported segment; used to trim overlaps

    if diarization is not None:
        for i, (turn, _, speaker) in enumerate(diarization.itertracks(yield_label=True)):
            # Clamp the turn to [0, duration] and trim any overlap with the
            # previously exported segment.
            start, end = max(0.0, float(turn.start)), min(duration, float(turn.end))
            if start < prev_end:
                start = prev_end
            if end <= start:
                continue

            seg_dur = end - start
            # Segments shorter than the minimum are dropped.
            if seg_dur < min_segment_duration:
                continue

            if seg_dur > max_segment_duration:
                # Split an over-long turn into n equal sub-clips.
                n = int(math.ceil(seg_dur / max_segment_duration))
                sub_d = seg_dur / n
                for j in range(n):
                    s = start + j * sub_d
                    e = min(end, start + (j + 1) * sub_d)
                    if e <= s:
                        continue
                    clip = audio[int(s * 1000):int(e * 1000)]
                    cp = clips_dir / f"segment_{i:03d}_{j:02d}.wav"
                    clip.export(cp, format="wav")
                    if speaker not in spk_map:
                        spk_map[speaker] = len(spk_map)
                    segments.append({"start": s, "end": e, "speaker": f"SPEAKER_{spk_map[speaker]:02d}"})
                    clip_paths.append(str(cp))
                    prev_end = e
            else:
                clip = audio[int(start * 1000):int(end * 1000)]
                cp = clips_dir / f"segment_{i:03d}.wav"
                clip.export(cp, format="wav")
                if speaker not in spk_map:
                    spk_map[speaker] = len(spk_map)
                segments.append({"start": start, "end": end, "speaker": f"SPEAKER_{spk_map[speaker]:02d}"})
                clip_paths.append(str(cp))
                prev_end = end

    if not segments:
        # No usable segments: export the whole audio as a single clip.
        cp = clips_dir / "segment_000.wav"
        audio.export(cp, format="wav")

        # Keep the original pyannote error if there was one; otherwise record
        # that filtering (not diarization) produced the empty result.
        if diar_info.get("error"):
            pass
        else:
            diar_info["reason"] = "no_segments_after_filter"
        return [str(cp)], [{"start": 0.0, "end": duration, "speaker": "SPEAKER_00"}], diar_info, connection_logs

    # Return clips/segments ordered by start time.
    pairs = sorted(zip(clip_paths, segments), key=lambda x: x[1]["start"])
    clip_paths, segments = [p[0] for p in pairs], [p[1] for p in pairs]
    return clip_paths, segments, diar_info, connection_logs
|
| |
|
| |
|
| |
|
class VoiceEmbedder:
    """Speaker-embedding extractor backed by SpeechBrain ECAPA-TDNN (VoxCeleb)."""

    def __init__(self):
        # Downloads/caches the pretrained model on first instantiation.
        self.model = SpeakerRecognition.from_hparams(
            source="speechbrain/spkrec-ecapa-voxceleb",
            savedir="pretrained_models/spkrec-ecapa-voxceleb",
        )
        self.model.eval()

    def embed(self, wav_path: str) -> List[float]:
        """Return the speaker embedding of *wav_path* as a flat list of floats.

        Audio is converted to 16 kHz mono and zero-padded to at least 0.2 s
        before inference.

        Raises:
            RuntimeError: If torch is not importable.
        """
        try:
            import torch as _torch
        except Exception:
            _torch = None

        if HAS_TORCHAUDIO:
            waveform, sr = ta.load(wav_path)
            target_sr = 16000
            if sr != target_sr:
                waveform = T.Resample(orig_freq=sr, new_freq=target_sr)(waveform)
            if waveform.shape[0] > 1:
                # Downmix to mono, keeping the (1, samples) shape.
                waveform = waveform.mean(dim=0, keepdim=True)
            # Very short clips yield unstable embeddings; pad to 0.2 s.
            min_samples = int(0.2 * target_sr)
            if waveform.shape[1] < min_samples:
                pad = min_samples - waveform.shape[1]
                if _torch is None:
                    raise RuntimeError("Torch not available for padding")
                waveform = _torch.cat([waveform, _torch.zeros((1, pad))], dim=1)
            if _torch is None:
                raise RuntimeError("Torch not available for inference")
            with _torch.no_grad():
                emb = self.model.encode_batch(waveform).squeeze().cpu().numpy().astype(float)
            return emb.tolist()
        else:
            y, sr = load_wav(wav_path, sr=16000)
            min_len = int(0.2 * 16000)
            if len(y) < min_len:
                y = np.pad(y, (0, min_len - len(y)))
            if _torch is None:
                raise RuntimeError("Torch not available for inference")
            # BUGFIX: the tensor was previously unsqueezed twice, producing a
            # (1, 1, N) batch. The torchaudio branch above feeds encode_batch a
            # (1, N) tensor, so this branch now does the same for consistency.
            w = _torch.from_numpy(y).unsqueeze(0)
            with _torch.no_grad():
                emb = self.model.encode_batch(w).squeeze().cpu().numpy().astype(float)
            return emb.tolist()
|
| |
|
| |
|
def embed_voice_segments(clip_paths: List[str]) -> List[List[float]]:
    """Compute one speaker embedding per clip; a failed clip yields []."""
    embedder = VoiceEmbedder()
    results: List[List[float]] = []
    for cp in clip_paths:
        try:
            results.append(embedder.embed(cp))
        except Exception as e:
            # Best-effort: keep positional alignment with clip_paths.
            log.warning(f"Embedding error in {cp}: {e}")
            results.append([])
    return results
|
| |
|
| |
|
| |
|
def identify_speakers(
    embeddings: List[List[float]],
    voice_collection,
    cfg: Dict[str, Any],
) -> List[str]:
    """Cluster voice embeddings with KMeans and map clusters to speaker names.

    Args:
        embeddings: One embedding per clip; empty lists mark failed clips.
        voice_collection: Optional vector store (queried per cluster centroid;
            presumably a Chroma-style collection — query/add API assumed).
        cfg: Full config; reads voice_processing.speaker_identification.

    Returns:
        One speaker label per input embedding ("UNKNOWN" for empty embeddings).
    """
    voice_cfg = cfg.get("voice_processing", {}).get("speaker_identification", {})
    # Fewer than two usable embeddings: clustering is meaningless.
    if not embeddings or sum(1 for e in embeddings if e) < 2:
        return ["SPEAKER_00" for _ in embeddings]

    valid = [e for e in embeddings if e and len(e) > 0]
    if len(valid) < 2:
        return ["SPEAKER_00" for _ in embeddings]

    min_clusters = max(1, int(voice_cfg.get("min_speakers", 1)))
    # Cap clusters below the sample count so silhouette scoring stays valid.
    max_clusters = min(int(voice_cfg.get("max_speakers", 5)), len(valid) - 1)

    if voice_cfg.get("find_optimal_clusters", True) and len(valid) > 2:
        # Pick k by maximizing the silhouette score over the allowed range.
        best_score, best_k = -1.0, min_clusters
        for k in range(min_clusters, max_clusters + 1):
            if k >= len(valid):
                break
            km = KMeans(n_clusters=k, random_state=42, n_init="auto")
            labels = km.fit_predict(valid)
            if len(set(labels)) > 1:
                score = silhouette_score(valid, labels)
                if score > best_score:
                    best_score, best_k = score, k
    else:
        best_k = min(max_clusters, max(min_clusters, int(voice_cfg.get("num_speakers", 2))))
    best_k = max(1, min(best_k, len(valid) - 1))

    # Final clustering at the chosen k.
    km = KMeans(n_clusters=best_k, random_state=42, n_init="auto", init="k-means++")
    labels = km.fit_predict(np.array(valid))
    centers = km.cluster_centers_

    # Resolve each cluster centroid to a known identity via nearest-neighbor
    # lookup in the voice collection; unmatched clusters get UNKNOWN_* names
    # and are enrolled back into the collection.
    cluster_to_name: Dict[int, str] = {}
    unknown_counter = 0
    for cid in range(best_k):
        center = centers[cid].tolist()
        name = f"SPEAKER_{cid:02d}"
        if voice_collection is not None:
            try:
                q = voice_collection.query(query_embeddings=[center], n_results=1)
                metas = q.get("metadatas", [[]])[0]
                dists = q.get("distances", [[]])[0]
                thr = voice_cfg.get("distance_threshold")
                if dists and thr is not None and dists[0] > thr:
                    # Too far from every known voice: register as new unknown.
                    name = f"UNKNOWN_{unknown_counter}"
                    unknown_counter += 1
                    voice_collection.add(
                        embeddings=[center],
                        metadatas=[{"name": name}],
                        ids=[f"unk_{cid}_{unknown_counter}"],
                    )
                else:
                    # Accept the nearest match; tolerate several metadata keys
                    # ("nombre" is the legacy Spanish key).
                    if metas and isinstance(metas[0], dict):
                        name = metas[0].get("nombre") or metas[0].get("name") \
                            or metas[0].get("speaker") or metas[0].get("identity") or name
            except Exception as e:
                log.warning(f"Voice KNN query failed: {e}")
        cluster_to_name[cid] = name

    # Project cluster labels back onto the original embedding order;
    # vi walks only over the valid (non-empty) embeddings.
    personas: List[str] = []
    vi = 0
    for emb in embeddings:
        if not emb:
            personas.append("UNKNOWN")
        else:
            label = int(labels[vi])
            personas.append(cluster_to_name.get(label, f"SPEAKER_{label:02d}"))
            vi += 1
    return personas
|
| |
|
| |
|
| |
|
| | def _fmt_srt_time(seconds: float) -> str:
|
| | h = int(seconds // 3600)
|
| | m = int((seconds % 3600) // 60)
|
| | s = int(seconds % 60)
|
| | ms = int(round((seconds - int(seconds)) * 1000))
|
| | return f"{h:02}:{m:02}:{s:02},{ms:03}"
|
| |
|
def generate_srt_from_diarization(
    diarization_segments: List[Dict[str, Any]],
    transcriptions: List[str],
    speakers_per_segment: List[str],
    output_srt_path: str,
    cfg: Dict[str, Any],
) -> None:
    """Write an SRT file combining diarization timings, texts and speakers.

    Cue texts are greedily word-wrapped to at most ``max_chars_per_line``
    characters per line and ``max_lines_per_cue`` lines; overflow is dropped.
    Inputs longer than the shortest list are ignored.
    """
    sub_cfg = cfg.get("subtitles", {})
    max_cpl = int(sub_cfg.get("max_chars_per_line", 42))
    max_lines = int(sub_cfg.get("max_lines_per_cue", 10))
    speaker_display = sub_cfg.get("speaker_display", "brackets")

    # zip truncates to the shortest input, pairing segment/text/speaker by index.
    cues: List[Dict[str, Any]] = [
        {
            "start": float(seg.get("start", 0.0)),
            "end": float(seg.get("end", 0.0)),
            "text": (raw or "").strip(),
            "speaker": spk,
        }
        for seg, raw, spk in zip(diarization_segments, transcriptions, speakers_per_segment)
    ]

    def wrap(cue_text: str) -> List[str]:
        # Greedy word wrap; stops adding lines once the budget is exhausted.
        wrapped: List[str] = []
        current = ""
        for word in cue_text.split():
            sep = 1 if current else 0
            if len(current) + len(word) + sep <= max_cpl:
                current = f"{current} {word}" if current else word
            else:
                wrapped.append(current)
                current = word
                if len(wrapped) >= max_lines - 1:
                    break
        if current and len(wrapped) < max_lines:
            wrapped.append(current)
        return wrapped

    out_path = Path(output_srt_path)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with out_path.open("w", encoding="utf-8-sig") as fh:
        for cue_no, cue in enumerate(cues, 1):
            text = cue["text"]
            spk = cue["speaker"]
            if spk and speaker_display == "brackets":
                text = f"[{spk}]: {text}"
            elif spk and speaker_display == "prefix":
                text = f"{spk}: {text}"
            fh.write(f"{cue_no}\n{_fmt_srt_time(cue['start'])} --> {_fmt_srt_time(cue['end'])}\n")
            fh.write("\n".join(wrap(text)) + "\n\n")
|
| |
|
| |
|
| |
|
def process_audio_for_video(
    video_path: str,
    out_dir: Path,
    cfg: Dict[str, Any],
    voice_collection=None,
) -> Tuple[List[Dict[str, Any]], Optional[str], str, Dict[str, Any], List[Dict[str, Any]]]:
    """
    Audio pipeline: FFmpeg -> diarization -> remote ASR (full + clips) -> embeddings -> speaker-ID -> SRT.

    Returns a 5-tuple:
        (audio_segments, srt_path or None, full_transcription_text,
         diarization_info, connection_logs).
    """
    audio_cfg = cfg.get("audio_processing", {})
    sr = int(audio_cfg.get("sample_rate", 16000))
    fmt = audio_cfg.get("format", "wav")
    # Extract the audio track next to the requested output directory.
    wav_path = extract_audio_ffmpeg(video_path, out_dir / f"{Path(video_path).stem}.{fmt}", sr=sr)
    log.info("Audio extraído")

    diar_cfg = audio_cfg.get("diarization", {})
    min_dur = float(diar_cfg.get("min_segment_duration", 0.5))
    max_dur = float(diar_cfg.get("max_segment_duration", 10.0))
    # NOTE(review): default is silence-only segmentation (pyannote skipped).
    force_silence = bool(diar_cfg.get("force_silence_only", True))
    silence_thresh = int(diar_cfg.get("silence_thresh", -40))
    min_silence_len = int(diar_cfg.get("min_silence_len", 500))

    clip_paths, diar_segs, diar_info, connection_logs = diarize_audio(
        wav_path, out_dir, "clips", min_dur, max_dur,
        force_silence_only=force_silence,
        silence_thresh=silence_thresh,
        min_silence_len=min_silence_len
    )
    log.info("Clips de audio generados.")

    # Optional full-file transcription (in addition to per-clip passes).
    full_transcription = ""
    asr_section = cfg.get("asr", {})
    if asr_section.get("enable_full_transcription", True):
        log.info("Transcripción completa (remota, Space 'asr')...")
        import time as _t
        t0 = _t.time()
        connection_logs.append({"service": "asr", "phase": "connect", "message": "Connecting to ASR space..."})
        full_res = transcribe_audio_remote(wav_path, cfg)
        dt = _t.time() - t0
        connection_logs.append({"service": "asr", "phase": "done", "message": f"Response from ASR space received in {dt:.2f} s"})
        full_transcription = full_res.get("text", "") or ""
        log.info("Transcripción completa finalizada.")

    # Per-clip transcription; timings are appended to connection_logs.
    log.info("Transcripción por clip (remota, Space 'asr')...")
    trans: List[str] = []
    for cp in clip_paths:
        import time as _t
        t0 = _t.time()
        connection_logs.append({"service": "asr", "phase": "connect", "message": "Transcribing clip via ASR space..."})
        res = transcribe_audio_remote(cp, cfg)
        dt = _t.time() - t0
        connection_logs.append({"service": "asr", "phase": "done", "message": f"Clip transcribed in {dt:.2f} s"})
        trans.append(res.get("text", ""))

    log.info("Se han transcrito todos los clips.")

    # Voice embeddings per clip (empty placeholders when disabled).
    embeddings = embed_voice_segments(clip_paths) if audio_cfg.get("enable_voice_embeddings", True) else [[] for _ in clip_paths]

    # Speaker identification via clustering, or fall back to diarization labels.
    if cfg.get("voice_processing", {}).get("speaker_identification", {}).get("enabled", True):
        speakers = identify_speakers(embeddings, voice_collection, cfg)
        log.info("Speakers identificados correctamente.")
    else:
        speakers = [seg.get("speaker", f"SPEAKER_{i:02d}") for i, seg in enumerate(diar_segs)]

    # Assemble one record per diarized segment; lists may be ragged, so each
    # field falls back to a default when its source list is shorter.
    audio_segments: List[Dict[str, Any]] = []
    for i, seg in enumerate(diar_segs):
        audio_segments.append(
            {
                "segment": i,
                "start": float(seg.get("start", 0.0)),
                "end": float(seg.get("end", 0.0)),
                "speaker": speakers[i] if i < len(speakers) else seg.get("speaker", f"SPEAKER_{i:02d}"),
                "text": trans[i] if i < len(trans) else "",
                "voice_embedding": embeddings[i],
                "clip_path": clip_paths[i] if i < len(clip_paths) else str(out_dir / "clips" / f"segment_{i:03d}.wav"),
                "lang": "ca",
                "lang_prob": 1.0,
            }
        )

    srt_base_path = out_dir / f"transcripcion_diarizada_{Path(video_path).stem}"
    srt_unmodified_path = str(srt_base_path) + "_unmodified.srt"

    # SRT generation is best-effort: on failure the path is reported as None.
    try:
        generate_srt_from_diarization(
            diar_segs,
            [a["text"] for a in audio_segments],
            [a["speaker"] for a in audio_segments],
            srt_unmodified_path,
            cfg,
        )
    except Exception as e:
        log.warning(f"SRT generation failed: {e}")
        srt_unmodified_path = None

    return audio_segments, srt_unmodified_path, full_transcription, diar_info, connection_logs
|
| |
|