Real-time streaming is essential for low-latency voice AI. Every component streams data rather than waiting for complete responses.
Without streaming:
User speaks (2s) → Wait for full transcription (1s) → Wait for full LLM response (3s) → Wait for full TTS (2s)
Total: ~8 seconds latency
With streaming:
User speaks → Stream transcription → Stream LLM tokens → Stream TTS audio
Total: ~1 second to first audio
const express = require('express');
const http = require('http');
const WebSocket = require('ws');

const app = express();
const server = http.createServer(app);
const wss = new WebSocket.Server({ server });

// Port is configurable via the environment; defaults to 3000 for
// backward compatibility with the original hard-coded value.
const PORT = Number(process.env.PORT ?? 3000);

wss.on('connection', (ws) => {
  console.log('Client connected');

  ws.on('message', (data) => {
    // Handle incoming messages
  });

  ws.on('close', () => {
    console.log('Client disconnected');
  });

  // Always attach an error handler: an unhandled 'error' event
  // would crash the Node process.
  ws.on('error', (error) => {
    console.error('WebSocket error:', error);
  });
});

server.listen(PORT, () => {
  console.log(`Server listening on port ${PORT}`);
});
Coordinate between services using EventEmitter:
const EventEmitter = require('events');
/**
 * Bridges pipeline stages by emitting events as audio is processed.
 * Listeners subscribe to 'transcription' and forward results downstream.
 *
 * The original declared an empty constructor that only called super();
 * that is redundant (ESLint no-useless-constructor) and was removed.
 */
class StreamService extends EventEmitter {
  /**
   * Processes one chunk of incoming audio and emits pipeline events.
   * Currently a stub: always emits a fixed transcription payload.
   * @param {*} audioChunk - raw audio data; format is decided by the
   *   STT service this stub stands in for — TODO confirm with callers.
   */
  processAudio(audioChunk) {
    // Process and emit events
    this.emit('transcription', { text: 'Hello' });
  }
}

// Usage
const service = new StreamService();
service.on('transcription', (data) => {
  // Send to LLM
});
Buffer chunks when one service produces data faster than the next service can consume it:
/**
 * FIFO buffer that absorbs rate mismatches between pipeline stages.
 * Chunks are forwarded downstream one at a time, in arrival order.
 *
 * NOTE(review): assumes `sendToService(chunk)` is supplied by a subclass
 * or assigned on the instance — it is not defined here. TODO confirm.
 */
class AudioBuffer {
  constructor() {
    this.buffer = [];        // pending chunks, oldest first
    this.processing = false; // guard: at most one drain loop at a time
  }

  /**
   * Queues a chunk and starts the drain loop if it is idle.
   * @param {*} chunk - audio data to forward downstream
   */
  add(chunk) {
    this.buffer.push(chunk);
    // Fire-and-forget: report failures instead of leaving an
    // unhandled promise rejection.
    this.process().catch((err) => {
      console.error('AudioBuffer: processing failed:', err);
    });
  }

  /**
   * Drains the queue sequentially. The `processing` flag is reset in
   * `finally` so a rejected sendToService call cannot leave the flag
   * stuck at true and wedge the queue forever (bug in the original).
   */
  async process() {
    if (this.processing) return;
    this.processing = true;
    try {
      while (this.buffer.length > 0) {
        const chunk = this.buffer.shift();
        await this.sendToService(chunk);
      }
    } finally {
      this.processing = false;
    }
  }
}
Detect when user starts speaking and cancel current response:
/**
 * Tracks playback state and handles user barge-in (interruption).
 *
 * Bug fix: the original called this.clearAudioBuffer() but never defined
 * it, so every interruption while audio was playing threw a TypeError.
 */
class ConversationManager {
  constructor() {
    this.isPlaying = false;      // true while assistant audio is playing
    this.currentResponse = null; // in-flight response handle with .cancel()
    this.audioQueue = [];        // audio chunks queued for playback
  }

  /**
   * Called when the user starts speaking. If the assistant is
   * mid-response, treat it as a barge-in: cancel the response and
   * flush any audio still queued for playback.
   */
  onUserSpeech() {
    if (this.isPlaying) {
      // User interrupted, cancel current response
      this.cancelResponse();
      this.clearAudioBuffer();
    }
  }

  /** Cancels the in-flight response, if any, and marks playback stopped. */
  cancelResponse() {
    if (this.currentResponse) {
      this.currentResponse.cancel();
      this.currentResponse = null;
    }
    this.isPlaying = false;
  }

  /** Drops any audio queued for playback (missing in the original). */
  clearAudioBuffer() {
    this.audioQueue.length = 0;
  }
}
Split LLM response at natural pauses for smoother TTS:
/**
 * Splits LLM output into short chunks for smoother streaming TTS.
 *
 * Splits after sentence punctuation (. ! ?) or the explicit '•' pause
 * marker, then strips the marker itself — the original left '•' inside
 * the returned chunks, which would be sent to (and mispronounced by)
 * the TTS engine.
 *
 * @param {string} text - LLM response text, possibly containing '•' markers
 * @returns {string[]} non-empty, trimmed chunks with markers removed
 */
function splitForTTS(text) {
  // Split on punctuation and natural pauses
  return text
    .split(/(?<=[.!?•])\s+/)
    .map((chunk) => chunk.replaceAll('•', '').trim())
    .filter((chunk) => chunk.length > 0);
}

// In LLM prompt, ask for markers:
// "Add a '•' symbol every 5 to 10 words at natural pauses"
Monitor WebSocket connection health with ping/pong heartbeats:
const HEARTBEAT_INTERVAL = 30000; // ms between liveness pings

// Mark each socket alive on connect and whenever it answers a ping
// with a pong frame.
wss.on('connection', (ws) => {
  ws.isAlive = true;
  ws.on('pong', () => {
    ws.isAlive = true;
  });
});

// Every interval: terminate sockets that never ponged back since the
// last sweep, then ping the survivors.
const heartbeatTimer = setInterval(() => {
  wss.clients.forEach((ws) => {
    if (!ws.isAlive) {
      return ws.terminate();
    }
    ws.isAlive = false;
    ws.ping();
  });
}, HEARTBEAT_INTERVAL);

// Leak fix: stop the timer when the server shuts down, otherwise it
// keeps the process alive and pings a closed server (this mirrors the
// ws library's own heartbeat example).
wss.on('close', () => {
  clearInterval(heartbeatTimer);
});
Handle disconnections gracefully:
/**
 * Creates a WebSocket that retries with exponential backoff on close.
 *
 * Backoff fix: the original incremented the attempt counter before
 * computing the delay, so the first retry waited 2s instead of 1s.
 * The schedule is now 1s, 2s, 4s, ... capped at 30s.
 *
 * NOTE(review): callers receive the FIRST socket instance; after a
 * reconnect the internal `ws` is replaced but the returned reference
 * still points at the closed socket. Fixing that requires returning a
 * wrapper object (an interface change) — TODO confirm call sites.
 *
 * @param {string} url - WebSocket endpoint to connect to
 * @returns {WebSocket} the initial socket instance
 */
function createReconnectingWebSocket(url) {
  let ws;
  let reconnectAttempts = 0;
  const maxAttempts = 5;

  function connect() {
    ws = new WebSocket(url);

    ws.on('open', () => {
      // A successful connection resets the backoff schedule.
      reconnectAttempts = 0;
    });

    ws.on('close', () => {
      if (reconnectAttempts >= maxAttempts) {
        console.error(`WebSocket: giving up after ${maxAttempts} reconnect attempts`);
        return;
      }
      const delay = Math.min(1000 * 2 ** reconnectAttempts, 30000);
      reconnectAttempts++;
      setTimeout(connect, delay);
    });
  }

  connect();
  return ws;
}