The SDK Pattern
BrokerClient gives every agent a consistent interface to the broker: fire-and-forget publish, automatic heartbeat, and a local SQLite buffer that fills when NATS goes down and drains on reconnect.
Every agent that connects to the Principal Broker needs to handle the same set of concerns: constructing valid message envelopes, publishing to the right NATS topic, sending periodic heartbeats, and surviving the inevitable periods when NATS is temporarily unavailable.
These concerns are identical across every agent in the fleet. They belong in a shared library, not in each agent's own implementation.
The principal_sdk package provides BrokerClient, a thin, opinionated client that handles all of this consistently. An agent that uses BrokerClient does not build envelopes by hand, does not have to remember to heartbeat, and keeps its messages through a temporary NATS outage as long as the local buffer write succeeds.
The Interface
class BrokerClient:
    """
    Agent-side client for the Principal Broker.
    Publishes messages to NATS topics that the broker subscribes to.
    If NATS is unavailable, buffers locally in SQLite and replays on reconnect.
    """

    def __init__(
        self,
        agent_id: str,
        agent_version: str = "1.0.0",
        nats_url: str = "nats://localhost:4222",
        buffer_db_path: str | None = None,
    ):
        ...
Three things to note about the constructor:
agent_id is the only required parameter. It is the sender's identity for every envelope the client constructs. Every message will have envelope.from = self.agent_id.
nats_url defaults to localhost. Agents on the primary host use this default. Agents on the trading server (Foresight, the sports prediction agent, the political events prediction agent) set this to the primary host's LAN address (e.g., nats://primary-host:4222), since they need to reach its NATS instance.
buffer_db_path defaults to ~/.principal/buffer_{agent_id}.db — a dedicated SQLite file per agent. The buffer is initialized in the constructor, which means it exists even before the NATS connection is attempted. An agent can buffer messages from the very first second of its lifecycle.
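The constructor body is not shown in this lesson. A minimal sketch of the buffer setup it describes might look like the following. The helper names default_buffer_path and init_buffer, and the base_dir parameter, are hypothetical and exist only for illustration; the table definition is the buffer schema shown later in this lesson.

```python
import sqlite3
from pathlib import Path
from typing import Optional


def default_buffer_path(agent_id: str, base_dir: Optional[Path] = None) -> Path:
    """Resolve the per-agent buffer file (hypothetical helper)."""
    # base_dir is an assumption that makes the sketch testable;
    # the lesson only states the ~/.principal default.
    base = base_dir if base_dir is not None else Path.home() / ".principal"
    return base / f"buffer_{agent_id}.db"


def init_buffer(path: Path) -> None:
    """Create the buffer file and table if they do not exist yet."""
    path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(path))
    try:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS message_buffer (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                topic TEXT NOT NULL,
                payload TEXT NOT NULL,
                created_at TEXT NOT NULL,
                sent INTEGER DEFAULT 0
            )
            """
        )
        conn.commit()
    finally:
        conn.close()
```

Because the table is created with IF NOT EXISTS, running this on every start is harmless, and no NATS connection is needed before the first message can be buffered.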
The Publish Method
async def publish(
    self,
    subtype: str,
    payload: dict[str, Any],
    msg_type: str = "event",
    priority: str = "normal",
    correlation_id: str | None = None,
) -> str:
    """
    Publish an A2A message to the broker.
    Returns the message_id.
    """
    message_id = str(uuid.uuid4())
    message = {
        "envelope": {
            "messageId": message_id,
            "correlationId": correlation_id or str(uuid.uuid4()),
            "from": self.agent_id,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "priority": priority,
            "ttl": 3600,
        },
        "type": msg_type,
        "subtype": subtype,
        "payload": payload,
        "ack": {"required": False, "timeout": 300},
        "audit": {
            "agentVersion": self.agent_version,
            "sessionId": self._session_id,
        },
    }
    topic = f"agents.{self.agent_id}.outbox"
    data = json.dumps(message).encode()
    if self._nc and self._nc.is_connected:
        try:
            await self._nc.publish(topic, data)
            logger.debug(f"Published {subtype} ({message_id})")
        except Exception as e:
            logger.warning(f"Publish failed, buffering: {e}")
            self._buffer_message(topic, data)
    else:
        self._buffer_message(topic, data)
    return message_id
The publish method constructs the complete A2A message envelope. The agent only needs to specify subtype and payload — the SDK handles messageId, correlationId, timestamp, ttl, agentVersion, and sessionId.
This matters because the broker validates the envelope structure on every message. An agent that constructs its own envelopes can get the schema wrong. An agent that uses BrokerClient.publish() always sends a valid envelope.
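The broker's validation code is not part of this lesson. As a rough illustration of the kind of structural check a hand-built envelope could fail, here is a sketch. The function name envelope_errors and the exact rules are assumptions; only the field names come from the publish method above.

```python
import uuid
from datetime import datetime
from typing import Any

# Field names taken from the envelope that publish() builds.
REQUIRED_ENVELOPE_FIELDS = (
    "messageId", "correlationId", "from", "timestamp", "priority", "ttl",
)
REQUIRED_TOP_LEVEL_FIELDS = ("type", "subtype", "payload")


def envelope_errors(message: dict[str, Any]) -> list[str]:
    """Return a list of structural problems; an empty list means valid."""
    envelope = message.get("envelope")
    if not isinstance(envelope, dict):
        return ["envelope is missing or not an object"]

    errors = []
    for field in REQUIRED_ENVELOPE_FIELDS:
        if field not in envelope:
            errors.append(f"envelope.{field} is missing")

    # Assumed rule: messageId must parse as a UUID.
    if "messageId" in envelope:
        try:
            uuid.UUID(str(envelope["messageId"]))
        except ValueError:
            errors.append("envelope.messageId is not a UUID")

    # Assumed rule: timestamp must be ISO 8601.
    if "timestamp" in envelope:
        try:
            datetime.fromisoformat(str(envelope["timestamp"]))
        except ValueError:
            errors.append("envelope.timestamp is not ISO 8601")

    for field in REQUIRED_TOP_LEVEL_FIELDS:
        if field not in message:
            errors.append(f"{field} is missing")
    return errors
```

An envelope produced by publish() passes every one of these checks by construction, which is the point of centralizing envelope creation in the SDK.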
The return value is the message_id. Agents that need to track messages — for correlation, for ack verification, for logging — capture this ID and include it in their own logs.
Fire-and-Forget
Notice that publish() is fire-and-forget: it returns after the NATS publish call without waiting for any acknowledgment from the broker. ack.required is hardcoded to False.
This is the correct default for event-driven agent communication. Foresight does not need to know that VP Trading has received the trade.executed message before it can continue its work. The broker's audit log records every message it receives before dispatch, so delivery can be verified after the fact. The agent's responsibility ends at publication.
For cases where an agent genuinely needs synchronous confirmation, the REST API provides request/reply semantics at /v1/messages. But this is the exception, not the default.
The Local Buffer
The local buffer is the feature that makes the SDK essential for agents on the trading server:
def _buffer_message(self, topic: str, data: bytes):
    """Buffer a message locally when NATS is unavailable."""
    try:
        conn = sqlite3.connect(str(self._buffer_path))
        conn.execute(
            "INSERT INTO message_buffer (topic, payload, created_at) "
            "VALUES (?, ?, ?)",
            (topic, data.decode(), datetime.now(timezone.utc).isoformat()),
        )
        conn.commit()
        conn.close()
        logger.info(f"Buffered message to {topic}")
    except Exception as e:
        logger.error(f"Failed to buffer message: {e}")
When NATS is unavailable (whether because the primary host's NATS went down, the local network between the primary host and the trading server dropped, or the broker restarted), publish() calls _buffer_message() instead of publishing to NATS. The message is written to the agent's buffer file (for Foresight, ~/.principal/buffer_foresight.db) with sent=0, the column default.
The buffer schema is minimal:
conn.execute("""
    CREATE TABLE IF NOT EXISTS message_buffer (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        topic TEXT NOT NULL,
        payload TEXT NOT NULL,
        created_at TEXT NOT NULL,
        sent INTEGER DEFAULT 0
    )
""")
id is the ordering key — messages are replayed in the order they were buffered, which preserves temporal causality for trade sequences.
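Because the buffer is an ordinary SQLite file, the backlog can be inspected directly while an outage is in progress. The helper below is a hypothetical convenience, not part of the SDK as described; it only assumes the message_buffer schema above.

```python
import sqlite3


def pending_messages(db_path: str) -> list[tuple[int, str, str]]:
    """Return (id, topic, created_at) for unsent rows, oldest first."""
    conn = sqlite3.connect(db_path)
    try:
        return conn.execute(
            "SELECT id, topic, created_at FROM message_buffer "
            "WHERE sent=0 ORDER BY id"
        ).fetchall()
    finally:
        conn.close()
```

The rows come back in id order, which is the same order the replay will use.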
The Replay
When the NATS connection is restored, connect() calls _replay_buffer():
async def _replay_buffer(self):
    """Replay buffered messages after reconnecting."""
    conn = None
    try:
        conn = sqlite3.connect(str(self._buffer_path))
        cursor = conn.execute(
            "SELECT id, topic, payload FROM message_buffer "
            "WHERE sent=0 ORDER BY id"
        )
        rows = cursor.fetchall()
        replayed = 0
        for row_id, topic, payload in rows:
            if self._nc and self._nc.is_connected:
                await self._nc.publish(topic, payload.encode())
                conn.execute(
                    "UPDATE message_buffer SET sent=1 WHERE id=?",
                    (row_id,),
                )
                replayed += 1
        conn.commit()
        if replayed:
            logger.info(f"Replayed {replayed} buffered messages")
        # Prune sent messages older than 24h
        cutoff = (datetime.now(timezone.utc) - timedelta(days=1)).isoformat()
        conn.execute(
            "DELETE FROM message_buffer WHERE sent=1 AND created_at < ?",
            (cutoff,),
        )
        conn.commit()
    except Exception as e:
        logger.warning(f"Buffer replay failed: {e}")
    finally:
        if conn:
            conn.close()
The replay is ordered by id (ascending), which means messages are replayed in the order they were originally published. For trading bots, this is critical: a trade.entry message must reach the broker before trade.executed.
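After replay, the pruning step removes rows that have already been sent and were buffered more than 24 hours ago. The cutoff is compared against created_at, the time the message was buffered, not the time it was replayed. This prevents the buffer database from growing indefinitely in agents that experience frequent brief outages.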
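The ordering and pruning behavior can be checked without a NATS server. The sketch below reuses the same SQL as _replay_buffer() against an in-memory database, with a stand-in publish function that records what it receives. The names replay, prune, and demo are illustrative only and are not part of the SDK.

```python
import asyncio
import sqlite3
from datetime import datetime, timedelta, timezone

SCHEMA = """
CREATE TABLE IF NOT EXISTS message_buffer (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    topic TEXT NOT NULL,
    payload TEXT NOT NULL,
    created_at TEXT NOT NULL,
    sent INTEGER DEFAULT 0
)
"""


async def replay(conn, publish) -> int:
    """Publish unsent rows in id order and mark each one as sent."""
    rows = conn.execute(
        "SELECT id, topic, payload FROM message_buffer WHERE sent=0 ORDER BY id"
    ).fetchall()
    for row_id, topic, payload in rows:
        await publish(topic, payload.encode())
        conn.execute("UPDATE message_buffer SET sent=1 WHERE id=?", (row_id,))
    conn.commit()
    return len(rows)


def prune(conn, now: datetime) -> int:
    """Delete sent rows whose created_at is more than 24 hours before now."""
    cutoff = (now - timedelta(days=1)).isoformat()
    cur = conn.execute(
        "DELETE FROM message_buffer WHERE sent=1 AND created_at < ?", (cutoff,)
    )
    conn.commit()
    return cur.rowcount


def demo():
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    now = datetime.now(timezone.utc)
    two_days_ago = (now - timedelta(days=2)).isoformat()
    conn.executemany(
        "INSERT INTO message_buffer (topic, payload, created_at) VALUES (?, ?, ?)",
        [
            ("agents.foresight.outbox", '{"subtype": "trade.entry"}', two_days_ago),
            ("agents.foresight.outbox", '{"subtype": "trade.executed"}', now.isoformat()),
        ],
    )
    conn.commit()

    delivered = []

    async def fake_publish(topic, data):  # stand-in for the NATS publish call
        delivered.append(data.decode())

    replayed = asyncio.run(replay(conn, fake_publish))
    pruned = prune(conn, now)
    remaining = conn.execute("SELECT COUNT(*) FROM message_buffer").fetchone()[0]
    conn.close()
    return delivered, replayed, pruned, remaining
```

In the demo, trade.entry is delivered before trade.executed, and only the row buffered two days ago is pruned; the recent row stays in the table with sent=1 until it ages past the cutoff.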
The Heartbeat Loop
Every connected client maintains a heartbeat loop:
HEARTBEAT_INTERVAL = 30  # seconds

async def _heartbeat_loop(self):
    """Send heartbeat every 30 seconds."""
    while True:
        try:
            await asyncio.sleep(HEARTBEAT_INTERVAL)
            if self._nc and self._nc.is_connected:
                topic = f"agents.{self.agent_id}.heartbeat"
                payload = json.dumps({
                    "agent_id": self.agent_id,
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "session_id": self._session_id,
                }).encode()
                await self._nc.publish(topic, payload)
        except asyncio.CancelledError:
            # Task was cancelled (for example on disconnect): exit the loop
            break
        except Exception as e:
            # A failed heartbeat is not fatal; try again on the next tick
            logger.debug(f"Heartbeat failed: {e}")
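The heartbeat fires every 30 seconds, and the broker's heartbeat monitor checks every 30 seconds for agents whose last_seen timestamp has gone stale. In the worst case, an agent dies just after sending a heartbeat: its last_seen goes stale roughly 30 seconds later, and the next monitor pass is at most another 30 seconds away. Combined, an agent failure is detectable within about 60 seconds, assuming the staleness threshold is about one heartbeat interval.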
session_id is included in the heartbeat payload. This is how the broker tracks session continuity — if an agent reconnects after a crash, the new session ID in the heartbeat signals that the previous session ended without a memory commit, which the incomplete session monitor flags.
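The broker side of the heartbeat is not shown in this lesson. The following sketch illustrates the two checks described above, staleness and session change, using the heartbeat payload fields from _heartbeat_loop(). The class name HeartbeatMonitor and the staleness threshold of one heartbeat interval are assumptions made for illustration.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

HEARTBEAT_INTERVAL = 30  # seconds, as stated in the lesson
STALE_AFTER = timedelta(seconds=HEARTBEAT_INTERVAL)  # assumed threshold


@dataclass
class AgentState:
    last_seen: datetime
    session_id: str


class HeartbeatMonitor:
    """Tracks the latest heartbeat per agent (illustrative only)."""

    def __init__(self):
        self.agents: dict[str, AgentState] = {}

    def record(self, heartbeat: dict) -> bool:
        """Store a heartbeat; return True if the agent's session ID changed."""
        agent_id = heartbeat["agent_id"]
        seen = datetime.fromisoformat(heartbeat["timestamp"])
        previous = self.agents.get(agent_id)
        changed = (
            previous is not None
            and previous.session_id != heartbeat["session_id"]
        )
        self.agents[agent_id] = AgentState(seen, heartbeat["session_id"])
        return changed

    def stale_agents(self, now: datetime) -> list[str]:
        """Return agents whose last heartbeat is older than the threshold."""
        return sorted(
            agent_id
            for agent_id, state in self.agents.items()
            if now - state.last_seen > STALE_AFTER
        )
```

A session change on its own does not say whether the previous session committed its memory. In the design described here, that decision belongs to the incomplete session monitor, which would combine this signal with the memory.committed messages it has seen.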
Usage Pattern in an Agent
A complete agent integration looks like this:
import logging

from principal_sdk import BrokerClient

logger = logging.getLogger(__name__)


class ForesightBot:
    def __init__(self):
        self.broker = BrokerClient(
            agent_id="foresight",
            agent_version="2.4.1",
            nats_url="nats://primary-host:4222",  # primary host's LAN address, as seen from the trading server
        )

    async def start(self):
        await self.broker.connect()
        # ... run trading logic

    async def record_trade(self, trade_data: dict):
        message_id = await self.broker.publish(
            subtype="trade.executed",
            payload=trade_data,
            priority="high",
        )
        logger.info(f"Trade published: {message_id}")

    async def stop(self):
        # Commit memory before disconnect
        await self.broker.publish(
            subtype="memory.committed",
            payload={"session_id": self.broker.session_id},
        )
        await self.broker.disconnect()
Three things to notice:
No envelope construction. The agent calls publish(subtype, payload) and lets the SDK build the envelope. There is no risk of a malformed message ID, a missing timestamp, or an incorrect from-agent field.
Memory commit before disconnect. The broker monitors for agents that end sessions without committing. Calling publish("memory.committed", ...) before disconnect() tells the broker this session ended cleanly and the semantic memory layer was updated. Agents that skip this step are flagged as incomplete sessions.
Session ID in the memory commit. The session_id from self.broker.session_id threads through the commit payload, which the broker uses to mark the specific session as complete in its session tracker.
Replay Prevention at the Broker
When buffered messages are replayed, they have the same message_id as the original publication. This is intentional — the broker's replay prevention (Step 2 of the pipeline) uses message_id to detect duplicates.
If the original publish actually reached the broker but the client saw an error and buffered the message anyway (for example, the connection dropped just after NATS accepted it), the replay delivers a message the broker has already handled. The broker's is_replay() check detects this and silently drops the duplicate.
If the original message was not yet published (the outage happened before NATS delivery), the replay delivers a new message with a message_id the broker has not seen. The broker processes it normally.
This design means agents never need to implement their own deduplication logic. The SDK constructs the message_id at publish time, the buffer preserves it, and the broker's replay prevention handles the rest.
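The broker's is_replay() implementation is covered elsewhere in the track. To make the two cases above concrete, here is a deliberately simplified sketch that keeps seen message IDs in memory. The class name ReplayGuard is hypothetical, and a real broker would presumably persist or expire this state rather than hold it in an unbounded set.

```python
class ReplayGuard:
    """Detects duplicate deliveries by envelope.messageId (simplified)."""

    def __init__(self):
        self._seen: set[str] = set()

    def is_replay(self, message: dict) -> bool:
        """Return True if this messageId was already processed."""
        message_id = message["envelope"]["messageId"]
        if message_id in self._seen:
            return True
        self._seen.add(message_id)
        return False
```

A message that never reached the broker before the outage returns False on replay and is processed normally. A message that did arrive returns True the second time and can be dropped.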
What This Track Built
This lesson is the last in the "Building an A2A Message Broker" track. Step back and look at what the complete system provides:
The Agent Card defines identity, org position, capabilities, and authority ceilings for every service in the fleet. The registry loads all cards at startup and answers queries from the routing and authority layers.
The 9 routing rules deterministically map every message type to its recipients. No LLM, no guessing, no probabilistic behavior.
The 9-step pipeline takes a raw NATS message from receipt to storage with safety checks at every stage — hard blocks for absolute constraints, authority conversion for escalations, audit-before-dispatch for integrity, and semantic memory layer storage for permanent memory.
NATS provides the pub/sub fabric that connects the primary host and the trading server, handles fan-out to multiple recipients simultaneously, and gives agents a heartbeat channel for passive health monitoring.
The immutable audit log records every message before it is dispatched, with database-level triggers preventing modification. It is the ground truth for incident reconstruction, compliance review, and recovery.
SSE provides Mission Control with a live window into the broker's activity — every routed message, every escalation, every hard block, streamed in real time without polling.
The BrokerClient SDK gives agents a protocol-compliant interface with offline resilience built in. Agents publish, the SDK handles the rest.
These seven components form a complete nervous system. The fleet is no longer dozens of repos communicating via cron and Discord. It is an organism with a spine.