Tools: How I Built an AI Agent That Handles On-Call Incidents and Pauses for Human Approval Before Touching Production (2026)
The Problem
What IRAS Does
The Architecture
Part 1: The Durable Interrupt Pattern
The problem with polling
LangGraph's interrupt() genuine suspension
The checkpointer setup
Timeout monitoring without in-memory state
Part 2: Typed Agent Outputs with Pydantic AI
Model selection per agent
Part 3: The Confidence-Gated RCA Retry Loop
Part 4: We Don't Trust the Model
Safety invariants enforced in code
Adversarial test scenarios
Part 5: The Context Gathering Agent (Tool Calls)
Dependency injection
Part 6: The LangGraph State Machine
Running It
What I'd Do Differently
Key Takeaways It's 3 AM. PagerDuty fires. You drag yourself to your laptop. Open Grafana. Squint at a spike. Switch to Kibana, filter logs, grep for errors. Cross-reference a recent deployment. Form a hypothesis. Write a Slack message explaining what you found. Wait for someone to approve your fix. Apply it. Verify it worked. Then spend an hour writing a post-mortem that goes into a folder nobody opens. You do this for every incident. Every single time. I've been that engineer. So I built IRAS an Intelligent Incident Response Agent System that handles the full first-response lifecycle automatically, and only wakes you up to press Approve. Here's the architecture, the interesting engineering problems, and the decisions I'd make again (and the ones I wouldn't). When an alert fires, IRAS: Total response time from alert to post-mortem: under 2 minutes. Here's what that looks like in practice: IRAS is a 9-node LangGraph state machine with a FastAPI layer on top. The full system overview: Now let me walk through the interesting engineering decisions. This is the most technically interesting part of IRAS, and the main reason I chose LangGraph over simpler frameworks. The naive approach to human-in-the-loop approval is polling. When the agent needs approval, it writes a flag to a database, sends a Slack message, and then polls in a loop: This breaks the moment the server restarts. The coroutine is gone. The incident is orphaned. The on-call engineer is staring at a dead Slack message with no way to resume. LangGraph's interrupt() is fundamentally different. It doesn't poll. It doesn't sleep. It genuinely suspends graph execution, serializes the entire state to the checkpointer (PostgreSQL in our case), and returns control to the caller. When interrupt() is called: When the engineer hits POST /incidents/{id}/approve: LangGraph reconstructs the graph state from the PostgreSQL checkpoint using thread_id, injects the Command(resume=...), and execution continues exactly where it left off same state, same node, no re-running prior stages. The singleton + asyncio.Lock() pattern is important here. Without it, multiple concurrent requests during startup can race to initialize the checkpointer, resulting in duplicate table creation attempts. Because all state is in PostgreSQL, the approval timeout monitor doesn't need in-memory state either: P0 incidents escalate after 15 minutes. P1–P3 after 2 hours. Configurable via environment variables. Most AI agent code I've seen looks like this: This is fragile. The model output format drifts. Regex breaks. You get None at 3 AM when you least want it. IRAS uses Pydantic AI to get strongly-typed, validated outputs directly from every agent. Here's the triage agent: Every stage follows this pattern. The RCA agent returns a RootCauseHypothesis. The remediation agent returns a RemediationPlan. The post-mortem agent returns a PostMortem. The rest of the graph code is just Python no parsing, no regex, no json.loads() on LLM output. Each agent instantiates its own model. This matters: Haiku costs roughly 20x less than Sonnet and is fast enough for triage and context gathering. Sonnet is worth the cost for RCA and remediation planning these are the decisions that affect production. Root cause analysis is genuinely hard. The first attempt often doesn't have enough evidence. IRAS handles this with a confidence-gated retry loop baked into the LangGraph conditional edges: On retry, the context agent widens its evidence window pulling a longer log time range and more deployment history. This typically lifts confidence from 0.5–0.6 to above 0.7 on the second attempt. Default thresholds: confidence >= 0.7 to proceed, max 3 RCA attempts before auto-escalation. This is the part I'm most proud of and that I think most AI agent projects get wrong. When you're building an agent that can modify production systems, the model's output isn't just text it's an instruction set. You need to treat it like untrusted input. Two rules apply to every remediation plan, regardless of what the model returns: These are not prompts. They're not suggestions. They run on every plan output, unconditionally. The stress test suite includes 47 scenarios specifically designed to test model misbehavior: 292 tests total, 99% coverage. The test suite takes about 30 seconds to run. The context agent uses Claude Haiku with tool calling to gather evidence from three sources simultaneously: Each tool has a Mock*Client fallback. If ELASTICSEARCH_BASE_URL isn't set, a mock client returns realistic fake data. This means the full graph runs end-to-end with only two environment variables: ANTHROPIC_API_KEY and POSTGRES_URL. Tool clients are injected via ContextDeps, making them swappable in tests: The full graph wiring: One important detail: interrupt_before=["approval"] tells LangGraph to checkpoint state before entering the approval node, not inside it. This means the plan is fully generated and the Slack message is sent before the graph suspends. Only two things required: Then fire a test alert: 1. Start with MemorySaver, not PostgreSQL For local development and prototyping, LangGraph's MemorySaver (in-memory checkpointer) is much faster to set up. I spent time early on getting Postgres running when I didn't need durability yet. Start with MemorySaver, switch to AsyncPostgresSaver when you're ready for production. 2. Separate trace IDs from thread IDs IRAS uses the same UUID for the HTTP response incident_id, the LangGraph thread_id, and the database primary key. Convenient, but it creates coupling. If you ever want to re-run an incident or fork a graph for testing, you'll want these to be different. 3. Add streaming earlier The graph produces intermediate outputs (triage result, context bundle, etc.) as it runs. Currently these are only visible via LangSmith traces. Adding Server-Sent Events to stream node outputs to a UI would make the "watching it work" experience much better. LangGraph's interrupt() is not a workaround it's a first-class primitive for durable human-in-the-loop workflows. If you're building agents that need human approval in production, this is the pattern. Pydantic AI's typed outputs eliminate an entire class of bugs. Parsing LLM output with regex or manual JSON extraction is fragile. Defining your output schema as a Pydantic model and letting the framework handle parsing is strictly better. Safety invariants belong in code, not prompts. Prompting the model to be safe is not enough when the output drives production changes. Enforce your invariants programmatically, after the model responds. Mock clients everywhere. If every external integration falls back to a mock, the full system is testable with zero infrastructure. This pays for itself immediately in CI speed and developer experience. If you've built something similar, or have questions about the interrupt pattern or the Pydantic AI setup, drop them in the comments happy to go deeper on any of it. And if IRAS would've saved your last 3 AM page, give it a ⭐ on GitHub. Templates let you quickly answer FAQs or store snippets for re-use. Hide child comments as well For further actions, you may consider blocking this person and/or reporting abuse
Alert: "High error rate on payment-service http_error_rate: 45% (threshold: 5%)" [10:30:01] ▶ Incident ingested
[10:30:02] ▶ P1 · payment-service · ~5,000 users affected · confidence: 0.9
[10:30:04] ▶ DB connection errors in logs, deployment 2m before alert
[10:30:07] ▶ Root cause: DB connection pool exhausted after canary deploy · confidence: 0.88 ✓
[10:30:09] ▶ 3-step remediation plan ready · low risk · rollback commands included
[10:30:09] ▶ Approval request sent to #incidents [Approve] [Reject] ... engineer reviews and clicks Approve (1m 35s later) ... [10:31:44] ▶ Step 1/3 increase DB_POOL_SIZE from 10 to 50
[10:31:45] ▶ Step 2/3 rolling restart payment-service pods
[10:31:45] ▶ Step 3/3 verify error rate dropped below 2%
[10:31:46] ▶ Post-mortem written and posted to #incidents
[10:31:46] ▶ Resolved · total response time: 1m 45s
Alert: "High error rate on payment-service http_error_rate: 45% (threshold: 5%)" [10:30:01] ▶ Incident ingested
[10:30:02] ▶ P1 · payment-service · ~5,000 users affected · confidence: 0.9
[10:30:04] ▶ DB connection errors in logs, deployment 2m before alert
[10:30:07] ▶ Root cause: DB connection pool exhausted after canary deploy · confidence: 0.88 ✓
[10:30:09] ▶ 3-step remediation plan ready · low risk · rollback commands included
[10:30:09] ▶ Approval request sent to #incidents [Approve] [Reject] ... engineer reviews and clicks Approve (1m 35s later) ... [10:31:44] ▶ Step 1/3 increase DB_POOL_SIZE from 10 to 50
[10:31:45] ▶ Step 2/3 rolling restart payment-service pods
[10:31:45] ▶ Step 3/3 verify error rate dropped below 2%
[10:31:46] ▶ Post-mortem written and posted to #incidents
[10:31:46] ▶ Resolved · total response time: 1m 45s
Alert: "High error rate on payment-service http_error_rate: 45% (threshold: 5%)" [10:30:01] ▶ Incident ingested
[10:30:02] ▶ P1 · payment-service · ~5,000 users affected · confidence: 0.9
[10:30:04] ▶ DB connection errors in logs, deployment 2m before alert
[10:30:07] ▶ Root cause: DB connection pool exhausted after canary deploy · confidence: 0.88 ✓
[10:30:09] ▶ 3-step remediation plan ready · low risk · rollback commands included
[10:30:09] ▶ Approval request sent to #incidents [Approve] [Reject] ... engineer reviews and clicks Approve (1m 35s later) ... [10:31:44] ▶ Step 1/3 increase DB_POOL_SIZE from 10 to 50
[10:31:45] ▶ Step 2/3 rolling restart payment-service pods
[10:31:45] ▶ Step 3/3 verify error rate dropped below 2%
[10:31:46] ▶ Post-mortem written and posted to #incidents
[10:31:46] ▶ Resolved · total response time: 1m 45s
Alert → Ingest → Triage → Context → RCA → Plan → [YOU ⏸] → Apply → Post-mortem ↑ ↓ └── retry if confidence < 0.7
Alert → Ingest → Triage → Context → RCA → Plan → [YOU ⏸] → Apply → Post-mortem ↑ ↓ └── retry if confidence < 0.7
Alert → Ingest → Triage → Context → RCA → Plan → [YOU ⏸] → Apply → Post-mortem ↑ ↓ └── retry if confidence < 0.7
Alert Sources (PagerDuty / Prometheus / Datadog / any webhook) ↓
FastAPI (POST /webhook/alert) ↓
LangGraph State Machine ├── ingestion — validate, stamp UUID + timestamp ├── triage — Claude Haiku: P0–P3, affected services ├── context_gathering — Claude Haiku + tool calls: logs, metrics, deployments ├── rca — Claude Sonnet: root cause + confidence score │ ↓ confidence < 0.7? loop back to context_gathering ├── generate_plan — Claude Sonnet: remediation steps + rollback commands ├── approval — interrupt() ⏸ human-in-the-loop │ ↓ approved → apply_remediation │ ↓ rejected → escalation ├── apply_remediation — execute steps, rollback on failure ├── escalation — PagerDuty trigger + Slack alert └── postmortem — Claude Sonnet: structured post-mortem → PostgreSQL + Slack
Alert Sources (PagerDuty / Prometheus / Datadog / any webhook) ↓
FastAPI (POST /webhook/alert) ↓
LangGraph State Machine ├── ingestion — validate, stamp UUID + timestamp ├── triage — Claude Haiku: P0–P3, affected services ├── context_gathering — Claude Haiku + tool calls: logs, metrics, deployments ├── rca — Claude Sonnet: root cause + confidence score │ ↓ confidence < 0.7? loop back to context_gathering ├── generate_plan — Claude Sonnet: remediation steps + rollback commands ├── approval — interrupt() ⏸ human-in-the-loop │ ↓ approved → apply_remediation │ ↓ rejected → escalation ├── apply_remediation — execute steps, rollback on failure ├── escalation — PagerDuty trigger + Slack alert └── postmortem — Claude Sonnet: structured post-mortem → PostgreSQL + Slack
Alert Sources (PagerDuty / Prometheus / Datadog / any webhook) ↓
FastAPI (POST /webhook/alert) ↓
LangGraph State Machine ├── ingestion — validate, stamp UUID + timestamp ├── triage — Claude Haiku: P0–P3, affected services ├── context_gathering — Claude Haiku + tool calls: logs, metrics, deployments ├── rca — Claude Sonnet: root cause + confidence score │ ↓ confidence < 0.7? loop back to context_gathering ├── generate_plan — Claude Sonnet: remediation steps + rollback commands ├── approval — interrupt() ⏸ human-in-the-loop │ ↓ approved → apply_remediation │ ↓ rejected → escalation ├── apply_remediation — execute steps, rollback on failure ├── escalation — PagerDuty trigger + Slack alert └── postmortem — Claude Sonnet: structured post-mortem → PostgreSQL + Slack
# The naive approach DON'T do this
async def approval_node(state): await slack.send_approval_message(state["plan"]) while True: decision = await db.get_decision(state["incident_id"]) if decision is not None: return decision await asyncio.sleep(5) # poll every 5 seconds
# The naive approach DON'T do this
async def approval_node(state): await slack.send_approval_message(state["plan"]) while True: decision = await db.get_decision(state["incident_id"]) if decision is not None: return decision await asyncio.sleep(5) # poll every 5 seconds
# The naive approach DON'T do this
async def approval_node(state): await slack.send_approval_message(state["plan"]) while True: decision = await db.get_decision(state["incident_id"]) if decision is not None: return decision await asyncio.sleep(5) # poll every 5 seconds
# src/iras/graph/nodes/approval.py from langgraph.types import interrupt, Command
from ..state import IncidentState async def approval_node(state: IncidentState) -> dict: """ Pauses graph execution and waits for human decision. State is checkpointed to PostgreSQL survives server restarts. """ human_decision = interrupt({ "message": "Remediation plan ready for approval", "incident_id": state["incident_id"], "severity": state["triage_result"].severity, "plan": state["remediation_plan"].model_dump(), }) # Execution resumes HERE after Command(resume=...) is sent return {"human_approved": human_decision["approved"]}
# src/iras/graph/nodes/approval.py from langgraph.types import interrupt, Command
from ..state import IncidentState async def approval_node(state: IncidentState) -> dict: """ Pauses graph execution and waits for human decision. State is checkpointed to PostgreSQL survives server restarts. """ human_decision = interrupt({ "message": "Remediation plan ready for approval", "incident_id": state["incident_id"], "severity": state["triage_result"].severity, "plan": state["remediation_plan"].model_dump(), }) # Execution resumes HERE after Command(resume=...) is sent return {"human_approved": human_decision["approved"]}
# src/iras/graph/nodes/approval.py from langgraph.types import interrupt, Command
from ..state import IncidentState async def approval_node(state: IncidentState) -> dict: """ Pauses graph execution and waits for human decision. State is checkpointed to PostgreSQL survives server restarts. """ human_decision = interrupt({ "message": "Remediation plan ready for approval", "incident_id": state["incident_id"], "severity": state["triage_result"].severity, "plan": state["remediation_plan"].model_dump(), }) # Execution resumes HERE after Command(resume=...) is sent return {"human_approved": human_decision["approved"]}
# src/iras/api/routes/approval.py @router.post("/incidents/{incident_id}/approve")
async def approve_incident(incident_id: str, graph=Depends(get_graph)): """Resume the paused graph with an approval decision.""" await graph.ainvoke( Command(resume={"approved": True}), config={"configurable": {"thread_id": incident_id}} ) return {"incident_id": incident_id, "decision": "approved", "status": "resumed"}
# src/iras/api/routes/approval.py @router.post("/incidents/{incident_id}/approve")
async def approve_incident(incident_id: str, graph=Depends(get_graph)): """Resume the paused graph with an approval decision.""" await graph.ainvoke( Command(resume={"approved": True}), config={"configurable": {"thread_id": incident_id}} ) return {"incident_id": incident_id, "decision": "approved", "status": "resumed"}
# src/iras/api/routes/approval.py @router.post("/incidents/{incident_id}/approve")
async def approve_incident(incident_id: str, graph=Depends(get_graph)): """Resume the paused graph with an approval decision.""" await graph.ainvoke( Command(resume={"approved": True}), config={"configurable": {"thread_id": incident_id}} ) return {"incident_id": incident_id, "decision": "approved", "status": "resumed"}
# src/iras/graph/checkpointer.py from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
import asyncio _checkpointer: AsyncPostgresSaver | None = None
_lock = asyncio.Lock() async def get_checkpointer(postgres_url: str) -> AsyncPostgresSaver: """Singleton with asyncio.Lock to prevent double-initialization.""" global _checkpointer async with _lock: if _checkpointer is None: _checkpointer = AsyncPostgresSaver.from_conn_string(postgres_url) await _checkpointer.setup() # creates checkpoint tables return _checkpointer
# src/iras/graph/checkpointer.py from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
import asyncio _checkpointer: AsyncPostgresSaver | None = None
_lock = asyncio.Lock() async def get_checkpointer(postgres_url: str) -> AsyncPostgresSaver: """Singleton with asyncio.Lock to prevent double-initialization.""" global _checkpointer async with _lock: if _checkpointer is None: _checkpointer = AsyncPostgresSaver.from_conn_string(postgres_url) await _checkpointer.setup() # creates checkpoint tables return _checkpointer
# src/iras/graph/checkpointer.py from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
import asyncio _checkpointer: AsyncPostgresSaver | None = None
_lock = asyncio.Lock() async def get_checkpointer(postgres_url: str) -> AsyncPostgresSaver: """Singleton with asyncio.Lock to prevent double-initialization.""" global _checkpointer async with _lock: if _checkpointer is None: _checkpointer = AsyncPostgresSaver.from_conn_string(postgres_url) await _checkpointer.setup() # creates checkpoint tables return _checkpointer
# src/iras/api/background.py async def monitor_approval_timeouts(graph, settings): """ Runs as a background task. Queries PostgreSQL for interrupted threads that have exceeded their SLA window. No in-memory state required survives restarts cleanly. """ while True: await asyncio.sleep(60) # check every minute interrupted_incidents = await get_interrupted_incidents() for incident in interrupted_incidents: timeout = get_timeout_for_severity(incident.severity, settings) elapsed = datetime.utcnow() - incident.interrupted_at if elapsed > timeout: # Escalate by resuming with approved=False await graph.ainvoke( Command(resume={"approved": False, "reason": "timeout"}), config={"configurable": {"thread_id": incident.incident_id}} )
# src/iras/api/background.py async def monitor_approval_timeouts(graph, settings): """ Runs as a background task. Queries PostgreSQL for interrupted threads that have exceeded their SLA window. No in-memory state required survives restarts cleanly. """ while True: await asyncio.sleep(60) # check every minute interrupted_incidents = await get_interrupted_incidents() for incident in interrupted_incidents: timeout = get_timeout_for_severity(incident.severity, settings) elapsed = datetime.utcnow() - incident.interrupted_at if elapsed > timeout: # Escalate by resuming with approved=False await graph.ainvoke( Command(resume={"approved": False, "reason": "timeout"}), config={"configurable": {"thread_id": incident.incident_id}} )
# src/iras/api/background.py async def monitor_approval_timeouts(graph, settings): """ Runs as a background task. Queries PostgreSQL for interrupted threads that have exceeded their SLA window. No in-memory state required survives restarts cleanly. """ while True: await asyncio.sleep(60) # check every minute interrupted_incidents = await get_interrupted_incidents() for incident in interrupted_incidents: timeout = get_timeout_for_severity(incident.severity, settings) elapsed = datetime.utcnow() - incident.interrupted_at if elapsed > timeout: # Escalate by resuming with approved=False await graph.ainvoke( Command(resume={"approved": False, "reason": "timeout"}), config={"configurable": {"thread_id": incident.incident_id}} )
response = await llm.generate(prompt)
text = response.content
# Now parse the text... somehow
severity = re.search(r"severity: (P\d)", text).group(1)
confidence = float(re.search(r"confidence: ([\d.]+)", text).group(1))
response = await llm.generate(prompt)
text = response.content
# Now parse the text... somehow
severity = re.search(r"severity: (P\d)", text).group(1)
confidence = float(re.search(r"confidence: ([\d.]+)", text).group(1))
response = await llm.generate(prompt)
text = response.content
# Now parse the text... somehow
severity = re.search(r"severity: (P\d)", text).group(1)
confidence = float(re.search(r"confidence: ([\d.]+)", text).group(1))
# src/iras/models/incident.py from pydantic import BaseModel, Field
from enum import Enum class Severity(str, Enum): P0 = "P0" P1 = "P1" P2 = "P2" P3 = "P3" class TriageResult(BaseModel): severity: Severity affected_services: list[str] estimated_users_affected: int confidence: float = Field(ge=0.0, le=1.0) reasoning: str
# src/iras/models/incident.py from pydantic import BaseModel, Field
from enum import Enum class Severity(str, Enum): P0 = "P0" P1 = "P1" P2 = "P2" P3 = "P3" class TriageResult(BaseModel): severity: Severity affected_services: list[str] estimated_users_affected: int confidence: float = Field(ge=0.0, le=1.0) reasoning: str
# src/iras/models/incident.py from pydantic import BaseModel, Field
from enum import Enum class Severity(str, Enum): P0 = "P0" P1 = "P1" P2 = "P2" P3 = "P3" class TriageResult(BaseModel): severity: Severity affected_services: list[str] estimated_users_affected: int confidence: float = Field(ge=0.0, le=1.0) reasoning: str
# src/iras/agents/triage.py from pydantic_ai import Agent
from ..models.incident import TriageResult triage_agent = Agent( model="claude-haiku-4-5", result_type=TriageResult, # Pydantic AI validates and parses this automatically system_prompt=""" You are a production incident triage specialist. Classify the incident severity, identify affected services, estimate user impact, and provide a confidence score. Severity guide: - P0: Complete service outage, all users affected - P1: Major degradation, >20% of users affected - P2: Partial degradation, <20% of users affected - P3: Warning or informational, no user impact """
) async def run_triage(alert_payload: dict) -> TriageResult: result = await triage_agent.run(str(alert_payload)) return result.data # TriageResult fully validated, type-safe
# src/iras/agents/triage.py from pydantic_ai import Agent
from ..models.incident import TriageResult triage_agent = Agent( model="claude-haiku-4-5", result_type=TriageResult, # Pydantic AI validates and parses this automatically system_prompt=""" You are a production incident triage specialist. Classify the incident severity, identify affected services, estimate user impact, and provide a confidence score. Severity guide: - P0: Complete service outage, all users affected - P1: Major degradation, >20% of users affected - P2: Partial degradation, <20% of users affected - P3: Warning or informational, no user impact """
) async def run_triage(alert_payload: dict) -> TriageResult: result = await triage_agent.run(str(alert_payload)) return result.data # TriageResult fully validated, type-safe
# src/iras/agents/triage.py from pydantic_ai import Agent
from ..models.incident import TriageResult triage_agent = Agent( model="claude-haiku-4-5", result_type=TriageResult, # Pydantic AI validates and parses this automatically system_prompt=""" You are a production incident triage specialist. Classify the incident severity, identify affected services, estimate user impact, and provide a confidence score. Severity guide: - P0: Complete service outage, all users affected - P1: Major degradation, >20% of users affected - P2: Partial degradation, <20% of users affected - P3: Warning or informational, no user impact """
) async def run_triage(alert_payload: dict) -> TriageResult: result = await triage_agent.run(str(alert_payload)) return result.data # TriageResult fully validated, type-safe
# src/iras/models/incident.py (continued) class RootCauseHypothesis(BaseModel): primary_cause: str contributing_factors: list[str] evidence: list[str] # specific log lines or metric values confidence: float = Field(ge=0.0, le=1.0) recommended_investigation: str class RemediationStep(BaseModel): action: str rollback_command: str risk_level: Literal["low", "medium", "high"] estimated_duration_seconds: int class RemediationPlan(BaseModel): steps: list[RemediationStep] overall_risk: Literal["low", "medium", "high"] reversible: bool requires_human_approval: bool estimated_total_duration_seconds: int class PostMortem(BaseModel): incident_id: str severity: Severity timeline: list[str] root_cause_summary: str resolution_summary: str action_items: list[str] total_duration_minutes: float resolved: bool
# src/iras/models/incident.py (continued) class RootCauseHypothesis(BaseModel): primary_cause: str contributing_factors: list[str] evidence: list[str] # specific log lines or metric values confidence: float = Field(ge=0.0, le=1.0) recommended_investigation: str class RemediationStep(BaseModel): action: str rollback_command: str risk_level: Literal["low", "medium", "high"] estimated_duration_seconds: int class RemediationPlan(BaseModel): steps: list[RemediationStep] overall_risk: Literal["low", "medium", "high"] reversible: bool requires_human_approval: bool estimated_total_duration_seconds: int class PostMortem(BaseModel): incident_id: str severity: Severity timeline: list[str] root_cause_summary: str resolution_summary: str action_items: list[str] total_duration_minutes: float resolved: bool
# src/iras/models/incident.py (continued) class RootCauseHypothesis(BaseModel): primary_cause: str contributing_factors: list[str] evidence: list[str] # specific log lines or metric values confidence: float = Field(ge=0.0, le=1.0) recommended_investigation: str class RemediationStep(BaseModel): action: str rollback_command: str risk_level: Literal["low", "medium", "high"] estimated_duration_seconds: int class RemediationPlan(BaseModel): steps: list[RemediationStep] overall_risk: Literal["low", "medium", "high"] reversible: bool requires_human_approval: bool estimated_total_duration_seconds: int class PostMortem(BaseModel): incident_id: str severity: Severity timeline: list[str] root_cause_summary: str resolution_summary: str action_items: list[str] total_duration_minutes: float resolved: bool
# Fast and cheap for classification tasks
triage_agent = Agent(model="claude-haiku-4-5", result_type=TriageResult, ...)
context_agent = Agent(model="claude-haiku-4-5", result_type=ContextBundle, ...) # Slower and more capable for deep reasoning
rca_agent = Agent(model="claude-sonnet-4-5", result_type=RootCauseHypothesis, ...)
remediation_agent = Agent(model="claude-sonnet-4-5", result_type=RemediationPlan, ...)
postmortem_agent = Agent(model="claude-sonnet-4-5", result_type=PostMortem, ...)
# Fast and cheap for classification tasks
triage_agent = Agent(model="claude-haiku-4-5", result_type=TriageResult, ...)
context_agent = Agent(model="claude-haiku-4-5", result_type=ContextBundle, ...) # Slower and more capable for deep reasoning
rca_agent = Agent(model="claude-sonnet-4-5", result_type=RootCauseHypothesis, ...)
remediation_agent = Agent(model="claude-sonnet-4-5", result_type=RemediationPlan, ...)
postmortem_agent = Agent(model="claude-sonnet-4-5", result_type=PostMortem, ...)
# Fast and cheap for classification tasks
triage_agent = Agent(model="claude-haiku-4-5", result_type=TriageResult, ...)
context_agent = Agent(model="claude-haiku-4-5", result_type=ContextBundle, ...) # Slower and more capable for deep reasoning
rca_agent = Agent(model="claude-sonnet-4-5", result_type=RootCauseHypothesis, ...)
remediation_agent = Agent(model="claude-sonnet-4-5", result_type=RemediationPlan, ...)
postmortem_agent = Agent(model="claude-sonnet-4-5", result_type=PostMortem, ...)
# src/iras/graph/nodes/rca.py async def rca_node(state: IncidentState) -> IncidentState: hypothesis = await run_rca( context=state["context_bundle"], alert=state["alert_payload"], attempt=state.get("rca_attempts", 0) ) return { **state, "rca_hypothesis": hypothesis, "rca_attempts": state.get("rca_attempts", 0) + 1, } def should_retry_rca(state: IncidentState) -> str: """Conditional edge: decides what happens after RCA.""" hypothesis = state["rca_hypothesis"] attempts = state.get("rca_attempts", 0) max_attempts = state["settings"].rca_max_attempts threshold = state["settings"].rca_confidence_threshold if hypothesis.confidence >= threshold: return "generate_plan" # confidence is good, proceed elif attempts < max_attempts: return "context_gathering" # loop back for more evidence else: return "escalation" # exhausted retries, escalate
# src/iras/graph/nodes/rca.py async def rca_node(state: IncidentState) -> IncidentState: hypothesis = await run_rca( context=state["context_bundle"], alert=state["alert_payload"], attempt=state.get("rca_attempts", 0) ) return { **state, "rca_hypothesis": hypothesis, "rca_attempts": state.get("rca_attempts", 0) + 1, } def should_retry_rca(state: IncidentState) -> str: """Conditional edge: decides what happens after RCA.""" hypothesis = state["rca_hypothesis"] attempts = state.get("rca_attempts", 0) max_attempts = state["settings"].rca_max_attempts threshold = state["settings"].rca_confidence_threshold if hypothesis.confidence >= threshold: return "generate_plan" # confidence is good, proceed elif attempts < max_attempts: return "context_gathering" # loop back for more evidence else: return "escalation" # exhausted retries, escalate
# src/iras/graph/nodes/rca.py async def rca_node(state: IncidentState) -> IncidentState: hypothesis = await run_rca( context=state["context_bundle"], alert=state["alert_payload"], attempt=state.get("rca_attempts", 0) ) return { **state, "rca_hypothesis": hypothesis, "rca_attempts": state.get("rca_attempts", 0) + 1, } def should_retry_rca(state: IncidentState) -> str: """Conditional edge: decides what happens after RCA.""" hypothesis = state["rca_hypothesis"] attempts = state.get("rca_attempts", 0) max_attempts = state["settings"].rca_max_attempts threshold = state["settings"].rca_confidence_threshold if hypothesis.confidence >= threshold: return "generate_plan" # confidence is good, proceed elif attempts < max_attempts: return "context_gathering" # loop back for more evidence else: return "escalation" # exhausted retries, escalate
# src/iras/graph/builder.py graph.add_conditional_edges( "rca", should_retry_rca, { "generate_plan": "generate_plan", "context_gathering": "context_gathering", "escalation": "escalation", }
)
# src/iras/graph/builder.py graph.add_conditional_edges( "rca", should_retry_rca, { "generate_plan": "generate_plan", "context_gathering": "context_gathering", "escalation": "escalation", }
)
# src/iras/graph/builder.py graph.add_conditional_edges( "rca", should_retry_rca, { "generate_plan": "generate_plan", "context_gathering": "context_gathering", "escalation": "escalation", }
)
# src/iras/graph/nodes/generate_plan.py async def generate_plan_node(state: IncidentState) -> IncidentState: plan = await run_remediation_agent( hypothesis=state["rca_hypothesis"], context=state["context_bundle"], ) # SAFETY RULE 1: Any high-risk step forces human approval. # The model cannot classify all steps as "low" to bypass this. if any(step.risk_level == "high" for step in plan.steps): plan.requires_human_approval = True # SAFETY RULE 2: Any step without a rollback command marks # the plan as irreversible and forces human approval. if any(not step.rollback_command.strip() for step in plan.steps): plan.reversible = False plan.requires_human_approval = True return {**state, "remediation_plan": plan}
# src/iras/graph/nodes/generate_plan.py async def generate_plan_node(state: IncidentState) -> IncidentState: plan = await run_remediation_agent( hypothesis=state["rca_hypothesis"], context=state["context_bundle"], ) # SAFETY RULE 1: Any high-risk step forces human approval. # The model cannot classify all steps as "low" to bypass this. if any(step.risk_level == "high" for step in plan.steps): plan.requires_human_approval = True # SAFETY RULE 2: Any step without a rollback command marks # the plan as irreversible and forces human approval. if any(not step.rollback_command.strip() for step in plan.steps): plan.reversible = False plan.requires_human_approval = True return {**state, "remediation_plan": plan}
# src/iras/graph/nodes/generate_plan.py async def generate_plan_node(state: IncidentState) -> IncidentState: plan = await run_remediation_agent( hypothesis=state["rca_hypothesis"], context=state["context_bundle"], ) # SAFETY RULE 1: Any high-risk step forces human approval. # The model cannot classify all steps as "low" to bypass this. if any(step.risk_level == "high" for step in plan.steps): plan.requires_human_approval = True # SAFETY RULE 2: Any step without a rollback command marks # the plan as irreversible and forces human approval. if any(not step.rollback_command.strip() for step in plan.steps): plan.reversible = False plan.requires_human_approval = True return {**state, "remediation_plan": plan}
# tests/stress/test_adversarial.py class TestAdversarialModelOutputs: async def test_model_lies_about_risk_level(self, graph, mock_claude): """Model claims all steps are low-risk to bypass approval.""" mock_claude.remediation_returns(RemediationPlan( steps=[ RemediationStep( action="delete all pods", rollback_command="", # empty rollback risk_level="low", # model lying estimated_duration_seconds=5, ) ], overall_risk="low", reversible=True, requires_human_approval=False, # model bypassing approval )) result = await graph.ainvoke(make_incident_state()) # Safety invariants caught it assert result["remediation_plan"].requires_human_approval is True assert result["remediation_plan"].reversible is False async def test_all_context_tools_fail(self, graph, mock_tools): """All external integrations return errors simultaneously.""" mock_tools.logs.raises(ConnectionError("Elasticsearch down")) mock_tools.metrics.raises(ConnectionError("Prometheus down")) mock_tools.deployments.raises(ConnectionError("GitHub API rate limited")) # Should degrade gracefully, not crash result = await graph.ainvoke(make_incident_state()) assert result["status"] != "crashed" assert result["context_bundle"] is not None # empty but valid async def test_twenty_concurrent_incidents(self, graph): """No state contamination between concurrent incident graphs.""" incidents = [make_incident_state(f"incident-{i}") for i in range(20)] results = await asyncio.gather(*[ graph.ainvoke(state) for state in incidents ]) # Every incident has its own isolated state incident_ids = [r["incident_id"] for r in results] assert len(set(incident_ids)) == 20 # all unique
# tests/stress/test_adversarial.py class TestAdversarialModelOutputs: async def test_model_lies_about_risk_level(self, graph, mock_claude): """Model claims all steps are low-risk to bypass approval.""" mock_claude.remediation_returns(RemediationPlan( steps=[ RemediationStep( action="delete all pods", rollback_command="", # empty rollback risk_level="low", # model lying estimated_duration_seconds=5, ) ], overall_risk="low", reversible=True, requires_human_approval=False, # model bypassing approval )) result = await graph.ainvoke(make_incident_state()) # Safety invariants caught it assert result["remediation_plan"].requires_human_approval is True assert result["remediation_plan"].reversible is False async def test_all_context_tools_fail(self, graph, mock_tools): """All external integrations return errors simultaneously.""" mock_tools.logs.raises(ConnectionError("Elasticsearch down")) mock_tools.metrics.raises(ConnectionError("Prometheus down")) mock_tools.deployments.raises(ConnectionError("GitHub API rate limited")) # Should degrade gracefully, not crash result = await graph.ainvoke(make_incident_state()) assert result["status"] != "crashed" assert result["context_bundle"] is not None # empty but valid async def test_twenty_concurrent_incidents(self, graph): """No state contamination between concurrent incident graphs.""" incidents = [make_incident_state(f"incident-{i}") for i in range(20)] results = await asyncio.gather(*[ graph.ainvoke(state) for state in incidents ]) # Every incident has its own isolated state incident_ids = [r["incident_id"] for r in results] assert len(set(incident_ids)) == 20 # all unique
# tests/stress/test_adversarial.py class TestAdversarialModelOutputs: async def test_model_lies_about_risk_level(self, graph, mock_claude): """Model claims all steps are low-risk to bypass approval.""" mock_claude.remediation_returns(RemediationPlan( steps=[ RemediationStep( action="delete all pods", rollback_command="", # empty rollback risk_level="low", # model lying estimated_duration_seconds=5, ) ], overall_risk="low", reversible=True, requires_human_approval=False, # model bypassing approval )) result = await graph.ainvoke(make_incident_state()) # Safety invariants caught it assert result["remediation_plan"].requires_human_approval is True assert result["remediation_plan"].reversible is False async def test_all_context_tools_fail(self, graph, mock_tools): """All external integrations return errors simultaneously.""" mock_tools.logs.raises(ConnectionError("Elasticsearch down")) mock_tools.metrics.raises(ConnectionError("Prometheus down")) mock_tools.deployments.raises(ConnectionError("GitHub API rate limited")) # Should degrade gracefully, not crash result = await graph.ainvoke(make_incident_state()) assert result["status"] != "crashed" assert result["context_bundle"] is not None # empty but valid async def test_twenty_concurrent_incidents(self, graph): """No state contamination between concurrent incident graphs.""" incidents = [make_incident_state(f"incident-{i}") for i in range(20)] results = await asyncio.gather(*[ graph.ainvoke(state) for state in incidents ]) # Every incident has its own isolated state incident_ids = [r["incident_id"] for r in results] assert len(set(incident_ids)) == 20 # all unique
# src/iras/agents/context_gathering.py from pydantic_ai import Agent
from pydantic_ai.tools import Tool
from ..models.incident import ContextBundle
from ..deps import ContextDeps context_agent = Agent( model="claude-haiku-4-5", result_type=ContextBundle, deps_type=ContextDeps, system_prompt=""" You are an SRE context gathering specialist. Use the available tools to collect evidence about the incident. Fetch logs, metrics, and deployment history for the affected service. Bundle all evidence into a structured ContextBundle. """
) @context_agent.tool
async def fetch_logs(ctx, service: str, time_range_minutes: int = 30) -> list[str]: """Fetch recent error and warning logs for a service.""" return await ctx.deps.log_client.get_logs( service=service, time_range_minutes=time_range_minutes, levels=["ERROR", "WARN"], ) @context_agent.tool
async def fetch_metrics(ctx, service: str) -> dict: """Fetch current metrics vs 7-day baseline for a service.""" return await ctx.deps.metrics_client.get_comparison( service=service, metrics=["error_rate", "latency_p99", "request_rate", "cpu_usage"], ) @context_agent.tool
async def fetch_deployments(ctx, service: str, hours: int = 24) -> list[dict]: """Fetch recent deployments for a service from GitHub.""" return await ctx.deps.deployment_client.get_recent( service=service, hours=hours, )
# src/iras/agents/context_gathering.py from pydantic_ai import Agent
from pydantic_ai.tools import Tool
from ..models.incident import ContextBundle
from ..deps import ContextDeps context_agent = Agent( model="claude-haiku-4-5", result_type=ContextBundle, deps_type=ContextDeps, system_prompt=""" You are an SRE context gathering specialist. Use the available tools to collect evidence about the incident. Fetch logs, metrics, and deployment history for the affected service. Bundle all evidence into a structured ContextBundle. """
) @context_agent.tool
async def fetch_logs(ctx, service: str, time_range_minutes: int = 30) -> list[str]: """Fetch recent error and warning logs for a service.""" return await ctx.deps.log_client.get_logs( service=service, time_range_minutes=time_range_minutes, levels=["ERROR", "WARN"], ) @context_agent.tool
async def fetch_metrics(ctx, service: str) -> dict: """Fetch current metrics vs 7-day baseline for a service.""" return await ctx.deps.metrics_client.get_comparison( service=service, metrics=["error_rate", "latency_p99", "request_rate", "cpu_usage"], ) @context_agent.tool
async def fetch_deployments(ctx, service: str, hours: int = 24) -> list[dict]: """Fetch recent deployments for a service from GitHub.""" return await ctx.deps.deployment_client.get_recent( service=service, hours=hours, )
# src/iras/agents/context_gathering.py from pydantic_ai import Agent
from pydantic_ai.tools import Tool
from ..models.incident import ContextBundle
from ..deps import ContextDeps context_agent = Agent( model="claude-haiku-4-5", result_type=ContextBundle, deps_type=ContextDeps, system_prompt=""" You are an SRE context gathering specialist. Use the available tools to collect evidence about the incident. Fetch logs, metrics, and deployment history for the affected service. Bundle all evidence into a structured ContextBundle. """
) @context_agent.tool
async def fetch_logs(ctx, service: str, time_range_minutes: int = 30) -> list[str]: """Fetch recent error and warning logs for a service.""" return await ctx.deps.log_client.get_logs( service=service, time_range_minutes=time_range_minutes, levels=["ERROR", "WARN"], ) @context_agent.tool
async def fetch_metrics(ctx, service: str) -> dict: """Fetch current metrics vs 7-day baseline for a service.""" return await ctx.deps.metrics_client.get_comparison( service=service, metrics=["error_rate", "latency_p99", "request_rate", "cpu_usage"], ) @context_agent.tool
async def fetch_deployments(ctx, service: str, hours: int = 24) -> list[dict]: """Fetch recent deployments for a service from GitHub.""" return await ctx.deps.deployment_client.get_recent( service=service, hours=hours, )
# src/iras/agents/deps.py from dataclasses import dataclass
from ..tools.log_fetcher import LogClient, MockLogClient
from ..tools.metrics import MetricsClient, MockMetricsClient
from ..tools.deployment import DeploymentClient, MockDeploymentClient @dataclass
class ContextDeps: log_client: LogClient | MockLogClient metrics_client: MetricsClient | MockMetricsClient deployment_client: DeploymentClient | MockDeploymentClient def make_context_deps(settings) -> ContextDeps: """Returns real or mock clients based on environment config.""" return ContextDeps( log_client=LogClient(settings.elasticsearch_url) if settings.elasticsearch_url else MockLogClient(), metrics_client=MetricsClient(settings.prometheus_url) if settings.prometheus_url else MockMetricsClient(), deployment_client=DeploymentClient(settings.github_token) if settings.github_token else MockDeploymentClient(), )
# src/iras/agents/deps.py from dataclasses import dataclass
from ..tools.log_fetcher import LogClient, MockLogClient
from ..tools.metrics import MetricsClient, MockMetricsClient
from ..tools.deployment import DeploymentClient, MockDeploymentClient @dataclass
class ContextDeps: log_client: LogClient | MockLogClient metrics_client: MetricsClient | MockMetricsClient deployment_client: DeploymentClient | MockDeploymentClient def make_context_deps(settings) -> ContextDeps: """Returns real or mock clients based on environment config.""" return ContextDeps( log_client=LogClient(settings.elasticsearch_url) if settings.elasticsearch_url else MockLogClient(), metrics_client=MetricsClient(settings.prometheus_url) if settings.prometheus_url else MockMetricsClient(), deployment_client=DeploymentClient(settings.github_token) if settings.github_token else MockDeploymentClient(), )
# src/iras/agents/deps.py from dataclasses import dataclass
from ..tools.log_fetcher import LogClient, MockLogClient
from ..tools.metrics import MetricsClient, MockMetricsClient
from ..tools.deployment import DeploymentClient, MockDeploymentClient @dataclass
class ContextDeps: log_client: LogClient | MockLogClient metrics_client: MetricsClient | MockMetricsClient deployment_client: DeploymentClient | MockDeploymentClient def make_context_deps(settings) -> ContextDeps: """Returns real or mock clients based on environment config.""" return ContextDeps( log_client=LogClient(settings.elasticsearch_url) if settings.elasticsearch_url else MockLogClient(), metrics_client=MetricsClient(settings.prometheus_url) if settings.prometheus_url else MockMetricsClient(), deployment_client=DeploymentClient(settings.github_token) if settings.github_token else MockDeploymentClient(), )
# src/iras/graph/builder.py from langgraph.graph import StateGraph, START, END
from .state import IncidentState
from .nodes import ( ingestion, triage, context_gathering, rca, generate_plan, approval, apply_remediation, escalation, postmortem
) def build_graph(checkpointer) -> CompiledGraph: builder = StateGraph(IncidentState) # Add nodes builder.add_node("ingestion", ingestion.run) builder.add_node("triage", triage.run) builder.add_node("context_gathering", context_gathering.run) builder.add_node("rca", rca.run) builder.add_node("generate_plan", generate_plan.run) builder.add_node("approval", approval.run) builder.add_node("apply_remediation", apply_remediation.run) builder.add_node("escalation", escalation.run) builder.add_node("postmortem", postmortem.run) # Linear edges builder.add_edge(START, "ingestion") builder.add_edge("ingestion", "triage") builder.add_edge("triage", "context_gathering") builder.add_edge("context_gathering", "rca") # Confidence-gated RCA retry loop builder.add_conditional_edges( "rca", should_retry_rca, { "generate_plan": "generate_plan", "context_gathering": "context_gathering", "escalation": "escalation", } ) # Human approval branch builder.add_edge("generate_plan", "approval") builder.add_conditional_edges( "approval", lambda state: "apply_remediation" if state["human_approved"] else "escalation", { "apply_remediation": "apply_remediation", "escalation": "escalation", } ) # Both paths converge at postmortem builder.add_edge("apply_remediation", "postmortem") builder.add_edge("escalation", "postmortem") builder.add_edge("postmortem", END) return builder.compile( checkpointer=checkpointer, interrupt_before=["approval"], # pause before the approval node )
# src/iras/graph/builder.py from langgraph.graph import StateGraph, START, END
from .state import IncidentState
from .nodes import ( ingestion, triage, context_gathering, rca, generate_plan, approval, apply_remediation, escalation, postmortem
) def build_graph(checkpointer) -> CompiledGraph: builder = StateGraph(IncidentState) # Add nodes builder.add_node("ingestion", ingestion.run) builder.add_node("triage", triage.run) builder.add_node("context_gathering", context_gathering.run) builder.add_node("rca", rca.run) builder.add_node("generate_plan", generate_plan.run) builder.add_node("approval", approval.run) builder.add_node("apply_remediation", apply_remediation.run) builder.add_node("escalation", escalation.run) builder.add_node("postmortem", postmortem.run) # Linear edges builder.add_edge(START, "ingestion") builder.add_edge("ingestion", "triage") builder.add_edge("triage", "context_gathering") builder.add_edge("context_gathering", "rca") # Confidence-gated RCA retry loop builder.add_conditional_edges( "rca", should_retry_rca, { "generate_plan": "generate_plan", "context_gathering": "context_gathering", "escalation": "escalation", } ) # Human approval branch builder.add_edge("generate_plan", "approval") builder.add_conditional_edges( "approval", lambda state: "apply_remediation" if state["human_approved"] else "escalation", { "apply_remediation": "apply_remediation", "escalation": "escalation", } ) # Both paths converge at postmortem builder.add_edge("apply_remediation", "postmortem") builder.add_edge("escalation", "postmortem") builder.add_edge("postmortem", END) return builder.compile( checkpointer=checkpointer, interrupt_before=["approval"], # pause before the approval node )
# src/iras/graph/builder.py from langgraph.graph import StateGraph, START, END
from .state import IncidentState
from .nodes import ( ingestion, triage, context_gathering, rca, generate_plan, approval, apply_remediation, escalation, postmortem
) def build_graph(checkpointer) -> CompiledGraph: builder = StateGraph(IncidentState) # Add nodes builder.add_node("ingestion", ingestion.run) builder.add_node("triage", triage.run) builder.add_node("context_gathering", context_gathering.run) builder.add_node("rca", rca.run) builder.add_node("generate_plan", generate_plan.run) builder.add_node("approval", approval.run) builder.add_node("apply_remediation", apply_remediation.run) builder.add_node("escalation", escalation.run) builder.add_node("postmortem", postmortem.run) # Linear edges builder.add_edge(START, "ingestion") builder.add_edge("ingestion", "triage") builder.add_edge("triage", "context_gathering") builder.add_edge("context_gathering", "rca") # Confidence-gated RCA retry loop builder.add_conditional_edges( "rca", should_retry_rca, { "generate_plan": "generate_plan", "context_gathering": "context_gathering", "escalation": "escalation", } ) # Human approval branch builder.add_edge("generate_plan", "approval") builder.add_conditional_edges( "approval", lambda state: "apply_remediation" if state["human_approved"] else "escalation", { "apply_remediation": "apply_remediation", "escalation": "escalation", } ) # Both paths converge at postmortem builder.add_edge("apply_remediation", "postmortem") builder.add_edge("escalation", "postmortem") builder.add_edge("postmortem", END) return builder.compile( checkpointer=checkpointer, interrupt_before=["approval"], # pause before the approval node )
git clone https://github.com/krishnashakula/IRAS.git && cd IRAS # Start Postgres
docker run -d --name iras-postgres \ -e POSTGRES_USER=iras -e POSTGRES_PASSWORD=secret -e POSTGRES_DB=iras \ -p 5432:5432 postgres:16 python -m venv .venv && source .venv/bin/activate
pip install -e ".[dev]" cp .env.example .env
# Set ANTHROPIC_API_KEY and POSTGRES_URL python run.py
git clone https://github.com/krishnashakula/IRAS.git && cd IRAS # Start Postgres
docker run -d --name iras-postgres \ -e POSTGRES_USER=iras -e POSTGRES_PASSWORD=secret -e POSTGRES_DB=iras \ -p 5432:5432 postgres:16 python -m venv .venv && source .venv/bin/activate
pip install -e ".[dev]" cp .env.example .env
# Set ANTHROPIC_API_KEY and POSTGRES_URL python run.py
git clone https://github.com/krishnashakula/IRAS.git && cd IRAS # Start Postgres
docker run -d --name iras-postgres \ -e POSTGRES_USER=iras -e POSTGRES_PASSWORD=secret -e POSTGRES_DB=iras \ -p 5432:5432 postgres:16 python -m venv .venv && source .venv/bin/activate
pip install -e ".[dev]" cp .env.example .env
# Set ANTHROPIC_API_KEY and POSTGRES_URL python run.py
curl -X POST http://localhost:8000/webhook/alert \ -H "Content-Type: application/json" \ -d '{ "title": "High error rate on payment-service", "timestamp": "2026-05-03T10:30:00Z", "service": "payment-service", "error_rate": 0.45 }' # {"incident_id": "550e8400-...", "status": "processing"} # Approve the plan (or wait for the Slack message if configured)
curl -X POST http://localhost:8000/incidents/550e8400-.../approve
curl -X POST http://localhost:8000/webhook/alert \ -H "Content-Type: application/json" \ -d '{ "title": "High error rate on payment-service", "timestamp": "2026-05-03T10:30:00Z", "service": "payment-service", "error_rate": 0.45 }' # {"incident_id": "550e8400-...", "status": "processing"} # Approve the plan (or wait for the Slack message if configured)
curl -X POST http://localhost:8000/incidents/550e8400-.../approve
curl -X POST http://localhost:8000/webhook/alert \ -H "Content-Type: application/json" \ -d '{ "title": "High error rate on payment-service", "timestamp": "2026-05-03T10:30:00Z", "service": "payment-service", "error_rate": 0.45 }' # {"incident_id": "550e8400-...", "status": "processing"} # Approve the plan (or wait for the Slack message if configured)
curl -X POST http://localhost:8000/incidents/550e8400-.../approve - Ingests the alert from any monitoring system Prometheus AlertManager, PagerDuty, Datadog, or a raw JSON webhook
- Triages severity (P0–P3) and identifies affected services using Claude Haiku
- Gathers context error logs from Elasticsearch/Loki, metrics from Prometheus, recent deployments from GitHub
- Runs root-cause analysis with Claude Sonnet, retrying with broader context if confidence is below threshold
- Generates a step-by-step remediation plan with rollback commands for every step
- Pauses and waits for human approval via Slack or REST API
- Applies the fix if approved, or escalates to PagerDuty if rejected/timed out
- Writes a structured post-mortem timeline, root cause, resolution, action items stored in PostgreSQL and posted to Slack - The graph state is serialized to PostgreSQL via AsyncPostgresSaver
- The coroutine is suspended
- The FastAPI endpoint returns 202 Accepted with the incident_id
- The server can restart. The process can crash. The incident is safe. - LangGraph's interrupt() is not a workaround it's a first-class primitive for durable human-in-the-loop workflows. If you're building agents that need human approval in production, this is the pattern.
- Pydantic AI's typed outputs eliminate an entire class of bugs. Parsing LLM output with regex or manual JSON extraction is fragile. Defining your output schema as a Pydantic model and letting the framework handle parsing is strictly better.
- Safety invariants belong in code, not prompts. Prompting the model to be safe is not enough when the output drives production changes. Enforce your invariants programmatically, after the model responds.
- Mock clients everywhere. If every external integration falls back to a mock, the full system is testable with zero infrastructure. This pays for itself immediately in CI speed and developer experience. - GitHub: https://github.com/krishnashakula/IRAS
- LangGraph docs: https://langchain-ai.github.io/langgraph/
- Pydantic AI docs: https://ai.pydantic.dev
- AsyncPostgresSaver: https://langchain-ai.github.io/langgraph/reference/checkpoints/