import sqlite3
import asyncio
from pydantic_ai import Agent, RunContext, ModelRetry
from pydantic import BaseModel
from dataclasses import dataclass # 1. Set up a sample SQLite database
# In-memory database: rebuilt on every run and discarded when the connection
# is closed, so the demo never touches the filesystem.
conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE employees ( id INTEGER PRIMARY KEY, name TEXT, department TEXT, salary INTEGER, hire_date TEXT
)"""
)
# Parameterised bulk insert. The id column is omitted so SQLite assigns the
# INTEGER PRIMARY KEY values itself.
conn.executemany(
    "INSERT INTO employees (name, department, salary, hire_date) VALUES (?, ?, ?, ?)",
    [
        ("Alice", "Engineering", 120000, "2023-01-15"),
        ("Bob", "Marketing", 85000, "2023-06-01"),
        ("Carol", "Engineering", 135000, "2022-03-20"),
        ("Dave", "Sales", 90000, "2024-01-10"),
        ("Eve", "Engineering", 110000, "2024-07-01"),
    ],
)
conn.commit()


# 2. Define dependencies and output schema
@dataclass
class Deps:
    """Dependencies handed to agent tools through ``RunContext.deps``."""

    # Open SQLite connection that tools run their queries against.
    conn: sqlite3.Connection


class QueryResult(BaseModel):
    """Structured description of a query and its result.

    NOTE(review): this model is defined but not referenced anywhere else in
    the code shown here -- confirm whether it was meant to be the agent's
    structured output type.
    """

    # Presumably the SQL text that was executed -- verify against the caller.
    sql: str
    # Presumably a human-readable explanation of the query -- verify.
    explanation: str
    # Presumably one dict per result row -- verify.
    rows: list[dict]


# 3. Create the agent
# The prompt asks the model to execute its SQL through the run_query tool and
# then answer in plain language. Asking for "ONLY the SQL query" would
# contradict the registered tool and the answers the script prints.
agent = Agent(
    "openai:gpt-4o-mini",
    deps_type=Deps,
    system_prompt=(
        "You are a SQL assistant. Given the 'employees' table with columns: "
        "id, name, department, salary, hire_date — write a SELECT query "
        "for the user's question, execute it with the run_query tool, and "
        "answer the question concisely using the returned rows."
    ),
)


# 4. Add a tool that runs the generated SQL
@agent.tool
async def run_query(ctx: RunContext[Deps], sql_query: str) -> str:
    """Execute a SQL query against the employees database and return results.

    Args:
        ctx: Run context; ``ctx.deps.conn`` is the SQLite connection to query.
        sql_query: SQL text supplied by the model. It must start with
            ``SELECT`` (case-insensitive, surrounding whitespace ignored).

    Returns:
        A single string holding the query text, the row count, and the rows
        rendered as a list of column-name -> value dicts.

    Raises:
        ModelRetry: If the query is not a SELECT or SQLite rejects it, so the
            model can correct the query and call the tool again.
    """
    if not sql_query.strip().upper().startswith("SELECT"):
        raise ModelRetry("Only SELECT queries are allowed. Try again.")

    try:
        cursor = ctx.deps.conn.execute(sql_query)
        columns = [desc[0] for desc in cursor.description]
        rows = [dict(zip(columns, row)) for row in cursor.fetchall()]
        return f"Query: {sql_query}\nResults ({len(rows)} rows):\n{rows}"
    except (sqlite3.Error, sqlite3.Warning) as e:
        # sqlite3.Warning is caught too: before Python 3.12, execute() raised
        # it (it is not an sqlite3.Error subclass) for multi-statement input
        # such as "SELECT 1; DROP TABLE employees", which would otherwise
        # escape as an unhandled exception instead of a retry.
        raise ModelRetry(f"SQL error: {e}. Fix the query and try again.") from e


# 5. Run it
async def main() -> None:
    """Ask the agent each sample question in turn and print its answer."""
    questions = [
        "Who earns the most?",
        "How many engineers do we have?",
        "Show me everyone hired in 2024",
    ]
    deps = Deps(conn=conn)
    for q in questions:
        print(f"\n> {q}")
        result = await agent.run(q, deps=deps)
        print(result.output)


# Start the event loop only when executed as a script, so importing this
# module does not trigger agent runs.
if __name__ == "__main__":
    asyncio.run(main())
import sqlite3
import asyncio
from pydantic_ai import Agent, RunContext, ModelRetry
from pydantic import BaseModel
from dataclasses import dataclass # 1. Set up a sample SQLite database
# In-memory database: rebuilt on every run and discarded when the connection
# is closed, so the demo never touches the filesystem.
conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE employees ( id INTEGER PRIMARY KEY, name TEXT, department TEXT, salary INTEGER, hire_date TEXT
)"""
)
# Parameterised bulk insert. The id column is omitted so SQLite assigns the
# INTEGER PRIMARY KEY values itself.
conn.executemany(
    "INSERT INTO employees (name, department, salary, hire_date) VALUES (?, ?, ?, ?)",
    [
        ("Alice", "Engineering", 120000, "2023-01-15"),
        ("Bob", "Marketing", 85000, "2023-06-01"),
        ("Carol", "Engineering", 135000, "2022-03-20"),
        ("Dave", "Sales", 90000, "2024-01-10"),
        ("Eve", "Engineering", 110000, "2024-07-01"),
    ],
)
conn.commit()


# 2. Define dependencies and output schema
@dataclass
class Deps:
    """Dependencies handed to agent tools through ``RunContext.deps``."""

    # Open SQLite connection that tools run their queries against.
    conn: sqlite3.Connection


class QueryResult(BaseModel):
    """Structured description of a query and its result.

    NOTE(review): this model is defined but not referenced anywhere else in
    the code shown here -- confirm whether it was meant to be the agent's
    structured output type.
    """

    # Presumably the SQL text that was executed -- verify against the caller.
    sql: str
    # Presumably a human-readable explanation of the query -- verify.
    explanation: str
    # Presumably one dict per result row -- verify.
    rows: list[dict]


# 3. Create the agent
# The prompt asks the model to execute its SQL through the run_query tool and
# then answer in plain language. Asking for "ONLY the SQL query" would
# contradict the registered tool and the answers the script prints.
agent = Agent(
    "openai:gpt-4o-mini",
    deps_type=Deps,
    system_prompt=(
        "You are a SQL assistant. Given the 'employees' table with columns: "
        "id, name, department, salary, hire_date — write a SELECT query "
        "for the user's question, execute it with the run_query tool, and "
        "answer the question concisely using the returned rows."
    ),
)


# 4. Add a tool that runs the generated SQL
@agent.tool
async def run_query(ctx: RunContext[Deps], sql_query: str) -> str:
    """Execute a SQL query against the employees database and return results.

    Args:
        ctx: Run context; ``ctx.deps.conn`` is the SQLite connection to query.
        sql_query: SQL text supplied by the model. It must start with
            ``SELECT`` (case-insensitive, surrounding whitespace ignored).

    Returns:
        A single string holding the query text, the row count, and the rows
        rendered as a list of column-name -> value dicts.

    Raises:
        ModelRetry: If the query is not a SELECT or SQLite rejects it, so the
            model can correct the query and call the tool again.
    """
    if not sql_query.strip().upper().startswith("SELECT"):
        raise ModelRetry("Only SELECT queries are allowed. Try again.")

    try:
        cursor = ctx.deps.conn.execute(sql_query)
        columns = [desc[0] for desc in cursor.description]
        rows = [dict(zip(columns, row)) for row in cursor.fetchall()]
        return f"Query: {sql_query}\nResults ({len(rows)} rows):\n{rows}"
    except (sqlite3.Error, sqlite3.Warning) as e:
        # sqlite3.Warning is caught too: before Python 3.12, execute() raised
        # it (it is not an sqlite3.Error subclass) for multi-statement input
        # such as "SELECT 1; DROP TABLE employees", which would otherwise
        # escape as an unhandled exception instead of a retry.
        raise ModelRetry(f"SQL error: {e}. Fix the query and try again.") from e


# 5. Run it
async def main() -> None:
    """Ask the agent each sample question in turn and print its answer."""
    questions = [
        "Who earns the most?",
        "How many engineers do we have?",
        "Show me everyone hired in 2024",
    ]
    deps = Deps(conn=conn)
    for q in questions:
        print(f"\n> {q}")
        result = await agent.run(q, deps=deps)
        print(result.output)


# Start the event loop only when executed as a script, so importing this
# module does not trigger agent runs.
if __name__ == "__main__":
    asyncio.run(main())
import sqlite3
import asyncio
from pydantic_ai import Agent, RunContext, ModelRetry
from pydantic import BaseModel
from dataclasses import dataclass # 1. Set up a sample SQLite database
# In-memory database: rebuilt on every run and discarded when the connection
# is closed, so the demo never touches the filesystem.
conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE employees ( id INTEGER PRIMARY KEY, name TEXT, department TEXT, salary INTEGER, hire_date TEXT
)"""
)
# Parameterised bulk insert. The id column is omitted so SQLite assigns the
# INTEGER PRIMARY KEY values itself.
conn.executemany(
    "INSERT INTO employees (name, department, salary, hire_date) VALUES (?, ?, ?, ?)",
    [
        ("Alice", "Engineering", 120000, "2023-01-15"),
        ("Bob", "Marketing", 85000, "2023-06-01"),
        ("Carol", "Engineering", 135000, "2022-03-20"),
        ("Dave", "Sales", 90000, "2024-01-10"),
        ("Eve", "Engineering", 110000, "2024-07-01"),
    ],
)
conn.commit()


# 2. Define dependencies and output schema
@dataclass
class Deps:
    """Dependencies handed to agent tools through ``RunContext.deps``."""

    # Open SQLite connection that tools run their queries against.
    conn: sqlite3.Connection


class QueryResult(BaseModel):
    """Structured description of a query and its result.

    NOTE(review): this model is defined but not referenced anywhere else in
    the code shown here -- confirm whether it was meant to be the agent's
    structured output type.
    """

    # Presumably the SQL text that was executed -- verify against the caller.
    sql: str
    # Presumably a human-readable explanation of the query -- verify.
    explanation: str
    # Presumably one dict per result row -- verify.
    rows: list[dict]


# 3. Create the agent
# The prompt asks the model to execute its SQL through the run_query tool and
# then answer in plain language. Asking for "ONLY the SQL query" would
# contradict the registered tool and the answers the script prints.
agent = Agent(
    "openai:gpt-4o-mini",
    deps_type=Deps,
    system_prompt=(
        "You are a SQL assistant. Given the 'employees' table with columns: "
        "id, name, department, salary, hire_date — write a SELECT query "
        "for the user's question, execute it with the run_query tool, and "
        "answer the question concisely using the returned rows."
    ),
)


# 4. Add a tool that runs the generated SQL
@agent.tool
async def run_query(ctx: RunContext[Deps], sql_query: str) -> str:
    """Execute a SQL query against the employees database and return results.

    Args:
        ctx: Run context; ``ctx.deps.conn`` is the SQLite connection to query.
        sql_query: SQL text supplied by the model. It must start with
            ``SELECT`` (case-insensitive, surrounding whitespace ignored).

    Returns:
        A single string holding the query text, the row count, and the rows
        rendered as a list of column-name -> value dicts.

    Raises:
        ModelRetry: If the query is not a SELECT or SQLite rejects it, so the
            model can correct the query and call the tool again.
    """
    if not sql_query.strip().upper().startswith("SELECT"):
        raise ModelRetry("Only SELECT queries are allowed. Try again.")

    try:
        cursor = ctx.deps.conn.execute(sql_query)
        columns = [desc[0] for desc in cursor.description]
        rows = [dict(zip(columns, row)) for row in cursor.fetchall()]
        return f"Query: {sql_query}\nResults ({len(rows)} rows):\n{rows}"
    except (sqlite3.Error, sqlite3.Warning) as e:
        # sqlite3.Warning is caught too: before Python 3.12, execute() raised
        # it (it is not an sqlite3.Error subclass) for multi-statement input
        # such as "SELECT 1; DROP TABLE employees", which would otherwise
        # escape as an unhandled exception instead of a retry.
        raise ModelRetry(f"SQL error: {e}. Fix the query and try again.") from e


# 5. Run it
async def main() -> None:
    """Ask the agent each sample question in turn and print its answer."""
    questions = [
        "Who earns the most?",
        "How many engineers do we have?",
        "Show me everyone hired in 2024",
    ]
    deps = Deps(conn=conn)
    for q in questions:
        print(f"\n> {q}")
        result = await agent.run(q, deps=deps)
        print(result.output)


# Start the event loop only when executed as a script, so importing this
# module does not trigger agent runs.
if __name__ == "__main__":
    asyncio.run(main())
> Who earns the most?
Carol in Engineering earns the most at $135,000.

> How many engineers do we have?
There are 3 engineers in the Engineering department.

> Show me everyone hired in 2024
Dave (Sales, hired 2024-01-10) and Eve (Engineering, hired 2024-07-01).
> Who earns the most?
Carol in Engineering earns the most at $135,000.

> How many engineers do we have?
There are 3 engineers in the Engineering department.

> Show me everyone hired in 2024
Dave (Sales, hired 2024-01-10) and Eve (Engineering, hired 2024-07-01).
> Who earns the most?
Carol in Engineering earns the most at $135,000.

> How many engineers do we have?
There are 3 engineers in the Engineering department.

> Show me everyone hired in 2024
Dave (Sales, hired 2024-01-10) and Eve (Engineering, hired 2024-07-01).
# Open the SQLite database file "your_database.db"; sqlite3 creates the file
# if it does not already exist.
conn = sqlite3.connect("your_database.db")
# Open the SQLite database file "your_database.db"; sqlite3 creates the file
# if it does not already exist.
conn = sqlite3.connect("your_database.db")
conn = sqlite3.connect("your_database.db")

- Only SELECT queries run (no drops or deletes)
- SQL errors trigger ModelRetry, which tells the LLM to fix its query and try again