diff --git a/apps/scheduler/executor/agent.py b/apps/scheduler/executor/agent.py index 4d34b85b6855478d0686c8fa76a1b51433587067..6c80685c9389d65c9703b9892fdf6aaeaf092828 100644 --- a/apps/scheduler/executor/agent.py +++ b/apps/scheduler/executor/agent.py @@ -1,38 +1,32 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """MCP Agent执行器""" -import anyio import logging import uuid -from pydantic import Field -from typing import Any + +import anyio from mcp.types import TextContent -from apps.llm.patterns.rewrite import QuestionRewrite +from pydantic import Field + from apps.llm.reasoning import ReasoningLLM from apps.scheduler.executor.base import BaseExecutor -from apps.schemas.enum_var import EventType, SpecialCallType, FlowStatus, StepStatus from apps.scheduler.mcp_agent.host import MCPHost from apps.scheduler.mcp_agent.plan import MCPPlanner -from apps.scheduler.mcp_agent.select import FINAL_TOOL_ID, MCPSelector -from apps.scheduler.pool.mcp.client import MCPClient +from apps.scheduler.mcp_agent.select import FINAL_TOOL_ID +from apps.scheduler.pool.mcp.pool import MCPPool +from apps.schemas.enum_var import EventType, FlowStatus, StepStatus from apps.schemas.mcp import ( - GoalEvaluationResult, - RestartStepIndex, - ToolRisk, - ErrorType, - ToolExcutionErrorType, - MCPPlan, MCPCollection, MCPTool, - Step + Step, ) -from apps.scheduler.pool.mcp.pool import MCPPool -from apps.schemas.task import ExecutorState, FlowStepHistory, StepQueueItem -from apps.schemas.message import param -from apps.services.task import TaskManager +from apps.schemas.message import FlowParams +from apps.schemas.task import FlowStepHistory from apps.services.appcenter import AppCenterManager from apps.services.mcp_service import MCPServiceManager +from apps.services.task import TaskManager from apps.services.user import UserManager + logger = logging.getLogger(__name__) @@ -46,13 +40,17 @@ class MCPAgentExecutor(BaseExecutor): mcp_list: list[MCPCollection] = Field(description="MCP服务器列表", default=[]) mcp_pool: MCPPool = Field(description="MCP池", default=MCPPool()) tools: dict[str, MCPTool] = Field( - description="MCP工具列表,key为tool_id", default={} + description="MCP工具列表,key为tool_id", + default={}, ) tool_list: list[MCPTool] = Field( - description="MCP工具列表,包含所有MCP工具", default=[] + description="MCP工具列表,包含所有MCP工具", + default=[], ) - params: param | bool | None = Field( - default=None, description="流执行过程中的参数补充", alias="params" + params: FlowParams | bool | None = Field( + default=None, + description="流执行过程中的参数补充", + alias="params", ) resoning_llm: ReasoningLLM = Field( default=ReasoningLLM(), @@ -89,43 +87,53 @@ class MCPAgentExecutor(BaseExecutor): continue self.mcp_list.append(mcp_service) - await self.mcp_pool._init_mcp(mcp_id, self.task.ids.user_sub) + await self.mcp_pool.init_mcp(mcp_id, self.task.ids.user_sub) for tool in mcp_service.tools: self.tools[tool.id] = tool self.tool_list.extend(mcp_service.tools) self.tools[FINAL_TOOL_ID] = MCPTool( - id=FINAL_TOOL_ID, - name="Final Tool", - description="结束流程的工具", - mcp_id="", - input_schema={} + id=FINAL_TOOL_ID, name="Final Tool", description="结束流程的工具", mcp_id="", input_schema={}, + ) + self.tool_list.append( + MCPTool(id=FINAL_TOOL_ID, name="Final Tool", description="结束流程的工具", mcp_id="", input_schema={}), ) - self.tool_list.append(MCPTool(id=FINAL_TOOL_ID, name="Final Tool", - description="结束流程的工具", mcp_id="", input_schema={})) - async def get_tool_input_param(self, is_first: bool) -> None: + async def get_tool_input_param(self, *, is_first: bool) -> None: + """获取工具输入参数""" if is_first: # 获取第一个输入参数 mcp_tool = self.tools[self.task.state.tool_id] - self.task.state.current_input = await MCPHost._get_first_input_params(mcp_tool, self.task.runtime.question, self.task.state.step_description, self.task) + self.task.state.current_input = await MCPHost.get_first_input_params( + mcp_tool, self.task.runtime.question, self.task.state.step_description, self.task + ) else: # 获取后续输入参数 - if isinstance(self.params, param): + if isinstance(self.params, FlowParams): params = self.params.content params_description = self.params.description else: params = {} params_description = "" mcp_tool = self.tools[self.task.state.tool_id] - self.task.state.current_input = await MCPHost._fill_params(mcp_tool, self.task.runtime.question, self.task.state.step_description, self.task.state.current_input, self.task.state.error_message, params, params_description) + self.task.state.current_input = await MCPHost._fill_params( + mcp_tool, + self.task.runtime.question, + self.task.state.step_description, + self.task.state.current_input, + self.task.state.error_message, + params, + params_description, + ) async def confirm_before_step(self) -> None: + """确认前步骤""" # 发送确认消息 mcp_tool = self.tools[self.task.state.tool_id] confirm_message = await MCPPlanner.get_tool_risk(mcp_tool, self.task.state.current_input, "", self.resoning_llm) await self.update_tokens() - await self.push_message(EventType.STEP_WAITING_FOR_START, confirm_message.model_dump( - exclude_none=True, by_alias=True)) + await self.push_message( + EventType.STEP_WAITING_FOR_START, confirm_message.model_dump(exclude_none=True, by_alias=True), + ) await self.push_message(EventType.FLOW_STOP, {}) self.task.state.flow_status = FlowStatus.WAITING self.task.state.step_status = StepStatus.WAITING @@ -142,27 +150,26 @@ class MCPAgentExecutor(BaseExecutor): input_data={}, output_data={}, ex_data=confirm_message.model_dump(exclude_none=True, by_alias=True), - ) + ), ) - async def run_step(self): + async def run_step(self) -> None: """执行步骤""" self.task.state.flow_status = FlowStatus.RUNNING self.task.state.step_status = StepStatus.RUNNING mcp_tool = self.tools[self.task.state.tool_id] - mcp_client = (await self.mcp_pool.get(mcp_tool.mcp_id, self.task.ids.user_sub)) + mcp_client = await self.mcp_pool.get(mcp_tool.mcp_id, self.task.ids.user_sub) try: output_params = await mcp_client.call_tool(mcp_tool.name, self.task.state.current_input) - except anyio.ClosedResourceError as e: - import traceback - logger.error("[MCPAgentExecutor] MCP客户端连接已关闭: %s, 错误: %s", mcp_tool.mcp_id, traceback.format_exc()) + except anyio.ClosedResourceError: + logger.exception("[MCPAgentExecutor] MCP客户端连接已关闭: %s", mcp_tool.mcp_id) await self.mcp_pool.stop(mcp_tool.mcp_id, self.task.ids.user_sub) - await self.mcp_pool._init_mcp(mcp_tool.mcp_id, self.task.ids.user_sub) - logger.error("[MCPAgentExecutor] MCP客户端连接已关闭: %s, 错误: %s", mcp_tool.mcp_id, str(e)) + await self.mcp_pool.init_mcp(mcp_tool.mcp_id, self.task.ids.user_sub) self.task.state.step_status = StepStatus.ERROR return except Exception as e: import traceback + logger.exception("[MCPAgentExecutor] 执行步骤 %s 时发生错误: %s", mcp_tool.name, traceback.format_exc()) self.task.state.step_status = StepStatus.ERROR self.task.state.error_message = str(e) @@ -184,14 +191,8 @@ class MCPAgentExecutor(BaseExecutor): } await self.update_tokens() - await self.push_message( - EventType.STEP_INPUT, - self.task.state.current_input - ) - await self.push_message( - EventType.STEP_OUTPUT, - output_params - ) + await self.push_message(EventType.STEP_INPUT, self.task.state.current_input) + await self.push_message(EventType.STEP_OUTPUT, output_params) self.task.context.append( FlowStepHistory( task_id=self.task.id, @@ -204,7 +205,7 @@ class MCPAgentExecutor(BaseExecutor): flow_status=self.task.state.flow_status, input_data=self.task.state.current_input, output_data=output_params, - ) + ), ) self.task.state.step_status = StepStatus.SUCCESS @@ -212,29 +213,19 @@ class MCPAgentExecutor(BaseExecutor): """生成参数补充""" mcp_tool = self.tools[self.task.state.tool_id] params_with_null = await MCPPlanner.get_missing_param( - mcp_tool, - self.task.state.current_input, - self.task.state.error_message, - self.resoning_llm + mcp_tool, self.task.state.current_input, self.task.state.error_message, self.resoning_llm, ) await self.update_tokens() error_message = await MCPPlanner.change_err_message_to_description( error_message=self.task.state.error_message, tool=mcp_tool, input_params=self.task.state.current_input, - reasoning_llm=self.resoning_llm + reasoning_llm=self.resoning_llm, ) await self.push_message( - EventType.STEP_WAITING_FOR_PARAM, - data={ - "message": error_message, - "params": params_with_null - } - ) - await self.push_message( - EventType.FLOW_STOP, - data={} + EventType.STEP_WAITING_FOR_PARAM, data={"message": error_message, "params": params_with_null}, ) + await self.push_message(EventType.FLOW_STOP, data={}) self.task.state.flow_status = FlowStatus.WAITING self.task.state.step_status = StepStatus.PARAM self.task.context.append( @@ -249,33 +240,25 @@ class MCPAgentExecutor(BaseExecutor): flow_status=self.task.state.flow_status, input_data={}, output_data={}, - ex_data={ - "message": error_message, - "params": params_with_null - } - ) + ex_data={"message": error_message, "params": params_with_null}, + ), ) async def get_next_step(self) -> None: + """获取下一步""" if self.task.state.step_cnt < self.max_steps: self.task.state.step_cnt += 1 history = await MCPHost.assemble_memory(self.task) max_retry = 3 step = None - for i in range(max_retry): + for _ in range(max_retry): step = await MCPPlanner.create_next_step(self.task.runtime.question, history, self.tool_list) - if step.tool_id in self.tools.keys(): + if step.tool_id in self.tools: break - if step is None or step.tool_id not in self.tools.keys(): - step = Step( - tool_id=FINAL_TOOL_ID, - description=FINAL_TOOL_ID - ) + if step is None or step.tool_id not in self.tools: + step = Step(tool_id=FINAL_TOOL_ID, description=FINAL_TOOL_ID) tool_id = step.tool_id - if tool_id == FINAL_TOOL_ID: - step_name = FINAL_TOOL_ID - else: - step_name = self.tools[tool_id].name + step_name = FINAL_TOOL_ID if tool_id == FINAL_TOOL_ID else self.tools[tool_id].name step_description = step.description self.task.state.step_id = str(uuid.uuid4()) self.task.state.tool_id = tool_id @@ -286,16 +269,12 @@ class MCPAgentExecutor(BaseExecutor): else: # 没有下一步了,结束流程 self.task.state.tool_id = FINAL_TOOL_ID - return async def error_handle_after_step(self) -> None: """步骤执行失败后的错误处理""" self.task.state.step_status = StepStatus.ERROR self.task.state.flow_status = FlowStatus.ERROR - await self.push_message( - EventType.FLOW_FAILED, - data={} - ) + await self.push_message(EventType.FLOW_FAILED, data={}) if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id: del self.task.context[-1] self.task.context.append( @@ -310,18 +289,18 @@ class MCPAgentExecutor(BaseExecutor): flow_status=self.task.state.flow_status, input_data={}, output_data={}, - ) + ), ) - async def work(self) -> None: + async def work(self) -> None: # noqa: C901, PLR0912, PLR0915 """执行当前步骤""" if self.task.state.step_status == StepStatus.INIT: - await self.push_message( - EventType.STEP_INIT, - data={} - ) + await self.push_message(EventType.STEP_INIT, data={}) await self.get_tool_input_param(is_first=True) user_info = await UserManager.get_userinfo_by_user_sub(self.task.ids.user_sub) + if user_info is None: + logger.error("[MCPAgentExecutor] 用户信息不存在: %s", self.task.ids.user_sub) + return if not user_info.auto_execute: # 等待用户确认 await self.confirm_before_step() @@ -338,14 +317,8 @@ class MCPAgentExecutor(BaseExecutor): else: self.task.state.flow_status = FlowStatus.CANCELLED self.task.state.step_status = StepStatus.CANCELLED - await self.push_message( - EventType.STEP_CANCEL, - data={} - ) - await self.push_message( - EventType.FLOW_CANCEL, - data={} - ) + await self.push_message(EventType.STEP_CANCEL, data={}) + await self.push_message(EventType.FLOW_CANCEL, data={}) if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id: self.task.context[-1].step_status = StepStatus.CANCELLED if self.task.state.step_status == StepStatus.PARAM: @@ -363,12 +336,15 @@ class MCPAgentExecutor(BaseExecutor): await self.error_handle_after_step() else: user_info = await UserManager.get_userinfo_by_user_sub(self.task.ids.user_sub) + if user_info is None: + logger.error("[MCPAgentExecutor] 用户信息不存在: %s", self.task.ids.user_sub) + return if user_info.auto_execute: await self.push_message( EventType.STEP_ERROR, data={ "message": self.task.state.error_message, - } + }, ) if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id: self.task.context[-1].step_status = StepStatus.ERROR @@ -394,7 +370,7 @@ class MCPAgentExecutor(BaseExecutor): EventType.STEP_ERROR, data={ "message": self.task.state.error_message, - } + }, ) if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id: self.task.context[-1].step_status = StepStatus.ERROR @@ -406,18 +382,14 @@ class MCPAgentExecutor(BaseExecutor): await self.get_next_step() async def summarize(self) -> None: + """总结""" async for chunk in MCPPlanner.generate_answer( - self.task.runtime.question, - (await MCPHost.assemble_memory(self.task)), - self.resoning_llm + self.task.runtime.question, (await MCPHost.assemble_memory(self.task)), self.resoning_llm, ): - await self.push_message( - EventType.TEXT_ADD, - data=chunk - ) + await self.push_message(EventType.TEXT_ADD, data=chunk) self.task.runtime.answer += chunk - async def run(self) -> None: + async def run(self) -> None: # noqa: C901 """执行MCP Agent的主逻辑""" # 初始化MCP服务 await self.load_state() @@ -426,32 +398,23 @@ class MCPAgentExecutor(BaseExecutor): # 初始化状态 try: self.task.state.flow_id = str(uuid.uuid4()) - self.task.state.flow_name = await MCPPlanner.get_flow_name(self.task.runtime.question, self.resoning_llm) + self.task.state.flow_name = await MCPPlanner.get_flow_name( + self.task.runtime.question, self.resoning_llm, + ) await TaskManager.save_task(self.task.id, self.task) await self.get_next_step() except Exception as e: - import traceback - logger.error("[MCPAgentExecutor] 初始化失败: %s", traceback.format_exc()) - logger.error("[MCPAgentExecutor] 初始化失败: %s", str(e)) + logger.exception("[MCPAgentExecutor] 初始化失败") self.task.state.flow_status = FlowStatus.ERROR self.task.state.error_message = str(e) - await self.push_message( - EventType.FLOW_FAILED, - data={} - ) + await self.push_message(EventType.FLOW_FAILED, data={}) return self.task.state.flow_status = FlowStatus.RUNNING - await self.push_message( - EventType.FLOW_START, - data={} - ) + await self.push_message(EventType.FLOW_START, data={}) if self.task.state.tool_id == FINAL_TOOL_ID: # 如果已经是最后一步,直接结束 self.task.state.flow_status = FlowStatus.SUCCESS - await self.push_message( - EventType.FLOW_SUCCESS, - data={} - ) + await self.push_message(EventType.FLOW_SUCCESS, data={}) await self.summarize() return try: @@ -465,26 +428,15 @@ class MCPAgentExecutor(BaseExecutor): # 如果已经是最后一步,直接结束 self.task.state.flow_status = FlowStatus.SUCCESS self.task.state.step_status = StepStatus.SUCCESS - await self.push_message( - EventType.FLOW_SUCCESS, - data={} - ) + await self.push_message(EventType.FLOW_SUCCESS, data={}) await self.summarize() except Exception as e: - import traceback - logger.error("[MCPAgentExecutor] 执行过程中发生错误: %s", traceback.format_exc()) - logger.error("[MCPAgentExecutor] 执行过程中发生错误: %s", str(e)) + logger.exception("[MCPAgentExecutor] 执行过程中发生错误") self.task.state.flow_status = FlowStatus.ERROR self.task.state.error_message = str(e) self.task.state.step_status = StepStatus.ERROR - await self.push_message( - EventType.STEP_ERROR, - data={} - ) - await self.push_message( - EventType.FLOW_FAILED, - data={} - ) + await self.push_message(EventType.STEP_ERROR, data={}) + await self.push_message(EventType.FLOW_FAILED, data={}) if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id: del self.task.context[-1] self.task.context.append( @@ -499,12 +451,11 @@ class MCPAgentExecutor(BaseExecutor): flow_status=self.task.state.flow_status, input_data={}, output_data={}, - ) + ), ) finally: for mcp_service in self.mcp_list: try: await self.mcp_pool.stop(mcp_service.id, self.task.ids.user_sub) - except Exception as e: - import traceback - logger.error("[MCPAgentExecutor] 停止MCP客户端时发生错误: %s", traceback.format_exc()) + except Exception: + logger.exception("[MCPAgentExecutor] 停止MCP客户端时发生错误") diff --git a/apps/scheduler/mcp_agent/base.py b/apps/scheduler/mcp_agent/base.py index ac3829b418be54bfc5df6ae3d61db82fe0fc129e..081a70ab85a3aa4d7da180e61d1208f1ce359811 100644 --- a/apps/scheduler/mcp_agent/base.py +++ b/apps/scheduler/mcp_agent/base.py @@ -1,14 +1,21 @@ -from typing import Any +# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. +"""MCP基类""" + import json -from jsonschema import validate import logging +from typing import Any + +from jsonschema import validate + from apps.llm.function import JsonGenerator from apps.llm.reasoning import ReasoningLLM logger = logging.getLogger(__name__) -class McpBase: +class MCPBase: + """MCP基类""" + @staticmethod async def get_resoning_result(prompt: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> str: """获取推理结果""" @@ -29,7 +36,7 @@ class McpBase: return result @staticmethod - async def _parse_result(result: str, schema: dict[str, Any], left_str: str = '{', right_str: str = '}') -> str: + async def _parse_result(result: str, schema: dict[str, Any], left_str: str = "{", right_str: str = "}") -> str: """解析推理结果""" left_index = result.find(left_str) right_index = result.rfind(right_str) @@ -41,21 +48,21 @@ class McpBase: flag = False if flag: try: - tmp_js = json.loads(result[left_index:right_index + 1]) + tmp_js = json.loads(result[left_index : right_index + 1]) validate(instance=tmp_js, schema=schema) - except Exception as e: - logger.error("[McpBase] 解析结果失败: %s", e) + except Exception: + logger.exception("[McpBase] 解析结果失败") flag = False if not flag: json_generator = JsonGenerator( "请提取下面内容中的json\n\n", [ {"role": "system", "content": "You are a helpful assistant."}, - {"role": "user", "content": "请提取下面内容中的json\n\n"+result}, + {"role": "user", "content": "请提取下面内容中的json\n\n" + result}, ], schema, ) json_result = await json_generator.generate() else: - json_result = json.loads(result[left_index:right_index + 1]) + json_result = json.loads(result[left_index : right_index + 1]) return json_result diff --git a/apps/scheduler/mcp_agent/host.py b/apps/scheduler/mcp_agent/host.py index f43506769772d9f1e573c6a862f68e1011075115..445ee83716c801510aa68475fcc08e51583431b1 100644 --- a/apps/scheduler/mcp_agent/host.py +++ b/apps/scheduler/mcp_agent/host.py @@ -7,20 +7,14 @@ from typing import Any from jinja2 import BaseLoader from jinja2.sandbox import SandboxedEnvironment -from mcp.types import TextContent -from apps.common.mongo import MongoDB -from apps.llm.reasoning import ReasoningLLM from apps.llm.function import JsonGenerator -from apps.scheduler.mcp_agent.base import McpBase +from apps.llm.reasoning import ReasoningLLM from apps.scheduler.mcp.prompt import MEMORY_TEMPLATE -from apps.scheduler.pool.mcp.client import MCPClient -from apps.scheduler.pool.mcp.pool import MCPPool +from apps.scheduler.mcp_agent.base import MCPBase from apps.scheduler.mcp_agent.prompt import GEN_PARAMS, REPAIR_PARAMS -from apps.schemas.enum_var import StepStatus -from apps.schemas.mcp import MCPPlanItem, MCPTool -from apps.schemas.task import Task, FlowStepHistory -from apps.services.task import TaskManager +from apps.schemas.mcp import MCPTool +from apps.schemas.task import Task logger = logging.getLogger(__name__) @@ -33,25 +27,26 @@ _env = SandboxedEnvironment( def tojson_filter(value): - return json.dumps(value, ensure_ascii=False, separators=(',', ':')) + return json.dumps(value, ensure_ascii=False, separators=(",", ":")) -_env.filters['tojson'] = tojson_filter +_env.filters["tojson"] = tojson_filter -class MCPHost(McpBase): +class MCPHost(MCPBase): """MCP宿主服务""" @staticmethod async def assemble_memory(task: Task) -> str: """组装记忆""" - return _env.from_string(MEMORY_TEMPLATE).render( context_list=task.context, ) - async def _get_first_input_params(mcp_tool: MCPTool, goal: str, current_goal: str, task: Task, - resoning_llm: ReasoningLLM = ReasoningLLM()) -> dict[str, Any]: + async def get_first_input_params( + self, mcp_tool: MCPTool, goal: str, current_goal: str, + task: Task, resoning_llm: ReasoningLLM = ReasoningLLM(), + ) -> dict[str, Any]: """填充工具参数""" # 更清晰的输入·指令,这样可以调用generate prompt = _env.from_string(GEN_PARAMS).render( @@ -63,10 +58,7 @@ class MCPHost(McpBase): background_info=await MCPHost.assemble_memory(task), ) logger.info("[MCPHost] 填充工具参数: %s", prompt) - result = await MCPHost.get_resoning_result( - prompt, - resoning_llm - ) + result = await MCPHost.get_resoning_result(prompt, resoning_llm) # 使用JsonGenerator解析结果 result = await MCPHost._parse_result( result, @@ -74,12 +66,16 @@ class MCPHost(McpBase): ) return result - async def _fill_params(mcp_tool: MCPTool, - goal: str, - current_goal: str, - current_input: dict[str, Any], - error_message: str = "", params: dict[str, Any] = {}, - params_description: str = "") -> dict[str, Any]: + async def _fill_params( + self, + mcp_tool: MCPTool, + goal: str, + current_goal: str, + current_input: dict[str, Any], + error_message: str = "", + params: dict[str, Any] = {}, + params_description: str = "", + ) -> dict[str, Any]: llm_query = "请生成修复之后的工具参数" prompt = _env.from_string(REPAIR_PARAMS).render( tool_name=mcp_tool.name, diff --git a/apps/scheduler/mcp_agent/plan.py b/apps/scheduler/mcp_agent/plan.py index b539482d97aa511a928361bffd5e3b0e63f97ec7..3ef84d26f0ef393483fe2368f1f30c7e543255aa 100644 --- a/apps/scheduler/mcp_agent/plan.py +++ b/apps/scheduler/mcp_agent/plan.py @@ -1,41 +1,43 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """MCP 用户目标拆解与规划""" -from typing import Any, AsyncGenerator + +import logging +from collections.abc import AsyncGenerator +from typing import Any + from jinja2 import BaseLoader from jinja2.sandbox import SandboxedEnvironment -import logging + from apps.llm.reasoning import ReasoningLLM -from apps.llm.function import JsonGenerator -from apps.scheduler.mcp_agent.base import McpBase +from apps.scheduler.mcp_agent.base import MCPBase from apps.scheduler.mcp_agent.prompt import ( + CHANGE_ERROR_MESSAGE_TO_DESCRIPTION, + CREATE_PLAN, EVALUATE_GOAL, + FINAL_ANSWER, + GEN_STEP, GENERATE_FLOW_NAME, + GET_MISSING_PARAMS, GET_REPLAN_START_STEP_INDEX, - CREATE_PLAN, + IS_PARAM_ERROR, RECREATE_PLAN, - GEN_STEP, - TOOL_SKIP, RISK_EVALUATE, TOOL_EXECUTE_ERROR_TYPE_ANALYSIS, - IS_PARAM_ERROR, - CHANGE_ERROR_MESSAGE_TO_DESCRIPTION, - GET_MISSING_PARAMS, - FINAL_ANSWER + TOOL_SKIP, ) -from apps.schemas.task import Task +from apps.scheduler.slot.slot import Slot from apps.schemas.mcp import ( GoalEvaluationResult, - RestartStepIndex, - ToolSkip, - ToolRisk, IsParamError, - ToolExcutionErrorType, MCPPlan, + MCPTool, + RestartStepIndex, Step, - MCPPlanItem, - MCPTool + ToolExcutionErrorType, + ToolRisk, + ToolSkip, ) -from apps.scheduler.slot.slot import Slot +from apps.schemas.task import Task _env = SandboxedEnvironment( loader=BaseLoader, @@ -45,36 +47,32 @@ _env = SandboxedEnvironment( ) logger = logging.getLogger(__name__) -class MCPPlanner(McpBase): + +class MCPPlanner(MCPBase): """MCP 用户目标拆解与规划""" @staticmethod async def evaluate_goal( - goal: str, - tool_list: list[MCPTool], - resoning_llm: ReasoningLLM = ReasoningLLM()) -> GoalEvaluationResult: + goal: str, tool_list: list[MCPTool], resoning_llm: ReasoningLLM = ReasoningLLM() + ) -> GoalEvaluationResult: """评估用户目标的可行性""" # 获取推理结果 result = await MCPPlanner._get_reasoning_evaluation(goal, tool_list, resoning_llm) - # 解析为结构化数据 - evaluation = await MCPPlanner._parse_evaluation_result(result) - # 返回评估结果 - return evaluation + return await MCPPlanner._parse_evaluation_result(result) @staticmethod async def _get_reasoning_evaluation( - goal, tool_list: list[MCPTool], - resoning_llm: ReasoningLLM = ReasoningLLM()) -> str: + goal, tool_list: list[MCPTool], resoning_llm: ReasoningLLM = ReasoningLLM() + ) -> str: """获取推理大模型的评估结果""" template = _env.from_string(EVALUATE_GOAL) prompt = template.render( goal=goal, tools=tool_list, ) - result = await MCPPlanner.get_resoning_result(prompt, resoning_llm) - return result + return await MCPPlanner.get_resoning_result(prompt, resoning_llm) @staticmethod async def _parse_evaluation_result(result: str) -> GoalEvaluationResult: @@ -86,22 +84,23 @@ class MCPPlanner(McpBase): async def get_flow_name(user_goal: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> str: """获取当前流程的名称""" - result = await MCPPlanner._get_reasoning_flow_name(user_goal, resoning_llm) - return result + return await MCPPlanner._get_reasoning_flow_name(user_goal, resoning_llm) @staticmethod async def _get_reasoning_flow_name(user_goal: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> str: """获取推理大模型的流程名称""" template = _env.from_string(GENERATE_FLOW_NAME) prompt = template.render(goal=user_goal) - result = await MCPPlanner.get_resoning_result(prompt, resoning_llm) - return result + return await MCPPlanner.get_resoning_result(prompt, resoning_llm) @staticmethod async def get_replan_start_step_index( - user_goal: str, error_message: str, current_plan: MCPPlan | None = None, - history: str = "", - reasoning_llm: ReasoningLLM = ReasoningLLM()) -> RestartStepIndex: + user_goal: str, + error_message: str, + current_plan: MCPPlan | None = None, + history: str = "", + reasoning_llm: ReasoningLLM = ReasoningLLM(), + ) -> RestartStepIndex: """获取重新规划的步骤索引""" # 获取推理结果 template = _env.from_string(GET_REPLAN_START_STEP_INDEX) @@ -122,21 +121,33 @@ class MCPPlanner(McpBase): @staticmethod async def create_plan( - user_goal: str, is_replan: bool = False, error_message: str = "", current_plan: MCPPlan | None = None, - tool_list: list[MCPTool] = [], - max_steps: int = 6, reasoning_llm: ReasoningLLM = ReasoningLLM()) -> MCPPlan: + user_goal: str, + is_replan: bool = False, + error_message: str = "", + current_plan: MCPPlan | None = None, + tool_list: list[MCPTool] = [], + max_steps: int = 6, + reasoning_llm: ReasoningLLM = ReasoningLLM(), + ) -> MCPPlan: """规划下一步的执行流程,并输出""" # 获取推理结果 - result = await MCPPlanner._get_reasoning_plan(user_goal, is_replan, error_message, current_plan, tool_list, max_steps, reasoning_llm) + result = await MCPPlanner._get_reasoning_plan( + user_goal, is_replan, error_message, current_plan, tool_list, max_steps, reasoning_llm, + ) # 解析为结构化数据 return await MCPPlanner._parse_plan_result(result, max_steps) @staticmethod async def _get_reasoning_plan( - user_goal: str, is_replan: bool = False, error_message: str = "", current_plan: MCPPlan | None = None, - tool_list: list[MCPTool] = [], - max_steps: int = 10, reasoning_llm: ReasoningLLM = ReasoningLLM()) -> str: + user_goal: str, + is_replan: bool = False, + error_message: str = "", + current_plan: MCPPlan | None = None, + tool_list: list[MCPTool] = [], + max_steps: int = 10, + reasoning_llm: ReasoningLLM = ReasoningLLM(), + ) -> str: """获取推理大模型的结果""" # 格式化Prompt tool_ids = [tool.id for tool in tool_list] @@ -156,8 +167,7 @@ class MCPPlanner(McpBase): tools=tool_list, max_num=max_steps, ) - result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm) - return result + return await MCPPlanner.get_resoning_result(prompt, reasoning_llm) @staticmethod async def _parse_plan_result(result: str, max_steps: int) -> MCPPlan: @@ -171,8 +181,8 @@ class MCPPlanner(McpBase): @staticmethod async def create_next_step( - goal: str, history: str, tools: list[MCPTool], - reasoning_llm: ReasoningLLM = ReasoningLLM()) -> Step: + goal: str, history: str, tools: list[MCPTool], reasoning_llm: ReasoningLLM = ReasoningLLM() + ) -> Step: """创建下一步的执行步骤""" # 获取推理结果 template = _env.from_string(GEN_STEP) @@ -192,12 +202,18 @@ class MCPPlanner(McpBase): @staticmethod async def tool_skip( - task: Task, step_id: str, step_name: str, step_instruction: str, step_content: str, - reasoning_llm: ReasoningLLM = ReasoningLLM()) -> ToolSkip: + task: Task, + step_id: str, + step_name: str, + step_instruction: str, + step_content: str, + reasoning_llm: ReasoningLLM = ReasoningLLM(), + ) -> ToolSkip: """判断当前步骤是否需要跳过""" # 获取推理结果 template = _env.from_string(TOOL_SKIP) from apps.scheduler.mcp_agent.host import MCPHost + history = await MCPHost.assemble_memory(task) prompt = template.render( step_id=step_id, @@ -205,7 +221,7 @@ class MCPPlanner(McpBase): step_instruction=step_instruction, step_content=step_content, history=history, - goal=task.runtime.question + goal=task.runtime.question, ) result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm) @@ -217,22 +233,22 @@ class MCPPlanner(McpBase): @staticmethod async def get_tool_risk( - tool: MCPTool, input_parm: dict[str, Any], - additional_info: str = "", resoning_llm: ReasoningLLM = ReasoningLLM()) -> ToolRisk: + tool: MCPTool, + input_parm: dict[str, Any], + additional_info: str = "", + resoning_llm: ReasoningLLM = ReasoningLLM(), + ) -> ToolRisk: """获取MCP工具的风险评估结果""" # 获取推理结果 result = await MCPPlanner._get_reasoning_risk(tool, input_parm, additional_info, resoning_llm) - # 解析为结构化数据 - risk = await MCPPlanner._parse_risk_result(result) - # 返回风险评估结果 - return risk + return await MCPPlanner._parse_risk_result(result) @staticmethod async def _get_reasoning_risk( - tool: MCPTool, input_param: dict[str, Any], - additional_info: str, resoning_llm: ReasoningLLM) -> str: + tool: MCPTool, input_param: dict[str, Any], additional_info: str, resoning_llm: ReasoningLLM, + ) -> str: """获取推理大模型的风险评估结果""" template = _env.from_string(RISK_EVALUATE) prompt = template.render( @@ -241,8 +257,7 @@ class MCPPlanner(McpBase): input_param=input_param, additional_info=additional_info, ) - result = await MCPPlanner.get_resoning_result(prompt, resoning_llm) - return result + return await MCPPlanner.get_resoning_result(prompt, resoning_llm) @staticmethod async def _parse_risk_result(result: str) -> ToolRisk: @@ -254,9 +269,13 @@ class MCPPlanner(McpBase): @staticmethod async def _get_reasoning_tool_execute_error_type( - user_goal: str, current_plan: MCPPlan, - tool: MCPTool, input_param: dict[str, Any], - error_message: str, reasoning_llm: ReasoningLLM = ReasoningLLM()) -> str: + user_goal: str, + current_plan: MCPPlan, + tool: MCPTool, + input_param: dict[str, Any], + error_message: str, + reasoning_llm: ReasoningLLM = ReasoningLLM(), + ) -> str: """获取推理大模型的工具执行错误类型""" template = _env.from_string(TOOL_EXECUTE_ERROR_TYPE_ANALYSIS) prompt = template.render( @@ -267,8 +286,7 @@ class MCPPlanner(McpBase): input_param=input_param, error_message=error_message, ) - result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm) - return result + return await MCPPlanner.get_resoning_result(prompt, reasoning_llm) @staticmethod async def _parse_tool_execute_error_type_result(result: str) -> ToolExcutionErrorType: @@ -280,22 +298,32 @@ class MCPPlanner(McpBase): @staticmethod async def get_tool_execute_error_type( - user_goal: str, current_plan: MCPPlan, - tool: MCPTool, input_param: dict[str, Any], - error_message: str, reasoning_llm: ReasoningLLM = ReasoningLLM()) -> ToolExcutionErrorType: + user_goal: str, + current_plan: MCPPlan, + tool: MCPTool, + input_param: dict[str, Any], + error_message: str, + reasoning_llm: ReasoningLLM = ReasoningLLM(), + ) -> ToolExcutionErrorType: """获取MCP工具执行错误类型""" # 获取推理结果 result = await MCPPlanner._get_reasoning_tool_execute_error_type( - user_goal, current_plan, tool, input_param, error_message, reasoning_llm) - error_type = await MCPPlanner._parse_tool_execute_error_type_result(result) + user_goal, current_plan, tool, input_param, error_message, reasoning_llm, + ) # 返回工具执行错误类型 - return error_type + return await MCPPlanner._parse_tool_execute_error_type_result(result) + @staticmethod async def is_param_error( - goal: str, history: str, error_message: str, tool: MCPTool, step_description: str, input_params: dict - [str, Any], - reasoning_llm: ReasoningLLM = ReasoningLLM()) -> IsParamError: + goal: str, + history: str, + error_message: str, + tool: MCPTool, + step_description: str, + input_params: dict[str, Any], + reasoning_llm: ReasoningLLM = ReasoningLLM(), + ) -> IsParamError: """判断错误信息是否是参数错误""" tmplate = _env.from_string(IS_PARAM_ERROR) prompt = tmplate.render( @@ -316,8 +344,8 @@ class MCPPlanner(McpBase): @staticmethod async def change_err_message_to_description( - error_message: str, tool: MCPTool, input_params: dict[str, Any], - reasoning_llm: ReasoningLLM = ReasoningLLM()) -> str: + error_message: str, tool: MCPTool, input_params: dict[str, Any], reasoning_llm: ReasoningLLM = ReasoningLLM() + ) -> str: """将错误信息转换为工具描述""" template = _env.from_string(CHANGE_ERROR_MESSAGE_TO_DESCRIPTION) prompt = template.render( @@ -332,9 +360,8 @@ class MCPPlanner(McpBase): @staticmethod async def get_missing_param( - tool: MCPTool, - input_param: dict[str, Any], - error_message: str, reasoning_llm: ReasoningLLM = ReasoningLLM()) -> list[str]: + tool: MCPTool, input_param: dict[str, Any], error_message: str, reasoning_llm: ReasoningLLM = ReasoningLLM() + ) -> list[str]: """获取缺失的参数""" slot = Slot(schema=tool.input_schema) template = _env.from_string(GET_MISSING_PARAMS) @@ -353,8 +380,8 @@ class MCPPlanner(McpBase): @staticmethod async def generate_answer( - user_goal: str, memory: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> AsyncGenerator[ - str, None]: + user_goal: str, memory: str, resoning_llm: ReasoningLLM = ReasoningLLM() + ) -> AsyncGenerator[str, None]: """生成最终回答""" template = _env.from_string(FINAL_ANSWER) prompt = template.render( diff --git a/apps/scheduler/mcp_agent/prompt.py b/apps/scheduler/mcp_agent/prompt.py index 824ece8a35dfb3b02afb0e5bd5602d6f5d0d1342..25dbaff737804b00cec63479ed5e9ae8a5b9decd 100644 --- a/apps/scheduler/mcp_agent/prompt.py +++ b/apps/scheduler/mcp_agent/prompt.py @@ -62,6 +62,7 @@ MCP_SELECT = dedent(r""" ### 请一步一步思考: """) + TOOL_SELECT = dedent(r""" 你是一个乐于助人的智能助手。 你的任务是:根据当前目标,附加信息,选择最合适的MCP工具。 @@ -117,8 +118,7 @@ TOOL_SELECT = dedent(r""" ## 附加信息 {{additional_info}} # 输出 - """ - ) + """) EVALUATE_GOAL = dedent(r""" 你是一个计划评估器。 @@ -142,7 +142,8 @@ EVALUATE_GOAL = dedent(r""" - mysql_analyzer 分析MySQL数据库性能 - performance_tuner 调优数据库性能 - - Final 结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。 + - Final 结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。\ + # 附加信息 @@ -153,7 +154,8 @@ EVALUATE_GOAL = dedent(r""" ```json { "can_complete": true, - "resoning": "当前的工具集合中包含mysql_analyzer和performance_tuner,能够完成对MySQL数据库的性能分析和调优,因此可以完成用户的目标。" + "resoning": "当前的工具集合中包含mysql_analyzer和performance_tuner,能够完成对MySQL数据库的性能分析和调优,\ +因此可以完成用户的目标。" } ``` @@ -171,6 +173,7 @@ EVALUATE_GOAL = dedent(r""" {{additional_info}} """) + GENERATE_FLOW_NAME = dedent(r""" 你是一个智能助手,你的任务是根据用户的目标,生成一个合适的流程名称。 @@ -190,6 +193,7 @@ GENERATE_FLOW_NAME = dedent(r""" {{goal}} # 输出 """) + GET_REPLAN_START_STEP_INDEX = dedent(r""" 你是一个智能助手,你的任务是根据用户的目标、报错信息和当前计划和历史,获取重新规划的步骤起始索引。 @@ -240,7 +244,8 @@ GET_REPLAN_START_STEP_INDEX = dedent(r""" # 输出 { "start_index": 0, - "reasoning": "当前计划的第一步就失败了,报错信息显示curl命令未找到,可能是因为没有安装curl工具,因此需要从第一步重新规划。" + "reasoning": "当前计划的第一步就失败了,报错信息显示curl命令未找到,可能是因为没有安装curl工具,\ +因此需要从第一步重新规划。" } # 现在开始获取重新规划的步骤起始索引: # 目标 @@ -353,6 +358,7 @@ CREATE_PLAN = dedent(r""" # 计划 """) + RECREATE_PLAN = dedent(r""" 你是一个计划重建器。 请根据用户的目标、当前计划和运行报错,重新生成一个计划。 @@ -402,7 +408,8 @@ RECREATE_PLAN = dedent(r""" - command_generator 生成命令行指令 - tool_selector 选择合适的工具 - command_executor 执行命令行指令 - - Final 结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。 + - Final 结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。\ + # 当前计划 ```json { @@ -502,6 +509,7 @@ RECREATE_PLAN = dedent(r""" # 重新生成的计划 """) + GEN_STEP = dedent(r""" 你是一个计划生成器。 请根据用户的目标、当前计划和历史,生成一个新的步骤。 @@ -529,7 +537,8 @@ GEN_STEP = dedent(r""" - mcp_tool_1 mysql_analyzer;用于分析数据库性能/description> - mcp_tool_2 文件存储工具;用于存储文件 - mcp_tool_3 mongoDB工具;用于操作MongoDB数据库 - - Final 结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。 + - Final 结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。\ + # 输出 ```json @@ -560,10 +569,13 @@ GEN_STEP = dedent(r""" - 得到数据:`{"weather": "晴", "temperature": "25°C"}` # 工具 - - mcp_tool_4 maps_geo_planner;将详细的结构化地址转换为经纬度坐标。支持对地标性名胜景区、建筑物名称解析为经纬度坐标 + - mcp_tool_4 maps_geo_planner;将详细的结构化地址转换为经纬度坐标。支持对地标性名胜景区、\ +建筑物名称解析为经纬度坐标 - mcp_tool_5 weather_query;天气查询,用于查询天气信息 - - mcp_tool_6 maps_direction_transit_integrated;根据用户起终点经纬度坐标规划综合各类公共(火车、公交、地铁)交通方式的通勤方案,并且返回通勤方案的数据,跨城场景下必须传起点城市与终点城市 - - Final Final;结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。 + - mcp_tool_6 maps_direction_transit_integrated;根据用户起终点经纬度坐标规划综合各类\ +公共交通方式(火车、公交、地铁)的通勤方案,并且返回通勤方案的数据,跨城场景下必须传起点城市与终点城市 + - Final Final;结束步骤,当执行到这一步时,表示计划执行结束,\ +所得到的结果将作为最终结果。 # 输出 ```json @@ -610,7 +622,8 @@ TOOL_SKIP = dedent(r""" - 执行状态:成功 - 得到数据:`{"result": "success"}` 第3步:分析端口扫描结果 - - 调用工具 `mysql_analyzer`,并提供参数 `{"host": "192.168.1.1", "port": 3306, "username": "root", "password": "password"}` + - 调用工具 `mysql_analyzer`,并提供参数 `{"host": "192.168.1.1", "port": 3306, "username": "root",\ + "password": "password"}` - 执行状态:成功 - 得到数据:`{"performance": "good", "bottleneck": "none"}` # 当前步骤 @@ -638,8 +651,8 @@ TOOL_SKIP = dedent(r""" {{step_content}} # 输出 - """ - ) + """) + RISK_EVALUATE = dedent(r""" 你是一个工具执行计划评估器。 你的任务是根据当前工具的名称、描述和入参以及附加信息,判断当前工具执行的风险并输出提示。 @@ -673,7 +686,8 @@ RISK_EVALUATE = dedent(r""" ```json { "risk": "中", - "reason": "当前工具将连接到MySQL数据库并分析性能,可能会对数据库性能产生一定影响。请确保在非生产环境中执行此操作。" + "reason": "当前工具将连接到MySQL数据库并分析性能,可能会对数据库性能产生一定影响。\ +请确保在非生产环境中执行此操作。" } ``` # 工具 @@ -686,8 +700,8 @@ RISK_EVALUATE = dedent(r""" # 附加信息 {{additional_info}} # 输出 - """ - ) + """) + # 根据当前计划和报错信息决定下一步执行,具体计划有需要用户补充工具入参、重计划当前步骤、重计划接下来的所有计划 TOOL_EXECUTE_ERROR_TYPE_ANALYSIS = dedent(r""" 你是一个计划决策器。 @@ -739,7 +753,8 @@ TOOL_EXECUTE_ERROR_TYPE_ANALYSIS = dedent(r""" ```json { "error_type": "decorrect_plan", - "reason": "当前计划的第二步执行失败,报错信息显示nmap命令未找到,可能是因为没有安装nmap工具,因此需要重计划当前步骤。" + "reason": "当前计划的第二步执行失败,报错信息显示nmap命令未找到,可能是因为没有安装nmap工具,\ +因此需要重计划当前步骤。" } ``` # 用户目标 @@ -756,8 +771,8 @@ TOOL_EXECUTE_ERROR_TYPE_ANALYSIS = dedent(r""" # 工具运行报错 {{error_message}} # 输出 - """ - ) + """) + IS_PARAM_ERROR = dedent(r""" 你是一个计划执行专家,你的任务是判断当前的步骤执行失败是否是因为参数错误导致的, 如果是,请返回`true`,否则返回`false`。 @@ -816,8 +831,8 @@ IS_PARAM_ERROR = dedent(r""" # 工具运行报错 {{error_message}} # 输出 - """ - ) + """) + # 将当前程序运行的报错转换为自然语言 CHANGE_ERROR_MESSAGE_TO_DESCRIPTION = dedent(r""" 你是一个智能助手,你的任务是将当前程序运行的报错转换为自然语言描述。 @@ -883,6 +898,7 @@ CHANGE_ERROR_MESSAGE_TO_DESCRIPTION = dedent(r""" {{error_message}} # 输出 """) + # 获取缺失的参数的json结构体 GET_MISSING_PARAMS = dedent(r""" 你是一个工具参数获取器。 @@ -965,8 +981,8 @@ GET_MISSING_PARAMS = dedent(r""" # 运行报错 {{error_message}} # 输出 - """ - ) + """) + GEN_PARAMS = dedent(r""" 你是一个工具参数生成器。 你的任务是根据总的目标、阶段性的目标、工具信息、工具入参的schema和背景信息生成工具的入参。 @@ -1040,8 +1056,7 @@ GEN_PARAMS = dedent(r""" # 背景信息 {{background_info}} # 输出 - """ - ) + """) REPAIR_PARAMS = dedent(r""" 你是一个工具参数修复器。 @@ -1130,8 +1145,8 @@ REPAIR_PARAMS = dedent(r""" # 补充的参数描述 {{params_description}} # 输出 - """ - ) + """) + FINAL_ANSWER = dedent(r""" 综合理解计划执行结果和背景信息,向用户报告目标的完成情况。 diff --git a/apps/scheduler/mcp_agent/select.py b/apps/scheduler/mcp_agent/select.py index 075e08f00cb2b5a2976d5372c623193b04c42412..d287b1139d334cb19faacbedaba5eaefa0ec7eb3 100644 --- a/apps/scheduler/mcp_agent/select.py +++ b/apps/scheduler/mcp_agent/select.py @@ -3,28 +3,16 @@ import logging import random + from jinja2 import BaseLoader from jinja2.sandbox import SandboxedEnvironment -from typing import AsyncGenerator, Any -from apps.llm.function import JsonGenerator -from apps.llm.reasoning import ReasoningLLM -from apps.common.lance import LanceDB -from apps.common.mongo import MongoDB -from apps.llm.embedding import Embedding -from apps.llm.function import FunctionLLM from apps.llm.reasoning import ReasoningLLM from apps.llm.token import TokenCalculator -from apps.scheduler.mcp_agent.base import McpBase +from apps.scheduler.mcp_agent.base import MCPBase from apps.scheduler.mcp_agent.prompt import TOOL_SELECT -from apps.schemas.mcp import ( - BaseModel, - MCPCollection, - MCPSelectResult, - MCPTool, - MCPToolIdsSelectResult -) -from apps.common.config import Config +from apps.schemas.mcp import MCPTool, MCPToolIdsSelectResult + logger = logging.getLogger(__name__) _env = SandboxedEnvironment( @@ -38,7 +26,7 @@ FINAL_TOOL_ID = "FIANL" SUMMARIZE_TOOL_ID = "SUMMARIZE" -class MCPSelector(McpBase): +class MCPSelector(MCPBase): """MCP选择器""" @staticmethod @@ -68,9 +56,9 @@ class MCPSelector(McpBase): tokens = token_calculator.calculate_token_length( messages=[{"role": "user", "content": template.render( goal=goal, tools=[tool], - additional_info=additional_info + additional_info=additional_info, )}], - pure_text=True + pure_text=True, ) if tokens > max_tokens: continue @@ -90,7 +78,11 @@ class MCPSelector(McpBase): schema["properties"]["tool_ids"]["items"] = {} # 将enum添加到items中,限制数组元素的可选值 schema["properties"]["tool_ids"]["items"]["enum"] = [tool.id for tool in sub_tools] - result = await MCPSelector.get_resoning_result(template.render(goal=goal, tools=sub_tools, additional_info="请根据目标选择对应的工具"), reasoning_llm) + result = await MCPSelector.get_resoning_result( + template.render( + goal=goal, tools=sub_tools, additional_info="请根据目标选择对应的工具", + ), reasoning_llm, + ) result = await MCPSelector._parse_result(result, schema) try: result = MCPToolIdsSelectResult.model_validate(result) diff --git a/apps/scheduler/pool/mcp/client.py b/apps/scheduler/pool/mcp/client.py index 0ced05e8cc68bb773cfe61eb9268a4f8baab8fa5..562a1056e2f9355e1cab82f9f0a30bb859bb74bd 100644 --- a/apps/scheduler/pool/mcp/client.py +++ b/apps/scheduler/pool/mcp/client.py @@ -129,12 +129,13 @@ class MCPClient: done, pending = await asyncio.wait( [asyncio.create_task(self.ready_sign.wait()), asyncio.create_task(self.error_sign.wait())], - return_when=asyncio.FIRST_COMPLETED + return_when=asyncio.FIRST_COMPLETED, ) if self.error_sign.is_set(): self.status = MCPStatus.ERROR - logger.error("[MCPClient] MCP %s:初始化失败", mcp_id) - raise Exception(f"MCP {mcp_id} 初始化失败") + error = f"MCP {mcp_id} 初始化失败" + logger.error("[MCPClient] %s", error) + raise RuntimeError(error) # 获取工具列表 self.tools = (await self.client.list_tools()).tools @@ -148,5 +149,5 @@ class MCPClient: self.stop_sign.set() try: await self.task - except Exception as e: + except Exception as e: # noqa: BLE001 logger.warning("[MCPClient] MCP %s:停止时发生异常:%s", self.mcp_id, e) diff --git a/apps/scheduler/pool/mcp/install.py b/apps/scheduler/pool/mcp/install.py index 2b15cd690bfd8b1939326728647e27ab517f89ff..b694eff35fe96471556238ddd206b4844c611552 100644 --- a/apps/scheduler/pool/mcp/install.py +++ b/apps/scheduler/pool/mcp/install.py @@ -1,11 +1,11 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """MCP 安装""" -from asyncio import subprocess -from typing import TYPE_CHECKING import logging -import os import shutil +from asyncio import subprocess +from typing import TYPE_CHECKING + from apps.constants import MCP_PATH if TYPE_CHECKING: @@ -27,31 +27,31 @@ async def install_uvx(mcp_id: str, config: "MCPServerStdioConfig") -> "MCPServer :rtype: MCPServerStdioConfig :raises ValueError: 未找到MCP Server对应的Python包 """ - uv_path = shutil.which('uv') + uv_path = shutil.which("uv") if uv_path is None: error = "[Installer] 未找到uv命令,请先安装uv包管理器: pip install uv" - logging.error(error) - raise Exception(error) + logger.error(error) + raise RuntimeError(error) # 找到包名 package = None for arg in config.args: if not arg.startswith("-"): package = arg break - logger.error(f"[Installer] MCP包名: {package}") + logger.error("[Installer] MCP包名: %s", package) if not package: print("[Installer] 未找到包名") # noqa: T201 return None # 创建文件夹 mcp_path = MCP_PATH / "template" / mcp_id / "project" - logger.error(f"[Installer] MCP安装路径: {mcp_path}") + logger.error("[Installer] MCP安装路径: %s", mcp_path) await mcp_path.mkdir(parents=True, exist_ok=True) # 如果有pyproject.toml文件,则使用sync flag = await (mcp_path / "pyproject.toml").exists() - logger.error(f"[Installer] MCP安装标志: {flag}") + logger.error("[Installer] MCP安装标志: %s", flag) if await (mcp_path / "pyproject.toml").exists(): shell_command = f"{uv_path} venv; {uv_path} sync --index-url https://pypi.tuna.tsinghua.edu.cn/simple --active --no-install-project --no-cache" - logger.error(f"[Installer] MCP安装命令: {shell_command}") + logger.error("[Installer] MCP安装命令: %s", shell_command) pipe = await subprocess.create_subprocess_shell( ( f"{uv_path} venv; " @@ -73,7 +73,7 @@ async def install_uvx(mcp_id: str, config: "MCPServerStdioConfig") -> "MCPServer if "run" not in config.args: config.args = ["run", *config.args] config.auto_install = False - logger.error(f"[Installer] MCP安装配置更新成功: {config}") + logger.error("[Installer] MCP安装配置更新成功: %s", config) return config # 否则,初始化uv项目 @@ -117,11 +117,11 @@ async def install_npx(mcp_id: str, config: "MCPServerStdioConfig") -> "MCPServer :rtype: MCPServerStdioConfig :raises ValueError: 未找到MCP Server对应的npm包 """ - npm_path = shutil.which('npm') + npm_path = shutil.which("npm") if npm_path is None: error = "[Installer] 未找到npm命令,请先安装Node.js和npm" - logging.error(error) - raise Exception(error) + logger.error(error) + raise RuntimeError(error) # 查找package name package = None for arg in config.args: diff --git a/apps/scheduler/pool/mcp/pool.py b/apps/scheduler/pool/mcp/pool.py index bf0320f429a9ef45864ba6548b1bb28e3d874b59..cb76864c68823d3f3a0ce69a36b446c7de3747c0 100644 --- a/apps/scheduler/pool/mcp/pool.py +++ b/apps/scheduler/pool/mcp/pool.py @@ -21,7 +21,7 @@ class MCPPool(metaclass=SingletonMeta): """初始化MCP池""" self.pool = {} - async def _init_mcp(self, mcp_id: str, user_sub: str) -> MCPClient | None: + async def init_mcp(self, mcp_id: str, user_sub: str) -> MCPClient | None: """初始化MCP池""" config_path = MCP_USER_PATH / user_sub / mcp_id / "config.json" flag = (await config_path.exists()) @@ -69,7 +69,7 @@ class MCPPool(metaclass=SingletonMeta): return None # 初始化进程 - item = await self._init_mcp(mcp_id, user_sub) + item = await self.init_mcp(mcp_id, user_sub) if item is None: return None diff --git a/apps/schemas/message.py b/apps/schemas/message.py index 5b465ee54321bbd4c649753911025bff41840186..17a569ca2cf710b206ddbc1afb655bf4675af7fd 100644 --- a/apps/schemas/message.py +++ b/apps/schemas/message.py @@ -1,16 +1,18 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """队列中的消息结构""" -from typing import Any from datetime import UTC, datetime +from typing import Any + from pydantic import BaseModel, Field from apps.schemas.enum_var import EventType, FlowStatus, StepStatus from apps.schemas.record import RecordMetadata -class param(BaseModel): +class FlowParams(BaseModel): """流执行过程中的参数补充""" + content: dict[str, Any] = Field(default={}, description="流执行过程中的参数补充内容") description: str = Field(default="", description="流执行过程中的参数补充描述") diff --git a/apps/schemas/request_data.py b/apps/schemas/request_data.py index 3fd5a67fa29af6e1685d55285ee1ebd458cabce2..d04f6fd8a31b18e231578056d7de94a81bb11980 100644 --- a/apps/schemas/request_data.py +++ b/apps/schemas/request_data.py @@ -10,7 +10,7 @@ from apps.schemas.appcenter import AppData from apps.schemas.enum_var import CommentType from apps.schemas.flow_topology import FlowItem from apps.schemas.mcp import MCPType -from apps.schemas.message import param +from apps.schemas.message import FlowParams class RequestDataApp(BaseModel): @@ -47,7 +47,7 @@ class RequestData(BaseModel): app: RequestDataApp | None = Field(default=None, description="应用") debug: bool = Field(default=False, description="是否调试") task_id: str | None = Field(default=None, alias="taskId", description="任务ID") - params: param | bool | None = Field(default=None, description="流执行过程中的参数补充", alias="params") + params: FlowParams | bool | None = Field(default=None, description="流执行过程中的参数补充", alias="params") class QuestionBlacklistRequest(BaseModel):