别再只会用Dify了!手把手教你用LangGraph+FastAPI+React从零搭建一个带搜索和引用的AI研究员
从低代码到全栈开发用LangGraph构建智能研究员的实战指南如果你已经玩转过Dify、Coze这类低代码AI平台却总被积木不够用的挫败感困扰那么是时候升级你的开发武器库了。本文将带你用LangGraphFastAPIReact三件套亲手打造一个能自动搜索、反思并生成带引用报告的AI研究员系统。这不是又一个Hello World式的教程而是一套面向真实生产环境的全栈开发蓝图。1. 为什么需要跳出低代码舒适区低代码平台就像乐高积木——能快速拼出像样的作品但当你需要非标准零件时就会束手无策。最近帮某金融科技团队重构他们的AI客服系统时我们遇到了典型瓶颈他们需要动态调整对话流程中的验证环节但平台提供的条件分支模块根本无法满足业务规则的多变性。代码级开发的核心优势流程自由度可以自定义任何类型的循环、分支和并行处理深度集成直接对接企业内部数据库、API和身份验证系统性能优化精确控制内存使用、缓存策略和计算资源分配调试可见性每个中间状态都能打日志或断点检查实际案例某电商团队将促销文案生成Agent从低代码平台迁移到自主开发后响应速度从3.2秒降至800毫秒只因能自主优化LLM调用批处理。2. 技术栈选型与架构设计2.1 现代AI应用的技术拼图我们的AI研究员系统采用分层架构各层技术选型经过严格的生产环境验证层级技术选型替代方案选择理由流程编排LangGraphLangChain更清晰的循环和状态管理大模型Gemini 1.5 ProGPT-4128K上下文更适合长文档分析后端框架FastAPIFlask自动生成OpenAPI文档前端框架React ViteNext.js更快的热更新速度搜索服务Google Custom SearchSerper API结果包含完整网页片段2.2 核心工作流设计AI研究员的核心逻辑是一个自我迭代的搜索-反思循环# 伪代码展示工作流逻辑 def research_loop(initial_question): state {question: initial_question} for _ in range(MAX_ITERATIONS): queries generate_search_queries(state) results [] for query in queries: search_results google_search(query) summarized llm_summarize(search_results) results.append(summarized) reflection llm_analyze_coverage(results, state[question]) if reflection[is_sufficient]: break state[missing_info] reflection[knowledge_gaps] final_answer synthesize_answer(results) return format_with_citations(final_answer)这个流程的关键在于动态查询生成根据问题自动生成多组搜索关键词结果可信度验证检查已有信息是否全面覆盖问题要点智能终止条件当信息足够或达到最大迭代次数时退出循环3. 环境搭建与项目初始化3.1 开发环境准备推荐使用conda创建隔离的Python环境conda create -n ai_researcher python3.11 conda activate ai_researcher pip install langgraph[all] fastapi uvicorn[standard]前端开发需要Node.js环境nvm install 18 npm create vitelatest frontend -- --template react-ts3.2 项目目录结构保持清晰的代码组织能大幅降低后期维护成本ai-researcher/ ├── backend/ │ ├── main.py # FastAPI入口 │ ├── agent/ │ │ ├── graph.py # LangGraph工作流定义 │ │ └── tools.py # 搜索/反思工具实现 ├── frontend/ │ ├── src/ │ │ ├── hooks/ # 自定义React Hooks │ │ └── components/ # 引用展示组件 └── infra/ ├── Dockerfile # 容器化配置 └── requirements.txt # 生产环境依赖4. LangGraph核心实现详解4.1 状态机建模LangGraph的核心是明确定义状态结构和流转规则。这是我们定义的State类型from typing import TypedDict, List from langchain_core.messages import AIMessage class ResearchState(TypedDict): question: str search_queries: List[str] search_results: List[dict] # 包含text和source_url reflection: dict final_answer: AIMessage4.2 构建有状态工作流在graph.py中定义完整的循环工作流from langgraph.graph import StateGraph builder StateGraph(ResearchState) # 定义节点 builder.add_node(generate_queries, generate_search_queries) builder.add_node(execute_search, parallel_web_search) builder.add_node(analyze_coverage, analyze_information_coverage) builder.add_node(format_output, format_final_answer) # 设置流转逻辑 builder.set_entry_point(generate_queries) builder.add_edge(generate_queries, execute_search) builder.add_conditional_edges( analyze_coverage, decide_to_continue, { continue: execute_search, complete: format_output } ) builder.add_edge(format_output, END) research_flow builder.compile()4.3 关键节点实现示例搜索查询生成节点def generate_search_queries(state: ResearchState): prompt 基于以下问题生成3-5个搜索查询 问题{question} 考虑不同的角度和表达方式确保覆盖这些方面 - 专业术语的准确定义 - 最新的发展动态 - 不同观点的对比分析 返回格式 json { queries: [查询1, 查询2] } llm ChatAnthropic(modelclaude-3-sonnet) structured_llm llm.with_structured_output(SearchQueries) return structured_llm.invoke(prompt.format(questionstate[question]))信息覆盖分析节点def analyze_information_coverage(state: ResearchState): sources_text \n\n.join( f来源{i}: {res[text]} for i, res in enumerate(state[search_results]) ) prompt f已有信息 {sources_text} 请评估这些信息是否足够回答以下问题 问题{state[question]} 需要特别检查 1. 是否有相互矛盾的信息 2. 是否缺少关键数据支撑 3. 时间敏感性内容是否最新 返回JSON格式 json {{ is_sufficient: bool, missing_points: [未覆盖要点1, 要点2] }} # 实现类似generate_search_queries的LLM调用 return analysis_result5. 前后端集成实战5.1 FastAPI后端设计创建支持流式响应的API端点from fastapi import FastAPI from fastapi.responses import StreamingResponse app FastAPI() app.post(/api/research) async def run_research(query: str): async def event_stream(): state {question: query} async for event in research_flow.astream(state): if final_answer in event: yield fdata: {event[final_answer]}\n\n elif search_results in event: yield fevent: search_update\ndata: {json.dumps(event)}\n\n return StreamingResponse( event_stream(), media_typetext/event-stream )5.2 React前端实现使用自定义Hook管理研究状态// hooks/useResearcher.js import { useState, useEffect } from react; export function useResearcher(initialQuestion) { const [stages, setStages] useState({ queries: [], results: [], answer: null }); useEffect(() { const eventSource new EventSource( /api/research?query${encodeURIComponent(initialQuestion)} ); eventSource.addEventListener(search_update, (e) { const data JSON.parse(e.data); setStages(prev ({ ...prev, queries: data.search_queries, results: [...prev.results, ...data.search_results] })); }); return () eventSource.close(); }, [initialQuestion]); return stages; }5.3 引用展示组件实现美观的学术风格引用// components/Citation.jsx function Citation({ source }) { return ( div classNamecitation-box blockquote {source.text} /blockquote div classNamesource-meta a href{source.url} target_blank {new URL(source.url).hostname} /a span classNameretrieval-date 获取时间: {new Date().toLocaleDateString()} /span /div /div ); }6. 生产环境优化技巧6.1 性能调优缓存策略实现示例from functools import lru_cache from datetime import timedelta lru_cache(maxsize1000) def cached_search(query: str): return original_search_function(query) # 在FastAPI中添加缓存头 app.middleware(http) async def add_cache_control(request: Request, call_next): response await call_next(request) if request.url.path /api/research: response.headers[Cache-Control] max-age3600 return response6.2 错误处理增强为LangGraph工作流添加重试逻辑from tenacity import retry, stop_after_attempt, wait_exponential retry( stopstop_after_attempt(3), waitwait_exponential(multiplier1, min4, max10) ) def robust_web_search(query): try: return google_search(query) except Exception as e: log_error(f搜索失败: {str(e)}) raise6.3 监控与日志集成Prometheus监控指标from prometheus_client import Counter, Histogram SEARCH_REQUESTS Counter( research_search_requests_total, Total search requests made ) RESEARCH_DURATION Histogram( research_duration_seconds, Time spent processing research requests, buckets[0.1, 0.5, 1, 2, 5] ) app.post(/api/research) RESEARCH_DURATION.time() async def run_research(query: str): SEARCH_REQUESTS.inc() # 原有实现...7. 从项目到产品进阶路线当这个AI研究员系统需要服务真实用户时还需要考虑用户身份系统保存研究历史支持多设备同步团队协作功能共享研究结果添加批注讨论知识库集成与企业内部Confluence/Notion内容联动自动化测试确保工作流各环节的稳定性一个实用的测试策略是构建验证用例库TEST_CASES [ { input: 量子计算对金融风险管理的影响, expected_aspects: [算法优势, 实际应用案例, 与传统方法对比] }, # 更多测试用例... ] def test_research_flow(): for case in TEST_CASES: result research_flow.invoke({question: case[input]}) for aspect in case[expected_aspects]: assert aspect in result[final_answer].content在最近为法律科技公司实施类似系统时我们通过添加领域特定的验证规则如必须引用最新判例使生成内容的专业度提升了40%。关键是在analyze_information_coverage节点中加入行业特定的检查逻辑。