From fb21d2e1f3f6dc4a883303c8d3519ca57cbadc56 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Wed, 13 Aug 2025 11:57:19 +0800 Subject: [PATCH] =?UTF-8?q?=E5=90=88=E5=B9=B6Agent=E5=88=86=E6=94=AF?= =?UTF-8?q?=E8=87=B3!647?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/common/queue.py | 1 + apps/routers/chat.py | 9 +- apps/routers/mcp_service.py | 10 +- apps/routers/user.py | 11 +- apps/scheduler/executor/agent.py | 313 ++++++++++++--------------- apps/scheduler/mcp/host.py | 2 +- apps/scheduler/mcp/prompt.py | 5 +- apps/scheduler/mcp_agent/host.py | 63 +++--- apps/scheduler/mcp_agent/plan.py | 95 ++++++++- apps/scheduler/mcp_agent/prompt.py | 328 +++++++++++++++++++++++++++-- apps/scheduler/pool/loader/mcp.py | 23 +- apps/schemas/mcp.py | 19 ++ apps/schemas/message.py | 5 + apps/schemas/record.py | 1 + apps/schemas/task.py | 3 - apps/services/activity.py | 1 + apps/services/mcp_service.py | 76 +++---- apps/services/task.py | 3 + 18 files changed, 667 insertions(+), 301 deletions(-) diff --git a/apps/common/queue.py b/apps/common/queue.py index bcda8aab..58731bcc 100644 --- a/apps/common/queue.py +++ b/apps/common/queue.py @@ -60,6 +60,7 @@ class MessageQueue: flowStatus=task.state.flow_status, stepId=task.state.step_id, stepName=task.state.step_name, + stepDescription=task.state.step_description, stepStatus=task.state.step_status, ) else: diff --git a/apps/routers/chat.py b/apps/routers/chat.py index 033156a9..2498da81 100644 --- a/apps/routers/chat.py +++ b/apps/routers/chat.py @@ -3,9 +3,11 @@ import asyncio import logging +import uuid from collections.abc import AsyncGenerator +from typing import Annotated -from fastapi import APIRouter, Depends, HTTPException, Request, status +from fastapi import APIRouter, Depends, HTTPException, Query, Request, status from fastapi.responses import JSONResponse, StreamingResponse from apps.common.queue import MessageQueue @@ -155,9 +157,10 @@ async def chat(request: Request, post_body: RequestData) -> StreamingResponse: @router.post("/stop", response_model=ResponseData) -async def stop_generation(request: Request) -> JSONResponse: +async def stop_generation(taskId: Annotated[uuid.UUID, Query()]) -> JSONResponse: # noqa: N803 """停止生成""" - await Activity.remove_active(request.state.user_sub) + await Activity.remove_active(taskId) + return JSONResponse( status_code=status.HTTP_200_OK, content=ResponseData( diff --git a/apps/routers/mcp_service.py b/apps/routers/mcp_service.py index c4adec41..1c0ebb57 100644 --- a/apps/routers/mcp_service.py +++ b/apps/routers/mcp_service.py @@ -49,14 +49,14 @@ admin_router = APIRouter( @router.get("", response_model=GetMCPServiceListRsp | ResponseData) -async def get_mcpservice_list( +async def get_mcpservice_list( # noqa: PLR0913 request: Request, searchType: SearchType = SearchType.ALL, # noqa: N803 keyword: str | None = None, page: Annotated[int, Query(ge=1)] = 1, *, - is_installed: bool | None = None, - is_active: bool | None = None, + isInstall: bool | None = None, # noqa: N803 + isActive: bool | None = None, # noqa: N803 ) -> JSONResponse: """获取服务列表""" user_sub = request.state.user_sub @@ -66,8 +66,8 @@ async def get_mcpservice_list( user_sub, keyword, page, - is_installed=is_installed, - is_active=is_active, + is_install=isInstall, + is_active=isActive, ) except Exception as e: err = f"[MCPServiceCenter] 获取MCP服务列表失败: {e}" diff --git a/apps/routers/user.py b/apps/routers/user.py index 1ddc3d3f..210311ee 100644 --- a/apps/routers/user.py +++ b/apps/routers/user.py @@ -46,14 +46,9 @@ async def list_user( async def update_user_info(request: Request, data: UserUpdateRequest) -> JSONResponse: """更新用户信息接口""" # 更新用户信息 + await UserManager.update_userinfo_by_user_sub(request.state.user_sub, data) - result = await UserManager.update_userinfo_by_user_sub(request.state.user_sub, data) - if not result: - return JSONResponse( - status_code=status.HTTP_200_OK, - content={"code": status.HTTP_200_OK, "message": "用户信息更新成功"}, - ) return JSONResponse( - status_code=status.HTTP_404_NOT_FOUND, - content={"code": status.HTTP_404_NOT_FOUND, "message": "用户信息更新失败"}, + status_code=status.HTTP_200_OK, + content={"code": status.HTTP_200_OK, "message": "用户信息更新成功"}, ) diff --git a/apps/scheduler/executor/agent.py b/apps/scheduler/executor/agent.py index 6f933b8b..4d34b85b 100644 --- a/apps/scheduler/executor/agent.py +++ b/apps/scheduler/executor/agent.py @@ -1,39 +1,38 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """MCP Agent执行器""" +import anyio import logging -import traceback import uuid - -import anyio -from mcp.types import TextContent from pydantic import Field - +from typing import Any +from mcp.types import TextContent +from apps.llm.patterns.rewrite import QuestionRewrite from apps.llm.reasoning import ReasoningLLM from apps.scheduler.executor.base import BaseExecutor +from apps.schemas.enum_var import EventType, SpecialCallType, FlowStatus, StepStatus from apps.scheduler.mcp_agent.host import MCPHost from apps.scheduler.mcp_agent.plan import MCPPlanner from apps.scheduler.mcp_agent.select import FINAL_TOOL_ID, MCPSelector from apps.scheduler.pool.mcp.client import MCPClient -from apps.scheduler.pool.mcp.pool import MCPPool -from apps.schemas.enum_var import EventType, FlowStatus, SpecialCallType, StepStatus from apps.schemas.mcp import ( - ErrorType, GoalEvaluationResult, - MCPCollection, - MCPPlan, - MCPTool, RestartStepIndex, - ToolExcutionErrorType, ToolRisk, + ErrorType, + ToolExcutionErrorType, + MCPPlan, + MCPCollection, + MCPTool, + Step ) -from apps.schemas.message import FlowParams +from apps.scheduler.pool.mcp.pool import MCPPool from apps.schemas.task import ExecutorState, FlowStepHistory, StepQueueItem +from apps.schemas.message import param +from apps.services.task import TaskManager from apps.services.appcenter import AppCenterManager from apps.services.mcp_service import MCPServiceManager -from apps.services.task import TaskManager from apps.services.user import UserManager - logger = logging.getLogger(__name__) @@ -47,10 +46,13 @@ class MCPAgentExecutor(BaseExecutor): mcp_list: list[MCPCollection] = Field(description="MCP服务器列表", default=[]) mcp_pool: MCPPool = Field(description="MCP池", default=MCPPool()) tools: dict[str, MCPTool] = Field( - description="MCP工具列表,key为tool_id", default={}, + description="MCP工具列表,key为tool_id", default={} + ) + tool_list: list[MCPTool] = Field( + description="MCP工具列表,包含所有MCP工具", default=[] ) - params: FlowParams | bool | None = Field( - default=None, description="流执行过程中的参数补充", alias="params", + params: param | bool | None = Field( + default=None, description="流执行过程中的参数补充", alias="params" ) resoning_llm: ReasoningLLM = Field( default=ReasoningLLM(), @@ -90,94 +92,36 @@ class MCPAgentExecutor(BaseExecutor): await self.mcp_pool._init_mcp(mcp_id, self.task.ids.user_sub) for tool in mcp_service.tools: self.tools[tool.id] = tool - - async def plan(self, is_replan: bool = False, start_index: int | None = None) -> None: - if is_replan: - error_message = "之前的计划遇到以下报错\n\n"+self.task.state.error_message - else: - error_message = "初始化计划" - tools = await MCPSelector.select_top_tool( - self.task.runtime.question, list(self.tools.values()), - additional_info=error_message, top_n=40, reasoning_llm=self.resoning_llm) - if is_replan: - logger.info("[MCPAgentExecutor] 重新规划流程") - if not start_index: - start_index = await MCPPlanner.get_replan_start_step_index(self.task.runtime.question, - self.task.state.error_message, - self.task.runtime.temporary_plans, - self.resoning_llm) - start_index = start_index.start_index - current_plan = MCPPlan(plans=self.task.runtime.temporary_plans.plans[start_index:]) - error_message = self.task.state.error_message - temporary_plans = await MCPPlanner.create_plan( - self.task.runtime.question, - is_replan=is_replan, - error_message=error_message, - current_plan=current_plan, - tool_list=tools, - max_steps=self.max_steps-start_index-1, - reasoning_llm=self.resoning_llm, - ) - await self.update_tokens() - await self.push_message( - EventType.STEP_CANCEL, - data={}, - ) - if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id: - self.task.context[-1].step_status = StepStatus.CANCELLED - self.task.runtime.temporary_plans.plans = self.task.runtime.temporary_plans.plans[ - : start_index] + temporary_plans.plans - self.task.state.step_index = start_index - else: - start_index = 0 - logger.error( - f"各个字段的类型: {type(self.task.runtime.question)}, {type(tools)}, {type(self.max_steps)}, {type(self.resoning_llm)}") - self.task.runtime.temporary_plans = await MCPPlanner.create_plan(self.task.runtime.question, tool_list=tools, max_steps=self.max_steps, reasoning_llm=self.resoning_llm) - for i in range(start_index, len(self.task.runtime.temporary_plans.plans)): - self.task.runtime.temporary_plans.plans[i].step_id = str(uuid.uuid4()) + self.tool_list.extend(mcp_service.tools) + self.tools[FINAL_TOOL_ID] = MCPTool( + id=FINAL_TOOL_ID, + name="Final Tool", + description="结束流程的工具", + mcp_id="", + input_schema={} + ) + self.tool_list.append(MCPTool(id=FINAL_TOOL_ID, name="Final Tool", + description="结束流程的工具", mcp_id="", input_schema={})) async def get_tool_input_param(self, is_first: bool) -> None: if is_first: # 获取第一个输入参数 - tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool - step = self.task.runtime.temporary_plans.plans[self.task.state.step_index] - mcp_tool = self.tools[tool_id] - self.task.state.current_input = await MCPHost.get_first_input_params( - mcp_tool, self.task.runtime.question, step.instruction, self.task - ) + mcp_tool = self.tools[self.task.state.tool_id] + self.task.state.current_input = await MCPHost._get_first_input_params(mcp_tool, self.task.runtime.question, self.task.state.step_description, self.task) else: # 获取后续输入参数 - if isinstance(self.params, FlowParams): + if isinstance(self.params, param): params = self.params.content params_description = self.params.description else: params = {} params_description = "" - tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool - mcp_tool = self.tools[tool_id] - step = self.task.runtime.temporary_plans.plans[self.task.state.step_index] - self.task.state.current_input = await MCPHost._fill_params(mcp_tool, self.task.runtime.question, step.instruction, self.task.state.current_input, self.task.state.error_message, params, params_description) - - async def reset_step_to_index(self, start_index: int) -> None: - """重置步骤到开始""" - logger.info("[MCPAgentExecutor] 重置步骤到索引 %d", start_index) - - if start_index < len(self.task.runtime.temporary_plans.plans): - self.task.state.flow_status = FlowStatus.RUNNING - self.task.state.step_id = self.task.runtime.temporary_plans.plans[start_index].step_id - self.task.state.step_index = 0 - self.task.state.step_name = self.tools[self.task.runtime.temporary_plans.plans[start_index].tool].name - self.task.state.step_description = self.task.runtime.temporary_plans.plans[start_index].content - self.task.state.step_status = StepStatus.INIT - self.task.state.retry_times = 0 - else: - self.task.state.step_id = FINAL_TOOL_ID + mcp_tool = self.tools[self.task.state.tool_id] + self.task.state.current_input = await MCPHost._fill_params(mcp_tool, self.task.runtime.question, self.task.state.step_description, self.task.state.current_input, self.task.state.error_message, params, params_description) async def confirm_before_step(self) -> None: - logger.info("[MCPAgentExecutor] 等待用户确认步骤 %d", self.task.state.step_index) # 发送确认消息 - tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool - mcp_tool = self.tools[tool_id] + mcp_tool = self.tools[self.task.state.tool_id] confirm_message = await MCPPlanner.get_tool_risk(mcp_tool, self.task.state.current_input, "", self.resoning_llm) await self.update_tokens() await self.push_message(EventType.STEP_WAITING_FOR_START, confirm_message.model_dump( @@ -198,17 +142,14 @@ class MCPAgentExecutor(BaseExecutor): input_data={}, output_data={}, ex_data=confirm_message.model_dump(exclude_none=True, by_alias=True), - ), + ) ) async def run_step(self): """执行步骤""" self.task.state.flow_status = FlowStatus.RUNNING self.task.state.step_status = StepStatus.RUNNING - logger.info("[MCPAgentExecutor] 执行步骤 %d", self.task.state.step_index) - # 获取MCP客户端 - tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool - mcp_tool = self.tools[tool_id] + mcp_tool = self.tools[self.task.state.tool_id] mcp_client = (await self.mcp_pool.get(mcp_tool.mcp_id, self.task.ids.user_sub)) try: output_params = await mcp_client.call_tool(mcp_tool.name, self.task.state.current_input) @@ -245,11 +186,11 @@ class MCPAgentExecutor(BaseExecutor): await self.update_tokens() await self.push_message( EventType.STEP_INPUT, - self.task.state.current_input, + self.task.state.current_input ) await self.push_message( EventType.STEP_OUTPUT, - output_params, + output_params ) self.task.context.append( FlowStepHistory( @@ -263,37 +204,36 @@ class MCPAgentExecutor(BaseExecutor): flow_status=self.task.state.flow_status, input_data=self.task.state.current_input, output_data=output_params, - ), + ) ) self.task.state.step_status = StepStatus.SUCCESS async def generate_params_with_null(self) -> None: """生成参数补充""" - tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool - mcp_tool = self.tools[tool_id] + mcp_tool = self.tools[self.task.state.tool_id] params_with_null = await MCPPlanner.get_missing_param( mcp_tool, self.task.state.current_input, self.task.state.error_message, - self.resoning_llm, + self.resoning_llm ) await self.update_tokens() error_message = await MCPPlanner.change_err_message_to_description( error_message=self.task.state.error_message, tool=mcp_tool, input_params=self.task.state.current_input, - reasoning_llm=self.resoning_llm, + reasoning_llm=self.resoning_llm ) await self.push_message( EventType.STEP_WAITING_FOR_PARAM, data={ "message": error_message, - "params": params_with_null, - }, + "params": params_with_null + } ) await self.push_message( EventType.FLOW_STOP, - data={}, + data={} ) self.task.state.flow_status = FlowStatus.WAITING self.task.state.step_status = StepStatus.PARAM @@ -311,27 +251,42 @@ class MCPAgentExecutor(BaseExecutor): output_data={}, ex_data={ "message": error_message, - "params": params_with_null, - }, - ), + "params": params_with_null + } + ) ) async def get_next_step(self) -> None: - self.task.state.step_index += 1 - if self.task.state.step_index < len(self.task.runtime.temporary_plans.plans): - tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool + if self.task.state.step_cnt < self.max_steps: + self.task.state.step_cnt += 1 + history = await MCPHost.assemble_memory(self.task) + max_retry = 3 + step = None + for i in range(max_retry): + step = await MCPPlanner.create_next_step(self.task.runtime.question, history, self.tool_list) + if step.tool_id in self.tools.keys(): + break + if step is None or step.tool_id not in self.tools.keys(): + step = Step( + tool_id=FINAL_TOOL_ID, + description=FINAL_TOOL_ID + ) + tool_id = step.tool_id if tool_id == FINAL_TOOL_ID: - return - self.task.state.step_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].step_id - self.task.state.step_name = self.tools[self.task.runtime.temporary_plans.plans - [self.task.state.step_index].tool].name - self.task.state.step_description = self.task.runtime.temporary_plans.plans[self.task.state.step_index].content + step_name = FINAL_TOOL_ID + else: + step_name = self.tools[tool_id].name + step_description = step.description + self.task.state.step_id = str(uuid.uuid4()) + self.task.state.tool_id = tool_id + self.task.state.step_name = step_name + self.task.state.step_description = step_description self.task.state.step_status = StepStatus.INIT self.task.state.current_input = {} else: # 没有下一步了,结束流程 - self.task.state.step_id = FINAL_TOOL_ID - return + self.task.state.tool_id = FINAL_TOOL_ID + return async def error_handle_after_step(self) -> None: """步骤执行失败后的错误处理""" @@ -339,7 +294,7 @@ class MCPAgentExecutor(BaseExecutor): self.task.state.flow_status = FlowStatus.ERROR await self.push_message( EventType.FLOW_FAILED, - data={}, + data={} ) if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id: del self.task.context[-1] @@ -355,7 +310,7 @@ class MCPAgentExecutor(BaseExecutor): flow_status=self.task.state.flow_status, input_data={}, output_data={}, - ), + ) ) async def work(self) -> None: @@ -363,7 +318,7 @@ class MCPAgentExecutor(BaseExecutor): if self.task.state.step_status == StepStatus.INIT: await self.push_message( EventType.STEP_INIT, - data={}, + data={} ) await self.get_tool_input_param(is_first=True) user_info = await UserManager.get_userinfo_by_user_sub(self.task.ids.user_sub) @@ -385,11 +340,11 @@ class MCPAgentExecutor(BaseExecutor): self.task.state.step_status = StepStatus.CANCELLED await self.push_message( EventType.STEP_CANCEL, - data={}, + data={} ) await self.push_message( EventType.FLOW_CANCEL, - data={}, + data={} ) if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id: self.task.context[-1].step_status = StepStatus.CANCELLED @@ -398,7 +353,7 @@ class MCPAgentExecutor(BaseExecutor): max_retry = 5 for i in range(max_retry): if i != 0: - await self.get_tool_input_param(is_first=False) + await self.get_tool_input_param(is_first=True) await self.run_step() if self.task.state.step_status == StepStatus.SUCCESS: break @@ -408,28 +363,51 @@ class MCPAgentExecutor(BaseExecutor): await self.error_handle_after_step() else: user_info = await UserManager.get_userinfo_by_user_sub(self.task.ids.user_sub) - tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool - mcp_tool = self.tools[tool_id] - error_type = await MCPPlanner.get_tool_execute_error_type( - self.task.runtime.question, - self.task.runtime.temporary_plans, - mcp_tool, - self.task.state.current_input, - self.task.state.error_message, - self.resoning_llm - ) - if error_type.type == ErrorType.DECORRECT_PLAN or user_info.auto_execute: - await self.plan(is_replan=True) - await self.reset_step_to_index(self.task.state.step_index) - elif error_type.type == ErrorType.MISSING_PARAM: - await self.generate_params_with_null() + if user_info.auto_execute: + await self.push_message( + EventType.STEP_ERROR, + data={ + "message": self.task.state.error_message, + } + ) + if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id: + self.task.context[-1].step_status = StepStatus.ERROR + self.task.context[-1].output_data = { + "message": self.task.state.error_message, + } + await self.get_next_step() + else: + mcp_tool = self.tools[self.task.state.tool_id] + is_param_error = await MCPPlanner.is_param_error( + self.task.runtime.question, + await MCPHost.assemble_memory(self.task), + self.task.state.error_message, + mcp_tool, + self.task.state.step_description, + self.task.state.current_input, + ) + if is_param_error.is_param_error: + # 如果是参数错误,生成参数补充 + await self.generate_params_with_null() + else: + await self.push_message( + EventType.STEP_ERROR, + data={ + "message": self.task.state.error_message, + } + ) + if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id: + self.task.context[-1].step_status = StepStatus.ERROR + self.task.context[-1].output_data = { + "message": self.task.state.error_message, + } + await self.get_next_step() elif self.task.state.step_status == StepStatus.SUCCESS: await self.get_next_step() async def summarize(self) -> None: async for chunk in MCPPlanner.generate_answer( self.task.runtime.question, - self.task.runtime.temporary_plans, (await MCPHost.assemble_memory(self.task)), self.resoning_llm ): @@ -449,9 +427,8 @@ class MCPAgentExecutor(BaseExecutor): try: self.task.state.flow_id = str(uuid.uuid4()) self.task.state.flow_name = await MCPPlanner.get_flow_name(self.task.runtime.question, self.resoning_llm) - await self.plan(is_replan=False) - await self.reset_step_to_index(0) await TaskManager.save_task(self.task.id, self.task) + await self.get_next_step() except Exception as e: import traceback logger.error("[MCPAgentExecutor] 初始化失败: %s", traceback.format_exc()) @@ -460,68 +437,53 @@ class MCPAgentExecutor(BaseExecutor): self.task.state.error_message = str(e) await self.push_message( EventType.FLOW_FAILED, - data={}, + data={} ) return self.task.state.flow_status = FlowStatus.RUNNING - plan = { - "plans": [], - } - for p in self.task.runtime.temporary_plans.plans: - if p.tool == FINAL_TOOL_ID: - continue - mcp_tool = self.tools.get(p.tool, None) - plan["plans"].append( - { - "stepId": p.step_id, - "stepName": mcp_tool.name, - "stepDescription": p.content, - }, - ) await self.push_message( EventType.FLOW_START, - data=plan, + data={} ) - if self.task.state.step_id == FINAL_TOOL_ID: + if self.task.state.tool_id == FINAL_TOOL_ID: # 如果已经是最后一步,直接结束 self.task.state.flow_status = FlowStatus.SUCCESS await self.push_message( EventType.FLOW_SUCCESS, - data={}, + data={} ) await self.summarize() return try: - while len(self.task.runtime.temporary_plans.plans) and \ - self.task.state.step_index < len(self.task.runtime.temporary_plans.plans) and \ - self.task.state.flow_status == FlowStatus.RUNNING: - tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool - if tool_id == FINAL_TOOL_ID: + while self.task.state.flow_status == FlowStatus.RUNNING: + if self.task.state.tool_id == FINAL_TOOL_ID: break await self.work() await TaskManager.save_task(self.task.id, self.task) - tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool + tool_id = self.task.state.tool_id if tool_id == FINAL_TOOL_ID: # 如果已经是最后一步,直接结束 self.task.state.flow_status = FlowStatus.SUCCESS self.task.state.step_status = StepStatus.SUCCESS await self.push_message( EventType.FLOW_SUCCESS, - data={}, + data={} ) await self.summarize() except Exception as e: - logger.exception("[MCPAgentExecutor] 执行过程中发生错误") + import traceback + logger.error("[MCPAgentExecutor] 执行过程中发生错误: %s", traceback.format_exc()) + logger.error("[MCPAgentExecutor] 执行过程中发生错误: %s", str(e)) self.task.state.flow_status = FlowStatus.ERROR self.task.state.error_message = str(e) self.task.state.step_status = StepStatus.ERROR await self.push_message( EventType.STEP_ERROR, - data={}, + data={} ) await self.push_message( EventType.FLOW_FAILED, - data={}, + data={} ) if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id: del self.task.context[-1] @@ -537,11 +499,12 @@ class MCPAgentExecutor(BaseExecutor): flow_status=self.task.state.flow_status, input_data={}, output_data={}, - ), + ) ) finally: for mcp_service in self.mcp_list: try: await self.mcp_pool.stop(mcp_service.id, self.task.ids.user_sub) - except Exception: - logger.exception("[MCPAgentExecutor] 停止MCP客户端时发生错误") + except Exception as e: + import traceback + logger.error("[MCPAgentExecutor] 停止MCP客户端时发生错误: %s", traceback.format_exc()) diff --git a/apps/scheduler/mcp/host.py b/apps/scheduler/mcp/host.py index a4fc8b89..3ccc0880 100644 --- a/apps/scheduler/mcp/host.py +++ b/apps/scheduler/mcp/host.py @@ -12,12 +12,12 @@ from mcp.types import TextContent from apps.llm.function import JsonGenerator from apps.models.mcp import MCPTools +from apps.models.task import ExecutorHistory from apps.scheduler.mcp.prompt import MEMORY_TEMPLATE from apps.scheduler.pool.mcp.client import MCPClient from apps.scheduler.pool.mcp.pool import MCPPool from apps.schemas.enum_var import FlowStatus, StepStatus from apps.schemas.mcp import MCPPlanItem -from apps.schemas.task import FlowStepHistory from apps.services.mcp_service import MCPServiceManager from apps.services.task import TaskManager diff --git a/apps/scheduler/mcp/prompt.py b/apps/scheduler/mcp/prompt.py index 0d7b417f..9951fd93 100644 --- a/apps/scheduler/mcp/prompt.py +++ b/apps/scheduler/mcp/prompt.py @@ -13,6 +13,7 @@ CREATE_PLAN = dedent(r""" 2. 计划中的每一个步骤必须且只能使用一个工具。 3. 计划中的步骤必须具有清晰和逻辑的步骤,没有冗余或不必要的步骤。 4. 计划中的最后一步必须是Final工具,以确保计划执行结束。 + 5.生成的计划必须要覆盖用户的目标,不能遗漏任何用户目标中的内容。 # 生成计划时的注意事项: @@ -174,8 +175,8 @@ FINAL_ANSWER = dedent(r""" MEMORY_TEMPLATE = dedent(r""" {% for ctx in context_list %} - 第{{ loop.index }}步:{{ ctx.step_description }} - - 调用工具 `{{ ctx.step_name }}`,并提供参数 `{{ ctx.input_data | tojson }}`。 - - 执行状态:{{ ctx.status }} + - 调用工具 `{{ ctx.step_name }}`,并提供参数 `{{ ctx.input_data | tojson }}`。 + - 执行状态:{{ ctx.step_status }} - 得到数据:`{{ ctx.output_data | tojson }}` {% endfor %} """) diff --git a/apps/scheduler/mcp_agent/host.py b/apps/scheduler/mcp_agent/host.py index f068e9d2..fde8e39a 100644 --- a/apps/scheduler/mcp_agent/host.py +++ b/apps/scheduler/mcp_agent/host.py @@ -7,17 +7,13 @@ from typing import Any from jinja2 import BaseLoader from jinja2.sandbox import SandboxedEnvironment -from mcp.types import TextContent from apps.llm.function import JsonGenerator from apps.scheduler.mcp.prompt import MEMORY_TEMPLATE -from apps.scheduler.mcp_agent.prompt import REPAIR_PARAMS -from apps.scheduler.pool.mcp.client import MCPClient -from apps.scheduler.pool.mcp.pool import MCPPool -from apps.schemas.enum_var import StepStatus -from apps.schemas.mcp import MCPPlanItem, MCPTool -from apps.schemas.task import FlowStepHistory, Task -from apps.services.task import TaskManager +from apps.scheduler.mcp_agent.base import McpBase +from apps.scheduler.mcp_agent.prompt import GEN_PARAMS, REPAIR_PARAMS +from apps.schemas.mcp import MCPTool +from apps.schemas.task import Task def tojson_filter(value: Any) -> str: @@ -35,7 +31,7 @@ _env = SandboxedEnvironment( ) -class MCPHost: +class MCPHost(McpBase): """MCP宿主服务""" @staticmethod @@ -45,34 +41,43 @@ class MCPHost: context_list=task.context, ) - async def get_first_input_params(mcp_tool: MCPTool, query: str, task: Task) -> dict[str, Any]: + @staticmethod + async def _get_first_input_params(mcp_tool: MCPTool, goal: str, current_goal: str, task: Task, + resoning_llm: ReasoningLLM = ReasoningLLM()) -> dict[str, Any]: """填充工具参数""" # 更清晰的输入·指令,这样可以调用generate - llm_query = rf""" - 请使用参数生成工具,生成满足以下目标的工具参数: - - {query} - """ - - # 进行生成 - json_generator = JsonGenerator( - llm_query, - [ - {"role": "system", "content": "You are a helpful assistant."}, - {"role": "user", "content": await MCPHost.assemble_memory(task)}, - ], + prompt = _env.from_string(GEN_PARAMS).render( + tool_name=mcp_tool.name, + tool_description=mcp_tool.description, + goal=goal, + current_goal=current_goal, + input_schema=mcp_tool.input_schema, + background_info=await MCPHost.assemble_memory(task), + ) + logger.info("[MCPHost] 填充工具参数: %s", prompt) + result = await MCPHost.get_resoning_result( + prompt, + resoning_llm + ) + # 使用JsonGenerator解析结果 + result = await MCPHost._parse_result( + result, mcp_tool.input_schema, ) - return await json_generator.generate() + return result - async def _fill_params(mcp_tool: MCPTool, - current_input: dict[str, Any], - error_message: str = "", params: dict[str, Any] = {}, - params_description: str = "") -> dict[str, Any]: + @staticmethod + async def _fill_params( # noqa: PLR0913 + goal: str, current_goal: str, + mcp_tool: MCPTool, current_input: dict[str, Any], + error_message: str = "", params: dict[str, Any] | None = None, + params_description: str = "") -> dict[str, Any]: llm_query = "请生成修复之后的工具参数" prompt = _env.from_string(REPAIR_PARAMS).render( tool_name=mcp_tool.name, tool_description=mcp_tool.description, + goal=goal, + current_goal=current_goal, input_schema=mcp_tool.input_schema, current_input=current_input, error_message=error_message, @@ -88,4 +93,4 @@ class MCPHost: ], mcp_tool.input_schema, ) - return await json_generator.generate() \ No newline at end of file + return await json_generator.generate() diff --git a/apps/scheduler/mcp_agent/plan.py b/apps/scheduler/mcp_agent/plan.py index b2f3a35e..0d707d2d 100644 --- a/apps/scheduler/mcp_agent/plan.py +++ b/apps/scheduler/mcp_agent/plan.py @@ -1,5 +1,6 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """MCP 用户目标拆解与规划""" + from collections.abc import AsyncGenerator from typing import Any @@ -14,19 +15,34 @@ from apps.scheduler.mcp_agent.prompt import ( CREATE_PLAN, EVALUATE_GOAL, FINAL_ANSWER, + GEN_STEP, GENERATE_FLOW_NAME, GET_MISSING_PARAMS, GET_REPLAN_START_STEP_INDEX, + IS_PARAM_ERROR, RECREATE_PLAN, RISK_EVALUATE, TOOL_EXECUTE_ERROR_TYPE_ANALYSIS, + TOOL_SKIP, ) from apps.scheduler.slot.slot import Slot -from apps.schemas.mcp import GoalEvaluationResult, MCPPlan, MCPTool, RestartStepIndex, ToolExcutionErrorType, ToolRisk +from apps.schemas.mcp import ( + GoalEvaluationResult, + IsParamError, + MCPPlan, + MCPPlanItem, + MCPTool, + RestartStepIndex, + Step, + ToolExcutionErrorType, + ToolRisk, + ToolSkip, +) +from apps.schemas.task import Task _env = SandboxedEnvironment( loader=BaseLoader, - autoescape=True, + autoescape=False, trim_blocks=True, lstrip_blocks=True, ) @@ -126,6 +142,7 @@ class MCPPlanner(McpBase): max_steps: int = 10, reasoning_llm: ReasoningLLM = ReasoningLLM()) -> str: """获取推理大模型的结果""" # 格式化Prompt + tool_ids = [tool.id for tool in tool_list] if is_replan: template = _env.from_string(RECREATE_PLAN) prompt = template.render( @@ -155,9 +172,54 @@ class MCPPlanner(McpBase): # 使用Function模型解析结果 return MCPPlan.model_validate(plan) + @staticmethod + async def create_next_step( + goal: str, history: str, tools: list[MCPTool], + reasoning_llm: ReasoningLLM = ReasoningLLM()) -> Step: + """创建下一步的执行步骤""" + # 获取推理结果 + template = _env.from_string(GEN_STEP) + prompt = template.render(goal=goal, history=history, tools=tools) + result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm) + + # 解析为结构化数据 + schema = Step.model_json_schema() + if "enum" not in schema["properties"]["tool_id"]: + schema["properties"]["tool_id"]["enum"] = [] + for tool in tools: + schema["properties"]["tool_id"]["enum"].append(tool.id) + step = await MCPPlanner._parse_result(result, schema) + # 使用Step模型解析结果 + return Step.model_validate(step) + + @staticmethod + async def tool_skip( + task: Task, step_id: str, step_name: str, step_instruction: str, step_content: str, + reasoning_llm: ReasoningLLM = ReasoningLLM()) -> ToolSkip: + """判断当前步骤是否需要跳过""" + # 获取推理结果 + template = _env.from_string(TOOL_SKIP) + from apps.scheduler.mcp_agent.host import MCPHost + history = await MCPHost.assemble_memory(task) + prompt = template.render( + step_id=step_id, + step_name=step_name, + step_instruction=step_instruction, + step_content=step_content, + history=history, + goal=task.runtime.question + ) + result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm) + + # 解析为结构化数据 + schema = ToolSkip.model_json_schema() + skip_result = await MCPPlanner._parse_result(result, schema) + # 使用ToolSkip模型解析结果 + return ToolSkip.model_validate(skip_result) + @staticmethod async def get_tool_risk( - tool: MCPTool, input_parm: dict[str, Any], + tool: MCPTool, input_parm: dict[str, Any], additional_info: str = "", resoning_llm: ReasoningLLM = ReasoningLLM()) -> ToolRisk: """获取MCP工具的风险评估结果""" # 获取推理结果 @@ -231,6 +293,29 @@ class MCPPlanner(McpBase): # 返回工具执行错误类型 return error_type + @staticmethod + async def is_param_error( + goal: str, history: str, error_message: str, tool: MCPTool, step_description: str, input_params: dict + [str, Any], + reasoning_llm: ReasoningLLM = ReasoningLLM()) -> IsParamError: + """判断错误信息是否是参数错误""" + tmplate = _env.from_string(IS_PARAM_ERROR) + prompt = tmplate.render( + goal=goal, + history=history, + step_id=tool.id, + step_name=tool.name, + step_description=step_description, + input_params=input_params, + error_message=error_message, + ) + result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm) + # 解析为结构化数据 + schema = IsParamError.model_json_schema() + is_param_error = await MCPPlanner._parse_result(result, schema) + # 使用IsParamError模型解析结果 + return IsParamError.model_validate(is_param_error) + @staticmethod async def change_err_message_to_description( error_message: str, tool: MCPTool, input_params: dict[str, Any], @@ -270,16 +355,14 @@ class MCPPlanner(McpBase): @staticmethod async def generate_answer( - user_goal: str, plan: MCPPlan, memory: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> AsyncGenerator[ + user_goal: str, memory: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> AsyncGenerator[ str, None]: """生成最终回答""" template = _env.from_string(FINAL_ANSWER) prompt = template.render( - plan=plan.model_dump(exclude_none=True, by_alias=True), memory=memory, goal=user_goal, ) - async for chunk in resoning_llm.call( [{"role": "user", "content": prompt}], streaming=True, diff --git a/apps/scheduler/mcp_agent/prompt.py b/apps/scheduler/mcp_agent/prompt.py index bfe8cf39..d51042a9 100644 --- a/apps/scheduler/mcp_agent/prompt.py +++ b/apps/scheduler/mcp_agent/prompt.py @@ -68,10 +68,10 @@ TOOL_SELECT = dedent(r""" 你的任务是:根据当前目标,附加信息,选择最合适的MCP工具。 ## 选择MCP工具时的注意事项: 1. 确保充分理解当前目标,选择实现目标所需的MCP工具。 - 2. 请在给定的MCP工具列表中选择,不要自己生成MCP工具。 + 2. 不要选择不存在的工具。 3. 可以选择一些辅助工具,但必须确保这些工具与当前目标相关。 4. 注意,返回的工具ID必须是MCP工具的ID,而不是名称。 - 5. 不要选择不存在的工具。 + 5. 可以多选择一些工具,用于应对不同的情况。 必须按照以下格式生成选择结果,不要输出任何其他内容: ```json { @@ -265,7 +265,9 @@ CREATE_PLAN = dedent(r""" 1. 能够成功完成用户的目标 2. 计划中的每一个步骤必须且只能使用一个工具。 3. 计划中的步骤必须具有清晰和逻辑的步骤,没有冗余或不必要的步骤。 - 4. 计划中的最后一步必须是Final工具,以确保计划执行结束。 + 4. 不要选择不存在的工具。 + 5. 计划中的最后一步必须是Final工具,以确保计划执行结束。 + 6. 生成的计划必须要覆盖用户的目标,当然需要考虑一些意外情况,可以有一定的冗余步骤。 # 生成计划时的注意事项: @@ -327,17 +329,17 @@ CREATE_PLAN = dedent(r""" "instruction": "需要一个支持Docker容器运行的MCP Server" }, { - "content": "使用Result[0]中选择的MCP Server,生成Docker命令", + "content": "使用第一步选择的MCP Server,生成Docker命令", "tool": "command_generator", "instruction": "生成Docker命令:在后台运行alpine:latest容器,挂载/root到/data,执行top命令" }, { - "content": "在Result[0]的MCP Server上执行Result[1]生成的命令", + "content": "执行第二步生成的Docker命令", "tool": "command_executor", "instruction": "执行Docker命令" }, { - "content": "任务执行完成,容器已在后台运行,结果为Result[2]", + "content": "任务执行完成,容器已在后台运行", "tool": "Final", "instruction": "" } @@ -364,7 +366,9 @@ RECREATE_PLAN = dedent(r""" 2. 计划中的每一个步骤必须且只能使用一个工具。 3. 计划中的步骤必须具有清晰和逻辑的步骤,没有冗余或不必要的步骤。 4. 你的计划必须避免之前的错误,并且能够成功执行。 - 5. 计划中的最后一步必须是Final工具,以确保计划执行结束。 + 5. 不要选择不存在的工具。 + 6. 计划中的最后一步必须是Final工具,以确保计划执行结束。 + 7. 生成的计划必须要覆盖用户的目标,当然需要考虑一些意外情况,可以有一定的冗余步骤。 # 生成计划时的注意事项: @@ -413,12 +417,12 @@ RECREATE_PLAN = dedent(r""" "instruction": "生成端口扫描命令:扫描192.168.1.1的开放端口" }, { - "content": "在执行Result[0]生成的命令", + "content": "在执行第一步生成的命令", "tool": "command_executor", "instruction": "执行端口扫描命令" }, { - "content": "任务执行完成,端口扫描结果为Result[2]", + "content": "任务执行完成", "tool": "Final", "instruction": "" } @@ -450,27 +454,27 @@ RECREATE_PLAN = dedent(r""" "instruction": "选择一个前机器支持哪些网络扫描工具" }, { - "content": "执行Result[0]中生成的命令,查看当前机器支持哪些网络扫描工具", + "content": "执行第一步中生成的命令,查看当前机器支持哪些网络扫描工具", "tool": "command_executor", - "instruction": "执行Result[0]中生成的命令" + "instruction": "执行第一步中生成的命令" }, { - "content": "从Result[1]中选择一个网络扫描工具,生成端口扫描命令", + "content": "从第二步执行结果中选择一个网络扫描工具,生成端口扫描命令", "tool": "tool_selector", "instruction": "选择一个网络扫描工具,生成端口扫描命令" }, { - "content": "基于result[2]中选择的网络扫描工具,生成端口扫描命令", + "content": "基于第三步中选择的网络扫描工具,生成端口扫描命令", "tool": "command_generator", "instruction": "生成端口扫描命令:扫描192.168.1.1的开放端口" }, { - "content": "在Result[0]的MCP Server上执行Result[3]生成的命令", + "content": "执行第四步中生成的端口扫描命令", "tool": "command_executor", "instruction": "执行端口扫描命令" }, { - "content": "任务执行完成,端口扫描结果为Result[4]", + "content": "任务执行完成", "tool": "Final", "instruction": "" } @@ -503,6 +507,144 @@ RECREATE_PLAN = dedent(r""" # 重新生成的计划 """) +GEN_STEP = dedent(r""" + 你是一个计划生成器。 + 请根据用户的目标、当前计划和历史,生成一个新的步骤。 + + # 一个好的计划步骤应该: + 1.使用最适合的工具来完成当前步骤。 + 2.能够基于当前的计划和历史,完成阶段性的任务。 + 3.不要选择不存在的工具。 + 4.如果你认为当前已经达成了用户的目标,可以直接返回Final工具,表示计划执行结束。 + + # 样例 1 + # 目标 + 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优,我的ip是192.168.1.1,数据库端口是3306,用户名是root,密码是password + # 历史记录 + 第1步:生成端口扫描命令 + - 调用工具 `command_generator`,并提供参数 `帮我生成一个mysql端口扫描命令` + - 执行状态:成功 + - 得到数据:`{"command": "nmap -sS -p--open 192.168.1.1"}` + 第2步:执行端口扫描命令 + - 调用工具 `command_executor`,并提供参数 `{"command": "nmap -sS -p--open 192.168.1.1"}` + - 执行状态:成功 + - 得到数据:`{"result": "success"}` + # 工具 + + - mcp_tool_1 mysql_analyzer;用于分析数据库性能/description> + - mcp_tool_2 文件存储工具;用于存储文件 + - mcp_tool_3 mongoDB工具;用于操作MongoDB数据库 + - Final 结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。 + + # 输出 + ```json + { + "tool_id": "mcp_tool_1", // 选择的工具ID + "description": "扫描ip为192.168.1.1的MySQL数据库,端口为3306,用户名为root,密码为password的数据库性能", + } + ``` + # 样例二 + # 目标 + 计划从杭州到北京的旅游计划 + # 历史记录 + 第1步:将杭州转换为经纬度坐标 + - 调用工具 `maps_geo_planner`,并提供参数 `{"city_from": "杭州", "address": "西湖"}` + - 执行状态:成功 + - 得到数据:`{"location": "123.456, 78.901"}` + 第2步:查询杭州的天气 + - 调用工具 `weather_query`,并提供参数 `{"location": "123.456, 78.901"}` + - 执行状态:成功 + - 得到数据:`{"weather": "晴", "temperature": "25°C"}` + 第3步:将北京转换为经纬度坐标 + - 调用工具 `maps_geo_planner`,并提供参数 `{"city_from": "北京", "address": "天安门"}` + - 执行状态:成功 + - 得到数据:`{"location": "123.456, 78.901"}` + 第4步:查询北京的天气 + - 调用工具 `weather_query`,并提供参数 `{"location": "123.456, 78.901"}` + - 执行状态:成功 + - 得到数据:`{"weather": "晴", "temperature": "25°C"}` + # 工具 + + - mcp_tool_4 maps_geo_planner;将详细的结构化地址转换为经纬度坐标。支持对地标性名胜景区、建筑物名称解析为经纬度坐标 + - mcp_tool_5 weather_query;天气查询,用于查询天气信息 + - mcp_tool_6 maps_direction_transit_integrated;根据用户起终点经纬度坐标规划综合各类公共(火车、公交、地铁)交通方式的通勤方案,并且返回通勤方案的数据,跨城场景下必须传起点城市与终点城市 + - Final Final;结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。 + + # 输出 + ```json + { + "tool_id": "mcp_tool_6", // 选择的工具ID + "description": "规划从杭州到北京的综合公共交通方式的通勤方案" + } + ``` + # 现在开始生成步骤: + # 目标 + {{goal}} + # 历史记录 + {{history}} + # 工具 + + {% for tool in tools %} + - {{tool.id}} {{tool.name}};{{tool.description}} + {% endfor %} + +""") + +TOOL_SKIP = dedent(r""" + 你是一个计划执行器。 + 你的任务是根据当前的计划和用户目标,判断当前步骤是否需要跳过。 + 如果需要跳过,请返回`true`,否则返回`false`。 + 必须按照以下格式回答: + ```json + { + "skip": true/false, + } + ``` + 注意: + 1.你的判断要谨慎,在历史消息中有足够的上下文信息时,才可以判断是否跳过当前步骤。 + # 样例 + # 用户目标 + 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优 + # 历史 + 第1步:生成端口扫描命令 + - 调用工具 `command_generator`,并提供参数 `{"command": "nmap -sS -p--open 192.168.1.1"}` + - 执行状态:成功 + - 得到数据:`{"command": "nmap -sS -p--open 192.168.1.1"}` + 第2步:执行端口扫描命令 + - 调用工具 `command_executor`,并提供参数 `{"command": "nmap -sS -p--open 192.168.1.1"}` + - 执行状态:成功 + - 得到数据:`{"result": "success"}` + 第3步:分析端口扫描结果 + - 调用工具 `mysql_analyzer`,并提供参数 `{"host": "192.168.1.1", "port": 3306, "username": "root", "password": "password"}` + - 执行状态:成功 + - 得到数据:`{"performance": "good", "bottleneck": "none"}` + # 当前步骤 + + step_4 + command_generator + 生成MySQL性能调优命令 + 生成MySQL性能调优命令:调优MySQL数据库性能 + + # 输出 + ```json + { + "skip": true + } + ``` + # 用户目标 + {{goal}} + # 历史 + {{history}} + # 当前步骤 + + {{step_id}} + {{step_name}} + {{step_instruction}} + {{step_content}} + + # 输出 + """) + RISK_EVALUATE = dedent(r""" 你是一个工具执行计划评估器。 你的任务是根据当前工具的名称、描述和入参以及附加信息,判断当前工具执行的风险并输出提示。 @@ -621,6 +763,66 @@ TOOL_EXECUTE_ERROR_TYPE_ANALYSIS = dedent(r""" # 输出 """) +IS_PARAM_ERROR = dedent(r""" + 你是一个计划执行专家,你的任务是判断当前的步骤执行失败是否是因为参数错误导致的, + 如果是,请返回`true`,否则返回`false`。 + 必须按照以下格式回答: + ```json + { + "is_param_error": true/false, + } + ``` + # 样例 + # 用户目标 + 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优 + # 历史 + 第1步:生成端口扫描命令 + - 调用工具 `command_generator`,并提供参数 `{"command": "nmap -sS -p--open 192.168.1.1"}` + - 执行状态:成功 + - 得到数据:`{"command": "nmap -sS -p--open 192.168.1.1"}` + 第2步:执行端口扫描命令 + - 调用工具 `command_executor`,并提供参数 `{"command": "nmap -sS -p--open 192.168.1.1"}` + - 执行状态:成功 + - 得到数据:`{"result": "success"}` + # 当前步骤 + + step_3 + mysql_analyzer + 分析MySQL数据库性能 + + # 工具入参 + { + "host": "192.0.0.1", + "port": 3306, + "username": "root", + "password": "password" + } + # 工具运行报错 + 执行MySQL性能分析命令时,出现了错误:`host is not correct`。 + + # 输出 + ```json + { + "is_param_error": true + } + ``` + # 用户目标 + {{goal}} + # 历史 + {{history}} + # 当前步骤 + + {{step_id}} + {{step_name}} + {{step_instruction}} + + # 工具入参 + {{input_param}} + # 工具运行报错 + {{error_message}} + # 输出 + """) + # 将当前程序运行的报错转换为自然语言 CHANGE_ERROR_MESSAGE_TO_DESCRIPTION = dedent(r""" 你是一个智能助手,你的任务是将当前程序运行的报错转换为自然语言描述。 @@ -771,9 +973,87 @@ GET_MISSING_PARAMS = dedent(r""" # 输出 """) + +GEN_PARAMS = dedent(r""" + 你是一个工具参数生成器。 + 你的任务是根据总的目标、阶段性的目标、工具信息、工具入参的schema和背景信息生成工具的入参。 + 注意: + 1.生成的参数在格式上必须符合工具入参的schema。 + 2.总的目标、阶段性的目标和背景信息必须被充分理解,利用其中的信息来生成工具入参。 + 3.生成的参数必须符合阶段性目标。 + + # 样例 + # 工具信息 + < tool > + < name > mysql_analyzer < /name > + < description > 分析MySQL数据库性能 < /description > + < / tool > + # 总目标 + 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优,ip地址是192.168.1.1,端口是3306,用户名是root,密码是password。 + # 当前阶段目标 + 我要连接MySQL数据库,分析性能瓶颈,并调优。 + # 工具入参的schema + { + "type": "object", + "properties": { + "host": { + "type": "string", + "description": "MySQL数据库的主机地址" + }, + "port": { + "type": "integer", + "description": "MySQL数据库的端口号" + }, + "username": { + "type": "string", + "description": "MySQL数据库的用户名" + }, + "password": { + "type": "string", + "description": "MySQL数据库的密码" + } + }, + "required": ["host", "port", "username", "password"] + } + # 背景信息 + 第1步:生成端口扫描命令 + - 调用工具 `command_generator`,并提供参数 `帮我生成一个mysql端口扫描命令` + - 执行状态:成功 + - 得到数据:`{"command": "nmap -sS -p--open 192.168.1.1"}` + 第2步:执行端口扫描命令 + - 调用工具 `command_executor`,并提供参数 `{"command": "nmap -sS -p--open 192.168.1.1"}` + - 执行状态:成功 + - 得到数据:`{"result": "success"}` + # 输出 + ```json + { + "host": "192.168.1.1", + "port": 3306, + "username": "root", + "password": "password" + } + ``` + # 工具 + + {{tool_name}} + {{tool_description}} + + # 总目标 + {{goal}} + # 当前阶段目标 + {{current_goal}} + # 工具入参scheme + {{input_schema}} + # 背景信息 + {{background_info}} + # 输出 + """) + REPAIR_PARAMS = dedent(r""" - 你是一个工具参数修复器。 - 你的任务是根据当前的工具信息、工具入参的schema、工具当前的入参、工具的报错、补充的参数和补充的参数描述,修复当前工具的入参。 + 你的任务是根据当前的工具信息、目标、工具入参的schema、工具当前的入参、工具的报错、补充的参数和补充的参数描述,修复当前工具的入参。 + + 注意: + 1.最终修复的参数要符合目标和工具入参的schema。 # 样例 # 工具信息 @@ -781,6 +1061,10 @@ REPAIR_PARAMS = dedent(r""" mysql_analyzer 分析MySQL数据库性能 + # 总目标 + 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优 + # 当前阶段目标 + 我要连接MySQL数据库,分析性能瓶颈,并调优。 # 工具入参的schema { "type": "object", @@ -834,6 +1118,10 @@ REPAIR_PARAMS = dedent(r""" {{tool_name}} {{tool_description}} + # 总目标 + {{goal}} + # 当前阶段目标 + {{current_goal}} # 工具入参scheme {{input_schema}} # 工具入参 @@ -860,10 +1148,6 @@ FINAL_ANSWER = dedent(r""" {{memory}} - # 其他背景信息: - - {{status}} - # 现在,请根据以上信息,向用户报告目标的完成情况: """) @@ -871,7 +1155,7 @@ MEMORY_TEMPLATE = dedent(r""" {% for ctx in context_list %} - 第{{loop.index}}步:{{ctx.step_description}} - 调用工具 `{{ctx.step_id}}`,并提供参数 `{{ctx.input_data}}` - - 执行状态:{{ctx.status}} + - 执行状态:{{ctx.step_status}} - 得到数据:`{{ctx.output_data}}` {% endfor %} """) diff --git a/apps/scheduler/pool/loader/mcp.py b/apps/scheduler/pool/loader/mcp.py index e97ef4a0..7a2aa374 100644 --- a/apps/scheduler/pool/loader/mcp.py +++ b/apps/scheduler/pool/loader/mcp.py @@ -410,17 +410,6 @@ class MCPLoader(metaclass=SingletonMeta): if mcp_env is not None: mcp_config.mcpServers[mcp_id].env.update(mcp_env) - user_config_path = user_path / "config.json" - # 更新用户配置 - f = await user_config_path.open("w", encoding="utf-8", errors="ignore") - await f.write( - json.dumps( - mcp_config.model_dump(by_alias=True, exclude_none=True), - indent=4, - ensure_ascii=False, - ) - ) - await f.aclose() if mcp_config.mcpType == MCPType.STDIO: index = None for i in range(len(mcp_config.config.args)): @@ -434,6 +423,17 @@ class MCPLoader(metaclass=SingletonMeta): mcp_config.config.args.append(str(user_path)+'/project') else: mcp_config.config.args = ["--directory", str(user_path)+'/project'] + mcp_config.config.args + user_config_path = user_path / "config.json" + # 更新用户配置 + f = await user_config_path.open("w", encoding="utf-8", errors="ignore") + await f.write( + json.dumps( + mcp_config.model_dump(by_alias=True, exclude_none=True), + indent=4, + ensure_ascii=False, + ), + ) + await f.aclose() # 更新数据库 async with postgres.session() as session: await session.merge(MCPActivated( @@ -554,6 +554,7 @@ class MCPLoader(metaclass=SingletonMeta): async with postgres.session() as session: mcp_data = (await session.scalars(select(MCPInfo).where(MCPInfo.id == mcp_id))).one_or_none() if mcp_data: + logger.info("[MCPLoader] 更新MCP模板状态: %s -> %s", mcp_id, status) mcp_data.status = status await session.merge(mcp_data) diff --git a/apps/schemas/mcp.py b/apps/schemas/mcp.py index afb8d292..69b58826 100644 --- a/apps/schemas/mcp.py +++ b/apps/schemas/mcp.py @@ -81,6 +81,12 @@ class Risk(str, Enum): HIGH = "high" +class ToolSkip(BaseModel): + """MCP工具跳过执行结果""" + + skip: bool = Field(description="是否跳过当前步骤", default=False) + + class ToolRisk(BaseModel): """MCP工具风险评估结果""" @@ -102,6 +108,12 @@ class ToolExcutionErrorType(BaseModel): reason: str = Field(description="错误原因", default="") +class IsParamError(BaseModel): + """MCP工具参数错误""" + + is_param_error: bool = Field(description="是否是参数错误", default=False) + + class MCPSelectResult(BaseModel): """MCP选择结果""" @@ -133,3 +145,10 @@ class MCPPlan(BaseModel): """MCP 计划""" plans: list[MCPPlanItem] = Field(description="计划列表", default=[]) + + +class Step(BaseModel): + """MCP步骤""" + + tool_id: str = Field(description="工具ID") + description: str = Field(description="步骤描述") diff --git a/apps/schemas/message.py b/apps/schemas/message.py index 77b17399..1ece99ba 100644 --- a/apps/schemas/message.py +++ b/apps/schemas/message.py @@ -39,6 +39,11 @@ class MessageFlow(BaseModel): sub_step_id: str | None = Field(description="当前子步骤ID", alias="subStepId", default=None) sub_step_name: str | None = Field(description="当前子步骤名称", alias="subStepName", default=None) step_status: StepStatus = Field(description="当前步骤状态", alias="stepStatus") + step_description: str | None = Field( + description="当前步骤描述", + alias="stepDescription", + default=None, + ) class MessageMetadata(RecordMetadata): diff --git a/apps/schemas/record.py b/apps/schemas/record.py index 119a8082..94b83c09 100644 --- a/apps/schemas/record.py +++ b/apps/schemas/record.py @@ -33,6 +33,7 @@ class RecordFlowStep(BaseModel): step_status: StepStatus = Field(alias="stepStatus") input: dict[str, Any] output: dict[str, Any] + ex_data: dict[str, Any] | None = Field(default=None, alias="exData") class RecordFlow(BaseModel): diff --git a/apps/schemas/task.py b/apps/schemas/task.py index a23f7cf9..0a337cfc 100644 --- a/apps/schemas/task.py +++ b/apps/schemas/task.py @@ -6,7 +6,6 @@ from typing import Any from pydantic import BaseModel, Field from .flow import Step -from .mcp import MCPPlan class CheckpointExtra(BaseModel): @@ -20,8 +19,6 @@ class CheckpointExtra(BaseModel): class TaskExtra(BaseModel): """任务额外数据""" - temporary_plans: MCPPlan | None = Field(description="临时计划列表", default=None) - class StepQueueItem(BaseModel): """步骤栈中的元素""" diff --git a/apps/services/activity.py b/apps/services/activity.py index 55bfe2c7..be41a238 100644 --- a/apps/services/activity.py +++ b/apps/services/activity.py @@ -14,6 +14,7 @@ from apps.models.session import SessionActivity class Activity: """用户活动控制,限制单用户同一时间只能提问一个问题""" + # TODO:改为同一时间整个系统最多有n个task在执行,与用户无关 @staticmethod async def is_active(user_sub: str) -> bool: """ diff --git a/apps/services/mcp_service.py b/apps/services/mcp_service.py index b1fbae0a..84b63830 100644 --- a/apps/services/mcp_service.py +++ b/apps/services/mcp_service.py @@ -35,7 +35,6 @@ from apps.schemas.mcp import ( ) from apps.schemas.request_data import UpdateMCPServiceRequest from apps.schemas.response_data import MCPServiceCardItem -from apps.services.user import UserManager logger = logging.getLogger(__name__) MCP_ICON_PATH = ICON_PATH / "mcp" @@ -92,13 +91,13 @@ class MCPServiceManager: @staticmethod - async def fetch_mcp_services( + async def fetch_mcp_services( # noqa: PLR0913 search_type: SearchType, user_sub: str, keyword: str | None, page: int, *, - is_installed: bool | None = None, + is_install: bool | None = None, is_active: bool | None = None, ) -> list[MCPServiceCardItem]: """ @@ -110,7 +109,9 @@ class MCPServiceManager: :param page: int: 页码 :return: MCP服务列表 """ - mcpservice_pools = await MCPServiceManager._search_mcpservice(search_type, keyword, page, is_active=is_active) + mcpservice_pools = await MCPServiceManager._search_mcpservice( + search_type, keyword, page, is_active=is_active, is_installed=is_install, + ) return [ MCPServiceCardItem( mcpserviceId=item.id, @@ -163,12 +164,14 @@ class MCPServiceManager: @staticmethod - async def _search_mcpservice( + async def _search_mcpservice( # noqa: PLR0913 search_type: SearchType, keyword: str | None, page: int, + user_sub: str, *, is_active: bool | None = None, + is_installed: bool | None = None, ) -> list[MCPInfo]: """ 基于输入条件搜索MCP服务 @@ -179,40 +182,36 @@ class MCPServiceManager: """ # 分页查询 skip = (page - 1) * SERVICE_PAGE_SIZE - async with postgres.session() as session: - if not keyword: - result = list( - (await session.scalars(select(MCPInfo).offset(skip).limit(SERVICE_PAGE_SIZE))).all(), - ) - elif search_type == SearchType.ALL: - result = list( - (await session.scalars(select(MCPInfo).where( - or_( - MCPInfo.name.like(f"%{keyword}%"), - MCPInfo.description.like(f"%{keyword}%"), - MCPInfo.author.like(f"%{keyword}%"), - ), - ).offset(skip).limit(SERVICE_PAGE_SIZE))).all(), + sql = select(MCPInfo) + + if search_type == SearchType.ALL: + sql = sql.where( + or_( + MCPInfo.name.like(f"%{keyword}%"), + MCPInfo.description.like(f"%{keyword}%"), + MCPInfo.author.like(f"%{keyword}%"), + ), ) elif search_type == SearchType.NAME: - result = list( - (await session.scalars( - select(MCPInfo).where(MCPInfo.name.like(f"%{keyword}%")).offset(skip).limit(SERVICE_PAGE_SIZE), - )).all(), - ) + sql = sql.where(MCPInfo.name.like(f"%{keyword}%")) elif search_type == SearchType.DESCRIPTION: - result = list( - (await session.scalars( - select(MCPInfo).where(MCPInfo.description.like(f"%{keyword}%")).offset(skip).limit(SERVICE_PAGE_SIZE), - )).all(), - ) + sql = sql.where(MCPInfo.description.like(f"%{keyword}%")) elif search_type == SearchType.AUTHOR: - result = list( - (await session.scalars( - select(MCPInfo).where(MCPInfo.author.like(f"%{keyword}%")).offset(skip).limit(SERVICE_PAGE_SIZE), - )).all(), - ) + sql = sql.where(MCPInfo.author.like(f"%{keyword}%")) + + sql = sql.offset(skip).limit(SERVICE_PAGE_SIZE) + + if is_installed is not None: + sql = sql.where(MCPInfo.id.in_( + select(MCPActivated.mcpId).where(MCPActivated.userSub == user_sub), + )) + if is_active is not None: + sql = sql.where(MCPInfo.id.in_( + select(MCPActivated.mcpId).where(MCPActivated.userSub == user_sub), + )) + + result = list((await session.scalars(sql)).all()) # 如果未找到,返回空列表 if not result: @@ -294,8 +293,13 @@ class MCPServiceManager: msg = "[MCPServiceManager] MCP服务ID为空" raise ValueError(msg) - mcp_collection = MongoDB().get_collection("mcp") - db_service = await mcp_collection.find_one({"_id": data.service_id, "author": user_sub}) + async with postgres.session() as session: + db_service = (await session.scalars(select(MCPInfo).where( + and_( + MCPInfo.id == data.service_id, + MCPInfo.author == user_sub, + ), + ))).one_or_none() if not db_service: msg = "[MCPServiceManager] MCP服务未找到或无权限" raise ValueError(msg) diff --git a/apps/services/task.py b/apps/services/task.py index 647d1796..832b5743 100644 --- a/apps/services/task.py +++ b/apps/services/task.py @@ -75,6 +75,9 @@ class TaskManager: @staticmethod async def save_flow_context(task_id: str, flow_context: list[ExecutorHistory]) -> None: """保存flow信息到flow_context""" + if not flow_context: + return + flow_context_collection = MongoDB().get_collection("flow_context") try: for history in flow_context: -- Gitee