Privacy Policy© 2026 DEV BAK - TECH BLOG. All rights reserved.
DEV BAK - TECH BLOG
Claude

Multi-Agent Pipeline Design — State Sharing and Error Propagation Between Claude Agent SDK Orchestrators and Subagents

When first designing a multi-agent system, the most common question is "Can't you just hook up multiple agents?" I thought the same thing at first, but the story changes once you put it in production. You'll run into situations where a subagent fails but the orchestrator moves on to the next step without noticing, or where 10 agents simultaneously fire retries and API requests explode. These two issues were, in fact, the most common reasons pipelines quietly die in production.

This article walks through — with code — how to actually design the orchestrator-subagent pattern in the Claude Agent SDK: how to share state between agents and how to prevent errors from bringing down the entire pipeline. By the end, you'll be able to implement a pipeline that runs parallel agents without retry storms and doesn't silently swallow subagent failures.

Before you start: Python 3.11+, pip install anthropic, and the environment variable ANTHROPIC_API_KEY must be configured.


Core Concepts

What Is the Claude Agent SDK

The Claude Agent SDK is a Python/TypeScript runtime that Anthropic has open-sourced from the agent loop and context management system used internally in Claude Code. It lets you build agents that autonomously combine tools — file reading/writing, command execution, web search, code editing — to perform tasks.

Claude Managed Agents: A higher-level layer released in public beta in April 2026. It is a service that handles orchestration, sandboxing, session state management, and error recovery on hosted infrastructure, using the /v1/agents endpoint and the managed-agents-2026-04-01 beta header.

The Orchestrator-Subagent Pattern

The core of this pattern is clearly separating "who needs to know what."

  • Orchestrator: Receives the overall task, decomposes it into subtasks, delegates them, and synthesizes the results. Sees the big picture.
  • Subagent: Does only the one thing it was instructed to do. Its internal process is isolated from the orchestrator; it returns only the final result.
css
Orchestrator
  ├── Task decomposition
  ├── Delegate to Subagent A ──▶ [Independent context] ──▶ Return result
  ├── Delegate to Subagent B ──▶ [Independent context] ──▶ Return result
  └── Synthesize results

There is one important design principle here.

Context Window Isolation: The context window refers to the length of text a model can process at once. Because each subagent starts with an independent context window, intermediate tool call results inside a subagent stay within that subagent — only the final message reaches the orchestrator.

This isolation prevents the orchestrator's context from being polluted in large tasks. Conversely, it means the only channel through which the orchestrator can communicate with subagents is a prompt string. Everything the subagent needs — file paths, error messages, decisions — must be packed into that prompt.

How to Share State

Before looking at the table, let's briefly cover the concept of Event Sourcing.

Event Sourcing: Instead of storing state directly, this pattern stores an ordered event log that records state mutations. It is advantageous for rolling back to a specific point in time or tracing conflicts.

In the orchestrator-subagent architecture, there are three main approaches to state sharing.

Approach Description When It Fits
Prompt serial passing Directly include previous step results in the next agent's prompt When data size is small and the pipeline is sequential
Shared KV store Save intermediate results to an external store like Redis; subsequent agents read from it When parallel agents reference the same data
Event sourcing Record state mutations as an event log; agents replay as needed When conflict resolution or audit trails are required

Honestly, starting with prompt serial passing is the easiest, and transitioning to a KV store as complexity grows is a realistic progression.


Practical Application

Example 1: Planner–Generator–Evaluator Pipeline

This is a pattern directly recommended by the Anthropic engineering blog. It can be applied to code generation, documentation writing, analysis reports, and more.

python
import anthropic
import asyncio
import json
from typing import TypedDict
 
# Using AsyncAnthropic — does not block the event loop when running in parallel with asyncio.gather
async_client = anthropic.AsyncAnthropic()
 
# Recommended to start with a lower-cost model and swap as needed
MODEL = "claude-haiku-4-5-20251001"
 
class PipelineState(TypedDict):
    task: str
    plan: str | None
    output: str | None
    evaluation: str | None
    passed: bool
 
async def run_subagent(system_prompt: str, user_message: str) -> str:
    """Run a single subagent — returns only the result text"""
    response = await async_client.messages.create(
        model=MODEL,
        max_tokens=4096,
        system=system_prompt,
        messages=[{"role": "user", "content": user_message}],
    )
    return response.content[0].text
 
async def planner_agent(task: str) -> str:
    return await run_subagent(
        system_prompt="You are a planner that designs task structure. Clearly describe the goal and a step-by-step approach.",
        user_message=f"Analyze the following task and write an execution plan:\n\n{task}",
    )
 
async def generator_agent(task: str, plan: str) -> str:
    return await run_subagent(
        system_prompt="You are an execution agent. Generate the deliverable according to the given plan.",
        user_message=f"Task: {task}\n\nPlan:\n{plan}\n\nWrite the deliverable according to the plan above.",
    )
 
async def evaluator_agent(task: str, output: str) -> tuple[bool, str]:
    result = await run_subagent(
        system_prompt=(
            "You are an independent quality verification agent. "
            "Determine whether the deliverable meets the task objectives, "
            "and respond only in the following JSON format:\n"
            '{"status": "pass" or "fail", "reason": "reason for judgment"}'
        ),
        user_message=f"Task: {task}\n\nDeliverable:\n{output}\n\nVerify the quality.",
    )
    try:
        data = json.loads(result)
        passed = data.get("status") == "pass"
    except json.JSONDecodeError:
        passed = False
    return passed, result
 
async def orchestrator(task: str) -> PipelineState:
    state: PipelineState = {
        "task": task,
        "plan": None,
        "output": None,
        "evaluation": None,
        "passed": False,
    }
 
    print("[Orchestrator] Running planner subagent...")
    state["plan"] = await planner_agent(task)
 
    print("[Orchestrator] Running generator subagent...")
    state["output"] = await generator_agent(task, state["plan"])
 
    print("[Orchestrator] Running evaluator subagent...")
    passed, evaluation = await evaluator_agent(task, state["output"])
    state["evaluation"] = evaluation
    state["passed"] = passed
 
    return state
 
async def main():
    result = await orchestrator(
        "Design and implement a simple to-do management CLI tool in Python."
    )
    print(f"\nFinal result — PASS: {result['passed']}")
 
asyncio.run(main())
Step Role Passed Information
planner_agent Establish structure and objectives Task string
generator_agent Generate the actual deliverable Task + plan
evaluator_agent Independent quality verification Task + deliverable

When I initially ran the evaluator in the same context as the generator, strange noise crept into the evaluation scores. The generator's self-justifying statements mid-process leaked into the evaluator's context, preventing the evaluator from assessing the deliverable objectively. Only after isolating each agent into an independent context did the evaluation actually start working independently.

Tradeoff of this pattern: With 3 stages, the cost is three times higher. If fast iteration is needed, dropping the planner and going with a 2-stage generator-evaluator is sufficient.


Example 2: Parallel Pipeline with a Shared KV Store

When agents running in parallel need to share intermediate results, relying solely on prompts becomes difficult. This pattern allows agents to share data without serial passing through the orchestrator.

python
import asyncio
import json
import time
from typing import Any
 
import anthropic
 
async_client = anthropic.AsyncAnthropic()
MODEL = "claude-haiku-4-5-20251001"
 
# In-memory KV store (replace with Redis or equivalent in production)
_store: dict[str, Any] = {}
 
def kv_set(key: str, value: Any) -> None:
    _store[key] = {"value": value, "timestamp": time.time()}
 
def kv_get(key: str) -> Any | None:
    entry = _store.get(key)
    return entry["value"] if entry else None
 
async def compliance_check_agent(run_id: str, domain: str, task: str) -> dict:
    """Per-domain compliance check subagent"""
    response = await async_client.messages.create(
        model=MODEL,
        max_tokens=512,
        system=(
            f"You are a {domain} compliance expert. "
            "Review the given transaction data and determine whether there are any issues. "
            'Respond only in JSON: {"status": "pass" or "fail", "issues": ["list of issues"]}'
        ),
        messages=[{"role": "user", "content": f"Transaction data:\n{task}"}],
    )
 
    try:
        parsed = json.loads(response.content[0].text)
    except json.JSONDecodeError:
        parsed = {"status": "error", "issues": ["Failed to parse response"]}
 
    result = {
        "domain": domain,
        "status": parsed.get("status", "error"),
        "issues": parsed.get("issues", []),
        "checked_at": time.time(),
    }
    kv_set(f"{run_id}:compliance:{domain}", result)
    print(f"  [{domain}] Check complete → saved to KV")
    return result
 
async def report_agent(run_id: str, domains: list[str]) -> str:
    """Collect results from KV and generate final report"""
    results = []
    for domain in domains:
        result = kv_get(f"{run_id}:compliance:{domain}")
        if result:
            results.append(result)
 
    passed = sum(1 for r in results if r["status"] == "pass")
    report = (
        f"Compliance check results: {passed}/{len(results)} passed\n"
        + "\n".join(f"- {r['domain']}: {r['status']}" for r in results)
    )
    return report
 
async def financial_compliance_orchestrator(transaction_data: str) -> str:
    run_id = f"run_{int(time.time())}"
    domains = ["AML", "KYC", "FATF"]
 
    print(f"[Orchestrator] Starting parallel compliance checks (run_id={run_id})")
 
    # Run independent checks in parallel
    await asyncio.gather(
        *[
            compliance_check_agent(run_id, domain, transaction_data)
            for domain in domains
        ]
    )
 
    print("[Orchestrator] Running report agent")
    return await report_agent(run_id, domains)
 
asyncio.run(financial_compliance_orchestrator("Sample transaction data"))

Each check agent doesn't need to know the others' results. It writes its own result to the shared KV, and the report agent collects everything at the end.

Tradeoff of this pattern: Without a KV store you can start with an in-memory dictionary, but in a multi-process environment it must be replaced with an external store like Redis. Also, if multiple agents write to the same key simultaneously, a separate conflict resolution strategy is needed.


Example 3: Blocking Error Propagation with a Circuit Breaker

This was the most painful part in practice. When a downstream API became briefly unstable overnight, 7 agents were each running retries, and API requests exploded. Without the Circuit Breaker pattern, it would have ended in a rate-limit storm.

python
import asyncio
import time
from enum import Enum
 
class CircuitOpenError(RuntimeError):
    """Dedicated exception raised when the Circuit Breaker is in OPEN state"""
    pass
 
class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Tripped, requests blocked
    HALF_OPEN = "half_open"  # Probing for recovery
 
class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time: float | None = None
        self.state = CircuitState.CLOSED
 
    def _should_attempt_reset(self) -> bool:
        if self.last_failure_time is None:
            return False
        return time.time() - self.last_failure_time >= self.recovery_timeout
 
    async def call(self, coro):
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
                print("[Circuit Breaker] HALF_OPEN — attempting recovery")
            else:
                raise CircuitOpenError("Circuit Breaker OPEN: request blocked")
 
        try:
            result = await coro
            self.failure_count = 0
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
                print("[Circuit Breaker] CLOSED — recovery complete")
            return result
 
        except CircuitOpenError:
            raise
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
                print(
                    f"[Circuit Breaker] OPEN — {self.failure_count} failures, "
                    f"retry available after {self.recovery_timeout}s"
                )
            raise e
 
async def agent_with_retry(
    agent_fn,
    breaker: CircuitBreaker,
    max_retries: int = 3,
    base_delay: float = 1.0,
):
    """Exponential backoff retry + Circuit Breaker combination"""
    for attempt in range(max_retries):
        try:
            return await breaker.call(agent_fn())
        except CircuitOpenError:
            raise  # Do not retry on Circuit Breaker trip
        except Exception:
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt)
            print(f"  Retry {attempt + 1}/{max_retries}, waiting {delay:.1f}s")
            await asyncio.sleep(delay)

Circuit Breaker Pattern: When consecutive failures exceed a threshold, it "trips" the circuit and immediately blocks subsequent requests. After a set time, it transitions to HALF_OPEN state to probe for recovery. The name comes from electrical circuit breakers.

There is a reason for having a dedicated CircuitOpenError exception class. If you distinguish exception types by string matching, it will silently break the moment an error message changes. Distinguishing by type lets you catch it cleanly with a single except CircuitOpenError:, and anyone using this code can immediately understand the intent.

Tradeoff of this pattern: If you have 2 or fewer subagents, simple retries are more appropriate than a Circuit Breaker. The Circuit Breaker carries the cost of managing shared state, so it pays off when the number of agents grows and failures risk propagating in a chain.


Pros and Cons Analysis

Advantages

Item Details
Prevents context overload Subagent intermediate results don't pollute the orchestrator's context, maintaining quality even in large tasks
Parallel processing Independent subtasks can be executed simultaneously, reducing overall completion time
Role specialization Each subagent receives instructions specialized to a narrow domain, enabling higher accuracy
Model selection flexibility Combining Haiku for simple tasks and Opus for complex reasoning lets you balance cost and performance
Independent verification Separating the Evaluator into its own agent enables unbiased evaluation unaffected by the Generator's context

Disadvantages and Caveats

Item Details Mitigation
Error cascade If a subagent failure is not detected at the orchestrator level, incorrect results silently flow to the next stage Include an explicit status field in subagent return values; add validation logic at the orchestrator level
Retry storm If each of 10 agents has independent retry logic, a single service failure causes requests to increase exponentially Manage Circuit Breaker in shared state
Cost explosion If subagents recursively spawn more subagents, costs grow rapidly Set a per-run agent count cap and depth limit
State inconsistency Concurrent writes by multiple agents to a shared store can cause conflicts Define a conflict resolution strategy; apply locking or event sourcing
Unsuitable domains Tasks with tight interdependencies between agents, or where all agents must share the same context Anthropic also recommends a single agent in these cases

Most Common Mistakes in Practice

  1. Not including a success/failure status in subagent return values: Our team encountered a situation where the output was wrong but no errors were raised. Agents always "return something," but if the orchestrator has no way to distinguish a normal result from an error, incorrect data quietly flows to the next stage. It is recommended to explicitly include status: "success" | "error" and error_message in the return schema.

  2. Embedding independent retry logic in each agent without a Circuit Breaker: This seems fine locally, but in production, API rate limits or downstream service failures make request storms a reality. A structure that manages the Circuit Breaker in shared state is far safer.

  3. Stacking agent layers infinitely deep: When a subagent calls another subagent, which calls yet another, costs and debugging complexity both explode. Once you start going beyond 2–3 levels deep, it's worth first checking whether you can simplify to a single agent.


Closing Thoughts

The orchestrator-subagent pattern is the work of simultaneously designing "how to isolate" and "how to connect." Context isolation makes it possible to handle large tasks, but that same isolation makes state sharing and error propagation tricky.

Three steps you can start with right now:

  1. Implement the Planner–Generator–Evaluator pattern as a single pipeline. Plugging your own task string directly into the orchestrator() function in Example 1 and running it is the fastest starting point for experiencing how each agent runs in an independent context. That said, if you have 2 or fewer subagents, starting with a simple single function chain is sufficient rather than this 3-stage structure.

  2. Add a status field to subagent return schemas and attach validation logic at the orchestrator level to stop the pipeline when status == "error". This one change alone can catch 80% of silent failure issues.

  3. When the number of agents grows to 3 or more, introduce the CircuitBreaker class as shared state. Simply wrapping each agent call in breaker.call() using the implementation from Example 3 can significantly reduce the risk of retry storms. If you have 2 or fewer agents, a Circuit Breaker is overkill. Simple retries are often sufficient.


References

  • Building agents with the Claude Agent SDK | Anthropic Engineering
  • Claude Managed Agents Overview — Official API Docs
  • Multiagent sessions — Claude API Docs
  • Subagents in the SDK — Claude API Docs
  • Create custom subagents — Claude Code Docs
  • Scaling Managed Agents: Decoupling the brain from the runtime | Anthropic Engineering
  • Equipping agents for the real world with Agent Skills | Anthropic Engineering
  • Retry Storms in Multi-Agent LangGraph Systems: Circuit Breaker Fix (2026)
  • 6 Multi-Agent Orchestration Patterns for Production (2026)
  • Multi-Agent in Production 2026: 3 Patterns That Survived
  • Best Practices for Multi-Agent Orchestration with Claude (GitHub Discussions)
#멀티에이전트#ClaudeAgentSDK#오케스트레이터패턴#CircuitBreaker#Python#asyncio#이벤트소싱#상태공유#서브에이전트#파이프라인설계
Share

Table of Contents

Core ConceptsWhat Is the Claude Agent SDKThe Orchestrator-Subagent PatternHow to Share StatePractical ApplicationExample 1: Planner–Generator–Evaluator PipelineExample 2: Parallel Pipeline with a Shared KV StoreExample 3: Blocking Error Propagation with a Circuit BreakerPros and Cons AnalysisAdvantagesDisadvantages and CaveatsMost Common Mistakes in PracticeClosing ThoughtsReferences

Recommended Posts

Claude Opus 4.8 Dynamic Workflows and Effort Control — A Structure for Automating Codebase Migration with Parallel Agents
Claude

Claude Opus 4.8 Dynamic Workflows and Effort Control — A Structure for Automating Codebase Migration with Parallel Agents

When I first saw Claude Opus 4.8, released by Anthropic on May 28, 2026, I honestly thought, "Just another update with a bumped version number." After all, it h...

May 30, 202620 min read
Claude Code Hooks — Controlling Agent Tool Execution in Code with PreToolUse·PostToolUse
Claude

Claude Code Hooks — Controlling Agent Tool Execution in Code with PreToolUse·PostToolUse

Based on official documentation | Claude Code hooks · PreToolUse · PostToolUse Not long after adopting Claude Code, I felt a familiar anxiety: "What if Claud...

May 30, 202620 min read
Claude Code /goal & Session Management: How to Continue Multi-Day Tasks with AI Without Losing Your Place
Claude

Claude Code /goal & Session Management: How to Continue Multi-Day Tasks with AI Without Losing Your Place

If you've used Claude Code for any length of time, you've probably hit this wall: "I have no idea what Claude is actually doing right now." It reads files, edit...

May 12, 202617 min read
How to Declaratively Separate Team-Based AI Tool Access Permissions Using Claude Code MCP and `.claude/rules/`
Claude

How to Declaratively Separate Team-Based AI Tool Access Permissions Using Claude Code MCP and `.claude/rules/`

When rolling out AI coding tools across an entire team, you hit a familiar concern sooner than you'd expect: "Backend developers need database access, but what ...

May 6, 202621 min read
How to Modularize Team-Specific AI Rules with `Claude Code .claude/rules/` — A Separation Strategy for Frontend, Backend, and Security Teams
Claude

How to Modularize Team-Specific AI Rules with `Claude Code .claude/rules/` — A Separation Strategy for Frontend, Backend, and Security Teams

Have you ever experienced growing bloated as your team scales? I started out cramming all the rules into a single file, and at some point it crossed 500 lines ...

May 6, 202615 min read
Customizing the Claude Code Status Line — How to Always Display Session Info in Your Terminal
Claude

Customizing the Claude Code Status Line — How to Always Display Session Info in Your Terminal

Honestly, when I first used Claude Code, what made me most anxious was things like "How much context am I using right now?" and "How much is this costing me?" C...

April 20, 202618 min read