WebSocket

Stream live trace events to clients.

TraceLLM uses a lightweight WebSocket server embedded in the FastAPI backend for real-time push of trace events. The server is defined in app/websocket/socket.py and mounted at /ws. When a trace is persisted, a trace.created event is broadcast to all connected clients — including the dashboard, CLI monitor, and any custom WebSocket subscribers.

ConnectionManager

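The ConnectionManager class, instantiated once as a module-level singleton, manages all active WebSocket connections. It guards the connection list with an asyncio.Lock, which serializes access between coroutines on the event loop (it does not provide thread safety). Stale connections (those that raise an exception during broadcast) are disconnected and pruned automatically: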

ConnectionManager implementation
python
import asyncio
from typing import Any

from fastapi import WebSocket


class ConnectionManager:
    def __init__(self):
        self._connections: list[WebSocket] = []
        self._lock = asyncio.Lock()

    async def connect(self, websocket: WebSocket) -> None:
        await websocket.accept()
        async with self._lock:
            self._connections.append(websocket)

    async def disconnect(self, websocket: WebSocket) -> None:
        async with self._lock:
            if websocket in self._connections:
                self._connections.remove(websocket)

    async def broadcast(self, payload: dict[str, Any]) -> None:
        async with self._lock:
            connections = list(self._connections)  # Snapshot under lock

        stale: list[WebSocket] = []
        for conn in connections:
            try:
                await conn.send_json(payload)
            except Exception:
                stale.append(conn)  # Auto-prune on failure

        for conn in stale:
            await self.disconnect(conn)

manager = ConnectionManager()  # Global singleton
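
To see why broadcast snapshots the list and prunes afterwards, the sketch below replays that logic against stand-in sockets. FakeSocket and MiniManager are hypothetical names used only for illustration, not TraceLLM classes, and MiniManager omits accept() so that it runs without FastAPI. Note that asyncio.Lock is not reentrant, so calling disconnect() while still holding the lock would deadlock; sending outside the lock avoids that and keeps a slow client from blocking connect().

```python
import asyncio
from typing import Any


class FakeSocket:
    """Hypothetical stand-in for a WebSocket (not part of TraceLLM)."""

    def __init__(self, healthy: bool = True) -> None:
        self.healthy = healthy
        self.received: list[dict[str, Any]] = []

    async def send_json(self, payload: dict[str, Any]) -> None:
        if not self.healthy:
            raise ConnectionError("peer went away")
        self.received.append(payload)


class MiniManager:
    """Condensed copy of the snapshot-and-prune logic, without accept()."""

    def __init__(self) -> None:
        self._connections: list[FakeSocket] = []
        self._lock = asyncio.Lock()

    async def connect(self, sock: FakeSocket) -> None:
        async with self._lock:
            self._connections.append(sock)

    async def disconnect(self, sock: FakeSocket) -> None:
        async with self._lock:
            if sock in self._connections:
                self._connections.remove(sock)

    async def broadcast(self, payload: dict[str, Any]) -> None:
        async with self._lock:
            snapshot = list(self._connections)  # copy, then release the lock
        stale: list[FakeSocket] = []
        for conn in snapshot:
            try:
                await conn.send_json(payload)
            except Exception:
                stale.append(conn)
        for conn in stale:
            await self.disconnect(conn)  # lock is free here, so no deadlock

    async def count(self) -> int:
        async with self._lock:
            return len(self._connections)


async def demo() -> tuple[int, int]:
    manager = MiniManager()
    good, bad = FakeSocket(), FakeSocket(healthy=False)
    await manager.connect(good)
    await manager.connect(bad)
    await manager.broadcast({"type": "trace.created", "trace": {}})
    return len(good.received), await manager.count()


delivered, remaining = asyncio.run(demo())
```

After one broadcast the healthy socket has received the payload and the failing socket is no longer tracked.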

WebSocket Endpoint (/ws)

The WebSocket endpoint accepts connections, sends a welcome message, and then enters a listen loop for client messages. Clients can send ping heartbeats; the server responds with system.pong. When a client disconnects (or the connection drops), the WebSocketDisconnect exception is caught and the connection is cleaned up:

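WebSocket endpoint
python
from fastapi import APIRouter, WebSocket, WebSocketDisconnect

router = APIRouter()  # Assumed definition; the original snippet only references `router`


@router.websocket("/ws")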
async def websocket_endpoint(websocket: WebSocket) -> None:
    await manager.connect(websocket)

    # Send welcome message
    await websocket.send_json({
        "type": "system.connected",
        "status": "connected",
        "message": "TraceLLM websocket active",
    })

    try:
        while True:
            message = await websocket.receive_json()
            if message.get("type") == "ping":
                await websocket.send_json({
                    "type": "system.pong",
                    "timestamp": message.get("ts"),
                })
    except WebSocketDisconnect:
        await manager.disconnect(websocket)
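
The decision made inside the listen loop can be isolated as a pure function, which makes the heartbeat contract easy to test without a live socket. This is a minimal sketch: build_reply is a hypothetical helper, not a function in app/websocket/socket.py. It also guards against JSON values that are not objects (for example a bare array), on which the endpoint's message.get(...) call would raise AttributeError.

```python
from typing import Any, Optional


def build_reply(message: Any) -> Optional[dict[str, Any]]:
    """Return the reply for one client message, or None if no reply is due.

    Hypothetical helper mirroring the ping branch of the endpoint loop.
    """
    if not isinstance(message, dict):
        return None  # valid JSON, but not an object, so there is no "type" to read
    if message.get("type") == "ping":
        # Echo the client's ts so it can match the pong to its ping.
        return {"type": "system.pong", "timestamp": message.get("ts")}
    return None  # every other message type is ignored


pong = build_reply({"type": "ping", "ts": 1717151234567})
ignored = build_reply({"type": "subscribe"})
not_an_object = build_reply(["ping"])
```

A ping sent without a ts field still gets a pong, with timestamp set to null.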

Message Protocol

All WebSocket messages are JSON-encoded. The protocol defines four message types:

| Direction | Type | Payload | When |
| --- | --- | --- | --- |
| Server → Client | system.connected | { "status": "connected", "message": "..." } | Immediately after handshake |
| Server → Client | trace.created | { "trace": { TraceSchema } } | After each trace is persisted to MongoDB |
| Client → Server | ping | { "type": "ping", "ts": 1717151234567 } | Heartbeat (dashboard sends every 5s) |
| Server → Client | system.pong | { "type": "system.pong", "timestamp": 1717151234567 } | Response to client ping |

WebSocket protocol messages
json
// Welcome message (server → client on connect)
{
  "type": "system.connected",
  "status": "connected",
  "message": "TraceLLM websocket active"
}

// Trace event (server → client on trace.created)
{
  "type": "trace.created",
  "trace": {
    "trace_id": "tr_2kf9q3m1",
    "prompt": "Explain transformers",
    "latency": 3420.0,
    "token_count": 1247,
    "status": "success",
    "steps": [...]
  }
}

// Heartbeat (client → server, every 5 seconds)
{ "type": "ping", "ts": 1717151234567 }

// Heartbeat response (server → client)
{ "type": "system.pong", "timestamp": 1717151234567 }
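
A Python subscriber could validate incoming frames against the three server-to-client types and use the echoed timestamp to estimate round-trip time. This is a sketch under assumptions: parse_server_message and round_trip_ms are hypothetical helpers, and treating ts as epoch milliseconds follows the examples above rather than a documented requirement.

```python
import json
from typing import Any

# Server → client types listed in the protocol table.
SERVER_TYPES = {"system.connected", "trace.created", "system.pong"}


def parse_server_message(raw: str) -> tuple[str, dict[str, Any]]:
    """Split a raw frame into (type, remaining fields); reject unknown types."""
    message = json.loads(raw)
    if not isinstance(message, dict):
        raise ValueError("expected a JSON object")
    kind = message.get("type")
    if kind not in SERVER_TYPES:
        raise ValueError(f"unknown message type: {kind!r}")
    payload = {key: value for key, value in message.items() if key != "type"}
    return kind, payload


def round_trip_ms(pong_payload: dict[str, Any], now_ms: int) -> int:
    """Elapsed milliseconds between sending a ping and receiving its pong."""
    return now_ms - pong_payload["timestamp"]


kind, payload = parse_server_message(
    '{"type": "system.pong", "timestamp": 1717151234567}'
)
rtt = round_trip_ms(payload, now_ms=1717151234600)
```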

Broadcast Flow

Broadcasting is triggered from trace_service.save_trace() immediately after the MongoDB insert succeeds. The trace document is validated against TraceSchema and serialized to JSON before broadcast:

Broadcast trigger
python
# In trace_service.py:save_trace()
document = normalize_trace_document(trace_data)
await collection.insert_one(document)          # Persist to MongoDB

await manager.broadcast({                       # Push to all clients
    "type": "trace.created",
    "trace": TraceSchema.model_validate(document).model_dump(mode="json"),
})

console.print("Trace saved to MongoDB")        # Plain string; no f-string needed
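
The ordering matters: because the insert is awaited first, a failed write raises before anything is broadcast, so clients never see a trace that was not stored. The sketch below illustrates this with in-memory stand-ins. FakeCollection and FakeManager are hypothetical, the save_trace shown is a reduced version without normalization or schema validation, and tr_demo is a placeholder ID.

```python
import asyncio
from typing import Any


class FakeCollection:
    """Hypothetical in-memory stand-in for the MongoDB collection."""

    def __init__(self, fail: bool = False) -> None:
        self.fail = fail
        self.docs: list[dict[str, Any]] = []

    async def insert_one(self, document: dict[str, Any]) -> None:
        if self.fail:
            raise RuntimeError("insert failed")
        self.docs.append(document)


class FakeManager:
    """Hypothetical stand-in that records broadcasts instead of sending them."""

    def __init__(self) -> None:
        self.sent: list[dict[str, Any]] = []

    async def broadcast(self, payload: dict[str, Any]) -> None:
        self.sent.append(payload)


async def save_trace(
    collection: FakeCollection, manager: FakeManager, document: dict[str, Any]
) -> None:
    await collection.insert_one(document)  # an exception here skips the broadcast
    await manager.broadcast({"type": "trace.created", "trace": document})


async def demo() -> tuple[int, int]:
    ok_manager, failed_manager = FakeManager(), FakeManager()
    await save_trace(FakeCollection(), ok_manager, {"trace_id": "tr_demo"})
    try:
        await save_trace(
            FakeCollection(fail=True), failed_manager, {"trace_id": "tr_demo"}
        )
    except RuntimeError:
        pass  # the write failed, so nothing should have been pushed
    return len(ok_manager.sent), len(failed_manager.sent)


broadcasts_after_success, broadcasts_after_failure = asyncio.run(demo())
```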

Client Integration (Dashboard)

The Next.js dashboard connects to the WebSocket in the ObservabilityProvider React context. It uses a plain WebSocket client with a 5-second heartbeat interval to keep the connection alive. Incoming messages are parsed and dispatched to the React state via useEffectEvent:

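Dashboard WebSocket client
typescript
// Frontend WebSocket client (simplified)
useEffect(() => {
  const socket = new WebSocket(socketUrl);
  // Declared here so both onopen and the cleanup function can reach it
  let heartbeat: ReturnType<typeof setInterval> | undefined;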

  socket.onopen = () => {
    setConnectionState("open");
    heartbeat = setInterval(() => {
      socket.send(JSON.stringify({ type: "ping", ts: Date.now() }));
    }, 5000);
  };

  socket.onmessage = (event) => {
    const message = JSON.parse(event.data);
    if (message.type === "trace.created") {
      setLatestTrace(message.trace);
      appendEvents(traceToEvents(message.trace));
    }
  };

  socket.onerror = () => setConnectionState("error");
  socket.onclose = () => {
    clearInterval(heartbeat);            // Stop pinging once the socket is closed
    setConnectionState("closed");
  };

  return () => { clearInterval(heartbeat); socket.close(); };
}, [socketUrl]);

Tip

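The browser WebSocket API does not reconnect on its own, so the simplified dashboard client above opens a new connection only when its effect re-runs (for example, when socketUrl changes or the provider remounts). The CLI monitor (tracellm monitor) implements its own WebSocket client with exponential backoff reconnection (1s to 30s) and falls back to polling MongoDB when the WebSocket is unavailable.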
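
A backoff schedule running from 1s to 30s could be generated as follows. This is a sketch, not the CLI monitor's actual code: backoff_delays is a hypothetical name, and the doubling factor is an assumption, since only the 1s and 30s bounds are stated here.

```python
from itertools import islice
from typing import Iterator


def backoff_delays(
    initial: float = 1.0, cap: float = 30.0, factor: float = 2.0
) -> Iterator[float]:
    """Yield reconnect delays in seconds, growing by `factor` up to `cap`."""
    delay = initial
    while True:
        yield min(delay, cap)
        delay = min(delay * factor, cap)  # clamp so the delay never exceeds cap


# First seven waits: 1, 2, 4, 8, 16, then held at the 30s cap.
first_seven = list(islice(backoff_delays(), 7))
```

A client would sleep for each delay between connection attempts and start a fresh generator after a successful connect so that the next outage begins again at 1s.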