LangGraph是2026年构建生产级AI Agent的首选框架——它将Agent的执行过程建模为有向图DAG每个节点是一个处理步骤边是条件跳转逻辑。这种设计让复杂的多步骤AI工作流变得可视化、可调试、可扩展。本文从工程实践角度深入解析LangGraph的核心概念与生产部署技巧。一、为什么选择LangGraph而非简单的Agent循环很多团队初期用while循环 工具调用实现Agent够用但难以维护。LangGraph的优势在于状态机语义工作流的每个状态都是显式定义的便于调试和测试条件分支可以根据LLM输出或外部条件动态决定下一步走哪条路并行执行支持多个节点同时执行然后聚合结果持久化内置checkpointing工作流可以暂停、恢复支持Human-in-the-Loop可视化图结构可以直接渲染为流程图方便团队协作理解## 二、LangGraph核心概念### 2.1 State工作流的共享状态pythonfrom typing import TypedDict, Annotatedfrom operator import addfrom langgraph.graph import StateGraph, ENDclass WorkflowState(TypedDict): 工作流的共享状态定义 # 用户输入 user_query: str # 中间结果使用add操作符新值追加而非覆盖 search_results: Annotated[list[str], add] # 最终输出 final_answer: str # 控制流 iteration_count: int should_continue: bool # 工具调用历史 tool_calls: Annotated[list[dict], add] # 错误信息 errors: Annotated[list[str], add]### 2.2 Node处理节点每个节点是一个接受State、返回State更新的函数pythonimport anthropicfrom langgraph.graph import StateGraphclient anthropic.Anthropic()def analyze_query_node(state: WorkflowState) - dict: 分析用户查询确定搜索策略 response client.messages.create( modelclaude-opus-4-7, max_tokens500, messages[{ role: user, content: f分析这个查询输出JSON查询{state[user_query]}输出格式{{ query_type: factual|analytical|creative, search_keywords: [关键词1, 关键词2], complexity: simple|medium|complex, requires_calculation: true|false}} }] ) import json try: analysis json.loads(response.content[0].text) except: analysis {query_type: factual, search_keywords: [state[user_query]], complexity: simple} return { search_keywords: analysis.get(search_keywords, []), query_analysis: analysis }def web_search_node(state: WorkflowState) - dict: 执行网络搜索 results [] for keyword in state.get(search_keywords, [state[user_query]])[:3]: # 调用搜索API search_result perform_web_search(keyword) results.extend(search_result) return { search_results: results, tool_calls: [{tool: web_search, keywords: state.get(search_keywords)}] }def synthesis_node(state: WorkflowState) - dict: 综合搜索结果生成最终回答 context \n\n.join(state.get(search_results, [])[:5]) response client.messages.create( modelclaude-opus-4-7, max_tokens1500, messages[{ role: user, content: f基于以下搜索结果回答用户问题。问题{state[user_query]}搜索结果{context}请给出准确、全面的回答。 }] ) return { final_answer: response.content[0].text, should_continue: False }def quality_check_node(state: WorkflowState) - dict: 质量检查判断回答是否满足要求 response client.messages.create( modelclaude-opus-4-7, max_tokens200, messages[{ role: user, content: f评估回答质量问题{state[user_query]}回答{state.get(final_answer, )}回答是否完整准确(yes/no)如果no给出改进方向一句话 }] ) answer_text response.content[0].text.lower() is_good yes in answer_text return { quality_passed: is_good, iteration_count: state.get(iteration_count, 0) 1 }### 2.3 Graph编排工作流pythonfrom langgraph.graph import StateGraph, ENDdef build_research_workflow(): 构建研究型工作流 workflow StateGraph(WorkflowState) # 添加节点 workflow.add_node(analyze, analyze_query_node) workflow.add_node(search, web_search_node) workflow.add_node(synthesize, synthesis_node) workflow.add_node(quality_check, quality_check_node) # 设置起始节点 workflow.set_entry_point(analyze) # 顺序边 workflow.add_edge(analyze, search) workflow.add_edge(search, synthesize) workflow.add_edge(synthesize, quality_check) # 条件边质量检查后决定是否重试 def should_retry(state: WorkflowState) - str: if state.get(quality_passed, True): return done elif state.get(iteration_count, 0) 2: return done # 最多重试2次 else: return retry workflow.add_conditional_edges( quality_check, should_retry, { done: END, retry: search # 重新搜索 } ) return workflow.compile()# 使用工作流app build_research_workflow()result app.invoke({ user_query: 2026年AI Agent的最新技术进展, search_results: [], tool_calls: [], errors: [], iteration_count: 0, should_continue: True})print(result[final_answer])## 三、Human-in-the-Loop工作流暂停与恢复LangGraph内置了检查点机制支持在关键步骤暂停等待人工确认pythonfrom langgraph.checkpoint.sqlite import SqliteSaverfrom langgraph.graph import StateGraph, END# 使用SQLite持久化检查点memory SqliteSaver.from_conn_string(checkpoints.db)def build_approval_workflow(): 需要人工审批的工作流 workflow StateGraph(WorkflowState) workflow.add_node(draft_response, draft_response_node) workflow.add_node(human_review, human_review_node) # 等待人工 workflow.add_node(finalize, finalize_node) workflow.set_entry_point(draft_response) workflow.add_edge(draft_response, human_review) # human_review节点会在此处暂停等待人工输入 workflow.add_conditional_edges( human_review, lambda state: approve if state.get(approved) else revise, { approve: finalize, revise: draft_response } ) workflow.add_edge(finalize, END) # 编译时注入检查点 return workflow.compile( checkpointermemory, interrupt_before[human_review] # 在此节点前暂停 )app build_approval_workflow()# 第一次运行会在human_review前暂停thread_config {configurable: {thread_id: task_001}}result app.invoke( {user_query: 起草给客户的季度报告}, configthread_config)print(草稿已生成等待审批, result.get(draft))# 人工审查后继续运行注入审批状态app.update_state( thread_config, {approved: True, human_feedback: 很好可以发送})final_result app.invoke(None, configthread_config)print(最终结果, final_result.get(final_answer))## 四、并行节点提升多任务效率pythondef build_parallel_research_workflow(): 并行搜索多个来源提高效率 workflow StateGraph(WorkflowState) workflow.add_node(decompose, decompose_query_node) # 三个并行搜索节点 workflow.add_node(web_search, web_search_node) workflow.add_node(db_search, database_search_node) workflow.add_node(docs_search, docs_search_node) workflow.add_node(merge_results, merge_results_node) workflow.add_node(synthesize, synthesis_node) workflow.set_entry_point(decompose) # 分解后并行执行三个搜索LangGraph自动并行处理同一源节点的多条边 workflow.add_edge(decompose, web_search) workflow.add_edge(decompose, db_search) workflow.add_edge(decompose, docs_search) # 三个节点都完成后才到merge_results workflow.add_edge(web_search, merge_results) workflow.add_edge(db_search, merge_results) workflow.add_edge(docs_search, merge_results) workflow.add_edge(merge_results, synthesize) workflow.add_edge(synthesize, END) return workflow.compile()## 五、流式输出与实时进度pythonasync def run_with_streaming(user_query: str): 流式执行工作流实时显示进度 app build_research_workflow() async for event in app.astream_events( {user_query: user_query, search_results: [], tool_calls: [], errors: [], iteration_count: 0}, versionv1 ): kind event[event] if kind on_chain_start: node_name event[name] if node_name in [analyze, search, synthesize, quality_check]: print(f 执行节点: {node_name}) elif kind on_chain_end: node_name event[name] if node_name synthesize: output event[data].get(output, {}) if final_answer in output: print(f✅ 生成回答完成) elif kind on_llm_stream: # 实时输出LLM生成的文字 chunk event[data].get(chunk, ) if hasattr(chunk, content) and chunk.content: print(chunk.content, end, flushTrue)import asyncioasyncio.run(run_with_streaming(2026年最值得关注的AI技术方向))## 六、生产部署LangGraph PlatformLangGraph 0.2提供了Platform功能简化生产部署python# langgraph.json - 部署配置{ dependencies: [./my_agent], graphs: { research_agent: ./my_agent/workflow.py:app, code_agent: ./my_agent/code_workflow.py:app }, env: { ANTHROPIC_API_KEY: env:ANTHROPIC_API_KEY }}bash# 本地开发服务器langgraph dev# 构建Docker镜像langgraph build -t my-agent:latest# 部署到云langgraph up # 使用LangSmith托管关键生产配置pythonfrom langgraph.checkpoint.postgres import PostgresSaverimport psycopg2# 生产环境使用PostgreSQL持久化检查点conn psycopg2.connect(os.environ[DATABASE_URL])checkpointer PostgresSaver(conn)checkpointer.setup()# 编译生产级工作流production_app workflow.compile( checkpointercheckpointer, interrupt_before[human_approval], # 需要审批的步骤)## 七、监控与调试LangGraph与LangSmith深度集成自动追踪每次执行pythonimport osos.environ[LANGCHAIN_TRACING_V2] trueos.environ[LANGCHAIN_API_KEY] your_langsmith_keyos.environ[LANGCHAIN_PROJECT] production-agent# 之后所有工作流执行都会自动发送到LangSmith# 可以在LangSmith界面看到# - 完整的执行路径哪些节点被执行了# - 每个节点的输入/输出# - Token消耗和延迟# - 失败节点和错误信息## 八、总结LangGraph是构建生产级AI工作流的工程利器1.StateGraph显式的状态管理避免隐式状态传递的混乱2.条件边基于LLM输出或外部条件的动态路由3.并行节点自动并行化无依赖的处理步骤4.Checkpointing工作流暂停/恢复支持Human-in-the-Loop5.流式执行实时输出中间结果提升用户体验6.Platform部署从本地开发到生产的一站式支持相比简单的Agent循环LangGraph提供了工业级的可靠性、可调试性和可扩展性——这正是从原型迈向生产的关键差距。