ASK KNOX
LESSON 247

Behavioral Health Monitoring

Detecting hallucination patterns, doom spirals, and stale execution in production agents. The health monitor pattern, circuit breakers, and automated recovery protocols that prevent individual agent failures from cascading.

Agents fail in ways that are not immediately obvious. They do not always crash. They do not always return error codes. Sometimes they produce plausible-looking output that is wrong. Sometimes they enter patterns that escalate cost without making progress. Sometimes they simply get stuck and stay in-progress indefinitely.

A platform without health monitoring is a platform operating blind. You learn about problems when users notice, not when they start. This lesson builds the visibility layer that detects problems early — before they cascade.

The Three Failure Modes to Monitor

Hallucination — the agent produces output that is syntactically correct and structurally plausible but factually wrong or fabricated. The agent does not know it is hallucinating. Its confidence is unaffected. Detection requires external validation.

Doom spiral — the agent enters a non-convergent error-correction loop. An error triggers a fix attempt. The fix introduces a new error. The new error triggers a larger fix. The pattern diverges until token budget is exhausted or the session becomes unrecoverable. Detection requires recognizing the divergent pattern, not just counting errors.

Stale execution — the agent is in in_progress state significantly longer than its baseline for this task type. It is stuck — possibly waiting for a dependency that will never arrive, possibly in an infinite loop, possibly out of context. Detection requires time-in-state tracking against a per-task-type baseline.

Each failure mode has a different detection mechanism and a different response.

The Behavioral Baseline

Behavioral monitoring requires a reference point: what does healthy look like for this agent on this task type? Without a baseline, every metric is ambiguous.

import statistics
from dataclasses import dataclass


@dataclass
class BehavioralBaseline:
    agent_id: str
    task_type: str
    established_from: int          # number of sessions used to build baseline

    # Time metrics
    median_completion_seconds: float
    p95_completion_seconds: float
    stale_threshold_seconds: float  # p95 × 2.0

    # Quality metrics
    median_confidence: float
    min_acceptable_confidence: float   # mean - 2σ

    # Error metrics
    baseline_error_rate: float
    error_spike_threshold: float       # baseline × 3.0

    # Cost metrics
    median_token_cost: float
    cost_spike_threshold: float        # median × 2.5


class BaselineManager:
    def compute_baseline(
        self,
        agent_id: str,
        task_type: str,
        session_window: int = 20
    ) -> BehavioralBaseline:
        """
        Compute baseline from last N validated healthy sessions.
        Excludes sessions flagged as anomalous.
        """
        sessions = self.store.get_healthy_sessions(
            agent_id=agent_id,
            task_type=task_type,
            limit=session_window
        )

        if len(sessions) < 5:
            return self.get_default_baseline(task_type)

        durations = [s.duration_seconds for s in sessions]
        confidences = [s.avg_confidence for s in sessions]
        error_rates = [s.error_rate for s in sessions]
        costs = [s.token_cost for s in sessions]

        # quantiles(n=20) returns 19 cut points; index 18 is the 95th percentile
        p95_duration = statistics.quantiles(durations, n=20)[18]
        mean_error_rate = statistics.mean(error_rates)
        median_cost = statistics.median(costs)

        return BehavioralBaseline(
            agent_id=agent_id,
            task_type=task_type,
            established_from=len(sessions),
            median_completion_seconds=statistics.median(durations),
            p95_completion_seconds=p95_duration,
            stale_threshold_seconds=p95_duration * 2.0,
            median_confidence=statistics.median(confidences),
            min_acceptable_confidence=statistics.mean(confidences) - 2 * statistics.stdev(confidences),
            baseline_error_rate=mean_error_rate,
            error_spike_threshold=mean_error_rate * 3.0,
            median_token_cost=median_cost,
            cost_spike_threshold=median_cost * 2.5,
        )
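
To make the threshold arithmetic concrete, here is a minimal sketch that derives the time thresholds from a handful of durations. The sample values and the time_thresholds helper are made up for illustration; only the statistics calls mirror the baseline code above.

```python
import statistics

# Hypothetical durations (seconds) from eight healthy sessions of one task type
durations = [40.0, 42.0, 45.0, 47.0, 50.0, 52.0, 55.0, 90.0]


def time_thresholds(samples: list[float], method: str = "exclusive") -> dict[str, float]:
    """Derive median, p95, and stale threshold (p95 x 2.0) from durations."""
    # quantiles(n=20) returns 19 cut points; index 18 is the 95th percentile
    p95 = statistics.quantiles(samples, n=20, method=method)[18]
    return {
        "median": statistics.median(samples),
        "p95": p95,
        "stale_threshold": p95 * 2.0,
    }


exclusive = time_thresholds(durations)               # default method
inclusive = time_thresholds(durations, "inclusive")  # interpolates within the data

for name, result in (("exclusive", exclusive), ("inclusive", inclusive)):
    print(name, result)
```

With a window this small, the default "exclusive" method extrapolates the 95th percentile past the slowest observed session, which loosens the stale threshold. Passing method="inclusive" keeps the estimate inside the observed range. Which behavior is preferable depends on how representative the session window is.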

Hallucination Detection

Hallucination detection is a validation problem. The agent cannot detect its own hallucinations, so the platform must.

import jsonschema  # third-party package, used by SchemaValidator below


class HallucinationDetector:
    def __init__(self, validators: list[Validator]):
        self.validators = validators

    async def validate(
        self,
        output: AgentOutput,
        context: ValidationContext
    ) -> HallucinationResult:
        violations = []

        for validator in self.validators:
            result = await validator.check(output, context)
            if result.is_violation:
                violations.append(result)

        confidence = 1.0 - (len(violations) / max(len(self.validators), 1))

        return HallucinationResult(
            has_violations=len(violations) > 0,
            violations=violations,
            confidence=confidence
        )


class FactualConsistencyValidator:
    """Cross-check agent claims against authoritative sources."""

    async def check(
        self,
        output: AgentOutput,
        context: ValidationContext
    ) -> ValidationResult:
        # Extract claims from output
        claims = extract_factual_claims(output.content)

        violations = []
        for claim in claims:
            # Check against authoritative data
            authoritative = await context.knowledge_base.lookup(claim.subject)
            if authoritative and not claim.is_consistent_with(authoritative):
                violations.append(ClaimViolation(
                    claim=claim,
                    expected=authoritative,
                    actual=claim.value
                ))

        return ValidationResult(
            validator="factual-consistency",
            is_violation=len(violations) > 0,
            violations=violations
        )


class SchemaValidator:
    """Validate structured outputs against expected schema."""

    async def check(
        self,
        output: AgentOutput,
        context: ValidationContext
    ) -> ValidationResult:
        if not output.expected_schema:
            return ValidationResult(validator="schema", is_violation=False)

        try:
            jsonschema.validate(output.structured, output.expected_schema)
            return ValidationResult(validator="schema", is_violation=False)
        except jsonschema.ValidationError as e:
            return ValidationResult(
                validator="schema",
                is_violation=True,
                violations=[SchemaViolation(path=e.path, message=e.message)]
            )
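
The aggregate confidence score is easier to reason about with a small end-to-end run. The sketch below assumes two hypothetical validators (NonEmptyValidator and KnownValueValidator) and a plain dict as context; it reproduces only the scoring rule from HallucinationDetector, not the full result types.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class CheckResult:
    validator: str
    is_violation: bool


class NonEmptyValidator:
    """Hypothetical validator: flags blank output."""

    async def check(self, output: str, context: dict) -> CheckResult:
        return CheckResult("non-empty", is_violation=not output.strip())


class KnownValueValidator:
    """Hypothetical validator: flags output that omits a known value."""

    async def check(self, output: str, context: dict) -> CheckResult:
        expected = context.get("expected")
        contradicts = expected is not None and expected not in output
        return CheckResult("known-value", is_violation=contradicts)


async def confidence(output: str, context: dict, validators: list) -> float:
    """Fraction of validators that passed, as in HallucinationDetector."""
    results = [await v.check(output, context) for v in validators]
    violations = sum(r.is_violation for r in results)
    return 1.0 - violations / max(len(validators), 1)


validators = [NonEmptyValidator(), KnownValueValidator()]
context = {"expected": "Paris"}

clean = asyncio.run(confidence("The capital is Paris.", context, validators))
wrong = asyncio.run(confidence("The capital is Lyon.", context, validators))
print(clean, wrong)
```

Because the score is the fraction of validators that passed, two validators can only produce 0.0, 0.5, or 1.0. Finer-grained confidence bands become reachable only as more validators are registered.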

Doom Spiral Detection

A doom spiral is characterized by a specific pattern: error rates increasing over iterations, with each iteration making the problem worse rather than better.

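import statistics


class DoomSpiralDetector:
    def __init__(
        self,
        error_threshold: float = 0.40,
        # Mean per-iteration increase in error rate. Error rates lie in [0, 1],
        # so the trend over a window can never exceed 1 / (window - 1); the
        # threshold must sit below that bound to fire. 0.05 is illustrative.
        divergence_threshold: float = 0.05,
        window: int = 5
    ):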
        self.error_threshold = error_threshold
        self.divergence_threshold = divergence_threshold
        self.window = window

    def detect(self, session: ActiveSession) -> DoomSpiralResult:
        if len(session.iterations) < self.window:
            return DoomSpiralResult(detected=False)

        recent = session.iterations[-self.window:]
        error_rates = [i.error_rate for i in recent]

        # Is the error rate above threshold?
        current_error_rate = error_rates[-1]
        if current_error_rate < self.error_threshold:
            return DoomSpiralResult(detected=False)

        # Is it diverging (getting worse, not better)?
        trend = self._compute_trend(error_rates)
        if trend < self.divergence_threshold:
            return DoomSpiralResult(detected=False)

        # Are the errors different each time (not the same error looping)?
        error_diversity = len(set(i.error_type for i in recent)) / len(recent)

        return DoomSpiralResult(
            detected=True,
            current_error_rate=current_error_rate,
            trend=trend,
            error_diversity=error_diversity,
            iteration_count=len(session.iterations),
            recommendation="halt_and_reset" if error_diversity > 0.6 else "halt_and_retry"
        )

    def _compute_trend(self, series: list[float]) -> float:
        """Returns rate of change — positive means diverging."""
        if len(series) < 2:
            return 0.0
        deltas = [series[i+1] - series[i] for i in range(len(series)-1)]
        return statistics.mean(deltas)
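
A short worked example shows what the trend value looks like in practice. This is a sketch with made-up error rates; mean_delta is a standalone copy of the trend calculation, assuming error rates are fractions between 0 and 1.

```python
import math
import statistics


def mean_delta(series: list[float]) -> float:
    """Mean change between consecutive values; positive means rising."""
    if len(series) < 2:
        return 0.0
    deltas = [b - a for a, b in zip(series, series[1:])]
    return statistics.mean(deltas)


# Hypothetical error rates over a window of five iterations
rising = [0.40, 0.45, 0.55, 0.60, 0.70]
falling = [0.70, 0.60, 0.55, 0.45, 0.40]

trend_up = mean_delta(rising)
trend_down = mean_delta(falling)

# The consecutive differences telescope, so only the endpoints matter
endpoint_form = (rising[-1] - rising[0]) / (len(rising) - 1)

# Largest possible trend when error rates are confined to [0, 1]
max_trend = mean_delta([0.0, 0.0, 0.0, 0.0, 1.0])

print(trend_up, trend_down, max_trend)
print(math.isclose(trend_up, endpoint_form))
```

Two consequences follow. The trend depends only on the first and last values in the window, so a zig-zag series with the same endpoints scores the same as a steady climb. And with a window of five iterations the trend cannot exceed 0.25, so a divergence threshold is only meaningful if it is set below that bound.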

Stale Execution Detection

Stale execution is the simplest failure to detect and one of the most common.

from datetime import datetime


class StaleExecutionDetector:
    def __init__(self, baselines: BaselineManager):
        self.baselines = baselines

    def check_session(
        self,
        session: ActiveSession
    ) -> StaleResult:
        if session.status != "in_progress":
            return StaleResult(is_stale=False)

        elapsed = (datetime.utcnow() - session.started_at).total_seconds()
        baseline = self.baselines.get(session.agent_id, session.task_type)

        if elapsed > baseline.stale_threshold_seconds:
            return StaleResult(
                is_stale=True,
                elapsed_seconds=elapsed,
                threshold_seconds=baseline.stale_threshold_seconds,
                factor=elapsed / baseline.median_completion_seconds,
                action=self.recommend_action(elapsed, baseline)
            )

        return StaleResult(is_stale=False)

    def recommend_action(
        self,
        elapsed: float,
        baseline: BehavioralBaseline
    ) -> str:
        factor = elapsed / baseline.median_completion_seconds
        if factor > 5.0:
            return "force_terminate"
        elif factor > 3.0:
            return "send_heartbeat_request"
        else:
            return "flag_for_monitoring"
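
The interaction between the stale threshold (p95 × 2.0) and the median-based factor is worth tracing with numbers. The following sketch folds both checks into one hypothetical function, stale_action, using made-up baselines.

```python
from typing import Optional


def stale_action(elapsed: float, median: float, p95: float) -> Optional[str]:
    """Return the recommended action, or None if the session is not stale."""
    threshold = p95 * 2.0  # same stale threshold as the baseline
    if elapsed <= threshold:
        return None
    factor = elapsed / median
    if factor > 5.0:
        return "force_terminate"
    if factor > 3.0:
        return "send_heartbeat_request"
    return "flag_for_monitoring"


# Wide distribution: median 60 s, p95 100 s, so the stale threshold is 200 s
print(stale_action(150, median=60, p95=100))  # under threshold
print(stale_action(210, median=60, p95=100))  # factor 3.5
print(stale_action(400, median=60, p95=100))  # factor above 5

# Tight distribution: median 60 s, p95 80 s, so the stale threshold is 160 s
print(stale_action(170, median=60, p95=80))   # factor below 3
```

Note that "flag_for_monitoring" is reachable only when the stale threshold corresponds to a factor below 3, that is, when p95 is less than 1.5 times the median. For task types with a long tail, a session that crosses the threshold already qualifies for a heartbeat request.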

Circuit Breakers

A circuit breaker prevents a failing agent from receiving new work until it demonstrates it has recovered. It tracks failure rates and opens the circuit when failures exceed the threshold.

from datetime import datetime, timezone
from typing import Optional


class AgentCircuitBreaker:
    def __init__(
        self,
        agent_id: str,
        failure_threshold: float = 0.30,  # 30% failure rate opens circuit
        recovery_timeout_seconds: int = 300,
        half_open_test_count: int = 3
    ):
        self.agent_id = agent_id
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout_seconds
        self.half_open_tests = half_open_test_count

        self.state: str = "closed"  # closed | open | half-open
        self.failure_count: int = 0
        self.total_count: int = 0
        self.last_failure: Optional[datetime] = None
        self.half_open_successes: int = 0

    def record_result(self, success: bool) -> None:
        self.total_count += 1
        if success:
            if self.state == "half-open":
                # Count successful probes toward closing the circuit
                self.half_open_successes += 1
        else:
            self.failure_count += 1
            self.last_failure = datetime.now(timezone.utc)
            if self.state == "half-open":
                # A failed probe means the agent has not recovered yet
                self._open_circuit()
                return

        self._evaluate_state()

    def _evaluate_state(self) -> None:
        if self.state == "closed":
            if self.total_count >= 10:  # minimum sample
                failure_rate = self.failure_count / self.total_count
                if failure_rate >= self.failure_threshold:
                    self._open_circuit()

        elif self.state == "open":
            if self.last_failure:
                elapsed = (datetime.now(timezone.utc) - self.last_failure).total_seconds()
                if elapsed >= self.recovery_timeout:
                    self._half_open()

        elif self.state == "half-open":
            if self.half_open_successes >= self.half_open_tests:
                self._close_circuit()

    def can_accept_task(self) -> bool:
        # An open circuit receives no results, so re-evaluate here to let
        # the recovery timeout move it to half-open
        self._evaluate_state()
        if self.state == "closed":
            return True
        if self.state == "half-open":
            return True  # Allow limited tasks to test recovery
        return False  # open: no new tasks

    def _open_circuit(self) -> None:
        self.state = "open"
        # Notify health monitor (assumes a module-level health_monitor
        # object that exposes an alert() method)
        health_monitor.alert(
            agent_id=self.agent_id,
            event="circuit_breaker_opened",
            failure_rate=self.failure_count / self.total_count
        )

    def _half_open(self) -> None:
        self.state = "half-open"
        self.half_open_successes = 0

    def _close_circuit(self) -> None:
        self.state = "closed"
        self.failure_count = 0
        self.total_count = 0
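
The closed, open, half-open cycle is easiest to follow when it can be driven without waiting for real time to pass. The sketch below is a deliberately simplified stand-in, not the class above: MiniBreaker, its parameter names, and the injectable clock are all assumptions made for illustration.

```python
import time
from typing import Callable


class MiniBreaker:
    """Compact three-state breaker with an injectable clock (hypothetical)."""

    def __init__(self, threshold: float = 0.30, min_samples: int = 10,
                 timeout: float = 300.0, probes: int = 3,
                 clock: Callable[[], float] = time.monotonic):
        self.threshold, self.min_samples = threshold, min_samples
        self.timeout, self.probes, self.clock = timeout, probes, clock
        self.state = "closed"
        self.failures = self.total = self.probe_successes = 0
        self.opened_at = 0.0

    def can_accept_task(self) -> bool:
        # The timeout is checked on the request path, because an open
        # circuit receives no results that could trigger the transition
        if self.state == "open" and self.clock() - self.opened_at >= self.timeout:
            self.state, self.probe_successes = "half-open", 0
        return self.state != "open"

    def record_result(self, success: bool) -> None:
        if self.state == "half-open":
            if not success:
                self._open()  # failed probe: back to open
            else:
                self.probe_successes += 1
                if self.probe_successes >= self.probes:
                    self.state, self.failures, self.total = "closed", 0, 0
            return
        self.total += 1
        self.failures += 0 if success else 1
        if (self.state == "closed" and self.total >= self.min_samples
                and self.failures / self.total >= self.threshold):
            self._open()

    def _open(self) -> None:
        self.state = "open"
        self.opened_at = self.clock()


now = [0.0]  # fake clock, advanced by hand
breaker = MiniBreaker(clock=lambda: now[0])
trace = []

for ok in [True] * 7 + [False] * 3:  # 3 failures in 10 results: 30%
    breaker.record_result(ok)
trace.append(breaker.state)

blocked = not breaker.can_accept_task()  # still inside the recovery timeout
now[0] = 300.0                           # recovery timeout has elapsed
breaker.can_accept_task()
trace.append(breaker.state)

for _ in range(3):                       # three successful probes
    breaker.record_result(True)
trace.append(breaker.state)

print(trace, blocked)
```

Injecting the clock is a testing convenience rather than part of the pattern itself. The essential points are that an open breaker must check its timeout when work is requested, and that probe results in the half-open state must be counted separately from normal traffic.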

The Health Monitor: Assembling the System

The health monitor runs as a separate process — not part of any agent, not part of the broker. It polls agent state, runs detection checks, and acts on findings.

import asyncio
import logging

logger = logging.getLogger(__name__)


class AgentHealthMonitor:
    def __init__(
        self,
        broker: PrincipalBroker,
        check_interval_seconds: int = 30
    ):
        self.broker = broker
        self.check_interval = check_interval_seconds
        self.stale_detector = StaleExecutionDetector(BaselineManager())
        self.spiral_detector = DoomSpiralDetector()
        self.circuit_breakers: dict[str, AgentCircuitBreaker] = {}

    async def run(self) -> None:
        """Main monitoring loop — runs forever."""
        while True:
            try:
                await self.check_all_agents()
            except Exception as e:
                logger.error(f"Health monitor error: {e}")
            await asyncio.sleep(self.check_interval)

    async def check_all_agents(self) -> None:
        active_sessions = await self.broker.get_active_sessions()

        for session in active_sessions:
            await self.check_session(session)

    async def check_session(self, session: ActiveSession) -> None:
        issues = []

        # Stale execution check
        stale = self.stale_detector.check_session(session)
        if stale.is_stale:
            issues.append(HealthIssue(
                type="stale_execution",
                severity="warning" if stale.factor < 3 else "error",
                details=stale
            ))

        # Doom spiral check
        spiral = self.spiral_detector.detect(session)
        if spiral.detected:
            issues.append(HealthIssue(
                type="doom_spiral",
                severity="error",
                details=spiral
            ))

        if issues:
            await self.act_on_issues(session, issues)

    async def act_on_issues(
        self,
        session: ActiveSession,
        issues: list[HealthIssue]
    ) -> None:
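        # Check for errors explicitly: max() on the raw strings sorts
        # alphabetically, which would rank "warning" above "error"
        has_error = any(i.severity == "error" for i in issues)

        if has_error:
            # Halt the session and record the failure with the circuit breaker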
            await self.broker.halt_session(session.id, reason="health-monitor")
            cb = self.get_circuit_breaker(session.agent_id)
            cb.record_result(success=False)
            await self.escalate_to_ceo(session, issues)
        else:
            # Warning — flag and monitor
            await self.broker.flag_session(session.id, issues=issues)
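
When several issues are found on one session, the most severe one should decide the response. Severity labels such as "warning" and "error" are plain strings, and Python compares strings alphabetically, so ranking them needs an explicit order. A minimal sketch, assuming a hypothetical SEVERITY_RANK table:

```python
# Hypothetical ranking table; higher numbers are more severe
SEVERITY_RANK = {"info": 0, "warning": 1, "error": 2}


def worst_severity(severities: list[str]) -> str:
    """Return the most severe label according to SEVERITY_RANK."""
    return max(severities, key=lambda label: SEVERITY_RANK[label])


found = ["warning", "error", "warning"]

alphabetical = max(found)       # plain string comparison
ranked = worst_severity(found)  # explicit severity order

print(alphabetical, ranked)
```

A rank table also leaves room for additional levels later, such as a "critical" tier that pages a human immediately.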

Automated Recovery Protocols

When the health monitor halts a session, the recovery protocol determines what happens next.

RECOVERY_PROTOCOLS = {
    "stale_execution": {
        "factor < 3": "send_heartbeat_check",
        "factor 3-5": "soft_terminate_and_retry",
        "factor > 5": "force_terminate_no_retry",
    },
    "doom_spiral": {
        "iteration_count < 10": "halt_and_reset_context",
        "iteration_count >= 10": "halt_escalate_human",
    },
    "hallucination_detected": {
        "confidence > 0.80": "reject_output_retry",
        "confidence 0.60-0.80": "reject_output_validate_manually",
        "confidence < 0.60": "halt_escalate_human",
    },
}
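
The table above describes its conditions as text. One way to make such a table executable is to pair each action with a predicate and take the first match. The sketch below is one possible encoding, not a prescribed one: the RULES structure and resolve function are hypothetical, and the boundary values (exactly 3, 5, 0.60, and 0.80) are assumed to belong to the middle band, which the table leaves open.

```python
from typing import Callable

Rule = tuple[Callable[[float], bool], str]

# First matching rule wins, so rules are ordered from most specific to fallback
RULES: dict[str, list[Rule]] = {
    "stale_execution": [  # value: elapsed time / median completion time
        (lambda factor: factor > 5, "force_terminate_no_retry"),
        (lambda factor: factor >= 3, "soft_terminate_and_retry"),
        (lambda factor: True, "send_heartbeat_check"),
    ],
    "doom_spiral": [  # value: iteration count
        (lambda iterations: iterations >= 10, "halt_escalate_human"),
        (lambda iterations: True, "halt_and_reset_context"),
    ],
    "hallucination_detected": [  # value: validation confidence
        (lambda confidence: confidence < 0.60, "halt_escalate_human"),
        (lambda confidence: confidence <= 0.80, "reject_output_validate_manually"),
        (lambda confidence: True, "reject_output_retry"),
    ],
}


def resolve(issue_type: str, value: float) -> str:
    """Return the recovery action for an issue type and its key metric."""
    for predicate, action in RULES[issue_type]:
        if predicate(value):
            return action
    raise ValueError(f"no rule matched for {issue_type!r}")


print(resolve("stale_execution", 4.2))
print(resolve("doom_spiral", 12))
print(resolve("hallucination_detected", 0.9))
```

Keeping the rules as data means thresholds can be tuned per deployment without touching the monitor's control flow.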

Summary

  • Three failure modes require distinct detection mechanisms: hallucination (external validation), doom spiral (divergence pattern), stale execution (time-in-state vs baseline)
  • Behavioral baselines must be built from a rolling window of validated healthy sessions
  • Hallucination detection requires multiple validation layers — schema, factual consistency, and cross-referencing
  • Circuit breakers isolate failing agents from new work, allowing recovery without cascading failure
  • The health monitor is a separate process that polls agent state, runs detection checks, and triggers automated recovery

What's Next

The final lesson assembles all components into a complete platform — walking end-to-end through how expertise, team architecture, organizational wiring, authority delegation, and behavioral monitoring combine into a running system.