From ea68121853eac7182da40f046682fc202aef8439 Mon Sep 17 00:00:00 2001 From: zxstty Date: Tue, 5 Aug 2025 11:14:56 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84Agent=20=E5=BC=80=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/scheduler/executor/agent.py | 389 +++++++++++++++++++++++++- apps/scheduler/mcp/select.py | 6 - apps/scheduler/mcp_agent/host.py | 12 +- apps/scheduler/mcp_agent/plan.py | 220 ++++++++++----- apps/scheduler/mcp_agent/prompt.py | 340 ++++++++++++++-------- apps/scheduler/mcp_agent/select.py | 240 ++++++---------- apps/scheduler/scheduler/scheduler.py | 56 +++- apps/schemas/enum_var.py | 6 +- apps/schemas/mcp.py | 27 ++ apps/schemas/message.py | 4 +- apps/schemas/pool.py | 3 + apps/schemas/task.py | 6 +- 12 files changed, 933 insertions(+), 376 deletions(-) diff --git a/apps/scheduler/executor/agent.py b/apps/scheduler/executor/agent.py index 603ea65b6..4db38587c 100644 --- a/apps/scheduler/executor/agent.py +++ b/apps/scheduler/executor/agent.py @@ -2,21 +2,34 @@ """MCP Agent执行器""" import logging - +import uuid from pydantic import Field - +from typing import Any +from apps.llm.patterns.rewrite import QuestionRewrite from apps.llm.reasoning import ReasoningLLM from apps.scheduler.executor.base import BaseExecutor from apps.schemas.enum_var import EventType, SpecialCallType, FlowStatus, StepStatus from apps.scheduler.mcp_agent.host import MCPHost from apps.scheduler.mcp_agent.plan import MCPPlanner +from apps.scheduler.mcp_agent.select import FINAL_TOOL_ID, MCPSelector from apps.scheduler.pool.mcp.client import MCPClient -from apps.schemas.mcp import MCPCollection, MCPTool -from apps.schemas.task import ExecutorState, StepQueueItem +from apps.schemas.mcp import ( + GoalEvaluationResult, + RestartStepIndex, + ToolRisk, + ErrorType, + ToolExcutionErrorType, + MCPPlan, + MCPCollection, + MCPTool +) +from apps.schemas.task import ExecutorState, FlowStepHistory, StepQueueItem from apps.schemas.message import param from apps.services.task import TaskManager from apps.services.appcenter import AppCenterManager from apps.services.mcp_service import MCPServiceManager +from apps.services.task import TaskManager +from apps.services.user import UserManager logger = logging.getLogger(__name__) @@ -31,7 +44,9 @@ class MCPAgentExecutor(BaseExecutor): mcp_client: dict[str, MCPClient] = Field( description="MCP客户端列表,key为mcp_id", default={} ) - tool_list: list[MCPTool] = Field(description="MCP工具列表", default=[]) + tools: dict[str, MCPTool] = Field( + description="MCP工具列表,key为tool_id", default={} + ) params: param | None = Field( default=None, description="流执行过程中的参数补充", alias="params" ) @@ -65,10 +80,372 @@ class MCPAgentExecutor(BaseExecutor): self.mcp_list.append(mcp_service) self.mcp_client[mcp_id] = await MCPHost.get_client(self.task.ids.user_sub, mcp_id) - self.tool_list.extend(mcp_service.tools) + for tool in mcp_service.tools: + self.tools[tool.id] = tool + + async def plan(self, is_replan: bool = False, start_index: int | None = None) -> None: + if is_replan: + error_message = "之前的计划遇到以下报错\n\n"+self.task.state.error_message + else: + error_message = "初始化计划" + tools = MCPSelector.select_top_tool( + self.task.runtime.question, list(self.tools.values()), + additional_info=error_message, top_n=40) + if is_replan: + logger.info("[MCPAgentExecutor] 重新规划流程") + if not start_index: + start_index = await MCPPlanner.get_replan_start_step_index(self.task.runtime.question, + self.task.state.error_message, + self.task.runtime.temporary_plans, + self.resoning_llm) + current_plan = self.task.runtime.temporary_plans.plans[start_index:] + error_message = self.task.state.error_message + temporary_plans = await MCPPlanner.create_plan(self.task.runtime.question, + is_replan=is_replan, + error_message=error_message, + current_plan=current_plan, + tool_list=tools, + max_steps=self.max_steps-start_index-1, + reasoning_llm=self.resoning_llm + ) + self.msg_queue.push_output( + self.task, + EventType.STEP_CANCEL, + data={} + ) + if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id: + self.task.context[-1].step_status = StepStatus.CANCELLED + self.task.runtime.temporary_plans = self.task.runtime.temporary_plans.plans[:start_index] + temporary_plans.plans + self.task.state.step_index = start_index + else: + start_index = 0 + self.task.runtime.temporary_plans = await MCPPlanner.create_plan(self.task.runtime.question, tool_list=tools, max_steps=self.max_steps, reasoning_llm=self.resoning_llm) + for i in range(start_index, len(self.task.runtime.temporary_plans.plans)): + self.task.runtime.temporary_plans.plans[i].step_id = str(uuid.uuid4()) + + async def get_tool_input_param(self, is_first: bool) -> dict[str, Any]: + if is_first: + # 获取第一个输入参数 + self.task.state.current_input = await MCPHost._get_first_input_params(self.tools[self.task.state.step_id], self.task.runtime.question, self.task) + else: + # 获取后续输入参数 + if isinstance(self.params, param): + params = self.params.content + params_description = self.params.description + else: + params = {} + params_description = "" + self.task.state.current_input = await MCPHost._fill_params(self.tools[self.task.state.step_id], self.task.state.current_input, self.task.state.error_message, params, params_description) + + async def reset_step_to_index(self, start_index: int) -> None: + """重置步骤到开始""" + logger.info("[MCPAgentExecutor] 重置步骤到索引 %d", start_index) + if self.task.runtime.temporary_plans: + self.task.state.flow_status = FlowStatus.RUNNING + self.task.state.step_id = self.task.runtime.temporary_plans.plans[start_index].step_id + self.task.state.step_index = 0 + self.task.state.step_name = self.task.runtime.temporary_plans.plans[start_index].tool + self.task.state.step_description = self.task.runtime.temporary_plans.plans[start_index].content + self.task.state.step_status = StepStatus.RUNNING + self.task.state.retry_times = 0 + else: + self.task.state.flow_status = FlowStatus.SUCCESS + self.task.state.step_id = FINAL_TOOL_ID + + async def confirm_before_step(self) -> None: + logger.info("[MCPAgentExecutor] 等待用户确认步骤 %d", self.task.state.step_index) + # 发送确认消息 + confirm_message = await MCPPlanner.get_tool_risk(self.tools[self.task.state.step_id], self.task.state.current_input, "", self.resoning_llm) + self.msg_queue.push_output(self.task, EventType.STEP_WAITING_FOR_START, confirm_message.model_dump( + exclude_none=True, by_alias=True)) + self.msg_queue.push_output(self.task, EventType.FLOW_STOP, {}) + self.task.state.flow_status = FlowStatus.WAITING + self.task.state.step_status = StepStatus.WAITING + self.task.context.append( + FlowStepHistory( + step_id=self.task.state.step_id, + step_name=self.task.state.step_name, + step_description=self.task.state.step_description, + step_status=self.task.state.step_status, + flow_id=self.task.state.flow_id, + flow_name=self.task.state.flow_name, + flow_status=self.task.state.flow_status, + input_data={}, + output_data={}, + ex_data=confirm_message.model_dump(exclude_none=True, by_alias=True), + ) + ) + + async def run_step(self): + """执行步骤""" + self.task.state.flow_status = FlowStatus.RUNNING + self.task.state.step_status = StepStatus.RUNNING + logger.info("[MCPAgentExecutor] 执行步骤 %d", self.task.state.step_index) + # 获取MCP客户端 + mcp_tool = self.tools[self.task.state.step_id] + mcp_client = self.mcp_client[mcp_tool.mcp_id] + if not mcp_client: + logger.error("[MCPAgentExecutor] MCP客户端未找到: %s", mcp_tool.mcp_id) + self.task.state.flow_status = FlowStatus.ERROR + error = "[MCPAgentExecutor] MCP客户端未找到: {}".format(mcp_tool.mcp_id) + raise Exception(error) + try: + output_params = await mcp_client.call_tool(mcp_tool.name, self.task.state.current_input) + self.msg_queue.push_output( + self.task, + EventType.STEP_INPUT, + self.task.state.current_input + ) + self.msg_queue.push_output( + self.task, + EventType.STEP_OUTPUT, + output_params + ) + self.task.context.append( + FlowStepHistory( + step_id=self.task.state.step_id, + step_name=self.task.state.step_name, + step_description=self.task.state.step_description, + step_status=StepStatus.SUCCESS, + flow_id=self.task.state.flow_id, + flow_name=self.task.state.flow_name, + flow_status=self.task.state.flow_status, + input_data=self.task.state.current_input, + output_data=output_params, + ) + ) + self.task.state.step_status = StepStatus.SUCCESS + except Exception as e: + logging.warning("[MCPAgentExecutor] 执行步骤 %s 失败: %s", mcp_tool.name, str(e)) + import traceback + self.task.state.error_message = traceback.format_exc() + self.task.state.step_status = StepStatus.ERROR + + async def generate_params_with_null(self) -> None: + """生成参数补充""" + mcp_tool = self.tools[self.task.state.step_id] + params_with_null = await MCPPlanner.get_missing_param( + mcp_tool, + self.task.state.current_input, + self.task.state.error_message, + self.resoning_llm + ) + self.msg_queue.push_output( + self.task, + EventType.STEP_WAITING_FOR_PARAM, + data={ + "message": "当运行产生如下报错:\n" + self.task.state.error_message, + "params": params_with_null + } + ) + self.task.state.flow_status = FlowStatus.WAITING + self.task.state.step_status = StepStatus.PARAM + self.task.context.append( + FlowStepHistory( + step_id=self.task.state.step_id, + step_name=self.task.state.step_name, + step_description=self.task.state.step_description, + step_status=self.task.state.step_status, + flow_id=self.task.state.flow_id, + flow_name=self.task.state.flow_name, + flow_status=self.task.state.flow_status, + input_data={}, + output_data={}, + ex_data={ + "message": "当运行产生如下报错:\n" + self.task.state.error_message, + "params": params_with_null + } + ) + ) + + async def get_next_step(self) -> None: + self.task.state.step_index += 1 + if self.task.state.step_index < len(self.task.runtime.temporary_plans): + if self.task.runtime.temporary_plans.plans[self.task.state.step_index].step_id == FINAL_TOOL_ID: + # 最后一步 + self.task.state.flow_status = FlowStatus.SUCCESS + self.task.state.step_status = StepStatus.SUCCESS + self.msg_queue.push_output( + self.task, + EventType.FLOW_SUCCESS, + data={} + ) + return + self.task.state.step_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].step_id + self.task.state.step_name = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool + self.task.state.step_description = self.task.runtime.temporary_plans.plans[self.task.state.step_index].content + self.task.state.step_status = StepStatus.INIT + self.task.state.current_input = {} + self.msg_queue.push_output( + self.task, + EventType.STEP_INIT, + data={} + ) + else: + # 没有下一步了,结束流程 + self.task.state.flow_status = FlowStatus.SUCCESS + self.task.state.step_status = StepStatus.SUCCESS + self.msg_queue.push_output( + self.task, + EventType.FLOW_SUCCESS, + data={} + ) + return + + async def error_handle_after_step(self) -> None: + """步骤执行失败后的错误处理""" + self.task.state.step_status = StepStatus.ERROR + self.task.state.flow_status = FlowStatus.ERROR + self.msg_queue.push_output( + self.task, + EventType.FLOW_FAILED, + data={} + ) + if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id: + del self.task.context[-1] + self.task.context.append( + FlowStepHistory( + step_id=self.task.state.step_id, + step_name=self.task.state.step_name, + step_description=self.task.state.step_description, + step_status=self.task.state.step_status, + flow_id=self.task.state.flow_id, + flow_name=self.task.state.flow_name, + flow_status=self.task.state.flow_status, + input_data={}, + output_data={}, + ) + ) + + async def work(self) -> None: + """执行当前步骤""" + if self.task.state.step_status == StepStatus.INIT: + self.get_tool_input_param(is_first=True) + user_info = await UserManager.get_userinfo_by_user_sub(self.task.ids.user_sub) + if not user_info.auto_execute: + # 等待用户确认 + await self.confirm_before_step() + return + self.step.state.step_status = StepStatus.RUNNING + elif self.task.state.step_status in [StepStatus.PARAM, StepStatus.WAITING, StepStatus.RUNNING]: + if self.task.context[-1].step_status == StepStatus.PARAM: + if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id: + del self.task.context[-1] + elif self.task.state.step_status == StepStatus.WAITING: + if self.params.content: + if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id: + del self.task.context[-1] + else: + self.task.state.flow_status = FlowStatus.CANCELLED + self.task.state.step_status = StepStatus.CANCELLED + self.msg_queue.push_output( + self.task, + EventType.STEP_CANCEL, + data={} + ) + self.msg_queue.push_output( + self.task, + EventType.FLOW_CANCEL, + data={} + ) + if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id: + self.task.context[-1].step_status = StepStatus.CANCELLED + if self.task.state.step_status == StepStatus.PARAM: + self.get_tool_input_param(is_first=False) + max_retry = 5 + for i in range(max_retry): + if i != 0: + self.get_tool_input_param(is_first=False) + await self.run_step() + if self.task.state.step_status == StepStatus.SUCCESS: + break + elif self.task.state.step_status == StepStatus.ERROR: + # 错误处理 + if self.task.state.retry_times >= 3: + await self.error_handle_after_step() + else: + user_info = await UserManager.get_userinfo_by_user_sub(self.task.ids.user_sub) + mcp_tool = self.tools[self.task.state.step_id] + error_type = await MCPPlanner.get_tool_execute_error_type( + self.task.runtime.question, + self.task.runtime.temporary_plans, + mcp_tool, + self.task.state.current_input, + self.task.state.error_message, + self.resoning_llm + ) + if error_type.type == ErrorType.DECORRECT_PLAN or user_info.auto_execute: + await self.plan(is_replan=True) + self.reset_step_to_index(self.task.state.step_index) + elif error_type.type == ErrorType.MISSING_PARAM: + await self.generate_params_with_null() + elif self.task.state.step_status == StepStatus.SUCCESS: + await self.get_next_step() + + async def summarize(self) -> None: + async for chunk in MCPPlanner.generate_answer( + self.task.runtime.question, + self.task.runtime.temporary_plans, + (await MCPHost.assemble_memory(self.task)), + self.resoning_llm + ): + self.msg_queue.push_output( + self.task, + EventType.TEXT_ADD, + data=chunk + ) + self.task.runtime.answer += chunk async def run(self) -> None: """执行MCP Agent的主逻辑""" # 初始化MCP服务 self.load_state() self.load_mcp() + if self.task.state.flow_status == FlowStatus.INIT: + # 初始化状态 + self.task.state.flow_id = str(uuid.uuid4()) + self.task.state.flow_name = await MCPPlanner.get_flow_name(self.task.runtime.question, self.resoning_llm) + self.task.runtime.temporary_plans = await self.plan(is_replan=False) + self.reset_step_to_index(0) + TaskManager.save_task(self.task.id, self.task) + self.task.state.flow_status = FlowStatus.RUNNING + self.msg_queue.push_output( + self.task, + EventType.FLOW_START, + data={} + ) + try: + while self.task.state.step_index < len(self.task.runtime.temporary_plans) and \ + self.task.state.flow_status == FlowStatus.RUNNING: + self.work() + TaskManager.save_task(self.task.id, self.task) + except Exception as e: + logger.error("[MCPAgentExecutor] 执行过程中发生错误: %s", str(e)) + self.task.state.flow_status = FlowStatus.ERROR + self.task.state.error_message = str(e) + self.task.state.step_status = StepStatus.ERROR + self.msg_queue.push_output( + self.task, + EventType.STEP_ERROR, + data={} + ) + self.msg_queue.push_output( + self.task, + EventType.FLOW_FAILED, + data={} + ) + if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id: + del self.task.context[-1] + self.task.context.append( + FlowStepHistory( + step_id=self.task.state.step_id, + step_name=self.task.state.step_name, + step_description=self.task.state.step_description, + step_status=self.task.state.step_status, + flow_id=self.task.state.flow_id, + flow_name=self.task.state.flow_name, + flow_status=self.task.state.flow_status, + input_data={}, + output_data={}, + ) + ) diff --git a/apps/scheduler/mcp/select.py b/apps/scheduler/mcp/select.py index 2ff503447..f3d6e0d4a 100644 --- a/apps/scheduler/mcp/select.py +++ b/apps/scheduler/mcp/select.py @@ -39,7 +39,6 @@ class MCPSelector: sql += f"'{mcp_id}', " return sql.rstrip(", ") + ")" - async def _get_top_mcp_by_embedding( self, query: str, @@ -72,7 +71,6 @@ class MCPSelector: }]) return llm_mcp_list - async def _get_mcp_by_llm( self, query: str, @@ -100,7 +98,6 @@ class MCPSelector: # 使用小模型提取JSON return await self._call_function_mcp(result, mcp_ids) - async def _call_reasoning(self, prompt: str) -> str: """调用大模型进行推理""" logger.info("[MCPHelper] 调用推理大模型") @@ -116,7 +113,6 @@ class MCPSelector: self.output_tokens += llm.output_tokens return result - async def _call_function_mcp(self, reasoning_result: str, mcp_ids: list[str]) -> MCPSelectResult: """调用结构化输出小模型提取JSON""" logger.info("[MCPHelper] 调用结构化输出小模型") @@ -136,7 +132,6 @@ class MCPSelector: raise return result - async def select_top_mcp( self, query: str, @@ -153,7 +148,6 @@ class MCPSelector: # 通过LLM选择最合适的 return await self._get_mcp_by_llm(query, llm_mcp_list, mcp_list) - @staticmethod async def select_top_tool(query: str, mcp_list: list[str], top_n: int = 10) -> list[MCPTool]: """选择最合适的工具""" diff --git a/apps/scheduler/mcp_agent/host.py b/apps/scheduler/mcp_agent/host.py index 3217f5393..ced175ef9 100644 --- a/apps/scheduler/mcp_agent/host.py +++ b/apps/scheduler/mcp_agent/host.py @@ -60,7 +60,7 @@ class MCPHost: context_list=task.context, ) - async def _get_first_input_params(schema: dict[str, Any], query: str) -> dict[str, Any]: + async def _get_first_input_params(mcp_tool: MCPTool, query: str, task: Task) -> dict[str, Any]: """填充工具参数""" # 更清晰的输入·指令,这样可以调用generate llm_query = rf""" @@ -74,13 +74,13 @@ class MCPHost: llm_query, [ {"role": "system", "content": "You are a helpful assistant."}, - {"role": "user", "content": await MCPHost.assemble_memory()}, + {"role": "user", "content": await MCPHost.assemble_memory(task)}, ], - schema, + mcp_tool.input_schema, ) return await json_generator.generate() - async def _fill_params(mcp_tool: MCPTool, schema: dict[str, Any], + async def _fill_params(mcp_tool: MCPTool, current_input: dict[str, Any], error_message: str = "", params: dict[str, Any] = {}, params_description: str = "") -> dict[str, Any]: @@ -88,7 +88,7 @@ class MCPHost: prompt = _env.from_string(REPAIR_PARAMS).render( tool_name=mcp_tool.name, tool_description=mcp_tool.description, - input_schema=schema, + input_schema=mcp_tool.input_schema, current_input=current_input, error_message=error_message, params=params, @@ -101,7 +101,7 @@ class MCPHost: {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": prompt}, ], - schema, + mcp_tool.input_schema, ) return await json_generator.generate() diff --git a/apps/scheduler/mcp_agent/plan.py b/apps/scheduler/mcp_agent/plan.py index 13e7a98dc..91d293fbf 100644 --- a/apps/scheduler/mcp_agent/plan.py +++ b/apps/scheduler/mcp_agent/plan.py @@ -9,38 +9,36 @@ from apps.llm.function import JsonGenerator from apps.scheduler.mcp_agent.prompt import ( EVALUATE_GOAL, GENERATE_FLOW_NAME, + GET_REPLAN_START_STEP_INDEX, CREATE_PLAN, RECREATE_PLAN, RISK_EVALUATE, + TOOL_EXECUTE_ERROR_TYPE_ANALYSIS, GET_MISSING_PARAMS, FINAL_ANSWER ) from apps.schemas.mcp import ( GoalEvaluationResult, + RestartStepIndex, ToolRisk, + ToolExcutionErrorType, MCPPlan, MCPTool ) from apps.scheduler.slot.slot import Slot +_env = SandboxedEnvironment( + loader=BaseLoader, + autoescape=True, + trim_blocks=True, + lstrip_blocks=True, +) + class MCPPlanner: """MCP 用户目标拆解与规划""" - - def __init__(self, user_goal: str, resoning_llm: ReasoningLLM = None) -> None: - """初始化MCP规划器""" - self.user_goal = user_goal - self._env = SandboxedEnvironment( - loader=BaseLoader, - autoescape=True, - trim_blocks=True, - lstrip_blocks=True, - ) - self.resoning_llm = resoning_llm or ReasoningLLM() - self.input_tokens = 0 - self.output_tokens = 0 - - async def get_resoning_result(self, prompt: str) -> str: + @staticmethod + async def get_resoning_result(prompt: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> str: """获取推理结果""" # 调用推理大模型 message = [ @@ -48,7 +46,7 @@ class MCPPlanner: {"role": "user", "content": prompt}, ] result = "" - async for chunk in self.resoning_llm.call( + async for chunk in resoning_llm.call( message, streaming=False, temperature=0.07, @@ -56,12 +54,10 @@ class MCPPlanner: ): result += chunk - # 保存token用量 - self.input_tokens += self.resoning_llm.input_tokens - self.output_tokens += self.resoning_llm.output_tokens return result - async def _parse_result(self, result: str, schema: dict[str, Any]) -> str: + @staticmethod + async def _parse_result(result: str, schema: dict[str, Any]) -> str: """解析推理结果""" json_generator = JsonGenerator( result, @@ -74,126 +70,210 @@ class MCPPlanner: json_result = await json_generator.generate() return json_result - async def evaluate_goal(self, tool_list: list[MCPTool]) -> GoalEvaluationResult: + @staticmethod + async def evaluate_goal( + tool_list: list[MCPTool], + resoning_llm: ReasoningLLM = ReasoningLLM()) -> GoalEvaluationResult: """评估用户目标的可行性""" # 获取推理结果 - result = await self._get_reasoning_evaluation(tool_list) + result = await MCPPlanner._get_reasoning_evaluation(tool_list, resoning_llm) # 解析为结构化数据 - evaluation = await self._parse_evaluation_result(result) + evaluation = await MCPPlanner._parse_evaluation_result(result) # 返回评估结果 return evaluation - async def _get_reasoning_evaluation(self, tool_list: list[MCPTool]) -> str: + @staticmethod + async def _get_reasoning_evaluation( + goal, tool_list: list[MCPTool], + resoning_llm: ReasoningLLM = ReasoningLLM()) -> str: """获取推理大模型的评估结果""" - template = self._env.from_string(EVALUATE_GOAL) + template = _env.from_string(EVALUATE_GOAL) prompt = template.render( - goal=self.user_goal, + goal=goal, tools=tool_list, ) - result = await self.get_resoning_result(prompt) + result = await MCPPlanner.get_resoning_result(prompt, resoning_llm) return result - async def _parse_evaluation_result(self, result: str) -> GoalEvaluationResult: + @staticmethod + async def _parse_evaluation_result(result: str) -> GoalEvaluationResult: """将推理结果解析为结构化数据""" schema = GoalEvaluationResult.model_json_schema() - evaluation = await self._parse_result(result, schema) + evaluation = await MCPPlanner._parse_result(result, schema) # 使用GoalEvaluationResult模型解析结果 return GoalEvaluationResult.model_validate(evaluation) - async def get_flow_name(self) -> str: + async def get_flow_name(user_goal: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> str: """获取当前流程的名称""" - result = await self._get_reasoning_flow_name() + result = await MCPPlanner._get_reasoning_flow_name(user_goal, resoning_llm) return result - async def _get_reasoning_flow_name(self) -> str: + @staticmethod + async def _get_reasoning_flow_name(user_goal: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> str: """获取推理大模型的流程名称""" - template = self._env.from_string(GENERATE_FLOW_NAME) - prompt = template.render(goal=self.user_goal) - result = await self.get_resoning_result(prompt) + template = _env.from_string(GENERATE_FLOW_NAME) + prompt = template.render(goal=user_goal) + result = await MCPPlanner.get_resoning_result(prompt, resoning_llm) return result - async def create_plan(self, tool_list: list[MCPTool], max_steps: int = 6) -> MCPPlan: + @staticmethod + async def get_replan_start_step_index( + user_goal: str, error_message: str, current_plan: MCPPlan | None = None, + history: str = "", + reasoning_llm: ReasoningLLM = ReasoningLLM()) -> MCPPlan: + """获取重新规划的步骤索引""" + # 获取推理结果 + template = _env.from_string(GET_REPLAN_START_STEP_INDEX) + prompt = template.render( + goal=user_goal, + error_message=error_message, + current_plan=current_plan.model_dump(exclude_none=True, by_alias=True), + history=history, + ) + result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm) + # 解析为结构化数据 + schema = RestartStepIndex.model_json_schema() + schema["properties"]["start_index"]["maximum"] = len(current_plan.plans) - 1 + schema["properties"]["start_index"]["minimum"] = 0 + restart_index = await MCPPlanner._parse_result(result, schema) + # 使用RestartStepIndex模型解析结果 + return RestartStepIndex.model_validate(restart_index) + + @staticmethod + async def create_plan( + user_goal: str, is_replan: bool = False, error_message: str = "", current_plan: MCPPlan | None = None, + tool_list: list[MCPTool] = [], + max_steps: int = 6, reasoning_llm: ReasoningLLM = ReasoningLLM()) -> MCPPlan: """规划下一步的执行流程,并输出""" # 获取推理结果 - result = await self._get_reasoning_plan(tool_list, max_steps) + result = await MCPPlanner._get_reasoning_plan(user_goal, is_replan, error_message, current_plan, tool_list, max_steps, reasoning_llm) # 解析为结构化数据 - return await self._parse_plan_result(result, max_steps) + return await MCPPlanner._parse_plan_result(result, max_steps) + @staticmethod async def _get_reasoning_plan( - self, is_replan: bool = False, error_message: str = "", current_plan: MCPPlan = MCPPlan(), + user_goal: str, is_replan: bool = False, error_message: str = "", current_plan: MCPPlan | None = None, tool_list: list[MCPTool] = [], - max_steps: int = 10) -> str: + max_steps: int = 10, reasoning_llm: ReasoningLLM = ReasoningLLM()) -> str: """获取推理大模型的结果""" # 格式化Prompt if is_replan: - template = self._env.from_string(RECREATE_PLAN) + template = _env.from_string(RECREATE_PLAN) prompt = template.render( - current_plan=current_plan, + current_plan=current_plan.model_dump(exclude_none=True, by_alias=True), error_message=error_message, - goal=self.user_goal, + goal=user_goal, tools=tool_list, max_num=max_steps, ) else: - template = self._env.from_string(CREATE_PLAN) + template = _env.from_string(CREATE_PLAN) prompt = template.render( - goal=self.user_goal, + goal=user_goal, tools=tool_list, max_num=max_steps, ) - result = await self.get_resoning_result(prompt) + result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm) return result - async def _parse_plan_result(self, result: str, max_steps: int) -> MCPPlan: + @staticmethod + async def _parse_plan_result(result: str, max_steps: int) -> MCPPlan: """将推理结果解析为结构化数据""" # 格式化Prompt schema = MCPPlan.model_json_schema() schema["properties"]["plans"]["maxItems"] = max_steps - plan = await self._parse_result(result, schema) + plan = await MCPPlanner._parse_result(result, schema) # 使用Function模型解析结果 return MCPPlan.model_validate(plan) - async def get_tool_risk(self, tool: MCPTool, input_parm: dict[str, Any], additional_info: str = "") -> ToolRisk: + @staticmethod + async def get_tool_risk( + tool: MCPTool, input_parm: dict[str, Any], + additional_info: str = "", resoning_llm: ReasoningLLM = ReasoningLLM()) -> ToolRisk: """获取MCP工具的风险评估结果""" # 获取推理结果 - result = await self._get_reasoning_risk(tool, input_parm, additional_info) + result = await MCPPlanner._get_reasoning_risk(tool, input_parm, additional_info, resoning_llm) # 解析为结构化数据 - risk = await self._parse_risk_result(result) + risk = await MCPPlanner._parse_risk_result(result) # 返回风险评估结果 return risk - async def _get_reasoning_risk(self, tool: MCPTool, input_param: dict[str, Any], additional_info: str) -> str: + @staticmethod + async def _get_reasoning_risk( + tool: MCPTool, input_param: dict[str, Any], + additional_info: str, resoning_llm: ReasoningLLM) -> str: """获取推理大模型的风险评估结果""" - template = self._env.from_string(RISK_EVALUATE) + template = _env.from_string(RISK_EVALUATE) prompt = template.render( tool_name=tool.name, tool_description=tool.description, input_param=input_param, additional_info=additional_info, ) - result = await self.get_resoning_result(prompt) + result = await MCPPlanner.get_resoning_result(prompt, resoning_llm) return result - async def _parse_risk_result(self, result: str) -> ToolRisk: + @staticmethod + async def _parse_risk_result(result: str) -> ToolRisk: """将推理结果解析为结构化数据""" schema = ToolRisk.model_json_schema() - risk = await self._parse_result(result, schema) + risk = await MCPPlanner._parse_result(result, schema) # 使用ToolRisk模型解析结果 return ToolRisk.model_validate(risk) + @staticmethod + async def _get_reasoning_tool_execute_error_type( + user_goal: str, current_plan: MCPPlan, + tool: MCPTool, input_param: dict[str, Any], + error_message: str, reasoning_llm: ReasoningLLM = ReasoningLLM()) -> str: + """获取推理大模型的工具执行错误类型""" + template = _env.from_string(TOOL_EXECUTE_ERROR_TYPE_ANALYSIS) + prompt = template.render( + goal=user_goal, + current_plan=current_plan.model_dump(exclude_none=True, by_alias=True), + tool_name=tool.name, + tool_description=tool.description, + input_param=input_param, + error_message=error_message, + ) + result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm) + return result + + @staticmethod + async def _parse_tool_execute_error_type_result(result: str) -> ToolExcutionErrorType: + """将推理结果解析为工具执行错误类型""" + schema = ToolExcutionErrorType.model_json_schema() + error_type = await MCPPlanner._parse_result(result, schema) + # 使用ToolExcutionErrorType模型解析结果 + return ToolExcutionErrorType.model_validate(error_type) + + @staticmethod + async def get_tool_execute_error_type( + user_goal: str, current_plan: MCPPlan, + tool: MCPTool, input_param: dict[str, Any], + error_message: str, reasoning_llm: ReasoningLLM = ReasoningLLM()) -> ToolExcutionErrorType: + """获取MCP工具执行错误类型""" + # 获取推理结果 + result = await MCPPlanner._get_reasoning_tool_execute_error_type( + user_goal, current_plan, tool, input_param, error_message, reasoning_llm) + error_type = await MCPPlanner._parse_tool_execute_error_type_result(result) + # 返回工具执行错误类型 + return error_type + + @staticmethod async def get_missing_param( - self, tool: MCPTool, schema: dict[str, Any], + tool: MCPTool, input_param: dict[str, Any], - error_message: str) -> list[str]: + error_message: str, reasoning_llm: ReasoningLLM = ReasoningLLM()) -> list[str]: """获取缺失的参数""" - slot = Slot(schema=schema) + slot = Slot(schema=tool.input_schema) + template = _env.from_string(GET_MISSING_PARAMS) schema_with_null = slot.add_null_to_basic_types() - template = self._env.from_string(GET_MISSING_PARAMS) prompt = template.render( tool_name=tool.name, tool_description=tool.description, @@ -201,26 +281,26 @@ class MCPPlanner: schema=schema_with_null, error_message=error_message, ) - result = await self.get_resoning_result(prompt) + result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm) # 解析为结构化数据 - input_param_with_null = await self._parse_result(result, schema_with_null) + input_param_with_null = await MCPPlanner._parse_result(result, schema_with_null) return input_param_with_null - async def generate_answer(self, plan: MCPPlan, memory: str) -> AsyncGenerator[str, None]: + @staticmethod + async def generate_answer( + user_goal: str, plan: MCPPlan, memory: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> AsyncGenerator[ + str, None]: """生成最终回答""" - template = self._env.from_string(FINAL_ANSWER) + template = _env.from_string(FINAL_ANSWER) prompt = template.render( - plan=plan, + plan=plan.model_dump(exclude_none=True, by_alias=True), memory=memory, - goal=self.user_goal, + goal=user_goal, ) - async for chunk in self.resoning_llm.call( + async for chunk in resoning_llm.call( [{"role": "user", "content": prompt}], streaming=False, temperature=0.07, ): yield chunk - - self.input_tokens = self.resoning_llm.input_tokens - self.output_tokens = self.resoning_llm.output_tokens diff --git a/apps/scheduler/mcp_agent/prompt.py b/apps/scheduler/mcp_agent/prompt.py index 9cbc2f5bc..b5bc085c6 100644 --- a/apps/scheduler/mcp_agent/prompt.py +++ b/apps/scheduler/mcp_agent/prompt.py @@ -62,6 +62,62 @@ MCP_SELECT = dedent(r""" ### 请一步一步思考: """) +TOOL_SELECT = dedent(r""" + 你是一个乐于助人的智能助手。 + 你的任务是:根据当前目标,附加信息,选择最合适的MCP工具。 + ## 选择MCP工具时的注意事项: + 1. 确保充分理解当前目标,选择实现目标所需的MCP工具。 + 2. 请在给定的MCP工具列表中选择,不要自己生成MCP工具。 + 3. 可以选择一些辅助工具,但必须确保这些工具与当前目标相关。 + 必须按照以下格式生成选择结果,不要输出任何其他内容: + ```json + { + "tool_ids": ["工具ID1", "工具ID2", ...] + } + ``` + + # 示例 + ## 目标 + 调优mysql性能 + ## MCP工具列表 + + - mcp_tool_1 MySQL链接池工具;用于优化MySQL链接池 + - mcp_tool_2 MySQL性能调优工具;用于分析MySQL性能瓶颈 + - mcp_tool_3 MySQL查询优化工具;用于优化MySQL查询语句 + - mcp_tool_4 MySQL索引优化工具;用于优化MySQL索引 + - mcp_tool_5 文件存储工具;用于存储文件 + - mcp_tool_6 mongoDB工具;用于操作MongoDB数据库 + + ## 附加信息 + 1. 当前MySQL数据库的版本是8.0.26 + 2. 当前MySQL数据库的配置文件路径是/etc/my.cnf,并含有以下配置项 + ```json + { + "max_connections": 1000, + "innodb_buffer_pool_size": "1G", + "query_cache_size": "64M" + } + ##输出 + ```json + { + "tool_ids": ["mcp_tool_1", "mcp_tool_2", "mcp_tool_3", "mcp_tool_4"] + } + ``` + # 现在开始! + ## 目标 + {{goal}} + ## MCP工具列表 + + {% for tool in tools %} + - {{tool.id}} {{tool.name}};{{tool.description}} + {% endfor %} + + ## 附加信息 + {{additional_info}} + # 输出 + """ + ) + EVALUATE_GOAL = dedent(r""" 你是一个计划评估器。 请根据用户的目标和当前的工具集合以及一些附加信息,判断基于当前的工具集合,是否能够完成用户的目标。 @@ -76,18 +132,18 @@ EVALUATE_GOAL = dedent(r""" ``` # 样例 - ## 目标 - 我需要扫描当前mysql数据库,分析性能瓶颈,并调优 + # 目标 + 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优 - ## 工具集合 - 你可以访问并使用一些工具,这些工具将在 XML标签中给出。 + # 工具集合 + 你可以访问并使用一些工具,这些工具将在 XML标签中给出。 - - mysql_analyzer分析MySQL数据库性能 - - performance_tuner调优数据库性能 - - Final结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。 + - mysql_analyzer 分析MySQL数据库性能 + - performance_tuner 调优数据库性能 + - Final 结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。 - ## 附加信息 + # 附加信息 1. 当前MySQL数据库的版本是8.0.26 2. 当前MySQL数据库的配置文件路径是/etc/my.cnf @@ -100,17 +156,17 @@ EVALUATE_GOAL = dedent(r""" ``` # 目标 - {{ goal }} + {{goal}} # 工具集合 - {% for tool in tools %} - - {{ tool.id }}{{tool.name}};{{ tool.description }} - {% endfor %} + { % for tool in tools % } + - {{tool.id}} {{tool.name}};{{tool.description}} + { % endfor % } # 附加信息 - {{ additional_info }} + {{additional_info}} """) GENERATE_FLOW_NAME = dedent(r""" @@ -123,15 +179,79 @@ GENERATE_FLOW_NAME = dedent(r""" 4. 流程名称应该尽量简短,小于20个字或者单词。 5. 只输出流程名称,不要输出其他内容。 # 样例 - ## 目标 - 我需要扫描当前mysql数据库,分析性能瓶颈,并调优 - ## 输出 + # 目标 + 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优 + # 输出 扫描MySQL数据库并分析性能瓶颈,进行调优 # 现在开始生成流程名称: # 目标 - {{ goal }} + {{goal}} # 输出 """) +GET_REPLAN_START_STEP_INDEX = dedent(r""" + 你是一个智能助手,你的任务是根据用户的目标、报错信息和当前计划和历史,获取重新规划的步骤起始索引。 + + # 样例 + # 目标 + 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优 + # 报错信息 + 执行端口扫描命令时,出现了错误:`- bash: curl: command not found`。 + # 当前计划 + ```json + { + "plans": [ + { + "step_id": "step_1", + "content": "生成端口扫描命令", + "tool": "command_generator", + "instruction": "生成端口扫描命令:扫描 + }, + { + "step_id": "step_2", + "content": "在执行Result[0]生成的命令", + "tool": "command_executor", + "instruction": "执行端口扫描命令" + } + ] + } + # 历史 + [ + { + id: "0", + task_id: "task_1", + flow_id: "flow_1", + flow_name: "MYSQL性能调优", + flow_status: "RUNNING", + step_id: "step_1", + step_name: "生成端口扫描命令", + step_description: "生成端口扫描命令:扫描当前MySQL数据库的端口", + step_status: "FAILED", + input_data: { + "command": "nmap -p 3306 + "target": "localhost" + }, + output_data: { + "error": "- bash: curl: command not found" + } + } + ] + # 输出 + { + "start_index": 0, + "reasoning": "当前计划的第一步就失败了,报错信息显示curl命令未找到,可能是因为没有安装curl工具,因此需要从第一步重新规划。" + } + # 现在开始获取重新规划的步骤起始索引: + # 目标 + {{goal}} + # 报错信息 + {{error_message}} + # 当前计划 + {{current_plan}} + # 历史 + {{history}} + # 输出 + """) + CREATE_PLAN = dedent(r""" 你是一个计划生成器。 请分析用户的目标,并生成一个计划。你后续将根据这个计划,一步一步地完成用户的目标。 @@ -163,40 +283,38 @@ CREATE_PLAN = dedent(r""" } ``` - - 在生成计划之前,请一步一步思考,解析用户的目标,并指导你接下来的生成。\ -思考过程应放置在 XML标签中。 + - 在生成计划之前,请一步一步思考,解析用户的目标,并指导你接下来的生成。 +思考过程应放置在 XML标签中。 - 计划内容中,可以使用"Result[]"来引用之前计划步骤的结果。例如:"Result[3]"表示引用第三条计划执行后的结果。 - - 计划不得多于{{ max_num }}条,且每条计划内容应少于150字。 + - 计划不得多于{{max_num}}条,且每条计划内容应少于150字。 # 工具 - 你可以访问并使用一些工具,这些工具将在 XML标签中给出。 + 你可以访问并使用一些工具,这些工具将在 XML标签中给出。 - {% for tool in tools %} - - {{ tool.id }}{{tool.name}};{{ tool.description }} - {% endfor %} + { % for tool in tools % } + - {{tool.id}} {{tool.name}};{{tool.description}} + { % endfor % } # 样例 - ## 目标 + # 目标 - 在后台运行一个新的alpine:latest容器,将主机/root文件夹挂载至/data,并执行top命令。 + 在后台运行一个新的alpine: latest容器,将主机/root文件夹挂载至/data,并执行top命令。 - ## 计划 + # 计划 - 1. 这个目标需要使用Docker来完成,首先需要选择合适的MCP Server + 1. 这个目标需要使用Docker来完成, 首先需要选择合适的MCP Server 2. 目标可以拆解为以下几个部分: - - 运行alpine:latest容器 + - 运行alpine: latest容器 - 挂载主机目录 - 在后台运行 - 执行top命令 - 3. 需要先选择MCP Server,然后生成Docker命令,最后执行命令 - - - ```json + 3. 需要先选择MCP Server, 然后生成Docker命令, 最后执行命令 + ```json { "plans": [ { @@ -225,7 +343,7 @@ CREATE_PLAN = dedent(r""" # 现在开始生成计划: - ## 目标 + # 目标 {{goal}} @@ -263,26 +381,24 @@ RECREATE_PLAN = dedent(r""" } ``` - - 在生成计划之前,请一步一步思考,解析用户的目标,并指导你接下来的生成。\ -思考过程应放置在 XML标签中。 + - 在生成计划之前,请一步一步思考,解析用户的目标,并指导你接下来的生成。 +思考过程应放置在 XML标签中。 - 计划内容中,可以使用"Result[]"来引用之前计划步骤的结果。例如:"Result[3]"表示引用第三条计划执行后的结果。 - - 计划不得多于{{ max_num }}条,且每条计划内容应少于150字。 + - 计划不得多于{{max_num}}条,且每条计划内容应少于150字。 # 样例 - ## 目标 + # 目标 请帮我扫描一下192.168.1.1的这台机器的端口,看看有哪些端口开放。 - ## 工具 - 你可以访问并使用一些工具,这些工具将在 XML标签中给出。 + # 工具 + 你可以访问并使用一些工具,这些工具将在 XML标签中给出。 - - command_generator生成命令行指令 - - tool_selector选择合适的工具 - - command_executor执行命令行指令 - - Final结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。 - - - ## 当前计划 + - command_generator 生成命令行指令 + - tool_selector 选择合适的工具 + - command_executor 执行命令行指令 + - Final 结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。 + # 当前计划 ```json { "plans": [ @@ -304,25 +420,23 @@ RECREATE_PLAN = dedent(r""" ] } ``` - ## 运行报错 - 执行端口扫描命令时,出现了错误:`-bash: curl: command not found`。 - ## 重新生成的计划 + # 运行报错 + 执行端口扫描命令时,出现了错误:`- bash: curl: command not found`。 + # 重新生成的计划 - 1. 这个目标需要使用网络扫描工具来完成,首先需要选择合适的网络扫描工具 + 1. 这个目标需要使用网络扫描工具来完成, 首先需要选择合适的网络扫描工具 2. 目标可以拆解为以下几个部分: - 生成端口扫描命令 - 执行端口扫描命令 - 3.但是在执行端口扫描命令时,出现了错误:`-bash: curl: command not found`。 + 3.但是在执行端口扫描命令时,出现了错误:`- bash: curl: command not found`。 4.我将计划调整为: - 需要先生成一个命令,查看当前机器支持哪些网络扫描工具 - 执行这个命令,查看当前机器支持哪些网络扫描工具 - 然后从中选择一个网络扫描工具 - 基于选择的网络扫描工具,生成端口扫描命令 - 执行端口扫描命令 - - - ```json + ```json { "plans": [ { @@ -367,19 +481,19 @@ RECREATE_PLAN = dedent(r""" # 工具 - 你可以访问并使用一些工具,这些工具将在 XML标签中给出。 + 你可以访问并使用一些工具,这些工具将在 XML标签中给出。 - {% for tool in tools %} - - {{ tool.id }}{{tool.name}};{{ tool.description }} - {% endfor %} + { % for tool in tools % } + - {{tool.id}} {{tool.name}};{{tool.description}} + { % endfor % } # 当前计划 - {{ current_plan }} + {{current_plan}} # 运行报错 - {{ error_message }} + {{error_message}} # 重新生成的计划 """) @@ -393,18 +507,18 @@ RISK_EVALUATE = dedent(r""" } ``` # 样例 - ## 工具名称 + # 工具名称 mysql_analyzer - ## 工具描述 + # 工具描述 分析MySQL数据库性能 - ## 工具入参 + # 工具入参 { "host": "192.0.0.1", "port": 3306, "username": "root", "password": "password" } - ## 附加信息 + # 附加信息 1. 当前MySQL数据库的版本是8.0.26 2. 当前MySQL数据库的配置文件路径是/etc/my.cnf,并含有以下配置项 ```ini @@ -412,7 +526,7 @@ RISK_EVALUATE = dedent(r""" innodb_buffer_pool_size=1G innodb_log_file_size=256M ``` - ## 输出 + # 输出 ```json { "risk": "中", @@ -421,35 +535,35 @@ RISK_EVALUATE = dedent(r""" ``` # 工具 - {{ tool_name }} - {{ tool_description }} + {{tool_name}} + {{tool_description}} # 工具入参 - {{ input_param }} + {{input_param}} # 附加信息 - {{ additional_info }} + {{additional_info}} # 输出 """ ) # 根据当前计划和报错信息决定下一步执行,具体计划有需要用户补充工具入参、重计划当前步骤、重计划接下来的所有计划 -JUDGE_NEXT_STEP = dedent(r""" +TOOL_EXECUTE_ERROR_TYPE_ANALYSIS = dedent(r""" 你是一个计划决策器。 - 你的任务是根据当前计划、当前使用的工具、工具入参和工具运行报错,决定下一步执行的操作。 + 你的任务是根据用户目标、当前计划、当前使用的工具、工具入参和工具运行报错,决定下一步执行的操作。 请根据以下规则进行判断: - 1. 仅通过补充工具入参来解决问题的,返回 fill_params; - 2. 需要重计划当前步骤的,返回 replan_current_step; - 3. 需要重计划接下来的所有计划的,返回 replan_all_steps; + 1. 仅通过补充工具入参来解决问题的,返回 missing_param; + 2. 需要重计划当前步骤的,返回 decorrect_plan + 3.推理过程必须清晰明了,能够让人理解你的判断依据,并且不超过100字。 你的输出要以json格式返回,格式如下: ```json { - "next_step": "fill_params/replan_current_step/replan_all_steps", - "reason": "你的判断依据" + "error_type": "missing_param/decorrect_plan, + "reason": "你的推理过程" } ``` - 注意: - reason字段必须清晰明了,能够让人理解你的判断依据,并且不超过50个中文字或者100个英文单词。 # 样例 - ## 当前计划 + # 用户目标 + 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优 + # 当前计划 {"plans": [ { "content": "生成端口扫描命令", @@ -467,38 +581,40 @@ JUDGE_NEXT_STEP = dedent(r""" "instruction": "" } ]} - ## 当前使用的工具 + # 当前使用的工具 - command_executor - 执行命令行指令 + command_executor + 执行命令行指令 - ## 工具入参 + # 工具入参 { "command": "nmap -sS -p--open 192.168.1.1" } - ## 工具运行报错 - 执行端口扫描命令时,出现了错误:`-bash: nmap: command not found`。 - ## 输出 + # 工具运行报错 + 执行端口扫描命令时,出现了错误:`- bash: nmap: command not found`。 + # 输出 ```json { - "next_step": "replan_all_steps", - "reason": "当前工具执行报错,提示nmap命令未找到,需要增加command_generator和command_executor的步骤,生成nmap安装命令并执行,之后再生成端口扫描命令并执行。" + "error_type": "decorrect_plan", + "reason": "当前计划的第二步执行失败,报错信息显示nmap命令未找到,可能是因为没有安装nmap工具,因此需要重计划当前步骤。" } ``` + # 用户目标 + {{goal}} # 当前计划 - {{ current_plan }} + {{current_plan}} # 当前使用的工具 - {{ tool_name }} - {{ tool_description }} + {{tool_name}} + {{tool_description}} # 工具入参 - {{ input_param }} + {{input_param}} # 工具运行报错 - {{ error_message }} + {{error_message}} # 输出 """ - ) + ) # 获取缺失的参数的json结构体 GET_MISSING_PARAMS = dedent(r""" 你是一个工具参数获取器。 @@ -570,10 +686,10 @@ GET_MISSING_PARAMS = dedent(r""" } ``` # 工具 - < tool > - < name > {{tool_name}} < /name > - < description > {{tool_description}} < /description > - < / tool > + + {{tool_name}} + {{tool_description}} + # 工具入参 {{input_param}} # 工具入参schema(部分字段允许为null) @@ -588,12 +704,12 @@ REPAIR_PARAMS = dedent(r""" 你的任务是根据当前的工具信息、工具入参的schema、工具当前的入参、工具的报错、补充的参数和补充的参数描述,修复当前工具的入参。 # 样例 - ## 工具信息 + # 工具信息 - mysql_analyzer - 分析MySQL数据库性能 + mysql_analyzer + 分析MySQL数据库性能 - ## 工具入参的schema + # 工具入参的schema { "type": "object", "properties": { @@ -616,21 +732,21 @@ REPAIR_PARAMS = dedent(r""" }, "required": ["host", "port", "username", "password"] } - ## 工具当前的入参 + # 工具当前的入参 { "host": "192.0.0.1", "port": 3306, "username": "root", "password": "password" } - ## 工具的报错 + # 工具的报错 执行端口扫描命令时,出现了错误:`password is not correct`。 - ## 补充的参数 + # 补充的参数 { "username": "admin", "password": "admin123" } - ## 补充的参数描述 + # 补充的参数描述 用户希望使用admin用户和admin123密码来连接MySQL数据库。 # 输出 ```json @@ -643,8 +759,8 @@ REPAIR_PARAMS = dedent(r""" ``` # 工具 - {{tool_name}} - {{tool_description}} + {{tool_name}} + {{tool_description}} # 工具入参scheme {{input_schema}} @@ -664,17 +780,17 @@ FINAL_ANSWER = dedent(r""" # 用户目标 - {{ goal }} + {{goal}} # 计划执行情况 为了完成上述目标,你实施了以下计划: - {{ memory }} + {{memory}} # 其他背景信息: - {{ status }} + {{status}} # 现在,请根据以上信息,向用户报告目标的完成情况: diff --git a/apps/scheduler/mcp_agent/select.py b/apps/scheduler/mcp_agent/select.py index 37d1e752a..933527c38 100644 --- a/apps/scheduler/mcp_agent/select.py +++ b/apps/scheduler/mcp_agent/select.py @@ -2,7 +2,7 @@ """选择MCP Server及其工具""" import logging -import uuid +import random from jinja2 import BaseLoader from jinja2.sandbox import SandboxedEnvironment from typing import AsyncGenerator @@ -13,176 +13,94 @@ from apps.common.mongo import MongoDB from apps.llm.embedding import Embedding from apps.llm.function import FunctionLLM from apps.llm.reasoning import ReasoningLLM -from apps.scheduler.mcp.prompt import ( - MCP_SELECT, -) +from apps.llm.token import TokenCalculator +from apps.scheduler.mcp_agent.prompt import TOOL_SELECT from apps.schemas.mcp import ( + BaseModel, MCPCollection, MCPSelectResult, MCPTool, + MCPToolIdsSelectResult ) - +from apps.common.config import Config logger = logging.getLogger(__name__) +_env = SandboxedEnvironment( + loader=BaseLoader, + autoescape=True, + trim_blocks=True, + lstrip_blocks=True, +) -class MCPSelector: - """MCP选择器""" - - def __init__(self, resoning_llm: ReasoningLLM = None) -> None: - """初始化助手类""" - self.resoning_llm = resoning_llm or ReasoningLLM() - self.input_tokens = 0 - self.output_tokens = 0 - - @staticmethod - def _assemble_sql(mcp_list: list[str]) -> str: - """组装SQL""" - sql = "(" - for mcp_id in mcp_list: - sql += f"'{mcp_id}', " - return sql.rstrip(", ") + ")" - - async def _get_top_mcp_by_embedding( - self, - query: str, - mcp_list: list[str], - ) -> list[dict[str, str]]: - """通过向量检索获取Top5 MCP Server""" - logger.info("[MCPHelper] 查询MCP Server向量: %s, %s", query, mcp_list) - mcp_table = await LanceDB().get_table("mcp") - query_embedding = await Embedding.get_embedding([query]) - mcp_vecs = await (await mcp_table.search( - query=query_embedding, - vector_column_name="embedding", - )).where(f"id IN {MCPSelector._assemble_sql(mcp_list)}").limit(5).to_list() - - # 拿到名称和description - logger.info("[MCPHelper] 查询MCP Server名称和描述: %s", mcp_vecs) - mcp_collection = MongoDB().get_collection("mcp") - llm_mcp_list: list[dict[str, str]] = [] - for mcp_vec in mcp_vecs: - mcp_id = mcp_vec["id"] - mcp_data = await mcp_collection.find_one({"_id": mcp_id}) - if not mcp_data: - logger.warning("[MCPHelper] 查询MCP Server名称和描述失败: %s", mcp_id) - continue - mcp_data = MCPCollection.model_validate(mcp_data) - llm_mcp_list.extend([{ - "id": mcp_id, - "name": mcp_data.name, - "description": mcp_data.description, - }]) - return llm_mcp_list - - async def _get_mcp_by_llm( - self, - query: str, - mcp_list: list[dict[str, str]], - mcp_ids: list[str], - ) -> MCPSelectResult: - """通过LLM选择最合适的MCP Server""" - # 初始化jinja2环境 - env = SandboxedEnvironment( - loader=BaseLoader, - autoescape=True, - trim_blocks=True, - lstrip_blocks=True, - ) - template = env.from_string(MCP_SELECT) - # 渲染模板 - mcp_prompt = template.render( - mcp_list=mcp_list, - goal=query, - ) - - # 调用大模型进行推理 - result = await self._call_reasoning(mcp_prompt) - - # 使用小模型提取JSON - return await self._call_function_mcp(result, mcp_ids) - - async def _call_reasoning(self, prompt: str) -> AsyncGenerator[str, None]: - """调用大模型进行推理""" - logger.info("[MCPHelper] 调用推理大模型") - message = [ - {"role": "system", "content": "You are a helpful assistant."}, - {"role": "user", "content": prompt}, - ] - async for chunk in self.resoning_llm.call(message): - yield chunk - - async def _call_function_mcp(self, reasoning_result: str, mcp_ids: list[str]) -> MCPSelectResult: - """调用结构化输出小模型提取JSON""" - logger.info("[MCPHelper] 调用结构化输出小模型") - llm = FunctionLLM() - message = [ - {"role": "system", "content": "You are a helpful assistant."}, - {"role": "user", "content": reasoning_result}, - ] - schema = MCPSelectResult.model_json_schema() - # schema中加入选项 - schema["properties"]["mcp_id"]["enum"] = mcp_ids - result = await llm.call(messages=message, schema=schema) - try: - result = MCPSelectResult.model_validate(result) - except Exception: - logger.exception("[MCPHelper] 解析MCP Select Result失败") - raise - return result - - async def select_top_mcp( - self, - query: str, - mcp_list: list[str], - ) -> MCPSelectResult: - """ - 选择最合适的MCP Server +FINAL_TOOL_ID = "FIANL" +SUMMARIZE_TOOL_ID = "SUMMARIZE" - 先通过Embedding选择Top5,然后通过LLM选择Top 1 - """ - # 通过向量检索获取Top5 - llm_mcp_list = await self._get_top_mcp_by_embedding(query, mcp_list) - # 通过LLM选择最合适的 - return await self._get_mcp_by_llm(query, llm_mcp_list, mcp_list) +class MCPSelector: + """MCP选择器""" @staticmethod - async def select_top_tool(query: str, mcp_list: list[str], top_n: int = 10) -> list[MCPTool]: + async def select_top_tool( + goal: str, tool_list: list[MCPTool], + additional_info: str | None = None, top_n: int | None = None) -> list[MCPTool]: """选择最合适的工具""" - tool_vector = await LanceDB().get_table("mcp_tool") - query_embedding = await Embedding.get_embedding([query]) - tool_vecs = await (await tool_vector.search( - query=query_embedding, - vector_column_name="embedding", - )).where(f"mcp_id IN {MCPSelector._assemble_sql(mcp_list)}").limit(top_n).to_list() - - # 拿到工具 - tool_collection = MongoDB().get_collection("mcp") - llm_tool_list = [] - - for tool_vec in tool_vecs: - # 到MongoDB里找对应的工具 - logger.info("[MCPHelper] 查询MCP Tool名称和描述: %s", tool_vec["mcp_id"]) - tool_data = await tool_collection.aggregate([ - {"$match": {"_id": tool_vec["mcp_id"]}}, - {"$unwind": "$tools"}, - {"$match": {"tools.id": tool_vec["id"]}}, - {"$project": {"_id": 0, "tools": 1}}, - {"$replaceRoot": {"newRoot": "$tools"}}, - ]) - async for tool in tool_data: - tool_obj = MCPTool.model_validate(tool) - llm_tool_list.append(tool_obj) - llm_tool_list.append( - MCPTool( - id="00000000-0000-0000-0000-000000000000", - name="Final", - description="It is the final step, indicating the end of the plan execution.") - ) - llm_tool_list.append( - MCPTool( - id="00000000-0000-0000-0000-000000000001", - name="Chat", - description="It is a chat tool to communicate with the user.") - ) - return llm_tool_list + random.shuffle(tool_list) + max_tokens = Config().get_config().function_call.max_tokens + template = _env.from_string(TOOL_SELECT) + if TokenCalculator.calculate_token_length( + messages=[{"role": "user", "content": template.render( + goal=goal, tools=[], additional_info=additional_info + )}], + pure_text=True) > max_tokens: + logger.warning("[MCPSelector] 工具选择模板长度超过最大令牌数,无法进行选择") + return [] + llm = FunctionLLM() + current_index = 0 + tool_ids = [] + while current_index < len(tool_list): + index = current_index + sub_tools = [] + while index < len(tool_list): + tool = tool_list[index] + tokens = TokenCalculator.calculate_token_length( + messages=[{"role": "user", "content": template.render( + goal=goal, tools=[tool], + additional_info=additional_info + )}], + pure_text=True + ) + if tokens > max_tokens: + continue + sub_tools.append(tool) + + tokens = TokenCalculator.calculate_token_length(messages=[{"role": "user", "content": template.render( + goal=goal, tools=sub_tools, additional_info=additional_info)}, ], pure_text=True) + if tokens > max_tokens: + del sub_tools[-1] + break + else: + index += 1 + current_index = index + if sub_tools: + message = [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": template.render(tools=sub_tools)}, + ] + schema = MCPToolIdsSelectResult.model_json_schema() + schema["properties"]["tool_ids"]["enum"] = [tool.id for tool in sub_tools] + result = await llm.call(messages=message, schema=schema) + try: + result = MCPToolIdsSelectResult.model_validate(result) + tool_ids.extend(result.tool_ids) + except Exception: + logger.exception("[MCPSelector] 解析MCP工具ID选择结果失败") + continue + mcp_tools = [tool for tool in tool_list if tool.id in tool_ids] + + if top_n is not None: + mcp_tools = mcp_tools[:top_n] + mcp_tools.append(MCPTool(id=FINAL_TOOL_ID, name="Final", + description="终止", mcp_id=FINAL_TOOL_ID, input_schema={})) + # mcp_tools.append(MCPTool(id=SUMMARIZE_TOOL_ID, name="Summarize", + # description="总结工具", mcp_id=SUMMARIZE_TOOL_ID, input_schema={})) + return mcp_tools diff --git a/apps/scheduler/scheduler/scheduler.py b/apps/scheduler/scheduler/scheduler.py index f63253693..b81448475 100644 --- a/apps/scheduler/scheduler/scheduler.py +++ b/apps/scheduler/scheduler/scheduler.py @@ -4,7 +4,9 @@ import asyncio import logging from datetime import UTC, datetime - +from apps.llm.reasoning import ReasoningLLM +from apps.schemas.config import LLMConfig +from apps.llm.patterns.rewrite import QuestionRewrite from apps.common.config import Config from apps.common.mongo import MongoDB from apps.common.queue import MessageQueue @@ -67,8 +69,8 @@ class Scheduler: except Exception as e: logger.error(f"[Scheduler] 活动监控过程中发生错误: {e}") - async def run(self) -> None: # noqa: PLR0911 - """运行调度器""" + async def get_llm_use_in_chat_with_rag(self) -> LLM: + """获取RAG大模型""" try: # 获取当前会话使用的大模型 llm_id = await LLMManager.get_llm_id_by_conversation_id( @@ -97,14 +99,25 @@ class Scheduler: logger.exception("[Scheduler] 获取大模型失败") await self.queue.close() return + + async def get_kb_ids_use_in_chat_with_rag(self) -> list[str]: + """获取知识库ID列表""" try: - # 获取当前会话使用的知识库 kb_ids = await KnowledgeBaseManager.get_kb_ids_by_conversation_id( - self.task.ids.user_sub, self.task.ids.conversation_id) + self.task.ids.user_sub, self.task.ids.conversation_id, + ) + if not kb_ids: + logger.error("[Scheduler] 获取知识库ID失败") + await self.queue.close() + return [] + return kb_ids except Exception: logger.exception("[Scheduler] 获取知识库ID失败") await self.queue.close() - return + return [] + + async def run(self) -> None: # noqa: PLR0911 + """运行调度器""" try: # 获取当前问答可供关联的文档 docs, doc_ids = await get_docs(self.task.ids.user_sub, self.post_body) @@ -114,13 +127,18 @@ class Scheduler: return history, _ = await get_context(self.task.ids.user_sub, self.post_body, 3) # 已使用文档 - # 如果是智能问答,直接执行 logger.info("[Scheduler] 开始执行") # 创建用于通信的事件 kill_event = asyncio.Event() monitor = asyncio.create_task(self._monitor_activity(kill_event, self.task.ids.user_sub)) if not self.post_body.app or self.post_body.app.app_id == "": + llm = await self.get_llm_use_in_chat_with_rag() + kb_ids = await self.get_kb_ids_use_in_chat_with_rag() + if not llm: + logger.error("[Scheduler] 获取大模型失败") + await self.queue.close() + return self.task = await push_init_message(self.task, self.queue, 3, is_flow=False) rag_data = RAGQueryReq( kbIds=kb_ids, @@ -199,6 +217,27 @@ class Scheduler: if not app_metadata: logger.error("[Scheduler] 未找到Agent应用") return + llm = await LLMManager.get_llm_by_id( + self.task.ids.user_sub, app_metadata.llm_id, + ) + if not llm: + logger.error("[Scheduler] 获取大模型失败") + await self.queue.close() + return + reasion_llm = ReasoningLLM( + LLMConfig( + endpoint=llm.openai_base_url, + key=llm.openai_api_key, + model=llm.model_name, + max_tokens=llm.max_tokens, + ) + ) + if background.conversation: + try: + question_obj = QuestionRewrite() + post_body.question = await question_obj.generate(history=background.conversation, question=post_body.question, llm=reasion_llm) + except Exception: + logger.exception("[Scheduler] 问题重写失败") if app_metadata.app_type == AppType.FLOW.value: logger.info("[Scheduler] 获取工作流元数据") flow_info = await Pool().get_flow_metadata(app_info.app_id) @@ -229,8 +268,6 @@ class Scheduler: # 初始化Executor logger.info("[Scheduler] 初始化Executor") - logger.error(f"{flow_data}") - logger.error(f"{self.task}") flow_exec = FlowExecutor( flow_id=flow_id, flow=flow_data, @@ -258,6 +295,7 @@ class Scheduler: servers_id=servers_id, background=background, agent_id=app_info.app_id, + params=post_body.app.params ) # 开始运行 logger.info("[Scheduler] 运行Executor") diff --git a/apps/schemas/enum_var.py b/apps/schemas/enum_var.py index 3fb650287..3bcabd579 100644 --- a/apps/schemas/enum_var.py +++ b/apps/schemas/enum_var.py @@ -15,6 +15,7 @@ class SlotType(str, Enum): class StepStatus(str, Enum): """步骤状态""" UNKNOWN = "unknown" + INIT = "init" WAITING = "waiting" RUNNING = "running" SUCCESS = "success" @@ -55,12 +56,15 @@ class EventType(str, Enum): STEP_WAITING_FOR_START = "step.waiting_for_start" STEP_WAITING_FOR_PARAM = "step.waiting_for_param" FLOW_START = "flow.start" + STEP_INIT = "step.init" STEP_INPUT = "step.input" STEP_OUTPUT = "step.output" + STEP_CANCEL = "step.cancel" + STEP_ERROR = "step.error" FLOW_STOP = "flow.stop" FLOW_FAILED = "flow.failed" FLOW_SUCCESS = "flow.success" - FLOW_CANCELLED = "flow.cancelled" + FLOW_CANCEL = "flow.cancel" DONE = "done" diff --git a/apps/schemas/mcp.py b/apps/schemas/mcp.py index 368865ac5..21c403d4a 100644 --- a/apps/schemas/mcp.py +++ b/apps/schemas/mcp.py @@ -111,6 +111,13 @@ class GoalEvaluationResult(BaseModel): reason: str = Field(description="评估原因") +class RestartStepIndex(BaseModel): + """MCP重新规划的步骤索引""" + + start_index: int = Field(description="重新规划的起始步骤索引") + reasoning: str = Field(description="重新规划的原因") + + class Risk(str, Enum): """MCP工具风险类型""" @@ -126,6 +133,20 @@ class ToolRisk(BaseModel): reason: str = Field(description="风险原因", default="") +class ErrorType(str, Enum): + """MCP工具错误类型""" + + MISSING_PARAM = "missing_param" + DECORRECT_PLAN = "decorrect_plan" + + +class ToolExcutionErrorType(BaseModel): + """MCP工具执行错误""" + + type: ErrorType = Field(description="错误类型", default=ErrorType.MISSING_PARAM) + reason: str = Field(description="错误原因", default="") + + class MCPSelectResult(BaseModel): """MCP选择结果""" @@ -138,6 +159,12 @@ class MCPToolSelectResult(BaseModel): name: str = Field(description="工具名称") +class MCPToolIdsSelectResult(BaseModel): + """MCP工具ID选择结果""" + + tool_ids: list[str] = Field(description="工具ID列表") + + class MCPPlanItem(BaseModel): """MCP 计划""" step_id: str = Field(description="步骤的ID", default="") diff --git a/apps/schemas/message.py b/apps/schemas/message.py index e73413242..1f46ff578 100644 --- a/apps/schemas/message.py +++ b/apps/schemas/message.py @@ -84,7 +84,7 @@ class FlowStartContent(BaseModel): """flow.start消息的content""" question: str = Field(description="用户问题") - params: dict[str, Any] = Field(description="预先提供的参数") + params: dict[str, Any] | None = Field(description="预先提供的参数", default=None) class MessageBase(HeartbeatData): @@ -95,5 +95,5 @@ class MessageBase(HeartbeatData): conversation_id: str = Field(min_length=36, max_length=36, alias="conversationId") task_id: str = Field(min_length=36, max_length=36, alias="taskId") flow: MessageFlow | None = None - content: dict[str, Any] = {} + content: Any | None = Field(default=None, description="消息内容") metadata: MessageMetadata diff --git a/apps/schemas/pool.py b/apps/schemas/pool.py index 27e16b370..7df6dab8d 100644 --- a/apps/schemas/pool.py +++ b/apps/schemas/pool.py @@ -110,3 +110,6 @@ class AppPool(BaseData): flows: list[AppFlow] = Field(description="Flow列表", default=[]) hashes: dict[str, str] = Field(description="关联文件的hash值", default={}) mcp_service: list[str] = Field(default=[], alias="mcpService", description="MCP服务id列表") + llm_id: str = Field( + default="empty", alias="llmId", description="应用使用的大模型ID(如果有的话)" + ) diff --git a/apps/schemas/task.py b/apps/schemas/task.py index eccc95a56..336bfedc5 100644 --- a/apps/schemas/task.py +++ b/apps/schemas/task.py @@ -30,6 +30,7 @@ class FlowStepHistory(BaseModel): step_status: StepStatus = Field(description="当前步骤状态") input_data: dict[str, Any] = Field(description="当前Step执行的输入", default={}) output_data: dict[str, Any] = Field(description="当前Step执行后的结果", default={}) + ex_data: dict[str, Any] | None = Field(description="额外数据", default=None) created_at: float = Field(default_factory=lambda: round(datetime.now(tz=UTC).timestamp(), 3)) @@ -43,14 +44,13 @@ class ExecutorState(BaseModel): flow_status: FlowStatus = Field(description="Flow状态", default=FlowStatus.INIT) # 任务级数据 step_id: str = Field(description="当前步骤ID", default="") + step_index: int = Field(description="当前步骤索引", default=0) step_name: str = Field(description="当前步骤名称", default="") step_status: StepStatus = Field(description="当前步骤状态", default=StepStatus.UNKNOWN) step_description: str = Field(description="当前步骤描述", default="") app_id: str = Field(description="应用ID", default="") current_input: dict[str, Any] = Field(description="当前输入数据", default={}) - params: dict[str, Any] = Field(description="补充的参数", default={}) - params_description: str = Field(description="补充的参数描述", default="") - error_info: str = Field(description="错误信息", default="") + error_message: str = Field(description="错误信息", default="") retry_times: int = Field(description="当前步骤重试次数", default=0) -- Gitee