多Agent系统:比单Agent难的不是技术而是协调AI Agent工程实战系列 · 第05篇 / 共10篇Orchestrator模式、任务分解、冲突解决、结果聚合以及为什么大多数多Agent系统最终退化成了单Agent一个让我们返工三周的架构决策去年我们给一个法律科技公司搭了一套合同审查系统。需求很清楚:上传一份合同,AI自动识别风险条款、检查合规性、对比行业标准条款、生成修改建议。四个任务,我们自然地想到用四个Agent:Demo效果出色,四个Agent各司其职,输出丰富。但上线之后问题来了:风险识别Agent认为第7条是严重风险,修改建议Agent却建议"轻微调整措辞即可"——两个Agent的结论互相矛盾,用户不知道听谁的合规检查Agent和条款对比Agent都需要读原始合同,但它们拿到的"原始合同"版本不一样——前一个Agent处理时做了格式清洗,后一个Agent用的是原始版本,导致段落编号对不上整个流程串行执行,四个Agent加起来要等90秒,但其实风险识别和合规检查完全可以并行某个中间Agent失败了,后续Agent拿到的是空输入,但它不知道,继续生成了一堆基于空输入的"建议"这四个问题,一个都不是模型能力的问题。全是协调问题。多Agent的本质:分布式系统的所有问题,加上LLM的不确定性在引入多Agent之前,必须先问自己一个问题:为什么不用单Agent?这不是反问,是真实的决策依据。多Agent带来的复杂度是真实的,收益必须大于成本才值得引入。值得用多Agent的场景① 任务天然可并行,且每个子任务足够独立 例:同时分析一份文档的多个维度,各维度没有依赖关系 ② 需要专业化分工,不同子任务需要不同的工具集或System Prompt 例:一个Agent专门做数学计算,一个专门做文本生成, 混在一起会互相干扰 ③ 单个任务超过单个Agent的上下文窗口上限 例:分析一份500页的合同,需要分段处理再聚合 ④ 需要多视角交叉验证 例:医疗诊断,多个Agent从不同角度分析, 再由主Agent综合判断不值得用多Agent的场景① 任务是线性的,每步都依赖上一步的结果 → 用单Agent更简单,中间状态不需要序列化传递 ② 子任务之间高度耦合,一个子任务的输出频繁影响另一个的输入 → 多Agent的通信开销会大于并行带来的收益 ③ 团队对分布式系统缺乏经验 → 多Agent的故障排查难度远超单Agent ④ 总任务复杂度不高,3步以内可以完成 → 引入多Agent是过度设计Orchestrator模式:多Agent的标准架构所有生产级多Agent系统,最终都需要一个协调者。这个协调者叫做Orchestrator(编排器)。Orchestrator不执行具体任务,只负责协调。子Agent不做决策,只负责执行分配到的任务。这个职责分离是多Agent系统稳定运行的基础。一旦子Agent开始自行决定"我觉得应该做点额外的事",系统就开始失控。任务分解:Orchestrator最核心的能力任务分解不是"把大任务切成小块",而是要识别:哪些子任务可以并行? 哪些子任务有依赖关系,必须串行? 哪些子任务的输出需要传递给其他子任务? 哪些子任务失败后,整体任务还能继续?fromdataclassesimportdataclass,fieldfromtypingimportList,Optional,Dict,Any,SetfromenumimportEnumimportasyncioimportuuidfromdatetimeimportdatetime,timezoneclassExecutionMode(Enum):PARALLEL="parallel"# 可以并行执行SEQUENTIAL="sequential"# 必须串行执行@dataclassclassSubTask:"""Orchestrator分配给子Agent的子任务"""task_id:strname:strdescription:stragent_role:str# 分配给哪类Agent("risk_analyzer" / "compliance_checker"等)execution_mode:ExecutionMode# 依赖关系:这个子任务需要等哪些子任务完成才能开始depends_on:List[str]=field(default_factory=list)# 输入:需要哪些上游任务的输出作为输入input_from:Dict[str,str]=field(default_factory=dict)# 格式:{"参数名": "上游任务ID"},例如 {"risk_summary": "task-001"}# 容错:这个子任务失败了,整体任务是否可以继续optional:bool=False# 结果status:str="pending"# pending / running / completed / failedresult:Optional[Any]=Noneerror:Optional[str]=Nonestarted_at:Optional[str]=Nonecompleted_at:Optional[str]=NoneclassTaskDecomposer:""" 任务分解器 输入:用户的高层目标 输出:有依赖关系的子任务DAG(有向无环图) """def__init__(self,llm):self.llm=llmasyncdefdecompose(self,goal:str,available_agents:Dict[str,str],context:Dict=None)-List[SubTask]:""" 将高层目标分解为子任务列表 available_agents: {"agent_role": "agent的能力描述"} 返回的子任务列表已经包含依赖关系,可直接用于调度 """agents_desc="\n".join([f"-{role}:{desc}"forrole,descinavailable_agents.items()])prompt=f"""你是一个任务编排专家。请将以下目标分解为可执行的子任务。 目标:{goal}可用的Agent角色:{agents_desc}要求: 1. 识别哪些子任务可以并行执行,哪些必须串行 2. 明确每个子任务依赖哪些其他子任务的输出 3. 每个子任务分配给最合适的Agent角色 4. 可选任务(失败不影响整体)请标注optional=true 请以JSON格式输出子任务列表: [ { { "name": "子任务名称", "description": "具体任务描述", "agent_role": "assigned_agent_role", "execution_mode": "parallel|sequential", "depends_on": ["依赖的子任务名称列表"], "input_from": { {"输入参数名": "来源任务名"}}, "optional": false }} ] 重要:只输出JSON,不要有任何额外文字。"""response=awaitself.llm.ainvoke([{"role":"user","content":prompt}])importjson raw_tasks=json.loads(response.content)# 转换为SubTask对象,生成唯一IDname_to_id={}subtasks=[]fori,tinenumerate(raw_tasks):task_id=f"subtask-{i+1:02d}"name_to_id[t["name"]]=task_id subtasks.append(SubTask(task_id=task_id,name=t["name"],description=t["description"],agent_role=t["agent_role"],execution_mode=ExecutionMode(t["execution_mode"]),depends_on=[],# 先空着,下面填input_from=t.get("input_from",{}),optional=t.get("optional",False)))# 填入依赖关系(用ID而不是名称)fori,tinenumerate(raw_tasks):subtasks[i].depends_on=[name_to_id[dep_name]fordep_nameint.get("depends_on",[])ifdep_nameinname_to_id]# 验证DAG无环self._validate_dag(subtasks)returnsubtasksdef_validate_dag(self,subtasks:List[SubTask]):"""验证任务依赖关系不存在循环"""id_to_task={t.task_id:tfortinsubtasks}visited:Set[str]=set()rec_stack:Set[str]=set()defhas_cycle(task_id:str)-bool:visited.add(task_id)rec_stack.add(task_id)task=id_to_task.get(task_id)iftask:fordep_idintask.depends_on:ifdep_idnotinvisited:ifhas_cycle(dep_id):returnTrueelifdep_idinrec_stack:returnTruerec_stack.remove(task_id)returnFalsefortaskinsubtasks:iftask.task_idnotinvisited:ifhas_cycle(task.task_id):raiseValueError(f"任务依赖图存在循环,请检查任务 '{task.name}' 的依赖关系")