멀티에이전트 파이프라인 설계 — Claude Agent SDK 오케스트레이터·서브에이전트 상태 공유와 오류 전파 처리
멀티에이전트 시스템을 처음 설계할 때 가장 많이 듣는 말이 "그냥 에이전트 여러 개 붙이면 되는 거 아닌가요?"입니다. 저도 처음엔 그렇게 생각했는데, 실제로 프로덕션에 올려보면 이야기가 달라집니다. 서브에이전트 하나가 실패했는데 오케스트레이터가 그걸 모른 채 다음 단계로 넘어가거나, 10개 에이전트가 동시에 재시도를 쏘아대며 API 요청이 폭발하는 상황을 만나게 됩니다. 사실 이 두 가지가 프로덕션에서 파이프라인을 조용히 죽이는 가장 흔한 원인이었습니다.
이 글은 Claude Agent SDK의 오케스트레이터-서브에이전트 패턴을 실제로 어떻게 설계하면 좋을지, 에이전트 간 상태를 어떻게 공유하고 오류가 파이프라인 전체를 무너뜨리지 않도록 막을 수 있는지를 코드와 함께 풀어냅니다. 이 글을 읽고 나면 재시도 폭풍 없이 병렬 에이전트를 돌리고, 서브에이전트 실패를 조용히 흘려보내지 않는 파이프라인을 직접 구현할 수 있습니다.
시작하기 전에: Python 3.11+,
pip install anthropic, 환경 변수ANTHROPIC_API_KEY설정이 필요합니다.
핵심 개념
Claude Agent SDK란 무엇인가
Claude Agent SDK는 Anthropic이 Claude Code 내부에서 쓰던 에이전트 루프와 컨텍스트 관리 시스템을 Python/TypeScript로 공개한 런타임입니다. 파일 읽기·쓰기, 명령 실행, 웹 검색, 코드 편집 같은 도구를 에이전트가 자율적으로 조합해서 태스크를 수행하도록 만들 수 있습니다.
Claude Managed Agents: 2026년 4월 공개 베타로 출시된 상위 레이어. 오케스트레이션·샌드박싱·세션 상태 관리·오류 복구를 호스팅 인프라에서 처리해주는 서비스로,
/v1/agents엔드포인트와managed-agents-2026-04-01베타 헤더를 사용합니다.
오케스트레이터-서브에이전트 패턴
이 패턴의 핵심은 "누가 알고 있어야 하는가"를 명확히 나누는 데 있습니다.
- 오케스트레이터: 전체 태스크를 받아서 하위 태스크로 분해하고 위임한 다음, 결과를 종합합니다. 큰 그림을 봅니다.
- 서브에이전트: 지시받은 한 가지 일만 합니다. 내부 과정은 오케스트레이터와 격리되어 있고, 최종 결과만 반환합니다.
Orchestrator
├── 태스크 분해
├── 서브에이전트 A 위임 ──▶ [독립 컨텍스트] ──▶ 결과 반환
├── 서브에이전트 B 위임 ──▶ [독립 컨텍스트] ──▶ 결과 반환
└── 결과 종합여기서 중요한 설계 원칙이 하나 있습니다.
컨텍스트 윈도우 격리: 컨텍스트 윈도우란 모델이 한 번에 처리할 수 있는 텍스트의 길이를 뜻합니다. 각 서브에이전트는 독립된 컨텍스트 윈도우에서 시작하기 때문에, 서브에이전트 내부의 중간 툴 호출 결과는 해당 서브에이전트 안에 머물고 오케스트레이터에는 최종 메시지만 도달합니다.
이 격리 덕분에 대형 태스크에서 오케스트레이터 컨텍스트가 오염되는 문제를 막을 수 있습니다. 반대로 말하면, 오케스트레이터가 서브에이전트에게 전달할 수 있는 유일한 채널이 프롬프트 문자열뿐입니다. 파일 경로, 에러 메시지, 결정 사항 등 필요한 모든 것을 그 프롬프트에 담아야 합니다.
상태를 어떻게 공유할까
표를 보기 전에 이벤트 소싱 개념을 잠깐 짚고 가겠습니다.
이벤트 소싱 (Event Sourcing): 상태를 직접 저장하는 대신 상태 변이를 순서대로 기록한 이벤트 로그를 저장하는 패턴입니다. 특정 시점으로 롤백하거나 충돌을 추적하는 데 유리합니다.
오케스트레이터-서브에이전트 구조에서 상태 공유 방식은 크게 세 가지로 나뉩니다.
| 방식 | 설명 | 적합한 경우 |
|---|---|---|
| 프롬프트 직렬 전달 | 이전 단계 결과를 다음 에이전트 프롬프트에 직접 포함 | 데이터 크기가 작고 순차 파이프라인일 때 |
| 공유 KV 스토어 | Redis 등 외부 저장소에 중간 결과 저장, 후속 에이전트가 읽음 | 병렬 에이전트가 같은 데이터를 참조할 때 |
| 이벤트 소싱 | 상태 변이를 이벤트 로그로 기록, 에이전트가 필요한 시점에 재생 | 충돌 해결·감사 추적이 필요할 때 |
솔직히 말하면 프롬프트 직렬 전달로 시작하는 게 제일 쉽고, 복잡성이 커질 때 KV 스토어로 넘어가는 게 현실적인 흐름입니다.
실전 적용
예시 1: Planner–Generator–Evaluator 파이프라인
Anthropic 엔지니어링 블로그에서 직접 권장하는 패턴입니다. 코드 생성, 문서 작성, 분석 리포트 등 다양한 곳에 쓸 수 있습니다.
import anthropic
import asyncio
import json
from typing import TypedDict
# AsyncAnthropic 사용 — asyncio.gather로 병렬 실행 시 이벤트 루프를 블록하지 않습니다
async_client = anthropic.AsyncAnthropic()
# 비용이 낮은 모델로 먼저 시작해 필요에 따라 교체하는 것을 권장합니다
MODEL = "claude-haiku-4-5-20251001"
class PipelineState(TypedDict):
task: str
plan: str | None
output: str | None
evaluation: str | None
passed: bool
async def run_subagent(system_prompt: str, user_message: str) -> str:
"""단일 서브에이전트 실행 — 결과 텍스트만 반환"""
response = await async_client.messages.create(
model=MODEL,
max_tokens=4096,
system=system_prompt,
messages=[{"role": "user", "content": user_message}],
)
return response.content[0].text
async def planner_agent(task: str) -> str:
return await run_subagent(
system_prompt="당신은 작업 구조를 설계하는 플래너입니다. 목표와 단계별 접근법을 명확히 서술하세요.",
user_message=f"다음 태스크를 분석하고 실행 계획을 작성하세요:\n\n{task}",
)
async def generator_agent(task: str, plan: str) -> str:
return await run_subagent(
system_prompt="당신은 실행 에이전트입니다. 주어진 계획대로 결과물을 생성하세요.",
user_message=f"태스크: {task}\n\n계획:\n{plan}\n\n위 계획에 따라 결과물을 작성하세요.",
)
async def evaluator_agent(task: str, output: str) -> tuple[bool, str]:
result = await run_subagent(
system_prompt=(
"당신은 독립적인 품질 검증 에이전트입니다. "
"결과물이 태스크 목표를 충족하는지 판단하고, "
"반드시 다음 JSON 형식으로만 응답하세요:\n"
'{"status": "pass" 또는 "fail", "reason": "판단 이유"}'
),
user_message=f"태스크: {task}\n\n결과물:\n{output}\n\n품질을 검증하세요.",
)
try:
data = json.loads(result)
passed = data.get("status") == "pass"
except json.JSONDecodeError:
passed = False
return passed, result
async def orchestrator(task: str) -> PipelineState:
state: PipelineState = {
"task": task,
"plan": None,
"output": None,
"evaluation": None,
"passed": False,
}
print("[오케스트레이터] 플래너 서브에이전트 실행 중...")
state["plan"] = await planner_agent(task)
print("[오케스트레이터] 제너레이터 서브에이전트 실행 중...")
state["output"] = await generator_agent(task, state["plan"])
print("[오케스트레이터] 평가자 서브에이전트 실행 중...")
passed, evaluation = await evaluator_agent(task, state["output"])
state["evaluation"] = evaluation
state["passed"] = passed
return state
async def main():
result = await orchestrator(
"Python으로 간단한 할 일 관리 CLI 도구를 설계하고 구현하세요."
)
print(f"\n최종 결과 — PASS: {result['passed']}")
asyncio.run(main())| 단계 | 역할 | 전달 정보 |
|---|---|---|
planner_agent |
구조·목표 수립 | 태스크 문자열 |
generator_agent |
실제 결과물 생성 | 태스크 + 플랜 |
evaluator_agent |
독립 품질 검증 | 태스크 + 결과물 |
처음에 평가자를 제너레이터와 같은 컨텍스트로 묶어서 실행했을 때는 평가 점수에 이상한 노이즈가 섞였습니다. 제너레이터가 중간에 내뱉는 자기 합리화 문장들이 평가자 맥락으로 흘러들어가서, 평가자가 결과물을 냉정하게 보지 못했던 겁니다. 각 에이전트를 독립 컨텍스트로 분리하고 나서야 평가가 실제로 독립적으로 동작하기 시작했습니다.
이 패턴의 트레이드오프: 단계가 3개라 비용이 세 배입니다. 빠른 반복이 필요한 상황이라면 플래너를 생략하고 제너레이터-평가자 2단계로 줄여도 충분합니다.
예시 2: 공유 KV 스토어를 활용한 병렬 파이프라인
병렬로 실행되는 에이전트들이 중간 결과를 공유해야 할 때는 프롬프트에만 의존하기 어렵습니다. 오케스트레이터를 통한 직렬 전달 없이도 에이전트 간 데이터를 공유할 수 있는 패턴입니다.
import asyncio
import json
import time
from typing import Any
import anthropic
async_client = anthropic.AsyncAnthropic()
MODEL = "claude-haiku-4-5-20251001"
# 인메모리 KV 스토어 (실제 환경에서는 Redis 등으로 대체)
_store: dict[str, Any] = {}
def kv_set(key: str, value: Any) -> None:
_store[key] = {"value": value, "timestamp": time.time()}
def kv_get(key: str) -> Any | None:
entry = _store.get(key)
return entry["value"] if entry else None
async def compliance_check_agent(run_id: str, domain: str, task: str) -> dict:
"""도메인별 컴플라이언스 검사 서브에이전트"""
response = await async_client.messages.create(
model=MODEL,
max_tokens=512,
system=(
f"당신은 {domain} 컴플라이언스 전문가입니다. "
"주어진 거래 데이터를 검토하고 이슈 여부를 판단하세요. "
'반드시 JSON으로만 응답하세요: {"status": "pass" 또는 "fail", "issues": ["이슈 목록"]}'
),
messages=[{"role": "user", "content": f"거래 데이터:\n{task}"}],
)
try:
parsed = json.loads(response.content[0].text)
except json.JSONDecodeError:
parsed = {"status": "error", "issues": ["응답 파싱 실패"]}
result = {
"domain": domain,
"status": parsed.get("status", "error"),
"issues": parsed.get("issues", []),
"checked_at": time.time(),
}
kv_set(f"{run_id}:compliance:{domain}", result)
print(f" [{domain}] 검사 완료 → KV 저장")
return result
async def report_agent(run_id: str, domains: list[str]) -> str:
"""KV에서 결과를 수집해 최종 리포트 생성"""
results = []
for domain in domains:
result = kv_get(f"{run_id}:compliance:{domain}")
if result:
results.append(result)
passed = sum(1 for r in results if r["status"] == "pass")
report = (
f"컴플라이언스 검사 결과: {passed}/{len(results)} 통과\n"
+ "\n".join(f"- {r['domain']}: {r['status']}" for r in results)
)
return report
async def financial_compliance_orchestrator(transaction_data: str) -> str:
run_id = f"run_{int(time.time())}"
domains = ["AML", "KYC", "FATF"]
print(f"[오케스트레이터] 병렬 컴플라이언스 검사 시작 (run_id={run_id})")
# 독립적인 검사를 병렬로 실행
await asyncio.gather(
*[
compliance_check_agent(run_id, domain, transaction_data)
for domain in domains
]
)
print("[오케스트레이터] 리포트 에이전트 실행")
return await report_agent(run_id, domains)
asyncio.run(financial_compliance_orchestrator("거래 데이터 샘플"))각 검사 에이전트는 서로의 결과를 몰라도 됩니다. 공유 KV에 자신의 결과를 쓰고, 리포트 에이전트가 마지막에 수집하는 구조입니다.
이 패턴의 트레이드오프: KV 스토어가 없으면 인메모리 딕셔너리로 시작할 수 있지만, 멀티 프로세스 환경에서는 Redis 같은 외부 스토어로 교체해야 합니다. 또한 여러 에이전트가 동시에 같은 키에 쓰는 상황이 생기면 충돌 해결 전략이 별도로 필요합니다.
예시 3: Circuit Breaker를 통한 오류 전파 차단
실무에서 가장 골치 아팠던 부분이 바로 여기였습니다. 야간에 다운스트림 API가 잠깐 불안정해졌을 때, 에이전트 7개가 각자 재시도를 돌리면서 API 요청이 폭발했던 경험이 있습니다. Circuit Breaker 패턴이 없었다면 레이트 리밋 폭풍으로 끝났을 일이었습니다.
import asyncio
import time
from enum import Enum
class CircuitOpenError(RuntimeError):
"""Circuit Breaker가 OPEN 상태일 때 발생하는 전용 예외"""
pass
class CircuitState(Enum):
CLOSED = "closed" # 정상 운전
OPEN = "open" # 트립됨, 요청 차단
HALF_OPEN = "half_open" # 복구 탐색 중
class CircuitBreaker:
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: float = 60.0,
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.last_failure_time: float | None = None
self.state = CircuitState.CLOSED
def _should_attempt_reset(self) -> bool:
if self.last_failure_time is None:
return False
return time.time() - self.last_failure_time >= self.recovery_timeout
async def call(self, coro):
if self.state == CircuitState.OPEN:
if self._should_attempt_reset():
self.state = CircuitState.HALF_OPEN
print("[Circuit Breaker] HALF_OPEN — 복구 시도 중")
else:
raise CircuitOpenError("Circuit Breaker OPEN: 요청 차단됨")
try:
result = await coro
self.failure_count = 0
if self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.CLOSED
print("[Circuit Breaker] CLOSED — 복구 완료")
return result
except CircuitOpenError:
raise
except Exception as e:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
print(
f"[Circuit Breaker] OPEN — {self.failure_count}회 실패, "
f"{self.recovery_timeout}초 후 재시도 가능"
)
raise e
async def agent_with_retry(
agent_fn,
breaker: CircuitBreaker,
max_retries: int = 3,
base_delay: float = 1.0,
):
"""지수 백오프 재시도 + Circuit Breaker 조합"""
for attempt in range(max_retries):
try:
return await breaker.call(agent_fn())
except CircuitOpenError:
raise # Circuit Breaker 트립은 재시도하지 않음
except Exception:
if attempt == max_retries - 1:
raise
delay = base_delay * (2 ** attempt)
print(f" 재시도 {attempt + 1}/{max_retries}, {delay:.1f}초 대기")
await asyncio.sleep(delay)Circuit Breaker 패턴: 연속 실패가 임계값을 넘으면 회로를 "트립"시켜 후속 요청을 즉시 차단합니다. 일정 시간 후 HALF_OPEN 상태로 전환해 복구 여부를 탐색합니다. 전기 회로 차단기에서 이름을 따왔습니다.
전용 CircuitOpenError 예외 클래스를 두는 이유가 있습니다. 문자열 매칭으로 예외 타입을 구분하면 에러 메시지가 바뀌는 순간 조용히 망가집니다. 타입으로 구분하면 except CircuitOpenError: 하나로 명확하게 잡을 수 있고, 이 코드를 가져다 쓰는 사람도 의도를 바로 파악할 수 있습니다.
이 패턴의 트레이드오프: 서브에이전트가 2개 이하라면 Circuit Breaker보다 단순 재시도가 더 적합합니다. Circuit Breaker는 공유 상태를 관리하는 비용이 있기 때문에, 에이전트가 많아지고 실패가 연쇄적으로 전파될 가능성이 있을 때 가치를 발휘합니다.
장단점 분석
장점
| 항목 | 내용 |
|---|---|
| 컨텍스트 오버로드 방지 | 서브에이전트 중간 결과가 오케스트레이터 컨텍스트를 오염시키지 않아 대형 태스크에서도 품질이 유지됩니다 |
| 병렬 처리 | 독립적인 서브태스크를 동시에 실행해 전체 완료 시간을 단축할 수 있습니다 |
| 역할 전문화 | 각 서브에이전트가 좁은 도메인에 특화된 지시를 받아 더 높은 정확도를 기대할 수 있습니다 |
| 모델 선택 유연성 | 단순 태스크에 Haiku, 복잡한 추론에 Opus를 조합해 비용과 성능을 조율할 수 있습니다 |
| 독립 검증 | Evaluator를 별도 에이전트로 분리하면 Generator의 맥락에 영향받지 않는 편향 없는 평가가 가능합니다 |
단점 및 주의사항
| 항목 | 내용 | 대응 방안 |
|---|---|---|
| 실패 전파 (Error Cascade) | 서브에이전트 실패가 오케스트레이터 단에서 감지되지 않으면 잘못된 결과가 조용히 다음 단계로 흐릅니다 | 서브에이전트 반환값에 명시적 status 필드 포함, 오케스트레이터 단 검증 로직 추가 |
| 재시도 폭풍 (Retry Storm) | 에이전트 10개 각각에 재시도 로직이 있으면 단일 서비스 장애 시 요청이 기하급수적으로 증가합니다 | 공유 상태에서 Circuit Breaker 관리 |
| 비용 폭증 | 서브에이전트가 재귀적으로 서브에이전트를 생성하면 비용이 빠르게 불어납니다 | per-run 에이전트 수 상한 설정, 깊이 제한 |
| 상태 불일치 | 여러 에이전트가 공유 저장소에 동시 쓰기 시 충돌이 발생할 수 있습니다 | 충돌 해결 전략 명시, 락(Lock) 또는 이벤트 소싱 패턴 적용 |
| 부적합 도메인 | 에이전트 간 의존성이 촘촘하거나 모든 에이전트가 동일 컨텍스트를 공유해야 하는 태스크 | Anthropic도 이런 경우 단일 에이전트를 권장합니다 |
실무에서 가장 흔한 실수
-
서브에이전트 반환값에 성공/실패 상태를 넣지 않는 것: 팀에서 이 실수 때문에 결과물이 이상한데 에러는 없는 상황을 경험한 적이 있습니다. 에이전트는 항상 "뭔가를 반환"하는데, 그 뭔가가 정상 결과인지 에러인지 오케스트레이터가 구분할 방법이 없으면 잘못된 데이터가 조용히 다음 단계로 흘러갑니다. 반환 스키마에
status: "success" | "error"와error_message를 명시적으로 포함하는 것을 권장합니다. -
Circuit Breaker 없이 각 에이전트에 독립 재시도 로직을 심는 것: 로컬에서는 문제없어 보이지만, 프로덕션에서 API 레이트 리밋이나 다운스트림 서비스 장애가 발생하면 요청 폭풍이 현실이 됩니다. 공유 상태에서 Circuit Breaker를 관리하는 구조가 훨씬 안전합니다.
-
에이전트 계층을 무한히 깊게 쌓는 것: 서브에이전트가 또 서브에이전트를 부르고, 그게 또 서브에이전트를 부르면 비용과 디버깅 난이도가 함께 폭발합니다. 깊이 2~3 이상이 되기 시작하면 먼저 단일 에이전트로 단순화할 수 있는지 점검해보시는 것이 좋습니다.
마치며
오케스트레이터-서브에이전트 패턴은 "어떻게 격리할 것인가"와 "어떻게 연결할 것인가"를 동시에 설계하는 작업입니다. 컨텍스트 격리 덕분에 대형 태스크를 다룰 수 있지만, 그 격리 때문에 상태 공유와 오류 전파가 까다로워집니다.
지금 바로 시작해볼 수 있는 3단계:
-
Planner–Generator–Evaluator 패턴을 단일 파이프라인으로 구현해보시면 좋습니다. 예시 1의 코드에서
orchestrator()함수에 자신의 태스크 문자열을 직접 넣어 실행해보시면, 각 에이전트가 독립 컨텍스트에서 실행되는 방식을 체감하는 가장 빠른 출발점이 됩니다. 단, 서브에이전트가 2개 이하라면 이 3단계 구조보다 단순한 단일 함수 체인으로 시작해도 충분합니다. -
서브에이전트 반환 스키마에
status필드를 추가하고, 오케스트레이터 단에서status == "error"일 때 파이프라인을 멈추는 검증 로직을 붙여보시는 것을 권장합니다. 이 한 가지만 해도 조용한 실패(silent failure) 문제의 80%를 잡을 수 있습니다. -
에이전트가 3개 이상으로 늘어나는 시점에
CircuitBreaker클래스를 공유 상태로 도입해보시면 좋습니다. 예시 3의 구현을 가져와서 각 에이전트 호출을breaker.call()로 감싸는 것만으로도 재시도 폭풍 위험을 크게 줄일 수 있습니다. 에이전트가 2개 이하라면 Circuit Breaker는 과잉입니다. 단순 재시도로도 충분한 경우가 많습니다.
참고 자료
- Building agents with the Claude Agent SDK | Anthropic Engineering
- Claude Managed Agents Overview — 공식 API 문서
- Multiagent sessions — Claude API Docs
- Subagents in the SDK — Claude API Docs
- Create custom subagents — Claude Code Docs
- Scaling Managed Agents: Decoupling the brain from the runtime | Anthropic Engineering
- Equipping agents for the real world with Agent Skills | Anthropic Engineering
- Retry Storms in Multi-Agent LangGraph Systems: Circuit Breaker Fix (2026)
- 6 Multi-Agent Orchestration Patterns for Production (2026)
- Multi-Agent in Production 2026: 3 Patterns That Survived
- Best Practices for Multi-Agent Orchestration with Claude (GitHub Discussions)