Audit Before Dispatch
Write to the immutable SQLite audit log before sending the message — an undelivered message that was logged is recoverable, but an unlogged message that was delivered is invisible forever.
The most important rule in the Principal Broker's design is a single constraint on sequencing:
Write to the audit log before dispatching the message.
Not after. Not concurrently. Before.
This is Step 7 in the 9-step pipeline, occurring after routing (Step 6) and before dispatch (Step 8). The ordering is not arbitrary — it is the foundation of the system's integrity guarantee.
The Integrity Argument
Two scenarios explain why the ordering matters:
Scenario A: Audit write succeeds, dispatch fails.
The broker attempts to deliver Foresight's trade.executed message to VP Trading. NATS publishes the message but the connection drops mid-operation and the ACK never arrives. Did VP Trading receive the message? Unknown. But the audit log has a complete record: message_id, from_agent=foresight, to_agents=["vp-trading", "cfo", "mission-control"], status=dispatched, route_json with the full routing decision, and the message payload. Recovery tooling can inspect the audit log, see that dispatch was attempted, and re-deliver if necessary.
Scenario B: Dispatch succeeds, audit write fails.
The broker delivers the trade.executed message to VP Trading. The SQLite write then fails. Now there is an action that happened in the system with no record. There is no message_id to query. There is no way to know what was in the payload, who it was routed to, or what authority checks passed. For a financial system, this is not a minor inconvenience — it is a compliance failure.
The audit-before-dispatch rule makes Scenario A recoverable and makes Scenario B impossible (by design). If the audit write fails, the broker raises an exception and the dispatch does not happen.
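The constraint can be sketched as a guard in the pipeline. This is an illustrative reduction, not the broker's actual code; the class and method names (`BrokerPipeline`, `process`) are invented for the sketch:

```python
# Sketch of the audit-before-dispatch ordering (names are illustrative).
# If the audit write raises, dispatch never runs, so Scenario B cannot occur.

class BrokerPipeline:
    def __init__(self, audit_log, transport):
        self.audit_log = audit_log  # append-only SQLite audit log
        self.transport = transport  # e.g. a NATS client

    def process(self, message, route):
        # Step 7: write the audit record first. Any failure here
        # propagates as an exception and aborts the pipeline.
        audit_id = self.audit_log.write(message, route)

        # Step 8: dispatch only after the record is durable. A failure
        # here leaves a recoverable 'dispatched' record (Scenario A).
        self.transport.publish(route, message)
        return audit_id
```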
The Schema
The audit log schema defines both the table structure and the append-only enforcement:
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS audit_log (
    audit_id TEXT PRIMARY KEY,
    message_id TEXT NOT NULL,
    correlation_id TEXT,
    from_agent TEXT NOT NULL,
    to_agents TEXT,
    message_type TEXT NOT NULL,
    subtype TEXT NOT NULL,
    priority TEXT DEFAULT 'normal',
    payload_json TEXT,
    route_json TEXT,
    status TEXT NOT NULL DEFAULT 'dispatched',
    error_detail TEXT,
    created_at TEXT NOT NULL
);

-- Append-only enforcement: prevent UPDATE
CREATE TRIGGER IF NOT EXISTS prevent_audit_update
BEFORE UPDATE ON audit_log
BEGIN
    SELECT RAISE(ABORT, 'Audit log is append-only');
END;

-- Append-only enforcement: prevent DELETE
CREATE TRIGGER IF NOT EXISTS prevent_audit_delete
BEFORE DELETE ON audit_log
BEGIN
    SELECT RAISE(ABORT, 'Audit log is append-only');
END;
"""
The triggers are created with CREATE TRIGGER IF NOT EXISTS, meaning they are idempotent across schema migrations. They fire at the SQLite engine level, before the statement executes, and call RAISE(ABORT, ...) to abort the entire transaction. This enforcement cannot be bypassed by application code — it is in the database engine.
WAL mode (PRAGMA journal_mode=WAL) is enabled on connection. WAL mode allows concurrent reads while a write is in progress, which matters because the audit log is also queried by the REST API (/v1/audit) while the broker is processing messages.
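The engine-level claim is easy to verify directly: run the same trigger pattern against a throwaway in-memory database and attempt a mutation. Python's sqlite3 surfaces the RAISE(ABORT, ...) as an IntegrityError carrying the trigger's message:

```python
import sqlite3

# Minimal reproduction of the append-only enforcement: the same trigger
# pattern as the audit_log schema, against a throwaway database.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE audit_log (audit_id TEXT PRIMARY KEY, status TEXT NOT NULL);
CREATE TRIGGER prevent_audit_update BEFORE UPDATE ON audit_log
BEGIN SELECT RAISE(ABORT, 'Audit log is append-only'); END;
CREATE TRIGGER prevent_audit_delete BEFORE DELETE ON audit_log
BEGIN SELECT RAISE(ABORT, 'Audit log is append-only'); END;
""")
conn.execute("INSERT INTO audit_log VALUES ('a1', 'dispatched')")

try:
    conn.execute("UPDATE audit_log SET status = 'tampered'")
except sqlite3.IntegrityError as exc:
    print(exc)  # Audit log is append-only
```

No application-layer discipline is involved: the statement is rejected before it executes, and the row keeps its original value.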
The Write Method
def write(self, message: Any, route: Any = None) -> str:
    """
    Write a message to the audit log BEFORE dispatching.
    Returns the audit_id.
    """
    audit_id = str(uuid.uuid4())
    now = datetime.now(timezone.utc).isoformat()
    envelope = message.envelope

    to_agents = None
    if route and hasattr(route, "primary_recipients"):
        all_recipients = route.primary_recipients + getattr(route, "fan_out", [])
        to_agents = json.dumps(all_recipients)

    route_json = None
    if route and hasattr(route, "__dict__"):
        route_json = json.dumps({
            k: v for k, v in route.__dict__.items()
            if not k.startswith("_")
        })

    self._conn.execute(
        """INSERT INTO audit_log
               (audit_id, message_id, correlation_id, from_agent,
                to_agents, message_type, subtype, priority,
                payload_json, route_json, status, created_at)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'dispatched', ?)""",
        (
            audit_id,
            envelope.message_id,
            envelope.correlation_id,
            envelope.from_agent,
            to_agents,
            message.type,
            message.subtype,
            envelope.priority,
            json.dumps(message.payload),
            route_json,
            now,
        ),
    )
    self._conn.commit()
    return audit_id
The route_json field stores the complete routing decision — not just the recipient list. This means you can query the audit log and see exactly which routing rule fired, whether memory storage was requested, and whether there was a priority override. Post-incident analysis does not require re-running the routing logic; the decision is preserved.
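Because route_json is plain JSON text, inspecting a routing decision after the fact is a SELECT plus json.loads. A minimal sketch against the schema above (the helper name `explain_route` is illustrative, not part of the broker's API):

```python
import json
import sqlite3

def explain_route(conn: sqlite3.Connection, message_id: str) -> dict:
    """Fetch the stored routing decision for a message.

    Returns the decoded route_json dict, or {} if no routing decision
    was recorded (route_json is NULL for messages that never reached
    the routing step).
    """
    row = conn.execute(
        "SELECT route_json FROM audit_log WHERE message_id = ?",
        (message_id,),
    ).fetchone()
    if row is None or row[0] is None:
        return {}
    return json.loads(row[0])
```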
Status Variants
The audit log records five distinct statuses:
VALID_STATUSES = {
    "dispatched",
    "rejected",
    "hard_blocked",
    "error",
    "shutdown",
}
Each status has its own write method:
dispatched — normal path, written by write() before dispatch
rejected — schema validation failure, written by write_rejected() with the specific validation error in error_detail
hard_blocked — blocked by the hard_blocks list, written by write_hard_block(), triggers a log warning with agent ID and action type
error — pipeline processing error, written by write_error() with scrubbed raw bytes (first 4096 bytes of the raw NATS message)
shutdown — kill switch activation, written by write_shutdown() with the kill switch level and detail
The absence of an "undelivered" status is intentional. The audit log records the broker's intent and actions, not the outcome of delivery. Whether VP Trading actually processed the message is tracked by the receiving agent, not the broker.
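The variant writers are not shown in full here. As an illustration of the shape they likely take, this sketch of write_rejected is written as a free function against the same schema; it is an inference from write(), not the broker's actual code:

```python
import json
import sqlite3
import uuid
from datetime import datetime, timezone

def write_rejected(conn: sqlite3.Connection, raw_message: dict, error: str) -> str:
    """Record a schema-validation failure (sketch, inferred from write()).

    Unlike write(), there is no route: the message never reached the
    routing step, so to_agents and route_json stay NULL.
    """
    audit_id = str(uuid.uuid4())
    now = datetime.now(timezone.utc).isoformat()
    conn.execute(
        """INSERT INTO audit_log
               (audit_id, message_id, correlation_id, from_agent,
                message_type, subtype, payload_json,
                status, error_detail, created_at)
           VALUES (?, ?, ?, ?, ?, ?, ?, 'rejected', ?, ?)""",
        (
            audit_id,
            # Fall back to a fresh UUID: message_id is NOT NULL, but a
            # message that failed validation may not carry one.
            raw_message.get("message_id") or str(uuid.uuid4()),
            raw_message.get("correlation_id"),
            raw_message.get("from_agent") or "unknown",
            raw_message.get("type") or "unknown",
            raw_message.get("subtype") or "unknown",
            json.dumps(raw_message),
            error,  # the specific validation error
            now,
        ),
    )
    conn.commit()
    return audit_id
```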
Secret Scrubbing
The write_error method handles cases where the raw message bytes could not be parsed — which means they go into the audit log as raw text. Before writing, secrets are scrubbed:
@staticmethod
def _scrub_secrets(text: str) -> str:
    """Replace sensitive patterns in text before storing to audit log."""
    # Bearer tokens
    text = re.sub(r"Bearer [A-Za-z0-9_\-\.]+", "Bearer [REDACTED]", text)
    # Anthropic API keys
    text = re.sub(r"sk-ant-[A-Za-z0-9_\-]+", "[REDACTED_API_KEY]", text)
    # Authorization header values
    text = re.sub(
        r'(Authorization\s*[:=]\s*)[^\s,\}\"]+',
        r'\1[REDACTED]',
        text,
        flags=re.IGNORECASE,
    )
    # Generic key=value patterns for known sensitive keys
    text = re.sub(
        r'(api_key|apikey|secret|password|token|pin)'
        r'(["\']?\s*[:=]\s*["\']?)[A-Za-z0-9_\-\.]+',
        r'\1\2[REDACTED]',
        text,
        flags=re.IGNORECASE,
    )
    return text
This scrubbing runs on error payloads specifically because malformed messages that fail deserialization are stored as raw text. A message that passes deserialization is stored as structured JSON, where secrets in known fields can be handled at the application layer.
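The patterns are easy to sanity-check in isolation. Here is a standalone copy of the scrubber (same regexes, as a free function) run over a representative error string:

```python
import re

def scrub_secrets(text: str) -> str:
    """Standalone copy of _scrub_secrets, same four patterns."""
    text = re.sub(r"Bearer [A-Za-z0-9_\-\.]+", "Bearer [REDACTED]", text)
    text = re.sub(r"sk-ant-[A-Za-z0-9_\-]+", "[REDACTED_API_KEY]", text)
    text = re.sub(
        r'(Authorization\s*[:=]\s*)[^\s,\}\"]+',
        r'\1[REDACTED]',
        text,
        flags=re.IGNORECASE,
    )
    text = re.sub(
        r'(api_key|apikey|secret|password|token|pin)'
        r'(["\']?\s*[:=]\s*["\']?)[A-Za-z0-9_\-\.]+',
        r'\1\2[REDACTED]',
        text,
        flags=re.IGNORECASE,
    )
    return text

raw = 'POST failed: api_key=sk-live-9f2, token: hunter2, Bearer eyJabc.def'
print(scrub_secrets(raw))
# → POST failed: api_key=[REDACTED], token: [REDACTED], Bearer [REDACTED]
```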
Querying the Audit Log
The audit log exposes a query interface that the REST API uses at /v1/audit:
def query(
    self,
    agent_id: Optional[str] = None,
    message_id: Optional[str] = None,
    correlation_id: Optional[str] = None,
    status: Optional[str] = None,
    limit: int = 100,
    offset: int = 0,
) -> list[dict]:
Six indexes support the common query patterns:
CREATE INDEX idx_audit_message_id ON audit_log(message_id);
CREATE INDEX idx_audit_correlation_id ON audit_log(correlation_id);
CREATE INDEX idx_audit_from_agent ON audit_log(from_agent);
CREATE INDEX idx_audit_created_at ON audit_log(created_at);
CREATE INDEX idx_audit_status ON audit_log(status);
CREATE INDEX idx_audit_subtype ON audit_log(subtype);
The correlation_id index is particularly useful for incident analysis. A correlation ID threads through a chain of related messages — the initial signal, the trade entry, the execution, and the report. Querying by correlation_id reconstructs the complete flow for a given transaction.
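A minimal version of that reconstruction, written directly against the schema (the helper name `trace_correlation` is illustrative):

```python
import sqlite3

def trace_correlation(conn: sqlite3.Connection, correlation_id: str) -> list[dict]:
    """Return every audit record in a correlated message chain,
    oldest first, so the flow reads top to bottom."""
    conn.row_factory = sqlite3.Row
    rows = conn.execute(
        """SELECT created_at, from_agent, message_type, subtype, status
             FROM audit_log
            WHERE correlation_id = ?
            ORDER BY created_at""",
        (correlation_id,),
    ).fetchall()
    return [dict(r) for r in rows]
```

The ISO-8601 created_at timestamps sort correctly as text, which is why the ORDER BY needs no date parsing.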
The Shutdown Audit Record
When the kill switch fires, the audit log records the shutdown before any service termination begins:
def write_shutdown(self, level: int, triggered_by: str, detail: str) -> str:
    audit_id = str(uuid.uuid4())
    now = datetime.now(timezone.utc).isoformat()
    self._conn.execute(
        """INSERT INTO audit_log
           ...
           VALUES (?, ?, ?, ?, ?, ?, ?, ?, 'shutdown', ?, ?)""",
        (
            audit_id,
            str(uuid.uuid4()),  # no inbound message; a fresh UUID fills message_id
            str(uuid.uuid4()),  # likewise for correlation_id
            triggered_by,
            "shutdown",
            f"kill_switch.level_{level}",
            "critical",
            json.dumps({"level": level, "detail": detail}),
            f"Kill switch Level {level} activated",
            now,
        ),
    )
    self._conn.commit()
    return audit_id
The shutdown record includes the kill switch level, who triggered it, and the detail string (which captures the reason, the time, and any relevant context). This record is the first line of the post-incident timeline — before logs, before Discord messages, before anything else.
Implications for Recovery
The audit log's write-before-dispatch guarantee enables a specific recovery pattern that would otherwise be impossible:
- Query audit_log WHERE status='dispatched' AND created_at > [last_known_good_time]
- For each record, check whether the recipient agent acknowledges receipt
- Re-dispatch any messages where receipt was not confirmed
This is the replay workflow. It assumes the audit log is complete (which the write-before-dispatch rule ensures) and that re-dispatch is idempotent (which the agents are responsible for, using the message_id as an idempotency key).
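The receiving side of that contract can be as small as a seen-set keyed by message_id. A sketch (the class name is illustrative, and a real agent would persist seen IDs durably rather than hold them in memory):

```python
class IdempotentReceiver:
    """Drops re-dispatched duplicates using message_id as the key.

    Illustrative only: an in-memory set stands in for whatever durable
    store a real agent would use to survive restarts.
    """

    def __init__(self, handler):
        self._handler = handler
        self._seen: set[str] = set()

    def on_message(self, message_id: str, payload: dict) -> bool:
        """Process the message at most once; return False for duplicates."""
        if message_id in self._seen:
            return False  # replay of an already-processed message
        self._seen.add(message_id)
        self._handler(payload)
        return True
```

With this in place, the broker's re-dispatch step is safe to run unconditionally: a duplicate delivery is absorbed by the receiver rather than double-processed.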
The two triggers — prevent_audit_update and prevent_audit_delete — are sometimes described as paranoid. They are not. They are the enforcement mechanism for a promise: once a record is in the audit log, it does not change and it does not disappear. Without that promise, the audit log is just a log file. With it, the audit log is a source of truth.