deer-flow 介绍 简介 DeerFlow 是一个社区驱动的开源深度研究框架,旨在通过多智能体协作,将大型语言模型(LLM)与专业工具(如网络搜索、网页爬虫、Python 代码执行等)结合,提升自动化研究和内容创作的效率。它基于 LangChain 和 LangGraph 构建,支持模块化的多智能体架构,强调“人在回路”(human-in-the-loop)的人机协作体验,允许用户随时干预和调整研究计划。
核心功能 多智能体架构:
系统基于 LangGraph 实现,通过有向图协调多个专门的 AI 智能体:
研究者(Researcher) :负责网络搜索、爬虫和信息收集,支持 Tavily、DuckDuckGo、Brave Search 和 Arxiv 等搜索工具。
编码者(Coder) :处理代码分析、执行和数据处理,使用 Python REPL 工具。
报告者(Reporter) :整合研究成果,生成报告、幻灯片、播客脚本等。
智能体通过消息传递系统协作,工作流灵活且可扩展。
工具集成 :
网络搜索与爬虫 :支持实时知识获取和数据聚合,特别适合学术研究(例如 Arxiv 搜索)。
Python REPL :支持数据处理、统计分析和代码生成。
MCP 集成 :与 ByteDance 的内部模型控制平台(Model Control Platform)无缝协作,提升企业级自动化。
文本转语音(TTS) :通过 volcengine TTS API 将研究报告转为高质量音频,支持语速、音量和音调定制。
多模态输出 :生成文本报告、幻灯片、播客脚本甚至可视化内容。
技术架构
LangGraph :用于工作流编排,确保智能体间的协作高效且可控。
LangChain :支持 LLM 的推理和记忆管理,增强智能体的交互能力。
模块化设计 :各智能体和工具可独立扩展,适合定制化开发。
Web 界面 :使用 Next.js 和 Zustand 管理状态,通过流式 API 与后端实时交互。
项目结构介绍 main.py 基于 Python,用于启动项目的命令行界面(CLI)或交互式模式。使用 argparse 处理命令行参数,InquirerPy 提供交互式用户界面,asyncio 支持异步运行智能体工作流。
运行模式:
命令行模式 :用户通过命令行直接输入查询(如 python entry.py “我的问题”)。
交互模式 :通过 –interactive 标志启动,用户可选择语言(英文/中文)、预定义问题或自定义问题。
核心功能
解析命令行参数,设置运行配置(如调试模式、最大计划迭代次数等)。
提供交互式界面,引导用户选择或输入问题。
调用异步工作流函数 run_agent_workflow_async,执行多智能体协作任务。
ask 函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 def ask( question, debug=False, max_plan_iterations=1, max_step_num=3, enable_background_investigation=True, ): """Run the agent workflow with the given question.""" asyncio.run( run_agent_workflow_async( user_input=question, debug=debug, max_plan_iterations=max_plan_iterations, max_step_num=max_step_num, enable_background_investigation=enable_background_investigation, ) )
作用:核心执行函数,将用户的问题传递给异步工作流 run_agent_workflow_async。
server.py 用于运行一个基于 FastAPI 的 Web 服务。它使用 uvicorn 作为 ASGI 服务器,处理 HTTP 请求,并通过命令行参数配置服务器行为。
核心功能 :
配置日志系统,记录服务器运行信息。
解析命令行参数,设置服务器的主机、端口、日志级别和自动重载等。
使用 uvicorn 启动 FastAPI 应用(src.server:app),运行 DeerFlow 的 API 服务。
运行方式 :通过命令行运行(如 python server.py –host 0.0.0.0 –port 8080),支持开发和生产环境的灵活配置。
启动服务器
1 2 3 4 5 6 7 8 logger.info("Starting DeerFlow API server") uvicorn.run( "src.server:app", host=args.host, port=args.port, reload=reload, log_level=args.log_level, )
src.server:app:指定 FastAPI 应用,位于 src/server.py 模块中的 app 对象(通常是 fastapi.FastAPI 实例)。
src/server.py 定义了一个基于 FastAPI 的 API 服务(src/server.py 中的核心部分),用于处理客户端的聊天流请求。它通过 Server-Sent Events (SSE) 提供实时流式响应,支持多智能体工作流(如研究、编码、报告生成),并与 LangGraph 集成以管理状态和消息流。
核心功能 :
初始化 FastAPI 应用,配置 CORS 中间件以支持跨域请求。
定义如/api/chat/stream 的POST 路由端点,接收聊天请求,生成流式响应。
使用 LangGraph(graph)管理多智能体工作流,支持消息传递、工具调用和中断处理。
支持实时事件流(SSE),返回消息片段、工具调用结果或中断提示。
/api/chat/stream 端点,处理客户端的聊天请求,触发多智能体工作流,并以流式方式返回响应。
/api/tts:将文本转换为语音,使用火山引擎(Volcengine)TTS API。
/api/podcast/generate:生成播客音频,基于输入内容。
/api/ppt/generate:生成 PPT 文件,基于输入内容。
/api/prose/generate:生成散文内容,支持流式响应。
/api/mcp/server/metadata:查询 MCP 服务器的元数据,如工具列表。
Work Flow详解 DeerFlow 中的 LangGraph 工作流由 8 个主要节点组成,每个节点在代码库中以函数形式实现。该函数处理当前状态并返回更新的状态或转换到另一个节点的命令。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 def _build_base_graph(): """Build and return the base state graph with all nodes and edges.""" builder = StateGraph(State) builder.add_edge(START, "coordinator") # 协调员,确定查询是否应由规划器直接处理或需要背景调查 builder.add_node("coordinator", coordinator_node) # 背景调查人员,进行初步的网络搜索,为规划师收集背景信息 builder.add_node("background_investigator", background_investigation_node) # 规划师,创建包含步骤的结构化研究计划,是否需要更多研究或现有背景是否足够 builder.add_node("planner", planner_node) builder.add_node("reporter", reporter_node) # 研究团队 builder.add_node("research_team", research_team_node) # 研究员 builder.add_node("researcher", researcher_node) # 编码 builder.add_node("coder", coder_node) # 人工反馈,允许用户审查和修改研究计划 builder.add_node("human_feedback", human_feedback_node) # 汇报 builder.add_edge("reporter", END) return builder
协调器节点 coordinator_node
是工作流的入口点,负责分析用户查询并确定下一步的操作。
协调器节点:
分析用户的查询
确定查询是否需要规划或是否具有足够的上下文
检测用户的语言区域
决定是否在规划前进行背景调查
背景调查节点 background_investigation_node
对用户的查询进行初步研究,为规划提供背景。
背景调查节点:
使用该LoggedTavilySearch
工具搜索与用户查询相关的信息
处理并格式化搜索结果
更新背景调查结果
将工作流引导至规划器节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 def background_investigation_node(state: State) -> Command[Literal["planner"]]: logger.info("background investigation node is running.") # 列表中最后一条消息的内容作为用户的查询字符串。 query = state["messages"][-1].content # 搜索引擎选择执行搜索 if SELECTED_SEARCH_ENGINE == SearchEngine.TAVILY: searched_content = LoggedTavilySearch(max_results=SEARCH_MAX_RESULTS).invoke( {"query": query} ) background_investigation_results = None if isinstance(searched_content, list): background_investigation_results = [ {"title": elem["title"], "content": elem["content"]} for elem in searched_content ] else: logger.error( f"Tavily search returned malformed response: {searched_content}" ) else: background_investigation_results = web_search_tool.invoke(query) # 搜索结果(转成 JSON 字符串)保存在 background_investigation_results # 跳转到下一节点 "planner",继续流程执行 return Command( update={ "background_investigation_results": json.dumps( background_investigation_results, ensure_ascii=False ) }, goto="planner", )
规划器节点 planner_node
根据用户查询和可用上下文生成结构化的研究计划。
规划器节点:
使用提示模板和 LLM 生成研究计划
将计划构建为一系列步骤
确定该计划是否需要人工审核或是否具有足够的背景信息
使用生成的计划更新状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 def planner_node( state: State, config: RunnableConfig ) -> Command[Literal["human_feedback", "reporter"]]: """Planner node that generate the full plan.""" logger.info("Planner generating full plan") # 获取允许的最大 plan 次数 configurable = Configuration.from_runnable_config(config) plan_iterations = state["plan_iterations"] if state.get("plan_iterations", 0) else 0 # 生成 prompt 模板 messages = apply_prompt_template("planner", state, configurable) # 加入背景调查结果 if ( plan_iterations == 0 and state.get("enable_background_investigation") and state.get("background_investigation_results") ): messages += [ { "role": "user", "content": ( "background investigation results of user query:\n" + state["background_investigation_results"] + "\n" ), } ] # 初始化 Planner 使用的 LLM if AGENT_LLM_MAP["planner"] == "basic": llm = get_llm_by_type(AGENT_LLM_MAP["planner"]).with_structured_output( Plan, method="json_mode", ) else: llm = get_llm_by_type(AGENT_LLM_MAP["planner"]) # if the plan iterations is greater than the max plan iterations, return the reporter node # 超过最大计划迭代次数,直接跳转 reporter if plan_iterations >= configurable.max_plan_iterations: return Command(goto="reporter") # 调用语言模型生成计划 full_response = "" if AGENT_LLM_MAP["planner"] == "basic": response = llm.invoke(messages) full_response = response.model_dump_json(indent=4, exclude_none=True) else: response = llm.stream(messages) for chunk in response: full_response += chunk.content logger.debug(f"Current state messages: {state['messages']}") logger.info(f"Planner response: {full_response}") # 尝试解析计划 JSON 内容 try: curr_plan = json.loads(repair_json_output(full_response)) except json.JSONDecodeError: logger.warning("Planner response is not a valid JSON") if plan_iterations > 0: return Command(goto="reporter") else: return Command(goto="__end__") # 判断计划是否足够有上下文,决定下一步跳转 if curr_plan.get("has_enough_context"): logger.info("Planner response has enough context.") new_plan = Plan.model_validate(curr_plan) return Command( update={ "messages": [AIMessage(content=full_response, name="planner")], "current_plan": new_plan, }, goto="reporter", ) return Command( update={ "messages": [AIMessage(content=full_response, name="planner")], "current_plan": full_response, }, goto="human_feedback", )
人工反馈节点 human_feedback_node
允许人工审查和批准生成的研究计划。
人工反馈节点:
检查 auto_accepted_plan 是否启用
如果没有,则中断工作流程以提交计划以供审查
处理人工反馈以编辑计划或接受计划
根据反馈和计划质量,路由到合适的下一个节点
研究团队节点 research_team_node
充当研究执行的协调者,将任务委托给研究人员或编码员节点。
研究团队节点:
分析当前的研究计划
标识下一个未执行的步骤
确定该步骤是研究任务还是编码任务
前往相应专家节点的路线
研究员节点 researcher_node
使用网络搜索和爬网工具执行研究步骤。
研究员节点:
使用网络搜索和爬网工具收集信息
处理和格式化研究结果
通过研究观察更新状态
将控制权交还给研究团队节点
编码器节点 coder_node
使用 Python REPL 执行与代码相关的任务。
编码器节点:
使用 Python REPL 执行代码
分析代码执行结果
使用编码观察更新状态
将控制权交还给研究团队节点
报告节点 reporter_node
根据所有收集到的信息生成最终的研究报告。
汇编所有研究观察结果
用要点、概述和分析来格式化研究报告
包含所有来源的正确引用
使用最终报告更新状态
MCP 执行 MCP服务器的调用发生在编码器节点和研究员节点的使用过程中,两者都调用函数_setup_and_execute_agent_step
实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 async def _setup_and_execute_agent_step( state: State, config: RunnableConfig, agent_type: str, default_agent, default_tools: list, ) -> Command[Literal["research_team"]]: """Helper function to set up an agent with appropriate tools and execute a step. This function handles the common logic for both researcher_node and coder_node: 1. Configures MCP servers and tools based on agent type 2. Creates an agent with the appropriate tools or uses the default agent 3. Executes the agent on the current step Args: state: The current state config: The runnable config agent_type: The type of agent ("researcher" or "coder") default_agent: The default agent to use if no MCP servers are configured default_tools: The default tools to add to the agent Returns: Command to update state and go to research_team """ configurable = Configuration.from_runnable_config(config) mcp_servers = {} enabled_tools = {} # Extract MCP server configuration for this agent type if configurable.mcp_settings: for server_name, server_config in configurable.mcp_settings["servers"].items(): if ( server_config["enabled_tools"] and agent_type in server_config["add_to_agents"] ): mcp_servers[server_name] = { k: v for k, v in server_config.items() if k in ("transport", "command", "args", "url", "env") } for tool_name in server_config["enabled_tools"]: enabled_tools[tool_name] = server_name # Create and execute agent with MCP tools if available if mcp_servers: try: async with MultiServerMCPClient(mcp_servers) as client: loaded_tools = default_tools[:] for tool in client.get_tools(): if tool.name in enabled_tools: tool.description = ( f"Powered by '{enabled_tools[tool.name]}'.\n{tool.description}" ) loaded_tools.append(tool) agent = create_agent(agent_type, agent_type, loaded_tools, agent_type) return await _execute_agent_step(state, agent, agent_type) except Exception as e: logger.exception(f"[{agent_type}] MCP agent setup or execution failed: {repr(e)}") raise ToolException(f"Error: {str(e) or repr(e)}\nTraceback:\n{traceback.format_exc()}") else: # Use default agent if no MCP servers are configured return await _execute_agent_step(state, default_agent, agent_type)