WebSocket
Stream live trace events to clients.
TraceLLM uses a lightweight WebSocket server embedded in the FastAPI backend for real-time push of trace events. The server is defined in app/websocket/socket.py and mounted at /ws. When a trace is persisted, a trace.created event is broadcast to all connected clients — including the dashboard, CLI monitor, and any custom WebSocket subscribers.
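For a custom subscriber, a minimal Python sketch might look like the following. It assumes the third-party websockets package and a backend reachable at ws://localhost:8000/ws. The URL and the helper names (extract_trace, subscribe) are illustrative and not part of TraceLLM.

```python
import json
from typing import Any, Optional


def extract_trace(raw: str) -> Optional[dict[str, Any]]:
    """Return the trace payload of a trace.created event, or None for anything else."""
    try:
        message = json.loads(raw)
    except json.JSONDecodeError:
        return None  # Ignore frames that are not valid JSON
    if isinstance(message, dict) and message.get("type") == "trace.created":
        return message.get("trace")
    return None


async def subscribe(url: str = "ws://localhost:8000/ws") -> None:
    """Print the ID of every trace pushed by the server (the URL is an assumption)."""
    import websockets  # third-party: pip install websockets

    async with websockets.connect(url) as socket:
        async for raw in socket:
            trace = extract_trace(raw)
            if trace is not None:
                print("new trace:", trace.get("trace_id"))
```

To try it against a running backend, call asyncio.run(subscribe()). Keeping the parsing in extract_trace separate from the network loop makes the message handling easy to test without a server.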
ConnectionManager
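The ConnectionManager class manages all active WebSocket connections through a single module-level instance. It guards the connection list with an asyncio.Lock so that concurrent coroutines cannot modify it at the same time (an asyncio.Lock coordinates tasks on one event loop; it does not make the class safe across OS threads). Stale connections (those that raise exceptions during broadcast) are automatically disconnected and pruned: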
import asyncio
from typing import Any

from fastapi import WebSocket


class ConnectionManager:
    def __init__(self):
        self._connections: list[WebSocket] = []
        self._lock = asyncio.Lock()

    async def connect(self, websocket: WebSocket) -> None:
        await websocket.accept()
        async with self._lock:
            self._connections.append(websocket)

    async def disconnect(self, websocket: WebSocket) -> None:
        async with self._lock:
            if websocket in self._connections:
                self._connections.remove(websocket)

    async def broadcast(self, payload: dict[str, Any]) -> None:
        async with self._lock:
            connections = list(self._connections)  # Snapshot under lock
        # Send outside the lock: disconnect() re-acquires it, and asyncio.Lock is not reentrant
        stale: list[WebSocket] = []
        for conn in connections:
            try:
                await conn.send_json(payload)
            except Exception:
                stale.append(conn)  # Auto-prune on failure
        for conn in stale:
            await self.disconnect(conn)


manager = ConnectionManager()  # Global singleton

WebSocket Endpoint (/ws)
The WebSocket endpoint accepts connections, sends a welcome message, and then enters a listen loop for client messages. Clients can send ping heartbeats; the server responds with system.pong. When a client disconnects (or the connection drops), the WebSocketDisconnect exception is caught and the connection is cleaned up:
from fastapi import APIRouter, WebSocket, WebSocketDisconnect

router = APIRouter()  # Assumed here: the router that the application mounts


@router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket) -> None:
    await manager.connect(websocket)
    # Send welcome message
    await websocket.send_json({
        "type": "system.connected",
        "status": "connected",
        "message": "TraceLLM websocket active",
    })
    try:
        while True:
            message = await websocket.receive_json()
            if message.get("type") == "ping":
                # Echo the client's timestamp back so it can match the reply
                await websocket.send_json({
                    "type": "system.pong",
                    "timestamp": message.get("ts"),
                })
    except WebSocketDisconnect:
        await manager.disconnect(websocket)

Message Protocol
All WebSocket messages are JSON-encoded. The protocol defines four message types:
| Direction | Type | Payload | When |
|---|---|---|---|
| Server → Client | system.connected | { "status": "connected", "message": "..." } | Immediately after handshake |
| Server → Client | trace.created | { "trace": { TraceSchema } } | After each trace is persisted to MongoDB |
| Client → Server | ping | { "ts": 1717151234567 } | Heartbeat (dashboard sends every 5s; ts is in milliseconds) |
| Server → Client | system.pong | { "timestamp": 1717151234567 } | Response to client ping; echoes the ping's ts |
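The table can be turned into a small validation helper. The Python sketch below is only an illustration: REQUIRED_FIELDS and validate_message are hypothetical names, and the required fields are read off the table rather than taken from TraceLLM's code (the server itself treats ts as optional).

```python
from typing import Any

# Required payload fields per message type, read off the protocol table.
REQUIRED_FIELDS: dict[str, set[str]] = {
    "system.connected": {"status", "message"},
    "trace.created": {"trace"},
    "ping": {"ts"},
    "system.pong": {"timestamp"},
}


def validate_message(message: dict[str, Any]) -> str:
    """Return the message type, or raise ValueError if the message is malformed."""
    msg_type = message.get("type")
    if msg_type not in REQUIRED_FIELDS:
        raise ValueError(f"unknown message type: {msg_type!r}")
    missing = REQUIRED_FIELDS[msg_type] - set(message)
    if missing:
        raise ValueError(f"{msg_type} is missing fields: {sorted(missing)}")
    return msg_type
```

A subscriber could call validate_message on each parsed frame and dispatch on the returned type, so that unknown or incomplete messages fail loudly instead of being silently ignored.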
// Welcome message (server → client on connect)
{
  "type": "system.connected",
  "status": "connected",
  "message": "TraceLLM websocket active"
}

// Trace event (server → client on trace.created)
{
  "type": "trace.created",
  "trace": {
    "trace_id": "tr_2kf9q3m1",
    "prompt": "Explain transformers",
    "latency": 3420.0,
    "token_count": 1247,
    "status": "success",
    "steps": [...]
  }
}

// Heartbeat (client → server, every 5 seconds)
{ "type": "ping", "ts": 1717151234567 }

// Heartbeat response (server → client)
{ "type": "system.pong", "timestamp": 1717151234567 }

Broadcast Flow
Broadcasting is triggered from trace_service.save_trace() immediately after the MongoDB insert succeeds. The trace document is validated against TraceSchema and serialized to JSON before broadcast:
# In trace_service.py:save_trace()
document = normalize_trace_document(trace_data)
await collection.insert_one(document)  # Persist to MongoDB
await manager.broadcast({  # Push to all clients
    "type": "trace.created",
    "trace": TraceSchema.model_validate(document).model_dump(mode="json"),
})
console.print("Trace saved to MongoDB")

Client Integration (Dashboard)
The Next.js dashboard connects to the WebSocket in the ObservabilityProvider React context. It uses a plain WebSocket client with a 5-second heartbeat interval to keep the connection alive. Incoming messages are parsed and dispatched to the React state via useEffectEvent:
// Frontend WebSocket client (simplified)
useEffect(() => {
  const socket = new WebSocket(socketUrl);
  let heartbeat; // Interval handle, cleared in the cleanup function

  socket.onopen = () => {
    setConnectionState("open");
    heartbeat = setInterval(() => {
      socket.send(JSON.stringify({ type: "ping", ts: Date.now() }));
    }, 5000);
  };

  socket.onmessage = (event) => {
    const message = JSON.parse(event.data);
    if (message.type === "trace.created") {
      setLatestTrace(message.trace);
      appendEvents(traceToEvents(message.trace));
    }
  };

  socket.onerror = () => setConnectionState("error");
  socket.onclose = () => setConnectionState("closed");

  return () => {
    clearInterval(heartbeat);
    socket.close();
  };
}, [socketUrl]);

Tip
The CLI monitor (tracellm monitor) implements its own WebSocket client with exponential backoff reconnection (1s to 30s) and falls back to polling MongoDB when the WebSocket is unavailable.
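The reconnection schedule described above can be sketched as follows. The 1s starting delay and 30s cap come from the text; the doubling factor and the name backoff_delays are assumptions, since the monitor's actual implementation is not shown here.

```python
from collections.abc import Iterator
from itertools import islice


def backoff_delays(base: float = 1.0, cap: float = 30.0, factor: float = 2.0) -> Iterator[float]:
    """Yield reconnect delays in seconds: base, base*factor, ... never exceeding cap."""
    delay = base
    while True:
        yield min(delay, cap)
        delay = min(delay * factor, cap)


# First seven delays a client would wait between reconnect attempts.
print(list(islice(backoff_delays(), 7)))  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

A client would typically sleep for each yielded delay between attempts and create a fresh generator after a successful connection, so the next outage starts again at 1s. Random jitter, which is often added to avoid synchronized reconnects, is left out of this sketch.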