目 录今天做了什么系统分层补异常处理和日志遇到的问题当前理解今天做了什么系统分层今天的工作是给系统分层以前我在写系统的时候更多的关注于系统功能但是这样在工程项目里面是不对的一个系统不光要有功能还要有系统层次让系统好维护。所以今天的主要工作是给咱们的系统做个升级增加一个编排层。讲到这里我需要讲讲什么是编排层编排层是一个系统将自身复杂的内部操作封装对外暴露出仅一个接口的这样一个模块。这样做最直接的好处是功能内部逻辑和上层接口解开实现系统的模块化这样系统功能容易插拔比较日后的换功能和系统维护升级。我在网上找到了一个关于编排层的解释案例觉得能很好的帮助理解编排层的作用从下面的案例可以看出来让顾客去操作底层业务逻辑比如指挥厨师1或厨师2完成某项工作是很不合适的所以比较合理的方式是将上层请求与底层业务逻辑分开加上编排层那么系统分层后日后维护例如换厨师那么就不需要修改大量的调用代码了。项目像一家餐厅厨房 1RAG 检索厨房 2time tool厨房 3calculator tool调度流程LangGraph顾客API / 前端 / Django那“编排层入口”就像前台接单员。前面咱们在main.py里面写build_agent_graph(...)然后再graph.invoke(...)这样系统能跑是没错但是等于是FastAPI的上层请求直接调用底层逻辑系统没有分层的分层后的系统对于上传调用只留有一个接口最佳状态是用户传入一个问题和对话ID后系统会编排层会直接给出一个最终结果。下面就来创建编排层首先需要在app/graph/目录下创建一个workflow.py文件用写我们的编排层逻辑fromapp.graph.builderimportbuild_agent_graphclassAgentWorkflow:def__init__(self,tools,ragNone):self.toolstools self.ragrag self.graphbuild_agent_graph(tools,ragrag)definvoke(self,session_id:str,query:str,chat_historyNone):ifchat_historyisNone:chat_history[]state{session_id:session_id,query:query,chat_history:chat_history,}resultself.graph.invoke(state)returnresult[final_answer]最后需要改main.py文件fromfastapiimportFastAPI,HTTPExceptionfrompydanticimportBaseModelfromapp.configimportDATA_DIRfromapp.data_loaderimportload_pdfs,process_documentsfromapp.rag_systemimportRAGSystemfromapp.toolsimportTOOLSfromapp.session_managerimportSessionManagerfromapp.logger_configimportsetup_loggerfromapp.graph.workflowimportAgentWorkflow loggersetup_logger()appFastAPI()session_managerSessionManager(max_turns3)ragNoneworkflowNoneclassQueryRequest(BaseModel):session_id:strquestion:strapp.on_event(startup)defstartup_event():globalrag,workflow logger.info(Loading RAG system...)docsload_pdfs(DATA_DIR)logger.info(fdocs数量:{len(docs)})chunksprocess_documents(docs)logger.info(fchunks数量:{len(chunks)})ragRAGSystem(chunks)rag.build_index()workflowAgentWorkflow(TOOLS,ragrag)logger.info(RAG LangGraph ready!)app.post(/ask)defask_question(req:QueryRequest):try:historysession_manager.get_history(req.session_id)answerworkflow.invoke(session_idreq.session_id,queryreq.question,chat_historyhistory)session_manager.append_turn(req.session_id,req.question,answer)return{session_id:req.session_id,question:req.question,answer:answer}exceptExceptionase:logger.exception(Error occurred in /ask)raiseHTTPException(status_code500,detailstr(e))app.post(/clear/{session_id})defclear_session(session_id:str):session_manager.clear_session(session_id)return{session_id:session_id,message:session cleared}补异常处理和日志这里没有什么多说的咱们需要给现有的node.py补充异常处理也就是加try/except和日志importjsonfromtypingimportAnyfromapp.graph.stateimportAgentStatefromapp.llm_utilsimportclientfromapp.configimportCHAT_MODELfromapp.logger_configimportsetup_logger loggersetup_logger()defbuild_choose_tool_node(tools:list[dict[str,Any]]):defchoose_tool_node(state:AgentState)-AgentState:querystate[query]logger.info(f[choose_tool_node] query:{query})tool_desc\n.join([f{t[name]}:{t[description]}fortintools])promptf You are an AI agent. Available tools:{tool_desc}User question:{query}Return JSON: {{tool: ..., input: ...}} contenttry:responseclient.chat.completions.create(modelCHAT_MODEL,messages[{role:user,content:prompt}])contentresponse.choices[0].message.content decisionjson.loads(content)logger.info(f[choose_tool_node] decision:{decision})return{decision:decision}exceptExceptionase:logger.exception(choose_tool_node failed)return{decision:{tool:llm,input:query},error:fchoose_tool_node failed:{str(e)}}returnchoose_tool_nodedefbuild_execute_tool_node(tools:list[dict[str,Any]],ragNone):defexecute_tool_node(state:AgentState)-AgentState:try:decisionstate[decision]chat_historystate.get(chat_history,[])tool_namedecision[tool]tool_inputdecision[input]logger.info(f[execute_tool_node] tool_name:{tool_name}, tool_input:{tool_input})fortintools:ift[name]tool_name:iftool_namerag:resultt[func](tool_input,rag,chat_historychat_history)eliftool_namellm:resultt[func](tool_input,chat_historychat_history)else:resultt[func](tool_input)logger.info(f[execute_tool_node] tool_output:{result})return{tool_result:{tool_name:tool_name,tool_input:tool_input,tool_output:result}}logger.warning(f[execute_tool_node] tool not found:{tool_name})return{tool_result:{tool_name:none,tool_input:tool_input,tool_output:No valid tool found.},error:fTool not found:{tool_name}}exceptExceptionase:logger.exception(execute_tool_node failed)return{tool_result:{tool_name:error,tool_input:,tool_output:Tool execution failed.},error:fexecute_tool_node failed:{str(e)}}returnexecute_tool_nodedefgenerate_answer_node(state:AgentState)-AgentState:ifstate.get(error):logger.warning(f[generate_answer_node] error found in state:{state[error]})return{final_answer:f系统执行过程中出现问题{state[error]}}tool_resultstate[tool_result]logger.info(f[generate_answer_node] final_answer:{tool_result[tool_output]})return{final_answer:str(tool_result[tool_output])}遇到的问题在做编排层的时候其实我有三个点是有疑惑的1、首先是为什么需要LangGraph2、定义的Graph是如何接State的3、Node的返回结果总是给人一种割裂的感觉但是又说不清是什么当前理解1、为什么用LangGraph其实现在的理解是为了扩展方便以前做手写Agent的时候选工具、执行再到给返回结果这样的工作流其实是写死的但是用了LangGraph后这样工作流变了变得更灵活了每一个对数据的操作都是一个节点那么我们可以创建很多的节点然后补断增加边来建立新的工作流其实现在想LangGraph的意义在于可以从工作流角度设计Agent相当于站在一个高处设计业务手写Agent系统每次都盯着具体的函数和变量怎么传递其实业务少还好如果业务多了在一堆函数之间来回调用其实设计会非常繁琐和烧脑因为你需要想各种变量怎么传但是有了LangGraph你只需要又一个状态图State然后补断设计业务节点怎么串起来就好。2、后来查了一些资料似乎我们在定义图的时候这就相当于建立了一个工作流水线虽然创建流水线的代码没有明确定义传入state参数defbuild_agent_graph(tools,ragNone):graph_builderStateGraph(AgentState)...但是graph_builder StateGraph(AgentState)这句话描述了传入图数据的样子当使用invoke()函数时这个函数做了封装会自动将state作为参数传给node函数。3、关于Node的返回值后来看node.py我有注意到node的返回值不是某个参数node的返回值在LangGraph这个框架下是对State的修改操作这里有个坑他跟一般理解上的函数返回一个值不一样他是返回一个修改操作。如果这篇文章对你有帮助可以点个赞完整代码地址https://github.com/1186141415/LangChain-for-A-Paper-Rag-Agent