diff --git a/apps/scheduler/executor/agent.py b/apps/scheduler/executor/agent.py index 8904cf4f826ca2aafdb32a12c2080aef2a23dc3d..4228943382fb5a34709f9432262a0d3164c45b87 100644 --- a/apps/scheduler/executor/agent.py +++ b/apps/scheduler/executor/agent.py @@ -460,6 +460,7 @@ class MCPAgentExecutor(BaseExecutor): # 初始化MCP服务 await self.load_state() await self.load_mcp() + data = {} if self.task.state.flow_status == FlowStatus.INIT: # 初始化状态 try: @@ -467,6 +468,12 @@ class MCPAgentExecutor(BaseExecutor): self.task.state.flow_name = (await MCPPlanner.get_flow_name( self.task.runtime.question, self.resoning_llm, self.task.language )).flow_name + flow_risk = await MCPPlanner.get_flow_excute_risk( + self.task.runtime.question, self.tool_list, self.resoning_llm, self.task.language + ) + user_info = await UserManager.get_userinfo_by_user_sub(self.task.ids.user_sub) + if user_info.auto_execute: + data = flow_risk.model_dump(exclude_none=True, by_alias=True) await TaskManager.save_task(self.task.id, self.task) await self.get_next_step() except Exception as e: @@ -481,7 +488,7 @@ class MCPAgentExecutor(BaseExecutor): self.task.state.flow_status = FlowStatus.RUNNING await self.push_message( EventType.FLOW_START, - data={} + data=data ) if self.task.state.tool_id == FINAL_TOOL_ID: # 如果已经是最后一步,直接结束 diff --git a/apps/scheduler/mcp_agent/plan.py b/apps/scheduler/mcp_agent/plan.py index 3d653bc8b5bce314308dde4ed6369dd072850fa7..ded8610c32729ca2d43e62777b0ae2c7e2146e08 100644 --- a/apps/scheduler/mcp_agent/plan.py +++ b/apps/scheduler/mcp_agent/plan.py @@ -17,6 +17,7 @@ from apps.scheduler.mcp_agent.prompt import ( FINAL_ANSWER, GEN_STEP, GENERATE_FLOW_NAME, + GENERATE_FLOW_EXCUTE_RISK, GET_MISSING_PARAMS, GET_REPLAN_START_STEP_INDEX, IS_PARAM_ERROR, @@ -30,6 +31,7 @@ from apps.scheduler.slot.slot import Slot from apps.schemas.mcp import ( GoalEvaluationResult, FlowName, + FlowRisk, IsParamError, MCPPlan, MCPTool, @@ -95,7 +97,6 @@ class MCPPlanner(MCPBase): result = await MCPPlanner._get_reasoning_flow_name(user_goal, resoning_llm, language) result = await MCPPlanner._parse_result(result, FlowName.model_json_schema()) - print(result) # 使用FlowName模型解析结果 return FlowName.model_validate(result) @@ -110,6 +111,45 @@ class MCPPlanner(MCPBase): prompt = template.render(goal=user_goal) return await MCPPlanner.get_resoning_result(prompt, resoning_llm) + async def get_flow_excute_risk( + user_goal: str, + tools: list[MCPTool], + resoning_llm: ReasoningLLM = ReasoningLLM(), + language: LanguageType = LanguageType.CHINESE, + ) -> FlowRisk: + """获取当前流程的风险评估结果""" + result = await MCPPlanner._get_reasoning_flow_risk(user_goal, tools, resoning_llm, language) + result = await MCPPlanner._parse_result(result, FlowRisk.model_json_schema()) + # 使用FlowRisk模型解析结果 + return FlowRisk.model_validate(result) + + async def _get_reasoning_flow_risk( + user_goal: str, + tools: list[MCPTool], + resoning_llm: ReasoningLLM = ReasoningLLM(), + language: LanguageType = LanguageType.CHINESE, + ) -> FlowRisk: + """获取当前流程的风险评估结果""" + result = await MCPPlanner._get_reasoning_flow_risk(user_goal, tools, resoning_llm, language) + result = await MCPPlanner._parse_result(result, FlowRisk.model_json_schema()) + # 使用FlowRisk模型解析结果 + return FlowRisk.model_validate(result) + + @staticmethod + async def _get_reasoning_flow_risk( + user_goal: str, + tools: list[MCPTool], + resoning_llm: ReasoningLLM = ReasoningLLM(), + language: LanguageType = LanguageType.CHINESE, + ) -> str: + """获取推理大模型的流程风险""" + template = _env.from_string(GENERATE_FLOW_EXCUTE_RISK[language]) + prompt = template.render( + goal=user_goal, + tools=tools, + ) + return await MCPPlanner.get_resoning_result(prompt, resoning_llm) + @staticmethod async def get_replan_start_step_index( user_goal: str, diff --git a/apps/scheduler/mcp_agent/prompt.py b/apps/scheduler/mcp_agent/prompt.py index f41a78baaa2b8266bf1545d97dd2fa41969d39b4..de9d2b1fa883e01d210223abedb761985f7c42e5 100644 --- a/apps/scheduler/mcp_agent/prompt.py +++ b/apps/scheduler/mcp_agent/prompt.py @@ -398,6 +398,67 @@ GENERATE_FLOW_NAME: dict[LanguageType, str] = { """ ), } +GENERATE_FLOW_EXCUTE_RISK: dict[LanguageType, str] = { + LanguageType.CHINESE: dedent( + r""" + 你是一个智能助手,你的任务是根据用户的目标和当前的工具集合,评估当前流程的风险。 + + # 样例 + # 目标 + 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优 + # 工具集合 + 你可以访问并使用一些工具,这些工具将在 XML标签中给出。 + + - mysql_analyzer 分析MySQL数据库性能 + - performance_tuner 调优数据库性能 + + # 输出 + { + "risk": "high", + "reason": "当前目标实现带来的风险较高,因为需要通过performance_tuner工具对MySQL数据库进行调优,而该工具可能会对数据库的性能和稳定性产生较大的影响,因此风险评估为高。" + } + # 现在开始评估当前流程的风险: + # 目标 + {{goal}} + # 工具集合 + + {% for tool in tools %} + - {{tool.id}} {{tool.name}};{{tool.description}} + {% endfor %} + + # 输出 + """ + ), + LanguageType.ENGLISH: dedent( + r""" + You are an intelligent assistant, your task is to evaluate the risk of the current process based on the user's goal and the current tool set. + # Example + # Goal + I need to scan the current MySQL database, analyze performance bottlenecks, and optimize it. + # Tool Set + You can access and use some tools, which will be given in the XML tag. + + - mysql_analyzer Analyze MySQL database performance + - performance_tuner Tune database performance + + # Output + { + "risk": "high", + "reason": "The risk brought by the realization of the current goal is relatively high, because it is necessary to tune the MySQL database through the performance_tuner tool, which may have a greater impact on the performance and stability of the database. Therefore, the risk assessment is high." + } + # Now start evaluating the risk of the current process: + # Goal + {{goal}} + # Tool Set + + {% for tool in tools %} + - {{tool.id}} {{tool.name}};{{tool.description}} + {% endfor %} + + # Output + """ + ) +} GET_REPLAN_START_STEP_INDEX: dict[LanguageType, str] = { LanguageType.CHINESE: dedent( r""" diff --git a/apps/schemas/mcp.py b/apps/schemas/mcp.py index be25b4eb90d7dc85edac4f3e0501da4b330792fd..cb0e4cb056cc4beaef5b699e510ae67fb6b38758 100644 --- a/apps/schemas/mcp.py +++ b/apps/schemas/mcp.py @@ -116,12 +116,27 @@ class GoalEvaluationResult(BaseModel): reason: str = Field(description="评估原因") +class Risk(str, Enum): + """MCP工具风险类型""" + + LOW = "low" + MEDIUM = "medium" + HIGH = "high" + + class FlowName(BaseModel): """MCP 流程名称""" flow_name: str = Field(description="MCP 流程名称", default="") +class FlowRisk(BaseModel): + """MCP 流程风险评估结果""" + + risk: Risk = Field(description="风险类型", default=Risk.LOW) + reason: str = Field(description="风险原因", default="") + + class RestartStepIndex(BaseModel): """MCP重新规划的步骤索引""" @@ -129,14 +144,6 @@ class RestartStepIndex(BaseModel): reasoning: str = Field(description="重新规划的原因") -class Risk(str, Enum): - """MCP工具风险类型""" - - LOW = "low" - MEDIUM = "medium" - HIGH = "high" - - class ToolSkip(BaseModel): """MCP工具跳过执行结果"""