Voice AI

WebSocket & Streaming

Real-time bidirectional communication patterns

WebSocket & Streaming

Real-time streaming is essential for low-latency voice AI. Every component streams data rather than waiting for complete responses.

Why Streaming Matters

Without streaming:

User speaks (2s) → Wait for full transcription (1s) → Wait for full LLM response (3s) → Wait for full TTS (2s)
Total: ~8 seconds latency

With streaming:

User speaks → Stream transcription → Stream LLM tokens → Stream TTS audio
Total: ~1 second to first audio

WebSocket Server Setup

const express = require('express');
const http = require('http');
const WebSocket = require('ws');

const app = express();
const server = http.createServer(app);
const wss = new WebSocket.Server({ server });

/**
 * Per-client lifecycle wiring: logs connects/disconnects, receives
 * messages, and reports socket-level errors.
 */
function handleConnection(ws) {
  console.log('Client connected');

  ws.on('message', (data) => {
    // Handle incoming messages
  });

  ws.on('close', () => {
    console.log('Client disconnected');
  });

  ws.on('error', (error) => {
    console.error('WebSocket error:', error);
  });
}

wss.on('connection', handleConnection);

server.listen(3000);

Event Emitter Pattern

Coordinate between services using EventEmitter:

const EventEmitter = require('events');

/**
 * Emits pipeline events as audio is processed, so downstream consumers
 * (LLM, TTS) can subscribe without tight coupling.
 * No explicit constructor needed: the implicit default forwards to
 * EventEmitter (the original's `constructor(){super()}` was redundant).
 */
class StreamService extends EventEmitter {
  /**
   * Process one chunk of audio and notify listeners.
   * @param {Buffer|Uint8Array} audioChunk - raw audio data
   */
  processAudio(audioChunk) {
    // Placeholder: a real implementation would run STT on audioChunk.
    // Listeners receive { text } payloads synchronously via emit().
    this.emit('transcription', { text: 'Hello' });
  }
}

// Usage
const service = new StreamService();
service.on('transcription', (data) => {
  // Send to LLM
});

Handling Backpressure

When an upstream service produces chunks faster than the downstream service can consume them, queue the chunks and drain them sequentially:

/**
 * FIFO queue that decouples a fast producer from a slower consumer.
 * Chunks are forwarded to sendToService() one at a time, in arrival order.
 * sendToService must be provided (assigned or subclassed) by the user.
 */
class AudioBuffer {
  constructor() {
    this.buffer = [];        // pending chunks, oldest first
    this.processing = false; // true while the drain loop is running
  }

  /**
   * Enqueue a chunk and kick off the drain loop if it is idle.
   * @param {*} chunk - opaque audio data passed through to sendToService
   */
  add(chunk) {
    this.buffer.push(chunk);
    // Fire-and-forget: the catch prevents an unhandled rejection from
    // crashing the process when sendToService fails.
    this.process().catch((err) => {
      console.error('AudioBuffer: failed to process chunk', err);
    });
  }

  /**
   * Drain the queue sequentially. Re-entrant calls while a drain is in
   * progress return immediately; the running loop picks up new chunks.
   */
  async process() {
    if (this.processing) return;
    this.processing = true;

    try {
      while (this.buffer.length > 0) {
        const chunk = this.buffer.shift();
        await this.sendToService(chunk);
      }
    } finally {
      // Always reset, even if sendToService throws. The original skipped
      // this, so one failure left `processing` stuck at true and the
      // buffer never drained again.
      this.processing = false;
    }
  }
}

Interruption Handling

Detect when user starts speaking and cancel current response:

/**
 * Tracks playback state so an in-flight assistant response can be
 * cancelled the moment the user starts talking over it (barge-in).
 */
class ConversationManager {
  constructor() {
    this.isPlaying = false;      // true while response audio is playing
    this.currentResponse = null; // cancellable handle for the active response
  }

  /**
   * Call when speech detection fires. If a response is playing, treat it
   * as an interruption: cancel the response and drop buffered audio.
   */
  onUserSpeech() {
    if (!this.isPlaying) return;
    this.cancelResponse();
    this.clearAudioBuffer();
  }

  /** Cancel the active response (if any) and mark playback stopped. */
  cancelResponse() {
    if (this.currentResponse) {
      this.currentResponse.cancel();
      this.currentResponse = null;
    }
    this.isPlaying = false;
  }

  /**
   * Drop any audio queued for playback. Default is a no-op so that
   * onUserSpeech() cannot throw; override or assign an implementation
   * wired to your audio output. (The original called this method without
   * ever defining it, so every interruption threw a TypeError.)
   */
  clearAudioBuffer() {}
}

Stream Chunking for TTS

Split LLM response at natural pauses for smoother TTS:

/**
 * Break a text response into sentence-sized chunks for incremental TTS.
 * Splits after '.', '!', '?', or the '•' pause marker when followed by
 * whitespace; whitespace-only fragments are dropped.
 * @param {string} text - full or partial LLM output
 * @returns {string[]} non-empty chunks, in original order
 */
function splitForTTS(text) {
  const boundary = /(?<=[.!?•])\s+/;
  const fragments = text.split(boundary);
  return fragments.filter((fragment) => fragment.trim() !== '');
}

// In LLM prompt, ask for markers:
// "Add a '•' symbol every 5 to 10 words at natural pauses"

Connection Health

Detect dead connections with a periodic ping/pong heartbeat:

const HEARTBEAT_INTERVAL = 30000; // ms between liveness sweeps

// Mark each socket alive on connect and whenever it answers a ping.
wss.on('connection', (ws) => {
  ws.isAlive = true;

  ws.on('pong', () => {
    ws.isAlive = true;
  });
});

// Periodically ping every client; a socket that failed to pong since the
// previous sweep is considered dead and forcibly terminated.
const heartbeatTimer = setInterval(() => {
  wss.clients.forEach((ws) => {
    if (!ws.isAlive) {
      ws.terminate();
      return;
    }
    ws.isAlive = false;
    ws.ping();
  });
}, HEARTBEAT_INTERVAL);

// The original never cleared the interval, so it kept firing (and kept the
// process alive) after the server shut down.
wss.on('close', () => {
  clearInterval(heartbeatTimer);
});

Error Recovery

Handle disconnections gracefully:

/**
 * Open a WebSocket to `url` and automatically reconnect with exponential
 * backoff (2s, 4s, 8s, 16s, then capped at 30s) for up to 5 attempts
 * after a close.
 *
 * NOTE(review): the returned handle is the FIRST socket only — after a
 * reconnect, callers holding this reference still point at the closed
 * socket. Changing the return type to a live accessor would break
 * existing callers, so the limitation is documented rather than fixed;
 * callers needing the live socket should track it via an 'open' handler.
 *
 * @param {string} url - WebSocket endpoint
 * @returns {WebSocket} the initial socket
 */
function createReconnectingWebSocket(url) {
  let ws;
  let reconnectAttempts = 0;
  const maxAttempts = 5;

  function connect() {
    ws = new WebSocket(url);

    ws.on('open', () => {
      // A successful connection resets the backoff schedule.
      reconnectAttempts = 0;
    });

    // Without an 'error' listener, an EventEmitter-based socket (e.g. the
    // `ws` package) throws on connection failure instead of reaching the
    // 'close' handler — crashing the process before any retry can run.
    ws.on('error', (error) => {
      console.error('WebSocket error:', error);
    });

    ws.on('close', () => {
      if (reconnectAttempts < maxAttempts) {
        reconnectAttempts++;
        const delay = Math.min(1000 * Math.pow(2, reconnectAttempts), 30000);
        setTimeout(connect, delay);
      } else {
        // The original gave up silently; surface it so operators can see
        // why the connection never came back.
        console.error(`WebSocket: giving up after ${maxAttempts} reconnect attempts`);
      }
    });
  }

  connect();
  return ws;
}