1. 项目概述当AI智能体开始“组队打怪”最近在AI应用开发圈里一个词的热度持续攀升智能体Agent。如果说大语言模型LLM是给计算机装上了“大脑”那么智能体就是让这个大脑学会了“动手”和“协作”。我们不再满足于让AI回答一个问题、生成一段文本而是希望它能像一个真正的助手或团队成员一样自主理解目标、规划步骤、调用工具、执行任务甚至在复杂场景下与其他智能体沟通配合共同完成一个宏大目标。sampleXbro/agentsmesh这个项目就精准地踩在了“智能体协作”这个前沿赛道上。从名字就能看出端倪——agents智能体和mesh网格/网络。它不是一个单一的智能体框架而是一个旨在构建“智能体网络”或“智能体网格”的底层基础设施。你可以把它想象成一个智能体世界的“操作系统”或“协作平台”。它的核心目标是解决当多个具备不同能力的AI智能体需要一起工作时所产生的通信、协调、任务分发与状态管理等一系列复杂问题。我最初关注到这类项目是因为在实际业务中遇到了瓶颈。我们团队尝试用智能体自动化处理客户工单一个智能体负责分类一个负责查询知识库另一个负责生成回复草稿。想法很美好但实操起来全是坑智能体之间怎么传递信息某个环节出错了怎么通知上下游任务优先级怎么管理自己从头搭建这套协调系统工作量巨大且容易出错。agentsmesh这类框架的出现正是为了填平这个“从单个智能体到智能体团队”的鸿沟。它适合谁如果你是AI应用开发者、研究多智能体系统MAS的研究者或者正在构建复杂自动化流程的工程师这个项目值得你深入研究。它帮你从繁琐的通信协议和状态管理中解脱出来让你更专注于智能体本身的能力设计与业务逻辑。2. 核心架构与设计哲学拆解要理解agentsmesh我们不能只把它看作一堆代码而需要先理解其背后要解决的核心问题以及它可能采取的设计哲学。虽然项目具体实现可能各有不同但这类框架通常遵循一些共通的设计原则。2.1 核心挑战多智能体协作的“三座大山”为什么多个智能体一起工作就那么难主要卡在三个地方通信与协议智能体A如何把“我处理完了这是结果下一个环节需要注意用户情绪”这句话准确地、结构化地传递给智能体B它们需要一个共同的“语言”和“通信渠道”。这个语言不能是模糊的自然语言必须是机器可精确解析的指令或数据。协调与调度十个智能体一百个任务谁先谁后任务A失败了是重试、跳过还是触发另一个补偿任务两个智能体都需要同一个资源比如调用同一个付费API怎么避免冲突这需要一个中央大脑或一套分布式的协调规则。状态管理与监控整个任务流进行到哪一步了每个智能体的输入输出是什么系统整体健康度如何没有一个全局的、可观测的状态视图系统就像黑盒出问题连诊断都无从下手。agentsmesh的设计目标就是提供一套标准化的“积木”让开发者能像搭乐高一样快速构建出能平稳应对这些挑战的智能体协作系统。2.2 架构猜想从“中心化调度”到“去中心化网络”基于其项目名“mesh”网格我推测agentsmesh的架构可能倾向于一种去中心化或弱中心化的设计。这与传统的“中心化任务调度器”模式有本质区别。传统中心化模式有一个主控节点Orchestrator它像项目经理一样接收总任务拆分子任务分派给各个智能体Worker并收集结果。所有协调逻辑都集中在主控节点。优点是控制力强逻辑清晰缺点是主控节点容易成为性能和单点故障的瓶颈。Mesh网格模式每个智能体都是一个对等节点Peer它们之间可以直接通信。任务可以通过预定义的规则或路由机制在网格中流动。可能有一个轻量级的“注册中心”或“路由层”但不过多干预具体执行。优点是扩展性好容错性高更贴近分布式系统的理念缺点是实现复杂调试难度大。agentsmesh很可能在两者之间取得了平衡。例如它可能提供一个轻量级的“协调层”负责服务发现和基础路由而具体的任务逻辑和智能体间的直接对话则由智能体自身完成。这种架构非常适合动态伸缩、异构智能体用不同语言、框架编写的智能体共存的场景。2.3 核心抽象智能体、消息、通道与流一个优秀的框架离不开清晰的核心抽象。对于agentsmesh我们可以预期它定义了以下几个关键概念Agent智能体协作网络中的基本单元。每个智能体需要向网格注册自己的能力我能处理什么类型的任务并声明自己消费和生产的消息类型。Message消息智能体之间通信的基本载体。它不应该是一段纯文本而是一个结构化的数据对象至少包含消息ID、类型、发送者、接收者、优先级、负载Payload、时间戳等元数据。负载部分才是真正的业务数据。Channel/Topic通道/主题消息流动的管道。智能体可以订阅Subscribe某个通道来接收消息也可以发布Publish消息到某个通道。这类似于消息队列如RabbitMQ, Kafka中的概念是实现解耦的关键。例如可以有一个customer_query通道所有处理用户查询的智能体都订阅它。Flow/Workflow流/工作流对一系列智能体协作完成一个业务过程的抽象描述。它定义了任务的触发条件、智能体的执行顺序、分支判断if-else、循环以及错误处理机制。这可能是通过YAML/JSON配置文件或DSL领域特定语言来定义的。通过这几层抽象开发者就能以声明式的方式描述“谁在什么情况下做什么”而框架则负责底层的通信、执行和监控。3. 关键技术组件与实现细节探秘理解了设计理念我们深入到可能的技术实现层面。一个完整的agentsmesh框架通常会包含以下核心组件。3.1 通信层智能体如何“说话”这是网格的神经系统。实现方式的选择至关重要。备选方案一基于HTTP/gRPC的同步调用。智能体作为服务端暴露API协调器或其他智能体通过HTTP/gRPC直接调用。这种方式简单直观但耦合度高调用方必须知道被调用方的确切地址并且要处理网络超时、重试等问题不适合复杂的异步工作流。备选方案二基于消息队列的异步通信。这是更主流和优雅的方案。框架可以内置或集成像Redis Pub/Sub、Apache Kafka、NATS、RabbitMQ这样的消息中间件。智能体只需要和消息队列交互完全不知道消息的生产者和消费者是谁实现了彻底解耦。实操细节每个智能体在启动时会连接到消息服务器并订阅自己关心的主题。当它完成工作后将结果消息发布到下一个主题。框架需要封装好消息的序列化如JSON, Protobuf、发送、接收、确认Ack和错误重投的逻辑。消息协议设计这是核心。一个典型的消息结构可能是{ id: msg_001, type: TASK_EXECUTE, from: agent_classifier, to: [agent_knowledge_retriever], // 可以是广播或指定接收者 priority: 5, timestamp: 2023-10-27T10:00:00Z, payload: { task_id: task_abc, data: {user_query: 如何重置密码, category: account}, context: {session_id: sess_123, previous_steps: [...]} }, metadata: {retry_count: 0} }3.2 协调与编排层智能体如何“共舞”通信解决了“传话”问题协调则解决“何时、何人、做何事”的问题。这里通常有两种模式基于规则/路由的协调这是Mesh思想的体现。框架提供一个“路由器”Router它根据消息的内容、类型或头信息按照预配置的规则将消息路由到相应的智能体。规则可以是简单的“if message.type ‘X’ then send to agent ‘Y’”也可以是复杂的表达式。智能体之间没有直接的调用关系一切通过消息路由驱动。工作流引擎集成对于顺序、分支、循环等复杂逻辑集成一个轻量级工作流引擎是更专业的选择。例如使用Temporal、Cadence或Apache Airflow的核心概念。开发者用代码或DSL定义一个工作流DAG有向无环图框架的工作流执行器Executor负责按图索骥触发各个智能体任务并传递上下文。实操示例一个客服工单处理工作流可能定义为workflow: name: customer_ticket_processing steps: - id: classify type: agent agent: classifier input: “{{trigger.query}}” - id: retrieve type: agent agent: knowledge_retriever input: “{{steps.classify.output}}” depends_on: [classify] # 显式声明依赖 - id: generate type: agent agent: reply_generator input: “{{steps.retrieve.output}}” depends_on: [retrieve] - id: human_review type: decision condition: “{{steps.generate.output.confidence 0.8}}” if_true: send_to_human if_false: auto_reply框架需要解析这个定义并管理每个步骤的状态等待、执行中、成功、失败。3.3 智能体生命周期与资源管理网格中的智能体不是静态的它们可能上线、下线、崩溃重启。框架需要提供智能体的注册与发现机制。服务注册中心可以是一个简单的键值存储如Etcd、ZooKeeper也可以利用消息中间件的能力。智能体启动时将自己的地址、能力、健康状态注册到中心。其他组件或协调器通过查询中心来发现可用的智能体。健康检查与心跳智能体需要定期向注册中心发送心跳表明自己还“活着”。如果心跳超时注册中心将其标记为不可用后续消息就不会再路由给它直到它恢复注册。资源隔离与限流为了防止某个智能体过载拖垮整个网格框架需要支持对智能体的资源CPU、内存和调用速率进行限制。这通常需要与容器化技术Docker或编排平台Kubernetes结合在更底层实现。3.4 可观测性给网格装上“眼睛”没有监控的系统就是在裸奔。对于由多个动态组件构成的智能体网格可观测性Observability不是加分项是必选项。框架需要原生集成或提供接口给三大支柱日志Logging每个智能体的运行日志、消息的流转日志需要被集中收集如接入ELK栈并关联上统一的追踪IDTrace ID方便排查问题。指标Metrics关键指标必须暴露例如消息吞吐量、各智能体的处理延迟和成功率、队列长度、错误类型分布等。这些指标可以通过Prometheus等工具收集并在Grafana上展示。追踪Tracing这是理解复杂工作流的关键。一个用户请求从进入网格到流经多个智能体最终返回结果这整条链路需要被完整追踪。框架需要支持OpenTelemetry这样的标准为每个请求生成唯一的Trace ID并贯穿所有消息和智能体调用最终在Jaeger或Zipkin上呈现出完整的调用链图谱。有了可观测性你才能回答“为什么这个请求慢了”是哪个智能体卡住了、“错误是怎么发生的”消息在哪个环节丢失或格式错了这类关键问题。4. 实战演练从零构建一个简易智能体网格原型理论说了这么多我们来动手搭建一个极度简化的agentsmesh原型理解其核心运行机制。我们将使用Python、Redis作为消息队列和FastAPI提供智能体HTTP接口来实现。注意这是一个用于理解原理的教学原型不具备生产环境的可靠性、安全性和性能。4.1 环境准备与依赖安装首先确保你的开发环境中有Python 3.8和Redis。我们使用pip安装必要的库。# 安装Python依赖 pip install fastapi uvicorn redis pydantic python-dotenv # 启动Redis服务如果你使用Docker docker run -d -p 6379:6379 --name agentsmesh-redis redis:alpine4.2 定义核心消息模型在models.py中我们使用Pydantic定义结构化消息这是智能体间通信的“合同”。from pydantic import BaseModel, Field from typing import Any, Dict, List, Optional from uuid import uuid4 from datetime import datetime class AgentMessage(BaseModel): 智能体间通信的基本消息单元 id: str Field(default_factorylambda: str(uuid4())) type: str # 消息类型如 TASK_START, DATA_PROCESSED, ERROR from_agent: str # 发送方标识 to_agents: List[str] # 接收方标识列表空列表表示广播 priority: int 5 # 优先级1-10 timestamp: datetime Field(default_factorydatetime.now) payload: Dict[str, Any] # 实际业务数据 context: Optional[Dict[str, Any]] None # 链路上下文可传递Trace ID等 metadata: Dict[str, Any] Field(default_factorydict) # 元数据如重试次数4.3 实现基于Redis的通信层创建message_bus.py封装Redis的发布/订阅操作。import json import redis import logging from typing import Callable from models import AgentMessage logging.basicConfig(levellogging.INFO) logger logging.getLogger(__name__) class RedisMessageBus: 基于Redis Pub/Sub的简单消息总线 def __init__(self, redis_url: str redis://localhost:6379): self.redis_client redis.from_url(redis_url) self.pubsub self.redis_client.pubsub() def publish(self, channel: str, message: AgentMessage): 发布消息到指定频道 message_dict message.dict() # 确保datetime可序列化 message_dict[timestamp] message_dict[timestamp].isoformat() self.redis_client.publish(channel, json.dumps(message_dict)) logger.info(fPublished message {message.id} to channel {channel}) def subscribe(self, channel: str, callback: Callable[[AgentMessage], None]): 订阅频道并设置消息处理回调 def message_handler(raw_message): if raw_message[type] message: try: data json.loads(raw_message[data]) data[timestamp] datetime.fromisoformat(data[timestamp]) # 反序列化 msg AgentMessage(**data) # 检查接收者简化版如果to_agents为空或包含当前agent则处理 # 实际应用中接收者过滤逻辑应更复杂 callback(msg) except Exception as e: logger.error(fFailed to process message: {e}) self.pubsub.subscribe(**{channel: message_handler}) # 在新线程中监听实际生产环境应使用更健壮的方式 thread self.pubsub.run_in_thread(sleep_time0.001) logger.info(fSubscribed to channel {channel}) return thread4.4 构建基础智能体基类在base_agent.py中我们创建一个所有智能体都继承的基类它封装了注册、消息收发等通用逻辑。import asyncio from abc import ABC, abstractmethod from typing import List from message_bus import RedisMessageBus from models import AgentMessage class BaseAgent(ABC): 智能体基类 def __init__(self, agent_id: str, capabilities: List[str], message_bus: RedisMessageBus): self.agent_id agent_id self.capabilities capabilities # 此智能体能处理的任务类型 self.bus message_bus self.subscription_thread None async def start(self): 启动智能体订阅相关频道开始监听消息 # 订阅以自己ID命名的私有频道用于接收定向消息 self.subscription_thread self.bus.subscribe( fagent.{self.agent_id}, self._handle_message ) # 订阅广播频道 self.bus.subscribe(broadcast, self._handle_message) logger.info(fAgent {self.agent_id} started, capabilities: {self.capabilities}) def _handle_message(self, message: AgentMessage): 内部消息处理路由 # 检查消息是否意图发送给本智能体简化逻辑 if not message.to_agents or self.agent_id in message.to_agents: logger.info(fAgent {self.agent_id} received message {message.id} of type {message.type}) # 在实际处理前可以加入能力匹配检查 self.process_message(message) abstractmethod def process_message(self, message: AgentMessage): 处理消息的核心业务逻辑由子类实现 pass def send_message(self, to_agents: List[str], msg_type: str, payload: dict): 发送消息到其他智能体或频道 message AgentMessage( typemsg_type, from_agentself.agent_id, to_agentsto_agents, payloadpayload ) # 简化如果指定了接收者发到其私有频道否则发到广播频道 if to_agents: for agent in to_agents: self.bus.publish(fagent.{agent}, message) else: self.bus.publish(broadcast, message) def stop(self): 停止智能体 if self.subscription_thread: self.subscription_thread.stop() logger.info(fAgent {self.agent_id} stopped)4.5 实现两个具体的业务智能体现在我们创建两个有具体功能的智能体一个分类器ClassifierAgent和一个回答生成器AnswerGeneratorAgent。分类器智能体 (classifier_agent.py)from base_agent import BaseAgent from models import AgentMessage import logging logger logging.getLogger(__name__) class ClassifierAgent(BaseAgent): 示例分类器智能体负责对用户查询进行分类 def __init__(self, message_bus): super().__init__( agent_idclassifier_v1, capabilities[text_classification], message_busmessage_bus ) # 模拟一个简单的分类规则 self.category_keywords { account: [密码, 登录, 注册, 注销], billing: [支付, 发票, 扣费, 订阅], technical: [错误, 无法连接, 崩溃, bug] } def process_message(self, message: AgentMessage): if message.type USER_QUERY: user_text message.payload.get(text, ) # 简单的关键词匹配分类 detected_category general for category, keywords in self.category_keywords.items(): if any(keyword in user_text for keyword in keywords): detected_category category break logger.info(fClassifier {self.agent_id} categorized query as: {detected_category}) # 将分类结果发送给回答生成器 next_payload { original_query: user_text, category: detected_category, session_id: message.payload.get(session_id) } # 注意这里硬编码了下一个智能体的ID实际应由路由规则决定 self.send_message( to_agents[answer_generator_v1], msg_typeQUERY_CLASSIFIED, payloadnext_payload )回答生成器智能体 (answer_generator_agent.py)from base_agent import BaseAgent from models import AgentMessage import logging logger logging.getLogger(__name__) class AnswerGeneratorAgent(BaseAgent): 示例回答生成器智能体根据分类生成回答 def __init__(self, message_bus): super().__init__( agent_idanswer_generator_v1, capabilities[text_generation], message_busmessage_bus ) # 模拟一个简单的回答模板 self.response_templates { account: 关于账户问题{query}请您尝试在设置页面中重置密码。如需进一步帮助请联系客服。, billing: 关于账单问题{query}您的发票可在用户中心的账单历史中下载。扣费疑问请提供订单号。, technical: 关于技术问题{query}我们已记录。请尝试清除缓存后重试或提供错误截图。, general: 您好关于‘{query}’我已收到您的查询。正在为您转接至人工客服请稍候。 } def process_message(self, message: AgentMessage): if message.type QUERY_CLASSIFIED: category message.payload.get(category, general) original_query message.payload.get(original_query, ) # 根据分类选择模板生成回答 template self.response_templates.get(category, self.response_templates[general]) final_response template.format(queryoriginal_query) logger.info(fAnswerGenerator {self.agent_id} generated response for category {category}) # 模拟最终输出例如发送到API网关或存储 output_payload { session_id: message.payload.get(session_id), category: category, response: final_response, confidence: high if category ! general else low } # 这里可以发布到最终输出频道或调用回调API self.send_message( to_agents[], # 广播谁需要谁订阅 msg_typeFINAL_RESPONSE, payloadoutput_payload ) print(f\n[最终回复] {final_response}\n)4.6 编写主程序与协调逻辑最后在main.py中我们启动所有组件并模拟一个用户请求触发整个流程。import asyncio import time from message_bus import RedisMessageBus from classifier_agent import ClassifierAgent from answer_generator_agent import AnswerGeneratorAgent from models import AgentMessage async def main(): # 1. 初始化消息总线 bus RedisMessageBus() # 2. 创建并启动智能体 classifier ClassifierAgent(bus) answer_generator AnswerGeneratorAgent(bus) # 启动智能体在实际异步框架中应使用asyncio任务 # 这里简化直接调用start方法内部会开线程 await classifier.start() await answer_generator.start() # 等待智能体就绪 time.sleep(2) # 3. 模拟一个外部请求触发工作流 print(模拟用户请求我的密码忘记了怎么办) trigger_message AgentMessage( typeUSER_QUERY, from_agentapi_gateway, to_agents[classifier_v1], # 初始请求发送给分类器 payload{ text: 我的密码忘记了怎么办, session_id: sess_001, user_id: user_123 } ) # 将触发消息发布到分类器的私有频道 bus.publish(agent.classifier_v1, trigger_message) # 4. 等待一段时间观察处理结果 print(等待智能体处理...) time.sleep(5) # 5. 清理 classifier.stop() answer_generator.stop() print(演示结束。) if __name__ __main__: asyncio.run(main())运行与观察启动Redis。运行python main.py。观察控制台输出。你会看到分类器接收到消息进行分类然后将QUERY_CLASSIFIED消息发送给回答生成器。回答生成器接收到消息后生成最终回复并广播出来。这个原型虽然简陋但它清晰地展示了agentsmesh的核心思想智能体通过消息总线进行异步、解耦的通信各自独立运行通过消息类型和路由逻辑串联成工作流。在实际的agentsmesh项目中你会发现更完善的路由规则、持久化、错误处理、服务发现和监控仪表盘。5. 生产级考量与常见陷阱将原型发展为生产可用的系统需要跨越诸多鸿沟。以下是你在实际使用或借鉴agentsmesh理念时会遇到的典型挑战及应对思路。5.1 消息的可靠性与顺序性在原型中我们使用了Redis Pub/Sub。但它有一个关键问题它不是持久化的。如果订阅者离线它将错过消息。在生产环境中这通常是不可接受的。解决方案使用具有持久化和消费者组功能的消息队列如Apache Kafka或NATS Streaming。它们能确保消息在消费成功前不会丢失并支持“至少一次”或“恰好一次”的投递语义。顺序性挑战对于同一会话或同一实体的消息处理顺序可能很重要。一些消息队列如Kafka分区能保证同一键Key下消息的顺序。框架需要提供机制让开发者能根据业务逻辑如session_id来定义消息的路由键以确保相关消息按序处理。5.2 错误处理与重试策略智能体处理消息可能失败网络超时、内部错误、依赖服务不可用。框架必须提供健壮的错误处理机制。死信队列DLQ当消息重试多次仍失败后应被移入一个特殊的“死信队列”供人工排查而不是无限重试或丢弃。退避重试重试不应立即进行而应采用指数退避策略如等待1秒、2秒、4秒...避免在服务短暂故障时引发雪崩。补偿事务在分布式场景下一个工作流中多个智能体可能已执行了部分操作此时后续步骤失败可能需要“回滚”已执行的操作。框架应支持定义“补偿动作”Saga模式或在设计工作流时强调“幂等性”操作多次执行结果一致。5.3 性能、伸缩与资源管理当任务量激增时系统如何应对智能体水平伸缩这是Mesh架构的优势。对于无状态或状态外置的智能体你可以轻松启动多个相同能力的实例。框架的负载均衡器或路由层需要能够将消息均匀地分发给这些实例。背压控制如果下游智能体处理速度慢上游不应无限制地发送消息否则会导致内存溢出。框架需要实现背压机制例如基于队列长度的流控或者集成响应式流Reactive Streams规范。资源隔离不同智能体对CPU、内存的需求不同。最理想的方式是将每个智能体部署在独立的容器中利用Kubernetes进行资源限制和调度。框架需要提供与容器编排平台良好集成的部署描述文件如Helm Chart。5.4 安全与权限控制在开放的网络中智能体网格不能是“裸奔”的。身份认证与授权每个智能体在加入网格前必须验证身份如使用mTLS双向TLS认证或JWT令牌。消息总线本身也需要安全访问控制。消息加密与完整性敏感消息在传输过程中应被加密如使用TLS并且可能需要签名以确保未被篡改。输入验证与沙箱对于执行任意代码或访问外部资源的智能体如Tool-Using Agent必须进行严格的输入验证并考虑在沙箱环境中运行以防止恶意代码执行。5.5 调试与问题诊断分布式系统的调试是出了名的困难。分布式追踪的深度集成不仅仅是记录Trace ID还要将智能体的处理延迟、输入输出快照、调用的外部服务等信息都关联到追踪链上。结构化日志日志必须结构化如JSON格式并包含统一的字段trace_idagent_idmessage_idtimestamplevelmessage。这便于日志聚合系统如Loki进行高效的查询和过滤。交互式调试工具框架可以提供Web UI让开发者能够实时查看消息流动、工作流状态、智能体健康度甚至能够手动重放或注入消息进行测试。6. 进阶应用场景与生态展望agentsmesh这类框架的价值在更复杂的场景中会愈发凸显。它不仅仅是连接几个AI模型的胶水更是构建下一代AI原生应用的基石。场景一自动化研发与运维AI for DevOps想象一个由多个智能体组成的“虚拟研发团队”需求分析智能体从PR描述中提取要点代码生成智能体编写初始实现代码审查智能体检查最佳实践和安全漏洞测试生成智能体编写单元测试部署智能体执行CI/CD流水线。它们通过agentsmesh协同工作将一个人工任务转化为一个自动化的工作流。场景二动态复杂决策支持在金融交易、供应链优化或游戏AI中决策依赖于瞬息万变的多源信息。可以部署多个“侦察”智能体分别监控市场数据、物流状态、对手行为将信息汇总给一个“分析”智能体再由“决策”智能体生成操作建议。Mesh架构允许这些智能体低延迟、高并发地交换信息做出比单一模型更优的决策。场景三个性化AI助手网络未来的个人AI助手可能不是一个庞然大物而是一个由众多垂直领域小智能体组成的网络。当你问“帮我规划下周去上海的出差并做一份竞品分析PPT”行程规划智能体、机票酒店预订智能体、市场数据抓取智能体、PPT生成智能体通过Mesh协同共同完成任务。agentsmesh为这种“可组合的AI能力”提供了通信和协调标准。生态展望agentsmesh的理想形态是成为AI智能体领域的“TCP/IP协议栈”或“Kubernetes”。它定义智能体之间如何通信、如何发现、如何编排的开放标准而不绑定特定的AI模型或云厂商。在此基础上可以生长出丰富的生态可视化的编排工具、智能体能力市场、性能监控SaaS、预构建的行业解决方案模板等。从我个人的实践经验来看当前直接采用agentsmesh或类似框架进入生产仍需面对一定的成熟度挑战和自研投入。但对于任何有志于构建复杂、可扩展AI应用的中大型团队投入资源去理解、试点甚至参与贡献这类项目无疑是抢占未来技术制高点的关键一步。它迫使你以分布式的、松耦合的、面向服务的方式来思考AI能力的组织方式这种架构思维的价值远超过框架本身。