SSE for Real-Time Cockpit
Server-Sent Events give Mission Control a live window into every message the broker routes — broadcast_event pushes to all subscribers with queue overflow protection and 30-second keepalives.
The Principal Broker processes hundreds of messages per hour across the agent fleet. Trades execute. Signals publish. Services report. Escalations fire. Without a real-time visibility layer, the only way to observe this activity is to tail log files — which is not a sustainable operational practice.
Server-Sent Events solve this problem with minimal infrastructure. A single HTTP connection from Mission Control to the broker delivers a continuous stream of cockpit events. Every message routed by the broker broadcasts an event. The dashboard updates in real time without polling.
The Implementation
The entire SSE implementation is in broker/api/stream.py:
```python
import asyncio
import json

# In-memory broadcast queues for SSE subscribers
_cockpit_queues: list[asyncio.Queue] = []


async def broadcast_event(event_type: str, data: dict):
    """Broadcast an SSE event to all cockpit stream subscribers."""
    message = f"event: {event_type}\ndata: {json.dumps(data)}\n\n"
    dead = set()
    for i, q in enumerate(_cockpit_queues):
        try:
            q.put_nowait(message)
        except asyncio.QueueFull:
            dead.add(i)
    if dead:
        _cockpit_queues[:] = [
            q for i, q in enumerate(_cockpit_queues) if i not in dead
        ]
```
_cockpit_queues is a module-level list of asyncio.Queue objects — one per connected client. When a client connects to the SSE endpoint, a new queue is created and appended to the list. When the client disconnects, its queue is removed in the finally block.
broadcast_event iterates all queues and calls put_nowait on each one. Non-blocking: if a queue is full, put_nowait raises asyncio.QueueFull immediately rather than waiting. The full queue's index is collected in the dead set, and after the iteration, dead queues are removed from the list.
This design ensures that a slow client cannot block the broker's message processing. The broker does not wait for any SSE subscriber to consume its event — it fires and moves on.
The SSE Endpoint
```python
from datetime import datetime, timezone

from fastapi import Request
from fastapi.responses import StreamingResponse


@router.get("/cockpit")
async def cockpit_stream(request: Request):
    """SSE stream for cockpit — incident events."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=100)
    _cockpit_queues.append(queue)

    async def event_generator():
        try:
            ts = datetime.now(timezone.utc).isoformat()
            connected = {"status": "ok", "ts": ts}
            yield (
                f"event: connected\n"
                f"data: {json.dumps(connected)}\n\n"
            )
            while True:
                try:
                    message = await asyncio.wait_for(queue.get(), timeout=30)
                    yield message
                except asyncio.TimeoutError:
                    yield ": keepalive\n\n"
        except asyncio.CancelledError:
            pass
        finally:
            if queue in _cockpit_queues:
                _cockpit_queues.remove(queue)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
```
Walk through the lifecycle of a single SSE connection:
Connection established. A new asyncio.Queue(maxsize=100) is created and appended to _cockpit_queues. The event generator immediately yields a connected event with the current UTC timestamp. This confirms to the client that the connection is live.
Waiting for events. The generator enters a loop, calling queue.get() with a 30-second timeout. When a new event arrives via broadcast_event, the queue unblocks and the generator yields the event string.
Keepalive. If no event arrives within 30 seconds, asyncio.wait_for raises TimeoutError, and the generator yields an SSE comment line (: keepalive\n\n). Lines beginning with a colon are comments in the SSE protocol; the browser's EventSource implementation strips them without dispatching an event. But they keep the TCP connection from being terminated by proxies and load balancers that close idle connections.
Client disconnect. When the client closes the connection, FastAPI cancels the generator coroutine, raising asyncio.CancelledError. The finally block removes the queue from _cockpit_queues, preventing memory leaks from disconnected clients.
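On the receiving end, the frames the generator yields follow the SSE wire format: event: and data: lines separated by a blank line, with comment lines prefixed by a colon. A minimal parser sketch, illustrating why the keepalive never surfaces as an event:

```python
def parse_sse(stream_text: str) -> list[tuple[str, str]]:
    """Split raw SSE text into (event_type, data) pairs, dropping comments."""
    events = []
    for block in stream_text.split("\n\n"):
        event_type, data = "message", []
        for line in block.split("\n"):
            if line.startswith(":"):          # comment / keepalive: ignored
                continue
            if line.startswith("event:"):
                event_type = line[6:].strip()
            elif line.startswith("data:"):
                data.append(line[5:].strip())
        if data:
            events.append((event_type, "\n".join(data)))
    return events

raw = (
    'event: connected\ndata: {"status": "ok"}\n\n'
    ": keepalive\n\n"
    'event: message_routed\ndata: {"subtype": "trade"}\n\n'
)
print(parse_sse(raw))  # keepalive comment is silently dropped
```

This is essentially what the browser's EventSource does internally before dispatching to addEventListener handlers.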
The Response Headers
Three headers are set on the StreamingResponse:
```python
headers={
    "Cache-Control": "no-cache",
    "Connection": "keep-alive",
    "X-Accel-Buffering": "no",
}
```
Cache-Control: no-cache prevents any intermediate proxy from caching the stream. Each SSE event must reach the client immediately.
Connection: keep-alive signals to HTTP/1.1 infrastructure to maintain the persistent connection. Without this, proxies may terminate the connection after a default idle timeout.
X-Accel-Buffering: no is specific to nginx. By default, nginx buffers responses before forwarding them to the client. For SSE, buffering breaks the real-time delivery — events would be batched and delivered in chunks rather than individually. This header disables nginx buffering for this endpoint specifically.
Event Types
The broker emits three event types to the cockpit stream, defined in broker/notifications.py:
message_routed — fires after every successful dispatch:
```python
await notify_message_routed(
    message_id=message.envelope.message_id,
    from_agent=message.envelope.from_agent,
    subtype=message.subtype,
    recipients=dispatched,
)
```
The recipients field contains the list of agent IDs that were actually dispatched to (returned by dispatcher.dispatch()). Mission Control displays this as a routing visualization: which agent sent a message, what type it was, and where it went.
escalation — fires when an authority check fails and an escalation is created. The escalation payload includes the escalation ID, the agent that triggered it, the reason, and the priority. Mission Control highlights escalations prominently — they require human action.
hard_block — fires when a hard-blocked action is attempted. This is a rare but high-severity event. It means an agent tried to perform a permanently forbidden action.
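Each of these notifications plausibly reduces to a thin wrapper over broadcast_event. A sketch of notify_message_routed with the broadcaster stubbed out to capture the wire frame (the wrapper body and the stub are assumptions; only the helper's name and arguments come from the call shown above):

```python
import asyncio
import json

captured: list[str] = []

async def broadcast_event(event_type: str, data: dict) -> None:
    # Stand-in for broker/api/stream.broadcast_event: record the SSE frame
    # instead of fanning it out to subscriber queues.
    captured.append(f"event: {event_type}\ndata: {json.dumps(data)}\n\n")

async def notify_message_routed(
    message_id: str, from_agent: str, subtype: str, recipients: list[str]
) -> None:
    # Hypothetical body: package the routing metadata and broadcast it.
    await broadcast_event(
        "message_routed",
        {
            "message_id": message_id,
            "from_agent": from_agent,
            "subtype": subtype,
            "recipients": recipients,
        },
    )

asyncio.run(notify_message_routed("m-42", "alpha-trader", "trade_fill", ["risk", "ledger"]))
print(captured[0].splitlines()[0])  # event: message_routed
```

Keeping the notification helpers this thin means the broker's routing path pays only the cost of a put_nowait per subscriber, never an await.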
Why SSE Over WebSockets
WebSockets provide bidirectional communication. SSE provides server-to-client push only. The cockpit stream is read-only — Mission Control observes the broker's activity but does not send commands to it through the stream. Commands go through the REST API.
SSE has three advantages for this use case:
Simpler infrastructure. SSE is plain HTTP. It works through proxies, CDNs, and load balancers that handle HTTP but may not handle WebSocket upgrades. No special nginx configuration is required beyond the buffering header.
Auto-reconnect built into browsers. The browser's EventSource API automatically reconnects when the connection drops, waiting a reconnection interval that the server can tune via the SSE retry: field. WebSocket reconnection requires application-level implementation.
No framing overhead. SSE messages are plain text with a simple event: and data: prefix. There is no framing protocol overhead, no masking, and no binary encoding required.
Mission Control Integration
Mission Control subscribes to the cockpit stream using the browser's native EventSource API:
```javascript
// The browser's native EventSource cannot set custom request headers,
// so the auth token travels as a query parameter instead.
const source = new EventSource(`/v1/stream/cockpit?token=${token}`);

source.addEventListener('message_routed', (e) => {
  const { from_agent, subtype, recipients } = JSON.parse(e.data);
  updateMessageFeed(from_agent, subtype, recipients);
});

source.addEventListener('escalation', (e) => {
  const data = JSON.parse(e.data);
  showEscalationAlert(data);
});
```
The EventSource API does not support custom headers in the browser's native implementation — this is a known limitation. Mission Control works around this by passing the auth token as a query parameter for the SSE connection, or by using a server-side proxy that adds the header before forwarding to the broker.
Thread Safety
The _cockpit_queues list is module-level state. FastAPI's async model means all requests in a worker run on the same event loop — there is no multi-threading concern. All access to _cockpit_queues happens in coroutines scheduled on that single asyncio event loop, so there are no concurrent modification issues. (Each worker process has its own copy of the list, so running multiple uvicorn workers would shard SSE subscribers per process; the design assumes a single worker handles the cockpit stream.)
put_nowait is non-blocking and safe to call from any coroutine without acquiring a lock, because asyncio's event loop is single-threaded. The dead-set cleanup in broadcast_event modifies _cockpit_queues in place using list comprehension assignment (_cockpit_queues[:] = ...), which replaces the list contents atomically within the event loop's turn.
This simplicity is part of the design. SSE with asyncio queues requires no threading primitives, no locks, and no external dependencies. The entire real-time delivery mechanism is 67 lines of Python.
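The difference between the in-place slice assignment and plain rebinding is easy to demonstrate in isolation:

```python
queues = ["q1", "q2", "q3"]
alias = queues                               # a second reference to the same list

# In-place slice assignment mutates the existing list object,
# so every reference observes the pruning.
queues[:] = [q for q in queues if q != "q2"]
print(alias, alias is queues)  # ['q1', 'q3'] True

rebound = ["q1", "q2", "q3"]
stale = rebound
rebound = [q for q in rebound if q != "q2"]  # rebinding creates a brand-new list
print(stale, stale is rebound)  # ['q1', 'q2', 'q3'] False
```

Because the list object's identity is preserved, any code holding a reference to _cockpit_queues keeps seeing the current subscriber set after a prune.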