ASK KNOX
beta
LESSON 307

Sessions and Event Streaming

Sessions are the runtime unit of Managed Agents — understanding session lifecycle, event types, and how to consume them correctly is the foundation of reliable production integrations.

9 min read

A session is a running instance of an agent. It has a lifecycle, emits events as it works, and terminates in a known state. Understanding how to start sessions, consume their events, and handle their lifecycle correctly is not optional background knowledge — it is the core operational skill for building on Managed Agents.

Session Lifecycle

Every session passes through a defined set of states:

PENDING — session has been created but execution has not started yet. The infrastructure is provisioning the container.

RUNNING — the agent is actively executing. Events are being emitted.

COMPLETED — the agent finished execution successfully. Results are retrievable.

FAILED — the agent encountered an unrecoverable error. An error event describes what happened.

CANCELLED — the session was explicitly cancelled before completion.

Terminal states are COMPLETED, FAILED, and CANCELLED. Once a session reaches a terminal state, it will not transition further. Results are immutable after completion.

Starting a Session

import anthropic

client = anthropic.Anthropic()

# Start a session with a task for the agent
session = client.beta.agents.sessions.create(
    agent_id="agent_01abc123",
    input={
        "role": "user",
        "content": "Research Stripe's Q1 2026 product launches and write a competitor intelligence report."
    }
)

print(f"Session ID: {session.id}")
print(f"Status: {session.status}")  # PENDING

The session is created immediately. Execution starts asynchronously. The session.id is what you use to track and retrieve results.

Streaming Events

For sessions where you want to observe execution as it happens, stream the SSE events:

# Stream events from an active session
with client.beta.agents.sessions.stream(session.id) as stream:
    for event in stream:
        event_type = event.type
        
        if event_type == "message_start":
            print(f"[Agent turn started — thread: {event.session_thread_id}]")
        
        elif event_type == "content_block_delta":
            if hasattr(event.delta, 'text'):
                print(event.delta.text, end="", flush=True)
        
        elif event_type == "content_block_stop":
            print()  # End of a content block
        
        elif event_type == "message_stop":
            print(f"[Turn complete — stop reason: {event.message.stop_reason}]")
        
        elif event_type == "tool_use":
            print(f"[Tool call: {event.tool_name}]")
        
        elif event_type == "tool_result":
            print(f"[Tool result received]")
        
        elif event_type == "error":
            print(f"[Error: {event.error.type} — {event.error.message}]")
            break

The streaming connection stays open until the session reaches a terminal state. If your client disconnects and reconnects, you can resume event consumption from a specific event ID.

Event Types

message_start — a new agent message turn has started. This marks the beginning of the agent generating a response.

content_block_start — a new content block (text or tool call) has started within the current message.

content_block_delta — an incremental chunk of content within the current block. For text, this is a text delta. For tool calls, this is a partial JSON input delta.

content_block_stop — the current content block is complete.

message_stop — the current message turn is complete. Contains the final stop_reason (end_turn, tool_use, max_tokens).

tool_use — the agent is calling a tool. Contains tool name and input.

tool_result — a tool call has returned a result.

session_complete — the session has reached a terminal state. Contains final status and any output metadata.

error — an error occurred. The session may or may not continue depending on the error type.

Polling vs Streaming

Streaming is appropriate when:

  • The session runs for minutes and you want to show progress to a user
  • You need to log events in real time for observability
  • You want to react to specific events (like a tool call) while the session is in progress

Polling is appropriate when:

  • The session runs for hours and holding a streaming connection open is not practical
  • Your integration is fire-and-forget — you just need the final result
  • You are building a batch pipeline that checks session statuses on a schedule
import time

def wait_for_completion(session_id: str, poll_interval: int = 30) -> dict:
    """Poll a session until it reaches a terminal state."""
    while True:
        session = client.beta.agents.sessions.retrieve(session_id)
        
        if session.status == "COMPLETED":
            return {"status": "completed", "output": session.output}
        elif session.status == "FAILED":
            return {"status": "failed", "error": session.error}
        elif session.status == "CANCELLED":
            return {"status": "cancelled"}
        
        print(f"Session {session_id}: {session.status} — waiting {poll_interval}s")
        time.sleep(poll_interval)

For multi-hour pipelines, poll every 60-120 seconds. For shorter tasks, streaming is cleaner.

Session Threads

In multi-agent sessions where an orchestrator dispatches to subagents, events are organized into threads. Each thread corresponds to one agent's execution within the broader session.

The session_thread_id field on events tells you which thread (and therefore which agent) produced each event:

with client.beta.agents.sessions.stream(session.id) as stream:
    for event in stream:
        thread_id = getattr(event, 'session_thread_id', 'main')
        
        if event.type == "content_block_delta":
            if hasattr(event.delta, 'text'):
                print(f"[{thread_id}] {event.delta.text}", end="")

This separation is critical for production observability. When an orchestrator delegates to three parallel subagents, you need to know which agent produced which output — not just that output exists.

Reconnection and Resume

If your streaming connection drops, you can resume from a specific event:

# Resume from a known event ID
with client.beta.agents.sessions.stream(
    session.id,
    after_event_id="evt_01xyz"  # Resume from this event
) as stream:
    for event in stream:
        # Process events after the resume point
        pass

Always log the last-seen event ID in your consumer. This enables safe reconnection without losing events or reprocessing events you already handled.