The Message Pipeline
Nine steps from NATS receive to memory store — each step has a specific job, and skipping any of them breaks a different guarantee.
Every message that enters Principal Broker passes through exactly nine steps. The steps are sequential, synchronous in the critical path, and non-negotiable. Skipping a step does not make the broker faster — it removes a guarantee that the rest of the system depends on.
The pipeline is defined in the docstring of _make_message_handler in broker/main.py:
"""
NATS message handler — the full pipeline.
1. Deserialize
2. Replay prevention
3. Validate schema
4. Hard block check
5. Authority check
6. Route (deterministic, 9 rules)
7. Audit write BEFORE dispatch
8. Dispatch to recipients via NATS
9. Memory store
"""
This lesson walks through each step, explains what it does, and shows the production implementation.
Step 1: Deserialize
The message arrives as raw bytes from NATS. The first job is to parse those bytes into a structured A2AMessage object.
message, error = Validator.deserialize(msg.data)
if error:
logger.warning(f"Deserialization failed: {error}")
app.state.audit_log.write_error(msg.data, error)
return
Deserialization failures are logged immediately, before any other processing. This is the first audit entry — even messages that cannot be parsed get a record. The write_error method scrubs sensitive patterns (Bearer tokens, API keys, Authorization headers) before writing to SQLite.
Immediately after deserialization, the broker does session tracking — recording that this agent has an active session and detecting if a memory.committed subtype indicates the session has ended cleanly.
Step 2: Replay Prevention
A message that has already been processed should not be processed again. Network retries, NATS at-least-once delivery, and client bugs can all cause duplicate messages.
if app.state.dispatcher.is_replay(message.envelope.message_id):
logger.debug(f"Replay dropped: {message.envelope.message_id}")
return
The replay prevention uses an OrderedDict in the dispatcher with a 24-hour TTL and a maximum size of 100,000 entries. Eviction runs amortized — every 100 calls, the oldest expired entries are removed. This keeps the hot path at O(1) for the common case while maintaining bounded memory.
The check is purely in-memory with no database query. This is intentional — replay prevention needs to be fast because it runs on every message. The trade-off is that broker restarts reset the replay window, which means a message that was sent just before a restart could theoretically be processed twice. For the agent fleet, this is acceptable: idempotency is enforced at the agent level, not the broker level.
Step 3: Validate Schema
Schema validation confirms the message structure is correct before any business logic runs.
result = app.state.validator.validate(message)
if not result.valid:
logger.warning(f"Validation failed: {result.error}")
app.state.audit_log.write_rejected(message, result.error)
return
The validator checks that required fields are present, that from_agent exists in the registry (or passes a grace condition for self-registration), and that the message subtype matches the declared message type. A trade.executed subtype on a message typed as directive fails validation.
Rejected messages are logged with their specific validation error, which makes debugging agent protocol bugs significantly faster than raw NATS log inspection.
Step 4: Hard Block Check
Hard blocks are the list of actions that are blocked regardless of authority, approval, or directive. This list lives in broker/safety/hard_blocks.py and is append-only — items can be added but never removed.
if is_hard_blocked(message.subtype):
logger.warning(
f"HARD BLOCK: {message.envelope.from_agent} "
f"attempted {message.subtype}"
)
app.state.audit_log.write_hard_block(message)
await notify_hard_block(
message.envelope.from_agent, message.subtype
)
return
When a hard block fires, three things happen simultaneously: the audit log gets a hard_blocked status entry, a notification is sent to Knox via Discord or SSE, and the message is dropped. There is no appeal path for hard-blocked actions — this is by design. The hard block list represents the absolute floor of system safety.
Hard blocks are distinct from authority failures. An authority failure means "this agent is not permitted to do this right now, escalate to its manager." A hard block means "nothing in the system should ever be able to do this."
Step 5: Authority Check
Authority enforcement is the most nuanced step in the pipeline.
auth_result = app.state.authority.check(message)
if not auth_result.authorized:
# Persist the escalation
esc_id = app.state.escalation_manager.create(
message_id=message.envelope.message_id,
from_agent=message.envelope.from_agent,
subtype=f"authority.exceeded.{message.subtype}",
reason=auth_result.reason,
original_payload=json.dumps(message.payload),
)
# Notify Knox
await notify_escalation(
escalation_id=esc_id,
from_agent=message.envelope.from_agent,
reason=auth_result.reason,
priority=message.envelope.priority,
)
# Convert to escalation message for routing
message = app.state.authority.convert_to_escalation(
message, auth_result.reason
)
The authority check reads the sender's Agent Card and evaluates the message against:
- Financial ceiling (
maxAutonomousDollars) - Risk tier (
maxRiskTier) - The
cannotModifyandrequiresApprovalForlists
When authority is exceeded, the broker does not drop the message. It converts it to an escalation. The original intent is preserved in the escalation payload. The escalation is persisted in SQLite so it appears in the REST API's escalation queue. A notification fires to whoever is responsible for reviewing escalations.
The converted escalation message then continues through the pipeline — it will be routed via Rule 2 to the sender's direct manager.
This design — convert rather than block — means authority breaches are handled gracefully. A trading bot that attempts a trade above its ceiling does not fail silently. The request reaches a human with full context about what was attempted.
Step 6: Route
After all the safety checks, the routing step is simple:
route = app.state.router.route(message)
The router evaluates the 9 rules and returns a RouteDecision. From the pipeline's perspective, this is a pure function — given the message, return a decision. No side effects. No database writes. No async operations.
The route decision includes primary_recipients, fan_out, store_in_memory, and an optional priority_override. The pipeline uses all four fields in subsequent steps.
Step 7: Audit Write Before Dispatch
This step is the integrity guarantee of the entire system.
audit_id = app.state.audit_log.write(message, route)
The audit write happens before the dispatch call. This sequencing is not a performance consideration — it is the correct behavior.
An undelivered message that was logged is recoverable. The audit record shows where it was supposed to go, what its content was, and when it was processed. Recovery tooling can re-dispatch it.
An unlogged message that was delivered is invisible. There is no way to know it happened, who sent it, or where it went. Recovery is impossible.
The audit log uses SQLite in WAL mode. The write includes the full message envelope, the route decision (serialized to JSON), the message payload, and a UTC timestamp. Two database-level triggers enforce append-only behavior — BEFORE UPDATE and BEFORE DELETE both call RAISE(ABORT, 'Audit log is append-only').
Step 8: Dispatch
The dispatch step delivers the message to all recipients via NATS:
dispatched = await app.state.dispatcher.dispatch(message, route)
The dispatcher iterates over route.primary_recipients + route.fan_out and publishes to agents.{agent_id}.inbox for each one. Failed publishes are logged but do not cause the pipeline to abort — if one recipient's NATS topic is unavailable, the others still receive the message.
for agent_id in all_recipients:
topic = f"agents.{agent_id}.inbox"
try:
await self.nats.publish(topic, payload)
dispatched.append(agent_id)
except (OSError, ConnectionError) as e:
logger.error(f"Failed to dispatch to {agent_id}: {e}")
The return value is the list of agent IDs that were actually dispatched to. This list is used in Step 9 for memory storage metadata and in the final SSE broadcast.
If NATS is not connected (the broker is running in API-only mode), dispatch returns an empty list and logs a warning. Messages are not buffered at the broker level — buffering is the SDK's responsibility (covered in Lesson 196).
Step 9: Memory Store
The final step stores the message in the semantic memory layer if the route decision requires it:
if route.store_in_memory:
await app.state.dispatcher.store_in_memory(message)
Not every message warrants permanent memory storage. The routing rules specify when memory storage applies: trade.executed, trade.completed, memory.committed, content.published, and all escalations and critical messages.
The memory store call is an HTTP POST to the semantic memory layer API at port 8002. It uses a shared httpx.AsyncClient that is created once at startup and reused across all requests — avoiding the TCP connection overhead that would accumulate on the hot path if a new client were created per message.
request_data = {
"content": json.dumps(message.payload),
"namespace": message.envelope.from_agent,
"memory_type": "episodic",
"tags": [message.type, message.subtype, message.envelope.from_agent],
}
After the memory store call, the broker broadcasts a cockpit event to all SSE subscribers — providing real-time visibility to Mission Control and any other dashboard connected to /v1/stream/cockpit.
The Pipeline as Error Budget
Each step in the pipeline has a different failure mode and a different consequence:
The asymmetry between Steps 4 and 5 is intentional. Hard blocks are absolute — the action is rejected, full stop. Authority breaches are escalations — the intent is preserved and routed up the chain. These are not the same thing, and the pipeline treats them differently.
The asymmetry around Step 7 is also intentional. If the audit write fails, the broker should not dispatch. It is better to lose a message than to have an unaudited action in a financial system.
Startup and Lifecycle
The pipeline handler is created by _make_message_handler(app) and registered as a NATS subscription callback during the broker's startup lifespan:
await nc.subscribe("agents.*.outbox", cb=_make_message_handler(app))
await nc.subscribe("agents.*.heartbeat", cb=_make_heartbeat_handler(app))
The wildcard agents.*.outbox subscription means every agent's outbox topics are handled by the same pipeline function. This is the single point of entry for all A2A traffic. There is no per-agent subscription management — new agents publish to their own outbox topics, and the broker handles them automatically.
The heartbeat subscription at agents.*.heartbeat uses a separate, lighter handler that only updates last_seen in the registry and records SLA metrics. Heartbeats do not go through the full 9-step pipeline — they are health signals, not business messages.
The broker also runs four background tasks during its lifetime: a heartbeat timeout monitor (checks every 30 seconds for stale agents), an escalation monitor (expires stale escalations every 60 seconds), an incomplete session monitor (flags agents that ended without a memory commit), and a daily FinOps reset (generates the CFO cost report and resets daily counters at UTC midnight).
These background tasks are started in the lifespan context manager and cancelled cleanly on shutdown, ensuring no orphaned coroutines when the broker restarts.