Architecture: Streaming Speech Translation Pipeline
Overview
End-to-end streaming speech translation pipeline:
Audio Input → ASR → NMT → TTS → Audio Output
  (PCM16)   (ONNX) (GGUF) (ONNX)   (PCM16)
Translates spoken English into spoken Russian in real time, with streaming output at every stage.
Pipeline Stages
1. ASR β Cache-Aware Streaming ASR (NeMo Conformer RNN-T)
Model: NVIDIA NeMo Conformer RNN-T, exported to ONNX.
Architecture: 3 internal threads connected by queue.Queue:
Audio → [Preprocess Thread] → [Encoder Thread] → [Decoder Thread] → text deltas
- Preprocess: Buffers raw audio, extracts mel-spectrogram features, chunks them per CacheAwareStreamingConfig (chunk_size=[49,56], shift_size=[49,56])
- Encoder: Runs ONNX encoder inference on feature chunks, maintains encoder cache state
- Decoder: Runs RNN-T decoder (joint network), produces incremental text tokens
Key classes: CacheAwareStreamingAudioBuffer, CacheAwareStreamingASR, ASRModelPackage
Wrapper: StreamingASR → exposes push_audio_chunk() / get_transcript_chunk() API
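The three-thread, queue-connected layout can be sketched with a toy pipeline. This is a minimal illustration of the threading pattern only, not the real model code: the `_preprocess` / `_encode` / `_decode` bodies are trivial stand-ins for mel extraction, ONNX encoder inference, and RNN-T decoding, and the class/queue names beyond `push_audio_chunk()` / `get_transcript_chunk()` are hypothetical.

```python
import queue
import threading

class ToyStreamingASR:
    """Toy sketch of the ASR stage's three internal threads
    (preprocess -> encoder -> decoder) connected by queue.Queue."""
    _SENTINEL = object()

    def __init__(self):
        self.audio_q = queue.Queue(maxsize=256)
        self.feat_q = queue.Queue(maxsize=64)
        self.enc_q = queue.Queue(maxsize=64)
        self.text_q = queue.Queue(maxsize=64)
        stages = [
            (self.audio_q, self.feat_q, self._preprocess),
            (self.feat_q, self.enc_q, self._encode),
            (self.enc_q, self.text_q, self._decode),
        ]
        self.threads = [
            threading.Thread(target=self._worker, args=s, daemon=True)
            for s in stages
        ]
        for t in self.threads:
            t.start()

    def _worker(self, in_q, out_q, fn):
        # Each stage: consume from its input queue, forward the sentinel
        # downstream on shutdown, otherwise transform and pass along.
        while True:
            item = in_q.get()
            if item is self._SENTINEL:
                out_q.put(self._SENTINEL)
                break
            out_q.put(fn(item))

    # Trivial stand-ins for mel extraction / ONNX encoder / RNN-T decoder.
    def _preprocess(self, pcm):
        return [s / 32768.0 for s in pcm]

    def _encode(self, feats):
        return sum(feats)

    def _decode(self, enc):
        return f"token<{enc:.2f}>"

    def push_audio_chunk(self, pcm):
        self.audio_q.put(pcm)

    def get_transcript_chunk(self, timeout=1.0):
        item = self.text_q.get(timeout=timeout)
        return None if item is self._SENTINEL else item

    def close(self):
        # Sentinel propagates through all three stages, then threads exit.
        self.audio_q.put(self._SENTINEL)
        for t in self.threads:
            t.join()
```

The sentinel-forwarding shutdown mirrors how a chain of daemon threads can be drained cleanly without abandoning in-flight chunks.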
2. NMT β Streaming Segmented Translation (TranslateGemma)
Model: TranslateGemma 4B (GGUF, Q8_0) via llama-cpp-python.
Architecture: Single-threaded, three internal components:
text deltas → [Segmenter] → text segments → [Translator] → raw translations → [Merger] → display text
- StreamingSegmenter: Batches ASR tokens into word-groups (max 5 words + 2 hold-back). Triggers on punctuation, pause (>700ms), or max-token boundaries (min 3 words)
- StreamingTranslator: Multi-turn translation using init/continuation prompt templates with KV cache warming
- StreamingTranslationMerger: Handles revision/append/continuation logic for incremental translations. Detects trailing ellipsis (incomplete), leading ellipsis (continuation), and word-level LCP revision
Wrapper: StreamingNMT → exposes push_text_chunk() / flush() / check_pause() API
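The segmentation policy (punctuation, pause, or size boundary with hold-back words) can be sketched as follows. This is a minimal sketch under the parameters stated above; the real StreamingSegmenter's internals, method names, and exact trigger interplay may differ.

```python
import time

class ToySegmenter:
    """Sketch of the word-group segmentation policy: emit a segment on
    sentence punctuation, on a pause longer than pause_s, or when the
    buffer exceeds max_words (holding back the last hold_back words)."""
    PUNCT = ".!?"

    def __init__(self, max_words=5, hold_back=2, min_words=3, pause_s=0.7):
        self.max_words = max_words
        self.hold_back = hold_back
        self.min_words = min_words
        self.pause_s = pause_s
        self.words = []
        self.last_push = time.monotonic()

    def push(self, word):
        self.words.append(word)
        self.last_push = time.monotonic()
        if word and word[-1] in self.PUNCT:
            return self._emit(len(self.words))       # punctuation: flush all
        if len(self.words) >= self.max_words + self.hold_back:
            return self._emit(self.max_words)        # size: keep hold-back words
        return None

    def check_pause(self):
        # Called periodically; flushes on >pause_s of silence once
        # at least min_words have accumulated.
        if (time.monotonic() - self.last_push > self.pause_s
                and len(self.words) >= self.min_words):
            return self._emit(len(self.words))
        return None

    def _emit(self, n):
        seg, self.words = self.words[:n], self.words[n:]
        return " ".join(seg)
```

The hold-back keeps a couple of trailing words in the buffer so the next segment starts with context that may still be revised by the ASR.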
3. TTS β Streaming XTTS v2 (ONNX)
Model: XTTSv2 with ONNX-exported GPT-2 AR model + HiFi-GAN vocoder.
Architecture: Sequential within a single call:
text → [BPE Tokenizer] → [GPT-2 AR Loop] → mel latents → [HiFi-GAN Vocoder] → audio chunks
- Speaker conditioning: One-time compute from reference audio → gpt_cond_latent[1,32,1024] + speaker_embedding[1,512,1]
- AR generation: GPT-2 autoregressive loop producing audio token latents. Accumulates stream_chunk_size (default 20) tokens before running the vocoder
- Vocoder: HiFi-GAN converts accumulated latents to waveform
- Crossfade stitching: Linear fade-in/fade-out between consecutive vocoder output chunks for seamless playback
Output: 24kHz float32 audio chunks
Wrapper: StreamingTTS → exposes synthesize_stream() generator API
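The crossfade stitching step can be sketched as below. This is a pure-Python illustration of a linear overlap crossfade; the real pipeline operates on 24 kHz float32 numpy arrays, and the function name and fade length are illustrative only.

```python
def crossfade_concat(chunks, fade_len):
    """Stitch consecutive vocoder output chunks with a linear crossfade:
    the tail of the previous chunk fades out while the head of the next
    chunk fades in over fade_len samples. Each chunk must be at least
    fade_len samples long. The overlap shortens the total length by
    fade_len samples per seam."""
    out = list(chunks[0])
    for chunk in chunks[1:]:
        tail, head = out[-fade_len:], chunk[:fade_len]
        for i in range(fade_len):
            w = (i + 1) / (fade_len + 1)            # fade-in weight for the new chunk
            out[-fade_len + i] = tail[i] * (1 - w) + head[i] * w
        out.extend(chunk[fade_len:])
    return out
```

Because each sample in the overlap is a convex combination of the two chunks, the seam has no amplitude discontinuity, which is what makes back-to-back vocoder chunks play seamlessly.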
Concurrency Model
┌────────────────────────────────────────────────────────────────┐
│                       asyncio Event Loop                       │
│                                                                │
│ ┌───────────────┐  ┌──────────────┐  ┌─────────────────────┐   │
│ │ WebSocket I/O │  │  asr_to_nmt  │  │   tts_synthesis     │   │
│ │    handler    │  │     loop     │  │       loop          │   │
│ └───────┬───────┘  └──────┬───────┘  └──────────┬──────────┘   │
│         │                 │                     │              │
│         │run_in_executor()│ run_in_executor()   │ run_in_executor()
│         ▼                 ▼                     ▼              │
│ ┌──────────────────────────────────────────────────────────┐   │
│ │              ThreadPoolExecutor (4 workers)              │   │
│ │ ┌──────────────┐ ┌──────────────┐ ┌───────────────────┐  │   │
│ │ │ ASR internal │ │ NMT blocking │ │  TTS blocking     │  │   │
│ │ │ threads (3)  │ │  llama-cpp   │ │  ONNX inference   │  │   │
│ │ └──────────────┘ └──────────────┘ └───────────────────┘  │   │
│ └──────────────────────────────────────────────────────────┘   │
└────────────────────────────────────────────────────────────────┘
- asyncio loop: Handles WebSocket I/O and coordinates pipeline stages
- ASR threads: 3 dedicated daemon threads (preprocess, encoder, decoder)
- NMT: Blocking llama-cpp inference bridged via run_in_executor()
- TTS: Blocking ONNX inference bridged via run_in_executor()
- Concurrency: All per-session state (including NMT turn context: prev_translation, prev_query) is held in per-session wrapper objects. The shared model weights (ASRModelPackage, StreamingTranslator, StreamingTTSPipeline) hold no mutable per-session state after initialization, making concurrent sessions safe.
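The executor bridging described above can be sketched as follows. `translate_blocking` is a stand-in for the real blocking llama-cpp call, not the project's API; the point is that the blocking work runs on the shared ThreadPoolExecutor while the event loop keeps servicing the other coroutines.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def translate_blocking(text):
    """Stand-in for a slow, blocking inference call (e.g. llama-cpp)."""
    return text.upper()

async def asr_to_nmt_loop(executor, segments, results):
    loop = asyncio.get_running_loop()
    for seg in segments:
        # The blocking call runs in a worker thread; awaiting the future
        # frees the event loop for WebSocket I/O and the other loops.
        translated = await loop.run_in_executor(executor, translate_blocking, seg)
        results.append(translated)

async def main():
    executor = ThreadPoolExecutor(max_workers=4)
    results = []
    await asr_to_nmt_loop(executor, ["hello", "world"], results)
    executor.shutdown()
    return results
```

Passing a shared executor (rather than `None`) keeps all three pipeline stages inside the same bounded 4-worker pool, which caps CPU oversubscription.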
Data Flow & Queues
WebSocket binary → push_audio()
        │
        ▼
┌───────────────┐
│  audio_queue  │  (maxsize=256, queue.Queue)
└───────┬───────┘
        ▼
[ASR Internal Threads]
        │
        ▼
┌───────────────┐
│ output_queue  │  (maxsize=64, queue.Queue)
└───────┬───────┘
        ▼
[asr_to_nmt_loop via executor]
        │
   ┌────┴──────────┐
   ▼               ▼
transcript_queue  tts_text_queue  (maxsize=16, asyncio.Queue)
(→ WebSocket)          │
                       ▼
       [tts_synthesis_loop via executor]
                       │
                       ▼
              ┌─────────────────┐
              │ audio_out_queue │  (maxsize=32, asyncio.Queue)
              └────────┬────────┘
                       ▼
               WebSocket binary
Backpressure Strategy
- Audio input: put_nowait with drop-on-full (acceptable to lose frames vs. building latency)
- ASR→NMT: put_nowait with drop warning on encoder/decoder output queues
- NMT→TTS: put_nowait with drop warning (translations can be reconstructed from the next segment)
- TTS→Output: put_nowait with drop warning per audio chunk
- All queue sizes are configurable via PipelineConfig
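The drop-on-full policy amounts to a small helper around `put_nowait`. A minimal sketch (the real code logs a warning rather than returning a flag, and the function name is illustrative):

```python
import asyncio

def offer(q: asyncio.Queue, item, label="queue"):
    """Non-blocking put with drop-on-full: try to enqueue, and if the
    bounded queue is full, drop the item instead of building latency."""
    try:
        q.put_nowait(item)
        return True
    except asyncio.QueueFull:
        print(f"warning: {label} full, dropping item")
        return False
```

The same shape works for the `queue.Queue` stages by catching `queue.Full` instead of `asyncio.QueueFull`.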
Model Loading & Session Lifecycle
Startup: Models loaded ONCE in TranslationServer._load_models():
- ASR ONNX sessions (ASRModelPackage)
- NMT GGUF model (StreamingTranslator) + KV cache warmup
- TTS ONNX sessions (StreamingTTSPipeline)
Per-session: Each WebSocket connection creates:
- StreamingASR → own audio buffers, streaming state, thread pool
- StreamingNMT → own segmenter, merger, and translation context (prev_translation, prev_query); shares model weights only
- StreamingTTS → own speaker conditioning; shares ONNX sessions
Cleanup: On disconnect, orchestrator flushes remaining NMT text through TTS, then stops all threads and resets state.
WebSocket Protocol
| Direction | Type | Format | Description |
|---|---|---|---|
| Client → Server | Binary | PCM16 | Raw audio at the declared sample rate |
| Client → Server | Text | JSON | {"action": "start", "sample_rate": 16000} |
| Client → Server | Text | JSON | {"action": "stop"} |
| Server → Client | Binary | PCM16 | Synthesized audio at 24kHz |
| Server → Client | Text | JSON | {"type": "transcript", "text": "..."} |
| Server → Client | Text | JSON | {"type": "translation", "text": "..."} |
| Server → Client | Text | JSON | {"type": "status", "status": "started"} |
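Server-side handling of the JSON control messages in the table can be sketched as below. The `session` object with `start()`/`stop()` hooks is hypothetical; only the message shapes come from the protocol table.

```python
import json

def handle_text_message(raw, session):
    """Dispatch a client JSON control message per the protocol table.
    `session` is a hypothetical per-connection object exposing
    start(sample_rate=...) and stop()."""
    msg = json.loads(raw)
    if msg["action"] == "start":
        session.start(sample_rate=msg.get("sample_rate", 16000))
        # Acknowledge with the status message from the table.
        return json.dumps({"type": "status", "status": "started"})
    if msg["action"] == "stop":
        session.stop()
        return None  # the table defines no reply for "stop"
    raise ValueError(f"unknown action: {msg['action']}")
```

Binary frames (PCM16 audio) would bypass this dispatcher entirely and go straight to `push_audio()`.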
Configuration
All tunables are in PipelineConfig (dataclass) and exposed as CLI args:
| Parameter | Default | Description |
|---|---|---|
| asr_chunk_duration_ms | 10 | Audio chunk duration for ASR (ms) |
| nmt_n_threads | 4 | CPU threads for llama-cpp |
| tts_stream_chunk_size | 20 | AR tokens per vocoder chunk |
| audio_queue_maxsize | 256 | Audio input queue bound |
| tts_queue_maxsize | 16 | NMT→TTS text queue bound |
| audio_out_queue_maxsize | 32 | TTS→output audio queue bound |
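The dataclass-plus-CLI pattern can be sketched as follows. Only the fields from the table are shown; the real PipelineConfig has more fields, and `build_parser` is an illustrative name for however the project wires flags to fields.

```python
import argparse
from dataclasses import dataclass, fields

@dataclass
class PipelineConfig:
    """Tunables from the configuration table (subset of the real class)."""
    asr_chunk_duration_ms: int = 10
    nmt_n_threads: int = 4
    tts_stream_chunk_size: int = 20
    audio_queue_maxsize: int = 256
    tts_queue_maxsize: int = 16
    audio_out_queue_maxsize: int = 32

def build_parser():
    # Expose every dataclass field as a CLI flag, e.g. --nmt-n-threads 8;
    # argparse maps the flag back to the underscore name automatically.
    parser = argparse.ArgumentParser()
    for f in fields(PipelineConfig):
        parser.add_argument(f"--{f.name.replace('_', '-')}",
                            type=f.type, default=f.default)
    return parser
```

Generating flags from `fields()` keeps the table, the dataclass, and the CLI in sync with a single source of truth.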
File Structure
src/
├── asr/
│   ├── streaming_asr.py            # StreamingASR wrapper
│   ├── pipeline.py                 # ThreadedSpeechTranslator (reference)
│   ├── cache_aware_modules.py      # Audio buffer + streaming ASR
│   ├── modules.py                  # ONNX model loading
│   └── utils.py                    # Audio utilities
├── nmt/
│   ├── streaming_nmt.py            # StreamingNMT wrapper
│   ├── streaming_segmenter.py      # Word-group segmentation
│   ├── streaming_translation_merger.py  # Translation merging
│   └── translator_module.py        # TranslateGemma via llama-cpp
├── tts/
│   ├── streaming_tts.py            # StreamingTTS wrapper
│   ├── xtts_streaming_pipeline.py  # Full TTS pipeline
│   ├── xtts_onnx_orchestrator.py   # GPT-2 AR + vocoder
│   ├── xtts_tokenizer.py           # BPE tokenizer
│   └── zh_num2words.py             # Chinese text normalization
├── pipeline/
│   ├── orchestrator.py             # PipelineOrchestrator
│   └── config.py                   # PipelineConfig
└── server/
    └── websocket_server.py         # WebSocket server