Agentic Design Patterns-模式3:并行化(Parallelization)的代码实现
目录1. 概述2. 价值3. 代码实现代码说明实际使用注意事项1. 概述许多复杂的智能体任务其实包含多个可以同时执行的子任务而不是一个接一个地串行处理这时并行化设计模式就变得至关重要。并行化模式是一种通过同时执行独立子任务优化计算流程的方法尤其适用于涉及多次模型推理或外部服务调用的复杂操作可有效降低整体延迟。核心思想是识别流程中彼此无依赖的部分并将它们并行执行。尤其在涉及外部服务如API或数据库有延迟时可以同时发起多个请求显著提升效率。图1并行化设计模式图2并行化与子智能体示例2. 价值并行化是一种通过并发执行独立任务提升效率的设计模式尤其适用于涉及外部资源如API调用等待的场景并行化可显著降低整体延迟让智能体系统在复杂任务下更具响应性。但并发/并行架构也可能会增加设计、调试和日志等开发复杂度与成本。3. 代码实现以下是并行化Parallelization模式的Python实现包含完整的使用示例import asyncio import concurrent.futures from typing import List, Any, Dict, Callable import time from dataclasses import dataclass import logging # 配置日志 logging.basicConfig(levellogging.INFO) logger logging.getLogger(__name__) dataclass class ParallelTask: 并行任务数据结构 task_id: str system_prompt: str user_prompt: str model: str gpt-4o metadata: Dict[str, Any] None class ParallelProcessor: 并行处理器 def __init__(self, max_workers: int 5): 初始化并行处理器 Args: max_workers: 最大并发工作线程数 self.max_workers max_workers self.client None def _init_client(self): 初始化模型客户端懒加载 if self.client is None: # 模拟外部模型客户端获取 # 实际使用时取消下面两行的注释 # from model_factory import get_model_client # self.client get_model_client() # 模拟客户端用于演示 class MockClient: class chat: class completions: staticmethod def create(model, messages, streamFalse): class Response: class Choice: class Message: content fMock response for model {model} choices [Choice()] return Response() self.client MockClient() return self.client def _process_single_task_sync(self, task: ParallelTask) - str: 同步处理单个任务 Args: task: 并行任务 Returns: 处理结果 try: client self._init_client() messages [ {role: system, content: task.system_prompt}, {role: user, content: task.user_prompt}, ] logger.info(f开始处理任务 {task.task_id}使用模型 {task.model}) response client.chat.completions.create( modeltask.model, messagesmessages, streamFalse ) result response.choices[0].message.content logger.info(f任务 {task.task_id} 处理完成) return result except Exception as e: logger.error(f处理任务 {task.task_id} 时出错: {e}) return fError: {str(e)} def process_tasks_parallel(self, tasks: List[ParallelTask]) - Dict[str, str]: 并行处理多个任务线程池方式 Args: tasks: 任务列表 Returns: 任务ID到结果的映射字典 results {} logger.info(f开始并行处理 {len(tasks)} 个任务最大并发数: {self.max_workers}) start_time time.time() with concurrent.futures.ThreadPoolExecutor(max_workersself.max_workers) as executor: # 提交所有任务 future_to_task { executor.submit(self._process_single_task_sync, task): task for task in tasks } # 收集结果 for future in concurrent.futures.as_completed(future_to_task): task future_to_task[future] try: result future.result() results[task.task_id] result except Exception as e: logger.error(f任务 {task.task_id} 执行异常: {e}) results[task.task_id] fError: {str(e)} elapsed_time time.time() - start_time logger.info(f所有任务处理完成总耗时: {elapsed_time:.2f}秒) return results async def _process_single_task_async(self, task: ParallelTask) - str: 异步处理单个任务 Args: task: 并行任务 Returns: 处理结果 try: # 在实际IO操作处使用asyncio.to_thread loop asyncio.get_event_loop() result await loop.run_in_executor( None, self._process_single_task_sync, task ) return result except Exception as e: logger.error(f异步处理任务 {task.task_id} 时出错: {e}) return fError: {str(e)} async def process_tasks_async(self, tasks: List[ParallelTask]) - Dict[str, str]: 异步并行处理多个任务 Args: tasks: 任务列表 Returns: 任务ID到结果的映射字典 logger.info(f开始异步并行处理 {len(tasks)} 个任务) start_time time.time() # 创建所有异步任务 async_tasks [ self._process_single_task_async(task) for task in tasks ] # 等待所有任务完成 results_list await asyncio.gather(*async_tasks, return_exceptionsTrue) # 整理结果 results {} for task, result in zip(tasks, results_list): if isinstance(result, Exception): results[task.task_id] fError: {str(result)} else: results[task.task_id] result elapsed_time time.time() - start_time logger.info(f异步任务处理完成总耗时: {elapsed_time:.2f}秒) return results class ParallelAgent: 并行智能体示例 def __init__(self, processor: ParallelProcessor): self.processor processor def analyze_multiple_documents(self, documents: List[Dict]) - Dict: 并行分析多个文档 Args: documents: 文档列表每个文档包含内容和分析要求 Returns: 分析结果 tasks [] for i, doc in enumerate(documents): task ParallelTask( task_idfdoc_{i}, system_prompt你是一个文档分析助手。请分析用户提供的文档内容。, user_promptf请分析以下文档\n\n{doc[content]}\n\n分析要求{doc[analysis_type]}, modelgpt-4o, metadata{doc_index: i, title: doc.get(title, f文档{i})} ) tasks.append(task) # 并行处理所有文档分析任务 results self.processor.process_tasks_parallel(tasks) return { total_documents: len(documents), analysis_results: results, summary: f成功分析 {len([r for r in results.values() if not r.startswith(Error)])}/{len(documents)} 个文档 } def batch_sentiment_analysis(self, texts: List[str]) - Dict: 批量情感分析 Args: texts: 文本列表 Returns: 情感分析结果 tasks [] for i, text in enumerate(texts): task ParallelTask( task_idfsentiment_{i}, system_prompt你是一个情感分析助手。请分析文本的情感倾向。, user_promptf请分析以下文本的情感倾向正面、负面或中性\n\n{text}, modelgpt-4o ) tasks.append(task) # 使用异步方式处理 results asyncio.run(self.processor.process_tasks_async(tasks)) return { total_texts: len(texts), sentiment_results: results, statistics: self._calculate_sentiment_stats(results) } def _calculate_sentiment_stats(self, results: Dict[str, str]) - Dict: 计算情感统计 stats {positive: 0, negative: 0, neutral: 0, error: 0} for result in results.values(): if 正面 in result: stats[positive] 1 elif 负面 in result: stats[negative] 1 elif 中性 in result: stats[neutral] 1 elif Error in result: stats[error] 1 return stats # 使用示例 def main(): 主函数示例 # 创建并行处理器 processor ParallelProcessor(max_workers3) # 创建并行智能体 agent ParallelAgent(processor) # 示例1并行处理多个独立任务 print( * 50) print(示例1基本并行任务处理) print( * 50) tasks [ ParallelTask( task_idtask_1, system_prompt你是一个翻译助手。, user_prompt请将以下英文翻译成中文Hello, how are you today?, modelgpt-4o ), ParallelTask( task_idtask_2, system_prompt你是一个代码助手。, user_prompt用Python写一个快速排序算法。, modelgpt-4o ), ParallelTask( task_idtask_3, system_prompt你是一个知识问答助手。, user_prompt简述人工智能的发展历史。, modelgpt-4o ), ParallelTask( task_idtask_4, system_prompt你是一个诗歌创作助手。, user_prompt写一首关于春天的五言绝句。, modelgpt-4o ), ] # 并行处理任务 results processor.process_tasks_parallel(tasks) for task_id, result in results.items(): print(f\n任务 {task_id} 结果:) print(f结果预览: {result[:100]}...) # 示例2文档分析 print(\n * 50) print(示例2并行文档分析) print( * 50) documents [ { title: AI报告, content: 人工智能正在改变世界..., analysis_type: 提取关键观点 }, { title: 技术文章, content: 深度学习在自然语言处理中的应用..., analysis_type: 总结主要内容 }, { title: 市场分析, content: 2024年科技市场趋势..., analysis_type: 分析市场机会 } ] doc_results agent.analyze_multiple_documents(documents) print(f文档分析完成: {doc_results[summary]}) # 示例3批量情感分析 print(\n * 50) print(示例3批量情感分析) print( * 50) texts [ 这个产品非常好用我很满意, 服务态度很差不会再来了。, 质量很好但价格有点高。, 物流很快包装也很用心。 ] sentiment_results agent.batch_sentiment_analysis(texts) print(f情感分析统计: {sentiment_results[statistics]}) # 高级用法示例带错误处理和超时控制 class RobustParallelProcessor(ParallelProcessor): 增强型并行处理器带错误处理和超时控制 def process_tasks_with_timeout(self, tasks: List[ParallelTask], timeout_per_task: float 30.0) - Dict[str, str]: 带超时控制的并行处理 Args: tasks: 任务列表 timeout_per_task: 每个任务的超时时间秒 Returns: 任务ID到结果的映射字典 results {} with concurrent.futures.ThreadPoolExecutor(max_workersself.max_workers) as executor: future_to_task {} # 提交任务设置超时 for task in tasks: future executor.submit(self._process_single_task_sync, task) future_to_task[future] task # 收集结果处理超时 for future in concurrent.futures.as_completed(future_to_task.keys(), timeouttimeout_per_task * len(tasks)): task future_to_task[future] try: result future.result(timeouttimeout_per_task) results[task.task_id] result except concurrent.futures.TimeoutError: logger.warning(f任务 {task.task_id} 超时) results[task.task_id] Error: Timeout except Exception as e: logger.error(f任务 {task.task_id} 执行异常: {e}) results[task.task_id] fError: {str(e)} return results if __name__ __main__: # 运行示例 main() # 使用增强版处理器 print(\n * 50) print(增强版并行处理器示例) print( * 50) robust_processor RobustParallelProcessor(max_workers2) test_tasks [ ParallelTask( task_idrobust_task_1, system_prompt测试, user_prompt简单响应, modelgpt-4o ) ] robust_results robust_processor.process_tasks_with_timeout( test_tasks, timeout_per_task10.0 ) print(f增强版处理结果: {robust_results})代码说明1. 核心组件ParallelTask并行任务数据结构封装单个任务的所有信息ParallelProcessor核心并行处理器提供两种并行方式process_tasks_parallel()基于线程池的同步并行process_tasks_async()基于asyncio的异步并行ParallelAgent应用示例展示如何在智能体任务中使用并行化2. 主要特性并发控制通过max_workers限制最大并发数错误处理每个任务独立处理异常不影响其他任务结果收集自动收集并整理所有任务结果性能监控记录每个任务的执行时间和总耗时3. 使用场景示例文档批量分析同时分析多个文档情感批量分析并行处理大量文本情感分析多任务处理同时执行翻译、代码生成、问答等独立任务实际使用注意事项1. 客户端初始化# 取消注释以下代码使用真实模型客户端 # from model_factory import get_model_client # self.client get_model_client()2. 并发数调整根据API限制调整max_workers考虑网络带宽和服务器负载3. 错误处理增强可添加重试机制实现断路器模式防止级联故障添加更详细的监控和日志4. 资源管理使用连接池管理数据库/API连接实现优雅关闭确保所有任务完成适用场景总结这个并行化设计模式特别适合以下场景需要同时调用多个外部API批量处理大量独立数据需要优化整体响应时间的智能体系统有多个可独立执行的子任务通过并行执行独立任务可以显著减少总体等待时间特别是在涉及网络延迟的情况下。参照书籍《Agentic Design Patterns》的基本概念和观点。