AI 故障排障 Agent:从人工诊断到多源数据自动推理的工程实践
# AI 故障排障 Agent：从人工诊断到多源数据自动推理的工程实践

## 一、排障效率的天花板：当资深工程师成为瓶颈

一次 P0 故障的排障过程暴露了传统模式的深层问题：交易服务响应超时，值班工程师花了 15 分钟确认现象，20 分钟逐个服务排查日志，10 分钟对比监控指标，最终在数据库侧找到根因——慢查询锁表。整个排障过程 45 分钟，其中 80% 的时间花在“找数据”而非“做判断”。

更深层的问题是知识断层：能快速排障的资深工程师只有 3 个，他们轮岗时，排障效率直接腰斩。新人面对故障不知道该看哪些指标、查哪些日志、按什么顺序排查，只能靠“猜”和“试”。

AI 排障 Agent 的核心目标：将排障经验从人脑中的隐性知识转化为可执行的显性流程，让 Agent 自动完成数据收集、关联分析和推理判断，输出根因假设和处置建议，工程师只需做最终决策。

## 二、AI 排障 Agent 的架构与推理链路

```mermaid
graph TB
    subgraph 感知层
        A1[告警事件输入]
        A2[人工触发排障]
        A3[定时巡检触发]
    end

    subgraph 规划层
        PL[排障规划器<br/>根据故障类型选择排障流程]
        KB[知识库<br/>排障SOP+历史案例]
    end

    subgraph 执行层
        E1[指标查询工具<br/>Prometheus API]
        E2[日志查询工具<br/>Elasticsearch API]
        E3[链路查询工具<br/>Jaeger API]
        E4[K8s查询工具<br/>kubectl/API Server]
        E5[数据库查询工具<br/>慢查询+锁状态]
    end

    subgraph 推理层
        R1[数据关联分析<br/>时序+拓扑+因果]
        R2[假设生成<br/>基于规则+历史案例]
        R3[假设验证<br/>查询验证性数据]
        R4[置信度评估<br/>多证据交叉验证]
    end

    subgraph 输出层
        O1[根因报告<br/>含推理过程]
        O2[处置建议<br/>含风险评级]
        O3[自动执行<br/>低风险操作]
    end

    A1 --> PL
    A2 --> PL
    A3 --> PL
    PL --> KB
    PL --> E1
    PL --> E2
    PL --> E3
    PL --> E4
    PL --> E5
    E1 --> R1
    E2 --> R1
    E3 --> R1
    E4 --> R1
    E5 --> R1
    R1 --> R2
    R2 --> R3
    R3 --> R4
    R4 --> O1
    R4 --> O2
    R4 --> O3
```

AI 排障 Agent 采用 ReAct（Reasoning + Acting）模式：先根据故障类型规划排障步骤，然后逐步执行工具调用获取数据，根据数据推理生成根因假设，再调用工具验证假设，循环迭代直到置信度达标。这与传统规则引擎的区别在于：规则引擎是“if-then”的静态匹配，Agent 是“观察-推理-行动”的动态循环。

## 三、AI 排障 Agent 的生产级代码实现

### 3.1 排障 Agent 核心框架

```python
#!/usr/bin/env python3
"""
AI 排障 Agent：基于 ReAct 模式的自动故障诊断
"""
import json
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field
from enum import Enum
import logging

logger = logging.getLogger(__name__)


class DiagnosisStatus(Enum):
    """诊断状态"""
    PLANNING = 'planning'      # 规划排障步骤
    EXECUTING = 'executing'    # 执行工具调用
    REASONING = 'reasoning'    # 推理分析
    VERIFYING = 'verifying'    # 验证假设
    COMPLETED = 'completed'    # 诊断完成
    FAILED = 'failed'          # 诊断失败


@dataclass
class Hypothesis:
    """根因假设"""
    id: str
    description: str                    # 假设描述
    confidence: float                   # 置信度 0.0-1.0
    evidence: List[str]                 # 支持证据
    verification_queries: List[Dict]    # 验证该假设需要的数据查询
    verified: bool = False
    verification_result: Optional[str] = None


@dataclass
class DiagnosisStep:
    """诊断步骤记录"""
    step_id: int
    action: str          # 执行的动作
    tool: str            # 使用的工具
    query: Dict          # 查询参数
    result: Any          # 查询结果
    observation: str     # 观察结论
    timestamp: datetime = field(default_factory=datetime.utcnow)


@dataclass
class DiagnosisReport:
    """诊断报告"""
    incident_id: str
    start_time: datetime
    end_time: Optional[datetime]
    status: DiagnosisStatus
    steps: List[DiagnosisStep]
    hypotheses: List[Hypothesis]
    root_cause: Optional[Hypothesis]
    remediation: List[Dict]    # 处置建议
    summary: str


class DiagnosisAgent:
    """排障诊断 Agent"""

    def __init__(self, tools: Dict[str, Any], knowledge_base: Any,
                 max_iterations: int = 10,
                 confidence_threshold: float = 0.8):
        """
        Args:
            tools: 可用工具集合 {tool_name: tool_instance}
            knowledge_base: 排障知识库
            max_iterations: 最大推理迭代次数
            confidence_threshold: 根因确认的置信度阈值
        """
        self.tools = tools
        self.kb = knowledge_base
        self.max_iterations = max_iterations
        self.confidence_threshold = confidence_threshold

    def diagnose(self, incident_id: str, alert_info: Dict) -> DiagnosisReport:
        """
        执行故障诊断

        Args:
            incident_id: 故障ID
            alert_info: 告警信息 {alert_name, service, severity, labels}
        """
        report = DiagnosisReport(
            incident_id=incident_id,
            start_time=datetime.utcnow(),
            end_time=None,
            status=DiagnosisStatus.PLANNING,
            steps=[],
            hypotheses=[],
            root_cause=None,
            remediation=[],
            summary=''
        )

        try:
            # 阶段1: 规划排障步骤
            plan = self._plan_diagnosis(alert_info)

            # 阶段2: 执行排障循环（ReAct模式）
            for iteration in range(self.max_iterations):
                report.status = DiagnosisStatus.EXECUTING

                # 选择下一步动作
                next_action = self._select_next_action(
                    plan, report.steps, report.hypotheses
                )
                if not next_action:
                    break

                # 执行工具调用
                step = self._execute_action(
                    iteration + 1, next_action, alert_info
                )
                report.steps.append(step)

                # 推理分析
                report.status = DiagnosisStatus.REASONING
                new_hypotheses = self._reason(
                    step, report.hypotheses, alert_info
                )
                report.hypotheses.extend(new_hypotheses)

                # 验证假设
                report.status = DiagnosisStatus.VERIFYING
                self._verify_hypotheses(report.hypotheses)

                # 检查是否找到高置信度根因
                best = self._get_best_hypothesis(report.hypotheses)
                if best and best.confidence >= self.confidence_threshold:
                    report.root_cause = best
                    break

            # 生成诊断报告
            report.status = DiagnosisStatus.COMPLETED
            report.end_time = datetime.utcnow()
            report.remediation = self._generate_remediation(
                report.root_cause
            )
            report.summary = self._generate_summary(report)

        except Exception as e:
            logger.error('诊断过程异常: %s', e, exc_info=True)
            report.status = DiagnosisStatus.FAILED
            report.end_time = datetime.utcnow()
            report.summary = f'诊断失败: {str(e)}'

        return report

    def _plan_diagnosis(self, alert_info: Dict) -> List[Dict]:
        """根据告警类型规划排障步骤"""
        alert_name = alert_info.get('alert_name', '')
        service = alert_info.get('service', '')

        # 从知识库获取排障SOP
        sop = self.kb.get_sop(alert_name)
        if sop:
            return sop.steps

        # 通用排障流程：从外到内逐层排查
        return [
            {'tool': 'prometheus', 'action': 'check_service_metrics',
             'params': {'service': service, 'metrics': [
                 'error_rate', 'latency_p99', 'qps'
             ]}},
            {'tool': 'k8s', 'action': 'check_pod_status',
             'params': {'service': service}},
            {'tool': 'elasticsearch', 'action': 'search_error_logs',
             'params': {'service': service, 'level': 'ERROR',
                        'window': '30m'}},
            {'tool': 'jaeger', 'action': 'check_trace_errors',
             'params': {'service': service, 'window': '30m'}},
            {'tool': 'prometheus', 'action': 'check_infra_metrics',
             'params': {'service': service, 'metrics': [
                 'cpu', 'memory', 'disk', 'network'
             ]}},
            {'tool': 'database', 'action': 'check_slow_queries',
             'params': {'service': service, 'window': '30m'}},
        ]

    def _select_next_action(self, plan: List[Dict],
                            completed_steps: List[DiagnosisStep],
                            hypotheses: List[Hypothesis]) -> Optional[Dict]:
        """选择下一步排障动作"""
        completed_actions = {
            (s.tool, s.action) for s in completed_steps
        }

        # 优先执行验证假设的查询
        for hypothesis in hypotheses:
            if not hypothesis.verified and hypothesis.verification_queries:
                query = hypothesis.verification_queries[0]
                action_key = (query.get('tool', ''), query.get('action', ''))
                if action_key not in completed_actions:
                    return query

        # 否则按计划顺序执行
        for step in plan:
            action_key = (step.get('tool', ''), step.get('action', ''))
            if action_key not in completed_actions:
                return step

        return None

    def _execute_action(self, step_id: int, action: Dict,
                        alert_info: Dict) -> DiagnosisStep:
        """执行工具调用"""
        tool_name = action.get('tool', '')
        action_name = action.get('action', '')
        params = dict(action.get('params', {}))

        # 合并告警信息到查询参数
        params.update({
            'namespace': alert_info.get('namespace', 'default'),
            'cluster': alert_info.get('cluster', '')
        })

        tool = self.tools.get(tool_name)
        if not tool:
            return DiagnosisStep(
                step_id=step_id, action=action_name, tool=tool_name,
                query=params, result=None,
                observation=f'工具 {tool_name} 不可用'
            )

        # 执行工具调用，捕获异常
        try:
            result = tool.execute(action_name, params)
            observation = self._interpret_result(
                tool_name, action_name, result
            )
        except Exception as e:
            result = None
            observation = f'工具调用失败: {str(e)}'

        return DiagnosisStep(
            step_id=step_id, action=action_name, tool=tool_name,
            query=params, result=result, observation=observation
        )

    def _reason(self, step: DiagnosisStep, existing: List[Hypothesis],
                alert_info: Dict) -> List[Hypothesis]:
        """根据新数据推理生成根因假设"""
        new_hypotheses = []
        service = alert_info.get('service', '')

        # 基于工具调用结果生成假设
        if step.tool == 'prometheus' and step.action == 'check_service_metrics':
            result = step.result or {}
            error_rate = result.get('error_rate', 0)
            latency = result.get('latency_p99', 0)

            if error_rate > 0.05:
                new_hypotheses.append(Hypothesis(
                    id=f'hyp-{len(existing) + len(new_hypotheses) + 1}',
                    description=f'服务 {service} 错误率异常: {error_rate:.2%}',
                    confidence=0.5,
                    evidence=[f'错误率 {error_rate:.2%} 超过阈值5%'],
                    verification_queries=[
                        {'tool': 'elasticsearch', 'action': 'search_error_logs',
                         'params': {'service': service, 'level': 'ERROR'}}
                    ]
                ))

            if latency > 1.0:
                new_hypotheses.append(Hypothesis(
                    id=f'hyp-{len(existing) + len(new_hypotheses) + 1}',
                    description=f'服务 {service} 延迟异常: P99={latency:.2f}s',
                    confidence=0.4,
                    evidence=[f'P99延迟 {latency:.2f}s 超过阈值1s'],
                    verification_queries=[
                        {'tool': 'jaeger', 'action': 'check_trace_errors',
                         'params': {'service': service}},
                        {'tool': 'database', 'action': 'check_slow_queries',
                         'params': {'service': service}}
                    ]
                ))

        elif step.tool == 'database' and step.action == 'check_slow_queries':
            result = step.result or {}
            slow_queries = result.get('slow_queries', [])
            if slow_queries:
                top_query = slow_queries[0]
                new_hypotheses.append(Hypothesis(
                    id=f'hyp-{len(existing) + len(new_hypotheses) + 1}',
                    description=f'数据库慢查询: {top_query.get("query", "")[:100]}',
                    confidence=0.7,
                    evidence=[
                        f'慢查询耗时 {top_query.get("duration_ms", 0)}ms',
                        f'扫描行数 {top_query.get("rows_examined", 0)}'
                    ],
                    verification_queries=[
                        {'tool': 'database', 'action': 'check_lock_status',
                         'params': {'service': service}}
                    ]
                ))

        # 更新已有假设的置信度
        for hyp in existing:
            if step.observation and any(
                e in step.observation for e in hyp.evidence
            ):
                hyp.confidence = min(hyp.confidence + 0.1, 1.0)

        return new_hypotheses

    def _verify_hypotheses(self, hypotheses: List[Hypothesis]):
        """验证假设：检查验证查询的结果是否支持假设"""
        for hyp in hypotheses:
            if hyp.verified:
                continue

            # 如果验证查询已执行且有结果，更新置信度
            if hyp.verification_result:
                hyp.verified = True
                # 验证通过增加置信度，验证不通过降低置信度
                if 'confirmed' in hyp.verification_result:
                    hyp.confidence = min(hyp.confidence + 0.2, 1.0)
                else:
                    hyp.confidence = max(hyp.confidence - 0.3, 0.0)

    def _get_best_hypothesis(self, hypotheses: List[Hypothesis]
                             ) -> Optional[Hypothesis]:
        """获取最高置信度的假设"""
        if not hypotheses:
            return None
        return max(hypotheses, key=lambda h: h.confidence)

    def _generate_remediation(self, root_cause: Optional[Hypothesis]
                              ) -> List[Dict]:
        """生成处置建议"""
        if not root_cause:
            return [{'action': 'manual_investigation',
                     'description': '未找到高置信度根因，需人工排查',
                     'risk': 'high'}]

        # 从知识库获取处置方案
        remediation = self.kb.get_remediation(root_cause.description)
        if remediation:
            return remediation

        # 通用处置建议
        return [
            {'action': 'investigate',
             'description': f'根因: {root_cause.description}',
             'risk': 'medium'},
            {'action': 'escalate',
             'description': '建议升级到相关团队处理',
             'risk': 'low'}
        ]

    def _generate_summary(self, report: DiagnosisReport) -> str:
        """生成诊断摘要"""
        duration = (report.end_time - report.start_time).total_seconds()
        steps_count = len(report.steps)

        summary = (
            f'故障诊断完成，耗时 {duration:.1f}s，'
            f'执行 {steps_count} 步排查。'
        )

        if report.root_cause:
            summary += (
                f'根因: {report.root_cause.description}，'
                f'置信度 {report.root_cause.confidence:.0%}。'
            )
        else:
            summary += '未找到高置信度根因，需人工介入。'

        return summary

    def _interpret_result(self, tool_name: str, action_name: str,
                          result: Any) -> str:
        """解释工具调用结果"""
        if result is None:
            return '查询无结果'

        if isinstance(result, dict):
            # 提取关键信息
            keys = list(result.keys())[:5]
            return f'查询返回 {len(result)} 个字段: {", ".join(keys)}'

        return str(result)[:200]
```

### 3.2 工具集实现示例

```python
#!/usr/bin/env python3
"""
排障工具集：封装各数据源的查询接口
"""
import requests
from typing import Any, Dict, Optional
import logging

logger = logging.getLogger(__name__)


class PrometheusTool:
    """Prometheus 指标查询工具"""

    def __init__(self, base_url: str = 'http://prometheus:9090'):
        self.base_url = base_url

    def execute(self, action: str, params: Dict) -> Dict:
        """执行 Prometheus 查询"""
        if action == 'check_service_metrics':
            return self._check_service_metrics(params)
        elif action == 'check_infra_metrics':
            return self._check_infra_metrics(params)
        else:
            return {'error': f'未知动作: {action}'}

    def _check_service_metrics(self, params: Dict) -> Dict:
        """查询服务级指标"""
        service = params.get('service', '')
        result = {}

        # 错误率
        error_rate_query = (
            f'sum(rate(http_requests_total{{service="{service}",'
            f'status=~"5.."}}[5m]))'
            f' / sum(rate(http_requests_total{{service="{service}"}}[5m]))'
        )
        result['error_rate'] = self._query_scalar(error_rate_query)

        # P99延迟
        latency_query = (
            'histogram_quantile(0.99, '
            'sum(rate(http_request_duration_seconds_bucket'
            f'{{service="{service}"}}[5m])) by (le))'
        )
        result['latency_p99'] = self._query_scalar(latency_query)

        # QPS
        qps_query = (
            f'sum(rate(http_requests_total{{service="{service}"}}[5m]))'
        )
        result['qps'] = self._query_scalar(qps_query)

        return result

    def _check_infra_metrics(self, params: Dict) -> Dict:
        """查询基础设施指标"""
        service = params.get('service', '')
        result = {}

        cpu_query = (
            'sum(rate(container_cpu_usage_seconds_total'
            f'{{pod=~"{service}-.*"}}[5m]))'
        )
        result['cpu'] = self._query_scalar(cpu_query)

        memory_query = (
            'sum(container_memory_working_set_bytes'
            f'{{pod=~"{service}-.*"}})'
        )
        result['memory'] = self._query_scalar(memory_query)

        return result

    def _query_scalar(self, query: str) -> float:
        """执行 PromQL 查询并返回标量值"""
        try:
            resp = requests.get(
                f'{self.base_url}/api/v1/query',
                params={'query': query},
                timeout=10
            )
            data = resp.json()
            if data['status'] == 'success' and data['data']['result']:
                return float(data['data']['result'][0]['value'][1])
        except Exception as e:
            logger.warning('Prometheus查询失败: %s', e)
        return 0.0


class ElasticsearchTool:
    """Elasticsearch 日志查询工具"""

    def __init__(self, base_url: str = 'http://elasticsearch:9200'):
        self.base_url = base_url

    def execute(self, action: str, params: Dict) -> Dict:
        if action == 'search_error_logs':
            return self._search_error_logs(params)
        return {'error': f'未知动作: {action}'}

    def _search_error_logs(self, params: Dict) -> Dict:
        """搜索错误日志"""
        service = params.get('service', '')
        level = params.get('level', 'ERROR')
        window = params.get('window', '30m')

        query = {
            'query': {
                'bool': {
                    'must': [
                        {'term': {'service': service}},
                        {'term': {'level': level}},
                        {'range': {'timestamp': {'gte': f'now-{window}'}}}
                    ]
                }
            },
            'size': 20,
            'sort': [{'timestamp': {'order': 'desc'}}]
        }

        try:
            resp = requests.post(
                f'{self.base_url}/app-logs-*/_search',
                json=query,
                timeout=15
            )
            data = resp.json()
            hits = data.get('hits', {}).get('hits', [])
            return {
                'total': data.get('hits', {}).get('total', {}).get('value', 0),
                'logs': [
                    hit['_source'] for hit in hits[:10]
                ]
            }
        except Exception as e:
            logger.warning('ES查询失败: %s', e)
            return {'total': 0, 'logs': []}
```

## 四、AI 排障 Agent 的局限性与工程妥协

### 4.1 推理链路的可靠性

Agent 的推理质量取决于每一步工具调用的结果。如果 Prometheus 查询超时、ES 返回不完整数据，推理链路就会断裂或产生错误假设。解决方案：每个工具调用设置超时和重试，对异常结果做降级处理（如查询失败时跳过该步骤、继续其他推理），同时限制最大迭代次数，防止死循环。

### 4.2 知识库的维护成本

排障 SOP 和历史案例需要持续维护。新服务上线时没有历史案例，Agent 只能走通用流程，效率较低。建议：每次人工排障后，将排障过程和结论自动入库，形成案例积累；同时定期由资深工程师审核知识库，淘汰过时的 SOP。

### 4.3 自动执行的风险

Agent 输出处置建议后，是否允许自动执行？低风险操作（如扩容、重启非核心 Pod）可以自动执行，但高风险操作（如数据库切换、配置变更）必须人工确认。建议设置操作风险评级：高风险操作即使 Agent 有高置信度，也需要人工审批后才能执行。

### 4.4 禁用场景

以下场景不适合 AI 排障 Agent：第一，安全事件排查（如入侵检测），Agent 的工具调用可能破坏取证现场；第二，涉及敏感数据的故障（如用户隐私泄露），Agent 的日志记录可能违反合规要求；第三，基础设施级故障（如机房断电），Agent 本身可能无法运行。

## 五、总结

AI 排障 Agent 将故障诊断从“人工逐项排查”升级为“自动推理循环”。通过 ReAct 模式，Agent 自动规划排障步骤、调用工具获取数据、推理生成根因假设、验证假设，直到找到高置信度根因。但推理链路的可靠性依赖工具调用的稳定性，知识库需要持续维护，自动执行必须限制在低风险操作。

务实的落地路径：先从高频故障类型（如服务超时、Pod 重启）入手，积累排障案例，逐步扩展覆盖范围。让排障从“靠经验”变成“靠流程”，让开发少背锅。