从LeetCode刷题到项目实战:DAG(有向无环图)如何帮你搞定任务调度与依赖管理
从LeetCode刷题到项目实战DAG如何帮你搞定任务调度与依赖管理当你面对LeetCode 207题课程表时是否曾疑惑这道拓扑排序题和实际工程有什么关系三年前我在重构公司CI系统时突然发现每天处理的流水线依赖问题本质上就是一道活生生的DAG应用题。本文将带你从算法题出发直击Airflow等调度框架的核心思想最终手把手实现一个迷你任务调度器。1. 从LeetCode出发拓扑排序的实战密码LeetCode 207题给出课程间的先修关系要求判断能否完成所有课程。这道题正是DAG检测和拓扑排序的经典场景。我们先看最精简的Python解法from collections import deque def canFinish(numCourses, prerequisites): indegree [0] * numCourses adj [[] for _ in range(numCourses)] for cur, pre in prerequisites: adj[pre].append(cur) indegree[cur] 1 queue deque([i for i in range(numCourses) if indegree[i] 0]) count 0 while queue: node queue.popleft() count 1 for neighbor in adj[node]: indegree[neighbor] - 1 if indegree[neighbor] 0: queue.append(neighbor) return count numCourses关键点在于**入度表(indegree)和邻接表(adj)**的维护入度表记录每个节点的前置依赖数量邻接表保存每个节点的后继节点列表实际工程中这种解法的时间复杂度O(VE)仍然是最优选择但需要添加重试机制和状态持久化在真实项目里我们往往需要扩展基础算法。比如添加优先级调度# 在基础拓扑排序上增加优先级队列 import heapq def schedule_tasks(tasks, dependencies): # tasks格式: {task_id: priority} task_heap [] # ...其余拓扑排序逻辑... heapq.heappush(task_heap, (-tasks[neighbor], neighbor))2. 工业级调度系统的DAG实践当任务量达到百万级时简单的拓扑排序就需要升级。以Airflow为例其核心设计值得借鉴组件作用实现原理Scheduler任务调度中枢动态计算DAG处理任务依赖Executor任务执行器线程池/进程池实现并发Metadata DB存储任务状态持久化DAG运行状态Webserver可视化界面将DAG渲染为可交互的任务流图一个生产级的DAG调度系统需要处理动态DAG更新支持运行时修改任务依赖故障恢复自动重试失败任务而不破坏依赖资源隔离控制并发任务对系统资源的占用3. 手写迷你调度器200行代码的DAG引擎让我们用Python实现一个具备核心功能的调度器。首先定义DAG结构class DAG: def __init__(self): self.tasks {} # 任务ID到任务的映射 self.graph {} # 邻接表表示的依赖关系 def add_task(self, task_id, funcNone): self.tasks[task_id] { func: func, status: pending, upstream: set(), downstream: set() } self.graph[task_id] [] def add_dependency(self, from_task, to_task): self.graph[from_task].append(to_task) self.tasks[to_task][upstream].add(from_task) self.tasks[from_task][downstream].add(to_task)执行引擎的核心逻辑from concurrent.futures import ThreadPoolExecutor class Scheduler: def __init__(self, max_workers4): self.executor ThreadPoolExecutor(max_workers) def run_task(self, dag, task_id): task dag.tasks[task_id] try: if task[func]: task[func]() task[status] success except Exception as e: task[status] failed raise def execute_dag(self, dag): ready_tasks [ t for t in dag.tasks if not dag.tasks[t][upstream] ] futures { self.executor.submit(self.run_task, dag, task): task for task in ready_tasks } while futures: done, _ concurrent.futures.wait( futures, return_whenconcurrent.futures.FIRST_COMPLETED ) for future in done: completed_task futures.pop(future) for successor in dag.graph[completed_task]: dag.tasks[successor][upstream].remove(completed_task) if not dag.tasks[successor][upstream]: new_future self.executor.submit( self.run_task, dag, successor ) futures[new_future] successor这个简易引擎已经具备并发执行通过线程池并行执行独立任务依赖感知动态跟踪任务状态并解锁后续任务错误传播失败任务会终止整个DAG执行4. 进阶技巧DAG优化的五个实战策略在真实项目中我们还需要考虑以下优化方向动态优先级调整def adjust_priority(dag, runtime_metrics): for task_id in dag.tasks: # 根据运行时指标动态调整优先级 if runtime_metrics[task_id][duration] threshold: dag.tasks[task_id][priority] 1资源感知调度class ResourceAwareScheduler(Scheduler): def __init__(self, max_cpu8, max_mem16): self.available_cpu max_cpu self.available_mem max_mem def can_run(self, task): return (task[cpu] self.available_cpu and task[mem] self.available_mem)其他关键策略包括增量执行只运行受代码变更影响的任务子集缓存复用对确定性任务实施结果缓存超时控制为每个任务设置合理的timeout阈值可视化追踪实时展示DAG执行状态分布式扩展将任务分发到多台机器执行5. 真实案例电商订单处理流水线最后看一个电商场景的DAG应用案例。订单处理通常包含以下阶段graph LR A[支付验证] -- B[库存锁定] B -- C[物流调度] B -- D[发票生成] C -- E[发货通知] D -- E用我们实现的调度器建模dag DAG() tasks [pay_verify, stock_lock, logistics, invoice, notify] for task in tasks: dag.add_task(task) dag.add_dependency(pay_verify, stock_lock) dag.add_dependency(stock_lock, logistics) dag.add_dependency(stock_lock, invoice) dag.add_dependency(logistics, notify) dag.add_dependency(invoice, notify) # 绑定实际处理函数 dag.tasks[pay_verify][func] validate_payment dag.tasks[stock_lock][func] reserve_inventory这个案例展示了DAG如何清晰表达并行任务物流调度和发票生成与聚合点发货通知需要等待前面两个任务完成。