from typing import TypedDict, Annotated

from langgraph.graph.message import add_messages


class AgentState(TypedDict):
    """Shared state passed between the graph's nodes.

    Each node receives the current state and returns a partial update;
    LangGraph merges updates back into this schema.
    """

    # Conversation history; add_messages appends rather than overwrites.
    messages: Annotated[list, add_messages]
    # Raw search results collected so far.
    search_results: list[str]
    # The agent's finished answer.
    final_answer: str
def search_web(state: AgentState) -> dict:
    """Search the web for the latest user message and return the results.

    Reads the most recent message's content as the query, runs it through
    the Tavily client, and returns a partial state update.
    """
    query = state["messages"][-1].content
    results = tavily_client.search(query)
    return {"search_results": results}
def should_continue(state: AgentState) -> str:
    """Route to 'summarize' once at least 3 results exist, else 'search'."""
    if len(state["search_results"]) >= 3:
        return "summarize"
    return "search"


# The router's return value selects the next node after "analyze".
graph.add_conditional_edges("analyze", should_continue)
# Create the project directory and an isolated Python virtual environment.
mkdir langgraph-research-agent
cd langgraph-research-agent
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
# Install pinned dependency versions for a reproducible environment.
pip install langgraph==0.3.34 \
    langchain-openai==0.3.12 \
    langchain-community==0.3.19 \
    tavily-python==0.5.0 \
    python-dotenv==1.1.0
# API credentials loaded at startup by python-dotenv.
# Keep this file out of version control (add .env to .gitignore).
OPENAI_API_KEY=sk-your-openai-key-here
TAVILY_API_KEY=tvly-your-tavily-key-here
# Quick sanity check that langgraph installed correctly.
import langgraph

print(langgraph.__version__)
"""Research agent built with LangGraph.""" import os
from typing import TypedDict, Annotated
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages load_dotenv() # --- State Schema --- class ResearchState(TypedDict): """The agent's working memory.""" messages: Annotated[list, add_messages] # Conversation history research_topic: str # What we are researching search_queries: list[str] # Queries we have run sources: list[dict] # Raw search results analysis: str # Our analysis of the sources final_report: str # The finished research report iteration: int # How many research loops we have done max_iterations: int # Safety limit on loops
"""Research agent built with LangGraph.""" import os
from typing import TypedDict, Annotated
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages load_dotenv() # --- State Schema --- class ResearchState(TypedDict): """The agent's working memory.""" messages: Annotated[list, add_messages] # Conversation history research_topic: str # What we are researching search_queries: list[str] # Queries we have run sources: list[dict] # Raw search results analysis: str # Our analysis of the sources final_report: str # The finished research report iteration: int # How many research loops we have done max_iterations: int # Safety limit on loops
"""Research agent built with LangGraph.""" import os
from typing import TypedDict, Annotated
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages load_dotenv() # --- State Schema --- class ResearchState(TypedDict): """The agent's working memory.""" messages: Annotated[list, add_messages] # Conversation history research_topic: str # What we are researching search_queries: list[str] # Queries we have run sources: list[dict] # Raw search results analysis: str # Our analysis of the sources final_report: str # The finished research report iteration: int # How many research loops we have done max_iterations: int # Safety limit on loops
# --- LLM Setup ---
# Low temperature keeps the model grounded for factual research work.
llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0.1,
)
# --- Node Functions ---
def generate_queries(state: ResearchState) -> dict:
    """Turn the research topic into specific search queries.

    On later iterations the prompt includes the current analysis so the
    model generates queries that fill knowledge gaps instead of repeating
    what we already know.

    Returns a partial state update with the appended queries and the raw
    LLM response added to the message history.
    """
    topic = state["research_topic"]

    # On later iterations, refine based on what we already found.
    existing_info = ""
    if state.get("analysis"):
        existing_info = (
            f"\n\nWe already know:\n{state['analysis']}\n\n"
            "Generate queries to fill gaps in our knowledge."
        )

    response = llm.invoke([
        SystemMessage(content=(
            "You are a research assistant. Generate 3 specific, diverse "
            "search queries to research the given topic. Return only the "
            "queries, one per line. No numbering, no extra text."
            f"{existing_info}"
        )),
        HumanMessage(content=f"Research topic: {topic}"),
    ])

    # One query per non-empty line of the model's reply.
    new_queries = [q.strip() for q in response.content.strip().split("\n") if q.strip()]

    return {
        "search_queries": state.get("search_queries", []) + new_queries,
        "messages": [response],
    }
from tavily import TavilyClient

tavily = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))


def search_web(state: ResearchState) -> dict:
    """Execute the newest search queries and collect deduplicated results.

    Only the latest batch of queries (last 3) is searched — earlier batches
    were already run on previous iterations. A failed search is logged and
    skipped so the agent can continue with partial results.
    """
    queries = state.get("search_queries", [])
    recent_queries = queries[-3:]

    # Copy rather than mutate the incoming state's list: LangGraph nodes
    # should return updates, not modify state in place.
    all_results = list(state.get("sources", []))
    # Track seen URLs in a set for O(1) duplicate checks; .get avoids a
    # KeyError if a stored source ever lacks a "url" key.
    seen_urls = {s.get("url") for s in all_results}

    for query in recent_queries:
        try:
            response = tavily.search(
                query=query,
                max_results=3,
                include_raw_content=False,
            )
            for result in response.get("results", []):
                url = result.get("url", "")
                if url in seen_urls:
                    continue  # skip duplicate URLs
                seen_urls.add(url)
                all_results.append({
                    "title": result.get("title", ""),
                    "url": url,
                    "content": result.get("content", ""),
                    "query": query,
                })
        except Exception as e:
            # Log but do not crash — the agent can work with partial results
            print(f"Search failed for '{query}': {e}")

    return {"sources": all_results}
def analyze_results(state: ResearchState) -> dict:
    """Analyze search results and assess if we have enough information.

    Increments the iteration counter on every call; when there are no
    sources at all, records that fact so the router can trigger another
    round of query generation.
    """
    sources = state.get("sources", [])

    if not sources:
        return {
            "analysis": "No search results found. Need to try different queries.",
            "iteration": state.get("iteration", 0) + 1,
        }

    # Format numbered sources for the LLM so it can cite them by index.
    source_text = "".join(
        f"\n[{i}] {source['title']}\nURL: {source['url']}\n{source['content']}\n"
        for i, source in enumerate(sources, 1)
    )

    response = llm.invoke([
        SystemMessage(content=(
            "You are a research analyst. Analyze the following search results "
            "about the given topic. Provide:\n"
            "1. Key findings (what we know)\n"
            "2. Gaps (what we still need to find out)\n"
            "3. Confidence level (low/medium/high) in our overall understanding\n\n"
            "Be specific and cite source numbers."
        )),
        HumanMessage(content=f"Topic: {state['research_topic']}\n\nSources:{source_text}"),
    ])

    return {
        "analysis": response.content,
        "iteration": state.get("iteration", 0) + 1,
        "messages": [response],
    }
def write_report(state: ResearchState) -> dict:
    """Write a structured research report from our findings.

    Combines the accumulated analysis and numbered sources into one prompt
    and stores the model's report in final_report.
    """
    sources = state.get("sources", [])
    analysis = state.get("analysis", "")

    # Same numbered-source format the analyzer used, so citations line up.
    source_text = "".join(
        f"\n[{i}] {source['title']}\nURL: {source['url']}\n{source['content']}\n"
        for i, source in enumerate(sources, 1)
    )

    response = llm.invoke([
        SystemMessage(content=(
            "You are a research writer. Write a clear, well-structured research "
            "report based on the analysis and sources provided. Include:\n"
            "- Executive summary (2-3 sentences)\n"
            "- Key findings with citations [1], [2], etc.\n"
            "- Conclusions\n"
            "- Sources list\n\n"
            "Write for a technical audience. Be factual and specific."
        )),
        HumanMessage(content=(
            f"Topic: {state['research_topic']}\n\n"
            f"Analysis:\n{analysis}\n\n"
            f"Sources:{source_text}"
        )),
    ])

    return {
        "final_report": response.content,
        "messages": [response],
    }
# --- Routing Logic ---
def should_continue_research(state: ResearchState) -> str:
    """Decide whether to keep researching or write the report.

    Returns the name of the next node: "generate_queries" to loop, or
    "write_report" to finish. The iteration cap is a hard stop regardless
    of what the analysis says.
    """
    iteration = state.get("iteration", 0)
    max_iterations = state.get("max_iterations", 3)
    analysis = state.get("analysis", "")

    # Hard stop: prevent infinite loops
    if iteration >= max_iterations:
        return "write_report"

    # Heuristic: keep going if the analysis text signals low confidence or
    # significant gaps. This is keyword matching on free-form LLM output,
    # so the max_iterations cap above is the real safety net.
    analysis_lower = analysis.lower()
    if "low" in analysis_lower and "confidence" in analysis_lower:
        return "generate_queries"
    if "significant gaps" in analysis_lower or "need more" in analysis_lower:
        return "generate_queries"

    # Otherwise, we have enough to write
    return "write_report"


# --- Build the Graph ---
workflow = StateGraph(ResearchState)

# Add nodes
workflow.add_node("generate_queries", generate_queries)
workflow.add_node("search_web", search_web)
workflow.add_node("analyze_results", analyze_results)
workflow.add_node("write_report", write_report)

# Add edges
workflow.add_edge(START, "generate_queries")
workflow.add_edge("generate_queries", "search_web")
workflow.add_edge("search_web", "analyze_results")

# Conditional edge: the agent decides whether to loop or finish
workflow.add_conditional_edges(
    "analyze_results",
    should_continue_research,
    {
        "generate_queries": "generate_queries",
        "write_report": "write_report",
    },
)

workflow.add_edge("write_report", END)

# Compile the graph
agent = workflow.compile()
from langgraph.checkpoint.memory import MemorySaver

# Add short-term (per-thread) memory: the checkpointer persists graph state
# between invocations that share the same thread_id.
memory = MemorySaver()
agent_with_memory = workflow.compile(checkpointer=memory)
# Each thread_id keys its own checkpointed conversation state.
config = {"configurable": {"thread_id": "research-session-1"}}

result = agent_with_memory.invoke(
    {
        "research_topic": "Impact of AI coding assistants on developer productivity in 2026",
        "messages": [],
        "search_queries": [],
        "sources": [],
        "analysis": "",
        "final_report": "",
        "iteration": 0,
        "max_iterations": 3,
    },
    config=config,
)
from langgraph.store.memory import InMemoryStore

# A store adds long-term, cross-thread memory on top of the per-thread
# checkpointer (both are in-process here; swap for persistent backends in prod).
store = InMemoryStore()
agent_with_long_memory = workflow.compile(
    checkpointer=memory,
    store=store,
)
# --- Run the Agent ---
if __name__ == "__main__":
    print("=" * 60)
    print("LangGraph Research Agent")
    print("=" * 60)

    topic = input("\nEnter a research topic: ").strip()
    if not topic:
        # Sensible default so a bare Enter still demonstrates the agent.
        topic = "How are companies using AI agents in production in 2026?"

    print(f"\nResearching: {topic}")
    print("-" * 60)

    initial_state = {
        "research_topic": topic,
        "messages": [],
        "search_queries": [],
        "sources": [],
        "analysis": "",
        "final_report": "",
        "iteration": 0,
        "max_iterations": 3,
    }
    config = {"configurable": {"thread_id": "session-001"}}

    # Stream to see the agent's progress in real time
    for event in agent_with_memory.stream(initial_state, config=config):
        for node_name, output in event.items():
            print(f"\n>> Node: {node_name}")
            if node_name == "generate_queries" and "search_queries" in output:
                print(f" Queries: {output['search_queries'][-3:]}")
            elif node_name == "search_web" and "sources" in output:
                print(f" Found {len(output['sources'])} total sources")
            elif node_name == "analyze_results" and "analysis" in output:
                print(f" Iteration: {output.get('iteration', '?')}")
                print(f" Analysis preview: {output['analysis'][:200]}...")
            elif node_name == "write_report" and "final_report" in output:
                print(f"\n{'=' * 60}")
                print("RESEARCH REPORT")
                print("=" * 60)
                print(output["final_report"])
                print(f"\n{'=' * 60}")

    print("Research complete.")
# Run the agent from the project root (virtualenv activated).
python agent.py
# Print a Mermaid text diagram of the compiled graph.
# NOTE: draw_mermaid() emits plain text and needs no extra packages;
# pygraphviz is only required for image rendering such as draw_png().
print(agent.get_graph().draw_mermaid())
# API-serving dependencies, pinned for reproducibility.
pip install langserve[all]==0.3.1 fastapi==0.115.0 uvicorn==0.34.0
"""Serve the research agent as a REST API.""" from fastapi import FastAPI
from langserve import add_routes
from agent import agent_with_memory app = FastAPI( title="Research Agent API", description="AI research agent powered by LangGraph",
) add_routes(app, agent_with_memory, path="/research") if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)
"""Serve the research agent as a REST API.""" from fastapi import FastAPI
from langserve import add_routes
from agent import agent_with_memory app = FastAPI( title="Research Agent API", description="AI research agent powered by LangGraph",
) add_routes(app, agent_with_memory, path="/research") if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)
"""Serve the research agent as a REST API.""" from fastapi import FastAPI
from langserve import add_routes
from agent import agent_with_memory app = FastAPI( title="Research Agent API", description="AI research agent powered by LangGraph",
) add_routes(app, agent_with_memory, path="/research") if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)
# Start the REST API server on port 8000.
python server.py
from langgraph.types import interrupt


def send_email(state):
    """Pause the graph and wait for human approval before sending.

    interrupt() suspends execution until the caller resumes the graph with
    an approval payload; the send only proceeds when approved.
    """
    approval = interrupt(
        {"question": f"Send report to {state['recipient']}?"}
    )
    if approval.get("approved"):
        # Actually send the email
        ...
from langchain_community.llms import Ollama

# Swap in a locally-served model via Ollama.
# NOTE(review): langchain_community.llms.Ollama is a completion-style LLM and
# is deprecated in newer releases in favor of langchain_ollama.ChatOllama.
# This agent sends chat messages (SystemMessage/HumanMessage), so a chat
# model is likely the better fit — confirm against the installed versions.
llm = Ollama(model="llama3.1:8b")
"""Research agent built with LangGraph — complete code.""" import os
from typing import TypedDict, Annotated
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
from tavily import TavilyClient load_dotenv() # --- State --- class ResearchState(TypedDict): messages: Annotated[list, add_messages] research_topic: str search_queries: list[str] sources: list[dict] analysis: str final_report: str iteration: int max_iterations: int # --- Setup --- llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.1)
tavily = TavilyClient(api_key=os.getenv("TAVILY_API_KEY")) # --- Nodes --- def generate_queries(state: ResearchState) -> dict: topic = state["research_topic"] existing_info = "" if state.get("analysis"): existing_info = ( f"\n\nWe already know:\n{state['analysis']}\n\n" "Generate queries to fill gaps in our knowledge." ) response = llm.invoke([ SystemMessage(content=( "You are a research assistant. Generate 3 specific, diverse " "search queries to research the given topic. Return only the " "queries, one per line. No numbering, no extra text." f"{existing_info}" )), HumanMessage(content=f"Research topic: {topic}"), ]) new_queries = [q.strip() for q in response.content.strip().split("\n") if q.strip()] return { "search_queries": state.get("search_queries", []) + new_queries, "messages": [response], } def search_web(state: ResearchState) -> dict: queries = state.get("search_queries", []) recent_queries = queries[-3:] all_results = state.get("sources", []) for query in recent_queries: try: response = tavily.search(query=query, max_results=3, include_raw_content=False) for result in response.get("results", []): if not any(s["url"] == result["url"] for s in all_results): all_results.append({ "title": result.get("title", ""), "url": result.get("url", ""), "content": result.get("content", ""), "query": query, }) except Exception as e: print(f"Search failed for '{query}': {e}") return {"sources": all_results} def analyze_results(state: ResearchState) -> dict: sources = state.get("sources", []) if not sources: return { "analysis": "No search results found. Need to try different queries.", "iteration": state.get("iteration", 0) + 1, } source_text = "" for i, source in enumerate(sources, 1): source_text += f"\n[{i}] {source['title']}\nURL: {source['url']}\n{source['content']}\n" response = llm.invoke([ SystemMessage(content=( "You are a research analyst. Analyze the following search results " "about the given topic. Provide:\n" "1. Key findings (what we know)\n" "2. 
Gaps (what we still need to find out)\n" "3. Confidence level (low/medium/high) in our overall understanding\n\n" "Be specific and cite source numbers." )), HumanMessage(content=f"Topic: {state['research_topic']}\n\nSources:{source_text}"), ]) return { "analysis": response.content, "iteration": state.get("iteration", 0) + 1, "messages": [response], } def write_report(state: ResearchState) -> dict: sources = state.get("sources", []) analysis = state.get("analysis", "") source_text = "" for i, source in enumerate(sources, 1): source_text += f"\n[{i}] {source['title']}\nURL: {source['url']}\n{source['content']}\n" response = llm.invoke([ SystemMessage(content=( "You are a research writer. Write a clear, well-structured research " "report based on the analysis and sources provided. Include:\n" "- Executive summary (2-3 sentences)\n" "- Key findings with citations [1], [2], etc.\n" "- Conclusions\n" "- Sources list\n\n" "Write for a technical audience. Be factual and specific." )), HumanMessage(content=( f"Topic: {state['research_topic']}\n\n" f"Analysis:\n{analysis}\n\n" f"Sources:{source_text}" )), ]) return { "final_report": response.content, "messages": [response], } # --- Routing --- def should_continue_research(state: ResearchState) -> str: iteration = state.get("iteration", 0) max_iterations = state.get("max_iterations", 3) analysis = state.get("analysis", "") if iteration >= max_iterations: return "write_report" analysis_lower = analysis.lower() if "low" in analysis_lower and "confidence" in analysis_lower: return "generate_queries" if "significant gaps" in analysis_lower or "need more" in analysis_lower: return "generate_queries" return "write_report" # --- Graph --- workflow = StateGraph(ResearchState)
workflow.add_node("generate_queries", generate_queries)
workflow.add_node("search_web", search_web)
workflow.add_node("analyze_results", analyze_results)
workflow.add_node("write_report", write_report) workflow.add_edge(START, "generate_queries")
workflow.add_edge("generate_queries", "search_web")
workflow.add_edge("search_web", "analyze_results")
workflow.add_conditional_edges( "analyze_results", should_continue_research, {"generate_queries": "generate_queries", "write_report": "write_report"},
)
workflow.add_edge("write_report", END) memory = MemorySaver()
agent = workflow.compile(checkpointer=memory) # --- Main --- if __name__ == "__main__": print("=" * 60) print("LangGraph Research Agent") print("=" * 60) topic = input("\nEnter a research topic: ").strip() if not topic: topic = "How are companies using AI agents in production in 2026?" print(f"\nResearching: {topic}") print("-" * 60) initial_state = { "research_topic": topic, "messages": [], "search_queries": [], "sources": [], "analysis": "", "final_report": "", "iteration": 0, "max_iterations": 3, } config = {"configurable": {"thread_id": "session-001"}} for event in agent.stream(initial_state, config=config): for node_name, output in event.items(): print(f"\n>> Node: {node_name}") if node_name == "generate_queries" and "search_queries" in output: print(f" Queries: {output['search_queries'][-3:]}") elif node_name == "search_web" and "sources" in output: print(f" Found {len(output['sources'])} total sources") elif node_name == "analyze_results" and "analysis" in output: print(f" Iteration: {output.get('iteration', '?')}") print(f" Analysis preview: {output['analysis'][:200]}...") elif node_name == "write_report" and "final_report" in output: print(f"\n{'=' * 60}") print("RESEARCH REPORT") print("=" * 60) print(output["final_report"]) print(f"\n{'=' * 60}") print("Research complete.")
"""Research agent built with LangGraph — complete code.""" import os
from typing import TypedDict, Annotated
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
from tavily import TavilyClient load_dotenv() # --- State --- class ResearchState(TypedDict): messages: Annotated[list, add_messages] research_topic: str search_queries: list[str] sources: list[dict] analysis: str final_report: str iteration: int max_iterations: int # --- Setup --- llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.1)
tavily = TavilyClient(api_key=os.getenv("TAVILY_API_KEY")) # --- Nodes --- def generate_queries(state: ResearchState) -> dict: topic = state["research_topic"] existing_info = "" if state.get("analysis"): existing_info = ( f"\n\nWe already know:\n{state['analysis']}\n\n" "Generate queries to fill gaps in our knowledge." ) response = llm.invoke([ SystemMessage(content=( "You are a research assistant. Generate 3 specific, diverse " "search queries to research the given topic. Return only the " "queries, one per line. No numbering, no extra text." f"{existing_info}" )), HumanMessage(content=f"Research topic: {topic}"), ]) new_queries = [q.strip() for q in response.content.strip().split("\n") if q.strip()] return { "search_queries": state.get("search_queries", []) + new_queries, "messages": [response], } def search_web(state: ResearchState) -> dict: queries = state.get("search_queries", []) recent_queries = queries[-3:] all_results = state.get("sources", []) for query in recent_queries: try: response = tavily.search(query=query, max_results=3, include_raw_content=False) for result in response.get("results", []): if not any(s["url"] == result["url"] for s in all_results): all_results.append({ "title": result.get("title", ""), "url": result.get("url", ""), "content": result.get("content", ""), "query": query, }) except Exception as e: print(f"Search failed for '{query}': {e}") return {"sources": all_results} def analyze_results(state: ResearchState) -> dict: sources = state.get("sources", []) if not sources: return { "analysis": "No search results found. Need to try different queries.", "iteration": state.get("iteration", 0) + 1, } source_text = "" for i, source in enumerate(sources, 1): source_text += f"\n[{i}] {source['title']}\nURL: {source['url']}\n{source['content']}\n" response = llm.invoke([ SystemMessage(content=( "You are a research analyst. Analyze the following search results " "about the given topic. Provide:\n" "1. Key findings (what we know)\n" "2. 
Gaps (what we still need to find out)\n" "3. Confidence level (low/medium/high) in our overall understanding\n\n" "Be specific and cite source numbers." )), HumanMessage(content=f"Topic: {state['research_topic']}\n\nSources:{source_text}"), ]) return { "analysis": response.content, "iteration": state.get("iteration", 0) + 1, "messages": [response], } def write_report(state: ResearchState) -> dict: sources = state.get("sources", []) analysis = state.get("analysis", "") source_text = "" for i, source in enumerate(sources, 1): source_text += f"\n[{i}] {source['title']}\nURL: {source['url']}\n{source['content']}\n" response = llm.invoke([ SystemMessage(content=( "You are a research writer. Write a clear, well-structured research " "report based on the analysis and sources provided. Include:\n" "- Executive summary (2-3 sentences)\n" "- Key findings with citations [1], [2], etc.\n" "- Conclusions\n" "- Sources list\n\n" "Write for a technical audience. Be factual and specific." )), HumanMessage(content=( f"Topic: {state['research_topic']}\n\n" f"Analysis:\n{analysis}\n\n" f"Sources:{source_text}" )), ]) return { "final_report": response.content, "messages": [response], } # --- Routing --- def should_continue_research(state: ResearchState) -> str: iteration = state.get("iteration", 0) max_iterations = state.get("max_iterations", 3) analysis = state.get("analysis", "") if iteration >= max_iterations: return "write_report" analysis_lower = analysis.lower() if "low" in analysis_lower and "confidence" in analysis_lower: return "generate_queries" if "significant gaps" in analysis_lower or "need more" in analysis_lower: return "generate_queries" return "write_report" # --- Graph --- workflow = StateGraph(ResearchState)
workflow.add_node("generate_queries", generate_queries)
workflow.add_node("search_web", search_web)
workflow.add_node("analyze_results", analyze_results)
workflow.add_node("write_report", write_report) workflow.add_edge(START, "generate_queries")
workflow.add_edge("generate_queries", "search_web")
workflow.add_edge("search_web", "analyze_results")
workflow.add_conditional_edges( "analyze_results", should_continue_research, {"generate_queries": "generate_queries", "write_report": "write_report"},
)
workflow.add_edge("write_report", END) memory = MemorySaver()
agent = workflow.compile(checkpointer=memory) # --- Main --- if __name__ == "__main__": print("=" * 60) print("LangGraph Research Agent") print("=" * 60) topic = input("\nEnter a research topic: ").strip() if not topic: topic = "How are companies using AI agents in production in 2026?" print(f"\nResearching: {topic}") print("-" * 60) initial_state = { "research_topic": topic, "messages": [], "search_queries": [], "sources": [], "analysis": "", "final_report": "", "iteration": 0, "max_iterations": 3, } config = {"configurable": {"thread_id": "session-001"}} for event in agent.stream(initial_state, config=config): for node_name, output in event.items(): print(f"\n>> Node: {node_name}") if node_name == "generate_queries" and "search_queries" in output: print(f" Queries: {output['search_queries'][-3:]}") elif node_name == "search_web" and "sources" in output: print(f" Found {len(output['sources'])} total sources") elif node_name == "analyze_results" and "analysis" in output: print(f" Iteration: {output.get('iteration', '?')}") print(f" Analysis preview: {output['analysis'][:200]}...") elif node_name == "write_report" and "final_report" in output: print(f"\n{'=' * 60}") print("RESEARCH REPORT") print("=" * 60) print(output["final_report"]) print(f"\n{'=' * 60}") print("Research complete.")
"""Research agent built with LangGraph — complete code.""" import os
from typing import TypedDict, Annotated
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
from tavily import TavilyClient load_dotenv() # --- State --- class ResearchState(TypedDict): messages: Annotated[list, add_messages] research_topic: str search_queries: list[str] sources: list[dict] analysis: str final_report: str iteration: int max_iterations: int # --- Setup --- llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.1)
tavily = TavilyClient(api_key=os.getenv("TAVILY_API_KEY")) # --- Nodes --- def generate_queries(state: ResearchState) -> dict: topic = state["research_topic"] existing_info = "" if state.get("analysis"): existing_info = ( f"\n\nWe already know:\n{state['analysis']}\n\n" "Generate queries to fill gaps in our knowledge." ) response = llm.invoke([ SystemMessage(content=( "You are a research assistant. Generate 3 specific, diverse " "search queries to research the given topic. Return only the " "queries, one per line. No numbering, no extra text." f"{existing_info}" )), HumanMessage(content=f"Research topic: {topic}"), ]) new_queries = [q.strip() for q in response.content.strip().split("\n") if q.strip()] return { "search_queries": state.get("search_queries", []) + new_queries, "messages": [response], } def search_web(state: ResearchState) -> dict: queries = state.get("search_queries", []) recent_queries = queries[-3:] all_results = state.get("sources", []) for query in recent_queries: try: response = tavily.search(query=query, max_results=3, include_raw_content=False) for result in response.get("results", []): if not any(s["url"] == result["url"] for s in all_results): all_results.append({ "title": result.get("title", ""), "url": result.get("url", ""), "content": result.get("content", ""), "query": query, }) except Exception as e: print(f"Search failed for '{query}': {e}") return {"sources": all_results} def analyze_results(state: ResearchState) -> dict: sources = state.get("sources", []) if not sources: return { "analysis": "No search results found. Need to try different queries.", "iteration": state.get("iteration", 0) + 1, } source_text = "" for i, source in enumerate(sources, 1): source_text += f"\n[{i}] {source['title']}\nURL: {source['url']}\n{source['content']}\n" response = llm.invoke([ SystemMessage(content=( "You are a research analyst. Analyze the following search results " "about the given topic. Provide:\n" "1. Key findings (what we know)\n" "2. 
Gaps (what we still need to find out)\n" "3. Confidence level (low/medium/high) in our overall understanding\n\n" "Be specific and cite source numbers." )), HumanMessage(content=f"Topic: {state['research_topic']}\n\nSources:{source_text}"), ]) return { "analysis": response.content, "iteration": state.get("iteration", 0) + 1, "messages": [response], } def write_report(state: ResearchState) -> dict: sources = state.get("sources", []) analysis = state.get("analysis", "") source_text = "" for i, source in enumerate(sources, 1): source_text += f"\n[{i}] {source['title']}\nURL: {source['url']}\n{source['content']}\n" response = llm.invoke([ SystemMessage(content=( "You are a research writer. Write a clear, well-structured research " "report based on the analysis and sources provided. Include:\n" "- Executive summary (2-3 sentences)\n" "- Key findings with citations [1], [2], etc.\n" "- Conclusions\n" "- Sources list\n\n" "Write for a technical audience. Be factual and specific." )), HumanMessage(content=( f"Topic: {state['research_topic']}\n\n" f"Analysis:\n{analysis}\n\n" f"Sources:{source_text}" )), ]) return { "final_report": response.content, "messages": [response], } # --- Routing --- def should_continue_research(state: ResearchState) -> str: iteration = state.get("iteration", 0) max_iterations = state.get("max_iterations", 3) analysis = state.get("analysis", "") if iteration >= max_iterations: return "write_report" analysis_lower = analysis.lower() if "low" in analysis_lower and "confidence" in analysis_lower: return "generate_queries" if "significant gaps" in analysis_lower or "need more" in analysis_lower: return "generate_queries" return "write_report" # --- Graph --- workflow = StateGraph(ResearchState)
workflow.add_node("generate_queries", generate_queries)
workflow.add_node("search_web", search_web)
workflow.add_node("analyze_results", analyze_results)
workflow.add_node("write_report", write_report) workflow.add_edge(START, "generate_queries")
workflow.add_edge("generate_queries", "search_web")
workflow.add_edge("search_web", "analyze_results")
workflow.add_conditional_edges( "analyze_results", should_continue_research, {"generate_queries": "generate_queries", "write_report": "write_report"},
)
workflow.add_edge("write_report", END) memory = MemorySaver()
agent = workflow.compile(checkpointer=memory) # --- Main --- if __name__ == "__main__": print("=" * 60) print("LangGraph Research Agent") print("=" * 60) topic = input("\nEnter a research topic: ").strip() if not topic: topic = "How are companies using AI agents in production in 2026?" print(f"\nResearching: {topic}") print("-" * 60) initial_state = { "research_topic": topic, "messages": [], "search_queries": [], "sources": [], "analysis": "", "final_report": "", "iteration": 0, "max_iterations": 3, } config = {"configurable": {"thread_id": "session-001"}} for event in agent.stream(initial_state, config=config): for node_name, output in event.items(): print(f"\n>> Node: {node_name}") if node_name == "generate_queries" and "search_queries" in output: print(f" Queries: {output['search_queries'][-3:]}") elif node_name == "search_web" and "sources" in output: print(f" Found {len(output['sources'])} total sources") elif node_name == "analyze_results" and "analysis" in output: print(f" Iteration: {output.get('iteration', '?')}") print(f" Analysis preview: {output['analysis'][:200]}...") elif node_name == "write_report" and "final_report" in output: print(f"\n{'=' * 60}") print("RESEARCH REPORT") print("=" * 60) print(output["final_report"]) print(f"\n{'=' * 60}") print("Research complete.") - Normal edges: Always go from A to B. graph.add_edge("search", "analyze")
- Conditional edges: A routing function decides where to go next. This is where agents make decisions.
- Entry/exit edges: START and END mark where the graph begins and terminates.

- Python 3.11 or higher (LangGraph 1.x requires 3.11+)
- An OpenAI API key (we use GPT-4o-mini for affordability; you can swap in any LLM)
- A Tavily API key (for web search — free tier gives 1,000 searches/month)
- Basic Python knowledge (functions, dictionaries, type hints)

- messages uses the add_messages annotation so conversation history accumulates automatically.
- iteration and max_iterations prevent infinite loops. This is not optional — any agent that can loop must have a safety limit.
- Each field has a clear purpose. When you debug your agent (and you will), having well-named state fields saves hours.

- START → generate_queries: The agent creates search queries from the topic.
- generate_queries → search_web: It executes those queries.
- search_web → analyze_results: It analyzes what it found.
- analyze_results → ???: The conditional edge kicks in. If the analysis says we need more information, we loop back to generate new queries. If we have enough, we move to writing.
- write_report → END: The agent outputs its final report.

- Resume a failed research session from the last successful step
- Inspect exactly what the agent did at each node
- Replay the agent's decision-making for debugging

- A playground UI at /research/playground
- Streaming endpoints for real-time output
- Input/output schema documentation

- You need fine-grained control over agent behavior
- Your workflow has loops, branches, or complex decision points
- Production reliability matters (retries, persistence, monitoring)
- You want human-in-the-loop approval steps

- You are prototyping a multi-agent team quickly
- Your workflow is straightforward (researcher → writer → editor)
- You want the easiest possible getting-started experience
- You need native A2A protocol support

- Your use case is primarily multi-party conversations
- You are already in the Microsoft ecosystem
- Note: Microsoft has shifted AutoGen to maintenance mode, so evaluate long-term viability

- You are building within the Microsoft/Azure stack
- You need tight integration with Azure AI services
- Your team primarily writes C# or Java

- A supervisor agent that delegates to specialist sub-agents
- A debate system where two agents argue for and against a position
- A pipeline where a researcher feeds a writer who feeds an editor

- File I/O: Read and write local files for persistent research
- Code execution: Run Python code to analyze data
- API calls: Interact with external services
- Browser automation: Navigate and extract from web pages

- LangGraph models agents as graphs — nodes are functions, edges are transitions, conditional edges are decisions. This makes complex agent behavior natural to express and debug.
- State is everything. Design your state schema carefully. It is the agent's working memory and determines what the agent can know and track.
- Always add safety limits. Any agent that loops needs a max_iterations guard. Infinite loops waste money and time.
- Start simple, add complexity later. Our research agent started with 4 nodes. You can add tool nodes, approval gates, and sub-agents incrementally.
- LangGraph is production-ready. With v1.0, durable execution, and adoption by major companies, it is no longer experimental. Build with confidence.