# A Complete Monitoring Solution | Prometheus, Grafana, ELK | Tracing & Alert Rules | A Production-Grade Practice Guide

## Why You Need Monitoring

### The Challenges of Production

Without monitoring, you will run into situations like these:

- ❌ The system gets slow, but you don't know where
- ❌ Users report errors, but you can't find the cause
- ❌ API costs suddenly spike, and you don't know who is driving them
- ❌ The cache hit rate drops, but no alert fires
- ❌ The system goes down in the middle of the night, and you only find out the next day

### The Value of Monitoring

- ✅ **Fast problem localization** — from hours down to minutes
- ✅ **Failure prevention** — spot abnormal trends early
- ✅ **Performance optimization** — data-driven tuning decisions
- ✅ **Cost control** — monitor API spend in real time
- ✅ **Better user experience** — find and fix issues promptly

## Monitoring Architecture Overview

```
┌─────────────────────────────────────────────┐
│             Agent Application               │
│ ┌──────────┐ ┌──────────┐ ┌─────────────┐  │
│ │ Metrics  │ │   Logs   │ │   Traces    │  │
│ └────┬─────┘ └────┬─────┘ └──────┬──────┘  │
└──────┼────────────┼──────────────┼─────────┘
       ↓            ↓              ↓
┌──────────────┐ ┌──────────┐ ┌──────────────┐
│  Prometheus  │ │   ELK    │ │   Jaeger/    │
│  (metrics)   │ │  (logs)  │ │   Zipkin     │
│              │ │          │ │  (tracing)   │
└──────┬───────┘ └────┬─────┘ └──────┬───────┘
       ↓              ↓              ↓
┌─────────────────────────────────────────────┐
│             Grafana Dashboard               │
│     (unified visualization & alerting)      │
└─────────────────────────────────────────────┘
```

### The Three Pillars

1. **Metrics** — numeric data: QPS, latency, error rate
2. **Logs** — event records: request details, error stack traces
3. **Traces** — the full path of a request: cross-service call chains

## Metrics: Monitoring the Key Indicators

### Core Metric Definitions

| Category | Metric | Description | Alert threshold |
|----------|--------|-------------|-----------------|
| Performance | `request_duration_seconds` | Request latency | P95 > 2s |
| Traffic | `requests_total` | Total requests | — |
| Errors | `errors_total` | Total errors | Error rate > 5% |
| Cache | `cache_hit_rate` | Cache hit rate | < 60% |
| Cost | `api_cost_usd` | API spend | Daily cost > $50 |
| Resources | `active_connections` | Active connections | > 1000 |

### Implementation

```python
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
import functools

# Metric definitions
REQUEST_COUNT = Counter(
    "agent_requests_total",
    "Total number of requests",
    ["method", "endpoint", "status", "tenant_id"]
)

REQUEST_DURATION = Histogram(
    "agent_request_duration_seconds",
    "Request duration in seconds",
    ["method", "endpoint", "tenant_id"],
    buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)

ERROR_COUNT = Counter(
    "agent_errors_total",
    "Total number of errors",
    ["error_type", "tenant_id"]
)

CACHE_HIT_RATE = Gauge(
    "agent_cache_hit_rate",
    "Cache hit rate percentage",
    ["cache_type"]
)

API_COST = Counter(
    "agent_api_cost_usd",
    "API cost in USD",
    ["model", "tenant_id"]
)

ACTIVE_CONNECTIONS = Gauge(
    "agent_active_connections",
    "Number of active connections"
)

def monitor_request(func):
    """Request-monitoring decorator: records count, errors, and duration."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        try:
            result = func(*args, **kwargs)
            # Record the successful request
            REQUEST_COUNT.labels(
                method="POST",
                endpoint=func.__name__,
                status="success",
                tenant_id=kwargs.get("tenant_id", "unknown")
            ).inc()
            return result
        except Exception as e:
            # Record the error
            ERROR_COUNT.labels(
                error_type=type(e).__name__,
                tenant_id=kwargs.get("tenant_id", "unknown")
            ).inc()
            raise
        finally:
            # Record the duration
            duration = time.time() - start_time
            REQUEST_DURATION.labels(
                method="POST",
                endpoint=func.__name__,
                tenant_id=kwargs.get("tenant_id", "unknown")
            ).observe(duration)
    return wrapper

# Usage example
@monitor_request
def query_knowledge_base(tenant_id: str, query: str):
    """Query the knowledge base (monitored automatically)."""
    # Business logic
    result = rag_service.query(tenant_id, query)

    # Record the API cost
    cost = calculate_api_cost(result)
    API_COST.labels(
        model="gpt-3.5-turbo",
        tenant_id=tenant_id
    ).inc(cost)

    return result

# Start the Prometheus metrics endpoint
start_http_server(8000)
print("✅ Prometheus metrics server started on port 8000")
```
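One gap worth noting: `CACHE_HIT_RATE` is declared above but never updated. Below is a minimal sketch of one way to keep the gauge current, assuming your cache layer exposes hit/miss counts — the `cache.stats()` accessor used here is hypothetical, not part of the project code:

```python
import threading

def start_cache_metrics_updater(cache, interval: float = 15.0):
    """Periodically refresh the cache-hit-rate gauge from cache statistics."""
    def _loop():
        while True:
            # `cache.stats()` is a hypothetical accessor; substitute whatever
            # hit/miss bookkeeping your cache layer actually provides.
            stats = cache.stats()
            total = stats["hits"] + stats["misses"]
            if total > 0:
                # The gauge stores a percentage, matching the "< 60%" alert threshold.
                CACHE_HIT_RATE.labels(cache_type="memory").set(stats["hits"] / total * 100)
            time.sleep(interval)

    threading.Thread(target=_loop, daemon=True, name="cache-metrics").start()
```

A daemon thread is sufficient here: losing one gauge update at shutdown is harmless, since the corresponding alert only fires after the rate stays low for 15 minutes.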
## Logs: Structured Logging

### Defining the Log Format

```python
import logging
import json
from datetime import datetime

class StructuredFormatter(logging.Formatter):
    """Structured log formatter (JSON output)."""

    def format(self, record):
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }

        # Attach extra fields when present
        if hasattr(record, "tenant_id"):
            log_entry["tenant_id"] = record.tenant_id
        if hasattr(record, "request_id"):
            log_entry["request_id"] = record.request_id

        if record.exc_info and record.exc_info[1]:
            log_entry["exception"] = {
                "type": type(record.exc_info[1]).__name__,
                "message": str(record.exc_info[1]),
                "traceback": self.formatException(record.exc_info)
            }

        return json.dumps(log_entry, ensure_ascii=False)

# Configure logging
logger = logging.getLogger("agent")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(StructuredFormatter())
logger.addHandler(handler)
```

### Logging Best Practices

```python
import uuid
from contextvars import ContextVar

# Request-ID context variable
request_id_var: ContextVar[str] = ContextVar("request_id", default="")

def generate_request_id() -> str:
    """Generate a unique request ID."""
    return str(uuid.uuid4())

class RequestLogger:
    """Per-request logger that tags entries with tenant and request IDs."""

    def __init__(self, tenant_id: str):
        self.tenant_id = tenant_id
        self.request_id = generate_request_id()
        request_id_var.set(self.request_id)

    def log_request_start(self, query: str):
        """Log the start of a request."""
        logger.info(
            "Request started",
            extra={
                "tenant_id": self.tenant_id,
                "request_id": self.request_id,
                "query_preview": query[:100]
            }
        )

    def log_cache_hit(self, cache_type: str):
        """Log a cache hit."""
        logger.info(
            "Cache hit",
            extra={
                "tenant_id": self.tenant_id,
                "request_id": self.request_id,
                "cache_type": cache_type
            }
        )

    def log_llm_call(self, model: str, tokens: int, cost: float):
        """Log a completed LLM call."""
        logger.info(
            "LLM call completed",
            extra={
                "tenant_id": self.tenant_id,
                "request_id": self.request_id,
                "model": model,
                "tokens": tokens,
                "cost_usd": cost
            }
        )

    def log_error(self, error: Exception, context: dict = None):
        """Log an error with its context."""
        logger.error(
            f"Request failed: {str(error)}",
            extra={
                "tenant_id": self.tenant_id,
                "request_id": self.request_id,
                "context": context or {}
            },
            exc_info=True
        )

    def log_request_end(self, duration: float):
        """Log the end of a request."""
        logger.info(
            "Request completed",
            extra={
                "tenant_id": self.tenant_id,
                "request_id": self.request_id,
                "duration_seconds": duration
            }
        )

# Usage example
def handle_query(tenant_id: str, query: str):
    req_logger = RequestLogger(tenant_id)
    start_time = time.time()

    try:
        req_logger.log_request_start(query)

        # Check the cache first
        cached = cache.get(query)
        if cached:
            req_logger.log_cache_hit("memory")
            return cached

        # Call the LLM
        result = call_llm(query)
        tokens = count_tokens(result)
        cost = calculate_cost(tokens)
        req_logger.log_llm_call("gpt-3.5-turbo", tokens, cost)

        duration = time.time() - start_time
        req_logger.log_request_end(duration)
        return result

    except Exception as e:
        req_logger.log_error(e, {"query": query})
        raise
```

### ELK Stack Integration

```python
# elasticsearch_handler.py
from elasticsearch import Elasticsearch
from logging.handlers import BufferingHandler

class ElasticsearchHandler(BufferingHandler):
    """Log handler that ships buffered records to Elasticsearch."""

    def __init__(self, es_host: str, index_prefix: str = "agent-logs"):
        super().__init__(capacity=100)  # flush after 100 buffered records
        self.es = Elasticsearch([es_host])
        self.index_prefix = index_prefix

    def flush(self):
        """Send the buffered records to Elasticsearch, then clear the buffer.

        BufferingHandler.emit() appends each record to self.buffer and calls
        flush() once the capacity is reached, so batching happens here.
        """
        self.acquire()
        try:
            for record in self.buffer:
                log_entry = {
                    "timestamp": datetime.utcnow().isoformat(),
                    "level": record.levelname,
                    "message": record.getMessage(),
                    "tenant_id": getattr(record, "tenant_id", None),
                    "request_id": getattr(record, "request_id", None),
                    "module": record.module,
                    "function": record.funcName,
                }

                # One index per day
                index_name = f"{self.index_prefix}-{datetime.utcnow().strftime('%Y.%m.%d')}"
                self.es.index(index=index_name, document=log_entry)
            self.buffer = []
        except Exception as e:
            print(f"Failed to send logs to ES: {e}")
        finally:
            self.release()

# Configuration
es_handler = ElasticsearchHandler(es_host="http://localhost:9200")
logger.addHandler(es_handler)
```
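With `request_id_var` already in place, you can also avoid repeating `extra={...}` on every log call. A small `logging.Filter` — a sketch, not part of the project code — can stamp the current request ID onto each record automatically; the `StructuredFormatter` above then picks it up through its existing `hasattr` check:

```python
class RequestContextFilter(logging.Filter):
    """Enriches every log record with the request ID from the current context."""

    def filter(self, record: logging.LogRecord) -> bool:
        # Respect a request_id explicitly supplied via `extra`
        if not hasattr(record, "request_id"):
            record.request_id = request_id_var.get()
        return True  # never drop records, only enrich them

logger.addFilter(RequestContextFilter())
```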
## Traces: Distributed Tracing

### OpenTelemetry Integration

```python
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.requests import RequestsInstrumentor

# Configure the Jaeger exporter
jaeger_exporter = JaegerExporter(
    agent_host_name="localhost",
    agent_port=6831,
)

# Set up the TracerProvider
provider = TracerProvider()
processor = BatchSpanProcessor(jaeger_exporter)
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

# Auto-instrument the requests library
RequestsInstrumentor().instrument()

# Get a tracer
tracer = trace.get_tracer(__name__)

# Usage example
def process_user_request(tenant_id: str, query: str):
    with tracer.start_as_current_span("process_request") as span:
        # Attach request attributes
        span.set_attribute("tenant.id", tenant_id)
        span.set_attribute("query.length", len(query))

        # Child span: cache lookup
        with tracer.start_as_current_span("check_cache") as cache_span:
            cached = cache.get(query)
            cache_span.set_attribute("cache.hit", cached is not None)
            if cached:
                return cached

        # Child span: LLM call
        with tracer.start_as_current_span("call_llm") as llm_span:
            result = call_llm(query)
            llm_span.set_attribute("llm.model", "gpt-3.5-turbo")
            return result
```

### A Custom Span Decorator

```python
def trace_operation(operation_name: str):
    """Tracing decorator: wraps the function call in a named span."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            with tracer.start_as_current_span(operation_name) as span:
                # Record scalar keyword arguments as span attributes
                for key, value in kwargs.items():
                    if isinstance(value, (str, int, float, bool)):
                        span.set_attribute(f"param.{key}", value)

                try:
                    result = func(*args, **kwargs)
                    span.set_status(trace.StatusCode.OK)
                    return result
                except Exception as e:
                    span.set_status(trace.StatusCode.ERROR)
                    span.record_exception(e)
                    raise
        return wrapper
    return decorator

# Usage
@trace_operation("rag_query")
def query_rag(tenant_id: str, query: str):
    # Traced automatically
    ...
```

## Grafana Dashboard

### Dashboard JSON

```json
{
  "dashboard": {
    "title": "Agent Performance Monitor",
    "panels": [
      {
        "title": "Request QPS",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(agent_requests_total[5m])",
            "legendFormat": "{{tenant_id}}"
          }
        ]
      },
      {
        "title": "P95 Latency",
        "type": "graph",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(agent_request_duration_seconds_bucket[5m]))",
            "legendFormat": "P95"
          }
        ]
      },
      {
        "title": "Error Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(agent_errors_total[5m]) / rate(agent_requests_total[5m]) * 100",
            "legendFormat": "Error Rate %"
          }
        ]
      },
      {
        "title": "Cache Hit Rate",
        "type": "gauge",
        "targets": [
          {
            "expr": "agent_cache_hit_rate",
            "legendFormat": "{{cache_type}}"
          }
        ]
      },
      {
        "title": "API Cost (Today)",
        "type": "stat",
        "targets": [
          {
            "expr": "sum(increase(agent_api_cost_usd[24h]))",
            "legendFormat": "Total Cost"
          }
        ]
      }
    ]
  }
}
```
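The P95 panel above aggregates latency across all tenants. Since `agent_request_duration_seconds` carries a `tenant_id` label, a per-tenant breakdown is a small change — a sketch of the PromQL you could drop into an additional panel:

```promql
# P95 latency broken down per tenant
histogram_quantile(
  0.95,
  sum by (le, tenant_id) (rate(agent_request_duration_seconds_bucket[5m]))
)
```

Keeping the `le` label in the `sum by` clause is what preserves the histogram buckets that `histogram_quantile` needs.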
### Key Views

1. **Overview panel**: real-time QPS, average latency, error rate, active tenant count
2. **Tenant details**: request volume, API cost, and cache hit rate per tenant
3. **Performance analysis**: P50/P95/P99 latency, top-10 slow requests, error-type distribution
4. **Cost monitoring**: daily API cost trend, cost share per model, cost ranking per tenant

## Alert Rules

### Prometheus Alert Configuration

```yaml
# alert_rules.yml
groups:
  - name: agent_alerts
    rules:
      # High error rate
      - alert: HighErrorRate
        expr: rate(agent_errors_total[5m]) / rate(agent_requests_total[5m]) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate detected"
          description: "Error rate above 5% (current value: {{ $value | humanizePercentage }})"

      # Slow requests
      - alert: SlowRequests
        expr: histogram_quantile(0.95, rate(agent_request_duration_seconds_bucket[5m])) > 2
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Slow requests detected"
          description: "P95 latency above 2 seconds (current value: {{ $value }}s)"

      # Low cache hit rate
      - alert: LowCacheHitRate
        expr: agent_cache_hit_rate < 60
        for: 15m
        labels:
          severity: warning
        annotations:
          summary: "Low cache hit rate"
          description: "Cache hit rate below 60% (current value: {{ $value }}%)"

      # Abnormal API cost
      - alert: HighAPICost
        expr: increase(agent_api_cost_usd[1h]) > 10
        for: 0m
        labels:
          severity: critical
        annotations:
          summary: "Abnormal API cost"
          description: "API cost exceeded $10 in the past hour (current value: ${{ $value }})"

      # Too many active connections
      - alert: HighActiveConnections
        expr: agent_active_connections > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Too many active connections"
          description: "Active connections above 1000 (current value: {{ $value }})"
```

### Notification Channels

```python
# notification.py
import requests
from datetime import datetime

class AlertNotifier:
    """Sends alerts to Slack and email."""

    def __init__(self):
        self.webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
        self.email_recipients = ["admin@company.com"]

    def send_slack_alert(self, alert_name: str, severity: str, description: str):
        """Send an alert message to Slack."""
        color_map = {
            "critical": "#ff0000",
            "warning": "#ffa500",
            "info": "#00ff00"
        }

        payload = {
            "attachments": [
                {
                    "color": color_map.get(severity, "#000000"),
                    "title": f"🚨 {alert_name}",
                    "text": description,
                    "fields": [
                        {
                            "title": "Severity",
                            "value": severity.upper(),
                            "short": True
                        },
                        {
                            "title": "Time",
                            "value": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                            "short": True
                        }
                    ]
                }
            ]
        }

        requests.post(self.webhook_url, json=payload)

    def send_email_alert(self, subject: str, body: str):
        """Send an alert email."""
        import smtplib
        from email.mime.text import MIMEText

        msg = MIMEText(body)
        msg["Subject"] = f"[ALERT] {subject}"
        msg["From"] = "alerts@company.com"
        msg["To"] = ", ".join(self.email_recipients)

        with smtplib.SMTP("smtp.company.com") as server:
            server.send_message(msg)

# Usage
notifier = AlertNotifier()
notifier.send_slack_alert(
    alert_name="HighErrorRate",
    severity="critical",
    description="Error rate is above 5%, please investigate immediately"
)
```

## Full Project Code

A complete monitoring-system project is available in the GitHub repository:

```bash
git clone https://github.com/Lee985-cmd/AI-30-Day-Challenge.git
cd projects/agent-monitoring-dashboard
```

### Project Features

- ✅ Prometheus metrics collection
- ✅ Grafana dashboard templates
- ✅ ELK log aggregation
- ✅ Jaeger distributed tracing
- ✅ Alert rule configuration
- ✅ Slack / email notifications
- ✅ One-command deployment with Docker Compose

### Quick Start

```bash
# Start all services with Docker Compose
docker-compose up -d

# Service endpoints
# Grafana:    http://localhost:3000 (admin/admin)
# Prometheus: http://localhost:9090
# Jaeger:     http://localhost:16686
# Kibana:     http://localhost:5601

# Run the example application
python example_app.py
```

(If you prefer to assemble the stack by hand, a minimal compose sketch appears at the end of this post.)

## FAQ

### Q1: How do I choose the right monitoring granularity?

**A:** Monitor in layers:

- **Application layer** (QPS, latency, error rate): second-level granularity
- **Business layer** (conversion rate, user behavior): minute-level
- **Infrastructure** (CPU, memory, disk): minute-level
- **Cost layer** (API spend, resource usage): hour-level

### Q2: What if the log volume is too large?

**A:** Sample and tier your logs:

```python
# Log only 10% of successful requests; always log warnings and above
if random.random() < 0.1 or level >= logging.WARNING:
    logger.info(...)

# Or tier the log level by tenant
if tenant_tier == "vip":
    log_level = logging.DEBUG
else:
    log_level = logging.INFO
```
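The Q2 snippet above is a sketch (`level` is never defined). A more concrete version of the same idea, written as a `logging.Filter` so sampling applies uniformly without touching call sites — a minimal sketch, not the project's actual implementation:

```python
import random

class SamplingFilter(logging.Filter):
    """Keeps every WARNING-and-above record, but only a fraction of the rest."""

    def __init__(self, sample_rate: float = 0.1):
        super().__init__()
        self.sample_rate = sample_rate

    def filter(self, record: logging.LogRecord) -> bool:
        if record.levelno >= logging.WARNING:
            return True  # always keep warnings and errors
        return random.random() < self.sample_rate  # sample INFO/DEBUG records

logger.addFilter(SamplingFilter(sample_rate=0.1))
```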
### Q3: How do I keep the monitoring overhead low?

**A:** Go asynchronous and batch:

```python
# Push metrics asynchronously instead of emitting per request
from prometheus_client import push_to_gateway

# Ship logs in larger batches
class BatchElasticsearchHandler(BufferingHandler):
    def __init__(self, capacity=1000):  # larger buffer
        super().__init__(capacity)
```

## Real-World Cases

### Case 1: E-commerce Customer-Service Agent

**Monitoring focus:**

- Response time: P95 < 1s
- Error rate: < 2%
- Cache hit rate: > 70%
- Daily API cost: < $100

**Results:**

- Time to detect failures dropped from 30 minutes to 2 minutes
- Time to localize performance problems dropped from 2 hours to 10 minutes
- API cost cut by 40% (monitoring surfaced abnormal call patterns)

### Case 2: Financial Research Agent

**Monitoring focus:**

- Data accuracy (compared against a benchmark)
- Compliance checks (sensitive-word detection)
- Audit logs (every operation leaves a trail)
- High availability (99.9% SLA)

**Results:**

- Met financial regulatory requirements
- Achieved full audit traceability
- Availability reached 99.95%

## Summary

The core value of a monitoring system:

- ✅ **Visibility** — know what the system is doing
- ✅ **Fast response** — know the moment a problem appears
- ✅ **Data-driven decisions** — optimize based on evidence
- ✅ **Cost control** — watch spend in real time
- ✅ **Compliance and audit** — keep a complete operational record

### Best Practices

1. Start simple and improve iteratively
2. Watch business metrics, not just technical ones
3. Set sensible alert thresholds to avoid alert fatigue
4. Review your dashboards regularly and keep optimizing
5. Document every metric and alert rule

### Next Steps

1. Deploy the full monitoring stack
2. Customize the dashboards for your business needs
3. Set up alert notification channels
4. Establish an on-call rotation

The full code and a detailed tutorial are in the GitHub repository.
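**Appendix:** for readers assembling the stack without cloning the repository, here is a minimal `docker-compose.yml` sketch covering the services listed in the quick start. The image tags, environment settings, and volume paths are assumptions based on each project's public defaults, not the repository's actual file; you still need a `prometheus.yml` that scrapes the example app's `:8000` metrics endpoint.

```yaml
# docker-compose.yml — minimal sketch; pin image tags for real deployments
services:
  prometheus:
    image: prom/prometheus
    ports: ["9090:9090"]
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml   # must scrape the app on :8000
      - ./alert_rules.yml:/etc/prometheus/alert_rules.yml

  grafana:
    image: grafana/grafana
    ports: ["3000:3000"]   # default login admin/admin

  jaeger:
    image: jaegertracing/all-in-one
    ports:
      - "16686:16686"      # web UI
      - "6831:6831/udp"    # agent port used by the Thrift exporter above

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.13.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false   # dev-only: disables auth for local use
    ports: ["9200:9200"]

  kibana:
    image: docker.elastic.co/kibana/kibana:8.13.0
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports: ["5601:5601"]
```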