# Architecture: Streaming Speech Translation Pipeline
## Overview
End-to-end streaming speech translation pipeline:
```
Audio Input → ASR → NMT → TTS → Audio Output
  (PCM16)   (ONNX) (GGUF) (ONNX)   (PCM16)
```
Translates spoken English into spoken Russian in real time, streaming output as it is produced.
## Pipeline Stages
### 1. ASR — Cache-Aware Streaming ASR (NeMo Conformer RNN-T)
**Model**: NVIDIA NeMo Conformer RNN-T, exported to ONNX.
**Architecture**: 3 internal threads connected by `queue.Queue`:
```
Audio → [Preprocess Thread] → [Encoder Thread] → [Decoder Thread] → text deltas
```
- **Preprocess**: Buffers raw audio, extracts mel-spectrogram features, and chunks them per `CacheAwareStreamingConfig` (chunk_size=[49,56], shift_size=[49,56])
- **Encoder**: Runs ONNX encoder inference on feature chunks, maintaining the encoder cache state
- **Decoder**: Runs the RNN-T decoder and joint network, producing incremental text tokens
**Key classes**: `CacheAwareStreamingAudioBuffer`, `CacheAwareStreamingASR`, `ASRModelPackage`
**Wrapper**: `StreamingASR` — exposes `push_audio_chunk()` / `get_transcript_chunk()` API
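The three-stage thread layout can be sketched with plain `queue.Queue` plumbing. The stage functions below (`preprocess`, `encode`, `decode`) are trivial stand-ins for the real mel-extraction, ONNX encoder, and RNN-T decoder steps:

```python
import queue
import threading

SENTINEL = None  # marks end of stream

def stage(fn, in_q, out_q):
    """Pull items from in_q, transform them, and push downstream until SENTINEL."""
    while True:
        item = in_q.get()
        if item is SENTINEL:
            out_q.put(SENTINEL)  # propagate shutdown to the next stage
            return
        out_q.put(fn(item))

# Stand-ins for the real preprocess / encoder / decoder work.
def preprocess(audio): return f"feat({audio})"
def encode(feat): return f"enc({feat})"
def decode(enc): return f"tok({enc})"

q0, q1, q2, q3 = (queue.Queue() for _ in range(4))
threads = [
    threading.Thread(target=stage, args=(f, i, o), daemon=True)
    for f, i, o in [(preprocess, q0, q1), (encode, q1, q2), (decode, q2, q3)]
]
for t in threads:
    t.start()

for chunk in ["a0", "a1"]:   # feed two audio chunks
    q0.put(chunk)
q0.put(SENTINEL)

deltas = []                  # collect incremental text output
while (item := q3.get()) is not SENTINEL:
    deltas.append(item)
# deltas == ["tok(enc(feat(a0)))", "tok(enc(feat(a1)))"]
```

Each stage blocks only on its own input queue, so slow encoder inference never stalls audio buffering.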
### 2. NMT — Streaming Segmented Translation (TranslateGemma)
**Model**: TranslateGemma 4B (GGUF, Q8_0) via llama-cpp-python.
**Architecture**: Single-threaded, three internal components:
```
text deltas → [Segmenter] → text segments → [Translator] → raw translations → [Merger] → display text
```
- **StreamingSegmenter**: Batches ASR tokens into word-groups (max 5 words + 2 hold-back). Triggers on punctuation, pause (>700ms), or max-token boundaries (min 3 words)
- **StreamingTranslator**: Multi-turn translation using init/continuation prompt templates with KV cache warming
- **StreamingTranslationMerger**: Handles revision/append/continuation logic for incremental translations. Detects trailing ellipsis (incomplete), leading ellipsis (continuation), and word-level LCP revision
**Wrapper**: `StreamingNMT` — exposes `push_text_chunk()` / `flush()` / `check_pause()` API
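The segmentation triggers described above can be illustrated with a toy segmenter. Class and method names here are invented for the sketch; only the thresholds (5-word groups, 2-word hold-back, 3-word minimum, 700 ms pause) come from the text:

```python
import time

class ToySegmenter:
    """Illustrative word-group segmenter, not the project's StreamingSegmenter."""
    MAX_WORDS = 5   # emit once this many words accumulate...
    HOLD_BACK = 2   # ...but keep this many trailing words for the next group
    MIN_WORDS = 3   # pause-triggered flush needs at least this many words
    PAUSE_S = 0.7   # 700 ms silence threshold

    def __init__(self):
        self.words = []
        self.last_push = time.monotonic()

    def push(self, word):
        """Add one ASR word; return a finished segment or None."""
        self.words.append(word)
        self.last_push = time.monotonic()
        if word[-1] in ".!?":                      # punctuation: flush all
            seg, self.words = self.words, []
            return " ".join(seg)
        if len(self.words) >= self.MAX_WORDS + self.HOLD_BACK:
            seg = self.words[:self.MAX_WORDS]      # hold back the tail
            self.words = self.words[self.MAX_WORDS:]
            return " ".join(seg)
        return None

    def check_pause(self):
        """Flush on a long silence, if enough words have accumulated."""
        if (len(self.words) >= self.MIN_WORDS
                and time.monotonic() - self.last_push > self.PAUSE_S):
            seg, self.words = self.words, []
            return " ".join(seg)
        return None

seg = ToySegmenter()
out = [s for w in "the quick brown fox jumps over the lazy dog.".split()
       if (s := seg.push(w))]
# out == ["the quick brown fox jumps", "over the lazy dog."]
```

The hold-back keeps a couple of words in the buffer so a group boundary never splits a phrase the next segment still needs.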
### 3. TTS — Streaming XTTS v2 (ONNX)
**Model**: XTTSv2 with ONNX-exported GPT-2 AR model + HiFi-GAN vocoder.
**Architecture**: Sequential within a single call:
```
text → [BPE Tokenizer] → [GPT-2 AR Loop] → mel latents → [HiFi-GAN Vocoder] → audio chunks
```
- **Speaker conditioning**: Computed once from the reference audio → `gpt_cond_latent` [1,32,1024] + `speaker_embedding` [1,512,1]
- **AR generation**: GPT-2 autoregressive loop producing audio token latents. Accumulates `stream_chunk_size` (default 20) tokens before running vocoder
- **Vocoder**: HiFi-GAN converts accumulated latents to waveform
- **Crossfade stitching**: Linear fade-in/fade-out between consecutive vocoder output chunks for seamless playback
**Output**: 24kHz float32 audio chunks
**Wrapper**: `StreamingTTS` — exposes `synthesize_stream()` generator API
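Crossfade stitching can be sketched as follows. This is an illustrative NumPy version, not the project's code; the overlap length is an assumed parameter:

```python
import numpy as np

def crossfade_concat(chunks, overlap):
    """Stitch consecutive vocoder chunks with a linear crossfade
    over `overlap` samples at each chunk boundary."""
    fade_in = np.linspace(0.0, 1.0, overlap, dtype=np.float32)
    fade_out = 1.0 - fade_in
    out = chunks[0].astype(np.float32)
    for chunk in chunks[1:]:
        chunk = chunk.astype(np.float32)
        # Mix the tail of what we have with the head of the new chunk.
        mixed = out[-overlap:] * fade_out + chunk[:overlap] * fade_in
        out = np.concatenate([out[:-overlap], mixed, chunk[overlap:]])
    return out

# Two constant chunks crossfade into one seamless constant signal.
a = np.ones(100, dtype=np.float32)
b = np.ones(100, dtype=np.float32)
stitched = crossfade_concat([a, b], overlap=20)
# stitched.shape == (180,), all values 1.0 (no click at the seam)
```

The complementary fade weights sum to 1.0 at every sample, so a continuous waveform passes through the boundary unchanged.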
## Concurrency Model
```
┌──────────────────────────────────────────────────────────────────┐
│                        asyncio Event Loop                        │
│                                                                  │
│  ┌──────────────┐   ┌──────────────┐   ┌──────────────────┐      │
│  │ WebSocket I/O│   │  asr_to_nmt  │   │  tts_synthesis   │      │
│  │   handler    │   │     loop     │   │      loop        │      │
│  └──────┬───────┘   └──────┬───────┘   └────────┬─────────┘      │
│         │                  │                    │                │
│         │ run_in_executor()│ run_in_executor()  │ run_in_ex…     │
│         ▼                  ▼                    ▼                │
│  ┌────────────────────────────────────────────────────────────┐  │
│  │               ThreadPoolExecutor (4 workers)               │  │
│  │  ┌──────────────┐  ┌──────────────┐  ┌──────────────────┐  │  │
│  │  │ ASR internal │  │ NMT blocking │  │  TTS blocking    │  │  │
│  │  │ threads (3)  │  │  llama-cpp   │  │  ONNX inference  │  │  │
│  │  └──────────────┘  └──────────────┘  └──────────────────┘  │  │
│  └────────────────────────────────────────────────────────────┘  │
└──────────────────────────────────────────────────────────────────┘
```
- **asyncio loop**: Handles WebSocket I/O and coordinates pipeline stages
- **ASR threads**: 3 dedicated daemon threads (preprocess, encoder, decoder)
- **NMT**: Blocking llama-cpp inference bridged via `run_in_executor()`
- **TTS**: Blocking ONNX inference bridged via `run_in_executor()`
- **Concurrency**: All per-session state, including the NMT turn context (`prev_translation`, `prev_query`), lives in per-session wrapper objects. The shared model objects (`ASRModelPackage`, `StreamingTranslator`, `StreamingTTSPipeline`) hold no mutable per-session state after initialization, so concurrent sessions are safe.
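The `run_in_executor()` bridge boils down to a few lines: blocking inference runs on worker threads while the event loop stays free for WebSocket I/O. Here `blocking_inference` is a stand-in for a llama-cpp or ONNX call:

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_inference(text):
    """Stand-in for a blocking llama-cpp / ONNX inference call."""
    time.sleep(0.05)  # simulate CPU-bound work
    return text.upper()

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Both calls run concurrently on worker threads; awaiting them
        # yields control back to the event loop instead of blocking it.
        results = await asyncio.gather(
            loop.run_in_executor(pool, blocking_inference, "hello"),
            loop.run_in_executor(pool, blocking_inference, "world"),
        )
    return results

results = asyncio.run(main())
# results == ["HELLO", "WORLD"]
```

`asyncio.gather` preserves argument order, so results line up with the submitted calls regardless of which thread finishes first.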
## Data Flow & Queues
```
WebSocket binary → push_audio()
        │
        ▼
 ┌──────────────┐
 │ audio_queue  │  (maxsize=256, queue.Queue)
 └──────┬───────┘
        ▼
 [ASR Internal Threads]
        │
        ▼
 ┌──────────────┐
 │ output_queue │  (maxsize=64, queue.Queue)
 └──────┬───────┘
        ▼
 [asr_to_nmt_loop via executor]
        │
   ┌────┴────────────┐
   ▼                 ▼
transcript_queue  tts_text_queue  (maxsize=16, asyncio.Queue)
 (→ WebSocket)       │
                     ▼
        [tts_synthesis_loop via executor]
                     │
                     ▼
            ┌─────────────────┐
            │ audio_out_queue │  (maxsize=32, asyncio.Queue)
            └────────┬────────┘
                     ▼
              WebSocket binary
```
## Backpressure Strategy
- **Audio input**: `put_nowait` with drop-on-full (dropping occasional frames is preferable to accumulating latency)
- **ASR→NMT**: `put_nowait` with drop warning on encoder/decoder output queues
- **NMT→TTS**: `put_nowait` with drop warning (translations can be reconstructed from next segment)
- **TTS→Output**: `put_nowait` with drop warning per audio chunk
- All queue sizes are configurable via `PipelineConfig`
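The drop-on-full policy is a few lines around `put_nowait`; a minimal sketch with an assumed helper name:

```python
import queue

def put_or_drop(q, item):
    """Non-blocking enqueue: drop the item when the queue is full,
    trading occasional loss for bounded latency."""
    try:
        q.put_nowait(item)
        return True
    except queue.Full:
        return False  # caller may log a drop warning here

audio_q = queue.Queue(maxsize=2)
accepted = [put_or_drop(audio_q, f) for f in ("f0", "f1", "f2")]
# accepted == [True, True, False] -- the third frame is dropped
```

Because producers never block, a stalled consumer can only cost data, never back the whole pipeline up behind it.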
## Model Loading & Session Lifecycle
**Startup**: Models are loaded once, in `TranslationServer._load_models()`:
- ASR ONNX sessions (`ASRModelPackage`)
- NMT GGUF model (`StreamingTranslator`) + KV cache warmup
- TTS ONNX sessions (`StreamingTTSPipeline`)
**Per-session**: Each WebSocket connection creates:
- `StreamingASR` — own audio buffers, streaming state, thread pool
- `StreamingNMT` — own segmenter, merger, and translation context (`prev_translation`, `prev_query`); shares model weights only
- `StreamingTTS` — own speaker conditioning; shares ONNX sessions
**Cleanup**: On disconnect, orchestrator flushes remaining NMT text through TTS, then stops all threads and resets state.
## WebSocket Protocol
| Direction | Type | Format | Description |
|-----------|---------|--------|-------------|
| Client→ | Binary | PCM16 | Raw audio at declared sample rate |
| Client→ | Text | JSON | `{"action": "start", "sample_rate": 16000}` |
| Client→ | Text | JSON | `{"action": "stop"}` |
| →Client | Binary | PCM16 | Synthesized audio at 24kHz |
| →Client | Text | JSON | `{"type": "transcript", "text": "..."}` |
| →Client | Text | JSON | `{"type": "translation", "text": "..."}` |
| →Client | Text | JSON | `{"type": "status", "status": "started"}` |
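Building and validating these text frames needs only the `json` module. The helper names below are illustrative; the message shapes follow the table:

```python
import json

def start_message(sample_rate=16000):
    """Control message the client sends before streaming audio."""
    return json.dumps({"action": "start", "sample_rate": sample_rate})

def stop_message():
    return json.dumps({"action": "stop"})

def parse_server_message(raw):
    """Classify a text frame from the server by its `type` field."""
    msg = json.loads(raw)
    if msg.get("type") not in ("transcript", "translation", "status"):
        raise ValueError(f"unknown message type: {msg.get('type')}")
    return msg

msg = parse_server_message('{"type": "transcript", "text": "hello"}')
# msg == {"type": "transcript", "text": "hello"}
```

Binary frames carry PCM16 audio directly and need no JSON envelope, which keeps the hot audio path allocation-light.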
## Configuration
All tunables are in `PipelineConfig` (dataclass) and exposed as CLI args:
| Parameter | Default | Description |
|-----------|---------|-------------|
| `asr_chunk_duration_ms` | 10 | Audio chunk duration for ASR |
| `nmt_n_threads` | 4 | CPU threads for llama-cpp |
| `tts_stream_chunk_size` | 20 | AR tokens per vocoder chunk |
| `audio_queue_maxsize` | 256 | Audio input queue bound |
| `tts_queue_maxsize` | 16 | NMT→TTS text queue bound |
| `audio_out_queue_maxsize` | 32 | TTS→output audio queue bound |
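One way such a dataclass-plus-CLI arrangement can look, sketched with the table's fields (the real `PipelineConfig` likely has more tunables and may wire argparse differently):

```python
import argparse
from dataclasses import dataclass, fields

@dataclass
class PipelineConfig:
    """Illustrative subset of the tunables from the table above."""
    asr_chunk_duration_ms: int = 10
    nmt_n_threads: int = 4
    tts_stream_chunk_size: int = 20
    audio_queue_maxsize: int = 256
    tts_queue_maxsize: int = 16
    audio_out_queue_maxsize: int = 32

def parse_config(argv=None):
    """Derive one CLI flag per dataclass field, then build the config."""
    parser = argparse.ArgumentParser()
    for f in fields(PipelineConfig):
        parser.add_argument(f"--{f.name.replace('_', '-')}",
                            type=f.type, default=f.default)
    args = parser.parse_args(argv)
    return PipelineConfig(**vars(args))

cfg = parse_config(["--nmt-n-threads", "8"])
# cfg.nmt_n_threads == 8; every other field keeps its default
```

Generating flags from `fields()` keeps the dataclass the single source of truth, so adding a tunable automatically adds its CLI argument.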
## File Structure
```
src/
├── asr/
│   ├── streaming_asr.py                 # StreamingASR wrapper
│   ├── pipeline.py                      # ThreadedSpeechTranslator (reference)
│   ├── cache_aware_modules.py           # Audio buffer + streaming ASR
│   ├── modules.py                       # ONNX model loading
│   └── utils.py                         # Audio utilities
├── nmt/
│   ├── streaming_nmt.py                 # StreamingNMT wrapper
│   ├── streaming_segmenter.py           # Word-group segmentation
│   ├── streaming_translation_merger.py  # Translation merging
│   └── translator_module.py             # TranslateGemma via llama-cpp
├── tts/
│   ├── streaming_tts.py                 # StreamingTTS wrapper
│   ├── xtts_streaming_pipeline.py       # Full TTS pipeline
│   ├── xtts_onnx_orchestrator.py        # GPT-2 AR + vocoder
│   ├── xtts_tokenizer.py                # BPE tokenizer
│   └── zh_num2words.py                  # Chinese text normalization
├── pipeline/
│   ├── orchestrator.py                  # PipelineOrchestrator
│   └── config.py                        # PipelineConfig
└── server/
    └── websocket_server.py              # WebSocket server
```