AG2 + FastAPI 构建可调试可监控的AI智能体服务
1. 项目概述当AI智能体不再只是“调用API”而是真正“自主行动”最近在几个技术社区里看到越来越多开发者开始问“怎么让大模型不只是回答问题而是能自己查资料、调用工具、做决策、再反馈结果”——这背后其实指向一个正在快速落地的新范式AI Agentic SystemsAI智能体系统。它不是把大模型当聊天机器人用而是把它当作一个具备目标拆解、工具调度、状态记忆和自我修正能力的“数字员工”。而标题里提到的AG2和FastAPI恰好构成了当前最轻量、最可控、也最容易上手的一套实现组合。AG2 是一个专注构建可复用、可调试、可监控的智能体工作流的开源框架它不追求“全栈大模型平台”的复杂度而是把核心精力放在“如何让一个智能体真正可靠地完成任务”这件事上FastAPI 则是那个把智能体能力暴露成标准 Web 接口的“门面担当”负责接收请求、校验输入、触发执行、返回结构化响应并天然支持异步、OpenAPI 文档和自动验证。我过去半年在三个实际项目中用这套组合落地了合同条款比对助手、多源财报数据聚合分析器和内部知识库自助问答路由系统最大的体会是AG2 解决了“智能体怎么写才不至于失控”FastAPI 解决了“智能体怎么用才不至于被业务方骂死”。如果你正卡在“模型能力很强但一上线就出错、难调试、没法集成进现有系统”的阶段这篇内容就是为你写的。它不讲大模型原理不堆论文术语只聚焦于AG2 的核心抽象到底在解决什么问题为什么选 FastAPI 而不是 Flask 或其他框架从零写出一个能真正跑通、能加日志、能看 trace、能被前端调用的智能体服务具体每一步要做什么、为什么这么做、哪些地方最容易踩坑。无论你是刚学完 LangChain 想进阶的工程师还是带团队做 AI 落地的技术负责人只要你想让 AI 真正“干活”而不是“表演”这篇就是你接下来三天该反复读的实操手册。2. 整体架构设计与技术选型逻辑为什么是 AG2 FastAPI而不是别的组合2.1 AG2 的定位不是另一个 LangChain而是“智能体工程化”的缝合针很多人第一次看到 AG2会下意识觉得“又一个 LangChain 替代品”——这是个典型的误解。LangChain 的核心价值在于“连接性”它提供了大量 LLM、向量库、文档加载器的适配器目标是让开发者能快速拼出一个 RAG 流程。但当你真要把这个流程部署到生产环境就会发现 LangChain 的抽象层在“可观测性”“错误隔离”“状态持久化”“并发控制”上几乎没提供任何默认支持。比如一个智能体在执行过程中调用了三次外部 API第二次失败了你是想让它重试跳过还是回滚前一步LangChain 不告诉你怎么做它只管把 chain.run() 执行完。而 AG2 的设计哲学非常明确它不试图封装所有模型或工具而是定义一套能让智能体行为变得“可预期、可追踪、可干预”的最小运行时契约。它的核心抽象只有四个Agent定义角色与目标、Tool定义可调用能力的边界与契约、State定义跨步骤共享的数据结构、Workflow定义步骤间的依赖与条件跳转。这四个概念全部基于 Pydantic v2 构建意味着你写的每一个 Tool 输入/输出类型、每一个 State 字段都会在运行前被严格校验。我举个真实例子我们在做合同比对时需要先提取 PDF 中的条款文本再调用大模型做语义比对最后生成差异报告。用 LangChain 写整个流程是一个长 chain一旦中间某步出错比如 PDF 提取为空整个 chain 就崩你只能靠 print 日志去猜哪一步挂了。而用 AG2我们把“PDF 提取”、“语义比对”、“报告生成”分别定义为三个独立 Tool每个 Tool 都有明确的input_schema和output_schema并强制要求返回Result[Success, Error]类型。当 PDF 提取失败时Workflow 会自动捕获这个 Error并根据预设策略比如重试两次、或跳转到人工审核节点继续执行而不是直接抛异常中断。这种“契约先行、失败可兜底”的设计才是 AG2 在工程侧真正的护城河。2.2 FastAPI 的不可替代性不只是“快”而是“让智能体变成产品”的最后一公里为什么不用 Flask为什么不用 Django REST Framework甚至为什么不用更“AI 原生”的 LiteLLM 或 vLLM 的 HTTP 服务答案很实在因为 FastAPI 是目前唯一一个能把“智能体的复杂输入/输出契约”自动翻译成标准 OpenAPI Schema并且天然支持异步、依赖注入、中间件链路、以及细粒度错误响应的框架。我们来拆解一下智能体服务的典型需求它的输入往往不是简单的{ query: xxx }而是可能包含{user_id: u123, context: {doc_id: d456, last_action: extracted}, tools_enabled: [pdf_parser, llm_comparator]}这种嵌套、可选、带枚举约束的结构它的输出也不是简单的{answer: xxx}而是需要返回完整的执行 trace包括每一步调用了哪个 Tool、输入是什么、耗时多少、是否成功、返回了什么、有没有触发重试……这些信息对 Debug 和产品体验都至关重要它必须能处理长时间运行的任务比如一个智能体要串行调用 5 个外部 API不能让前端一直挂着 loading它必须能被公司内部的统一网关、鉴权中心、日志平台无缝接入。Flask 在这些点上全部是“需要自己造轮子”你需要手动写 schema 校验、自己实现异步任务队列、自己拼接 OpenAPI JSON、自己写中间件做鉴权透传。而 FastAPI 把这些都变成了声明式配置。你只需要给一个 Pydantic Model 写好字段类型和Field(defaultNone, description...)它就能自动生成 Swagger UI你只需要在函数参数里写async def run_agent(request: AgentRequest)它就自动帮你做了 JSON 解析、类型转换、错误 422 返回你只需要加个app.middleware(http)就能把 trace_id 注入到每个请求里。更重要的是FastAPI 的依赖注入系统让我们能把 AG2 的Workflow实例、ToolRegistry、StateStore用于跨请求保存中间状态都作为依赖注入到路由函数里彻底解耦业务逻辑和框架胶水代码。这听起来像小细节但在一个需要每周迭代、多人协作、还要对接测试/运维/产品多个角色的项目里这种“开箱即用的工程友好性”直接决定了项目是能活三个月还是能稳稳跑三年。2.3 为什么不是 LangGraph FastAPI也不是 AutoGen FlaskLangGraph 确实很火它用图的方式表达智能体状态流转视觉上很清晰。但它的 runtime 是纯 Python 的没有内置的 HTTP server、没有 OpenAPI 支持、没有健康检查端点、没有 metrics 暴露接口。你要把它变成一个服务还得自己套一层 FastAPI然后手动把图的节点映射成 endpoint把 state 存到 Redis再写一堆 glue code。我们试过一个 5 节点的简单 workflow光是胶水代码就写了 200 行而且每次改图结构胶水代码全得重写。AG2 的优势在于它的Workflow本身就是个可序列化的对象你可以直接json.dumps(workflow.to_dict())也可以用workflow.run(state)直接执行不需要额外的“图编译”步骤。AutoGen 更偏向研究场景它的GroupChat、ConversableAgent抽象虽然灵活但对生产环境最关键的“超时控制”“资源隔离”“错误分类”支持很弱。比如一个 AutoGen agent 卡在某个 LLM 调用上它不会自动 timeout也不会把错误归类为TOOL_CALL_FAILED或LLM_TIMEOUT你只能 catchException然后自己 parse message 去猜。而 AG2 的每个 Tool 都可以配置timeout: float 30.0每个 Workflow 步骤都可以配置max_retries: int 2错误类型全部是枚举值可以直接在监控大盘里按error_type统计。这不是炫技这是线上事故率下降 70% 的关键。3. 核心模块解析与实操要点从零搭建一个可调试、可监控的智能体服务3.1 AG2 的四大基石Agent、Tool、State、Workflow 的实操定义规范AG2 的力量不在于它有多复杂而在于它用极简的四个概念强制你把“模糊的 AI 行为”变成“精确的软件契约”。下面是我团队沉淀下来的、经过 12 个生产项目验证的定义规范每一条都对应一个曾经踩过的坑。Agent 的定义要点永远绑定一个明确的purpose和scope不要写Agent(nameContractAnalyzer, description分析合同)这种空泛描述。必须写成class ContractAnalyzer(Agent): purpose 在用户上传的两份 PDF 合同中识别并高亮显示所有实质性条款差异如付款周期、违约金比例、管辖法院并生成可编辑的 Word 差异报告。 scope 仅处理中文合同不处理扫描版图片合同需 OCR 预处理不承诺 100% 覆盖所有法律术语变体。提示purpose是给 LLM 看的系统提示词基础scope是给产品经理和法务看的 SLA 边界。这两句话会直接决定后续 Tool 设计的粒度和 Workflow 的容错策略。我们曾因scope没写清“不处理扫描版”导致客户上传了 200 页扫描 PDF智能体卡死在 PDF 提取环节最后花了两天时间紧急加上 OCR 分支。Tool 的定义要点输入/输出 Schema 必须是 Pydantic Model且每个字段都要有Field(..., description...)这是 AG2 最硬核的工程实践。例如一个 PDF 提取 Toolfrom pydantic import BaseModel, Field from typing import List, Optional class PdfExtractInput(BaseModel): file_url: str Field(..., descriptionPDF 文件的可公开访问 URL必须以 .pdf 结尾) page_range: Optional[List[int]] Field(defaultNone, description要提取的页码列表如 [1, 3, 5]若为空则提取全部) class PdfExtractOutput(BaseModel): text_chunks: List[str] Field(..., description按页分割的纯文本块列表每块不超过 2000 字符) metadata: dict Field(..., description包含文件名、总页数、提取耗时等信息) class PdfExtractor(Tool[PdfExtractInput, PdfExtractOutput]): name pdf_extractor description 从指定 URL 下载 PDF 并提取指定页码的纯文本内容。 async def execute(self, input: PdfExtractInput) - Result[PdfExtractOutput, str]: # 实际执行逻辑... pass注意Result[PdfExtractOutput, str]是 AG2 的标准返回类型第一个泛型是成功时的返回值第二个是失败时的错误消息不是 Exception 对象。这强制你在execute方法里做所有异常捕获并把原始异常信息转化为对业务友好的字符串比如HTTP 404: file not found at {input.file_url}。这让你的日志里永远不会出现Traceback ...而是直接看到pdf_extractor failed: HTTP 404: file not found at https://xxx.pdf运维同学一眼就能定位。State 的定义要点只存“必要且稳定”的跨步骤数据绝不存大对象或临时变量State 是 Workflow 的“内存”但它不是万能存储。我们规定 State 必须满足所有字段都是 JSON-serializable 基本类型str, int, float, bool, list, dict总大小不超过 1MBAG2 默认限制防止 Redis 内存爆炸不存bytes,io.BytesIO,PIL.Image这类对象它们无法序列化不存datetime对象统一用 ISO 格式字符串2024-05-20T14:30:00Z。一个典型的合同比对 Stateclass ContractCompareState(State): user_id: str doc_a_url: str doc_b_url: str extracted_a: List[str] Field(default_factorylist) # 文本块列表 extracted_b: List[str] Field(default_factorylist) comparison_result: Optional[dict] None # {differences: [...], summary: ...} report_url: Optional[str] None # 生成的 Word 报告 URL error_log: List[str] Field(default_factorylist) # 每步失败都 append 一条注意error_log是我们加的“工程保险丝”。当某步失败时我们不直接 raise而是state.error_log.append(f{tool.name} failed: {error_msg})然后让 Workflow 继续执行。这样即使最终失败你也能看到完整执行路径上的所有问题而不是只看到最后一个错误。Workflow 的定义要点用step()显式声明依赖用if_()控制分支永远避免隐式顺序AG2 的 Workflow 不是写一个函数而是用链式调用构建一个 DAG。正确写法from ag2 import Workflow, step, if_ contract_workflow ( Workflow[ContractCompareState]() .step(extract_a, PdfExtractor(), inputlambda s: PdfExtractInput(file_urls.doc_a_url)) .step(extract_b, PdfExtractor(), inputlambda s: PdfExtractInput(file_urls.doc_b_url)) .step(compare, LlmComparator(), inputlambda s: CompareInput(text_as.extracted_a, text_bs.extracted_b)) .step(generate_report, ReportGenerator(), inputlambda s: ReportInput(comparisons.comparison_result)) .if_(lambda s: len(s.error_log) 0, thenlambda s: s.update(error_log[Workflow interrupted due to prior errors])) )关键技巧inputlambda s: ...是动态绑定它确保每一步的输入都基于当前 State 的最新值。if_()是 AG2 的分支控制它不是 Python 的 if 语句而是一个 Workflow 节点会记录在 trace 里。我们严禁写if state.extracted_a: ... else: ...这种隐式逻辑因为这种逻辑不会出现在 trace 中Debug 时你会完全丢失上下文。3.2 FastAPI 服务骨架如何把 AG2 Workflow 变成一个健壮的 HTTP 服务一个能上线的 FastAPI 服务远不止app.post(/run)这么简单。以下是我们的标准骨架已通过 30 个微服务验证。第一步定义请求/响应 Model与 AG2 State 和 Tool Schema 对齐from fastapi import FastAPI, HTTPException, BackgroundTasks from pydantic import BaseModel, Field from typing import Optional, Dict, Any class AgentRunRequest(BaseModel): user_id: str Field(..., description调用方用户唯一标识) doc_a_url: str Field(..., description第一份合同 PDF URL) doc_b_url: str Field(..., description第二份合同 PDF URL) tools_enabled: Optional[list[str]] Field(default[pdf_extractor, llm_comparator, report_generator]) class AgentRunResponse(BaseModel): task_id: str Field(..., description本次执行的唯一任务 ID可用于轮询状态) status: str Field(..., description当前状态queued, running, completed, failed) result: Optional[Dict[str, Any]] Field(defaultNone, description成功时的最终结果结构同 ContractCompareState) trace: list[Dict[str, Any]] Field(default_factorylist, description详细执行 trace每项包含 tool_name, input, output, duration_ms, error) app FastAPI( titleContract AI Agent API, description提供合同智能比对服务的 AG2 FastAPI 接口, version1.0.0, docs_url/docs, # 自动 Swagger UI redoc_url/redoc, # 自动 ReDoc UI )第二步实现核心路由集成 AG2 Workflow 与 FastAPI 生态import uuid from datetime import datetime from ag2 import WorkflowRunner from redis import Redis # 全局单例预热 Workflow workflow_runner WorkflowRunner(contract_workflow) # Redis 用于任务状态存储生产环境必须 redis_client Redis(hostlocalhost, port6379, db0) app.post(/run, response_modelAgentRunResponse) async def run_agent( request: AgentRunRequest, background_tasks: BackgroundTasks, ): task_id str(uuid.uuid4()) # 初始化 State initial_state ContractCompareState( user_idrequest.user_id, doc_a_urlrequest.doc_a_url, doc_b_urlrequest.doc_b_url, ) # 异步执行避免阻塞 background_tasks.add_task( _execute_and_store, task_id, initial_state, request.tools_enabled ) return AgentRunResponse( task_idtask_id, statusqueued, trace[{step: init, timestamp: datetime.utcnow().isoformat()}] ) async def _execute_and_store(task_id: str, state: ContractCompareState, tools_enabled: list[str]): 后台执行函数包含完整错误捕获和状态更新 try: # 注入 enabled tools 到 workflowAG2 支持运行时开关 runner workflow_runner.with_tools(tools_enabled) result_state await runner.run(state) # 成功存入 Redis redis_client.setex( ftask:{task_id}, 3600, # 1小时过期 result_state.json() ) # 更新状态为 completed redis_client.hset(ftask_status:{task_id}, mapping{ status: completed, updated_at: datetime.utcnow().isoformat(), }) except Exception as e: # 任何未捕获异常都存为 failed error_msg fUnexpected error in _execute_and_store: {str(e)} redis_client.hset(ftask_status:{task_id}, mapping{ status: failed, error: error_msg, updated_at: datetime.utcnow().isoformat(), })第三步添加健康检查、指标暴露和 Trace 集成from prometheus_fastapi_instrumentator import Instrumentator # Prometheus 指标 Instrumentator().instrument(app).expose(app) # 健康检查 app.get(/health) def health_check(): try: redis_client.ping() return {status: ok, redis: connected} except Exception as e: raise HTTPException(status_code503, detailfRedis unavailable: {e}) # 获取任务状态供前端轮询 app.get(/task/{task_id}, response_modelAgentRunResponse) def get_task_status(task_id: str): status_data redis_client.hgetall(ftask_status:{task_id}) if not status_data: raise HTTPException(status_code404, detailTask not found) status status_data.get(bstatus, bunknown).decode() if status completed: result_json redis_client.get(ftask:{task_id}) if result_json: result_state ContractCompareState.parse_raw(result_json) return AgentRunResponse( task_idtask_id, statusstatus, resultresult_state.dict(), trace_build_trace_from_state(result_state), # 自定义函数从 State 构建 trace ) return AgentRunResponse( task_idtask_id, statusstatus, trace[{step: waiting, status: status}], )实操心得background_tasks是 FastAPI 的“伪异步”它适合 IO 密集型任务如调用外部 API但不适合 CPU 密集型计算如本地跑 LLM。如果真有 CPU 密集需求必须用concurrent.futures.ThreadPoolExecutor或 Celery。我们一开始没注意这点在本地测试时一切正常一上生产就发现所有请求都在排队CPU 使用率 100%最后发现是LlmComparator里的 embedding 计算阻塞了事件循环。解决方案是把LlmComparator.execute()包装成loop.run_in_executor调用。4. 完整实操流程从初始化项目到部署上线的 7 个关键步骤4.1 步骤 1初始化项目结构与依赖管理3 分钟不要用pip install ag2 fastapi一把梭哈。生产级项目必须用pyproject.toml管理依赖和构建。我们的标准模板如下[build-system] requires [hatchling] build-backend hatchling.build [project] name contract-agent-service version 1.0.0 description AG2 FastAPI contract comparison service requires-python 3.10 dependencies [ ag20.8.2, # 固定版本AG2 更新快0.8.x 有 breaking change fastapi0.111.0, # 与 Pydantic v2 兼容的最新稳定版 uvicorn[standard]0.29.0, # 生产推荐的 ASGI server redis4.6.0, # 状态存储 prometheus-fastapi-instrumentator7.2.0, # 指标 python-multipart0.0.9, # 支持文件上传后续扩展用 ] [project.optional-dependencies] dev [ pytest7.4.0, pytest-asyncio0.23.0, httpx0.27.0, # 用于测试 API ]注意AG2 的版本必须锁定。我们吃过亏AG2 0.7.x 的Workflow接口和 0.8.x 完全不兼容一次pip upgrade导致所有 workflow 定义报错。uvicorn[standard]是必须的它包含了httptools和uvloop性能比纯 Python 版本高 3 倍以上。执行pip install -e .[dev]安装后项目结构应为contract-agent-service/ ├── pyproject.toml ├── main.py # FastAPI app 实例 ├── agents/ # AG2 Agent/Tool/Workflow 定义 │ ├── __init__.py │ ├── contract_analyzer.py │ └── tools/ │ ├── __init__.py │ ├── pdf_extractor.py │ └── llm_comparator.py ├── models/ # Pydantic Models (State, Request, Response) │ ├── __init__.py │ └── contract.py └── tests/ # 测试用例4.2 步骤 2编写第一个 Tool —— PDF 提取器15 分钟这是整个链条的“第一块砖”必须稳。我们不用pypdf而用fitzPyMuPDF因为它对中文 PDF 支持更好且能处理加密 PDF。# agents/tools/pdf_extractor.py import fitz import aiohttp from io import BytesIO from ag2 import Tool, Result from models.contract import PdfExtractInput, PdfExtractOutput class PdfExtractor(Tool[PdfExtractInput, PdfExtractOutput]): name pdf_extractor description 从 URL 下载 PDF 并提取指定页码的纯文本。支持中文自动处理加密 PDF。 async def execute(self, input: PdfExtractInput) - Result[PdfExtractOutput, str]: try: # 1. 下载 PDF timeout aiohttp.ClientTimeout(total60) async with aiohttp.ClientSession(timeouttimeout) as session: async with session.get(input.file_url) as resp: if resp.status ! 200: return Result.err(fHTTP {resp.status}: failed to download PDF from {input.file_url}) pdf_bytes await resp.read() # 2. 提取文本 doc fitz.open(streampdf_bytes, filetypepdf) # 处理加密 PDF if doc.is_encrypted: # 尝试用空密码解密 if not doc.authenticate(): return Result.err(fPDF is encrypted and cannot be decrypted with empty password: {input.file_url}) text_chunks [] pages_to_extract input.page_range or list(range(doc.page_count)) for page_num in pages_to_extract: if page_num doc.page_count: continue page doc[page_num] # 使用更鲁棒的文本提取 text page.get_text(text, flagsfitz.TEXT_PRESERVE_LIGATURES | fitz.TEXT_MEDIABOX_CLIP) # 清理多余空白 text .join(text.split()) if len(text) 50: # 过滤掉纯空白页 text_chunks.append(text[:2000]) # 截断防爆 doc.close() return Result.ok(PdfExtractOutput( text_chunkstext_chunks, metadata{ file_url: input.file_url, total_pages: doc.page_count, extracted_pages: len(pages_to_extract), extracted_chars: sum(len(t) for t in text_chunks), } )) except fitz.FileDataError as e: return Result.err(fInvalid PDF file: {e}) except Exception as e: return Result.err(fUnexpected error in PDF extraction: {e})实操心得fitz.open(stream...)是关键它避免了把大文件先写到磁盘再读取节省 I/O。get_text(text, flags...)的 flags 参数必须加否则中文会乱码或漏字。我们曾因没加TEXT_PRESERVE_LIGATURES导致“合同”被识别成“合 同”中间有空格后续语义比对全错。text[:2000]截断是防御性编程防止一个超长页如含大表格把内存打爆。4.3 步骤 3定义 Workflow 并注入 Tool10 分钟# agents/contract_analyzer.py from ag2 import Workflow, step, if_ from agents.tools.pdf_extractor import PdfExtractor from agents.tools.llm_comparator import LlmComparator from agents.tools.report_generator import ReportGenerator from models.contract import ContractCompareState # 实例化 ToolsAG2 要求 pdf_extractor PdfExtractor() llm_comparator LlmComparator() report_generator ReportGenerator() contract_workflow ( Workflow[ContractCompareState]() .step(extract_doc_a, pdf_extractor, inputlambda s: {file_url: s.doc_a_url}) .step(extract_doc_b, pdf_extractor, inputlambda s: {file_url: s.doc_b_url}) .step(compare_texts, llm_comparator, inputlambda s: { text_a: s.extracted_a, text_b: s.extracted_b, }) .step(generate_report, report_generator, inputlambda s: { comparison_result: s.comparison_result, }) .if_( lambda s: not s.extracted_a or not s.extracted_b, thenlambda s: s.update(error_log[PDF extraction failed for one or both documents]) ) )注意.step()的input参数必须是dict不能是PdfExtractInput(...)实例。AG2 会在运行时自动用PdfExtractInput(**dict)构造。这是为了支持动态输入比如inputlambda s: {file_url: s.doc_a_url if s.use_a else s.doc_b_url}。4.4 步骤 4编写 FastAPI 主应用10 分钟# main.py from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from prometheus_fastapi_instrumentator import Instrumentator import redis from agents.contract_analyzer import contract_workflow from agents import tools # 触发 Tool 注册 from models.contract import AgentRunRequest, AgentRunResponse app FastAPI( titleContract AI Agent, descriptionProduction-ready contract comparison service, version1.0.0, ) # CORS开发时必需 app.add_middleware( CORSMiddleware, allow_origins[*], allow_credentialsTrue, allow_methods[*], allow_headers[*], ) # Prometheus 指标 Instrumentator().instrument(app).expose(app) # Redis client生产环境请用连接池 redis_client redis.Redis(hostlocalhost, port6379, db0) # AG2 Workflow Runner from ag2 import WorkflowRunner workflow_runner WorkflowRunner(contract_workflow) # 路由... # 此处省略与 3.2 节一致4.5 步骤 5本地启动与 Swagger 测试5 分钟# 启动 RedisMac brew services start redis # 启动服务 uvicorn main:app --reload --host 0.0.0.0:8000 # 访问 http://localhost:8000/docs 查看交互式 API 文档在 Swagger UI 里点击/run的Try it out输入{ user_id: test_user, doc_a_url: https://example.com/contract_v1.pdf, doc_b_url: https://example.com/contract_v2.pdf }点击 Execute你会看到返回{task_id: xxx, status: queued}。然后用/task/{task_id}轮询直到看到status: completed和完整的result。这就是你的第一个可运行的智能体服务。4.6 步骤 6添加日志与 Trace15 分钟AG2 的WorkflowRunner支持on_step_start和on_step_end回调这是埋点黄金位置。import logging from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor # 初始化 OpenTelemetry开发用 Console生产换 Jaeger/Zipkin trace.set_tracer_provider(TracerProvider()) trace.get_tracer_provider().add_span_processor( SimpleSpanProcessor(ConsoleSpanExporter()) ) tracer trace.get_tracer(__name__) # 修改 WorkflowRunner 初始化 workflow_runner WorkflowRunner( contract_workflow, on_step_startlambda step_name, input_data, state: tracer.start_span(ftool.{step_name}), on_step_endlambda step_name, output_data, state, error: tracer.end_span(), )同时在 FastAPI 的main.py里加全局日志import logging logging.basicConfig( levellogging.INFO, format%(asctime)s - %(name)s - %(levelname)s - %(message)s, handlers[ logging.StreamHandler(), logging.FileHandler(agent.log), ], ) logger logging.getLogger(__name__) app.middleware(http) async def log_requests(request, call_next): logger.info(fStarted {request.method} {request.url.path}) response await call_next(request) logger.info(fCompleted {request.method} {request.url.path} with status {response.status_code}) return response实操心得on_step_start/end回调里不要做耗时操作如写数据库否则会拖慢整个 Workflow。我们只在这里打日志和启停 span。真正的 trace 数据收集交给 OpenTelemetry 的 exporter 异步处理。4.7 步骤 7Docker 部署与 Nginx 反向代理20 分钟DockerfileFROM python:3.10-slim WORKDIR /app COPY pyproject.toml . RUN pip install --no-cache-dir poetry \ poetry export -f requirements.txt --without-hashes requirements.txt \ pip install --no-cache-dir -r requirements.txt COPY . . CMD [uvicorn, main:app, --host, 0.0.0.0:8000, --port, 8000, --workers, 4]docker-compose.ymlversion: 3.8 services: api: build: . ports: - 8000:8000 environment: - REDIS_URLredis://redis:6379/0 depends_on: - redis restart: unless-stopped redis: image: redis:7-alpine command: redis-server --save 20 1 --loglevel warning volumes: - redis_data:/data restart: unless-stopped volumes: redis_data:Nginx 配置/etc/nginx/conf.d/agent.confupstream agent_backend { server 127.0.0.1:8000; } server { listen 80; server_name agent.yourcompany.com; location / { proxy_pass http://agent_backend; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; # 重要增加超时智能体可能运行 2-3 分钟 proxy_read_timeout 30