diff --git a/apps/common/queue.py b/apps/common/queue.py
index bcda8aabf0a73b8de4834d1793c36b4feb2dca2c..58731bcc49992c01a29ab8675e7fe580f04ccec6 100644
--- a/apps/common/queue.py
+++ b/apps/common/queue.py
@@ -60,6 +60,7 @@ class MessageQueue:
flowStatus=task.state.flow_status,
stepId=task.state.step_id,
stepName=task.state.step_name,
+ stepDescription=task.state.step_description,
stepStatus=task.state.step_status,
)
else:
diff --git a/apps/routers/chat.py b/apps/routers/chat.py
index 033156a906fd72ded7352d9f5f3d4fa114af4325..2498da814950ae840b493d03a858c96333661377 100644
--- a/apps/routers/chat.py
+++ b/apps/routers/chat.py
@@ -3,9 +3,11 @@
import asyncio
import logging
+import uuid
from collections.abc import AsyncGenerator
+from typing import Annotated
-from fastapi import APIRouter, Depends, HTTPException, Request, status
+from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from fastapi.responses import JSONResponse, StreamingResponse
from apps.common.queue import MessageQueue
@@ -155,9 +157,10 @@ async def chat(request: Request, post_body: RequestData) -> StreamingResponse:
@router.post("/stop", response_model=ResponseData)
-async def stop_generation(request: Request) -> JSONResponse:
+async def stop_generation(taskId: Annotated[uuid.UUID, Query()]) -> JSONResponse: # noqa: N803
"""停止生成"""
- await Activity.remove_active(request.state.user_sub)
+ await Activity.remove_active(taskId)
+
return JSONResponse(
status_code=status.HTTP_200_OK,
content=ResponseData(
diff --git a/apps/routers/mcp_service.py b/apps/routers/mcp_service.py
index c4adec4179988c9abd0a3d99e4cabada63c9f2f3..1c0ebb5787c94c89f7050ec68b0323f1338f6a45 100644
--- a/apps/routers/mcp_service.py
+++ b/apps/routers/mcp_service.py
@@ -49,14 +49,14 @@ admin_router = APIRouter(
@router.get("", response_model=GetMCPServiceListRsp | ResponseData)
-async def get_mcpservice_list(
+async def get_mcpservice_list( # noqa: PLR0913
request: Request,
searchType: SearchType = SearchType.ALL, # noqa: N803
keyword: str | None = None,
page: Annotated[int, Query(ge=1)] = 1,
*,
- is_installed: bool | None = None,
- is_active: bool | None = None,
+ isInstall: bool | None = None, # noqa: N803
+ isActive: bool | None = None, # noqa: N803
) -> JSONResponse:
"""获取服务列表"""
user_sub = request.state.user_sub
@@ -66,8 +66,8 @@ async def get_mcpservice_list(
user_sub,
keyword,
page,
- is_installed=is_installed,
- is_active=is_active,
+ is_install=isInstall,
+ is_active=isActive,
)
except Exception as e:
err = f"[MCPServiceCenter] 获取MCP服务列表失败: {e}"
diff --git a/apps/routers/user.py b/apps/routers/user.py
index 1ddc3d3f10a7fd7ec3366b470d07ea34893b9af7..210311ee39b0aea59646103f93793bdae9577a2b 100644
--- a/apps/routers/user.py
+++ b/apps/routers/user.py
@@ -46,14 +46,9 @@ async def list_user(
async def update_user_info(request: Request, data: UserUpdateRequest) -> JSONResponse:
"""更新用户信息接口"""
# 更新用户信息
+ await UserManager.update_userinfo_by_user_sub(request.state.user_sub, data)
- result = await UserManager.update_userinfo_by_user_sub(request.state.user_sub, data)
- if not result:
- return JSONResponse(
- status_code=status.HTTP_200_OK,
- content={"code": status.HTTP_200_OK, "message": "用户信息更新成功"},
- )
return JSONResponse(
- status_code=status.HTTP_404_NOT_FOUND,
- content={"code": status.HTTP_404_NOT_FOUND, "message": "用户信息更新失败"},
+ status_code=status.HTTP_200_OK,
+ content={"code": status.HTTP_200_OK, "message": "用户信息更新成功"},
)
diff --git a/apps/scheduler/executor/agent.py b/apps/scheduler/executor/agent.py
index 6f933b8ba9463740b0f217befb05abc6f8d46148..4d34b85b6855478d0686c8fa76a1b51433587067 100644
--- a/apps/scheduler/executor/agent.py
+++ b/apps/scheduler/executor/agent.py
@@ -1,39 +1,38 @@
# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved.
"""MCP Agent执行器"""
+import anyio
import logging
-import traceback
import uuid
-
-import anyio
-from mcp.types import TextContent
from pydantic import Field
-
+from typing import Any
+from mcp.types import TextContent
+from apps.llm.patterns.rewrite import QuestionRewrite
from apps.llm.reasoning import ReasoningLLM
from apps.scheduler.executor.base import BaseExecutor
+from apps.schemas.enum_var import EventType, SpecialCallType, FlowStatus, StepStatus
from apps.scheduler.mcp_agent.host import MCPHost
from apps.scheduler.mcp_agent.plan import MCPPlanner
from apps.scheduler.mcp_agent.select import FINAL_TOOL_ID, MCPSelector
from apps.scheduler.pool.mcp.client import MCPClient
-from apps.scheduler.pool.mcp.pool import MCPPool
-from apps.schemas.enum_var import EventType, FlowStatus, SpecialCallType, StepStatus
from apps.schemas.mcp import (
- ErrorType,
GoalEvaluationResult,
- MCPCollection,
- MCPPlan,
- MCPTool,
RestartStepIndex,
- ToolExcutionErrorType,
ToolRisk,
+ ErrorType,
+ ToolExcutionErrorType,
+ MCPPlan,
+ MCPCollection,
+ MCPTool,
+ Step
)
-from apps.schemas.message import FlowParams
+from apps.scheduler.pool.mcp.pool import MCPPool
from apps.schemas.task import ExecutorState, FlowStepHistory, StepQueueItem
+from apps.schemas.message import param
+from apps.services.task import TaskManager
from apps.services.appcenter import AppCenterManager
from apps.services.mcp_service import MCPServiceManager
-from apps.services.task import TaskManager
from apps.services.user import UserManager
-
logger = logging.getLogger(__name__)
@@ -47,10 +46,13 @@ class MCPAgentExecutor(BaseExecutor):
mcp_list: list[MCPCollection] = Field(description="MCP服务器列表", default=[])
mcp_pool: MCPPool = Field(description="MCP池", default=MCPPool())
tools: dict[str, MCPTool] = Field(
- description="MCP工具列表,key为tool_id", default={},
+ description="MCP工具列表,key为tool_id", default={}
+ )
+ tool_list: list[MCPTool] = Field(
+ description="MCP工具列表,包含所有MCP工具", default=[]
)
- params: FlowParams | bool | None = Field(
- default=None, description="流执行过程中的参数补充", alias="params",
+ params: param | bool | None = Field(
+ default=None, description="流执行过程中的参数补充", alias="params"
)
resoning_llm: ReasoningLLM = Field(
default=ReasoningLLM(),
@@ -90,94 +92,36 @@ class MCPAgentExecutor(BaseExecutor):
await self.mcp_pool._init_mcp(mcp_id, self.task.ids.user_sub)
for tool in mcp_service.tools:
self.tools[tool.id] = tool
-
- async def plan(self, is_replan: bool = False, start_index: int | None = None) -> None:
- if is_replan:
- error_message = "之前的计划遇到以下报错\n\n"+self.task.state.error_message
- else:
- error_message = "初始化计划"
- tools = await MCPSelector.select_top_tool(
- self.task.runtime.question, list(self.tools.values()),
- additional_info=error_message, top_n=40, reasoning_llm=self.resoning_llm)
- if is_replan:
- logger.info("[MCPAgentExecutor] 重新规划流程")
- if not start_index:
- start_index = await MCPPlanner.get_replan_start_step_index(self.task.runtime.question,
- self.task.state.error_message,
- self.task.runtime.temporary_plans,
- self.resoning_llm)
- start_index = start_index.start_index
- current_plan = MCPPlan(plans=self.task.runtime.temporary_plans.plans[start_index:])
- error_message = self.task.state.error_message
- temporary_plans = await MCPPlanner.create_plan(
- self.task.runtime.question,
- is_replan=is_replan,
- error_message=error_message,
- current_plan=current_plan,
- tool_list=tools,
- max_steps=self.max_steps-start_index-1,
- reasoning_llm=self.resoning_llm,
- )
- await self.update_tokens()
- await self.push_message(
- EventType.STEP_CANCEL,
- data={},
- )
- if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id:
- self.task.context[-1].step_status = StepStatus.CANCELLED
- self.task.runtime.temporary_plans.plans = self.task.runtime.temporary_plans.plans[
- : start_index] + temporary_plans.plans
- self.task.state.step_index = start_index
- else:
- start_index = 0
- logger.error(
- f"各个字段的类型: {type(self.task.runtime.question)}, {type(tools)}, {type(self.max_steps)}, {type(self.resoning_llm)}")
- self.task.runtime.temporary_plans = await MCPPlanner.create_plan(self.task.runtime.question, tool_list=tools, max_steps=self.max_steps, reasoning_llm=self.resoning_llm)
- for i in range(start_index, len(self.task.runtime.temporary_plans.plans)):
- self.task.runtime.temporary_plans.plans[i].step_id = str(uuid.uuid4())
+ self.tool_list.extend(mcp_service.tools)
+ self.tools[FINAL_TOOL_ID] = MCPTool(
+ id=FINAL_TOOL_ID,
+ name="Final Tool",
+ description="结束流程的工具",
+ mcp_id="",
+ input_schema={}
+ )
+ self.tool_list.append(MCPTool(id=FINAL_TOOL_ID, name="Final Tool",
+ description="结束流程的工具", mcp_id="", input_schema={}))
async def get_tool_input_param(self, is_first: bool) -> None:
if is_first:
# 获取第一个输入参数
- tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool
- step = self.task.runtime.temporary_plans.plans[self.task.state.step_index]
- mcp_tool = self.tools[tool_id]
- self.task.state.current_input = await MCPHost.get_first_input_params(
- mcp_tool, self.task.runtime.question, step.instruction, self.task
- )
+ mcp_tool = self.tools[self.task.state.tool_id]
+ self.task.state.current_input = await MCPHost._get_first_input_params(mcp_tool, self.task.runtime.question, self.task.state.step_description, self.task)
else:
# 获取后续输入参数
- if isinstance(self.params, FlowParams):
+ if isinstance(self.params, param):
params = self.params.content
params_description = self.params.description
else:
params = {}
params_description = ""
- tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool
- mcp_tool = self.tools[tool_id]
- step = self.task.runtime.temporary_plans.plans[self.task.state.step_index]
- self.task.state.current_input = await MCPHost._fill_params(mcp_tool, self.task.runtime.question, step.instruction, self.task.state.current_input, self.task.state.error_message, params, params_description)
-
- async def reset_step_to_index(self, start_index: int) -> None:
- """重置步骤到开始"""
- logger.info("[MCPAgentExecutor] 重置步骤到索引 %d", start_index)
-
- if start_index < len(self.task.runtime.temporary_plans.plans):
- self.task.state.flow_status = FlowStatus.RUNNING
- self.task.state.step_id = self.task.runtime.temporary_plans.plans[start_index].step_id
- self.task.state.step_index = 0
- self.task.state.step_name = self.tools[self.task.runtime.temporary_plans.plans[start_index].tool].name
- self.task.state.step_description = self.task.runtime.temporary_plans.plans[start_index].content
- self.task.state.step_status = StepStatus.INIT
- self.task.state.retry_times = 0
- else:
- self.task.state.step_id = FINAL_TOOL_ID
+ mcp_tool = self.tools[self.task.state.tool_id]
+ self.task.state.current_input = await MCPHost._fill_params(mcp_tool, self.task.runtime.question, self.task.state.step_description, self.task.state.current_input, self.task.state.error_message, params, params_description)
async def confirm_before_step(self) -> None:
- logger.info("[MCPAgentExecutor] 等待用户确认步骤 %d", self.task.state.step_index)
# 发送确认消息
- tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool
- mcp_tool = self.tools[tool_id]
+ mcp_tool = self.tools[self.task.state.tool_id]
confirm_message = await MCPPlanner.get_tool_risk(mcp_tool, self.task.state.current_input, "", self.resoning_llm)
await self.update_tokens()
await self.push_message(EventType.STEP_WAITING_FOR_START, confirm_message.model_dump(
@@ -198,17 +142,14 @@ class MCPAgentExecutor(BaseExecutor):
input_data={},
output_data={},
ex_data=confirm_message.model_dump(exclude_none=True, by_alias=True),
- ),
+ )
)
async def run_step(self):
"""执行步骤"""
self.task.state.flow_status = FlowStatus.RUNNING
self.task.state.step_status = StepStatus.RUNNING
- logger.info("[MCPAgentExecutor] 执行步骤 %d", self.task.state.step_index)
- # 获取MCP客户端
- tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool
- mcp_tool = self.tools[tool_id]
+ mcp_tool = self.tools[self.task.state.tool_id]
mcp_client = (await self.mcp_pool.get(mcp_tool.mcp_id, self.task.ids.user_sub))
try:
output_params = await mcp_client.call_tool(mcp_tool.name, self.task.state.current_input)
@@ -245,11 +186,11 @@ class MCPAgentExecutor(BaseExecutor):
await self.update_tokens()
await self.push_message(
EventType.STEP_INPUT,
- self.task.state.current_input,
+ self.task.state.current_input
)
await self.push_message(
EventType.STEP_OUTPUT,
- output_params,
+ output_params
)
self.task.context.append(
FlowStepHistory(
@@ -263,37 +204,36 @@ class MCPAgentExecutor(BaseExecutor):
flow_status=self.task.state.flow_status,
input_data=self.task.state.current_input,
output_data=output_params,
- ),
+ )
)
self.task.state.step_status = StepStatus.SUCCESS
async def generate_params_with_null(self) -> None:
"""生成参数补充"""
- tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool
- mcp_tool = self.tools[tool_id]
+ mcp_tool = self.tools[self.task.state.tool_id]
params_with_null = await MCPPlanner.get_missing_param(
mcp_tool,
self.task.state.current_input,
self.task.state.error_message,
- self.resoning_llm,
+ self.resoning_llm
)
await self.update_tokens()
error_message = await MCPPlanner.change_err_message_to_description(
error_message=self.task.state.error_message,
tool=mcp_tool,
input_params=self.task.state.current_input,
- reasoning_llm=self.resoning_llm,
+ reasoning_llm=self.resoning_llm
)
await self.push_message(
EventType.STEP_WAITING_FOR_PARAM,
data={
"message": error_message,
- "params": params_with_null,
- },
+ "params": params_with_null
+ }
)
await self.push_message(
EventType.FLOW_STOP,
- data={},
+ data={}
)
self.task.state.flow_status = FlowStatus.WAITING
self.task.state.step_status = StepStatus.PARAM
@@ -311,27 +251,42 @@ class MCPAgentExecutor(BaseExecutor):
output_data={},
ex_data={
"message": error_message,
- "params": params_with_null,
- },
- ),
+ "params": params_with_null
+ }
+ )
)
async def get_next_step(self) -> None:
- self.task.state.step_index += 1
- if self.task.state.step_index < len(self.task.runtime.temporary_plans.plans):
- tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool
+ if self.task.state.step_cnt < self.max_steps:
+ self.task.state.step_cnt += 1
+ history = await MCPHost.assemble_memory(self.task)
+ max_retry = 3
+ step = None
+ for i in range(max_retry):
+ step = await MCPPlanner.create_next_step(self.task.runtime.question, history, self.tool_list)
+ if step.tool_id in self.tools.keys():
+ break
+ if step is None or step.tool_id not in self.tools.keys():
+ step = Step(
+ tool_id=FINAL_TOOL_ID,
+ description=FINAL_TOOL_ID
+ )
+ tool_id = step.tool_id
if tool_id == FINAL_TOOL_ID:
- return
- self.task.state.step_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].step_id
- self.task.state.step_name = self.tools[self.task.runtime.temporary_plans.plans
- [self.task.state.step_index].tool].name
- self.task.state.step_description = self.task.runtime.temporary_plans.plans[self.task.state.step_index].content
+ step_name = FINAL_TOOL_ID
+ else:
+ step_name = self.tools[tool_id].name
+ step_description = step.description
+ self.task.state.step_id = str(uuid.uuid4())
+ self.task.state.tool_id = tool_id
+ self.task.state.step_name = step_name
+ self.task.state.step_description = step_description
self.task.state.step_status = StepStatus.INIT
self.task.state.current_input = {}
else:
# 没有下一步了,结束流程
- self.task.state.step_id = FINAL_TOOL_ID
- return
+ self.task.state.tool_id = FINAL_TOOL_ID
+ return
async def error_handle_after_step(self) -> None:
"""步骤执行失败后的错误处理"""
@@ -339,7 +294,7 @@ class MCPAgentExecutor(BaseExecutor):
self.task.state.flow_status = FlowStatus.ERROR
await self.push_message(
EventType.FLOW_FAILED,
- data={},
+ data={}
)
if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id:
del self.task.context[-1]
@@ -355,7 +310,7 @@ class MCPAgentExecutor(BaseExecutor):
flow_status=self.task.state.flow_status,
input_data={},
output_data={},
- ),
+ )
)
async def work(self) -> None:
@@ -363,7 +318,7 @@ class MCPAgentExecutor(BaseExecutor):
if self.task.state.step_status == StepStatus.INIT:
await self.push_message(
EventType.STEP_INIT,
- data={},
+ data={}
)
await self.get_tool_input_param(is_first=True)
user_info = await UserManager.get_userinfo_by_user_sub(self.task.ids.user_sub)
@@ -385,11 +340,11 @@ class MCPAgentExecutor(BaseExecutor):
self.task.state.step_status = StepStatus.CANCELLED
await self.push_message(
EventType.STEP_CANCEL,
- data={},
+ data={}
)
await self.push_message(
EventType.FLOW_CANCEL,
- data={},
+ data={}
)
if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id:
self.task.context[-1].step_status = StepStatus.CANCELLED
@@ -398,7 +353,7 @@ class MCPAgentExecutor(BaseExecutor):
max_retry = 5
for i in range(max_retry):
if i != 0:
- await self.get_tool_input_param(is_first=False)
+ await self.get_tool_input_param(is_first=True)
await self.run_step()
if self.task.state.step_status == StepStatus.SUCCESS:
break
@@ -408,28 +363,51 @@ class MCPAgentExecutor(BaseExecutor):
await self.error_handle_after_step()
else:
user_info = await UserManager.get_userinfo_by_user_sub(self.task.ids.user_sub)
- tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool
- mcp_tool = self.tools[tool_id]
- error_type = await MCPPlanner.get_tool_execute_error_type(
- self.task.runtime.question,
- self.task.runtime.temporary_plans,
- mcp_tool,
- self.task.state.current_input,
- self.task.state.error_message,
- self.resoning_llm
- )
- if error_type.type == ErrorType.DECORRECT_PLAN or user_info.auto_execute:
- await self.plan(is_replan=True)
- await self.reset_step_to_index(self.task.state.step_index)
- elif error_type.type == ErrorType.MISSING_PARAM:
- await self.generate_params_with_null()
+ if user_info.auto_execute:
+ await self.push_message(
+ EventType.STEP_ERROR,
+ data={
+ "message": self.task.state.error_message,
+ }
+ )
+ if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id:
+ self.task.context[-1].step_status = StepStatus.ERROR
+ self.task.context[-1].output_data = {
+ "message": self.task.state.error_message,
+ }
+ await self.get_next_step()
+ else:
+ mcp_tool = self.tools[self.task.state.tool_id]
+ is_param_error = await MCPPlanner.is_param_error(
+ self.task.runtime.question,
+ await MCPHost.assemble_memory(self.task),
+ self.task.state.error_message,
+ mcp_tool,
+ self.task.state.step_description,
+ self.task.state.current_input,
+ )
+ if is_param_error.is_param_error:
+ # 如果是参数错误,生成参数补充
+ await self.generate_params_with_null()
+ else:
+ await self.push_message(
+ EventType.STEP_ERROR,
+ data={
+ "message": self.task.state.error_message,
+ }
+ )
+ if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id:
+ self.task.context[-1].step_status = StepStatus.ERROR
+ self.task.context[-1].output_data = {
+ "message": self.task.state.error_message,
+ }
+ await self.get_next_step()
elif self.task.state.step_status == StepStatus.SUCCESS:
await self.get_next_step()
async def summarize(self) -> None:
async for chunk in MCPPlanner.generate_answer(
self.task.runtime.question,
- self.task.runtime.temporary_plans,
(await MCPHost.assemble_memory(self.task)),
self.resoning_llm
):
@@ -449,9 +427,8 @@ class MCPAgentExecutor(BaseExecutor):
try:
self.task.state.flow_id = str(uuid.uuid4())
self.task.state.flow_name = await MCPPlanner.get_flow_name(self.task.runtime.question, self.resoning_llm)
- await self.plan(is_replan=False)
- await self.reset_step_to_index(0)
await TaskManager.save_task(self.task.id, self.task)
+ await self.get_next_step()
except Exception as e:
import traceback
logger.error("[MCPAgentExecutor] 初始化失败: %s", traceback.format_exc())
@@ -460,68 +437,53 @@ class MCPAgentExecutor(BaseExecutor):
self.task.state.error_message = str(e)
await self.push_message(
EventType.FLOW_FAILED,
- data={},
+ data={}
)
return
self.task.state.flow_status = FlowStatus.RUNNING
- plan = {
- "plans": [],
- }
- for p in self.task.runtime.temporary_plans.plans:
- if p.tool == FINAL_TOOL_ID:
- continue
- mcp_tool = self.tools.get(p.tool, None)
- plan["plans"].append(
- {
- "stepId": p.step_id,
- "stepName": mcp_tool.name,
- "stepDescription": p.content,
- },
- )
await self.push_message(
EventType.FLOW_START,
- data=plan,
+ data={}
)
- if self.task.state.step_id == FINAL_TOOL_ID:
+ if self.task.state.tool_id == FINAL_TOOL_ID:
# 如果已经是最后一步,直接结束
self.task.state.flow_status = FlowStatus.SUCCESS
await self.push_message(
EventType.FLOW_SUCCESS,
- data={},
+ data={}
)
await self.summarize()
return
try:
- while len(self.task.runtime.temporary_plans.plans) and \
- self.task.state.step_index < len(self.task.runtime.temporary_plans.plans) and \
- self.task.state.flow_status == FlowStatus.RUNNING:
- tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool
- if tool_id == FINAL_TOOL_ID:
+ while self.task.state.flow_status == FlowStatus.RUNNING:
+ if self.task.state.tool_id == FINAL_TOOL_ID:
break
await self.work()
await TaskManager.save_task(self.task.id, self.task)
- tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool
+ tool_id = self.task.state.tool_id
if tool_id == FINAL_TOOL_ID:
# 如果已经是最后一步,直接结束
self.task.state.flow_status = FlowStatus.SUCCESS
self.task.state.step_status = StepStatus.SUCCESS
await self.push_message(
EventType.FLOW_SUCCESS,
- data={},
+ data={}
)
await self.summarize()
except Exception as e:
- logger.exception("[MCPAgentExecutor] 执行过程中发生错误")
+ import traceback
+ logger.error("[MCPAgentExecutor] 执行过程中发生错误: %s", traceback.format_exc())
+ logger.error("[MCPAgentExecutor] 执行过程中发生错误: %s", str(e))
self.task.state.flow_status = FlowStatus.ERROR
self.task.state.error_message = str(e)
self.task.state.step_status = StepStatus.ERROR
await self.push_message(
EventType.STEP_ERROR,
- data={},
+ data={}
)
await self.push_message(
EventType.FLOW_FAILED,
- data={},
+ data={}
)
if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id:
del self.task.context[-1]
@@ -537,11 +499,12 @@ class MCPAgentExecutor(BaseExecutor):
flow_status=self.task.state.flow_status,
input_data={},
output_data={},
- ),
+ )
)
finally:
for mcp_service in self.mcp_list:
try:
await self.mcp_pool.stop(mcp_service.id, self.task.ids.user_sub)
- except Exception:
- logger.exception("[MCPAgentExecutor] 停止MCP客户端时发生错误")
+ except Exception as e:
+ import traceback
+ logger.error("[MCPAgentExecutor] 停止MCP客户端时发生错误: %s", traceback.format_exc())
diff --git a/apps/scheduler/mcp/host.py b/apps/scheduler/mcp/host.py
index a4fc8b8968cbdf0a68b1342c68d712d6a2bdec27..3ccc088019bfbba85630ac4de612b3a1f35ee66e 100644
--- a/apps/scheduler/mcp/host.py
+++ b/apps/scheduler/mcp/host.py
@@ -12,12 +12,12 @@ from mcp.types import TextContent
from apps.llm.function import JsonGenerator
from apps.models.mcp import MCPTools
+from apps.models.task import ExecutorHistory
from apps.scheduler.mcp.prompt import MEMORY_TEMPLATE
from apps.scheduler.pool.mcp.client import MCPClient
from apps.scheduler.pool.mcp.pool import MCPPool
from apps.schemas.enum_var import FlowStatus, StepStatus
from apps.schemas.mcp import MCPPlanItem
-from apps.schemas.task import FlowStepHistory
from apps.services.mcp_service import MCPServiceManager
from apps.services.task import TaskManager
diff --git a/apps/scheduler/mcp/prompt.py b/apps/scheduler/mcp/prompt.py
index 0d7b417f5277d59321e420a3a5a92cfaa43e637e..9951fd9367105e35c67908a9bd14a149461acb09 100644
--- a/apps/scheduler/mcp/prompt.py
+++ b/apps/scheduler/mcp/prompt.py
@@ -13,6 +13,7 @@ CREATE_PLAN = dedent(r"""
2. 计划中的每一个步骤必须且只能使用一个工具。
3. 计划中的步骤必须具有清晰和逻辑的步骤,没有冗余或不必要的步骤。
4. 计划中的最后一步必须是Final工具,以确保计划执行结束。
+ 5.生成的计划必须要覆盖用户的目标,不能遗漏任何用户目标中的内容。
# 生成计划时的注意事项:
@@ -174,8 +175,8 @@ FINAL_ANSWER = dedent(r"""
MEMORY_TEMPLATE = dedent(r"""
{% for ctx in context_list %}
- 第{{ loop.index }}步:{{ ctx.step_description }}
- - 调用工具 `{{ ctx.step_name }}`,并提供参数 `{{ ctx.input_data | tojson }}`。
- - 执行状态:{{ ctx.status }}
+ - 调用工具 `{{ ctx.step_name }}`,并提供参数 `{{ ctx.input_data | tojson }}`。
+ - 执行状态:{{ ctx.step_status }}
- 得到数据:`{{ ctx.output_data | tojson }}`
{% endfor %}
""")
diff --git a/apps/scheduler/mcp_agent/host.py b/apps/scheduler/mcp_agent/host.py
index f068e9d2730bcbcd6d61cd172599f32da0786eaf..fde8e39acb6fa41e474307137a4584bba029fac2 100644
--- a/apps/scheduler/mcp_agent/host.py
+++ b/apps/scheduler/mcp_agent/host.py
@@ -7,17 +7,13 @@ from typing import Any
from jinja2 import BaseLoader
from jinja2.sandbox import SandboxedEnvironment
-from mcp.types import TextContent
from apps.llm.function import JsonGenerator
from apps.scheduler.mcp.prompt import MEMORY_TEMPLATE
-from apps.scheduler.mcp_agent.prompt import REPAIR_PARAMS
-from apps.scheduler.pool.mcp.client import MCPClient
-from apps.scheduler.pool.mcp.pool import MCPPool
-from apps.schemas.enum_var import StepStatus
-from apps.schemas.mcp import MCPPlanItem, MCPTool
-from apps.schemas.task import FlowStepHistory, Task
-from apps.services.task import TaskManager
+from apps.scheduler.mcp_agent.base import McpBase
+from apps.scheduler.mcp_agent.prompt import GEN_PARAMS, REPAIR_PARAMS
+from apps.schemas.mcp import MCPTool
+from apps.schemas.task import Task
def tojson_filter(value: Any) -> str:
@@ -35,7 +31,7 @@ _env = SandboxedEnvironment(
)
-class MCPHost:
+class MCPHost(McpBase):
"""MCP宿主服务"""
@staticmethod
@@ -45,34 +41,43 @@ class MCPHost:
context_list=task.context,
)
- async def get_first_input_params(mcp_tool: MCPTool, query: str, task: Task) -> dict[str, Any]:
+ @staticmethod
+ async def _get_first_input_params(mcp_tool: MCPTool, goal: str, current_goal: str, task: Task,
+ resoning_llm: ReasoningLLM = ReasoningLLM()) -> dict[str, Any]:
"""填充工具参数"""
# 更清晰的输入·指令,这样可以调用generate
- llm_query = rf"""
- 请使用参数生成工具,生成满足以下目标的工具参数:
-
- {query}
- """
-
- # 进行生成
- json_generator = JsonGenerator(
- llm_query,
- [
- {"role": "system", "content": "You are a helpful assistant."},
- {"role": "user", "content": await MCPHost.assemble_memory(task)},
- ],
+ prompt = _env.from_string(GEN_PARAMS).render(
+ tool_name=mcp_tool.name,
+ tool_description=mcp_tool.description,
+ goal=goal,
+ current_goal=current_goal,
+ input_schema=mcp_tool.input_schema,
+ background_info=await MCPHost.assemble_memory(task),
+ )
+ logger.info("[MCPHost] 填充工具参数: %s", prompt)
+ result = await MCPHost.get_resoning_result(
+ prompt,
+ resoning_llm
+ )
+ # 使用JsonGenerator解析结果
+ result = await MCPHost._parse_result(
+ result,
mcp_tool.input_schema,
)
- return await json_generator.generate()
+ return result
- async def _fill_params(mcp_tool: MCPTool,
- current_input: dict[str, Any],
- error_message: str = "", params: dict[str, Any] = {},
- params_description: str = "") -> dict[str, Any]:
+ @staticmethod
+ async def _fill_params( # noqa: PLR0913
+ goal: str, current_goal: str,
+ mcp_tool: MCPTool, current_input: dict[str, Any],
+ error_message: str = "", params: dict[str, Any] | None = None,
+ params_description: str = "") -> dict[str, Any]:
llm_query = "请生成修复之后的工具参数"
prompt = _env.from_string(REPAIR_PARAMS).render(
tool_name=mcp_tool.name,
tool_description=mcp_tool.description,
+ goal=goal,
+ current_goal=current_goal,
input_schema=mcp_tool.input_schema,
current_input=current_input,
error_message=error_message,
@@ -88,4 +93,4 @@ class MCPHost:
],
mcp_tool.input_schema,
)
- return await json_generator.generate()
\ No newline at end of file
+ return await json_generator.generate()
diff --git a/apps/scheduler/mcp_agent/plan.py b/apps/scheduler/mcp_agent/plan.py
index b2f3a35e35a1d6d89d5827b0a11b594600dfb048..0d707d2d97ad55834942c937e8432fc6fa6834ea 100644
--- a/apps/scheduler/mcp_agent/plan.py
+++ b/apps/scheduler/mcp_agent/plan.py
@@ -1,5 +1,6 @@
# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved.
"""MCP 用户目标拆解与规划"""
+
from collections.abc import AsyncGenerator
from typing import Any
@@ -14,19 +15,34 @@ from apps.scheduler.mcp_agent.prompt import (
CREATE_PLAN,
EVALUATE_GOAL,
FINAL_ANSWER,
+ GEN_STEP,
GENERATE_FLOW_NAME,
GET_MISSING_PARAMS,
GET_REPLAN_START_STEP_INDEX,
+ IS_PARAM_ERROR,
RECREATE_PLAN,
RISK_EVALUATE,
TOOL_EXECUTE_ERROR_TYPE_ANALYSIS,
+ TOOL_SKIP,
)
from apps.scheduler.slot.slot import Slot
-from apps.schemas.mcp import GoalEvaluationResult, MCPPlan, MCPTool, RestartStepIndex, ToolExcutionErrorType, ToolRisk
+from apps.schemas.mcp import (
+ GoalEvaluationResult,
+ IsParamError,
+ MCPPlan,
+ MCPPlanItem,
+ MCPTool,
+ RestartStepIndex,
+ Step,
+ ToolExcutionErrorType,
+ ToolRisk,
+ ToolSkip,
+)
+from apps.schemas.task import Task
_env = SandboxedEnvironment(
loader=BaseLoader,
- autoescape=True,
+ autoescape=False,
trim_blocks=True,
lstrip_blocks=True,
)
@@ -126,6 +142,7 @@ class MCPPlanner(McpBase):
max_steps: int = 10, reasoning_llm: ReasoningLLM = ReasoningLLM()) -> str:
"""获取推理大模型的结果"""
# 格式化Prompt
+ tool_ids = [tool.id for tool in tool_list]
if is_replan:
template = _env.from_string(RECREATE_PLAN)
prompt = template.render(
@@ -155,9 +172,54 @@ class MCPPlanner(McpBase):
# 使用Function模型解析结果
return MCPPlan.model_validate(plan)
+ @staticmethod
+ async def create_next_step(
+ goal: str, history: str, tools: list[MCPTool],
+ reasoning_llm: ReasoningLLM = ReasoningLLM()) -> Step:
+ """创建下一步的执行步骤"""
+ # 获取推理结果
+ template = _env.from_string(GEN_STEP)
+ prompt = template.render(goal=goal, history=history, tools=tools)
+ result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm)
+
+ # 解析为结构化数据
+ schema = Step.model_json_schema()
+ if "enum" not in schema["properties"]["tool_id"]:
+ schema["properties"]["tool_id"]["enum"] = []
+ for tool in tools:
+ schema["properties"]["tool_id"]["enum"].append(tool.id)
+ step = await MCPPlanner._parse_result(result, schema)
+ # 使用Step模型解析结果
+ return Step.model_validate(step)
+
+ @staticmethod
+ async def tool_skip(
+ task: Task, step_id: str, step_name: str, step_instruction: str, step_content: str,
+ reasoning_llm: ReasoningLLM = ReasoningLLM()) -> ToolSkip:
+ """判断当前步骤是否需要跳过"""
+ # 获取推理结果
+ template = _env.from_string(TOOL_SKIP)
+ from apps.scheduler.mcp_agent.host import MCPHost
+ history = await MCPHost.assemble_memory(task)
+ prompt = template.render(
+ step_id=step_id,
+ step_name=step_name,
+ step_instruction=step_instruction,
+ step_content=step_content,
+ history=history,
+ goal=task.runtime.question
+ )
+ result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm)
+
+ # 解析为结构化数据
+ schema = ToolSkip.model_json_schema()
+ skip_result = await MCPPlanner._parse_result(result, schema)
+ # 使用ToolSkip模型解析结果
+ return ToolSkip.model_validate(skip_result)
+
@staticmethod
async def get_tool_risk(
- tool: MCPTool, input_parm: dict[str, Any],
+ tool: MCPTool, input_parm: dict[str, Any],
additional_info: str = "", resoning_llm: ReasoningLLM = ReasoningLLM()) -> ToolRisk:
"""获取MCP工具的风险评估结果"""
# 获取推理结果
@@ -231,6 +293,29 @@ class MCPPlanner(McpBase):
# 返回工具执行错误类型
return error_type
+ @staticmethod
+ async def is_param_error(
+ goal: str, history: str, error_message: str, tool: MCPTool, step_description: str, input_params: dict
+ [str, Any],
+ reasoning_llm: ReasoningLLM = ReasoningLLM()) -> IsParamError:
+ """判断错误信息是否是参数错误"""
+ tmplate = _env.from_string(IS_PARAM_ERROR)
+ prompt = tmplate.render(
+ goal=goal,
+ history=history,
+ step_id=tool.id,
+ step_name=tool.name,
+ step_description=step_description,
+ input_params=input_params,
+ error_message=error_message,
+ )
+ result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm)
+ # 解析为结构化数据
+ schema = IsParamError.model_json_schema()
+ is_param_error = await MCPPlanner._parse_result(result, schema)
+ # 使用IsParamError模型解析结果
+ return IsParamError.model_validate(is_param_error)
+
@staticmethod
async def change_err_message_to_description(
error_message: str, tool: MCPTool, input_params: dict[str, Any],
@@ -270,16 +355,14 @@ class MCPPlanner(McpBase):
@staticmethod
async def generate_answer(
- user_goal: str, plan: MCPPlan, memory: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> AsyncGenerator[
+ user_goal: str, memory: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> AsyncGenerator[
str, None]:
"""生成最终回答"""
template = _env.from_string(FINAL_ANSWER)
prompt = template.render(
- plan=plan.model_dump(exclude_none=True, by_alias=True),
memory=memory,
goal=user_goal,
)
-
async for chunk in resoning_llm.call(
[{"role": "user", "content": prompt}],
streaming=True,
diff --git a/apps/scheduler/mcp_agent/prompt.py b/apps/scheduler/mcp_agent/prompt.py
index bfe8cf39ed6752d1ffca92c959a2ef3af5781c46..d51042a9dc4c7ede3cc0151d4fb7b4fcd9bfcfc0 100644
--- a/apps/scheduler/mcp_agent/prompt.py
+++ b/apps/scheduler/mcp_agent/prompt.py
@@ -68,10 +68,10 @@ TOOL_SELECT = dedent(r"""
你的任务是:根据当前目标,附加信息,选择最合适的MCP工具。
## 选择MCP工具时的注意事项:
1. 确保充分理解当前目标,选择实现目标所需的MCP工具。
- 2. 请在给定的MCP工具列表中选择,不要自己生成MCP工具。
+ 2. 不要选择不存在的工具。
3. 可以选择一些辅助工具,但必须确保这些工具与当前目标相关。
4. 注意,返回的工具ID必须是MCP工具的ID,而不是名称。
- 5. 不要选择不存在的工具。
+ 5. 可以多选择一些工具,用于应对不同的情况。
必须按照以下格式生成选择结果,不要输出任何其他内容:
```json
{
@@ -265,7 +265,9 @@ CREATE_PLAN = dedent(r"""
1. 能够成功完成用户的目标
2. 计划中的每一个步骤必须且只能使用一个工具。
3. 计划中的步骤必须具有清晰和逻辑的步骤,没有冗余或不必要的步骤。
- 4. 计划中的最后一步必须是Final工具,以确保计划执行结束。
+ 4. 不要选择不存在的工具。
+ 5. 计划中的最后一步必须是Final工具,以确保计划执行结束。
+ 6. 生成的计划必须要覆盖用户的目标,当然需要考虑一些意外情况,可以有一定的冗余步骤。
# 生成计划时的注意事项:
@@ -327,17 +329,17 @@ CREATE_PLAN = dedent(r"""
"instruction": "需要一个支持Docker容器运行的MCP Server"
},
{
- "content": "使用Result[0]中选择的MCP Server,生成Docker命令",
+ "content": "使用第一步选择的MCP Server,生成Docker命令",
"tool": "command_generator",
"instruction": "生成Docker命令:在后台运行alpine:latest容器,挂载/root到/data,执行top命令"
},
{
- "content": "在Result[0]的MCP Server上执行Result[1]生成的命令",
+ "content": "执行第二步生成的Docker命令",
"tool": "command_executor",
"instruction": "执行Docker命令"
},
{
- "content": "任务执行完成,容器已在后台运行,结果为Result[2]",
+ "content": "任务执行完成,容器已在后台运行",
"tool": "Final",
"instruction": ""
}
@@ -364,7 +366,9 @@ RECREATE_PLAN = dedent(r"""
2. 计划中的每一个步骤必须且只能使用一个工具。
3. 计划中的步骤必须具有清晰和逻辑的步骤,没有冗余或不必要的步骤。
4. 你的计划必须避免之前的错误,并且能够成功执行。
- 5. 计划中的最后一步必须是Final工具,以确保计划执行结束。
+ 5. 不要选择不存在的工具。
+ 6. 计划中的最后一步必须是Final工具,以确保计划执行结束。
+ 7. 生成的计划必须要覆盖用户的目标,当然需要考虑一些意外情况,可以有一定的冗余步骤。
# 生成计划时的注意事项:
@@ -413,12 +417,12 @@ RECREATE_PLAN = dedent(r"""
"instruction": "生成端口扫描命令:扫描192.168.1.1的开放端口"
},
{
- "content": "在执行Result[0]生成的命令",
+ "content": "在执行第一步生成的命令",
"tool": "command_executor",
"instruction": "执行端口扫描命令"
},
{
- "content": "任务执行完成,端口扫描结果为Result[2]",
+ "content": "任务执行完成",
"tool": "Final",
"instruction": ""
}
@@ -450,27 +454,27 @@ RECREATE_PLAN = dedent(r"""
"instruction": "选择一个前机器支持哪些网络扫描工具"
},
{
- "content": "执行Result[0]中生成的命令,查看当前机器支持哪些网络扫描工具",
+ "content": "执行第一步中生成的命令,查看当前机器支持哪些网络扫描工具",
"tool": "command_executor",
- "instruction": "执行Result[0]中生成的命令"
+ "instruction": "执行第一步中生成的命令"
},
{
- "content": "从Result[1]中选择一个网络扫描工具,生成端口扫描命令",
+ "content": "从第二步执行结果中选择一个网络扫描工具,生成端口扫描命令",
"tool": "tool_selector",
"instruction": "选择一个网络扫描工具,生成端口扫描命令"
},
{
- "content": "基于result[2]中选择的网络扫描工具,生成端口扫描命令",
+ "content": "基于第三步中选择的网络扫描工具,生成端口扫描命令",
"tool": "command_generator",
"instruction": "生成端口扫描命令:扫描192.168.1.1的开放端口"
},
{
- "content": "在Result[0]的MCP Server上执行Result[3]生成的命令",
+ "content": "执行第四步中生成的端口扫描命令",
"tool": "command_executor",
"instruction": "执行端口扫描命令"
},
{
- "content": "任务执行完成,端口扫描结果为Result[4]",
+ "content": "任务执行完成",
"tool": "Final",
"instruction": ""
}
@@ -503,6 +507,144 @@ RECREATE_PLAN = dedent(r"""
# 重新生成的计划
""")
+GEN_STEP = dedent(r"""
+ 你是一个计划生成器。
+ 请根据用户的目标、当前计划和历史,生成一个新的步骤。
+
+ # 一个好的计划步骤应该:
+ 1.使用最适合的工具来完成当前步骤。
+ 2.能够基于当前的计划和历史,完成阶段性的任务。
+ 3.不要选择不存在的工具。
+ 4.如果你认为当前已经达成了用户的目标,可以直接返回Final工具,表示计划执行结束。
+
+ # 样例 1
+ # 目标
+ 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优,我的ip是192.168.1.1,数据库端口是3306,用户名是root,密码是password
+ # 历史记录
+ 第1步:生成端口扫描命令
+ - 调用工具 `command_generator`,并提供参数 `帮我生成一个mysql端口扫描命令`
+ - 执行状态:成功
+ - 得到数据:`{"command": "nmap -sS -p--open 192.168.1.1"}`
+ 第2步:执行端口扫描命令
+ - 调用工具 `command_executor`,并提供参数 `{"command": "nmap -sS -p--open 192.168.1.1"}`
+ - 执行状态:成功
+ - 得到数据:`{"result": "success"}`
+ # 工具
+
+ - mcp_tool_1 mysql_analyzer;用于分析数据库性能/description>
+ - mcp_tool_2 文件存储工具;用于存储文件
+ - mcp_tool_3 mongoDB工具;用于操作MongoDB数据库
+ - Final 结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。
+
+ # 输出
+ ```json
+ {
+ "tool_id": "mcp_tool_1", // 选择的工具ID
+ "description": "扫描ip为192.168.1.1的MySQL数据库,端口为3306,用户名为root,密码为password的数据库性能",
+ }
+ ```
+ # 样例二
+ # 目标
+ 计划从杭州到北京的旅游计划
+ # 历史记录
+ 第1步:将杭州转换为经纬度坐标
+ - 调用工具 `maps_geo_planner`,并提供参数 `{"city_from": "杭州", "address": "西湖"}`
+ - 执行状态:成功
+ - 得到数据:`{"location": "123.456, 78.901"}`
+ 第2步:查询杭州的天气
+ - 调用工具 `weather_query`,并提供参数 `{"location": "123.456, 78.901"}`
+ - 执行状态:成功
+ - 得到数据:`{"weather": "晴", "temperature": "25°C"}`
+ 第3步:将北京转换为经纬度坐标
+ - 调用工具 `maps_geo_planner`,并提供参数 `{"city_from": "北京", "address": "天安门"}`
+ - 执行状态:成功
+ - 得到数据:`{"location": "123.456, 78.901"}`
+ 第4步:查询北京的天气
+ - 调用工具 `weather_query`,并提供参数 `{"location": "123.456, 78.901"}`
+ - 执行状态:成功
+ - 得到数据:`{"weather": "晴", "temperature": "25°C"}`
+ # 工具
+
+ - mcp_tool_4 maps_geo_planner;将详细的结构化地址转换为经纬度坐标。支持对地标性名胜景区、建筑物名称解析为经纬度坐标
+ - mcp_tool_5 weather_query;天气查询,用于查询天气信息
+ - mcp_tool_6 maps_direction_transit_integrated;根据用户起终点经纬度坐标规划综合各类公共(火车、公交、地铁)交通方式的通勤方案,并且返回通勤方案的数据,跨城场景下必须传起点城市与终点城市
+ - Final Final;结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。
+
+ # 输出
+ ```json
+ {
+ "tool_id": "mcp_tool_6", // 选择的工具ID
+ "description": "规划从杭州到北京的综合公共交通方式的通勤方案"
+ }
+ ```
+ # 现在开始生成步骤:
+ # 目标
+ {{goal}}
+ # 历史记录
+ {{history}}
+ # 工具
+
+ {% for tool in tools %}
+ - {{tool.id}} {{tool.name}};{{tool.description}}
+ {% endfor %}
+
+""")
+
+TOOL_SKIP = dedent(r"""
+ 你是一个计划执行器。
+ 你的任务是根据当前的计划和用户目标,判断当前步骤是否需要跳过。
+ 如果需要跳过,请返回`true`,否则返回`false`。
+ 必须按照以下格式回答:
+ ```json
+ {
+ "skip": true/false,
+ }
+ ```
+ 注意:
+ 1.你的判断要谨慎,在历史消息中有足够的上下文信息时,才可以判断是否跳过当前步骤。
+ # 样例
+ # 用户目标
+ 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优
+ # 历史
+ 第1步:生成端口扫描命令
+ - 调用工具 `command_generator`,并提供参数 `{"command": "nmap -sS -p--open 192.168.1.1"}`
+ - 执行状态:成功
+ - 得到数据:`{"command": "nmap -sS -p--open 192.168.1.1"}`
+ 第2步:执行端口扫描命令
+ - 调用工具 `command_executor`,并提供参数 `{"command": "nmap -sS -p--open 192.168.1.1"}`
+ - 执行状态:成功
+ - 得到数据:`{"result": "success"}`
+ 第3步:分析端口扫描结果
+ - 调用工具 `mysql_analyzer`,并提供参数 `{"host": "192.168.1.1", "port": 3306, "username": "root", "password": "password"}`
+ - 执行状态:成功
+ - 得到数据:`{"performance": "good", "bottleneck": "none"}`
+ # 当前步骤
+
+ step_4
+ command_generator
+ 生成MySQL性能调优命令
+ 生成MySQL性能调优命令:调优MySQL数据库性能
+
+ # 输出
+ ```json
+ {
+ "skip": true
+ }
+ ```
+ # 用户目标
+ {{goal}}
+ # 历史
+ {{history}}
+ # 当前步骤
+
+ {{step_id}}
+ {{step_name}}
+ {{step_instruction}}
+ {{step_content}}
+
+ # 输出
+ """)
+
RISK_EVALUATE = dedent(r"""
你是一个工具执行计划评估器。
你的任务是根据当前工具的名称、描述和入参以及附加信息,判断当前工具执行的风险并输出提示。
@@ -621,6 +763,66 @@ TOOL_EXECUTE_ERROR_TYPE_ANALYSIS = dedent(r"""
# 输出
""")
+IS_PARAM_ERROR = dedent(r"""
+ 你是一个计划执行专家,你的任务是判断当前的步骤执行失败是否是因为参数错误导致的,
+ 如果是,请返回`true`,否则返回`false`。
+ 必须按照以下格式回答:
+ ```json
+ {
+ "is_param_error": true/false,
+ }
+ ```
+ # 样例
+ # 用户目标
+ 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优
+ # 历史
+ 第1步:生成端口扫描命令
+ - 调用工具 `command_generator`,并提供参数 `{"command": "nmap -sS -p--open 192.168.1.1"}`
+ - 执行状态:成功
+ - 得到数据:`{"command": "nmap -sS -p--open 192.168.1.1"}`
+ 第2步:执行端口扫描命令
+ - 调用工具 `command_executor`,并提供参数 `{"command": "nmap -sS -p--open 192.168.1.1"}`
+ - 执行状态:成功
+ - 得到数据:`{"result": "success"}`
+ # 当前步骤
+
+ step_3
+ mysql_analyzer
+ 分析MySQL数据库性能
+
+ # 工具入参
+ {
+ "host": "192.0.0.1",
+ "port": 3306,
+ "username": "root",
+ "password": "password"
+ }
+ # 工具运行报错
+ 执行MySQL性能分析命令时,出现了错误:`host is not correct`。
+
+ # 输出
+ ```json
+ {
+ "is_param_error": true
+ }
+ ```
+ # 用户目标
+ {{goal}}
+ # 历史
+ {{history}}
+ # 当前步骤
+
+ {{step_id}}
+ {{step_name}}
+ {{step_instruction}}
+
+ # 工具入参
+ {{input_param}}
+ # 工具运行报错
+ {{error_message}}
+ # 输出
+ """)
+
# 将当前程序运行的报错转换为自然语言
CHANGE_ERROR_MESSAGE_TO_DESCRIPTION = dedent(r"""
你是一个智能助手,你的任务是将当前程序运行的报错转换为自然语言描述。
@@ -771,9 +973,87 @@ GET_MISSING_PARAMS = dedent(r"""
# 输出
""")
+
+GEN_PARAMS = dedent(r"""
+ 你是一个工具参数生成器。
+ 你的任务是根据总的目标、阶段性的目标、工具信息、工具入参的schema和背景信息生成工具的入参。
+ 注意:
+ 1.生成的参数在格式上必须符合工具入参的schema。
+ 2.总的目标、阶段性的目标和背景信息必须被充分理解,利用其中的信息来生成工具入参。
+ 3.生成的参数必须符合阶段性目标。
+
+ # 样例
+ # 工具信息
+ < tool >
+ < name > mysql_analyzer < /name >
+ < description > 分析MySQL数据库性能 < /description >
+ < / tool >
+ # 总目标
+ 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优,ip地址是192.168.1.1,端口是3306,用户名是root,密码是password。
+ # 当前阶段目标
+ 我要连接MySQL数据库,分析性能瓶颈,并调优。
+ # 工具入参的schema
+ {
+ "type": "object",
+ "properties": {
+ "host": {
+ "type": "string",
+ "description": "MySQL数据库的主机地址"
+ },
+ "port": {
+ "type": "integer",
+ "description": "MySQL数据库的端口号"
+ },
+ "username": {
+ "type": "string",
+ "description": "MySQL数据库的用户名"
+ },
+ "password": {
+ "type": "string",
+ "description": "MySQL数据库的密码"
+ }
+ },
+ "required": ["host", "port", "username", "password"]
+ }
+ # 背景信息
+ 第1步:生成端口扫描命令
+ - 调用工具 `command_generator`,并提供参数 `帮我生成一个mysql端口扫描命令`
+ - 执行状态:成功
+ - 得到数据:`{"command": "nmap -sS -p--open 192.168.1.1"}`
+ 第2步:执行端口扫描命令
+ - 调用工具 `command_executor`,并提供参数 `{"command": "nmap -sS -p--open 192.168.1.1"}`
+ - 执行状态:成功
+ - 得到数据:`{"result": "success"}`
+ # 输出
+ ```json
+ {
+ "host": "192.168.1.1",
+ "port": 3306,
+ "username": "root",
+ "password": "password"
+ }
+ ```
+ # 工具
+
+ {{tool_name}}
+ {{tool_description}}
+
+ # 总目标
+ {{goal}}
+ # 当前阶段目标
+ {{current_goal}}
+ # 工具入参scheme
+ {{input_schema}}
+ # 背景信息
+ {{background_info}}
+ # 输出
+ """)
+
REPAIR_PARAMS = dedent(r"""
- 你是一个工具参数修复器。
- 你的任务是根据当前的工具信息、工具入参的schema、工具当前的入参、工具的报错、补充的参数和补充的参数描述,修复当前工具的入参。
+ 你的任务是根据当前的工具信息、目标、工具入参的schema、工具当前的入参、工具的报错、补充的参数和补充的参数描述,修复当前工具的入参。
+
+ 注意:
+ 1.最终修复的参数要符合目标和工具入参的schema。
# 样例
# 工具信息
@@ -781,6 +1061,10 @@ REPAIR_PARAMS = dedent(r"""
mysql_analyzer
分析MySQL数据库性能
+ # 总目标
+ 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优
+ # 当前阶段目标
+ 我要连接MySQL数据库,分析性能瓶颈,并调优。
# 工具入参的schema
{
"type": "object",
@@ -834,6 +1118,10 @@ REPAIR_PARAMS = dedent(r"""
{{tool_name}}
{{tool_description}}
+ # 总目标
+ {{goal}}
+ # 当前阶段目标
+ {{current_goal}}
# 工具入参scheme
{{input_schema}}
# 工具入参
@@ -860,10 +1148,6 @@ FINAL_ANSWER = dedent(r"""
{{memory}}
- # 其他背景信息:
-
- {{status}}
-
# 现在,请根据以上信息,向用户报告目标的完成情况:
""")
@@ -871,7 +1155,7 @@ MEMORY_TEMPLATE = dedent(r"""
{% for ctx in context_list %}
- 第{{loop.index}}步:{{ctx.step_description}}
- 调用工具 `{{ctx.step_id}}`,并提供参数 `{{ctx.input_data}}`
- - 执行状态:{{ctx.status}}
+ - 执行状态:{{ctx.step_status}}
- 得到数据:`{{ctx.output_data}}`
{% endfor %}
""")
diff --git a/apps/scheduler/pool/loader/mcp.py b/apps/scheduler/pool/loader/mcp.py
index e97ef4a02657c4d67ba098910842cc02873a59ce..7a2aa374502051248c8db7f501e63b5e9e1cc81c 100644
--- a/apps/scheduler/pool/loader/mcp.py
+++ b/apps/scheduler/pool/loader/mcp.py
@@ -410,17 +410,6 @@ class MCPLoader(metaclass=SingletonMeta):
if mcp_env is not None:
mcp_config.mcpServers[mcp_id].env.update(mcp_env)
- user_config_path = user_path / "config.json"
- # 更新用户配置
- f = await user_config_path.open("w", encoding="utf-8", errors="ignore")
- await f.write(
- json.dumps(
- mcp_config.model_dump(by_alias=True, exclude_none=True),
- indent=4,
- ensure_ascii=False,
- )
- )
- await f.aclose()
if mcp_config.mcpType == MCPType.STDIO:
index = None
for i in range(len(mcp_config.config.args)):
@@ -434,6 +423,17 @@ class MCPLoader(metaclass=SingletonMeta):
mcp_config.config.args.append(str(user_path)+'/project')
else:
mcp_config.config.args = ["--directory", str(user_path)+'/project'] + mcp_config.config.args
+ user_config_path = user_path / "config.json"
+ # 更新用户配置
+ f = await user_config_path.open("w", encoding="utf-8", errors="ignore")
+ await f.write(
+ json.dumps(
+ mcp_config.model_dump(by_alias=True, exclude_none=True),
+ indent=4,
+ ensure_ascii=False,
+ ),
+ )
+ await f.aclose()
# 更新数据库
async with postgres.session() as session:
await session.merge(MCPActivated(
@@ -554,6 +554,7 @@ class MCPLoader(metaclass=SingletonMeta):
async with postgres.session() as session:
mcp_data = (await session.scalars(select(MCPInfo).where(MCPInfo.id == mcp_id))).one_or_none()
if mcp_data:
+ logger.info("[MCPLoader] 更新MCP模板状态: %s -> %s", mcp_id, status)
mcp_data.status = status
await session.merge(mcp_data)
diff --git a/apps/schemas/mcp.py b/apps/schemas/mcp.py
index afb8d2929e89d703e059ebfb94bb4ed172b1d2df..69b5882654898adb71beb45b33803373ced7f1a2 100644
--- a/apps/schemas/mcp.py
+++ b/apps/schemas/mcp.py
@@ -81,6 +81,12 @@ class Risk(str, Enum):
HIGH = "high"
+class ToolSkip(BaseModel):
+ """MCP工具跳过执行结果"""
+
+ skip: bool = Field(description="是否跳过当前步骤", default=False)
+
+
class ToolRisk(BaseModel):
"""MCP工具风险评估结果"""
@@ -102,6 +108,12 @@ class ToolExcutionErrorType(BaseModel):
reason: str = Field(description="错误原因", default="")
+class IsParamError(BaseModel):
+ """MCP工具参数错误"""
+
+ is_param_error: bool = Field(description="是否是参数错误", default=False)
+
+
class MCPSelectResult(BaseModel):
"""MCP选择结果"""
@@ -133,3 +145,10 @@ class MCPPlan(BaseModel):
"""MCP 计划"""
plans: list[MCPPlanItem] = Field(description="计划列表", default=[])
+
+
+class Step(BaseModel):
+ """MCP步骤"""
+
+ tool_id: str = Field(description="工具ID")
+ description: str = Field(description="步骤描述")
diff --git a/apps/schemas/message.py b/apps/schemas/message.py
index 77b173995232acb508a8262dab19a26ef8789e65..1ece99ba27784c482d9be7109ffbef63b416a780 100644
--- a/apps/schemas/message.py
+++ b/apps/schemas/message.py
@@ -39,6 +39,11 @@ class MessageFlow(BaseModel):
sub_step_id: str | None = Field(description="当前子步骤ID", alias="subStepId", default=None)
sub_step_name: str | None = Field(description="当前子步骤名称", alias="subStepName", default=None)
step_status: StepStatus = Field(description="当前步骤状态", alias="stepStatus")
+ step_description: str | None = Field(
+ description="当前步骤描述",
+ alias="stepDescription",
+ default=None,
+ )
class MessageMetadata(RecordMetadata):
diff --git a/apps/schemas/record.py b/apps/schemas/record.py
index 119a80823f5daab98c8617a5fcd305b4a38dc3b5..94b83c09c5d014eaffe2ba3ec603a2a69311d996 100644
--- a/apps/schemas/record.py
+++ b/apps/schemas/record.py
@@ -33,6 +33,7 @@ class RecordFlowStep(BaseModel):
step_status: StepStatus = Field(alias="stepStatus")
input: dict[str, Any]
output: dict[str, Any]
+ ex_data: dict[str, Any] | None = Field(default=None, alias="exData")
class RecordFlow(BaseModel):
diff --git a/apps/schemas/task.py b/apps/schemas/task.py
index a23f7cf9ec50f95c89e88be2a38fb2813e55f6af..0a337cfc8a5fa5130abf5604800d8ace6a8ed7ae 100644
--- a/apps/schemas/task.py
+++ b/apps/schemas/task.py
@@ -6,7 +6,6 @@ from typing import Any
from pydantic import BaseModel, Field
from .flow import Step
-from .mcp import MCPPlan
class CheckpointExtra(BaseModel):
@@ -20,8 +19,6 @@ class CheckpointExtra(BaseModel):
class TaskExtra(BaseModel):
"""任务额外数据"""
- temporary_plans: MCPPlan | None = Field(description="临时计划列表", default=None)
-
class StepQueueItem(BaseModel):
"""步骤栈中的元素"""
diff --git a/apps/services/activity.py b/apps/services/activity.py
index 55bfe2c7c10788550b6df56ca66a03b3725cbe07..be41a238d68c10daed5e32ea0e3655d4a846fb45 100644
--- a/apps/services/activity.py
+++ b/apps/services/activity.py
@@ -14,6 +14,7 @@ from apps.models.session import SessionActivity
class Activity:
"""用户活动控制,限制单用户同一时间只能提问一个问题"""
+ # TODO:改为同一时间整个系统最多有n个task在执行,与用户无关
@staticmethod
async def is_active(user_sub: str) -> bool:
"""
diff --git a/apps/services/mcp_service.py b/apps/services/mcp_service.py
index b1fbae0a0486882d7cad1acd7c0a86d764cd39ed..84b638302d06c13f2d68cd6f2e37500a948f0e35 100644
--- a/apps/services/mcp_service.py
+++ b/apps/services/mcp_service.py
@@ -35,7 +35,6 @@ from apps.schemas.mcp import (
)
from apps.schemas.request_data import UpdateMCPServiceRequest
from apps.schemas.response_data import MCPServiceCardItem
-from apps.services.user import UserManager
logger = logging.getLogger(__name__)
MCP_ICON_PATH = ICON_PATH / "mcp"
@@ -92,13 +91,13 @@ class MCPServiceManager:
@staticmethod
- async def fetch_mcp_services(
+ async def fetch_mcp_services( # noqa: PLR0913
search_type: SearchType,
user_sub: str,
keyword: str | None,
page: int,
*,
- is_installed: bool | None = None,
+ is_install: bool | None = None,
is_active: bool | None = None,
) -> list[MCPServiceCardItem]:
"""
@@ -110,7 +109,9 @@ class MCPServiceManager:
:param page: int: 页码
:return: MCP服务列表
"""
- mcpservice_pools = await MCPServiceManager._search_mcpservice(search_type, keyword, page, is_active=is_active)
+ mcpservice_pools = await MCPServiceManager._search_mcpservice(
+ search_type, keyword, page, is_active=is_active, is_installed=is_install,
+ )
return [
MCPServiceCardItem(
mcpserviceId=item.id,
@@ -163,12 +164,14 @@ class MCPServiceManager:
@staticmethod
- async def _search_mcpservice(
+ async def _search_mcpservice( # noqa: PLR0913
search_type: SearchType,
keyword: str | None,
page: int,
+ user_sub: str,
*,
is_active: bool | None = None,
+ is_installed: bool | None = None,
) -> list[MCPInfo]:
"""
基于输入条件搜索MCP服务
@@ -179,40 +182,36 @@ class MCPServiceManager:
"""
# 分页查询
skip = (page - 1) * SERVICE_PAGE_SIZE
-
async with postgres.session() as session:
- if not keyword:
- result = list(
- (await session.scalars(select(MCPInfo).offset(skip).limit(SERVICE_PAGE_SIZE))).all(),
- )
- elif search_type == SearchType.ALL:
- result = list(
- (await session.scalars(select(MCPInfo).where(
- or_(
- MCPInfo.name.like(f"%{keyword}%"),
- MCPInfo.description.like(f"%{keyword}%"),
- MCPInfo.author.like(f"%{keyword}%"),
- ),
- ).offset(skip).limit(SERVICE_PAGE_SIZE))).all(),
+ sql = select(MCPInfo)
+
+ if search_type == SearchType.ALL:
+ sql = sql.where(
+ or_(
+ MCPInfo.name.like(f"%{keyword}%"),
+ MCPInfo.description.like(f"%{keyword}%"),
+ MCPInfo.author.like(f"%{keyword}%"),
+ ),
)
elif search_type == SearchType.NAME:
- result = list(
- (await session.scalars(
- select(MCPInfo).where(MCPInfo.name.like(f"%{keyword}%")).offset(skip).limit(SERVICE_PAGE_SIZE),
- )).all(),
- )
+ sql = sql.where(MCPInfo.name.like(f"%{keyword}%"))
elif search_type == SearchType.DESCRIPTION:
- result = list(
- (await session.scalars(
- select(MCPInfo).where(MCPInfo.description.like(f"%{keyword}%")).offset(skip).limit(SERVICE_PAGE_SIZE),
- )).all(),
- )
+ sql = sql.where(MCPInfo.description.like(f"%{keyword}%"))
elif search_type == SearchType.AUTHOR:
- result = list(
- (await session.scalars(
- select(MCPInfo).where(MCPInfo.author.like(f"%{keyword}%")).offset(skip).limit(SERVICE_PAGE_SIZE),
- )).all(),
- )
+ sql = sql.where(MCPInfo.author.like(f"%{keyword}%"))
+
+ sql = sql.offset(skip).limit(SERVICE_PAGE_SIZE)
+
+ if is_installed is not None:
+ sql = sql.where(MCPInfo.id.in_(
+ select(MCPActivated.mcpId).where(MCPActivated.userSub == user_sub),
+ ))
+ if is_active is not None:
+ sql = sql.where(MCPInfo.id.in_(
+ select(MCPActivated.mcpId).where(MCPActivated.userSub == user_sub),
+ ))
+
+ result = list((await session.scalars(sql)).all())
# 如果未找到,返回空列表
if not result:
@@ -294,8 +293,13 @@ class MCPServiceManager:
msg = "[MCPServiceManager] MCP服务ID为空"
raise ValueError(msg)
- mcp_collection = MongoDB().get_collection("mcp")
- db_service = await mcp_collection.find_one({"_id": data.service_id, "author": user_sub})
+ async with postgres.session() as session:
+ db_service = (await session.scalars(select(MCPInfo).where(
+ and_(
+ MCPInfo.id == data.service_id,
+ MCPInfo.author == user_sub,
+ ),
+ ))).one_or_none()
if not db_service:
msg = "[MCPServiceManager] MCP服务未找到或无权限"
raise ValueError(msg)
diff --git a/apps/services/task.py b/apps/services/task.py
index 647d1796d6918d61460133194707f9caafb04286..832b5743805605e14b0a8a165b9b9d3d37d79a57 100644
--- a/apps/services/task.py
+++ b/apps/services/task.py
@@ -75,6 +75,9 @@ class TaskManager:
@staticmethod
async def save_flow_context(task_id: str, flow_context: list[ExecutorHistory]) -> None:
"""保存flow信息到flow_context"""
+ if not flow_context:
+ return
+
flow_context_collection = MongoDB().get_collection("flow_context")
try:
for history in flow_context: