Agent通信协议深度解析
Agent-to-Agent协议深度解析:构建多Agent通信系统从MCP到A2A,理解Agent通信的下一个范式转移前言:从单Agent到多Agent协作2025年,MCP(Model Context Protocol)让AI Agent能够调用外部工具。但随着Agent系统越来越复杂,一个新问题浮出水面:Agent之间怎么通信?想象一个场景:你有一个"研究Agent"负责搜集资料,一个"写作Agent"负责撰写文章,一个"审核Agent"负责质量检查。这三个Agent需要协作完成任务,但它们可能是不同团队开发的,使用不同的框架,部署在不同的服务器上。MCP解决的是"Agent ↔ 工具"的通信,而A2A(Agent-to-Agent)协议解决的是"Agent ↔ Agent"的通信。本文将深入解析Google发布的A2A协议,以及如何用Python构建一个支持A2A的多Agent系统。一、A2A协议核心概念1.1 A2A vs MCP:不同的通信层# MCP: Agent调用工具# Agent → Tool(单向调用,工具不主动与Agent对话)# A2A: Agent之间对等通信# Agent A ↔ Agent B(双向、有状态、支持长时间任务)# 类比:# MCP = 函数调用(你调用别人的API)# A2A = HTTP请求(两个服务之间的对等通信)特性MCPA2A通信方向Agent → ToolAgent ↔ Agent协议类型JSON-RPCHTTP + JSON-RPC状态管理无状态有状态(Task)发现机制配置文件Agent Card(.well-known)适用场景工具调用多Agent协作流式支持有限SSE流式认证本地OAuth2/API Key1.2 A2A的核心概念A2A协议定义了几个核心概念:1. Agent Card(Agent名片)每个A2A兼容的Agent都需要发布一个Agent Card,描述自己的能力:{"name":"Research Agent","description":"专业的研究分析Agent,擅长信息搜集和整理","url":"https://research-agent.example.com","version":"1.0.0","capabilities":{"streaming":true,"pushNotifications":false,"stateTransitionHistory":true},"authentication":{"schemes":["Bearer"]},"defaultInputModes":["text"],"defaultOutputModes":["text"],"skills":[{"id":"web_search","name":"网络搜索","description":"搜索互联网获取最新信息","tags":["search","research"]},{"id":"summarize","name":"内容总结","description":"将长文本总结为要点","tags":["summarize","nlp"]}]}2. Task(任务)A2A的核心抽象是Task,表示一个Agent交给另一个Agent的任务:fromdataclassesimportdataclass,fieldfromtypingimportOptionalfromenumimportEnumimportuuidfromdatetimeimportdatetimeclassTaskState(Enum):SUBMITTED="submitted"# 已提交WORKING="working"# 执行中INPUT_REQUIRED="input-required"# 需要输入COMPLETED="completed"# 已完成CANCELED="canceled"# 已取消FAILED="failed"# 失败@dataclassclassMessage:"""消息"""role:str# "user" 或 "agent"parts:list[dict]# 消息内容(支持多模态)@dataclassclassArtifact:"""产物"""name:strdescription:strparts:list[dict]@dataclassclassTask:"""A2A任务"""id:str=field(default_factory=lambda:str(uuid.uuid4()))status:TaskState=TaskState.SUBMITTED messages:list[Message]=field(default_factory=list)artifacts:list[Artifact]=field(default_factory=list)metadata:dict=field(default_factory=dict)created_at:datetime=field(default_factory=datetime.now)defadd_message(self,role:str,text:str):self.messages.append(Message(role=role,parts=[{"type":"text","text":text}]))defset_status(self,state:TaskState):self.status=state3. Message Part(消息部分)A2A支持多模态消息:# 文本消息text_part={"type":"text","text":"请帮我分析这个数据"}# 文件消息file_part={"type":"file","file":{"name":"data.csv","mimeType":"text/csv","bytes":"base64编码的内容"}}# 结构化数据data_part={"type":"data","data":{"query":"Python异步编程","filters":{"language":"zh","date":"2026"}}}二、A2A Server实现2.1 基础Server框架fromfastapiimportFastAPI,HTTPException,Requestfromfastapi.responsesimportStreamingResponsefrompydanticimportBaseModelimportjsonimportasynciofromtypingimportAsyncGenerator app=FastAPI(title="A2A Research Agent")# ============ Agent Card ============AGENT_CARD={"name":"Research Agent","description":"专业的研究分析Agent","url":"http://localhost:8000","version":"1.0.0","capabilities":{"streaming":True,"pushNotifications":False,"stateTransitionHistory":True},"defaultInputModes":["text"],"defaultOutputModes":["text"],"skills":[{"id":"research","name":"研究分析","description":"深度研究某个主题并提供分析报告","tags":["research","analysis"]}]}@app.get("/.well-known/agent.json")asyncdefget_agent_card():"""发布Agent Card"""returnAGENT_CARD# ============ Task管理 ============tasks:dict[str,Task]={}classTaskRequest(BaseModel):id:str|None=Nonemessage:dictclassTaskResponse(BaseModel):id:strstatus:strmessages:list[dict]=[]artifacts:list[dict]=[]@app.post("/tasks/send")asyncdefsend_task(request:TaskRequest)-TaskResponse:"""接收任务(非流式)"""task_id=request.idorstr(uuid.uuid4())# 创建或获取Taskiftask_idintasks:task=tasks[task_id]else:task=Task(id=task_id)tasks[task_id]=task# 添加用户消息user_text=request.message.get("parts",[{}])[0].get("text","")task.add_message("user",user_text)task.set_status(TaskState.WORKING)# 执行Agent逻辑try:result=awaitprocess_research_task(user_text)task.add_message("agent",result)task.artifacts.append(Artifact(name="research_report",description="研究报告",parts=[{"type":"text","text":result}]))task.set_status(TaskState.COMPLETED)exceptExceptionase:task.add_message("agent",f"任务执行失败:{str(e)}")task.set_status(TaskState.FAILED)returnTaskResponse(id=task.id,status=task.status.value,messages=[{"role":m.role,"parts":