Sessions and Event Streaming
Sessions are the runtime unit of Managed Agents. A session is a running instance of an agent: it has a lifecycle, emits events as it works, and terminates in a known state. Knowing how to start sessions, consume their events, and handle their lifecycle correctly is not optional background knowledge — it is the core operational skill for building reliable production integrations on Managed Agents.
Session Lifecycle
Every session passes through a defined set of states:
PENDING — session has been created but execution has not started yet. The infrastructure is provisioning the container.
RUNNING — the agent is actively executing. Events are being emitted.
COMPLETED — the agent finished execution successfully. Results are retrievable.
FAILED — the agent encountered an unrecoverable error. An error event describes what happened.
CANCELLED — the session was explicitly cancelled before completion.
Terminal states are COMPLETED, FAILED, and CANCELLED. Once a session reaches a terminal state, it will not transition further. Results are immutable after completion.
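The terminal-state rule above is worth encoding once rather than re-checking status strings ad hoc. A minimal helper, using exactly the status values listed in this section:

```python
# Terminal session states as documented above; a session in one of
# these will never transition again.
TERMINAL_STATES = {"COMPLETED", "FAILED", "CANCELLED"}

def is_terminal(status: str) -> bool:
    """Return True once a session can no longer transition."""
    return status in TERMINAL_STATES
```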
Starting a Session
import anthropic

client = anthropic.Anthropic()

# Start a session with a task for the agent
session = client.beta.agents.sessions.create(
    agent_id="agent_01abc123",
    input={
        "role": "user",
        "content": "Research Stripe's Q1 2026 product launches and write a competitor intelligence report."
    }
)

print(f"Session ID: {session.id}")
print(f"Status: {session.status}")  # PENDING
The session is created immediately. Execution starts asynchronously. The session.id is what you use to track and retrieve results.
Streaming Events
For sessions where you want to observe execution as it happens, stream the SSE events:
# Stream events from an active session
with client.beta.agents.sessions.stream(session.id) as stream:
    for event in stream:
        event_type = event.type
        if event_type == "message_start":
            print(f"[Agent turn started — thread: {event.session_thread_id}]")
        elif event_type == "content_block_delta":
            if hasattr(event.delta, 'text'):
                print(event.delta.text, end="", flush=True)
        elif event_type == "content_block_stop":
            print()  # End of a content block
        elif event_type == "message_stop":
            print(f"[Turn complete — stop reason: {event.message.stop_reason}]")
        elif event_type == "tool_use":
            print(f"[Tool call: {event.tool_name}]")
        elif event_type == "tool_result":
            print("[Tool result received]")
        elif event_type == "error":
            print(f"[Error: {event.error.type} — {event.error.message}]")
            break
The streaming connection stays open until the session reaches a terminal state. If your client disconnects and reconnects, you can resume event consumption from a specific event ID.
Event Types
message_start — a new agent message turn has started. This marks the beginning of the agent generating a response.
content_block_start — a new content block (text or tool call) has started within the current message.
content_block_delta — an incremental chunk of content within the current block. For text, this is a text delta. For tool calls, this is a partial JSON input delta.
content_block_stop — the current content block is complete.
message_stop — the current message turn is complete. Contains the final stop_reason (end_turn, tool_use, max_tokens).
tool_use — the agent is calling a tool. Contains tool name and input.
tool_result — a tool call has returned a result.
session_complete — the session has reached a terminal state. Contains final status and any output metadata.
error — an error occurred. The session may or may not continue depending on the error type.
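A long if/elif chain over event types gets brittle as handlers grow. One maintainable alternative is a handler table keyed by the event types listed above. This is an illustrative sketch, not part of the SDK; the handler functions and their return values are hypothetical:

```python
from typing import Any, Callable, Dict

def on_text_delta(event: Any) -> str:
    # Text deltas carry the incremental text on event.delta.text
    return getattr(event.delta, "text", "")

def on_error(event: Any) -> str:
    return f"error: {event.error.type}"

# Hypothetical handler table keyed by the documented event types
HANDLERS: Dict[str, Callable[[Any], str]] = {
    "content_block_delta": on_text_delta,
    "error": on_error,
}

def dispatch(event: Any) -> str:
    # Unknown event types are ignored, so newly added types
    # do not break an existing consumer
    handler = HANDLERS.get(event.type)
    return handler(event) if handler else ""
```

Registering a no-op default for unknown types is deliberate: event vocabularies tend to grow, and a consumer that raises on unrecognized types will break on additions.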
Polling vs Streaming
Streaming is appropriate when:
- The session runs for minutes and you want to show progress to a user
- You need to log events in real time for observability
- You want to react to specific events (like a tool call) while the session is in progress
Polling is appropriate when:
- The session runs for hours and holding a streaming connection open is not practical
- Your integration is fire-and-forget — you just need the final result
- You are building a batch pipeline that checks session statuses on a schedule
import time

def wait_for_completion(session_id: str, poll_interval: int = 30) -> dict:
    """Poll a session until it reaches a terminal state."""
    while True:
        session = client.beta.agents.sessions.retrieve(session_id)
        if session.status == "COMPLETED":
            return {"status": "completed", "output": session.output}
        elif session.status == "FAILED":
            return {"status": "failed", "error": session.error}
        elif session.status == "CANCELLED":
            return {"status": "cancelled"}
        print(f"Session {session_id}: {session.status} — waiting {poll_interval}s")
        time.sleep(poll_interval)
For multi-hour pipelines, poll every 60-120 seconds. For shorter tasks, streaming is cleaner.
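For batch pipelines it is usually worth bounding the wait rather than polling forever. A sketch of a deadline wrapper around the same polling idea; the timeout behavior and the `get_status` callable are assumptions for illustration, not API features:

```python
import time

def poll_with_deadline(get_status, poll_interval: float = 60.0,
                       timeout: float = 4 * 3600.0) -> str:
    """Poll get_status() until a terminal state or the deadline passes.

    get_status is any callable returning one of the documented status
    strings; in practice it would wrap sessions.retrieve(session_id).
    """
    terminal = {"COMPLETED", "FAILED", "CANCELLED"}
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = get_status()
        if status in terminal:
            return status
        time.sleep(poll_interval)
    return "TIMED_OUT"  # caller decides whether to cancel or keep waiting
```

Returning a sentinel instead of raising keeps the caller in control: a scheduler can decide whether a timed-out session should be cancelled or simply checked again on the next run.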
Session Threads
In multi-agent sessions where an orchestrator dispatches to subagents, events are organized into threads. Each thread corresponds to one agent's execution within the broader session.
The session_thread_id field on events tells you which thread (and therefore which agent) produced each event:
with client.beta.agents.sessions.stream(session.id) as stream:
    for event in stream:
        thread_id = getattr(event, 'session_thread_id', 'main')
        if event.type == "content_block_delta":
            if hasattr(event.delta, 'text'):
                print(f"[{thread_id}] {event.delta.text}", end="")
This separation is critical for production observability. When an orchestrator delegates to three parallel subagents, you need to know which agent produced which output — not just that output exists.
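Rather than interleaving prints, observability pipelines typically accumulate text per thread and inspect each agent's output whole. A minimal sketch, assuming events shaped like those shown above:

```python
from collections import defaultdict

def collect_by_thread(events) -> dict:
    """Group streamed text deltas by session_thread_id."""
    outputs = defaultdict(list)
    for event in events:
        if getattr(event, "type", None) != "content_block_delta":
            continue
        # Events without a thread ID are attributed to the main thread
        thread_id = getattr(event, "session_thread_id", "main")
        text = getattr(getattr(event, "delta", None), "text", None)
        if text:
            outputs[thread_id].append(text)
    # Join each thread's chunks into one string per agent
    return {tid: "".join(chunks) for tid, chunks in outputs.items()}
```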
Reconnection and Resume
If your streaming connection drops, you can resume from a specific event:
# Resume from a known event ID
with client.beta.agents.sessions.stream(
    session.id,
    after_event_id="evt_01xyz"  # Resume from this event
) as stream:
    for event in stream:
        # Process events after the resume point
        pass
Always log the last-seen event ID in your consumer. This enables safe reconnection without losing events or reprocessing events you already handled.
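A consumer that checkpoints the last-seen event ID and resumes on disconnect can be sketched as follows. The `connect` callable stands in for `sessions.stream(..., after_event_id=...)`, and the retry policy and event `.id` attribute are illustrative assumptions:

```python
def consume_with_resume(connect, handle, max_retries: int = 3):
    """Consume events, checkpointing the last-seen event ID.

    connect(after_event_id) yields events with an .id attribute;
    in practice it would wrap sessions.stream(session_id,
    after_event_id=...). The retry count here is illustrative.
    """
    last_event_id = None
    retries = 0
    while retries <= max_retries:
        try:
            for event in connect(last_event_id):
                handle(event)
                last_event_id = event.id  # checkpoint only after handling
            return last_event_id  # stream ended normally
        except ConnectionError:
            retries += 1  # reconnect, resuming from last_event_id
    raise RuntimeError(f"gave up after {max_retries} retries at {last_event_id}")
```

Checkpointing after `handle` rather than before is the at-least-once choice: a crash mid-handler replays that event on reconnect instead of silently dropping it, so handlers should be idempotent.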