diff --git a/apps/routers/conversation.py b/apps/routers/conversation.py index 2f49f803733c105362e2ba36cae2032f58d6ee77..e56623f4e64b65076b186b9fc4f816aa313a6d27 100644 --- a/apps/routers/conversation.py +++ b/apps/routers/conversation.py @@ -6,7 +6,7 @@ import uuid from datetime import UTC, datetime from typing import Annotated -from fastapi import APIRouter, Body, Depends, Query, Request, status +from fastapi import APIRouter, Depends, Query, Request, status from fastapi.responses import JSONResponse from apps.dependency import verify_personal_token, verify_session diff --git a/apps/scheduler/executor/agent.py b/apps/scheduler/executor/agent.py index 4d34b85b6855478d0686c8fa76a1b51433587067..663f6827471308954affb28c450e33040ee105cb 100644 --- a/apps/scheduler/executor/agent.py +++ b/apps/scheduler/executor/agent.py @@ -1,64 +1,65 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """MCP Agent执行器""" -import anyio import logging import uuid -from pydantic import Field -from typing import Any + +import anyio from mcp.types import TextContent -from apps.llm.patterns.rewrite import QuestionRewrite +from pydantic import Field + from apps.llm.reasoning import ReasoningLLM from apps.scheduler.executor.base import BaseExecutor -from apps.schemas.enum_var import EventType, SpecialCallType, FlowStatus, StepStatus from apps.scheduler.mcp_agent.host import MCPHost from apps.scheduler.mcp_agent.plan import MCPPlanner -from apps.scheduler.mcp_agent.select import FINAL_TOOL_ID, MCPSelector -from apps.scheduler.pool.mcp.client import MCPClient +from apps.scheduler.pool.mcp.pool import MCPPool +from apps.schemas.enum_var import EventType, FlowStatus, StepStatus from apps.schemas.mcp import ( - GoalEvaluationResult, - RestartStepIndex, - ToolRisk, - ErrorType, - ToolExcutionErrorType, - MCPPlan, MCPCollection, MCPTool, - Step + Step, ) -from apps.scheduler.pool.mcp.pool import MCPPool -from apps.schemas.task import ExecutorState, FlowStepHistory, StepQueueItem -from apps.schemas.message import param -from apps.services.task import TaskManager +from apps.schemas.message import FlowParams +from apps.schemas.task import FlowStepHistory from apps.services.appcenter import AppCenterManager from apps.services.mcp_service import MCPServiceManager +from apps.services.task import TaskManager from apps.services.user import UserManager + logger = logging.getLogger(__name__) +FINAL_TOOL_ID = "FIANL" class MCPAgentExecutor(BaseExecutor): """MCP Agent执行器""" max_steps: int = Field(default=20, description="最大步数") - servers_id: list[str] = Field(description="MCP server id") agent_id: str = Field(default="", description="Agent ID") - agent_description: str = Field(default="", description="Agent描述") mcp_list: list[MCPCollection] = Field(description="MCP服务器列表", default=[]) mcp_pool: MCPPool = Field(description="MCP池", default=MCPPool()) tools: dict[str, MCPTool] = Field( - description="MCP工具列表,key为tool_id", default={} + description="MCP工具列表,key为tool_id", + default={}, ) tool_list: list[MCPTool] = Field( - description="MCP工具列表,包含所有MCP工具", default=[] + description="MCP工具列表,包含所有MCP工具", + default=[], ) - params: param | bool | None = Field( - default=None, description="流执行过程中的参数补充", alias="params" + params: FlowParams | bool | None = Field( + default=None, + description="流执行过程中的参数补充", + alias="params", ) resoning_llm: ReasoningLLM = Field( default=ReasoningLLM(), description="推理大模型", ) + async def init(self) -> None: + """初始化Executor""" + self.planner = MCPPlanner(self.task.runtime.question, self.resoning_llm) + self.host = MCPHost(self.task.runtime.question) + async def update_tokens(self) -> None: """更新令牌数""" self.task.tokens.input_tokens = self.resoning_llm.input_tokens @@ -89,43 +90,52 @@ class MCPAgentExecutor(BaseExecutor): continue self.mcp_list.append(mcp_service) - await self.mcp_pool._init_mcp(mcp_id, self.task.ids.user_sub) + await self.mcp_pool.init_mcp(mcp_id, self.task.ids.user_sub) for tool in mcp_service.tools: self.tools[tool.id] = tool self.tool_list.extend(mcp_service.tools) self.tools[FINAL_TOOL_ID] = MCPTool( - id=FINAL_TOOL_ID, - name="Final Tool", - description="结束流程的工具", - mcp_id="", - input_schema={} + id=FINAL_TOOL_ID, name="Final Tool", description="结束流程的工具", mcp_id="", input_schema={}, + ) + self.tool_list.append( + MCPTool(id=FINAL_TOOL_ID, name="Final Tool", description="结束流程的工具", mcp_id="", input_schema={}), ) - self.tool_list.append(MCPTool(id=FINAL_TOOL_ID, name="Final Tool", - description="结束流程的工具", mcp_id="", input_schema={})) - async def get_tool_input_param(self, is_first: bool) -> None: + async def get_tool_input_param(self, *, is_first: bool) -> None: + """获取工具输入参数""" if is_first: # 获取第一个输入参数 mcp_tool = self.tools[self.task.state.tool_id] - self.task.state.current_input = await MCPHost._get_first_input_params(mcp_tool, self.task.runtime.question, self.task.state.step_description, self.task) + self.task.state.current_input = await self.host.get_first_input_params( + mcp_tool, self.task.state.step_description, self.task, + ) else: # 获取后续输入参数 - if isinstance(self.params, param): + if isinstance(self.params, FlowParams): params = self.params.content params_description = self.params.description else: params = {} params_description = "" mcp_tool = self.tools[self.task.state.tool_id] - self.task.state.current_input = await MCPHost._fill_params(mcp_tool, self.task.runtime.question, self.task.state.step_description, self.task.state.current_input, self.task.state.error_message, params, params_description) + self.task.state.current_input = await self.host.fill_params( + mcp_tool, + self.task.state.step_description, + self.task.state.current_input, + self.task.state.error_message, + params, + params_description, + ) async def confirm_before_step(self) -> None: + """确认前步骤""" # 发送确认消息 mcp_tool = self.tools[self.task.state.tool_id] - confirm_message = await MCPPlanner.get_tool_risk(mcp_tool, self.task.state.current_input, "", self.resoning_llm) + confirm_message = await self.planner.get_tool_risk(mcp_tool, self.task.state.current_input, "") await self.update_tokens() - await self.push_message(EventType.STEP_WAITING_FOR_START, confirm_message.model_dump( - exclude_none=True, by_alias=True)) + await self.push_message( + EventType.STEP_WAITING_FOR_START, confirm_message.model_dump(exclude_none=True, by_alias=True), + ) await self.push_message(EventType.FLOW_STOP, {}) self.task.state.flow_status = FlowStatus.WAITING self.task.state.step_status = StepStatus.WAITING @@ -142,27 +152,30 @@ class MCPAgentExecutor(BaseExecutor): input_data={}, output_data={}, ex_data=confirm_message.model_dump(exclude_none=True, by_alias=True), - ) + ), ) - async def run_step(self): + async def run_step(self) -> None: """执行步骤""" self.task.state.flow_status = FlowStatus.RUNNING self.task.state.step_status = StepStatus.RUNNING mcp_tool = self.tools[self.task.state.tool_id] - mcp_client = (await self.mcp_pool.get(mcp_tool.mcp_id, self.task.ids.user_sub)) + mcp_client = await self.mcp_pool.get(mcp_tool.mcp_id, self.task.ids.user_sub) + if mcp_client is None: + logger.error("[MCPAgentExecutor] MCP客户端不存在: %s", mcp_tool.mcp_id) + self.task.state.step_status = StepStatus.ERROR + return try: output_params = await mcp_client.call_tool(mcp_tool.name, self.task.state.current_input) - except anyio.ClosedResourceError as e: - import traceback - logger.error("[MCPAgentExecutor] MCP客户端连接已关闭: %s, 错误: %s", mcp_tool.mcp_id, traceback.format_exc()) + except anyio.ClosedResourceError: + logger.exception("[MCPAgentExecutor] MCP客户端连接已关闭: %s", mcp_tool.mcp_id) await self.mcp_pool.stop(mcp_tool.mcp_id, self.task.ids.user_sub) - await self.mcp_pool._init_mcp(mcp_tool.mcp_id, self.task.ids.user_sub) - logger.error("[MCPAgentExecutor] MCP客户端连接已关闭: %s, 错误: %s", mcp_tool.mcp_id, str(e)) + await self.mcp_pool.init_mcp(mcp_tool.mcp_id, self.task.ids.user_sub) self.task.state.step_status = StepStatus.ERROR return except Exception as e: import traceback + logger.exception("[MCPAgentExecutor] 执行步骤 %s 时发生错误: %s", mcp_tool.name, traceback.format_exc()) self.task.state.step_status = StepStatus.ERROR self.task.state.error_message = str(e) @@ -184,14 +197,8 @@ class MCPAgentExecutor(BaseExecutor): } await self.update_tokens() - await self.push_message( - EventType.STEP_INPUT, - self.task.state.current_input - ) - await self.push_message( - EventType.STEP_OUTPUT, - output_params - ) + await self.push_message(EventType.STEP_INPUT, self.task.state.current_input) + await self.push_message(EventType.STEP_OUTPUT, output_params) self.task.context.append( FlowStepHistory( task_id=self.task.id, @@ -204,37 +211,26 @@ class MCPAgentExecutor(BaseExecutor): flow_status=self.task.state.flow_status, input_data=self.task.state.current_input, output_data=output_params, - ) + ), ) self.task.state.step_status = StepStatus.SUCCESS async def generate_params_with_null(self) -> None: """生成参数补充""" mcp_tool = self.tools[self.task.state.tool_id] - params_with_null = await MCPPlanner.get_missing_param( - mcp_tool, - self.task.state.current_input, - self.task.state.error_message, - self.resoning_llm + params_with_null = await self.planner.get_missing_param( + mcp_tool, self.task.state.current_input, self.task.state.error_message, ) await self.update_tokens() - error_message = await MCPPlanner.change_err_message_to_description( + error_message = await self.planner.change_err_message_to_description( error_message=self.task.state.error_message, tool=mcp_tool, input_params=self.task.state.current_input, - reasoning_llm=self.resoning_llm - ) - await self.push_message( - EventType.STEP_WAITING_FOR_PARAM, - data={ - "message": error_message, - "params": params_with_null - } ) await self.push_message( - EventType.FLOW_STOP, - data={} + EventType.STEP_WAITING_FOR_PARAM, data={"message": error_message, "params": params_with_null}, ) + await self.push_message(EventType.FLOW_STOP, data={}) self.task.state.flow_status = FlowStatus.WAITING self.task.state.step_status = StepStatus.PARAM self.task.context.append( @@ -249,33 +245,31 @@ class MCPAgentExecutor(BaseExecutor): flow_status=self.task.state.flow_status, input_data={}, output_data={}, - ex_data={ - "message": error_message, - "params": params_with_null - } - ) + ex_data={"message": error_message, "params": params_with_null}, + ), ) async def get_next_step(self) -> None: + """获取下一步""" if self.task.state.step_cnt < self.max_steps: self.task.state.step_cnt += 1 history = await MCPHost.assemble_memory(self.task) max_retry = 3 step = None - for i in range(max_retry): - step = await MCPPlanner.create_next_step(self.task.runtime.question, history, self.tool_list) - if step.tool_id in self.tools.keys(): - break - if step is None or step.tool_id not in self.tools.keys(): + for _ in range(max_retry): + try: + step = await self.planner.create_next_step(history, self.tool_list) + if step.tool_id in self.tools: + break + except Exception as e: # noqa: BLE001 + logger.warning("[MCPAgentExecutor] 获取下一步失败,重试中: %s", str(e)) + if step is None or step.tool_id not in self.tools: step = Step( tool_id=FINAL_TOOL_ID, - description=FINAL_TOOL_ID + description=FINAL_TOOL_ID, ) tool_id = step.tool_id - if tool_id == FINAL_TOOL_ID: - step_name = FINAL_TOOL_ID - else: - step_name = self.tools[tool_id].name + step_name = FINAL_TOOL_ID if tool_id == FINAL_TOOL_ID else self.tools[tool_id].name step_description = step.description self.task.state.step_id = str(uuid.uuid4()) self.task.state.tool_id = tool_id @@ -286,16 +280,12 @@ class MCPAgentExecutor(BaseExecutor): else: # 没有下一步了,结束流程 self.task.state.tool_id = FINAL_TOOL_ID - return async def error_handle_after_step(self) -> None: """步骤执行失败后的错误处理""" self.task.state.step_status = StepStatus.ERROR self.task.state.flow_status = FlowStatus.ERROR - await self.push_message( - EventType.FLOW_FAILED, - data={} - ) + await self.push_message(EventType.FLOW_FAILED, data={}) if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id: del self.task.context[-1] self.task.context.append( @@ -310,18 +300,18 @@ class MCPAgentExecutor(BaseExecutor): flow_status=self.task.state.flow_status, input_data={}, output_data={}, - ) + ), ) - async def work(self) -> None: + async def work(self) -> None: # noqa: C901, PLR0912, PLR0915 """执行当前步骤""" if self.task.state.step_status == StepStatus.INIT: - await self.push_message( - EventType.STEP_INIT, - data={} - ) + await self.push_message(EventType.STEP_INIT, data={}) await self.get_tool_input_param(is_first=True) user_info = await UserManager.get_userinfo_by_user_sub(self.task.ids.user_sub) + if user_info is None: + logger.error("[MCPAgentExecutor] 用户信息不存在: %s", self.task.ids.user_sub) + return if not user_info.auto_execute: # 等待用户确认 await self.confirm_before_step() @@ -338,14 +328,8 @@ class MCPAgentExecutor(BaseExecutor): else: self.task.state.flow_status = FlowStatus.CANCELLED self.task.state.step_status = StepStatus.CANCELLED - await self.push_message( - EventType.STEP_CANCEL, - data={} - ) - await self.push_message( - EventType.FLOW_CANCEL, - data={} - ) + await self.push_message(EventType.STEP_CANCEL, data={}) + await self.push_message(EventType.FLOW_CANCEL, data={}) if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id: self.task.context[-1].step_status = StepStatus.CANCELLED if self.task.state.step_status == StepStatus.PARAM: @@ -363,12 +347,15 @@ class MCPAgentExecutor(BaseExecutor): await self.error_handle_after_step() else: user_info = await UserManager.get_userinfo_by_user_sub(self.task.ids.user_sub) + if user_info is None: + logger.error("[MCPAgentExecutor] 用户信息不存在: %s", self.task.ids.user_sub) + return if user_info.auto_execute: await self.push_message( EventType.STEP_ERROR, data={ "message": self.task.state.error_message, - } + }, ) if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id: self.task.context[-1].step_status = StepStatus.ERROR @@ -378,9 +365,8 @@ class MCPAgentExecutor(BaseExecutor): await self.get_next_step() else: mcp_tool = self.tools[self.task.state.tool_id] - is_param_error = await MCPPlanner.is_param_error( - self.task.runtime.question, - await MCPHost.assemble_memory(self.task), + is_param_error = await self.planner.is_param_error( + await self.host.assemble_memory(self.task), self.task.state.error_message, mcp_tool, self.task.state.step_description, @@ -394,7 +380,7 @@ class MCPAgentExecutor(BaseExecutor): EventType.STEP_ERROR, data={ "message": self.task.state.error_message, - } + }, ) if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id: self.task.context[-1].step_status = StepStatus.ERROR @@ -406,18 +392,12 @@ class MCPAgentExecutor(BaseExecutor): await self.get_next_step() async def summarize(self) -> None: - async for chunk in MCPPlanner.generate_answer( - self.task.runtime.question, - (await MCPHost.assemble_memory(self.task)), - self.resoning_llm - ): - await self.push_message( - EventType.TEXT_ADD, - data=chunk - ) + """总结""" + async for chunk in self.planner.generate_answer(await self.host.assemble_memory(self.task)): + await self.push_message(EventType.TEXT_ADD, data=chunk) self.task.runtime.answer += chunk - async def run(self) -> None: + async def run(self) -> None: # noqa: C901 """执行MCP Agent的主逻辑""" # 初始化MCP服务 await self.load_state() @@ -426,32 +406,21 @@ class MCPAgentExecutor(BaseExecutor): # 初始化状态 try: self.task.state.flow_id = str(uuid.uuid4()) - self.task.state.flow_name = await MCPPlanner.get_flow_name(self.task.runtime.question, self.resoning_llm) + self.task.state.flow_name = await self.planner.get_flow_name() await TaskManager.save_task(self.task.id, self.task) await self.get_next_step() except Exception as e: - import traceback - logger.error("[MCPAgentExecutor] 初始化失败: %s", traceback.format_exc()) - logger.error("[MCPAgentExecutor] 初始化失败: %s", str(e)) + logger.exception("[MCPAgentExecutor] 初始化失败") self.task.state.flow_status = FlowStatus.ERROR self.task.state.error_message = str(e) - await self.push_message( - EventType.FLOW_FAILED, - data={} - ) + await self.push_message(EventType.FLOW_FAILED, data={}) return self.task.state.flow_status = FlowStatus.RUNNING - await self.push_message( - EventType.FLOW_START, - data={} - ) + await self.push_message(EventType.FLOW_START, data={}) if self.task.state.tool_id == FINAL_TOOL_ID: # 如果已经是最后一步,直接结束 self.task.state.flow_status = FlowStatus.SUCCESS - await self.push_message( - EventType.FLOW_SUCCESS, - data={} - ) + await self.push_message(EventType.FLOW_SUCCESS, data={}) await self.summarize() return try: @@ -465,26 +434,15 @@ class MCPAgentExecutor(BaseExecutor): # 如果已经是最后一步,直接结束 self.task.state.flow_status = FlowStatus.SUCCESS self.task.state.step_status = StepStatus.SUCCESS - await self.push_message( - EventType.FLOW_SUCCESS, - data={} - ) + await self.push_message(EventType.FLOW_SUCCESS, data={}) await self.summarize() except Exception as e: - import traceback - logger.error("[MCPAgentExecutor] 执行过程中发生错误: %s", traceback.format_exc()) - logger.error("[MCPAgentExecutor] 执行过程中发生错误: %s", str(e)) + logger.exception("[MCPAgentExecutor] 执行过程中发生错误") self.task.state.flow_status = FlowStatus.ERROR self.task.state.error_message = str(e) self.task.state.step_status = StepStatus.ERROR - await self.push_message( - EventType.STEP_ERROR, - data={} - ) - await self.push_message( - EventType.FLOW_FAILED, - data={} - ) + await self.push_message(EventType.STEP_ERROR, data={}) + await self.push_message(EventType.FLOW_FAILED, data={}) if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id: del self.task.context[-1] self.task.context.append( @@ -499,12 +457,11 @@ class MCPAgentExecutor(BaseExecutor): flow_status=self.task.state.flow_status, input_data={}, output_data={}, - ) + ), ) finally: for mcp_service in self.mcp_list: try: await self.mcp_pool.stop(mcp_service.id, self.task.ids.user_sub) - except Exception as e: - import traceback - logger.error("[MCPAgentExecutor] 停止MCP客户端时发生错误: %s", traceback.format_exc()) + except Exception: + logger.exception("[MCPAgentExecutor] 停止MCP客户端时发生错误") diff --git a/apps/scheduler/mcp_agent/base.py b/apps/scheduler/mcp_agent/base.py index f062a551c5b3d797fa0f77ad0efd1a8e135eeb52..b3f71c4a9a9c85b254f6669e02db60451b833d31 100644 --- a/apps/scheduler/mcp_agent/base.py +++ b/apps/scheduler/mcp_agent/base.py @@ -1,3 +1,6 @@ +# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. +"""MCP基类""" + import json import logging from typing import Any @@ -10,9 +13,12 @@ from apps.llm.reasoning import ReasoningLLM logger = logging.getLogger(__name__) -class McpBase: - @staticmethod - async def get_resoning_result(prompt: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> str: +class MCPBase: + """MCP基类""" + + llm: ReasoningLLM + + async def get_resoning_result(self, prompt: str) -> str: """获取推理结果""" # 调用推理大模型 message = [ @@ -20,7 +26,7 @@ class McpBase: {"role": "user", "content": prompt}, ] result = "" - async for chunk in resoning_llm.call( + async for chunk in self.llm.call( message, streaming=False, temperature=0.07, @@ -31,7 +37,10 @@ class McpBase: return result @staticmethod - async def _parse_result(result: str, schema: dict[str, Any], left_str: str = '{', right_str: str = '}') -> str: + async def _parse_result( + result: str, + schema: dict[str, Any], left_str: str = "{", right_str: str = "}", + ) -> dict[str, Any]: """解析推理结果""" left_index = result.find(left_str) right_index = result.rfind(right_str) @@ -43,21 +52,21 @@ class McpBase: flag = False if flag: try: - tmp_js = json.loads(result[left_index:right_index + 1]) + tmp_js = json.loads(result[left_index : right_index + 1]) validate(instance=tmp_js, schema=schema) - except Exception as e: - logger.error("[McpBase] 解析结果失败: %s", e) + except Exception: + logger.exception("[McpBase] 解析结果失败") flag = False if not flag: json_generator = JsonGenerator( "请提取下面内容中的json\n\n", [ {"role": "system", "content": "You are a helpful assistant."}, - {"role": "user", "content": "请提取下面内容中的json\n\n"+result}, + {"role": "user", "content": "请提取下面内容中的json\n\n" + result}, ], schema, ) json_result = await json_generator.generate() else: - json_result = json.loads(result[left_index:right_index + 1]) + json_result = json.loads(result[left_index : right_index + 1]) return json_result diff --git a/apps/scheduler/mcp_agent/host.py b/apps/scheduler/mcp_agent/host.py index fde8e39acb6fa41e474307137a4584bba029fac2..3fc7bfc746a631143fbee5cc5b1036306d41759e 100644 --- a/apps/scheduler/mcp_agent/host.py +++ b/apps/scheduler/mcp_agent/host.py @@ -9,31 +9,39 @@ from jinja2 import BaseLoader from jinja2.sandbox import SandboxedEnvironment from apps.llm.function import JsonGenerator +from apps.llm.reasoning import ReasoningLLM from apps.scheduler.mcp.prompt import MEMORY_TEMPLATE -from apps.scheduler.mcp_agent.base import McpBase +from apps.scheduler.mcp_agent.base import MCPBase from apps.scheduler.mcp_agent.prompt import GEN_PARAMS, REPAIR_PARAMS from apps.schemas.mcp import MCPTool from apps.schemas.task import Task - -def tojson_filter(value: Any) -> str: - """将值转换为JSON字符串""" - return json.dumps(value, ensure_ascii=False, separators=(",", ":")) - - logger = logging.getLogger(__name__) _env = SandboxedEnvironment( loader=BaseLoader, autoescape=False, trim_blocks=True, lstrip_blocks=True, - filters={"tojson": tojson_filter}, ) -class MCPHost(McpBase): +def tojson_filter(value: dict[str, Any]) -> str: + """将字典转换为紧凑JSON字符串""" + return json.dumps(value, ensure_ascii=False, separators=(",", ":")) + + +_env.filters["tojson"] = tojson_filter + + +class MCPHost(MCPBase): """MCP宿主服务""" + def __init__(self, goal: str, llm: ReasoningLLM) -> None: + """初始化MCP宿主服务""" + super().__init__() + self.goal = goal + self.llm = llm + @staticmethod async def assemble_memory(task: Task) -> str: """组装记忆""" @@ -41,43 +49,42 @@ class MCPHost(McpBase): context_list=task.context, ) - @staticmethod - async def _get_first_input_params(mcp_tool: MCPTool, goal: str, current_goal: str, task: Task, - resoning_llm: ReasoningLLM = ReasoningLLM()) -> dict[str, Any]: + async def get_first_input_params( + self, mcp_tool: MCPTool, current_goal: str, task: Task, + ) -> dict[str, Any]: """填充工具参数""" # 更清晰的输入·指令,这样可以调用generate prompt = _env.from_string(GEN_PARAMS).render( tool_name=mcp_tool.name, tool_description=mcp_tool.description, - goal=goal, + goal=self.goal, current_goal=current_goal, input_schema=mcp_tool.input_schema, background_info=await MCPHost.assemble_memory(task), ) logger.info("[MCPHost] 填充工具参数: %s", prompt) - result = await MCPHost.get_resoning_result( - prompt, - resoning_llm - ) + result = await self.get_resoning_result(prompt) # 使用JsonGenerator解析结果 - result = await MCPHost._parse_result( + return await MCPHost._parse_result( result, mcp_tool.input_schema, ) - return result - @staticmethod - async def _fill_params( # noqa: PLR0913 - goal: str, current_goal: str, - mcp_tool: MCPTool, current_input: dict[str, Any], - error_message: str = "", params: dict[str, Any] | None = None, - params_description: str = "") -> dict[str, Any]: + async def fill_params( # noqa: D102, PLR0913 + self, + mcp_tool: MCPTool, + current_goal: str, + current_input: dict[str, Any], + error_message: str = "", + params: dict[str, Any] | None = None, + params_description: str = "", + ) -> dict[str, Any]: llm_query = "请生成修复之后的工具参数" prompt = _env.from_string(REPAIR_PARAMS).render( tool_name=mcp_tool.name, - tool_description=mcp_tool.description, - goal=goal, + goal=self.goal, current_goal=current_goal, + tool_description=mcp_tool.description, input_schema=mcp_tool.input_schema, current_input=current_input, error_message=error_message, diff --git a/apps/scheduler/mcp_agent/plan.py b/apps/scheduler/mcp_agent/plan.py index 0d707d2d97ad55834942c937e8432fc6fa6834ea..ae4bcf65b02f7f5853aa5f2536b4f784adfec9d0 100644 --- a/apps/scheduler/mcp_agent/plan.py +++ b/apps/scheduler/mcp_agent/plan.py @@ -1,44 +1,31 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """MCP 用户目标拆解与规划""" +import logging from collections.abc import AsyncGenerator from typing import Any from jinja2 import BaseLoader from jinja2.sandbox import SandboxedEnvironment -from apps.llm.function import JsonGenerator from apps.llm.reasoning import ReasoningLLM -from apps.scheduler.mcp_agent.base import McpBase +from apps.scheduler.mcp_agent.base import MCPBase from apps.scheduler.mcp_agent.prompt import ( CHANGE_ERROR_MESSAGE_TO_DESCRIPTION, - CREATE_PLAN, - EVALUATE_GOAL, FINAL_ANSWER, GEN_STEP, GENERATE_FLOW_NAME, GET_MISSING_PARAMS, - GET_REPLAN_START_STEP_INDEX, IS_PARAM_ERROR, - RECREATE_PLAN, RISK_EVALUATE, - TOOL_EXECUTE_ERROR_TYPE_ANALYSIS, - TOOL_SKIP, ) from apps.scheduler.slot.slot import Slot from apps.schemas.mcp import ( - GoalEvaluationResult, IsParamError, - MCPPlan, - MCPPlanItem, MCPTool, - RestartStepIndex, Step, - ToolExcutionErrorType, ToolRisk, - ToolSkip, ) -from apps.schemas.task import Task _env = SandboxedEnvironment( loader=BaseLoader, @@ -46,141 +33,33 @@ _env = SandboxedEnvironment( trim_blocks=True, lstrip_blocks=True, ) +logger = logging.getLogger(__name__) -class MCPPlanner(McpBase): +class MCPPlanner(MCPBase): """MCP 用户目标拆解与规划""" - @staticmethod - async def evaluate_goal( - goal: str, - tool_list: list[MCPTool], - resoning_llm: ReasoningLLM = ReasoningLLM()) -> GoalEvaluationResult: - """评估用户目标的可行性""" - # 获取推理结果 - result = await MCPPlanner._get_reasoning_evaluation(goal, tool_list, resoning_llm) - - # 解析为结构化数据 - evaluation = await MCPPlanner._parse_evaluation_result(result) - - # 返回评估结果 - return evaluation - - @staticmethod - async def _get_reasoning_evaluation( - goal, tool_list: list[MCPTool], - resoning_llm: ReasoningLLM = ReasoningLLM()) -> str: - """获取推理大模型的评估结果""" - template = _env.from_string(EVALUATE_GOAL) - prompt = template.render( - goal=goal, - tools=tool_list, - ) - result = await MCPPlanner.get_resoning_result(prompt, resoning_llm) - return result + goal: str + llm: ReasoningLLM - @staticmethod - async def _parse_evaluation_result(result: str) -> GoalEvaluationResult: - """将推理结果解析为结构化数据""" - schema = GoalEvaluationResult.model_json_schema() - evaluation = await MCPPlanner._parse_result(result, schema) - # 使用GoalEvaluationResult模型解析结果 - return GoalEvaluationResult.model_validate(evaluation) + def __init__(self, goal: str, llm: ReasoningLLM) -> None: + """初始化MCPPlanner""" + super().__init__() + self.goal = goal + self.llm = llm - async def get_flow_name(user_goal: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> str: + async def get_flow_name(self) -> str: """获取当前流程的名称""" - result = await MCPPlanner._get_reasoning_flow_name(user_goal, resoning_llm) - return result - - @staticmethod - async def _get_reasoning_flow_name(user_goal: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> str: - """获取推理大模型的流程名称""" template = _env.from_string(GENERATE_FLOW_NAME) - prompt = template.render(goal=user_goal) - result = await MCPPlanner.get_resoning_result(prompt, resoning_llm) - return result + prompt = template.render(goal=self.goal) + return await self.get_resoning_result(prompt) - @staticmethod - async def get_replan_start_step_index( - user_goal: str, error_message: str, current_plan: MCPPlan | None = None, - history: str = "", - reasoning_llm: ReasoningLLM = ReasoningLLM()) -> RestartStepIndex: - """获取重新规划的步骤索引""" - # 获取推理结果 - template = _env.from_string(GET_REPLAN_START_STEP_INDEX) - prompt = template.render( - goal=user_goal, - error_message=error_message, - current_plan=current_plan.model_dump(exclude_none=True, by_alias=True), - history=history, - ) - result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm) - # 解析为结构化数据 - schema = RestartStepIndex.model_json_schema() - schema["properties"]["start_index"]["maximum"] = len(current_plan.plans) - 1 - schema["properties"]["start_index"]["minimum"] = 0 - restart_index = await MCPPlanner._parse_result(result, schema) - # 使用RestartStepIndex模型解析结果 - return RestartStepIndex.model_validate(restart_index) - - @staticmethod - async def create_plan( - user_goal: str, is_replan: bool = False, error_message: str = "", current_plan: MCPPlan | None = None, - tool_list: list[MCPTool] = [], - max_steps: int = 6, reasoning_llm: ReasoningLLM = ReasoningLLM()) -> MCPPlan: - """规划下一步的执行流程,并输出""" - # 获取推理结果 - result = await MCPPlanner._get_reasoning_plan(user_goal, is_replan, error_message, current_plan, tool_list, max_steps, reasoning_llm) - - # 解析为结构化数据 - return await MCPPlanner._parse_plan_result(result, max_steps) - - @staticmethod - async def _get_reasoning_plan( - user_goal: str, is_replan: bool = False, error_message: str = "", current_plan: MCPPlan | None = None, - tool_list: list[MCPTool] = [], - max_steps: int = 10, reasoning_llm: ReasoningLLM = ReasoningLLM()) -> str: - """获取推理大模型的结果""" - # 格式化Prompt - tool_ids = [tool.id for tool in tool_list] - if is_replan: - template = _env.from_string(RECREATE_PLAN) - prompt = template.render( - current_plan=current_plan.model_dump(exclude_none=True, by_alias=True), - error_message=error_message, - goal=user_goal, - tools=tool_list, - max_num=max_steps, - ) - else: - template = _env.from_string(CREATE_PLAN) - prompt = template.render( - goal=user_goal, - tools=tool_list, - max_num=max_steps, - ) - result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm) - return result - - @staticmethod - async def _parse_plan_result(result: str, max_steps: int) -> MCPPlan: - """将推理结果解析为结构化数据""" - # 格式化Prompt - schema = MCPPlan.model_json_schema() - schema["properties"]["plans"]["maxItems"] = max_steps - plan = await MCPPlanner._parse_result(result, schema) - # 使用Function模型解析结果 - return MCPPlan.model_validate(plan) - - @staticmethod - async def create_next_step( - goal: str, history: str, tools: list[MCPTool], - reasoning_llm: ReasoningLLM = ReasoningLLM()) -> Step: + async def create_next_step(self, history: str, tools: list[MCPTool]) -> Step: """创建下一步的执行步骤""" # 获取推理结果 template = _env.from_string(GEN_STEP) - prompt = template.render(goal=goal, history=history, tools=tools) - result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm) + prompt = template.render(goal=self.goal, history=history, tools=tools) + result = await self.get_resoning_result(prompt) # 解析为结构化数据 schema = Step.model_json_schema() @@ -188,54 +67,19 @@ class MCPPlanner(McpBase): schema["properties"]["tool_id"]["enum"] = [] for tool in tools: schema["properties"]["tool_id"]["enum"].append(tool.id) - step = await MCPPlanner._parse_result(result, schema) + step = await self._parse_result(result, schema) + logger.info("[MCPPlanner] 创建下一步的执行步骤: %s", step) # 使用Step模型解析结果 return Step.model_validate(step) - @staticmethod - async def tool_skip( - task: Task, step_id: str, step_name: str, step_instruction: str, step_content: str, - reasoning_llm: ReasoningLLM = ReasoningLLM()) -> ToolSkip: - """判断当前步骤是否需要跳过""" - # 获取推理结果 - template = _env.from_string(TOOL_SKIP) - from apps.scheduler.mcp_agent.host import MCPHost - history = await MCPHost.assemble_memory(task) - prompt = template.render( - step_id=step_id, - step_name=step_name, - step_instruction=step_instruction, - step_content=step_content, - history=history, - goal=task.runtime.question - ) - result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm) - - # 解析为结构化数据 - schema = ToolSkip.model_json_schema() - skip_result = await MCPPlanner._parse_result(result, schema) - # 使用ToolSkip模型解析结果 - return ToolSkip.model_validate(skip_result) - - @staticmethod async def get_tool_risk( - tool: MCPTool, input_parm: dict[str, Any], - additional_info: str = "", resoning_llm: ReasoningLLM = ReasoningLLM()) -> ToolRisk: + self, + tool: MCPTool, + input_param: dict[str, Any], + additional_info: str = "", + ) -> ToolRisk: """获取MCP工具的风险评估结果""" # 获取推理结果 - result = await MCPPlanner._get_reasoning_risk(tool, input_parm, additional_info, resoning_llm) - - # 解析为结构化数据 - risk = await MCPPlanner._parse_risk_result(result) - - # 返回风险评估结果 - return risk - - @staticmethod - async def _get_reasoning_risk( - tool: MCPTool, input_param: dict[str, Any], - additional_info: str, resoning_llm: ReasoningLLM) -> str: - """获取推理大模型的风险评估结果""" template = _env.from_string(RISK_EVALUATE) prompt = template.render( tool_name=tool.name, @@ -243,65 +87,26 @@ class MCPPlanner(McpBase): input_param=input_param, additional_info=additional_info, ) - result = await MCPPlanner.get_resoning_result(prompt, resoning_llm) - return result + result = await self.get_resoning_result(prompt) - @staticmethod - async def _parse_risk_result(result: str) -> ToolRisk: - """将推理结果解析为结构化数据""" schema = ToolRisk.model_json_schema() - risk = await MCPPlanner._parse_result(result, schema) - # 使用ToolRisk模型解析结果 - return ToolRisk.model_validate(risk) + risk = await self._parse_result(result, schema) - @staticmethod - async def _get_reasoning_tool_execute_error_type( - user_goal: str, current_plan: MCPPlan, - tool: MCPTool, input_param: dict[str, Any], - error_message: str, reasoning_llm: ReasoningLLM = ReasoningLLM()) -> str: - """获取推理大模型的工具执行错误类型""" - template = _env.from_string(TOOL_EXECUTE_ERROR_TYPE_ANALYSIS) - prompt = template.render( - goal=user_goal, - current_plan=current_plan.model_dump(exclude_none=True, by_alias=True), - tool_name=tool.name, - tool_description=tool.description, - input_param=input_param, - error_message=error_message, - ) - result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm) - return result - - @staticmethod - async def _parse_tool_execute_error_type_result(result: str) -> ToolExcutionErrorType: - """将推理结果解析为工具执行错误类型""" - schema = ToolExcutionErrorType.model_json_schema() - error_type = await MCPPlanner._parse_result(result, schema) - # 使用ToolExcutionErrorType模型解析结果 - return ToolExcutionErrorType.model_validate(error_type) - - @staticmethod - async def get_tool_execute_error_type( - user_goal: str, current_plan: MCPPlan, - tool: MCPTool, input_param: dict[str, Any], - error_message: str, reasoning_llm: ReasoningLLM = ReasoningLLM()) -> ToolExcutionErrorType: - """获取MCP工具执行错误类型""" - # 获取推理结果 - result = await MCPPlanner._get_reasoning_tool_execute_error_type( - user_goal, current_plan, tool, input_param, error_message, reasoning_llm) - error_type = await MCPPlanner._parse_tool_execute_error_type_result(result) - # 返回工具执行错误类型 - return error_type + # 返回风险评估结果 + return ToolRisk.model_validate(risk) - @staticmethod async def is_param_error( - goal: str, history: str, error_message: str, tool: MCPTool, step_description: str, input_params: dict - [str, Any], - reasoning_llm: ReasoningLLM = ReasoningLLM()) -> IsParamError: + self, + history: str, + error_message: str, + tool: MCPTool, + step_description: str, + input_params: dict[str, Any], + ) -> IsParamError: """判断错误信息是否是参数错误""" tmplate = _env.from_string(IS_PARAM_ERROR) prompt = tmplate.render( - goal=goal, + goal=self.goal, history=history, step_id=tool.id, step_name=tool.name, @@ -309,17 +114,16 @@ class MCPPlanner(McpBase): input_params=input_params, error_message=error_message, ) - result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm) + result = await self.get_resoning_result(prompt) # 解析为结构化数据 schema = IsParamError.model_json_schema() - is_param_error = await MCPPlanner._parse_result(result, schema) + is_param_error = await self._parse_result(result, schema) # 使用IsParamError模型解析结果 return IsParamError.model_validate(is_param_error) - @staticmethod async def change_err_message_to_description( - error_message: str, tool: MCPTool, input_params: dict[str, Any], - reasoning_llm: ReasoningLLM = ReasoningLLM()) -> str: + self, error_message: str, tool: MCPTool, input_params: dict[str, Any], + ) -> str: """将错误信息转换为工具描述""" template = _env.from_string(CHANGE_ERROR_MESSAGE_TO_DESCRIPTION) prompt = template.render( @@ -329,14 +133,9 @@ class MCPPlanner(McpBase): input_schema=tool.input_schema, input_params=input_params, ) - result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm) - return result + return await self.get_resoning_result(prompt) - @staticmethod - async def get_missing_param( - tool: MCPTool, - input_param: dict[str, Any], - error_message: str, reasoning_llm: ReasoningLLM = ReasoningLLM()) -> list[str]: + async def get_missing_param(self, tool: MCPTool, input_param: dict[str, Any], error_message: str) -> dict[str, Any]: """获取缺失的参数""" slot = Slot(schema=tool.input_schema) template = _env.from_string(GET_MISSING_PARAMS) @@ -348,22 +147,20 @@ class MCPPlanner(McpBase): schema=schema_with_null, error_message=error_message, ) - result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm) + result = await self.get_resoning_result(prompt) # 解析为结构化数据 - input_param_with_null = await MCPPlanner._parse_result(result, schema_with_null) - return input_param_with_null + return await self._parse_result(result, schema_with_null) - @staticmethod async def generate_answer( - user_goal: str, memory: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> AsyncGenerator[ - str, None]: + self, memory: str, + ) -> AsyncGenerator[str, None]: """生成最终回答""" template = _env.from_string(FINAL_ANSWER) prompt = template.render( memory=memory, - goal=user_goal, + goal=self.goal, ) - async for chunk in resoning_llm.call( + async for chunk in self.llm.call( [{"role": "user", "content": prompt}], streaming=True, temperature=0.07, diff --git a/apps/scheduler/mcp_agent/prompt.py b/apps/scheduler/mcp_agent/prompt.py index d51042a9dc4c7ede3cc0151d4fb7b4fcd9bfcfc0..80c95362ad1870333e33a284deceed4801862b91 100644 --- a/apps/scheduler/mcp_agent/prompt.py +++ b/apps/scheduler/mcp_agent/prompt.py @@ -3,175 +3,6 @@ from textwrap import dedent -MCP_SELECT = dedent(r""" - 你是一个乐于助人的智能助手。 - 你的任务是:根据当前目标,选择最合适的MCP Server。 - - ## 选择MCP Server时的注意事项: - - 1. 确保充分理解当前目标,选择最合适的MCP Server。 - 2. 请在给定的MCP Server列表中选择,不要自己生成MCP Server。 - 3. 请先给出你选择的理由,再给出你的选择。 - 4. 当前目标将在下面给出,MCP Server列表也会在下面给出。 - 请将你的思考过程放在"思考过程"部分,将你的选择放在"选择结果"部分。 - 5. 选择必须是JSON格式,严格按照下面的模板,不要输出任何其他内容: - - ```json - { - "mcp": "你选择的MCP Server的名称" - } - ``` - - 6. 下面的示例仅供参考,不要将示例中的内容作为选择MCP Server的依据。 - - ## 示例 - - ### 目标 - - 我需要一个MCP Server来完成一个任务。 - - ### MCP Server列表 - - - **mcp_1**: "MCP Server 1";MCP Server 1的描述 - - **mcp_2**: "MCP Server 2";MCP Server 2的描述 - - ### 请一步一步思考: - - 因为当前目标需要一个MCP Server来完成一个任务,所以选择mcp_1。 - - ### 选择结果 - - ```json - { - "mcp": "mcp_1" - } - ``` - - ## 现在开始! - - ### 目标 - - {{goal}} - - ### MCP Server列表 - - {% for mcp in mcp_list %} - - **{{mcp.id}}**: "{{mcp.name}}";{{mcp.description}} - {% endfor %} - - ### 请一步一步思考: - -""") - -TOOL_SELECT = dedent(r""" - 你是一个乐于助人的智能助手。 - 你的任务是:根据当前目标,附加信息,选择最合适的MCP工具。 - ## 选择MCP工具时的注意事项: - 1. 确保充分理解当前目标,选择实现目标所需的MCP工具。 - 2. 不要选择不存在的工具。 - 3. 可以选择一些辅助工具,但必须确保这些工具与当前目标相关。 - 4. 注意,返回的工具ID必须是MCP工具的ID,而不是名称。 - 5. 可以多选择一些工具,用于应对不同的情况。 - 必须按照以下格式生成选择结果,不要输出任何其他内容: - ```json - { - "tool_ids": ["工具ID1", "工具ID2", ...] - } - ``` - - # 示例 - ## 目标 - 调优mysql性能 - ## MCP工具列表 - - - mcp_tool_1 MySQL链接池工具;用于优化MySQL链接池 - - mcp_tool_2 MySQL性能调优工具;用于分析MySQL性能瓶颈 - - mcp_tool_3 MySQL查询优化工具;用于优化MySQL查询语句 - - mcp_tool_4 MySQL索引优化工具;用于优化MySQL索引 - - mcp_tool_5 文件存储工具;用于存储文件 - - mcp_tool_6 mongoDB工具;用于操作MongoDB数据库 - - ## 附加信息 - 1. 当前MySQL数据库的版本是8.0.26 - 2. 当前MySQL数据库的配置文件路径是/etc/my.cnf,并含有以下配置项 - ```json - { - "max_connections": 1000, - "innodb_buffer_pool_size": "1G", - "query_cache_size": "64M" - } - ##输出 - ```json - { - "tool_ids": ["mcp_tool_1", "mcp_tool_2", "mcp_tool_3", "mcp_tool_4"] - } - ``` - # 现在开始! - ## 目标 - {{goal}} - ## MCP工具列表 - - {% for tool in tools %} - - {{tool.id}} {{tool.name}};{{tool.description}} - {% endfor %} - - ## 附加信息 - {{additional_info}} - # 输出 - """) - -EVALUATE_GOAL = dedent(r""" - 你是一个计划评估器。 - 请根据用户的目标和当前的工具集合以及一些附加信息,判断基于当前的工具集合,是否能够完成用户的目标。 - 如果能够完成,请返回`true`,否则返回`false`。 - 推理过程必须清晰明了,能够让人理解你的判断依据。 - 必须按照以下格式回答: - ```json - { - "can_complete": true/false, - "resoning": "你的推理过程" - } - ``` - - # 样例 - # 目标 - 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优 - - # 工具集合 - 你可以访问并使用一些工具,这些工具将在 XML标签中给出。 - - - mysql_analyzer 分析MySQL数据库性能 - - performance_tuner 调优数据库性能 - - Final 结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。 - - - # 附加信息 - 1. 当前MySQL数据库的版本是8.0.26 - 2. 当前MySQL数据库的配置文件路径是/etc/my.cnf - - ## - ```json - { - "can_complete": true, - "resoning": "当前的工具集合中包含mysql_analyzer和performance_tuner,能够完成对MySQL数据库的性能分析和调优,因此可以完成用户的目标。" - } - ``` - - # 目标 - {{goal}} - - # 工具集合 - - {% for tool in tools %} - - {{tool.id}} {{tool.name}};{{tool.description}} - {% endfor %} - - - # 附加信息 - {{additional_info}} - -""") - GENERATE_FLOW_NAME = dedent(r""" 你是一个智能助手,你的任务是根据用户的目标,生成一个合适的流程名称。 @@ -192,321 +23,6 @@ GENERATE_FLOW_NAME = dedent(r""" # 输出 """) -GET_REPLAN_START_STEP_INDEX = dedent(r""" - 你是一个智能助手,你的任务是根据用户的目标、报错信息和当前计划和历史,获取重新规划的步骤起始索引。 - - # 样例 - # 目标 - 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优 - # 报错信息 - 执行端口扫描命令时,出现了错误:`- bash: curl: command not found`。 - # 当前计划 - ```json - { - "plans": [ - { - "step_id": "step_1", - "content": "生成端口扫描命令", - "tool": "command_generator", - "instruction": "生成端口扫描命令:扫描 - }, - { - "step_id": "step_2", - "content": "在执行Result[0]生成的命令", - "tool": "command_executor", - "instruction": "执行端口扫描命令" - } - ] - } - # 历史 - [ - { - id: "0", - task_id: "task_1", - flow_id: "flow_1", - flow_name: "MYSQL性能调优", - flow_status: "RUNNING", - step_id: "step_1", - step_name: "生成端口扫描命令", - step_description: "生成端口扫描命令:扫描当前MySQL数据库的端口", - step_status: "FAILED", - input_data: { - "command": "nmap -p 3306 - "target": "localhost" - }, - output_data: { - "error": "- bash: curl: command not found" - } - } - ] - # 输出 - { - "start_index": 0, - "reasoning": "当前计划的第一步就失败了,报错信息显示curl命令未找到,可能是因为没有安装curl工具,因此需要从第一步重新规划。" - } - # 现在开始获取重新规划的步骤起始索引: - # 目标 - {{goal}} - # 报错信息 - {{error_message}} - # 当前计划 - {{current_plan}} - # 历史 - {{history}} - # 输出 - """) - -CREATE_PLAN = dedent(r""" - 你是一个计划生成器。 - 请分析用户的目标,并生成一个计划。你后续将根据这个计划,一步一步地完成用户的目标。 - - # 一个好的计划应该: - - 1. 能够成功完成用户的目标 - 2. 计划中的每一个步骤必须且只能使用一个工具。 - 3. 计划中的步骤必须具有清晰和逻辑的步骤,没有冗余或不必要的步骤。 - 4. 不要选择不存在的工具。 - 5. 计划中的最后一步必须是Final工具,以确保计划执行结束。 - 6. 生成的计划必须要覆盖用户的目标,当然需要考虑一些意外情况,可以有一定的冗余步骤。 - - # 生成计划时的注意事项: - - - 每一条计划包含3个部分: - - 计划内容:描述单个计划步骤的大致内容 - - 工具ID:必须从下文的工具列表中选择 - - 工具指令:改写用户的目标,使其更符合工具的输入要求 - - 必须按照如下格式生成计划,不要输出任何额外数据: - - ```json - { - "plans": [ - { - "content": "计划内容", - "tool": "工具ID", - "instruction": "工具指令" - } - ] - } - ``` - - - 在生成计划之前,请一步一步思考,解析用户的目标,并指导你接下来的生成。 -思考过程应放置在 XML标签中。 - - 计划内容中,可以使用"Result[]"来引用之前计划步骤的结果。例如:"Result[3]"表示引用第三条计划执行后的结果。 - - 计划不得多于{{max_num}}条,且每条计划内容应少于150字。 - - # 工具 - - 你可以访问并使用一些工具,这些工具将在 XML标签中给出。 - - - {% for tool in tools %} - - {{tool.id}} {{tool.name}};{{tool.description}} - {% endfor %} - - - # 样例 - - # 目标 - - 在后台运行一个新的alpine: latest容器,将主机/root文件夹挂载至/data,并执行top命令。 - - # 计划 - - - 1. 这个目标需要使用Docker来完成, 首先需要选择合适的MCP Server - 2. 目标可以拆解为以下几个部分: - - 运行alpine: latest容器 - - 挂载主机目录 - - 在后台运行 - - 执行top命令 - 3. 需要先选择MCP Server, 然后生成Docker命令, 最后执行命令 - ```json - { - "plans": [ - { - "content": "选择一个支持Docker的MCP Server", - "tool": "mcp_selector", - "instruction": "需要一个支持Docker容器运行的MCP Server" - }, - { - "content": "使用第一步选择的MCP Server,生成Docker命令", - "tool": "command_generator", - "instruction": "生成Docker命令:在后台运行alpine:latest容器,挂载/root到/data,执行top命令" - }, - { - "content": "执行第二步生成的Docker命令", - "tool": "command_executor", - "instruction": "执行Docker命令" - }, - { - "content": "任务执行完成,容器已在后台运行", - "tool": "Final", - "instruction": "" - } - ] - } - ``` - - # 现在开始生成计划: - - # 目标 - - {{goal}} - - # 计划 -""") - -RECREATE_PLAN = dedent(r""" - 你是一个计划重建器。 - 请根据用户的目标、当前计划和运行报错,重新生成一个计划。 - - # 一个好的计划应该: - - 1. 能够成功完成用户的目标 - 2. 计划中的每一个步骤必须且只能使用一个工具。 - 3. 计划中的步骤必须具有清晰和逻辑的步骤,没有冗余或不必要的步骤。 - 4. 你的计划必须避免之前的错误,并且能够成功执行。 - 5. 不要选择不存在的工具。 - 6. 计划中的最后一步必须是Final工具,以确保计划执行结束。 - 7. 生成的计划必须要覆盖用户的目标,当然需要考虑一些意外情况,可以有一定的冗余步骤。 - - # 生成计划时的注意事项: - - - 每一条计划包含3个部分: - - 计划内容:描述单个计划步骤的大致内容 - - 工具ID:必须从下文的工具列表中选择 - - 工具指令:改写用户的目标,使其更符合工具的输入要求 - - 必须按照如下格式生成计划,不要输出任何额外数据: - - ```json - { - "plans": [ - { - "content": "计划内容", - "tool": "工具ID", - "instruction": "工具指令" - } - ] - } - ``` - - - 在生成计划之前,请一步一步思考,解析用户的目标,并指导你接下来的生成。 -思考过程应放置在 XML标签中。 - - 计划内容中,可以使用"Result[]"来引用之前计划步骤的结果。例如:"Result[3]"表示引用第三条计划执行后的结果。 - - 计划不得多于{{max_num}}条,且每条计划内容应少于150字。 - - # 样例 - - # 目标 - - 请帮我扫描一下192.168.1.1的这台机器的端口,看看有哪些端口开放。 - # 工具 - 你可以访问并使用一些工具,这些工具将在 XML标签中给出。 - - - command_generator 生成命令行指令 - - tool_selector 选择合适的工具 - - command_executor 执行命令行指令 - - Final 结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。 - # 当前计划 - ```json - { - "plans": [ - { - "content": "生成端口扫描命令", - "tool": "command_generator", - "instruction": "生成端口扫描命令:扫描192.168.1.1的开放端口" - }, - { - "content": "在执行第一步生成的命令", - "tool": "command_executor", - "instruction": "执行端口扫描命令" - }, - { - "content": "任务执行完成", - "tool": "Final", - "instruction": "" - } - ] - } - ``` - # 运行报错 - 执行端口扫描命令时,出现了错误:`- bash: curl: command not found`。 - # 重新生成的计划 - - - 1. 这个目标需要使用网络扫描工具来完成, 首先需要选择合适的网络扫描工具 - 2. 目标可以拆解为以下几个部分: - - 生成端口扫描命令 - - 执行端口扫描命令 - 3.但是在执行端口扫描命令时,出现了错误:`- bash: curl: command not found`。 - 4.我将计划调整为: - - 需要先生成一个命令,查看当前机器支持哪些网络扫描工具 - - 执行这个命令,查看当前机器支持哪些网络扫描工具 - - 然后从中选择一个网络扫描工具 - - 基于选择的网络扫描工具,生成端口扫描命令 - - 执行端口扫描命令 - ```json - { - "plans": [ - { - "content": "需要生成一条命令查看当前机器支持哪些网络扫描工具", - "tool": "command_generator", - "instruction": "选择一个前机器支持哪些网络扫描工具" - }, - { - "content": "执行第一步中生成的命令,查看当前机器支持哪些网络扫描工具", - "tool": "command_executor", - "instruction": "执行第一步中生成的命令" - }, - { - "content": "从第二步执行结果中选择一个网络扫描工具,生成端口扫描命令", - "tool": "tool_selector", - "instruction": "选择一个网络扫描工具,生成端口扫描命令" - }, - { - "content": "基于第三步中选择的网络扫描工具,生成端口扫描命令", - "tool": "command_generator", - "instruction": "生成端口扫描命令:扫描192.168.1.1的开放端口" - }, - { - "content": "执行第四步中生成的端口扫描命令", - "tool": "command_executor", - "instruction": "执行端口扫描命令" - }, - { - "content": "任务执行完成", - "tool": "Final", - "instruction": "" - } - ] - } - ``` - - # 现在开始重新生成计划: - - # 目标 - - {{goal}} - - # 工具 - - 你可以访问并使用一些工具,这些工具将在 XML标签中给出。 - - - {% for tool in tools %} - - {{tool.id}} {{tool.name}};{{tool.description}} - {% endfor %} - - - # 当前计划 - {{current_plan}} - - # 运行报错 - {{error_message}} - - # 重新生成的计划 -""") - GEN_STEP = dedent(r""" 你是一个计划生成器。 请根据用户的目标、当前计划和历史,生成一个新的步骤。 @@ -534,7 +50,8 @@ GEN_STEP = dedent(r""" - mcp_tool_1 mysql_analyzer;用于分析数据库性能/description> - mcp_tool_2 文件存储工具;用于存储文件 - mcp_tool_3 mongoDB工具;用于操作MongoDB数据库 - - Final 结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。 + - Final 结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。\ + # 输出 ```json @@ -565,10 +82,13 @@ GEN_STEP = dedent(r""" - 得到数据:`{"weather": "晴", "temperature": "25°C"}` # 工具 - - mcp_tool_4 maps_geo_planner;将详细的结构化地址转换为经纬度坐标。支持对地标性名胜景区、建筑物名称解析为经纬度坐标 + - mcp_tool_4 maps_geo_planner;将详细的结构化地址转换为经纬度坐标。支持对地标性名胜景区、\ +建筑物名称解析为经纬度坐标 - mcp_tool_5 weather_query;天气查询,用于查询天气信息 - - mcp_tool_6 maps_direction_transit_integrated;根据用户起终点经纬度坐标规划综合各类公共(火车、公交、地铁)交通方式的通勤方案,并且返回通勤方案的数据,跨城场景下必须传起点城市与终点城市 - - Final Final;结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。 + - mcp_tool_6 maps_direction_transit_integrated;根据用户起终点经纬度坐标规划综合各类\ +公共交通方式(火车、公交、地铁)的通勤方案,并且返回通勤方案的数据,跨城场景下必须传起点城市与终点城市 + - Final Final;结束步骤,当执行到这一步时,表示计划执行结束,\ +所得到的结果将作为最终结果。 # 输出 ```json @@ -590,61 +110,6 @@ GEN_STEP = dedent(r""" """) -TOOL_SKIP = dedent(r""" - 你是一个计划执行器。 - 你的任务是根据当前的计划和用户目标,判断当前步骤是否需要跳过。 - 如果需要跳过,请返回`true`,否则返回`false`。 - 必须按照以下格式回答: - ```json - { - "skip": true/false, - } - ``` - 注意: - 1.你的判断要谨慎,在历史消息中有足够的上下文信息时,才可以判断是否跳过当前步骤。 - # 样例 - # 用户目标 - 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优 - # 历史 - 第1步:生成端口扫描命令 - - 调用工具 `command_generator`,并提供参数 `{"command": "nmap -sS -p--open 192.168.1.1"}` - - 执行状态:成功 - - 得到数据:`{"command": "nmap -sS -p--open 192.168.1.1"}` - 第2步:执行端口扫描命令 - - 调用工具 `command_executor`,并提供参数 `{"command": "nmap -sS -p--open 192.168.1.1"}` - - 执行状态:成功 - - 得到数据:`{"result": "success"}` - 第3步:分析端口扫描结果 - - 调用工具 `mysql_analyzer`,并提供参数 `{"host": "192.168.1.1", "port": 3306, "username": "root", "password": "password"}` - - 执行状态:成功 - - 得到数据:`{"performance": "good", "bottleneck": "none"}` - # 当前步骤 - - step_4 - command_generator - 生成MySQL性能调优命令 - 生成MySQL性能调优命令:调优MySQL数据库性能 - - # 输出 - ```json - { - "skip": true - } - ``` - # 用户目标 - {{goal}} - # 历史 - {{history}} - # 当前步骤 - - {{step_id}} - {{step_name}} - {{step_instruction}} - {{step_content}} - - # 输出 - """) - RISK_EVALUATE = dedent(r""" 你是一个工具执行计划评估器。 你的任务是根据当前工具的名称、描述和入参以及附加信息,判断当前工具执行的风险并输出提示。 @@ -678,14 +143,15 @@ RISK_EVALUATE = dedent(r""" ```json { "risk": "中", - "reason": "当前工具将连接到MySQL数据库并分析性能,可能会对数据库性能产生一定影响。请确保在非生产环境中执行此操作。" + "reason": "当前工具将连接到MySQL数据库并分析性能,可能会对数据库性能产生一定影响。\ +请确保在非生产环境中执行此操作。" } ``` # 工具 - - {{tool_name}} - {{tool_description}} - + < tool > + < name > {{tool_name}} < /name > + < description > {{tool_description}} < /description > + < / tool > # 工具入参 {{input_param}} # 附加信息 @@ -693,76 +159,6 @@ RISK_EVALUATE = dedent(r""" # 输出 """) -# 根据当前计划和报错信息决定下一步执行,具体计划有需要用户补充工具入参、重计划当前步骤、重计划接下来的所有计划 -TOOL_EXECUTE_ERROR_TYPE_ANALYSIS = dedent(r""" - 你是一个计划决策器。 - 你的任务是根据用户目标、当前计划、当前使用的工具、工具入参和工具运行报错,决定下一步执行的操作。 - 请根据以下规则进行判断: - 1. 仅通过补充工具入参来解决问题的,返回 missing_param; - 2. 需要重计划当前步骤的,返回 decorrect_plan - 3.推理过程必须清晰明了,能够让人理解你的判断依据,并且不超过100字。 - 你的输出要以json格式返回,格式如下: - ```json - { - "error_type": "missing_param/decorrect_plan, - "reason": "你的推理过程" - } - ``` - # 样例 - # 用户目标 - 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优 - # 当前计划 - {"plans": [ - { - "content": "生成端口扫描命令", - "tool": "command_generator", - "instruction": "生成端口扫描命令:扫描192.168.1.1的开放端口" - }, - { - "content": "在执行Result[0]生成的命令", - "tool": "command_executor", - "instruction": "执行端口扫描命令" - }, - { - "content": "任务执行完成,端口扫描结果为Result[2]", - "tool": "Final", - "instruction": "" - } - ]} - # 当前使用的工具 - - command_executor - 执行命令行指令 - - # 工具入参 - { - "command": "nmap -sS -p--open 192.168.1.1" - } - # 工具运行报错 - 执行端口扫描命令时,出现了错误:`- bash: nmap: command not found`。 - # 输出 - ```json - { - "error_type": "decorrect_plan", - "reason": "当前计划的第二步执行失败,报错信息显示nmap命令未找到,可能是因为没有安装nmap工具,因此需要重计划当前步骤。" - } - ``` - # 用户目标 - {{goal}} - # 当前计划 - {{current_plan}} - # 当前使用的工具 - - {{tool_name}} - {{tool_description}} - - # 工具入参 - {{input_param}} - # 工具运行报错 - {{error_message}} - # 输出 - """) - IS_PARAM_ERROR = dedent(r""" 你是一个计划执行专家,你的任务是判断当前的步骤执行失败是否是因为参数错误导致的, 如果是,请返回`true`,否则返回`false`。 @@ -834,10 +230,10 @@ CHANGE_ERROR_MESSAGE_TO_DESCRIPTION = dedent(r""" 5. 只输出自然语言描述,不要输出其他内容。 # 样例 # 工具信息 - - port_scanner - 扫描主机端口 - + < tool > + < name > port_scanner < /name > + < description > 扫描主机端口 < /description > + < input_schema > { "type": "object", "properties": { @@ -860,8 +256,8 @@ CHANGE_ERROR_MESSAGE_TO_DESCRIPTION = dedent(r""" }, "required": ["host", "port", "username", "password"] } - - + < /input_schema > + < / tool > # 工具入参 { "host": "192.0.0.1", @@ -875,13 +271,13 @@ CHANGE_ERROR_MESSAGE_TO_DESCRIPTION = dedent(r""" 扫描端口时发生错误:密码不正确。请检查输入的密码是否正确,并重试。 # 现在开始转换报错信息: # 工具信息 - - {{tool_name}} - {{tool_description}} - + < tool > + < name > {{tool_name}} < /name > + < description > {{tool_description}} < /description > + < input_schema > {{input_schema}} - - + < /input_schema > + < / tool > # 工具入参 {{input_params}} # 报错信息 @@ -960,10 +356,10 @@ GET_MISSING_PARAMS = dedent(r""" } ``` # 工具 - - {{tool_name}} - {{tool_description}} - + < tool > + < name > {{tool_name}} < /name > + < description > {{tool_description}} < /description > + < / tool > # 工具入参 {{input_param}} # 工具入参schema(部分字段允许为null) @@ -973,7 +369,6 @@ GET_MISSING_PARAMS = dedent(r""" # 输出 """) - GEN_PARAMS = dedent(r""" 你是一个工具参数生成器。 你的任务是根据总的目标、阶段性的目标、工具信息、工具入参的schema和背景信息生成工具的入参。 @@ -1034,10 +429,10 @@ GEN_PARAMS = dedent(r""" } ``` # 工具 - - {{tool_name}} - {{tool_description}} - + < tool > + < name > {{tool_name}} < /name > + < description > {{tool_description}} < /description > + < / tool > # 总目标 {{goal}} # 当前阶段目标 @@ -1050,6 +445,7 @@ GEN_PARAMS = dedent(r""" """) REPAIR_PARAMS = dedent(r""" + 你是一个工具参数修复器。 你的任务是根据当前的工具信息、目标、工具入参的schema、工具当前的入参、工具的报错、补充的参数和补充的参数描述,修复当前工具的入参。 注意: @@ -1057,10 +453,10 @@ REPAIR_PARAMS = dedent(r""" # 样例 # 工具信息 - - mysql_analyzer - 分析MySQL数据库性能 - + < tool > + < name > mysql_analyzer < /name > + < description > 分析MySQL数据库性能 < /description > + < / tool > # 总目标 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优 # 当前阶段目标 @@ -1114,10 +510,10 @@ REPAIR_PARAMS = dedent(r""" } ``` # 工具 - - {{tool_name}} - {{tool_description}} - + < tool > + < name > {{tool_name}} < /name > + < description > {{tool_description}} < /description > + < / tool > # 总目标 {{goal}} # 当前阶段目标 @@ -1126,6 +522,8 @@ REPAIR_PARAMS = dedent(r""" {{input_schema}} # 工具入参 {{input_param}} + # 工具描述 + {{tool_description}} # 运行报错 {{error_message}} # 补充的参数 @@ -1148,14 +546,7 @@ FINAL_ANSWER = dedent(r""" {{memory}} + # 现在,请根据以上信息,向用户报告目标的完成情况: """) -MEMORY_TEMPLATE = dedent(r""" - {% for ctx in context_list %} - - 第{{loop.index}}步:{{ctx.step_description}} - - 调用工具 `{{ctx.step_id}}`,并提供参数 `{{ctx.input_data}}` - - 执行状态:{{ctx.step_status}} - - 得到数据:`{{ctx.output_data}}` - {% endfor %} -""") diff --git a/apps/scheduler/mcp_agent/select.py b/apps/scheduler/mcp_agent/select.py deleted file mode 100644 index ce289c1f6ca502214ae5517e692da1889d5ff25e..0000000000000000000000000000000000000000 --- a/apps/scheduler/mcp_agent/select.py +++ /dev/null @@ -1,97 +0,0 @@ -# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. -"""选择MCP Server及其工具""" - -import logging -import random - -from jinja2 import BaseLoader -from jinja2.sandbox import SandboxedEnvironment - -from apps.llm.reasoning import ReasoningLLM -from apps.llm.token import TokenCalculator -from apps.scheduler.mcp_agent.base import McpBase -from apps.scheduler.mcp_agent.prompt import TOOL_SELECT -from apps.schemas.mcp import MCPCollection, MCPTool, MCPToolIdsSelectResult - -logger = logging.getLogger(__name__) - -_env = SandboxedEnvironment( - loader=BaseLoader, - autoescape=True, - trim_blocks=True, - lstrip_blocks=True, -) - -FINAL_TOOL_ID = "FIANL" -SUMMARIZE_TOOL_ID = "SUMMARIZE" - - -class MCPSelector(McpBase): - """MCP选择器""" - - @staticmethod - async def select_top_tool( - goal: str, tool_list: list[MCPTool], - additional_info: str | None = None, top_n: int | None = None, - reasoning_llm: ReasoningLLM | None = None) -> list[MCPTool]: - """选择最合适的工具""" - random.shuffle(tool_list) - max_tokens = reasoning_llm._config.max_tokens - template = _env.from_string(TOOL_SELECT) - token_calculator = TokenCalculator() - if token_calculator.calculate_token_length( - messages=[{"role": "user", "content": template.render( - goal=goal, tools=[], additional_info=additional_info - )}], - pure_text=True) > max_tokens: - logger.warning("[MCPSelector] 工具选择模板长度超过最大令牌数,无法进行选择") - return [] - current_index = 0 - tool_ids = [] - while current_index < len(tool_list): - index = current_index - sub_tools = [] - while index < len(tool_list): - tool = tool_list[index] - tokens = token_calculator.calculate_token_length( - messages=[{"role": "user", "content": template.render( - goal=goal, tools=[tool], - additional_info=additional_info - )}], - pure_text=True - ) - if tokens > max_tokens: - continue - sub_tools.append(tool) - - tokens = token_calculator.calculate_token_length(messages=[{"role": "user", "content": template.render( - goal=goal, tools=sub_tools, additional_info=additional_info)}, ], pure_text=True) - if tokens > max_tokens: - del sub_tools[-1] - break - else: - index += 1 - current_index = index - if sub_tools: - schema = MCPToolIdsSelectResult.model_json_schema() - if "items" not in schema["properties"]["tool_ids"]: - schema["properties"]["tool_ids"]["items"] = {} - # 将enum添加到items中,限制数组元素的可选值 - schema["properties"]["tool_ids"]["items"]["enum"] = [tool.id for tool in sub_tools] - result = await MCPSelector.get_resoning_result(template.render(goal=goal, tools=sub_tools, additional_info="请根据目标选择对应的工具"), reasoning_llm) - result = await MCPSelector._parse_result(result, schema) - try: - result = MCPToolIdsSelectResult.model_validate(result) - tool_ids.extend(result.tool_ids) - except Exception: - logger.exception("[MCPSelector] 解析MCP工具ID选择结果失败") - continue - mcp_tools = [tool for tool in tool_list if tool.id in tool_ids] - - if top_n is not None: - mcp_tools = mcp_tools[:top_n] - mcp_tools.append(MCPTool(id=FINAL_TOOL_ID, name="Final", - description="终止", mcp_id=FINAL_TOOL_ID, input_schema={})) - # mcp_tools.append(MCPTool(id=SUMMARIZE_TOOL_ID, name="Summarize", - # description="总结工具", mcp_id=SUMMARIZE_TOOL_ID, input_schema={})) - return mcp_tools diff --git a/apps/schemas/mcp.py b/apps/schemas/mcp.py index 69b5882654898adb71beb45b33803373ced7f1a2..9f87ae647d8f717113b74fd3b90e9bab77fff80c 100644 --- a/apps/schemas/mcp.py +++ b/apps/schemas/mcp.py @@ -59,20 +59,6 @@ class MCPServerConfig(MCPServerItem): author: str = Field(description="MCP 服务器上传者", default="") -class GoalEvaluationResult(BaseModel): - """MCP 目标评估结果""" - - can_complete: bool = Field(description="是否可以完成目标") - reason: str = Field(description="评估原因") - - -class RestartStepIndex(BaseModel): - """MCP重新规划的步骤索引""" - - start_index: int = Field(description="重新规划的起始步骤索引") - reasoning: str = Field(description="重新规划的原因") - - class Risk(str, Enum): """MCP工具风险类型""" @@ -81,12 +67,6 @@ class Risk(str, Enum): HIGH = "high" -class ToolSkip(BaseModel): - """MCP工具跳过执行结果""" - - skip: bool = Field(description="是否跳过当前步骤", default=False) - - class ToolRisk(BaseModel): """MCP工具风险评估结果""" @@ -94,44 +74,17 @@ class ToolRisk(BaseModel): reason: str = Field(description="风险原因", default="") -class ErrorType(str, Enum): - """MCP工具错误类型""" - - MISSING_PARAM = "missing_param" - DECORRECT_PLAN = "decorrect_plan" - - -class ToolExcutionErrorType(BaseModel): - """MCP工具执行错误""" - - type: ErrorType = Field(description="错误类型", default=ErrorType.MISSING_PARAM) - reason: str = Field(description="错误原因", default="") - - class IsParamError(BaseModel): """MCP工具参数错误""" is_param_error: bool = Field(description="是否是参数错误", default=False) - class MCPSelectResult(BaseModel): """MCP选择结果""" mcp_id: str = Field(description="MCP Server的ID") -class MCPToolSelectResult(BaseModel): - """MCP工具选择结果""" - - name: str = Field(description="工具名称") - - -class MCPToolIdsSelectResult(BaseModel): - """MCP工具ID选择结果""" - - tool_ids: list[str] = Field(description="工具ID列表") - - class MCPPlanItem(BaseModel): """MCP 计划"""