Multi-Agent Pipeline Design — State Sharing and Error Propagation Between Claude Agent SDK Orchestrators and Subagents
When first designing a multi-agent system, the most common question is "Can't you just hook up multiple agents?" I thought the same thing at first, but the story changes once you put it in production. You'll run into situations where a subagent fails but the orchestrator moves on to the next step without noticing, or where 10 agents simultaneously fire retries and API requests explode. These two issues were, in fact, the most common reasons pipelines quietly die in production.
This article walks through — with code — how to actually design the orchestrator-subagent pattern in the Claude Agent SDK: how to share state between agents and how to prevent errors from bringing down the entire pipeline. By the end, you'll be able to implement a pipeline that runs parallel agents without retry storms and doesn't silently swallow subagent failures.
Before you start: you need Python 3.11+, the `anthropic` package (`pip install anthropic`), and the `ANTHROPIC_API_KEY` environment variable configured.
Core Concepts
What Is the Claude Agent SDK
The Claude Agent SDK is a Python/TypeScript runtime that Anthropic has open-sourced from the agent loop and context management system used internally in Claude Code. It lets you build agents that autonomously combine tools — file reading/writing, command execution, web search, code editing — to perform tasks.
Claude Managed Agents: A higher-level layer released in public beta in April 2026. It is a service that handles orchestration, sandboxing, session state management, and error recovery on hosted infrastructure, using the `/v1/agents` endpoint and the `managed-agents-2026-04-01` beta header.
The Orchestrator-Subagent Pattern
The core of this pattern is clearly separating "who needs to know what."
- Orchestrator: Receives the overall task, decomposes it into subtasks, delegates them, and synthesizes the results. Sees the big picture.
- Subagent: Does only the one thing it was instructed to do. Its internal process is isolated from the orchestrator; it returns only the final result.
```
Orchestrator
├── Task decomposition
├── Delegate to Subagent A ──▶ [Independent context] ──▶ Return result
├── Delegate to Subagent B ──▶ [Independent context] ──▶ Return result
└── Synthesize results
```

There is one important design principle here.
Context Window Isolation: The context window refers to the length of text a model can process at once. Because each subagent starts with an independent context window, intermediate tool call results inside a subagent stay within that subagent — only the final message reaches the orchestrator.
This isolation prevents the orchestrator's context from being polluted in large tasks. Conversely, it means the only channel through which the orchestrator can communicate with subagents is a prompt string. Everything the subagent needs — file paths, error messages, decisions — must be packed into that prompt.
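Since the prompt string is the only channel, it can help to build it from a structured object so that nothing gets left out. The following is a minimal sketch; `SubagentBrief` and its fields are hypothetical names made up for illustration, not part of any SDK.

```python
from dataclasses import dataclass, field


@dataclass
class SubagentBrief:
    """Hypothetical container for everything a subagent must be told up front."""
    goal: str
    file_paths: list[str] = field(default_factory=list)
    error_messages: list[str] = field(default_factory=list)
    decisions: list[str] = field(default_factory=list)

    def to_prompt(self) -> str:
        """Render the brief as one prompt string, skipping empty sections."""
        sections = [f"Goal:\n{self.goal}"]
        for title, items in (
            ("Relevant files", self.file_paths),
            ("Errors seen so far", self.error_messages),
            ("Decisions already made", self.decisions),
        ):
            if items:
                bullets = "\n".join(f"- {item}" for item in items)
                sections.append(f"{title}:\n{bullets}")
        return "\n\n".join(sections)


brief = SubagentBrief(
    goal="Fix the failing date parser test.",
    file_paths=["src/dates.py", "tests/test_dates.py"],
    error_messages=["ValueError: day is out of range for month"],
)
prompt = brief.to_prompt()
print(prompt)
```

The orchestrator would pass `prompt` as the subagent's user message. Keeping the brief as a typed object also makes it easy to log exactly what each subagent was told.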
How to Share State
Before looking at the table, let's briefly cover the concept of Event Sourcing.
Event Sourcing: Instead of storing state directly, this pattern stores an ordered event log that records state mutations. It is advantageous for rolling back to a specific point in time or tracing conflicts.
In the orchestrator-subagent architecture, there are three main approaches to state sharing.
| Approach | Description | When It Fits |
|---|---|---|
| Sequential prompt passing | Directly include previous step results in the next agent's prompt | When data size is small and the pipeline is sequential |
| Shared KV store | Save intermediate results to an external store like Redis; subsequent agents read from it | When parallel agents reference the same data |
| Event sourcing | Record state mutations as an event log; agents replay as needed | When conflict resolution or audit trails are required |
Honestly, starting with sequential prompt passing is the easiest, and moving to a KV store as complexity grows is a realistic progression.
Practical Application
Example 1: Planner–Generator–Evaluator Pipeline
This is a pattern directly recommended by the Anthropic engineering blog. It can be applied to code generation, documentation writing, analysis reports, and more.
```python
import anthropic
import asyncio
import json
from typing import TypedDict

# AsyncAnthropic does not block the event loop when calls run in parallel via asyncio.gather
async_client = anthropic.AsyncAnthropic()

# Recommended to start with a lower-cost model and swap as needed
MODEL = "claude-haiku-4-5-20251001"


class PipelineState(TypedDict):
    task: str
    plan: str | None
    output: str | None
    evaluation: str | None
    passed: bool


async def run_subagent(system_prompt: str, user_message: str) -> str:
    """Run a single subagent and return only the result text."""
    response = await async_client.messages.create(
        model=MODEL,
        max_tokens=4096,
        system=system_prompt,
        messages=[{"role": "user", "content": user_message}],
    )
    return response.content[0].text


async def planner_agent(task: str) -> str:
    return await run_subagent(
        system_prompt="You are a planner that designs task structure. Clearly describe the goal and a step-by-step approach.",
        user_message=f"Analyze the following task and write an execution plan:\n\n{task}",
    )


async def generator_agent(task: str, plan: str) -> str:
    return await run_subagent(
        system_prompt="You are an execution agent. Generate the deliverable according to the given plan.",
        user_message=f"Task: {task}\n\nPlan:\n{plan}\n\nWrite the deliverable according to the plan above.",
    )


async def evaluator_agent(task: str, output: str) -> tuple[bool, str]:
    result = await run_subagent(
        system_prompt=(
            "You are an independent quality verification agent. "
            "Determine whether the deliverable meets the task objectives, "
            "and respond only in the following JSON format:\n"
            '{"status": "pass" or "fail", "reason": "reason for judgment"}'
        ),
        user_message=f"Task: {task}\n\nDeliverable:\n{output}\n\nVerify the quality.",
    )
    try:
        data = json.loads(result)
        passed = data.get("status") == "pass"
    except json.JSONDecodeError:
        # An unparseable verdict is treated as a failed evaluation
        passed = False
    return passed, result


async def orchestrator(task: str) -> PipelineState:
    state: PipelineState = {
        "task": task,
        "plan": None,
        "output": None,
        "evaluation": None,
        "passed": False,
    }

    print("[Orchestrator] Running planner subagent...")
    state["plan"] = await planner_agent(task)

    print("[Orchestrator] Running generator subagent...")
    state["output"] = await generator_agent(task, state["plan"])

    print("[Orchestrator] Running evaluator subagent...")
    passed, evaluation = await evaluator_agent(task, state["output"])
    state["evaluation"] = evaluation
    state["passed"] = passed
    return state


async def main():
    result = await orchestrator(
        "Design and implement a simple to-do management CLI tool in Python."
    )
    print(f"\nFinal result: PASS={result['passed']}")


asyncio.run(main())
```

| Step | Role | Passed Information |
|---|---|---|
| `planner_agent` | Establish structure and objectives | Task string |
| `generator_agent` | Generate the actual deliverable | Task + plan |
| `evaluator_agent` | Independent quality verification | Task + deliverable |
When I initially ran the evaluator in the same context as the generator, strange noise crept into the evaluation scores. The generator's self-justifying statements mid-process leaked into the evaluator's context, preventing the evaluator from assessing the deliverable objectively. Only after isolating each agent into an independent context did the evaluation actually start working independently.
Tradeoff of this pattern: Three stages mean at least three model calls per task, so cost and latency are roughly three times those of a single call. If fast iteration is needed, dropping the planner and going with a 2-stage generator-evaluator is sufficient.
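One fragile spot in Example 1 is the evaluator's verdict parsing: if the model wraps its JSON in a sentence of prose, `json.loads` on the whole reply fails and the run is marked as not passed. A slightly more tolerant parser could look like the sketch below. `parse_verdict` is a hypothetical helper, and the assumption that the verdict is the outermost `{...}` span in the reply is mine, not something the API guarantees.

```python
import json


def parse_verdict(text: str) -> tuple[bool, str]:
    """Return (passed, reason); anything unparseable counts as a failure."""
    # Assumption: the verdict is the outermost {...} span in the reply,
    # possibly surrounded by prose.
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end <= start:
        return False, "no JSON object found in evaluator reply"
    try:
        data = json.loads(text[start : end + 1])
    except json.JSONDecodeError as exc:
        return False, f"invalid JSON in evaluator reply: {exc.msg}"
    passed = data.get("status") == "pass"
    return passed, str(data.get("reason", ""))


reply = 'Here is my verdict: {"status": "pass", "reason": "meets the goal"}'
passed, reason = parse_verdict(reply)
```

The parser still fails closed: a reply with no recoverable JSON is reported as a failure with an explicit reason instead of being silently accepted.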
Example 2: Parallel Pipeline with a Shared KV Store
When agents running in parallel need to share intermediate results, relying solely on prompts becomes difficult. This pattern lets agents share data without routing every result through the orchestrator's prompts.
```python
import asyncio
import json
import time
from typing import Any

import anthropic

async_client = anthropic.AsyncAnthropic()
MODEL = "claude-haiku-4-5-20251001"

# In-memory KV store (replace with Redis or equivalent in production)
_store: dict[str, Any] = {}


def kv_set(key: str, value: Any) -> None:
    _store[key] = {"value": value, "timestamp": time.time()}


def kv_get(key: str) -> Any | None:
    entry = _store.get(key)
    return entry["value"] if entry else None


async def compliance_check_agent(run_id: str, domain: str, task: str) -> dict:
    """Per-domain compliance check subagent"""
    response = await async_client.messages.create(
        model=MODEL,
        max_tokens=512,
        system=(
            f"You are a {domain} compliance expert. "
            "Review the given transaction data and determine whether there are any issues. "
            'Respond only in JSON: {"status": "pass" or "fail", "issues": ["list of issues"]}'
        ),
        messages=[{"role": "user", "content": f"Transaction data:\n{task}"}],
    )
    try:
        parsed = json.loads(response.content[0].text)
    except json.JSONDecodeError:
        # Record the parse failure explicitly instead of dropping the result
        parsed = {"status": "error", "issues": ["Failed to parse response"]}

    result = {
        "domain": domain,
        "status": parsed.get("status", "error"),
        "issues": parsed.get("issues", []),
        "checked_at": time.time(),
    }
    kv_set(f"{run_id}:compliance:{domain}", result)
    print(f"  [{domain}] Check complete → saved to KV")
    return result


async def report_agent(run_id: str, domains: list[str]) -> str:
    """Collect results from KV and generate final report"""
    results = []
    for domain in domains:
        result = kv_get(f"{run_id}:compliance:{domain}")
        if result:
            results.append(result)

    passed = sum(1 for r in results if r["status"] == "pass")
    report = (
        f"Compliance check results: {passed}/{len(results)} passed\n"
        + "\n".join(f"- {r['domain']}: {r['status']}" for r in results)
    )
    return report


async def financial_compliance_orchestrator(transaction_data: str) -> str:
    run_id = f"run_{int(time.time())}"
    domains = ["AML", "KYC", "FATF"]
    print(f"[Orchestrator] Starting parallel compliance checks (run_id={run_id})")

    # Run independent checks in parallel
    await asyncio.gather(
        *[
            compliance_check_agent(run_id, domain, transaction_data)
            for domain in domains
        ]
    )

    print("[Orchestrator] Running report agent")
    return await report_agent(run_id, domains)


# Print the final report so the run produces visible output
print(asyncio.run(financial_compliance_orchestrator("Sample transaction data")))
```

Each check agent doesn't need to know the others' results. It writes its own result to the shared KV, and the report agent collects everything at the end.
Tradeoff of this pattern: You can start with an in-memory dictionary instead of a real KV store, but a dictionary is visible only inside one process, so in a multi-process environment it must be replaced with an external store like Redis. Also, if multiple agents write to the same key simultaneously, a separate conflict resolution strategy is needed.
Example 3: Blocking Error Propagation with a Circuit Breaker
This was the most painful part in practice. When a downstream API became briefly unstable overnight, 7 agents were each running retries, and API requests exploded. Without the Circuit Breaker pattern, it would have ended in a rate-limit storm.
```python
import asyncio
import time
from enum import Enum


class CircuitOpenError(RuntimeError):
    """Dedicated exception raised when the Circuit Breaker is in OPEN state"""
    pass


class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Tripped, requests blocked
    HALF_OPEN = "half_open"  # Probing for recovery


class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time: float | None = None
        self.state = CircuitState.CLOSED

    def _should_attempt_reset(self) -> bool:
        if self.last_failure_time is None:
            return False
        return time.time() - self.last_failure_time >= self.recovery_timeout

    async def call(self, coro):
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
                print("[Circuit Breaker] HALF_OPEN: attempting recovery")
            else:
                # Close the unused coroutine to avoid a "never awaited" warning
                coro.close()
                raise CircuitOpenError("Circuit Breaker OPEN: request blocked")

        try:
            result = await coro
            self.failure_count = 0
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
                print("[Circuit Breaker] CLOSED: recovery complete")
            return result
        except CircuitOpenError:
            raise
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
                print(
                    f"[Circuit Breaker] OPEN: {self.failure_count} failures, "
                    f"retry available after {self.recovery_timeout}s"
                )
            raise


async def agent_with_retry(
    agent_fn,
    breaker: CircuitBreaker,
    max_retries: int = 3,
    base_delay: float = 1.0,
):
    """Exponential backoff retry + Circuit Breaker combination"""
    for attempt in range(max_retries):
        try:
            # agent_fn is a zero-argument callable that returns a fresh coroutine
            return await breaker.call(agent_fn())
        except CircuitOpenError:
            raise  # Do not retry on Circuit Breaker trip
        except Exception:
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt)
            print(f"  Retry {attempt + 1}/{max_retries}, waiting {delay:.1f}s")
            await asyncio.sleep(delay)
```

Circuit Breaker Pattern: When consecutive failures reach a threshold, the breaker "trips" and immediately blocks subsequent requests. After a set time, it transitions to the HALF_OPEN state and lets a request through to probe for recovery. The name comes from electrical circuit breakers.
There is a reason for having a dedicated CircuitOpenError exception class. If you distinguish exception types by string matching, it will silently break the moment an error message changes. Distinguishing by type lets you catch it cleanly with a single except CircuitOpenError:, and anyone using this code can immediately understand the intent.
Tradeoff of this pattern: If you have 2 or fewer subagents, simple retries are more appropriate than a Circuit Breaker. The Circuit Breaker carries the cost of managing shared state, so it pays off when the number of agents grows and failures risk propagating in a chain.
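To see why sharing matters, the sketch below compares one breaker shared by five agents against five per-agent breakers while a simulated downstream service is down. It is self-contained, so it uses a stripped-down, hypothetical `MiniBreaker` (no recovery timeout) rather than the full class from Example 3; `run_agents` and `flaky_service` are likewise made up for the demonstration.

```python
import asyncio


class CircuitOpenError(RuntimeError):
    pass


class MiniBreaker:
    """Hypothetical minimal breaker: opens after `threshold` failures, never recovers."""

    def __init__(self, threshold: int = 3) -> None:
        self.threshold = threshold
        self.failures = 0

    async def call(self, fn):
        if self.failures >= self.threshold:
            raise CircuitOpenError("circuit open")
        try:
            return await fn()
        except Exception:
            self.failures += 1
            raise


async def run_agents(breakers: list[MiniBreaker], max_retries: int = 3) -> int:
    """Run one agent per breaker against a failing service; return calls made."""
    calls = 0

    async def flaky_service() -> str:
        nonlocal calls
        calls += 1
        await asyncio.sleep(0)  # yield so the agents interleave
        raise ConnectionError("downstream unavailable")

    async def agent(breaker: MiniBreaker) -> None:
        for _ in range(max_retries):
            try:
                await breaker.call(flaky_service)
                return
            except CircuitOpenError:
                return  # breaker is open: stop immediately, no retry
            except ConnectionError:
                continue  # a real agent would back off before retrying

    await asyncio.gather(*(agent(b) for b in breakers))
    return calls


shared = MiniBreaker()
shared_calls = asyncio.run(run_agents([shared] * 5))
independent_calls = asyncio.run(run_agents([MiniBreaker() for _ in range(5)]))
print(f"shared breaker: {shared_calls} calls, per-agent breakers: {independent_calls} calls")
```

With per-agent breakers, every agent spends its full retry budget before its own breaker trips. With a shared breaker, failures from all agents count toward the same threshold, so the failing service sees fewer requests.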
Pros and Cons Analysis
Advantages
| Item | Details |
|---|---|
| Prevents context overload | Subagent intermediate results don't pollute the orchestrator's context, maintaining quality even in large tasks |
| Parallel processing | Independent subtasks can be executed simultaneously, reducing overall completion time |
| Role specialization | Each subagent receives instructions specialized to a narrow domain, enabling higher accuracy |
| Model selection flexibility | Combining Haiku for simple tasks and Opus for complex reasoning lets you balance cost and performance |
| Independent verification | Separating the Evaluator into its own agent enables unbiased evaluation unaffected by the Generator's context |
Disadvantages and Caveats
| Item | Details | Mitigation |
|---|---|---|
| Error cascade | If a subagent failure is not detected at the orchestrator level, incorrect results silently flow to the next stage | Include an explicit status field in subagent return values; add validation logic at the orchestrator level |
| Retry storm | If each of 10 agents has independent retry logic, a single service failure multiplies the request volume (number of agents × retries per agent) | Manage Circuit Breaker in shared state |
| Cost explosion | If subagents recursively spawn more subagents, costs grow rapidly | Set a per-run agent count cap and depth limit |
| State inconsistency | Concurrent writes by multiple agents to a shared store can cause conflicts | Define a conflict resolution strategy; apply locking or event sourcing |
| Unsuitable domains | Tasks with tight interdependencies between agents, or where all agents must share the same context | Anthropic also recommends a single agent in these cases |
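The agent count cap and depth limit mentioned under "Cost explosion" can be enforced with a small budget object that every spawn must go through. This is only a sketch: `SpawnBudget`, `BudgetExceeded`, and `spawn_tree` are hypothetical names, and the recursion stands in for real subagent delegation.

```python
class BudgetExceeded(RuntimeError):
    """Raised when a run tries to spawn past its agent or depth limit."""


class SpawnBudget:
    """Hypothetical per-run budget: caps total agents and nesting depth."""

    def __init__(self, max_agents: int = 10, max_depth: int = 2) -> None:
        self.max_agents = max_agents
        self.max_depth = max_depth
        self.spawned = 0

    def reserve(self, depth: int) -> None:
        """Call before spawning a subagent at the given depth (orchestrator = 0)."""
        if depth > self.max_depth:
            raise BudgetExceeded(f"depth {depth} exceeds limit {self.max_depth}")
        if self.spawned >= self.max_agents:
            raise BudgetExceeded(f"agent cap of {self.max_agents} reached")
        self.spawned += 1


def spawn_tree(budget: SpawnBudget, depth: int, fanout: int) -> None:
    """Stand-in for recursive delegation: every agent spawns `fanout` children."""
    for _ in range(fanout):
        budget.reserve(depth)
        spawn_tree(budget, depth + 1, fanout)


budget = SpawnBudget(max_agents=10, max_depth=2)
try:
    spawn_tree(budget, depth=1, fanout=2)
    stopped_by = None
except BudgetExceeded as exc:
    stopped_by = str(exc)
```

Passing the same `budget` object down the tree makes the limit a property of the whole run, not of any single agent.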
Most Common Mistakes in Practice
- Not including a success/failure status in subagent return values: Our team encountered a situation where the output was wrong but no errors were raised. Agents always "return something," but if the orchestrator has no way to distinguish a normal result from an error, incorrect data quietly flows to the next stage. It is recommended to explicitly include `status: "success" | "error"` and `error_message` in the return schema.
- Embedding independent retry logic in each agent without a Circuit Breaker: This seems fine locally, but in production, API rate limits or downstream service failures make request storms a reality. A structure that manages the Circuit Breaker in shared state is far safer.
- Stacking agent layers infinitely deep: When a subagent calls another subagent, which calls yet another, costs and debugging complexity both explode. Once you start going beyond 2–3 levels deep, it's worth first checking whether you can simplify to a single agent.
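The first mistake is the cheapest to fix. A return schema with an explicit status, plus one validation function at the orchestrator level, could look roughly like this. `SubagentResult`, `PipelineHalted`, and `require_success` are hypothetical names chosen for the sketch.

```python
from dataclasses import dataclass
from typing import Literal


@dataclass
class SubagentResult:
    """Hypothetical return schema: every subagent reports an explicit status."""
    status: Literal["success", "error"]
    output: str = ""
    error_message: str = ""


class PipelineHalted(RuntimeError):
    """Raised by the orchestrator when a stage reports an error."""


def require_success(stage: str, result: SubagentResult) -> str:
    """Return the stage output, or stop the pipeline if the stage failed."""
    if result.status != "success":
        raise PipelineHalted(f"{stage} failed: {result.error_message or 'no details'}")
    if not result.output.strip():
        raise PipelineHalted(f"{stage} reported success but returned no output")
    return result.output


plan = require_success("planner", SubagentResult(status="success", output="1. Parse args"))

try:
    require_success("generator", SubagentResult(status="error", error_message="rate limited"))
    halted = None
except PipelineHalted as exc:
    halted = str(exc)
```

Calling `require_success` between stages means a failed subagent stops the run with a named stage and reason instead of handing bad data to the next agent.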
Closing Thoughts
The orchestrator-subagent pattern is the work of simultaneously designing "how to isolate" and "how to connect." Context isolation makes it possible to handle large tasks, but that same isolation makes state sharing and error propagation tricky.
Three steps you can start with right now:
- Implement the Planner–Generator–Evaluator pattern as a single pipeline. Plugging your own task string directly into the `orchestrator()` function in Example 1 and running it is the fastest starting point for experiencing how each agent runs in an independent context. That said, if you have 2 or fewer subagents, starting with a simple single function chain is sufficient rather than this 3-stage structure.
- Add a `status` field to subagent return schemas and attach validation logic at the orchestrator level to stop the pipeline when `status == "error"`. This one change alone can catch 80% of silent failure issues.
- When the number of agents grows to 3 or more, introduce the `CircuitBreaker` class as shared state. Simply wrapping each agent call in `breaker.call()` using the implementation from Example 3 can significantly reduce the risk of retry storms. If you have 2 or fewer agents, a Circuit Breaker is overkill. Simple retries are often sufficient.
References
- Building agents with the Claude Agent SDK | Anthropic Engineering
- Claude Managed Agents Overview — Official API Docs
- Multiagent sessions — Claude API Docs
- Subagents in the SDK — Claude API Docs
- Create custom subagents — Claude Code Docs
- Scaling Managed Agents: Decoupling the brain from the runtime | Anthropic Engineering
- Equipping agents for the real world with Agent Skills | Anthropic Engineering
- Retry Storms in Multi-Agent LangGraph Systems: Circuit Breaker Fix (2026)
- 6 Multi-Agent Orchestration Patterns for Production (2026)
- Multi-Agent in Production 2026: 3 Patterns That Survived
- Best Practices for Multi-Agent Orchestration with Claude (GitHub Discussions)