```shell
npx @insforge/install --client claude-code \
  --env API_KEY=your_insforge_api_key \
  --env API_BASE_URL=https://your-project.us-east.insforge.app
```
*Build me a FastAPI backend with four routers: events, metrics, insights, and simulate. Use the InsForge SDK to connect to my backend. The insights router should stream AI responses using the anthropic/claude-sonnet-4.5 model through the InsForge AI gateway.*
```
insforge-dashboard/
├── main.py              # App entry point, CORS, router registration
├── config.py            # InsForge credentials from environment
├── client.py            # Shared InsForgeClient with database and AI helpers
├── requirements.txt
└── routers/
    ├── events.py        # GET + POST /events
    ├── metrics.py       # GET /metrics/summary and /metrics/hourly
    ├── insights.py      # POST /insights/generate — SSE streaming
    └── simulate.py      # POST /simulate/events
```
```python
import httpx

from config import INSFORGE_BASE_URL, INSFORGE_ANON_KEY


class InsForgeClient:
    def __init__(self) -> None:
        self.base_url = INSFORGE_BASE_URL
        self._anon_key = INSFORGE_ANON_KEY

    @property
    def _headers(self) -> dict:
        return {
            'Authorization': f'Bearer {self._anon_key}',
            'Content-Type': 'application/json',
        }

    async def get_records(self, table, params=None):
        async with httpx.AsyncClient(timeout=30) as http:
            resp = await http.get(
                f'{self.base_url}/api/database/records/{table}',
                headers=self._headers,
                params=params or {},
            )
            resp.raise_for_status()
            # X-Total-Count carries the full table count, independent of `limit`
            raw_total = resp.headers.get('X-Total-Count')
            return resp.json(), int(raw_total) if raw_total else None

    async def create_records(self, table, records):
        async with httpx.AsyncClient(timeout=30) as http:
            resp = await http.post(
                f'{self.base_url}/api/database/records/{table}',
                headers={**self._headers, 'Prefer': 'return=representation'},
                json=records,
            )
            resp.raise_for_status()
            return resp.json()
```
```python
AI_MODEL = 'anthropic/claude-sonnet-4.5'


# Method on InsForgeClient
async def ai_stream(self, messages, system_prompt=None):
    payload = {
        'model': AI_MODEL,
        'messages': messages,
        'stream': True,
    }
    if system_prompt:
        payload['systemPrompt'] = system_prompt
    async with httpx.AsyncClient(timeout=120) as http:
        async with http.stream(
            'POST',
            f'{self.base_url}/api/ai/chat/completion',
            headers=self._headers,
            json=payload,
        ) as resp:
            resp.raise_for_status()
            async for line in resp.aiter_lines():
                if line.startswith('data: '):
                    yield line + '\n\n'
```
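Because the gateway takes the model as a plain string, switching providers is a payload-level change. A small sketch of that idea; `build_chat_payload` and the alternative model id are hypothetical, but the payload shape follows `ai_stream` above:

```python
def build_chat_payload(model: str, prompt: str, stream: bool = True) -> dict:
    """Same request body ai_stream sends; only the model string varies."""
    return {
        'model': model,
        'messages': [{'role': 'user', 'content': prompt}],
        'stream': stream,
    }

claude = build_chat_payload('anthropic/claude-sonnet-4.5', 'Summarize this week')
# Hypothetical alternative model id — a one-string swap, nothing else changes:
other = build_chat_payload('openai/gpt-4o', 'Summarize this week')
```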
```python
@router.get('/summary')
async def get_summary():
    records, total = await insforge.get_records(
        'events',
        {'limit': 1000, 'select': 'event_name,user_id,session_id,page'},
    )
    event_counts = Counter(r['event_name'] for r in records)
    unique_users = len({r['user_id'] for r in records if r['user_id']})
    unique_sessions = len({r['session_id'] for r in records if r['session_id']})
    page_counts = Counter(r['page'] for r in records if r['page'])
    return {
        'total_events': total or len(records),
        'unique_users': unique_users,
        'unique_sessions': unique_sessions,
        'events_by_name': dict(event_counts.most_common()),
        'top_pages': dict(page_counts.most_common(10)),
    }


@router.get('/hourly')
async def get_hourly_stats(limit: int = 168):
    params = {'limit': limit, 'order': 'bucket_start.desc'}
    records, _ = await insforge.get_records('event_hourly_stats', params)
    return records
```
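To make the aggregation in `get_summary` concrete, here is the same `Counter`/set logic run over three toy records (the sample rows are invented for illustration):

```python
from collections import Counter

# Toy rows standing in for records from the `events` table
records = [
    {'event_name': 'page_view', 'user_id': 'u1', 'session_id': 's1', 'page': '/'},
    {'event_name': 'page_view', 'user_id': 'u2', 'session_id': 's2', 'page': '/pricing'},
    {'event_name': 'purchase', 'user_id': 'u1', 'session_id': 's1', 'page': '/checkout'},
]

event_counts = Counter(r['event_name'] for r in records)
unique_users = len({r['user_id'] for r in records if r['user_id']})
unique_sessions = len({r['session_id'] for r in records if r['session_id']})

summary = {
    'total_events': len(records),
    'unique_users': unique_users,        # u1 and u2
    'unique_sessions': unique_sessions,  # s1 and s2
    'events_by_name': dict(event_counts.most_common()),
}
print(summary)
# {'total_events': 3, 'unique_users': 2, 'unique_sessions': 2,
#  'events_by_name': {'page_view': 2, 'purchase': 1}}
```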
```python
_SYSTEM_PROMPT = (
    'You are an expert product analytics consultant. '
    'You will receive a structured summary of user event data and a specific question. '
    'Provide clear, concise, and actionable insights. '
    'Structure your response with labeled sections '
    '(e.g. Key Findings, Recommendations). '
    'Be specific — reference actual numbers from the data where relevant.'
)


# Inside the POST /insights/generate handler:
async def stream_and_save():
    accumulated: list[str] = []
    async for sse_line in insforge.ai_stream(
        messages=[{'role': 'user', 'content': context}],
        system_prompt=_SYSTEM_PROMPT,
    ):
        data_str = sse_line.removeprefix('data: ').strip()
        try:
            parsed = json.loads(data_str)
            if 'chunk' in parsed:
                accumulated.append(parsed['chunk'])
        except (json.JSONDecodeError, KeyError):
            pass
        yield sse_line

    # Persist the full response once streaming is complete
    if req.save and accumulated:
        full_text = ''.join(accumulated)
        try:
            await insforge.create_records('ai_insights', [{
                'insight_type': req.insight_type,
                'title': req.query[:80],
                'content': full_text,
                'time_range': req.time_range,
                'metadata': {
                    'total_events': total,
                    'unique_users': unique_users,
                    'unique_sessions': unique_sessions,
                    'event_breakdown': dict(event_counts.most_common(5)),
                },
            }])
        except Exception:
            pass  # Don't let a save failure break the delivered stream

return StreamingResponse(
    stream_and_save(),
    media_type='text/event-stream',
    headers={'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no'},
)
```
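The chunk-accumulation step can be exercised on its own. Here is a standalone sketch of the same parsing logic `stream_and_save` applies to each `data:` line (`parse_sse_chunks` and the sample lines are illustrative, not part of the generated project):

```python
import json

def parse_sse_chunks(sse_lines) -> str:
    """Join the 'chunk' payloads out of a sequence of SSE 'data: ...' lines."""
    accumulated = []
    for line in sse_lines:
        if not line.startswith('data: '):
            continue
        try:
            parsed = json.loads(line[len('data: '):].strip())
        except json.JSONDecodeError:
            continue
        if 'chunk' in parsed:
            accumulated.append(parsed['chunk'])
    return ''.join(accumulated)

sample = [
    'data: {"chunk": "Key Findings: "}',
    'data: {"chunk": "conversion rose."}',
    'data: {"done": true}',  # terminal event carries no chunk
]
print(parse_sse_chunks(sample))  # Key Findings: conversion rose.
```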
```shell
python -m venv .venv

# Windows
.venv\Scripts\activate

# Mac / Linux
source .venv/bin/activate

pip install -r requirements.txt
uvicorn main:app --reload
```
```shell
curl http://localhost:8000/metrics/summary
# {"total_events": 1000, "unique_users": 198, "unique_sessions": 445,
#  "events_by_name": {"page_view": 214, "search": 208, "purchase": 205, ...}}
```
*Build a Next.js frontend for this analytics dashboard. It should have a metrics summary row, an event volume chart, an event breakdown chart, a live event feed, and an AI insights panel that streams responses word by word. Poll the FastAPI backend every 5 seconds for live data.*
```typescript
useEffect(() => {
  loadMetrics();
  loadEvents();
  const poll = setInterval(() => {
    loadMetrics();
    loadEvents();
  }, 5_000);
  return () => clearInterval(poll);
}, []);
```
```typescript
const reader = res.body.getReader();
const decoder = new TextDecoder();
let buffer = '';

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  buffer += decoder.decode(value, { stream: true });
  const lines = buffer.split('\n');
  buffer = lines.pop() ?? '';
  for (const line of lines) {
    if (!line.startsWith('data:')) continue;
    const parsed = JSON.parse(line.slice(5).trim());
    if (parsed.chunk) callbacks.onChunk(parsed.chunk);
    if (parsed.done && parsed.insight) callbacks.onDone(parsed.insight);
  }
}
```
```shell
cd frontend
npm install
npm run dev
```
```shell
curl -X POST "http://localhost:8000/simulate/events" \
  -H "Content-Type: application/json" \
  -d "{\"count\": 50}"
```
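The generated `simulate.py` is not reproduced here, but the shape of the rows it inserts follows from the fields `/metrics/summary` reads. A plausible sketch of one synthetic event (the event/page catalogues and the `created_at` field are assumptions):

```python
import random
import uuid
from datetime import datetime, timezone

# Assumed catalogues, based on the event names /metrics/summary reports
EVENT_NAMES = ['page_view', 'search', 'purchase']
PAGES = ['/', '/pricing', '/checkout', '/docs']

def make_fake_event() -> dict:
    """One synthetic row matching the columns the metrics router selects."""
    return {
        'event_name': random.choice(EVENT_NAMES),
        'user_id': f'user_{random.randint(1, 200)}',
        'session_id': str(uuid.uuid4()),
        'page': random.choice(PAGES),
        'created_at': datetime.now(timezone.utc).isoformat(),  # assumed column
    }

# A batch like the curl above would trigger: {"count": 50}
batch = [make_fake_event() for _ in range(50)]
```

The simulator would then pass `batch` to `insforge.create_records('events', batch)` to insert it in one request.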
```
insforge-dashboard/
├── Dockerfile           # FastAPI backend — Python 3.11 slim, uvicorn on port 8000
├── .dockerignore
└── frontend/
    ├── Dockerfile       # Next.js — multi-stage Node 20 build, standalone output
    └── .dockerignore
```
- A FastAPI backend with event ingestion, metrics aggregation, AI streaming insights, and an event simulator
- A Next.js frontend with a live metrics panel, event volume and breakdown charts, a live event feed, and a streaming AI insights panel
- InsForge as the backend platform, managing our database, AI models, and REST API layer
- Claude Code as the agent that builds the backend through a conversation with our live InsForge instance via MCP

- The managed AI gateway: You configure your OpenRouter API key once inside InsForge, and the platform handles all model routing from there. Your application calls one InsForge endpoint and passes a model string. Swap the string, and everything else stays the same. No per-model SDKs, no separate credentials in your codebase.
- The MCP server: InsForge ships with an MCP server that gives Claude Code direct access to your live backend. The agent can read your schema, fetch documentation, and generate auth tokens as part of a conversation. This is what makes the one-prompt build possible.
- The PostgREST layer: Every table in your InsForge database is automatically exposed as a REST endpoint. You do not write data access code. You describe your schema, and InsForge handles the rest.

- Base URL — your project's unique API endpoint, for example, https://xxxxxxxx.us-east.insforge.app
- Anon Key — for browser-side and public API operations
- Service Key — for privileged server-side operations

- Fetched the InsForge SDK documentation to understand the correct API patterns for database and AI calls
- Read all three table schemas so that the code it generated matched our actual data structure
- Generated a JWT token for authenticated database access
- Inspected the existing project structure to understand what was already in place

- Total Events, Unique Users, Unique Sessions, and Top Event in the metrics row at the top
- An Event Volume chart showing activity over the selected time range, switchable between 1 hour, 24 hours, and 7 days
- An Event Breakdown bar chart grouping events by type
- A Live Event Feed showing recent events with user IDs, pages, and timestamps, updating every 5 seconds
- An AI Insights panel where you submit a question and Claude Sonnet streams a structured analysis through the InsForge gateway in real time

- Backend service: point Zeabur at the root directory. It detects the Dockerfile automatically. Set the following environment variables: INSFORGE_BASE_URL and INSFORGE_ANON_KEY.
- Frontend service: point Zeabur at the /frontend subdirectory. Set NEXT_PUBLIC_API_URL to the public URL Zeabur assigns to your backend service, for example, https://your-backend.zeabur.app. This value must be set before the build runs, as it is baked into the Next.js bundle at build time.