导读在生产环境中运行 Apache Airflow,仅仅保证 DAG 能"跑起来"是远远不够的。你需要清楚地知道:调度器是否在正常工作?任务的平均延迟是多少?哪些 DAG 的执行频繁失败?Worker 的负载是否健康?回答这些问题,需要一套完整的可观测性体系——包括指标采集、日志系统、事件监听和回调通知。Airflow 从 1.x 时代就内置了 StatsD 指标支持,随着云原生和微服务架构的普及,又引入了 OpenTelemetry 作为现代化的可观测性标准。在 Airflow 3.x 中,整个指标系统经历了重大重构:核心实现被抽离到共享包airflow_shared中,引入了 YAML 格式的指标注册表实现新旧指标名称的双轨发射,日志系统全面迁移至 structlog 实现结构化日志输出,Listener 机制基于 pluggy 提供了无侵入式的事件扩展能力。本课将从架构设计到源码实现,完整剖析 Airflow 的四大可观测性支柱:指标(Metrics)、日志(Logging)、监听器(Listeners)和回调(Callbacks)。理解这些机制不仅能帮助你在生产环境中快速定位问题,还能让你构建出自定义的监控告警体系。学习目标完成本课学习后,你将能够:理解 Airflow 指标系统的三层架构(Stats 门面 → Backend Logger → Transport 传输),掌握 StatsD、OpenTelemetry 和 Datadog 三种后端的配置与选择策略深入分析MetricsRegistry如何通过 YAML 注册表实现新旧指标名称的双轨并行发射机制掌握 Airflow 3.x 基于 structlog 的结构化日志系统,理解 JSON/Console 双模式输出和远程日志存储架构理解基于 pluggy 的 Listener 机制,学会编写自定义监听器响应 DAG/任务/资产的生命周期事件掌握 Callback 系统的请求-转发架构,理解TaskCallbackRequest和DagCallbackRequest如何通过DatabaseCallbackSink实现异步回调执行能够设计并搭建一套完整的 Airflow 监控告警体系,涵盖核心指标采集、告警规则配置和可视化面板构建正文内容一、指标系统架构总览1.1 三层架构设计Airflow 的指标系统采用了经典的门面模式(Facade Pattern),将指标发射的调用方与具体的传输实现解耦。整体架构可以分为三层:┌─────────────────────────────────────────────────────┐ │ 调用层(Airflow 各组件) │ │ scheduler / executor / task_runner / dag_processor │ │ stats.incr("ti.finish", tags={...}) │ └────────────────────────┬────────────────────────────┘ │ ┌────────────────────────▼────────────────────────────┐ │ 门面层(Stats Module) │ │ shared/observability/metrics/stats.py │ │ ┌─────────────┐ ┌──────────────┐ ┌───────────┐ │ │ │ incr/decr │ │ gauge │ │ timer │ │ │ └──────┬──────┘ └──────┬───────┘ └─────┬─────┘ │ │ │ MetricsRegistry 双轨发射 │ │ │ │ (legacy_name + modern_name) │ │ └─────────┼──────────────────────────────────┼────────┘ │ │ ┌─────────▼──────────────────────────────────▼────────┐ │ 后端层(Backend Logger) │ │ ┌──────────────┐ ┌──────────────┐ ┌─────────────┐ │ │ │SafeStatsd │ │SafeOtelLogger│ │SafeDogStatsd │ │ │ │Logger │ │ │ │Logger │ │ │ └──────┬───────┘ └──────┬───────┘ └──────┬──────┘ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ StatsD Server OTel Collector Datadog Agent │ └─────────────────────────────────────────────────────┘这种设计的核心优势在于:调用方无感知:所有组件统一调用stats.incr()、stats.timing()等函数,无需关心底层使用哪种传输协议后端可插拔:通过配置文件即可切换 StatsD / OpenTelemetry / Datadog,无需修改任何业务代码进程安全:通过os.register_at_fork()在 fork 后重置后端实例,避免子进程继承父进程的陈旧连接1.2 后端选择工厂后端的选择逻辑封装在stats_utils.py中的get_stats_factory()函数:# airflow-core/src/airflow/observability/metrics/stats_utils.pydefget_stats_factory()-Callable:ifconf.getboolean("metrics","statsd_datadog_enabled"):fromairflow.observability.metricsimportdatadog_loggerreturndatadog_logger.get_dogstatsd_loggerifconf.getboolean("metrics","statsd_on"):fromairflow.observability.metricsimportstatsd_loggerreturnstatsd_logger.get_statsd_loggerifconf.getboolean("metrics","otel_on"):fromairflow.observability.metricsimportotel_loggerreturnotel_logger.get_otel_loggerreturnNoStatsLogger这里的设计遵循优先级链模式:Datadog StatsD OpenTelemetry NoOp。注意这些选项是互斥的——只能激活一个后端。如果同时开启多个配置,则按优先级选择第一个匹配的后端。二、Stats 门面层深度解析2.1 模块级单例与延迟初始化Stats 模块的核心实现位于shared/observability/src/airflow_shared/observability/metrics/stats.py。它采用模块级全局变量实现单例模式:# shared/observability/src/airflow_shared/observability/metrics/stats.py# 模块级单例状态_factory:Callable[[],StatsLogger|NoStatsLogger]|None=None_backend:StatsLogger|NoStatsLogger|None=None_export_legacy_names:bool=True_registry:MetricsRegistry|None=Nonedef_reset_backend_after_fork()-None:"""Reset the backend after a fork so the child process initializes it again."""global_backend _backend=Noneos.register_at_fork(after_in_child=_reset_backend_after_fork)关键设计点:延迟初始化:_backend在第一次使用时才通过_factory()创建,避免导入时的副作用Fork 安全:os.register_at_fork()确保子进程不会复用父进程的后端实例(网络连接不能跨进程共享)容错降级:如果后端创建失败(如 DNS 解析错误、缺少依赖包),自动降级为NoStatsLoggerdef_get_backend()-StatsLogger|NoStatsLogger:global_backendif_backendisNone:factory=_factoryif_factoryisnotNoneelseNoStatsLoggertry:_backend=factory()except(socket.gaierror,ImportError)ase:log.error("Could not configure StatsClient: %s, using NoStatsLogger instead.",e)_backend=NoStatsLogger()return_backend2.2 StatsLogger Protocol 与 NoStatsLogger指标后端必须遵循StatsLoggerProtocol 接口:# shared/observability/src/airflow_shared/observability/metrics/base_stats_logger.pyclassStatsLogger(Protocol):"""This class is only used for TypeChecking (for IDEs, mypy, etc)."""@classmethoddefincr(cls,stat:str,count:int=1,rate:int|float=1,*,tags:dict[str,Any]|None=None)-None:...@classmethoddefdecr(cls,stat:str,count:int=1,rate:int|float=1,*,tags:dict[str,Any]|None=None)-None:...@classmethoddefgauge(cls,stat:str,value:float,rate:int|float=1,delta:bool=False,*,tags:dict[str,Any]|None=None)-None:...@classmethoddeftiming(cls,stat:str,dt:DeltaType|None,*,tags:dict[str,Any]|None=None)-None:...@classmethoddeftimer(cls,*args,**kwargs)-Timer:...四种指标类型对应四个核心方法:方法指标类型用途示例incrCounter累加计数器任务启动次数、心跳次数decrCounter递减计数器进程数变化(UpDownCounter)gaugeGauge瞬时值池中空闲槽位数、DAG 总数timingTimer/Histogram耗时记录任务执行时长、调度延迟NoStatsLogger是空操作实现,所有方法都是 no-op,确保在不配置指标后端时系统仍能正常运行。2.3 Timer 的精密计时实现Timer 是指标系统中最复杂的组件,它既是上下文管理器,又是秒表式计时器:# shared/observability/src/airflow_shared/observability/metrics/protocols.pyclassTimer(TimerProtocol):_start_time:float|Noneduration:float|Nonedef__init__(self,real_timer:Timer|None=None)-None:self.real_timer=real_timerdefstart(self)-Self:ifself.real_timer:self.real_timer.start()self._start_time=time.perf_counter()returnselfdefstop(self,send:bool=True)-None:ifself._start_timeisnotNone:self.duration=1000.0*(time.perf_counter()-self._start_time)# 转换为毫秒ifsendandself.real_timer:self.real_timer.stop()注意这里使用time.perf_counter()而非time.time()——前者不受系统时钟调整影响,精度更高,专为计时场景设计。计算结果统一为毫秒单位。Timer 支持两种使用模式:# 模式一:上下文管理器withstats.timer("task.duration",tags={"dag_id":"my_dag"})ast:execute_task()log.info("Task took %.2f ms",t.duration)# 模式二:手动秒表timer=stats.timer().start()execute_task()timer.stop()log.info("Task took %.2f ms",timer.duration)三、MetricsRegistry 与双轨发射机制3.1 YAML 指标注册表Airflow 3.x 引入了一个重大改进:通过 YAML 文件定义所有指标的元数据,包括名称、类型、描述和新旧名称映射。这个注册表存储在metrics_template.yaml中:# shared/observability/src/airflow_shared/observability/metrics/metrics_template.yamlmetrics:# 计数器示例-name:"ti.finish"description:"Number of completed task in a given Dag."type:"counter"legacy_name:"ti.finish.{dag_id}.{task_id}.{state}"name_variables:["dag_id","task_id","state"]# 计时器示例-name:"dagrun.schedule_delay"description:"Milliseconds of delay between the scheduled DagRun start date and the actual start date