从总线接收消息组装上下文推理该做什么这是 LLM 调用根据决定行动调用工具、执行命令观察结果保存状态判断我完成了吗还是再循环一次完成后回复注本系列借鉴的文章过多可能在参考文献中有遗漏的文章如果有还请大家指出。0x01 原理1.1 Agent负责“执行”一个 Agent 一个完整的 AI “大脑实例”每个 Agent 都拥有独立资源。Agent 是“执行平面”解决“消息怎样变成模型调用、工具执行和最终回复”。具体如下图来自MiniClaw。Feishu Cloud | | HTTP POST /feishu/events | (im.message.receive_v1) v [ESP32 Webhook Server :18790] | | message_bus_push_inbound() v [Message Bus] ── [Agent Loop] ── [Message Bus] (Claude/GPT) | | outbound dispatch v [feishu_send_message()] | | POST /im/v1/messages v Feishu API下图是Agent 的最小循环。每个 AI Agent 都需要这个循环。模型决定何时调用工具、何时停止。代码只是执行模型的要求。An agentic loop is the full “real” run of an agent: intake → context assembly → model inference → tool execution → streaming replies → persistence. THE AGENT PATTERN User -- messages[] -- LLM -- response | stop_reason tool_use? / \ yes no | | execute tools return text append results loop back ----------------- messages[]1.2 Pi-Agent框架OpenClaw所使用的引擎是Pi-Agent框架它是一个仅有四个工具、系统提示词不到1000个token秉持“精简至上”原则的AI编程Agent。与其他编程Agent相比Pi的工程设计和决策机制极为简洁形成了鲜明对比。下图是 OpenClaw 的循环概要。runEmbeddedPiAgent() └── while (true) { // 主重试循环 ├── 检查重试次数限制 (MAX_RUN_LOOP_ITERATIONS) ├── 调用 runEmbeddedAttempt() // 单次推理尝试 ├── 处理 context overflow → 自动压缩 ├── 处理 auth failure → profile轮换 ├── 处理 timeout → 重试或报错 └── 成功则返回 payloads }Pi的设计理念可以总结为不是为LLM打造一个复杂的“控制台”而是给它一把“多功能小刀”——工具虽少但实用提示虽简但明确让模型的原生能力成为主导而不是被框架的复杂性所掩盖。Pi 这种设计理念是基于一个关键事实——经过强化学习训练的前沿LLM模型已经具备了很强的理解和执行能力。它们能明确知道“编码Agent”的主要任务是什么根本不需要长篇大论的系统提示词和复杂的辅助模块来“指导”它们工作。从数据层面分析Pi的系统提示词加上工具定义总长度还不到1000个token仅仅是Claude Code的十分之一内置工具也只有4个远少于同类产品。这说明Pi在主流Agent都在强化的方面几乎都做了简化系统提示词简短明了内置工具数量精简没有复杂的规划模式和多代理通信协议Plan Mode和MCP支持更没有难以监控的子AgentPi的核心策略是去除冗余辅助模块让LLM模型发挥核心作用用最简洁的结构实现最核心的功能。或许有人会问如此简单的设计真的能应对复杂的编码任务吗实际上Pi的简洁并非“简陋”而是“精准”。接下来我们详细解析这4个内置工具的设计思路——read、write、edit、bash工具主要功能read读取文件、审查代码、获取上下文信息write创建文件、写入内容edit修改代码、进行增量更新bash执行命令、操作环境、通过自我调用来拆分任务这四个工具几乎涵盖了编码Agent的所有核心需求。特别是bash工具的引入既实现了复杂任务的拆分和执行保证了功能的完整性又避免了引入子Agent可能带来的不可预测性和监控难题——这就是Pi敢于放弃子Agent架构的原因。同时Pi使用简短的系统提示词并非降低了对LLM的引导标准而是充分信任前沿LLM的能力。正如Mario Zechner所倡导的与其用大量token去“教导”LLM如何成为Agent不如用简洁的提示词明确其核心任务让LLM充分发挥自身的理解和执行能力。这种设计思路带来了三大好处节省上下文空间——降低推理成本提高运行效率行为更加灵活自主——LLM能根据实际情况动态调整策略不受冗长规则限制更好的适应性——简洁的结构意味着更低的认知负担和更强的泛化能力0x02 AgentLoopAgentLoop 是nanobot Agent运行的核心。智能体循环是区分聊天机器人和智能体的关键。2.1 架构AgentLoop 类的架构如下2.2 流程下面是一个 AI Agent智能体的消息处理流程图展示了从消息接收到响应发送的完整链路包括 LLM 交互、工具调用循环等核心机制。入口消息到达InboundMessage ↓ AgentLoop.run() - 监听并接收消息 ↓ AgentLoop._dispatch() - 分派处理 ↓ AgentLoop._process_message() - 主要处理逻辑 ↓ ContextBuilder.build_messages() - 构建上下文 ↓ AgentLoop._run_agent_loop() - 核心代理循环 ↓ Provider.chat() - LLM交互 ↓ ← 判断是否有工具调用 ↓ 否 ← 返回最终内容 ↓ 是 ← 执行工具调用 ↓ ContextBuilder.add_tool_result() - 添加工具结果 ↓ ← 继续循环直到没有更多工具调用 ↓ AgentLoop._save_turn() - 保存交互记录 ↓ 通过MessageBus发布OutboundMessage - 发送响应部分环节详细拆解如下2.3 定义和初始化AgentLoop 的定义和初始化代码如下class AgentLoop: The agent loop is the core processing engine. It: 1. Receives messages from the bus 2. Builds context with history, memory, skills 3. Calls the LLM 4. Executes tool calls 5. Sends responses back def __init__( self, bus: MessageBus, # 消息总线用于接收/发送消息 provider: LLMProvider, # LLM提供者如OpenAI/本地模型 workspace: Path, # Agent工作目录用于隔离文件操作 model: str | None None, # 使用的LLM模型名称 max_iterations: int 40, # Agent最大迭代次数防止无限循环 temperature: float 0.1, # LLM温度参数越低越确定 max_tokens: int 4096, # LLM最大生成Token数 memory_window: int 100, # 记忆窗口大小会话历史最大条数 brave_api_key: str | None None, # Brave搜索API密钥用于网页搜索工具 exec_config: ExecToolConfig | None None, # 命令执行工具配置 cron_service: CronService | None None, # 定时任务服务可选 restrict_to_workspace: bool False, # 是否限制Agent仅操作工作区 session_manager: SessionManager | None None, # 会话管理器可选 mcp_servers: dict | None None, # MCP服务器配置可选 channels_config: ChannelsConfig | None None, # 通道配置可选 ): # 解决循环导入问题仅运行时导入ExecToolConfig from nanobot.config.schema import ExecToolConfig # 基础属性初始化 self.bus bus # 消息总线实例 self.channels_config channels_config # 通道配置 self.provider provider # LLM提供者实例 self.workspace workspace # 工作目录路径 # 模型名称优先传入值否则使用LLM提供者默认模型 self.model model or provider.get_default_model() self.max_iterations max_iterations # 最大迭代次数 self.temperature temperature # LLM温度 self.max_tokens max_tokens # LLM最大Token数 self.memory_window memory_window # 记忆窗口大小 self.brave_api_key brave_api_key # Brave API密钥 # 执行工具配置默认空配置 self.exec_config exec_config or ExecToolConfig() self.cron_service cron_service # 定时任务服务 self.restrict_to_workspace restrict_to_workspace # 工作区限制开关 # 核心组件初始化 self.context ContextBuilder(workspace) # 上下文构建器构建LLM输入上下文 # 会话管理器优先传入实例否则创建新实例 self.sessions session_manager or SessionManager(workspace) self.tools ToolRegistry() # 工具注册表管理所有可用工具 # 子Agent管理器用于生成子Agent处理子任务 self.subagents SubagentManager( providerprovider, workspaceworkspace, busbus, modelself.model, temperatureself.temperature, max_tokensself.max_tokens, brave_api_keybrave_api_key, exec_configself.exec_config, restrict_to_workspacerestrict_to_workspace, ) # 运行状态与资源管理属性 self._running False # Agent循环是否运行 self._mcp_servers mcp_servers or {} # MCP服务器配置 self._mcp_stack: AsyncExitStack | None None # MCP连接上下文栈 self._mcp_connected False # MCP是否已连接 self._mcp_connecting False # MCP是否正在连接 self._consolidating: set[str] set() # 正在进行记忆合并的会话Key集合 self._consolidation_tasks: set[asyncio.Task] set() # 记忆合并任务集合 self._consolidation_locks: dict[str, asyncio.Lock] {} # 会话记忆合并锁 self._active_tasks: dict[str, list[asyncio.Task]] {} # 活跃任务session_key - 任务列表 self._processing_lock asyncio.Lock() # 全局消息处理锁防止并发冲突 self._register_default_tools() # 注册默认工具 def _register_default_tools(self) - None: Register the default set of tools. 注册默认工具集 # 确定文件工具的允许目录如果限制工作区则为工作目录否则为None无限制 allowed_dir self.workspace if self.restrict_to_workspace else None # 注册文件系统工具读/写/编辑/列目录 for cls in (ReadFileTool, WriteFileTool, EditFileTool, ListDirTool): self.tools.register(cls(workspaceself.workspace, allowed_dirallowed_dir)) # 注册命令执行工具 self.tools.register(ExecTool( working_dirstr(self.workspace), # 工作目录 timeoutself.exec_config.timeout, # 执行超时时间 restrict_to_workspaceself.restrict_to_workspace, # 工作区限制 path_appendself.exec_config.path_append, # 环境变量PATH追加 )) # 注册网页相关工具搜索/爬取 self.tools.register(WebSearchTool(api_keyself.brave_api_key)) self.tools.register(WebFetchTool()) # 注册消息发送工具回调函数为消息总线发布出站消息 self.tools.register(MessageTool(send_callbackself.bus.publish_outbound)) # 注册子Agent生成工具 self.tools.register(SpawnTool(managerself.subagents)) # 如果有定时任务服务注册定时任务工具 if self.cron_service: self.tools.register(CronTool(self.cron_service)) async def _connect_mcp(self) - None: Connect to configured MCP servers (one-time, lazy). 连接MCP服务器懒加载仅一次 # 跳过条件已连接/正在连接/无MCP配置 if self._mcp_connected or self._mcp_connecting or not self._mcp_servers: return self._mcp_connecting True # 标记为正在连接 from nanobot.agent.tools.mcp import connect_mcp_servers # 延迟导入MCP连接函数 try: # 创建异步上下文栈用于管理MCP连接资源 self._mcp_stack AsyncExitStack() await self._mcp_stack.__aenter__() # 进入上下文栈 # 连接MCP服务器将工具注册到MCP await connect_mcp_servers(self._mcp_servers, self.tools, self._mcp_stack) self._mcp_connected True # 标记为已连接 except Exception as e: # 连接失败记录日志下次消息处理时重试 logger.error(Failed to connect MCP servers (will retry next message): {}, e) if self._mcp_stack: try: await self._mcp_stack.aclose() # 关闭上下文栈 except Exception: pass self._mcp_stack None finally: self._mcp_connecting False # 清除正在连接标记 def _set_tool_context(self, channel: str, chat_id: str, message_id: str | None None) - None: Update context for all tools that need routing info. 更新需要路由信息的工具上下文 # 消息工具设置通道/聊天ID/消息ID用于消息发送路由 if message_tool : self.tools.get(message): if isinstance(message_tool, MessageTool): message_tool.set_context(channel, chat_id, message_id) # 子Agent生成工具设置通道/聊天ID if spawn_tool : self.tools.get(spawn): if isinstance(spawn_tool, SpawnTool): spawn_tool.set_context(channel, chat_id) # 定时任务工具设置通道/聊天ID if cron_tool : self.tools.get(cron): if isinstance(cron_tool, CronTool): cron_tool.set_context(channel, chat_id)2.4 runrun是代理的主循环入口。核心作用run 负责持续消费消息总线的入站消息并异步分发处理同时保证/stop指令的实时响应。关键逻辑1 秒超时消费消息避免主线程阻塞确保/stop能及时被处理异步任务分发非/stop消息通过_dispatch异步处理不阻塞主循环任务追踪通过_active_tasks记录各会话的活跃任务配合回调自动清理支持/stop批量终止。异常处理超时无消息时直接跳过不中断主循环保证代理持续运行。async def run(self) - None: Run the agent loop, dispatching messages as tasks to stay responsive to /stop. # 将代理运行状态标记为True表示开始运行 self._running True # 异步连接MCP服务器懒加载仅首次执行失败会在后续重试 await self._connect_mcp() # 记录日志代理循环已启动 logger.info(Agent loop started) # 核心循环只要代理处于运行状态就持续消费并处理消息 while self._running: try: # 从消息总线消费入站消息设置1秒超时避免无限阻塞保证/stop指令响应性 # asyncio.wait_for超时会抛出TimeoutError触发continue继续循环 msg await asyncio.wait_for(self.bus.consume_inbound(), timeout1.0) except asyncio.TimeoutError: # 超时无消息时跳过本次循环继续等待下一轮 continue # 判断消息内容是否为/stop指令忽略首尾空格、大小写 if msg.content.strip().lower() /stop: # 处理/stop指令终止当前会话的所有活跃任务和子代理 await self._handle_stop(msg) else: # 非/stop指令创建异步任务处理消息保证主线程不阻塞响应后续/stop task asyncio.create_task(self._dispatch(msg)) # 将任务添加到_active_tasks映射中session_key为键便于后续批量终止 # setdefault如果session_key不存在则创建空列表再追加任务 self._active_tasks.setdefault(msg.session_key, []).append(task) # 为任务添加完成回调任务结束后从_active_tasks中移除避免内存泄漏 # 匿名函数参数k绑定当前msg.session_keyt为完成的任务对象 # 逻辑如果任务仍在对应session的任务列表中则移除否则无操作 task.add_done_callback(lambda t, kmsg.session_key: self._active_tasks.get(k, []) and self._active_tasks[k].remove(t) if t in self._active_tasks.get(k, []) else None)2.5 _dispatch_dispatch是消息分发的核心方法。核心作用_dispatch在全局锁保护下执行消息处理保证串行化同时统一处理异常和响应发布。关键逻辑全局锁_processing_lock避免多任务并发处理消息导致的资源冲突响应发布规则有响应则发布响应、CLI 渠道无响应则发布空消息、异常则发布错误提示异常处理区分任务取消异常重新抛出和通用异常记录 返回错误提示保证异常链路清晰。边界处理针对 CLI 渠道做特殊适配发布空消息避免命令行交互阻塞。async def _dispatch(self, msg: InboundMessage) - None: Process a message under the global lock. # 获取全局处理锁异步上下文管理器确保消息串行处理避免资源竞争 async with self._processing_lock: try: # 调用核心消息处理方法传入入站消息获取出站响应可能为None response await self._process_message(msg) # 如果处理后有非空的出站响应 if response is not None: # 将响应发布到消息总线的出站队列 await self.bus.publish_outbound(response) # 如果无响应且消息渠道是CLI命令行界面 elif msg.channel cli: # 向CLI渠道发布空内容的出站消息保证CLI交互的完整性避免阻塞 await self.bus.publish_outbound(OutboundMessage( channelmsg.channel, chat_idmsg.chat_id, content, metadatamsg.metadata or {}, )) # 捕获任务取消异常如/stop指令触发的任务终止 except asyncio.CancelledError: # 记录日志会话对应的任务已被取消 logger.info(Task cancelled for session {}, msg.session_key) # 重新抛出取消异常让上层逻辑处理如清理任务列表 raise # 捕获所有其他未预期的异常 except Exception: # 记录异常日志包含堆栈信息便于问题排查 logger.exception(Error processing message for session {}, msg.session_key) # 向消息来源渠道发布统一的错误提示消息 await self.bus.publish_outbound(OutboundMessage( channelmsg.channel, chat_idmsg.chat_id, contentSorry, I encountered an error., ))2.6 _process_message()_process_message是单条消息处理的核心入口。核心作用_process_message支持系统消息、斜杠命令、普通对话三种场景完成「上下文构建→代理循环→结果保存→响应返回」全流程。关键逻辑系统消息处理解析渠道信息独立构建会话和上下文适用于后台任务斜杠命令/new合并记忆并清空会话/help返回命令列表记忆合并未合并消息达阈值时异步执行避免阻塞主流程进度回调实时推送处理进度含工具调用提示提升交互体验重复回复防护消息工具已发送过消息则返回 None避免重复响应。边界处理兜底默认回复无最终内容时返回标准化提示媒体消息支持构建上下文时兼容图片等媒体内容会话锁机制通过合并锁避免并发修改会话记忆。async def _process_message( self, msg: InboundMessage, session_key: str | None None, on_progress: Callable[[str], Awaitable[None]] | None None, ) - OutboundMessage | None: Process a single inbound message and return the response. # 处理系统消息从chat_id中解析原始渠道和聊天ID格式为channel:chat_id if msg.channel system: # 拆分chat_id有分隔符则拆分为渠道聊天ID否则默认CLI渠道 channel, chat_id (msg.chat_id.split(:, 1) if : in msg.chat_id else (cli, msg.chat_id)) # 记录日志正在处理来自指定发送者的系统消息 logger.info(Processing system message from {}, msg.sender_id) # 构建会话唯一标识渠道聊天ID key f{channel}:{chat_id} # 获取或创建该会话不存在则新建 session self.sessions.get_or_create(key) # 为工具设置上下文渠道、聊天ID、消息ID用于消息路由 self._set_tool_context(channel, chat_id, msg.metadata.get(message_id)) # 从会话中获取历史消息最多保留memory_window条控制上下文长度 history session.get_history(max_messagesself.memory_window) # 构建LLM所需的完整上下文消息历史当前消息渠道信息 messages self.context.build_messages( historyhistory, current_messagemsg.content, channelchannel, chat_idchat_id, ) # 运行代理核心循环获取最终回复内容、使用的工具列表、所有消息 final_content, _, all_msgs await self._run_agent_loop(messages) # 保存本轮对话到会话跳过已存在的历史消息仅保存新内容 self._save_turn(session, all_msgs, 1 len(history)) # 将更新后的会话持久化到本地 self.sessions.save(session) # 返回系统消息处理结果无内容则默认Background task completed. return OutboundMessage(channelchannel, chat_idchat_id, contentfinal_content or Background task completed.) # 非系统消息截取消息内容预览超过80字符则截断加省略号 preview msg.content[:80] ... if len(msg.content) 80 else msg.content # 记录日志正在处理来自指定渠道/发送者的消息展示预览 logger.info(Processing message from {}:{}: {}, msg.channel, msg.sender_id, preview) # 确定会话key优先使用传入的session_key否则使用消息自带的session_key key session_key or msg.session_key # 获取或创建该会话 session self.sessions.get_or_create(key) # 处理斜杠命令Slash commands # 标准化命令去除首尾空格并转为小写 cmd msg.content.strip().lower() # 处理/new命令新建会话合并当前记忆并清空 if cmd /new: # 获取该会话的记忆合并锁避免并发合并 lock self._get_consolidation_lock(session.key) # 将会话标记为正在合并记忆 self._consolidating.add(session.key) try: # 加锁执行记忆合并异步锁防止并发操作 async with lock: # 截取会话中未合并的消息从上次合并位置到末尾 snapshot session.messages[session.last_consolidated:] # 如果有未合并的消息 if snapshot: # 创建临时会话对象仅包含未合并的消息 temp Session(keysession.key) temp.messages list(snapshot) # 执行记忆合并归档所有消息失败则返回错误提示 if not await self._consolidate_memory(temp, archive_allTrue): return OutboundMessage( channelmsg.channel, chat_idmsg.chat_id, contentMemory archival failed, session not cleared. Please try again., ) # 捕获合并过程中的所有异常 except Exception: # 记录异常日志含堆栈便于排查 logger.exception(/new archival failed for {}, session.key) # 返回合并失败的错误提示 return OutboundMessage( channelmsg.channel, chat_idmsg.chat_id, contentMemory archival failed, session not cleared. Please try again., ) # 无论成功/失败最终执行 finally: # 取消会话的正在合并标记 self._consolidating.discard(session.key) # 清理该会话的合并锁未锁定则移除 self._prune_consolidation_lock(session.key, lock) # 清空当前会话的所有消息 session.clear() # 保存清空后的会话 self.sessions.save(session) # 使会话缓存失效确保下次获取最新状态 self.sessions.invalidate(session.key) # 返回新建会话成功的提示 return OutboundMessage(channelmsg.channel, chat_idmsg.chat_id, contentNew session started.) # 处理/help命令返回可用命令列表 if cmd /help: return OutboundMessage(channelmsg.channel, chat_idmsg.chat_id, content nanobot commands:\n/new — Start a new conversation\n/stop — Stop the current task\n/help — Show available commands) # 计算会话中未合并的消息数量总消息数 - 上次合并位置 unconsolidated len(session.messages) - session.last_consolidated # 如果未合并消息数≥内存窗口且会话未在合并中异步执行记忆合并 if (unconsolidated self.memory_window and session.key not in self._consolidating): # 标记会话为正在合并 self._consolidating.add(session.key) # 获取该会话的合并锁 lock self._get_consolidation_lock(session.key) # 定义异步函数合并记忆并解锁 async def _consolidate_and_unlock(): try: # 加锁执行记忆合并 async with lock: await self._consolidate_memory(session) finally: # 取消正在合并标记 self._consolidating.discard(session.key) # 清理合并锁 self._prune_consolidation_lock(session.key, lock) # 获取当前任务对象 _task asyncio.current_task() # 从合并任务集合中移除当前任务避免内存泄漏 if _task is not None: self._consolidation_tasks.discard(_task) # 创建异步任务执行合并操作 _task asyncio.create_task(_consolidate_and_unlock()) # 将任务加入合并任务集合强引用防止被GC回收 self._consolidation_tasks.add(_task) # 为工具设置上下文渠道、聊天ID、消息ID self._set_tool_context(msg.channel, msg.chat_id, msg.metadata.get(message_id)) # 获取消息工具实例如果存在 if message_tool : self.tools.get(message): # 验证工具类型并标记本轮对话开始 if isinstance(message_tool, MessageTool): message_tool.start_turn() # 从会话中获取历史消息最多memory_window条 history session.get_history(max_messagesself.memory_window) # 构建LLM初始上下文消息历史当前消息媒体渠道信息 initial_messages self.context.build_messages( historyhistory, current_messagemsg.content, mediamsg.media if msg.media else None, # 处理带媒体的消息如图片 channelmsg.channel, chat_idmsg.chat_id, ) # 定义进度回调函数向消息总线发布处理进度支持工具调用提示标记 async def _bus_progress(content: str, *, tool_hint: bool False) - None: # 复制消息元数据避免修改原数据 meta dict(msg.metadata or {}) # 标记为进度消息 meta[_progress] True # 标记是否为工具调用提示 meta[_tool_hint] tool_hint # 发布进度消息到消息总线 await self.bus.publish_outbound(OutboundMessage( channelmsg.channel, chat_idmsg.chat_id, contentcontent, metadatameta, )) # 运行代理核心循环传入初始上下文和进度回调优先使用传入的on_progress final_content, _, all_msgs await self._run_agent_loop( initial_messages, on_progresson_progress or _bus_progress, ) # 兜底如果最终内容为空设置默认提示语 if final_content is None: final_content Ive completed processing but have no response to give. # 截取回复内容预览超过120字符则截断加省略号 preview final_content[:120] ... if len(final_content) 120 else final_content # 记录日志返回给指定渠道/发送者的回复展示预览 logger.info(Response to {}:{}: {}, msg.channel, msg.sender_id, preview) # 保存本轮对话到会话跳过已存在的历史消息 self._save_turn(session, all_msgs, 1 len(history)) # 持久化更新后的会话 self.sessions.save(session) # 检查消息工具如果本轮对话中已通过消息工具发送过消息则返回None避免重复回复 if message_tool : self.tools.get(message): if isinstance(message_tool, MessageTool) and message_tool._sent_in_turn: return None # 返回最终的出站消息包含回复内容、渠道、聊天ID和元数据 return OutboundMessage( channelmsg.channel, chat_idmsg.chat_id, contentfinal_content, metadatamsg.metadata or {}, )2.7 _run_agent_loop()核心逻辑该函数是智能体的核心执行循环通过不断调用大模型并根据响应决定是否调用工具直到模型返回最终回答或达到最大迭代次数。关键分支分支 1有工具调用记录工具调用、执行工具、将工具结果加入对话上下文继续循环。分支 2无工具调用将模型回答作为最终结果终止循环。边界处理当达到最大迭代次数仍未得到最终回答时会生成提示文本并记录警告日志保证函数有明确的返回值。async def _run_agent_loop( self, initial_messages: list[dict], on_progress: Callable[..., Awaitable[None]] | None None, ) - tuple[str | None, list[str], list[dict]]: Run the agent iteration loop. Returns (final_content, tools_used, messages). # 初始化对话消息列表将传入的初始上下文历史当前消息赋值给循环变量后续持续更新 messages initial_messages # 初始化迭代计数器用于控制最大循环次数避免无限迭代导致死循环 iteration 0 # 初始化最终回复内容存储LLM最终无工具调用时的直接回复内容初始为None final_content None # 初始化工具使用列表记录本次循环中调用过的所有工具名称用于后续统计/日志 tools_used: list[str] [] # 核心循环迭代执行「LLM调用→工具执行」逻辑直到达到最大迭代次数 while iteration self.max_iterations: # 迭代次数自增每次循环先计数再执行核心逻辑 iteration 1 # 调用大模型提供商的聊天接口获取模型响应 # 参数说明 # - messages: 当前完整的对话上下文 # - tools: 可供模型调用的工具定义列表 # - model: 指定使用的大模型名称 # - temperature: 模型生成的随机性参数 # - max_tokens: 模型生成的最大令牌数 response await self.provider.chat( messagesmessages, toolsself.tools.get_definitions(), modelself.model, temperatureself.temperature, max_tokensself.max_tokens, ) # 判断模型响应是否包含工具调用指令 if response.has_tool_calls: # 如果传入了进度回调函数则执行进度通知 if on_progress: # 清理模型响应内容移除思考过程等辅助文本 clean self._strip_think(response.content) # 如果清理后有有效内容则调用进度回调 if clean: await on_progress(clean) # 调用进度回调传递工具调用提示信息并标记为tool_hint类型 await on_progress(self._tool_hint(response.tool_calls), tool_hintTrue) # 将模型返回的工具调用对象转换为标准格式的字典列表 tool_call_dicts [ { id: tc.id, # 工具调用的唯一标识ID type: function, # 工具调用类型固定为function function: { name: tc.name, # 要调用的工具名称 # 将工具调用参数转换为JSON字符串确保非ASCII字符不转义 arguments: json.dumps(tc.arguments, ensure_asciiFalse) } } for tc in response.tool_calls # 遍历所有工具调用指令 ] # 将模型的响应包含工具调用添加到对话消息列表中 # 同时记录推理过程内容reasoning_content messages self.context.add_assistant_message( messages, response.content, tool_call_dicts, reasoning_contentresponse.reasoning_content, ) # 遍历每一个工具调用指令执行具体的工具调用逻辑 for tool_call in response.tool_calls: # 记录本次调用的工具名称到工具使用列表 tools_used.append(tool_call.name) # 将工具参数转换为JSON字符串截取前200字符避免日志过长 args_str json.dumps(tool_call.arguments, ensure_asciiFalse) # 记录工具调用日志包含工具名称和参数截断显示 logger.info(Tool call: {}({}), tool_call.name, args_str[:200]) # 执行工具调用获取工具返回结果异步执行 result await self.tools.execute(tool_call.name, tool_call.arguments) # 将工具调用的结果添加到对话消息列表中关联对应的工具调用ID messages self.context.add_tool_result( messages, tool_call.id, tool_call.name, result ) # 如果模型响应不包含工具调用直接返回最终回答 else: # 清理模型响应内容移除思考过程等辅助文本 clean self._strip_think(response.content) # 将模型的最终回答添加到对话消息列表中 messages self.context.add_assistant_message( messages, clean, reasoning_contentresponse.reasoning_content, ) # 将清理后的内容赋值给最终返回内容 final_content clean # 跳出循环结束智能体迭代 break # 处理循环结束但未得到最终回答的情况达到最大迭代次数 if final_content is None and iteration self.max_iterations: # 记录警告日志提示达到最大迭代次数 logger.warning(Max iterations ({}) reached, self.max_iterations) # 生成默认的提示文本告知用户达到最大迭代次数 final_content ( fI reached the maximum number of tool call iterations ({self.max_iterations}) without completing the task. You can try breaking the task into smaller steps. ) # 返回最终结果最终回答内容、使用过的工具列表、完整的对话消息列表 return final_content, tools_used, messages