AI Pipelines Built with EDA + Serverless: From AWS Lambda, EventBridge, and RAG to Real-time Anomaly Detection
To be honest, when I first designed the AI pipeline, I also thought, "Can't I just set up an API server?" The flow of preprocessing requests, creating embeddings, performing model inference, and returning results seemed simple. However, the story changes when you actually run it. Traffic surges unpredictably, embedding generation becomes a bottleneck, and a delay in a single step blocks the entire process. AI workloads are inherently bursty, and the resources required for each stage vary drastically.
AI pipelines combining Event-Driven Architecture (EDA) and serverless technology address this problem head-on. They feature a structure where each processing step is loosely coupled by an event, and serverless functions like AWS Lambda automatically scale using these events as triggers. This pattern is approaching an industry standard rather than an experimental idea, to the extent that AWS has officially named EDA the "backbone of serverless AI" (AWS Prescriptive Guidance).
After reading this article, you will be able to launch a PR within a day that separates the embedding phase of your existing AI API server into events. Through hands-on code, we will explore how to configure a RAG pipeline with EventBridge and Lambda, implement real-time anomaly detection with Kafka, and orchestrate multi-agent workflows with Step Functions.
Key Concepts
EDA: Connecting Components to Events
In traditional architectures, services call each other directly. It is a structure where A knows B, and B knows C. If there are N services, the connection complexity explodes to O(N²). This is the situation where, every time someone on the team adds a new service, they have to ask on Slack, "Where is that API specification?" or "Who should I call?" EDA solves this problem by centering it around an event broker. Producers publish events to the broker, and consumers subscribe only to the events they are interested in. The complexity is reduced to O(N), and each component no longer needs to know of each other's existence.
Loose Coupling: A design where components do not directly depend on each other but share a contract called events. Replacing or extending one component does not affect other components.
Applying this principle to an AI pipeline separates each processing step—preprocessing, embedding, inference, and postprocessing—into independent functions. If you want to change the embedding generation logic, you simply replace the corresponding function. Upgrading the inference model does not affect the preceding or subsequent steps.
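The O(N) pub/sub idea is easy to see in miniature. Below is a toy in-memory sketch (the `Broker` class and the event names are illustrative stand-ins for a real broker such as Kafka or EventBridge) showing that the producer and the two consumers never reference each other, and that one published event fans out to both subscribers:

```python
from collections import defaultdict
from typing import Callable

class Broker:
    """Minimal in-memory event broker: producers and consumers only share event names."""
    def __init__(self):
        self.subscribers: dict[str, list[Callable[[dict], None]]] = defaultdict(list)

    def subscribe(self, event_type: str, handler: Callable[[dict], None]) -> None:
        self.subscribers[event_type].append(handler)

    def publish(self, event_type: str, payload: dict) -> None:
        # fan-out: every subscriber of this event type receives the payload
        for handler in self.subscribers[event_type]:
            handler(payload)

broker = Broker()
received = []

# two independent consumers subscribe to the same event type (fan-out)
broker.subscribe('DocumentUploaded', lambda e: received.append(('embedding', e['doc_id'])))
broker.subscribe('DocumentUploaded', lambda e: received.append(('metadata', e['doc_id'])))

# the producer publishes without knowing who consumes
broker.publish('DocumentUploaded', {'doc_id': 'doc-1'})
print(received)  # both consumers were triggered by the single event
```

Replacing the embedding consumer means swapping one `subscribe` call; the producer and the metadata consumer are untouched, which is the loose coupling the architecture depends on.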
Serverless: Focus on logic, not infrastructure
The core of serverless is that it executes only when an event occurs, and costs are incurred only for the amount executed. It is particularly well-suited for workloads that are CPU/GPU intensive, such as AI inference, followed by periods of silence. Instead of paying for servers that are always running, resources are consumed only when there is an actual transaction.
One of the core strengths of EDA is fan-out. Since a single event can be delivered to multiple consumers simultaneously, a single document upload event can trigger both embedding generation and metadata extraction at the same time. The fact that events are stored in the DLQ (Dead Letter Queue) in the event of processing failure and can be reprocessed later also contributes significantly to operational stability.
```
External event
└─► Event broker (Kafka / EventBridge)
    ├─► Lambda: preprocessing
    │   └─► Event published (fan-out)
    │       ├─► Lambda: AI inference A
    │       └─► Lambda: AI inference B
    │           └─► Event published
    │               └─► Lambda: postprocessing / action
    └─► DLQ: stores failed events for reprocessing
```

Why EDA + Serverless Naturally Fits AI
When I first used this combination, what impressed me most was that the characteristics of AI workloads matched this pattern almost perfectly. The embedding generation stage is CPU-intensive, the inference stage requires GPUs, and post-processing is lightweight. If you bundle these three stages into a single server, you have to provision resources according to the heaviest stage.
| AI Workload Characteristics | EDA+Serverless Response |
|---|---|
| Bursty request patterns | Event queue acts as a buffer; serverless auto-scales |
| Extreme resource differences at each stage | Individual scaling by separating each stage into an independent function |
| Frequent replacement of models and logic | Loose coupling allows only specific stages to be replaced |
| Preference for asynchronous processing | Naturally aligns with EDA's asynchronous communication |
Practical Application
Example 1: RAG Pipeline — From Document Upload to LLM Response
Retrieval-Augmented Generation (RAG) is one of the best examples of EDA + Serverless integration. This is because document parsing, chunking, embedding generation, vector database storage, searching, and inference must each be scaled independently.
In the code below, event['Records'][0]['s3'] is the trigger payload that Lambda automatically receives when a file is uploaded to S3. The S3 event notification directly invokes Lambda, and the invocation data contains the bucket name and object key.
```python
# Lambda: document parsing & chunking (preprocessing stage)
import boto3
import json

def handler(event, context):
    s3 = boto3.client('s3')
    eventbridge = boto3.client('events')
    record = event['Records'][0]['s3']
    bucket = record['bucket']['name']
    key = record['object']['key']
    try:
        obj = s3.get_object(Bucket=bucket, Key=key)
        text = obj['Body'].read().decode('utf-8')
        chunks = chunk_text(text, chunk_size=512, overlap=64)
        eventbridge.put_events(Entries=[{
            'Source': 'rag.preprocessing',
            'DetailType': 'DocumentChunked',
            'EventBusName': 'rag-pipeline-bus',  # in production, always specify a custom bus
            'Detail': json.dumps({
                'document_id': key,
                'chunks': chunks,
                'total_chunks': len(chunks)
            })
        }])
        return {'statusCode': 200, 'chunksCount': len(chunks)}
    except Exception as e:
        print(f"Processing failed: {e}")
        raise  # re-raise so the SQS/EventBridge retry is triggered

def chunk_text(text: str, chunk_size: int, overlap: int) -> list[str]:
    # In production, tokenizer-based chunking (e.g. tiktoken) is recommended;
    # word-based splitting makes token counts hard to predict for CJK text such as Korean
    words = text.split()
    chunks = []
    for i in range(0, len(words), chunk_size - overlap):
        chunks.append(' '.join(words[i:i + chunk_size]))
    return chunks
```

```python
# Lambda: embedding generation & vector DB storage
import boto3
import json

bedrock = boto3.client('bedrock-runtime')

def handler(event, context):
    detail = event['detail']  # EventBridge delivers 'detail' as an already-parsed dict
    document_id = detail['document_id']
    chunks = detail['chunks']
    try:
        embeddings = []
        for chunk in chunks:
            response = bedrock.invoke_model(
                modelId='amazon.titan-embed-text-v2:0',
                body=json.dumps({'inputText': chunk})
            )
            embedding = json.loads(response['body'].read())['embedding']
            embeddings.append({
                'chunk_text': chunk,
                'embedding': embedding,
                'document_id': document_id
            })
        # OpenSearch batch indexing, implemented with the opensearchpy bulk API
        batch_index_vectors(embeddings)
        return {'indexed': len(embeddings)}
    except Exception as e:
        print(f"Embedding failed: {e}")
        raise

def batch_index_vectors(embeddings: list) -> None:
    # vector indexing via the opensearchpy client; implementation omitted
    pass
```

| Code Point | Description |
|---|---|
| Specifying `EventBusName` | If omitted, events go to the default bus; a separate custom bus is recommended for production. |
| Re-raising with `raise` | If Lambda swallows an exception, the event is considered successfully handled; re-raise it so a retry is triggered. |
| Chunking function separated | The embedding step can be retried and scaled independently. |
| `bedrock.invoke_model()` | Calling a serverless LLM API directly removes the need to manage GPU infrastructure. |
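For these two functions to be connected, an EventBridge rule on the `rag-pipeline-bus` must route `DocumentChunked` events to the embedding Lambda. An event pattern along these lines would match (the field values mirror the `put_events` call above; the full rule definition depends on your deployment tooling):

```json
{
  "source": ["rag.preprocessing"],
  "detail-type": ["DocumentChunked"]
}
```

Attached as a rule on the custom bus with the embedding Lambda as its target, this pattern keeps routing logic out of the functions themselves.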
Example 2: Real-time Anomaly Detection — Hundreds of milliseconds from transaction to alert
The following is a real-time streaming scenario rather than a batch process. This is a situation frequently encountered in practice where a team transitions from analyzing a day's worth of transactions in batches to real-time detection. With the combination of Kafka, Lambda, and SageMaker, detection latency can be reduced from several hours to hundreds of milliseconds.
```python
# Lambda: Kafka trigger, feature extraction and inference
import boto3
import json
import base64

sagemaker = boto3.client('sagemaker-runtime')
eventbridge = boto3.client('events')

def handler(event, context):
    results = []
    # MSK triggers key event['records'] by topic-partition, e.g. 'transaction-topic-0'
    for records in event['records'].values():
        for record in records:
            try:
                payload = json.loads(base64.b64decode(record['value']))
                features = extract_features(payload)
                # For large batches, consider a batch prediction endpoint or asyncio
                # parallelism; a synchronous loop grows linearly with the record count
                response = sagemaker.invoke_endpoint(
                    EndpointName='fraud-detection-serverless',
                    ContentType='application/json',
                    Body=json.dumps({'instances': [features]})
                )
                prediction = json.loads(response['Body'].read())
                fraud_score = prediction['predictions'][0]['score']
                if fraud_score > 0.85:
                    eventbridge.put_events(Entries=[{
                        'Source': 'fraud.detection',
                        'DetailType': 'HighRiskTransaction',
                        'EventBusName': 'fraud-detection-bus',
                        'Detail': json.dumps({
                            'transaction_id': payload['id'],
                            'fraud_score': fraud_score,
                            'amount': payload['amount']
                        })
                    }])
                results.append({'transaction_id': payload['id'], 'score': fraud_score})
            except Exception as e:
                print(f"Record processing failed: {e}")
                continue  # keep one bad record from blocking the whole batch
    return results

def extract_features(transaction: dict) -> list[float]:
    return [
        transaction['amount'],
        transaction['merchant_category_code'],
        transaction['hour_of_day'],
        transaction['is_international'],
        transaction['velocity_1h'],
    ]
```

Idempotency: The property that processing the same event twice yields the same result. Since Kafka guarantees at-least-once delivery, the same message can be delivered twice, so logic that prevents duplicate processing using a unique key such as `transaction_id` is required.
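A minimal idempotency guard can be sketched as follows. Here an in-memory set stands in for what would be a DynamoDB conditional write or Redis `SETNX` in production (a per-container set is not shared across Lambda instances, so this is only an illustration of the pattern; `process_once` and the transaction IDs are hypothetical names):

```python
processed_ids: set[str] = set()  # in production: DynamoDB conditional write or Redis SETNX

def process_once(transaction_id: str, handler) -> bool:
    """Run handler only the first time this transaction_id is seen.

    Returns True if the handler ran, False if the event was a duplicate delivery.
    """
    if transaction_id in processed_ids:
        return False  # at-least-once redelivery: skip without re-publishing alerts
    processed_ids.add(transaction_id)
    handler(transaction_id)
    return True

alerts = []
process_once('tx-42', alerts.append)  # first delivery: handler runs
process_once('tx-42', alerts.append)  # duplicate delivery: skipped
print(alerts)  # ['tx-42'], the alert fired exactly once
```

In a real consumer, the membership check and the insert must be one atomic operation on the shared store (e.g. a DynamoDB `PutItem` with a condition expression), otherwise two concurrent deliveries can both pass the check.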
Example 3: AI Agent Orchestration — Parallel Agent Pattern
When coordination between agents becomes complex, it is better to delegate state management to Step Functions rather than orchestrating with code. I also initially connected agents by directly invoking Lambdas, but the code quickly became messy as I had to manage error retries, timeouts, and saving intermediate results directly within each function. Step Functions declaratively separate this state management, allowing agent Lambdas to remain completely stateless.
In this structure, a routing agent analyzes the user request, multiple specialized agents execute in parallel, and a synthesis step combines the results. Step Functions workflow definitions are written in JSON-based ASL (Amazon States Language), as shown below.
```json
{
  "Comment": "Parallel agent orchestration - Amazon States Language",
  "StartAt": "RouteRequest",
  "States": {
    "RouteRequest": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:routing-agent",
      "Next": "ParallelAgents"
    },
    "ParallelAgents": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "SearchAgent",
          "States": {
            "SearchAgent": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:us-east-1:123456789012:function:search-agent",
              "End": true
            }
          }
        },
        {
          "StartAt": "CalculationAgent",
          "States": {
            "CalculationAgent": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:us-east-1:123456789012:function:calc-agent",
              "End": true
            }
          }
        }
      ],
      "Next": "SynthesizeResults"
    },
    "SynthesizeResults": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:synthesis-agent",
      "End": true
    }
  }
}
```

To add a specialized agent, you simply add one more entry to the `Branches` array in `ParallelAgents`; the existing agent code is not touched at all.
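The retry and timeout handling that previously cluttered each Lambda can also move into the state machine declaratively. For example, a Task state such as `SearchAgent` could carry `TimeoutSeconds` and a `Retry` block like the fragment below (the values are illustrative; this is a fragment of a state definition, not a complete state machine):

```json
"SearchAgent": {
  "Type": "Task",
  "Resource": "arn:aws:lambda:us-east-1:123456789012:function:search-agent",
  "TimeoutSeconds": 30,
  "Retry": [
    {
      "ErrorEquals": ["States.TaskFailed"],
      "IntervalSeconds": 2,
      "MaxAttempts": 3,
      "BackoffRate": 2.0
    }
  ],
  "End": true
}
```

With this in place, the agent Lambda itself can stay a plain stateless function; backoff and retry policy live in the workflow definition where they can be changed without a code deploy.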
Pros and Cons Analysis
Advantages
| Item | Content |
|---|---|
| Elastic Scaling | Event volume-based automatic scaling, optimized for the bursty nature of AI workloads |
| Cost Efficient | Eliminates idle resource costs with a pay-per-use model, particularly advantageous for low traffic |
| Low Coupling | Each AI stage can be deployed, scaled, and replaced independently |
| Real-time | Significantly reduces latency compared to batch cycle pipelines |
| Fault Isolation | Minimizing the impact of specific stage failures on the entire pipeline |
| Minimize Operational Burden | Focus on AI Logic Without Infrastructure Management |
Disadvantages and Precautions
| Item | Content | Response Plan |
|---|---|---|
| Cold Start | 300ms to several seconds delay on the first invocation after idle. This can extend to tens of seconds when loading large model files from S3 internally within Lambda. | Provisioned Concurrency applied, model serving separated into Bedrock/SageMaker Endpoints. |
| State Management Complexity | Difficulty in combining state-dependent inference with stateless functions, such as LLM KV caches (internal state where the model reuses previous token calculations) | Externalizing state with Step Functions |
| Large Model Constraints | Directly loading GB to TB models into serverless environments is impractical | Separate SageMaker-dedicated endpoints, utilize Bedrock APIs |
| Cost Reversal | Large-scale AI models consume a lot of resources, which can cause the pay-per-use advantage to disappear | Traffic volume-based cost pre-validation |
| Distributed tracing difficulty | Debugging event chains and ensuring observability across the whole pipeline is complex | Instrument each stage with X-Ray and OpenTelemetry |
| Vendor lock-in | Platform dependencies on Lambda + EventBridge arise | Multi-cloud abstraction can be considered with Knative (a CNCF graduated project) |
Provisioned Concurrency: This is a feature where AWS Lambda prepares instances in advance. While it can effectively eliminate cold starts, standby costs are incurred based on the number of prepared instances. It is an option worth actively considering for real-time, user-facing systems.
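In an AWS SAM template, provisioned concurrency is enabled with a couple of properties. A sketch (the function name and instance count are illustrative; `AutoPublishAlias` is needed because provisioned concurrency attaches to a version or alias, not to `$LATEST`):

```yaml
InferenceFunction:
  Type: AWS::Serverless::Function
  Properties:
    Handler: app.handler
    Runtime: python3.12
    AutoPublishAlias: live
    ProvisionedConcurrencyConfig:
      ProvisionedConcurrentExecutions: 5
```

The count is a standing cost, so size it to the expected concurrent user-facing load, not to peak batch throughput.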
The Most Common Mistakes in Practice
1. Operating the event schema loosely, without a contract, until compatibility breaks
At first, it is easy to think, "I just need to change one event field." However, as the team grows, it becomes difficult to track which Lambda function expects which field. The mindset of "I'll sort this out later" accumulates as technical debt as the team expands. It is recommended to enforce schemas using Confluent Schema Registry or AWS Glue Schema Registry from the start.
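Even before adopting a full registry, a lightweight producer-side check catches most schema drift. A minimal sketch (the required-field map and `validate_detail` are illustrative; a registry such as AWS Glue Schema Registry enforces this centrally and versions it):

```python
# illustrative contract: required fields and types per event detail-type
REQUIRED_FIELDS = {
    'DocumentChunked': {'document_id': str, 'chunks': list, 'total_chunks': int},
}

def validate_detail(detail_type: str, detail: dict) -> None:
    """Fail fast at publish time instead of breaking downstream consumers later."""
    schema = REQUIRED_FIELDS[detail_type]
    for field, expected_type in schema.items():
        if field not in detail:
            raise ValueError(f"{detail_type}: missing required field '{field}'")
        if not isinstance(detail[field], expected_type):
            raise ValueError(f"{detail_type}: field '{field}' must be {expected_type.__name__}")

# a well-formed event passes silently
validate_detail('DocumentChunked', {'document_id': 'doc-1', 'chunks': ['a'], 'total_chunks': 1})
```

Calling this immediately before `put_events` turns a silent contract violation into a loud publish-time error that shows up in the producer's logs rather than in a consumer's DLQ.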
2. Production deployment without measuring cold start costs in the development environment
I discovered this mistake two days after the first production deployment. In a development environment, functions are always in a warm state, so you don't really feel the effects of a cold start. It wasn't until I saw the entire RAG pipeline stall for several seconds when the first request came in at 9 AM that I learned firsthand that you must simulate a cold start scenario in staging.
3. Implementing a Kafka Consumer Without Idempotence
Since Kafka guarantees at-least-once delivery, the same message may be processed twice. In the anomaly detection example, an event is published whenever fraud_score > 0.85, so we recommend checking processing status in Redis or DynamoDB to keep the same transaction from generating duplicate alerts. If deduplication logic keyed on a unique value such as transaction_id is omitted, duplicate alerts and corrupted data can surface in operation.
In Conclusion
If you have an AI pipeline currently in operation, you can start by selecting the heaviest step. You can experience firsthand how much more flexible the entire architecture becomes simply by separating independent, stateless steps, such as embedding generation, into events. The EDA + Serverless AI pipeline is not a perfect solution. The complexities of cold starts and distributed tracing remain real challenges. However, you will understand once you use it yourself that it is the pattern that most naturally handles the burstiness of AI workloads and the resource heterogeneity of each stage.
Here are 3 steps you can start right now.
1. Experience your first pipeline with the SAM CLI — After installing the SAM CLI (about 5 minutes), you can run an EventBridge + Lambda pipeline locally with `sam local invoke` by following the "Building serverless architectures for agentic AI" document from AWS Prescriptive Guidance. You can quickly experience the flow of events without CloudFormation.
2. Separate the heaviest step of your existing AI API server into an event — Extract an independent, stateless step such as embedding generation into Lambda and modify the existing server to publish events to EventBridge. This starts a partial rollout and enables a gradual transition without a complete redesign.
3. Visualize the entire event chain with AWS X-Ray — Adding a single line, the `@tracer.capture_lambda_handler` decorator (Python Powertools), to your Lambda handler lets you see pipeline latency and bottlenecks at a glance on the X-Ray service map. Establishing observability from the start is key to operating this architecture long term.
Reference Materials
- Event-driven architecture: The backbone of serverless AI | AWS Prescriptive Guidance
- Designing serverless AI architectures | AWS Prescriptive Guidance
- Building serverless architectures for agentic AI on AWS
- Event-Driven Architecture for AI Agents: Patterns and Benefits | Atlan
- Event-Driven Architecture: A Complete Guide (2026) | RisingWave
- Serverless EDA with AWS Lambda & Kafka | Aiven
- Build event-driven architectures with Amazon MSK and Amazon EventBridge | AWS Big Data Blog
- The Hidden Cost of Cold Starts in Serverless AI Workloads | DigitalOcean
- AI Goes Serverless: Are Systems Ready? | ACM SIGOPS
- Building Event-Driven Architectures: Top 5 Best Practices | Confluent
- Cloud Native Computing Foundation Announces Knative's Graduation | CNCF
- From Message to Job: A Serverless Event-Driven Data Pipeline on GCP