|
|
|
|
|
import os
|
|
|
import requests
|
|
|
import base64
|
|
|
import zipfile
|
|
|
import io
|
|
|
from typing import Iterable, Dict, Any
|
|
|
|
|
|
class APIClient:
|
|
|
"""
|
|
|
Cliente para 'engine':
|
|
|
POST /jobs -> {"job_id": "..."}
|
|
|
GET /jobs/{job_id}/status -> {"status": "queued|processing|done|failed", ...}
|
|
|
GET /jobs/{job_id}/result -> JobResult {"book": {...}, "une": {...}, ...}
|
|
|
"""
|
|
|
def __init__(self, base_url: str, use_mock: bool = False, data_dir: str | None = None, token: str | None = None, timeout: int = 180):
|
|
|
self.base_url = base_url.rstrip("/")
|
|
|
|
|
|
self.tts_url = self.base_url
|
|
|
self.use_mock = use_mock
|
|
|
self.data_dir = data_dir
|
|
|
self.timeout = timeout
|
|
|
self.session = requests.Session()
|
|
|
|
|
|
token = token or os.getenv("API_SHARED_TOKEN")
|
|
|
if token:
|
|
|
self.session.headers.update({"Authorization": f"Bearer {token}"})
|
|
|
|
|
|
|
|
|
def _post_jobs(self, video_path: str, modes: Iterable[str]) -> Dict[str, Any]:
|
|
|
url = f"{self.base_url}/jobs"
|
|
|
files = {"file": (os.path.basename(video_path), open(video_path, "rb"), "application/octet-stream")}
|
|
|
data = {"modes": ",".join(modes)}
|
|
|
r = self.session.post(url, files=files, data=data, timeout=self.timeout)
|
|
|
r.raise_for_status()
|
|
|
return r.json()
|
|
|
|
|
|
def _get_status(self, job_id: str) -> Dict[str, Any]:
|
|
|
url = f"{self.base_url}/jobs/{job_id}/status"
|
|
|
r = self.session.get(url, timeout=self.timeout)
|
|
|
r.raise_for_status()
|
|
|
return r.json()
|
|
|
|
|
|
def _get_result(self, job_id: str) -> Dict[str, Any]:
|
|
|
url = f"{self.base_url}/jobs/{job_id}/result"
|
|
|
r = self.session.get(url, timeout=self.timeout)
|
|
|
r.raise_for_status()
|
|
|
return r.json()
|
|
|
|
|
|
|
|
|
def process_video(self, video_path: str, modes: Iterable[str]) -> Dict[str, Any]:
|
|
|
"""Devuelve {"job_id": "..."}"""
|
|
|
if self.use_mock:
|
|
|
return {"job_id": "mock-123"}
|
|
|
return self._post_jobs(video_path, modes)
|
|
|
|
|
|
def get_job(self, job_id: str) -> Dict[str, Any]:
|
|
|
"""
|
|
|
La UI espera algo del estilo:
|
|
|
{"status":"done","results":{"book":{...},"une":{...}}}
|
|
|
Adaptamos la respuesta de /result del engine a ese contrato.
|
|
|
"""
|
|
|
if self.use_mock:
|
|
|
|
|
|
return {
|
|
|
"status": "done",
|
|
|
"results": {
|
|
|
"book": {"text": "Text d'exemple (book)", "mp3_bytes": b""},
|
|
|
"une": {"srt": "1\n00:00:00,000 --> 00:00:01,000\nExemple UNE\n", "mp3_bytes": b""},
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
st = self._get_status(job_id)
|
|
|
if st.get("status") in {"queued", "processing"}:
|
|
|
return {"status": st.get("status", "queued")}
|
|
|
|
|
|
res = self._get_result(job_id)
|
|
|
|
|
|
|
|
|
results = {}
|
|
|
if "book" in res:
|
|
|
results["book"] = {
|
|
|
"text": res["book"].get("text"),
|
|
|
|
|
|
|
|
|
}
|
|
|
if "une" in res:
|
|
|
results["une"] = {
|
|
|
"srt": res["une"].get("srt"),
|
|
|
}
|
|
|
|
|
|
for k in ("book", "une"):
|
|
|
if k in res:
|
|
|
if "characters" in res[k]:
|
|
|
results[k]["characters"] = res[k]["characters"]
|
|
|
if "metrics" in res[k]:
|
|
|
results[k]["metrics"] = res[k]["metrics"]
|
|
|
|
|
|
status = "done" if results else st.get("status", "unknown")
|
|
|
return {"status": status, "results": results}
|
|
|
|
|
|
|
|
|
def tts_matxa(self, text: str, voice: str = "central/grau") -> dict:
|
|
|
"""
|
|
|
Llama al space 'tts' para sintetizar audio.
|
|
|
|
|
|
Args:
|
|
|
text (str): Texto a sintetizar.
|
|
|
voice (str): Voz de Matxa a usar (p.ej. 'central/alvocat').
|
|
|
|
|
|
Returns:
|
|
|
dict: {'mp3_data_url': 'data:audio/mpeg;base64,...'}
|
|
|
"""
|
|
|
if not self.tts_url:
|
|
|
raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)")
|
|
|
|
|
|
url = f"{self.tts_url.rstrip('/')}/tts/text"
|
|
|
data = {
|
|
|
"texto": text,
|
|
|
"voice": voice,
|
|
|
"formato": "mp3"
|
|
|
}
|
|
|
|
|
|
try:
|
|
|
r = requests.post(url, data=data, timeout=self.timeout)
|
|
|
r.raise_for_status()
|
|
|
|
|
|
|
|
|
return {"mp3_bytes": r.content}
|
|
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
|
print(f"Error cridant a TTS: {e}")
|
|
|
|
|
|
return {"error": str(e)}
|
|
|
|
|
|
|
|
|
def rebuild_video_with_ad(self, video_path: str, srt_path: str) -> dict:
|
|
|
"""
|
|
|
Llama al space 'tts' para reconstruir un vídeo con audiodescripció a partir de un SRT.
|
|
|
El servidor devuelve un ZIP, y de ahí extraemos el MP4 final.
|
|
|
"""
|
|
|
if not self.tts_url:
|
|
|
raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)")
|
|
|
|
|
|
url = f"{self.tts_url.rstrip('/')}/tts/srt"
|
|
|
|
|
|
try:
|
|
|
files = {
|
|
|
'video': (os.path.basename(video_path), open(video_path, 'rb'), 'video/mp4'),
|
|
|
'srt': (os.path.basename(srt_path), open(srt_path, 'rb'), 'application/x-subrip')
|
|
|
}
|
|
|
data = {"include_final_mp4": 1}
|
|
|
|
|
|
r = requests.post(url, files=files, data=data, timeout=self.timeout * 5)
|
|
|
r.raise_for_status()
|
|
|
|
|
|
|
|
|
with zipfile.ZipFile(io.BytesIO(r.content)) as z:
|
|
|
|
|
|
for filename in z.namelist():
|
|
|
if filename.endswith('.mp4'):
|
|
|
video_bytes = z.read(filename)
|
|
|
return {"video_bytes": video_bytes}
|
|
|
|
|
|
|
|
|
return {"error": "No se encontró el archivo de vídeo MP4 en la respuesta del servidor."}
|
|
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
|
print(f"Error cridant a la reconstrucció de vídeo: {e}")
|
|
|
return {"error": str(e)}
|
|
|
except zipfile.BadZipFile:
|
|
|
return {"error": "La respuesta del servidor no fue un archivo ZIP válido."}
|
|
|
|
|
|
|
|
|
def create_initial_casting(self, video_path: str = None, video_bytes: bytes = None, video_name: str = None, epsilon: float = 0.5, min_cluster_size: int = 2) -> dict:
|
|
|
"""
|
|
|
Llama al endpoint del space 'engine' para crear el 'initial casting'.
|
|
|
|
|
|
Envía el vídeo recién importado como archivo y los parámetros de clustering.
|
|
|
|
|
|
Args:
|
|
|
video_path: Path to video file (if reading from disk)
|
|
|
video_bytes: Video file bytes (if already in memory)
|
|
|
video_name: Name for the video file
|
|
|
epsilon: Clustering epsilon parameter
|
|
|
min_cluster_size: Minimum cluster size parameter
|
|
|
"""
|
|
|
url = f"{self.base_url}/create_initial_casting"
|
|
|
try:
|
|
|
|
|
|
if video_bytes:
|
|
|
filename = video_name or "video.mp4"
|
|
|
files = {
|
|
|
"video": (filename, video_bytes, "video/mp4"),
|
|
|
}
|
|
|
elif video_path:
|
|
|
with open(video_path, "rb") as f:
|
|
|
files = {
|
|
|
"video": (os.path.basename(video_path), f.read(), "video/mp4"),
|
|
|
}
|
|
|
else:
|
|
|
return {"error": "Either video_path or video_bytes must be provided"}
|
|
|
|
|
|
data = {
|
|
|
"epsilon": str(epsilon),
|
|
|
"min_cluster_size": str(min_cluster_size),
|
|
|
}
|
|
|
r = self.session.post(url, files=files, data=data, timeout=self.timeout * 5)
|
|
|
r.raise_for_status()
|
|
|
return r.json() if r.headers.get("content-type", "").startswith("application/json") else {"ok": True}
|
|
|
except requests.exceptions.RequestException as e:
|
|
|
return {"error": str(e)}
|
|
|
except Exception as e:
|
|
|
return {"error": f"Unexpected error: {str(e)}"}
|
|
|
|
|
|
def generate_audio_from_text_file(self, text_content: str, voice: str = "central/grau") -> dict:
|
|
|
"""
|
|
|
Genera un único MP3 a partir de un texto largo, usando el endpoint de SRT.
|
|
|
1. Convierte el texto en un SRT falso.
|
|
|
2. Llama a /tts/srt con el SRT.
|
|
|
3. Extrae el 'ad_master.mp3' del ZIP resultante.
|
|
|
"""
|
|
|
if not self.tts_url:
|
|
|
raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)")
|
|
|
|
|
|
|
|
|
srt_content = ""
|
|
|
start_time = 0
|
|
|
for i, line in enumerate(text_content.strip().split('\n')):
|
|
|
line = line.strip()
|
|
|
if not line:
|
|
|
continue
|
|
|
|
|
|
end_time = start_time + 5
|
|
|
|
|
|
def format_time(seconds):
|
|
|
h = int(seconds / 3600)
|
|
|
m = int((seconds % 3600) / 60)
|
|
|
s = int(seconds % 60)
|
|
|
ms = int((seconds - int(seconds)) * 1000)
|
|
|
return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"
|
|
|
|
|
|
srt_content += f"{i+1}\n"
|
|
|
srt_content += f"{format_time(start_time)} --> {format_time(end_time)}\n"
|
|
|
srt_content += f"{line}\n\n"
|
|
|
start_time = end_time
|
|
|
|
|
|
if not srt_content:
|
|
|
return {"error": "El texto proporcionado estaba vacío o no se pudo procesar."}
|
|
|
|
|
|
|
|
|
url = f"{self.tts_url.rstrip('/')}/tts/srt"
|
|
|
try:
|
|
|
files = {
|
|
|
'srt': ('fake_ad.srt', srt_content, 'application/x-subrip')
|
|
|
}
|
|
|
data = {"voice": voice, "ad_format": "mp3"}
|
|
|
|
|
|
r = requests.post(url, files=files, data=data, timeout=self.timeout * 5)
|
|
|
r.raise_for_status()
|
|
|
|
|
|
|
|
|
with zipfile.ZipFile(io.BytesIO(r.content)) as z:
|
|
|
for filename in z.namelist():
|
|
|
if filename == 'ad_master.mp3':
|
|
|
mp3_bytes = z.read(filename)
|
|
|
return {"mp3_bytes": mp3_bytes}
|
|
|
|
|
|
return {"error": "No se encontró 'ad_master.mp3' en la respuesta del servidor."}
|
|
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
|
return {"error": f"Error llamando a la API de SRT: {e}"}
|
|
|
except zipfile.BadZipFile:
|
|
|
return {"error": "La respuesta del servidor no fue un archivo ZIP válido."}
|
|
|
|
|
|
|
|
|
def tts_long_text(self, text: str, voice: str = "central/grau") -> dict:
|
|
|
"""
|
|
|
Llama al endpoint '/tts/text_long' para sintetizar un texto largo.
|
|
|
La API se encarga de todo el procesamiento.
|
|
|
"""
|
|
|
if not self.tts_url:
|
|
|
raise ValueError("La URL del servei TTS no està configurada (API_TTS_URL)")
|
|
|
|
|
|
url = f"{self.tts_url.rstrip('/')}/tts/text_long"
|
|
|
data = {
|
|
|
"texto": text,
|
|
|
"voice": voice,
|
|
|
"formato": "mp3"
|
|
|
}
|
|
|
|
|
|
try:
|
|
|
|
|
|
r = requests.post(url, data=data, timeout=self.timeout * 10)
|
|
|
r.raise_for_status()
|
|
|
return {"mp3_bytes": r.content}
|
|
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
|
print(f"Error cridant a TTS per a text llarg: {e}")
|
|
|
return {"error": str(e)}
|
|
|
|
|
|
|
|
|
|