# Architecture: Streaming Speech Translation Pipeline

## Overview

End-to-end streaming speech translation pipeline:

```
Audio Input → ASR  → NMT  → TTS  → Audio Output
  (PCM16)   (ONNX) (GGUF) (ONNX)    (PCM16)
```

Translates spoken English into spoken Russian in real time with streaming output.

## Pipeline Stages

### 1. ASR — Cache-Aware Streaming ASR (NeMo Conformer RNN-T)

**Model**: NVIDIA NeMo Conformer RNN-T, exported to ONNX.

**Architecture**: 3 internal threads connected by `queue.Queue`:

```
Audio → [Preprocess Thread] → [Encoder Thread] → [Decoder Thread] → text deltas
```

- **Preprocess**: Buffers raw audio, extracts mel-spectrogram features, and chunks them per `CacheAwareStreamingConfig` (chunk_size=[49,56], shift_size=[49,56])
- **Encoder**: Runs ONNX encoder inference on feature chunks, maintaining encoder cache state
- **Decoder**: Runs the RNN-T decoder (joint network), producing incremental text tokens

**Key classes**: `CacheAwareStreamingAudioBuffer`, `CacheAwareStreamingASR`, `ASRModelPackage`

**Wrapper**: `StreamingASR` — exposes a `push_audio_chunk()` / `get_transcript_chunk()` API

### 2. NMT — Streaming Segmented Translation (TranslateGemma)

**Model**: TranslateGemma 4B (GGUF, Q8_0) via llama-cpp-python.

**Architecture**: Single-threaded, three internal components:

```
text deltas → [Segmenter] → text segments → [Translator] → raw translations → [Merger] → display text
```

- **StreamingSegmenter**: Batches ASR tokens into word groups (max 5 words + 2 held back). Triggers on punctuation, pause (>700 ms), or max-token boundaries (min 3 words)
- **StreamingTranslator**: Multi-turn translation using init/continuation prompt templates with KV-cache warming
- **StreamingTranslationMerger**: Handles revision/append/continuation logic for incremental translations. Detects a trailing ellipsis (incomplete), a leading ellipsis (continuation), and word-level LCP revision

**Wrapper**: `StreamingNMT` — exposes a `push_text_chunk()` / `flush()` / `check_pause()` API

### 3. TTS — Streaming XTTS v2 (ONNX)

**Model**: XTTS v2 with an ONNX-exported GPT-2 AR model + HiFi-GAN vocoder.

**Architecture**: Sequential within a single call:

```
text → [BPE Tokenizer] → [GPT-2 AR Loop] → mel latents → [HiFi-GAN Vocoder] → audio chunks
```

- **Speaker conditioning**: Computed once from reference audio → `gpt_cond_latent` [1,32,1024] + `speaker_embedding` [1,512,1]
- **AR generation**: GPT-2 autoregressive loop producing audio token latents. Accumulates `stream_chunk_size` (default 20) tokens before running the vocoder
- **Vocoder**: HiFi-GAN converts the accumulated latents to a waveform
- **Crossfade stitching**: Linear fade-in/fade-out between consecutive vocoder output chunks for seamless playback

**Output**: 24 kHz float32 audio chunks

**Wrapper**: `StreamingTTS` — exposes a `synthesize_stream()` generator API

## Concurrency Model

```
┌─────────────────────────────────────────────────────────────────┐
│                       asyncio Event Loop                        │
│                                                                 │
│  ┌───────────────┐    ┌──────────────┐    ┌──────────────────┐  │
│  │ WebSocket I/O │    │  asr_to_nmt  │    │  tts_synthesis   │  │
│  │    handler    │    │     loop     │    │       loop       │  │
│  └───────┬───────┘    └──────┬───────┘    └────────┬─────────┘  │
│          │                   │                     │            │
│          │ run_in_executor() │ run_in_executor()   │ run_in_ex… │
│          ▼                   ▼                     ▼            │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │              ThreadPoolExecutor (4 workers)               │  │
│  │  ┌──────────────┐  ┌──────────────┐  ┌─────────────────┐  │  │
│  │  │ ASR internal │  │ NMT blocking │  │ TTS blocking    │  │  │
│  │  │ threads (3)  │  │ llama-cpp    │  │ ONNX inference  │  │  │
│  │  └──────────────┘  └──────────────┘  └─────────────────┘  │  │
│  └───────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘
```

- **asyncio loop**: Handles WebSocket I/O and coordinates the pipeline stages
- **ASR threads**: 3 dedicated daemon threads (preprocess, encoder, decoder)
- **NMT**: Blocking llama-cpp inference bridged via `run_in_executor()`
- **TTS**: Blocking ONNX inference bridged via `run_in_executor()`
- **Concurrency**: All per-session state (including the NMT turn context: prev_translation, prev_query) is held in per-session wrapper objects. The shared model objects (ASRModelPackage, StreamingTranslator, StreamingTTSPipeline) hold no mutable per-session state after initialization, making concurrent sessions safe.

## Data Flow & Queues

```
WebSocket binary → push_audio()
              │
              ▼
      ┌──────────────┐
      │ audio_queue  │  (maxsize=256, queue.Queue)
      └──────┬───────┘
             ▼
   [ASR Internal Threads]
             │
             ▼
      ┌──────────────┐
      │ output_queue │  (maxsize=64, queue.Queue)
      └──────┬───────┘
             ▼
 [asr_to_nmt_loop via executor]
             │
      ┌──────┴───────┐
      ▼              ▼
transcript_queue   tts_text_queue
 (→ WebSocket)     (maxsize=16, asyncio.Queue)
                     │
                     ▼
     [tts_synthesis_loop via executor]
                     │
                     ▼
             ┌───────────────┐
             │audio_out_queue│  (maxsize=32, asyncio.Queue)
             └──────┬────────┘
                    ▼
             WebSocket binary
```

## Backpressure Strategy

- **Audio input**: `put_nowait` with drop-on-full (losing frames is preferable to building up latency)
- **ASR→NMT**: `put_nowait` with a drop warning on the encoder/decoder output queues
- **NMT→TTS**: `put_nowait` with a drop warning (translations can be reconstructed from the next segment)
- **TTS→Output**: `put_nowait` with a drop warning per audio chunk
- All queue sizes are configurable via `PipelineConfig`

## Model Loading & Session Lifecycle

**Startup**: Models are loaded once in `TranslationServer._load_models()`:

- ASR ONNX sessions (`ASRModelPackage`)
- NMT GGUF model (`StreamingTranslator`) + KV-cache warmup
- TTS ONNX sessions (`StreamingTTSPipeline`)

**Per-session**: Each WebSocket connection creates:

- `StreamingASR` — own audio buffers, streaming state, and thread pool
- `StreamingNMT` — own segmenter, merger, and translation context (prev_translation, prev_query); shares model weights only
- `StreamingTTS` — own speaker conditioning; shares ONNX sessions

**Cleanup**: On disconnect, the orchestrator flushes the remaining NMT text through TTS, then stops all threads and resets state.
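The drop-on-full backpressure pattern described above can be sketched as follows. This is a minimal illustration of the `put_nowait` / drop-with-warning idea, not the project's actual code; the helper name `try_put` and the logger name are hypothetical:

```python
import logging
import queue

logger = logging.getLogger("pipeline")

def try_put(q: queue.Queue, item, label: str) -> bool:
    """Non-blocking put: drop the item (with a warning) when the queue is
    full, so a slow consumer costs bounded latency instead of unbounded lag."""
    try:
        q.put_nowait(item)
        return True
    except queue.Full:
        logger.warning("%s queue full; dropping item", label)
        return False

# A small bounded queue, standing in for the audio-input stage
# (which defaults to maxsize=256).
audio_queue = queue.Queue(maxsize=2)
assert try_put(audio_queue, b"\x00\x01", "audio")      # accepted
assert try_put(audio_queue, b"\x02\x03", "audio")      # accepted
assert not try_put(audio_queue, b"\x04\x05", "audio")  # full → dropped
```

The same pattern applies at every stage boundary; for the `asyncio.Queue` stages the equivalent is catching `asyncio.QueueFull`.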
## WebSocket Protocol

| Direction | Type | Format | Description |
|-----------|------|--------|-------------|
| Client→ | Binary | PCM16 | Raw audio at the declared sample rate |
| Client→ | Text | JSON | `{"action": "start", "sample_rate": 16000}` |
| Client→ | Text | JSON | `{"action": "stop"}` |
| →Client | Binary | PCM16 | Synthesized audio at 24 kHz |
| →Client | Text | JSON | `{"type": "transcript", "text": "..."}` |
| →Client | Text | JSON | `{"type": "translation", "text": "..."}` |
| →Client | Text | JSON | `{"type": "status", "status": "started"}` |

## Configuration

All tunables live in `PipelineConfig` (a dataclass) and are exposed as CLI args:

| Parameter | Default | Description |
|-----------|---------|-------------|
| `asr_chunk_duration_ms` | 10 | Audio chunk duration for ASR |
| `nmt_n_threads` | 4 | CPU threads for llama-cpp |
| `tts_stream_chunk_size` | 20 | AR tokens per vocoder chunk |
| `audio_queue_maxsize` | 256 | Audio input queue bound |
| `tts_queue_maxsize` | 16 | NMT→TTS text queue bound |
| `audio_out_queue_maxsize` | 32 | TTS→output audio queue bound |

## File Structure

```
src/
├── asr/
│   ├── streaming_asr.py            # StreamingASR wrapper
│   ├── pipeline.py                 # ThreadedSpeechTranslator (reference)
│   ├── cache_aware_modules.py      # Audio buffer + streaming ASR
│   ├── modules.py                  # ONNX model loading
│   └── utils.py                    # Audio utilities
├── nmt/
│   ├── streaming_nmt.py            # StreamingNMT wrapper
│   ├── streaming_segmenter.py      # Word-group segmentation
│   ├── streaming_translation_merger.py  # Translation merging
│   └── translator_module.py        # TranslateGemma via llama-cpp
├── tts/
│   ├── streaming_tts.py            # StreamingTTS wrapper
│   ├── xtts_streaming_pipeline.py  # Full TTS pipeline
│   ├── xtts_onnx_orchestrator.py   # GPT-2 AR + vocoder
│   ├── xtts_tokenizer.py           # BPE tokenizer
│   └── zh_num2words.py             # Chinese text normalization
├── pipeline/
│   ├── orchestrator.py             # PipelineOrchestrator
│   └── config.py                   # PipelineConfig
└── server/
    └── websocket_server.py         # WebSocket server
```
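The tunables in the Configuration table could be collected in a dataclass along these lines. This is a sketch only: the field names and defaults mirror the table above, but the real `PipelineConfig` in `pipeline/config.py` may carry additional fields:

```python
from dataclasses import dataclass

@dataclass
class PipelineConfig:
    # Defaults mirror the Configuration table.
    asr_chunk_duration_ms: int = 10    # audio chunk duration for ASR
    nmt_n_threads: int = 4             # CPU threads for llama-cpp
    tts_stream_chunk_size: int = 20    # AR tokens per vocoder chunk
    audio_queue_maxsize: int = 256     # audio input queue bound
    tts_queue_maxsize: int = 16        # NMT->TTS text queue bound
    audio_out_queue_maxsize: int = 32  # TTS->output audio queue bound

# CLI args would override individual fields, e.g.:
cfg = PipelineConfig(nmt_n_threads=8)
```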