【与我学 ClaudeCode】规划与协调篇 之 Task System :持久化任务图与多 Agent 协作骨架
作者逆境不可逃技术永无止境希望我的内容可以帮助到你大家吼 ! 我是 逆境不可逃 今天给大家带来文章《【与我学 ClaudeCode】规划与协调篇 之 Task System 持久化任务图与多 Agent 协作骨架》.Learn-Claude-Code 官方地址 :https://github.com/shareAI-lab/learn-claude-code上一篇文章【与我学 ClaudeCode】记忆管理篇 之 Context Compact 三层压缩实现「无限会话」-CSDN博客Task System 是迭代的第 7 个版本s07核心解决早期 Todo 模型的局限性它将内存中的扁平清单升级为持久化到磁盘的任务依赖图DAG为多 Agent 协作、长时任务提供了可靠的协调骨架让任务的生命周期超越单次对话和上下文压缩。学习路线s01 s02 s03 s04 s05 s06| s07 s08 s09 s10 s11 s12一、问题根源为什么早期 Todo 模型撑不起复杂任务s03 的 TodoManager 是内存中的扁平清单存在三个致命缺陷无依赖关系只有 “做完 / 没做完” 两种状态无法表达 “任务 B 必须等任务 A 完成才能开始” 的顺序约束也无法识别可并行执行的任务状态易丢失仅保存在内存中一旦进程崩溃、上下文压缩或会话重置任务状态就会永久丢失无法多 Agent 协作没有持久化的共享状态多个 Agent 无法协同推进任务也无法感知彼此的工作进度二、四大核心设计决策Task System 通过四个关键设计构建了一个零依赖、高可靠、可扩展的任务协调框架。1. 任务存储为 JSON 文件而非内存核心设计任务以 JSON 文件形式持久化在.tasks/目录中而非保存在内存里。这带来三个关键好处进程崩溃不丢失Agent 在任务中途崩溃重启后任务板仍在磁盘上可继续推进多 Agent 协调多个 Agent 读写同一任务目录无需共享内存即可实现多代理协调文件系统就是共享数据库可调试可干预人类可以查看和手动编辑任务文件来调试直接修改状态或依赖关系替代方案的致命缺陷内存存储如 s03 的 TodoWrite实现更简单更快但崩溃会丢失状态也无法跨多个代理进程工作使用 SQLite、Redis 等数据库能提供更好的并发性但会增加依赖和运维复杂度。文件是零依赖、可在任何地方工作的持久化层。2. 任务具有blocks/blockedBy依赖字段核心设计每个任务可以声明它阻塞哪些下游任务blocks以及它被哪些上游任务阻塞blockedBy。Agent 永远不会开始有未解决blockedBy依赖的任务。这对多代理协调至关重要当 Agent A 在编写数据库 Schema、Agent B 需要写查询时Agent B 的任务会被 Agent A 的任务阻塞避免针对不存在的 Schema 工作。替代方案的致命缺陷简单的优先级排序高 / 中 / 低无法表达 “任务 B 必须等任务 A 完成才能开始” 的硬依赖中心化协调器按顺序分配任务会创建单点故障和瓶颈。声明式依赖让每个 Agent 通过读取任务文件就能独立判断自己能做什么。3. Task 为课程主线Todo 仍有适用场景核心设计TaskManager 延续了 Todo 的心智模型并在 s07 之后成为默认主线。两者都管理带状态的任务项但 TaskManager 增加了文件持久化崩溃后可恢复依赖追踪blocks/blockedByowner字段与多进程协作能力Todo 仍适合短、线性、一次性的轻量级跟踪无需复杂依赖和持久化。替代方案的致命缺陷只用 Todo 能保持模型极简但不适合长期运行或协作工作到处都用 Task 能最大化一致性但对一次性小任务来说会显得笨重。分层设计兼顾了不同场景的效率。4. 持久化仍需要写入纪律核心设计文件持久化能降低上下文丢失但不会自动消除并发写入风险。写任务状态前应先重读 JSON 文件、校验status/blockedBy是否符合预期再原子写回避免不同 Agent 悄悄覆盖彼此状态。替代方案的致命缺陷盲目覆盖写入实现更简单但在并行执行下会破坏协调状态带乐观锁的数据库能提供更强的安全性但课程选择基于文件的状态以保持零依赖教学。三、系统整体架构与工作原理1. 核心架构持久化任务依赖图DAG.tasks/ task_1.json {id:1, subject:Setup DB, status:completed} task_2.json {id:2, subject:API routes, blockedBy:[1], status:pending} task_3.json {id:3, subject:Auth module, blockedBy:[1], status:pending} task_4.json {id:4, subject:Integration, blockedBy:[2,3], status:pending} 任务图 (DAG): ---------- -- | task 2 | -- | | pending | | ---------- ---------- -- ---------- | task 1 | | task 4 | | completed| -- ---------- -- | blocked | ---------- | task 3 | -- ---------- | pending | ---------- 顺序: task 1 必须先完成, 才能开始 2 和 3 并行: task 2 和 3 可以同时执行 依赖: task 4 要等 2 和 3 都完成 状态: pending - in_progress - completed这个任务图回答了三个关键问题什么可以做状态为pending且blockedBy为空的任务什么被卡住等待前置任务完成的任务blockedBy不为空什么做完了状态为completed的任务完成时自动解锁后续任务2. 关键组件与实现细节(1) TaskManager任务 CRUD 与依赖图核心class TaskManager: def __init__(self, tasks_dir: Path): self.dir tasks_dir self.dir.mkdir(exist_okTrue) self._next_id self._max_id() 1 # 自动生成下一个任务ID def _max_id(self) - int: 获取当前最大任务ID用于生成新任务ID ids [int(f.stem.split(_)[1]) for f in self.dir.glob(task_*.json)] return max(ids) if ids else 0 def _load(self, task_id: int) - dict: 从磁盘加载任务文件 path self.dir / ftask_{task_id}.json if not path.exists(): raise ValueError(fTask {task_id} not found) return json.loads(path.read_text()) def _save(self, task: dict): 将任务写入磁盘原子写入可扩展 path self.dir / ftask_{task[id]}.json path.write_text(json.dumps(task, indent2, ensure_asciiFalse)) def create(self, subject: str, description: str ) - str: 创建新任务初始状态为pending无依赖 task { id: self._next_id, subject: subject, description: description, status: pending, blockedBy: [], owner: , } self._save(task) self._next_id 1 return json.dumps(task, indent2, ensure_asciiFalse) def update(self, task_id: int, status: str None, add_blocked_by: list None, remove_blocked_by: list None) - str: 更新任务状态或依赖关系完成任务时自动解锁下游 task self._load(task_id) if status: if status not in (pending, in_progress, completed): raise ValueError(fInvalid status: {status}) task[status] status # 任务完成时自动从所有下游任务的blockedBy中移除自身ID if status completed: self._clear_dependency(task_id) # 添加/移除依赖 if add_blocked_by: task[blockedBy] list(set(task[blockedBy] add_blocked_by)) if remove_blocked_by: task[blockedBy] [x for x in task[blockedBy] if x not in remove_blocked_by] self._save(task) return json.dumps(task, indent2, ensure_asciiFalse) def _clear_dependency(self, completed_id: int): 解除已完成任务对其他任务的阻塞 for f in self.dir.glob(task_*.json): task json.loads(f.read_text()) if completed_id in task.get(blockedBy, []): task[blockedBy].remove(completed_id) self._save(task) def list_all(self) - str: 列出所有任务格式化显示状态和依赖 tasks [] files sorted(self.dir.glob(task_*.json), keylambda f: int(f.stem.split(_)[1])) for f in files: tasks.append(json.loads(f.read_text())) if not tasks: return No tasks. lines [] for t in tasks: marker {pending: [ ], in_progress: [], completed: [x]}.get(t[status], [?]) blocked f (blocked by: {t[blockedBy]}) if t.get(blockedBy) else lines.append(f{marker} #{t[id]}: {t[subject]}{blocked}) return \n.join(lines)(2) 工具注册四个任务工具TOOL_HANDLERS { # 基础工具bash、read_file等 bash: lambda **kw: run_bash(kw[command]), read_file: lambda **kw: run_read(kw[path], kw.get(limit)), write_file: lambda **kw: run_write(kw[path], kw[content]), edit_file: lambda **kw: run_edit(kw[path], kw[old_text], kw[new_text]), # 任务系统工具 task_create: lambda **kw: TASKS.create(kw[subject], kw.get(description, )), task_update: lambda **kw: TASKS.update(kw[task_id], kw.get(status), kw.get(addBlockedBy), kw.get(removeBlockedBy)), task_list: lambda **kw: TASKS.list_all(), task_get: lambda **kw: TASKS.get(kw[task_id]), } TOOLS [ # 其他工具略 {name: task_create, description: Create a new task., input_schema: {type: object, properties: {subject: {type: string}, description: {type: string}}, required: [subject]}}, {name: task_update, description: Update a tasks status or dependencies., input_schema: {type: object, properties: {task_id: {type: integer}, status: {type: string, enum: [pending, in_progress, completed]}, addBlockedBy: {type: array, items: {type: integer}}, removeBlockedBy: {type: array, items: {type: integer}}}, required: [task_id]}}, {name: task_list, description: List all tasks with status summary., input_schema: {type: object, properties: {}}}, {name: task_get, description: Get full details of a task by ID., input_schema: {type: object, properties: {task_id: {type: integer}}, required: [task_id]}}, ](3) Agent 主循环工具调用处理def agent_loop(messages: list): while True: response client.messages.create( modelMODEL, systemSYSTEM, messagesmessages, toolsTOOLS, max_tokens8000, ) messages.append({role: assistant, content: response.content}) if response.stop_reason ! tool_use: return results [] for block in response.content: if block.type tool_use: handler TOOL_HANDLERS.get(block.name) try: output handler(**block.input) if handler else fUnknown tool: {block.name} except Exception as e: output fError: {e} print(f {block.name}:) print(str(output)[:200]) results.append({type: tool_result, tool_use_id: block.id, content: str(output)}) messages.append({role: user, content: results})(4)执行流程四、与 Context Compacts06的关键变更对比组件之前s06 Context Compact之后s07 Task System工具集5 个工具基础 compact8 个工具基础 task_create/update/list/get规划模型无仅上下文压缩带依赖关系的任务图DAG任务关系无blockedBy依赖边支持顺序 / 并行约束状态追踪无对话级状态pending - in_progress - completed三状态持久化对话压缩后丢失任务状态保存在磁盘压缩和重启后存活核心优化上下文压缩与归档任务持久化、依赖管理与多 Agent 协作五、核心优势与创新点超越对话的持久化状态任务状态保存在磁盘不受上下文压缩、会话重置、进程崩溃影响支持长时任务声明式依赖与自动解锁通过blockedBy字段表达任务依赖完成任务时自动解除下游阻塞Agent 无需手动管理依赖关系天然支持多 Agent 协作多个 Agent 读写同一任务目录通过文件系统实现无锁协调无需中心化调度器可调试可干预人类可直接查看、编辑任务文件手动调整状态或依赖关系调试成本极低零依赖实现基于文件系统无需数据库或额外服务可在任何环境中运行符合课程教学的零依赖原则六、运行示例假设 Agent 正在开发一个 Web 应用任务系统的工作流程如下Agent 调用task_create创建初始任务#1 Setup DB schema状态pending#2 Build API routes状态pendingblockedBy: [1]#3 Implement auth module状态pendingblockedBy: [1]#4 Integrate routes with auth状态pendingblockedBy: [2,3]Agent 调用task_update将#1状态改为in_progress开始执行数据库 Schema 设计完成后将#1状态改为completedTaskManager 自动将#2和#3的blockedBy中的1移除解锁这两个任务Agent或多个 Agent可以同时执行#2和#3无需等待彼此当#2和#3都完成后它们的completed事件会自动解除#4的阻塞Agent 开始执行集成任务即使中间会话被压缩或进程重启所有任务状态仍保存在.tasks/目录中重启后 Agent 调用task_list即可恢复进度七、可扩展方向原子写入与并发控制实现文件写入的原子操作如先写临时文件再重命名避免多 Agent 并发写入导致的状态损坏任务优先级与分配为任务添加priority字段和owner字段支持 Agent 认领任务和优先级调度依赖可视化基于 JSON 文件生成任务依赖图如 Mermaid 或 DOT 格式方便人类直观查看任务关系任务超时与重试为任务添加timeout和retries字段自动标记超时任务并支持重试跨 Agent 通信基于任务文件添加简单的消息字段实现 Agent 之间的异步通信如任务备注、问题反馈