diff --git a/apps/routers/conversation.py b/apps/routers/conversation.py
index 2f49f803733c105362e2ba36cae2032f58d6ee77..e56623f4e64b65076b186b9fc4f816aa313a6d27 100644
--- a/apps/routers/conversation.py
+++ b/apps/routers/conversation.py
@@ -6,7 +6,7 @@ import uuid
from datetime import UTC, datetime
from typing import Annotated
-from fastapi import APIRouter, Body, Depends, Query, Request, status
+from fastapi import APIRouter, Depends, Query, Request, status
from fastapi.responses import JSONResponse
from apps.dependency import verify_personal_token, verify_session
diff --git a/apps/scheduler/executor/agent.py b/apps/scheduler/executor/agent.py
index 4d34b85b6855478d0686c8fa76a1b51433587067..663f6827471308954affb28c450e33040ee105cb 100644
--- a/apps/scheduler/executor/agent.py
+++ b/apps/scheduler/executor/agent.py
@@ -1,64 +1,65 @@
# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved.
"""MCP Agent执行器"""
-import anyio
import logging
import uuid
-from pydantic import Field
-from typing import Any
+
+import anyio
from mcp.types import TextContent
-from apps.llm.patterns.rewrite import QuestionRewrite
+from pydantic import Field
+
from apps.llm.reasoning import ReasoningLLM
from apps.scheduler.executor.base import BaseExecutor
-from apps.schemas.enum_var import EventType, SpecialCallType, FlowStatus, StepStatus
from apps.scheduler.mcp_agent.host import MCPHost
from apps.scheduler.mcp_agent.plan import MCPPlanner
-from apps.scheduler.mcp_agent.select import FINAL_TOOL_ID, MCPSelector
-from apps.scheduler.pool.mcp.client import MCPClient
+from apps.scheduler.pool.mcp.pool import MCPPool
+from apps.schemas.enum_var import EventType, FlowStatus, StepStatus
from apps.schemas.mcp import (
- GoalEvaluationResult,
- RestartStepIndex,
- ToolRisk,
- ErrorType,
- ToolExcutionErrorType,
- MCPPlan,
MCPCollection,
MCPTool,
- Step
+ Step,
)
-from apps.scheduler.pool.mcp.pool import MCPPool
-from apps.schemas.task import ExecutorState, FlowStepHistory, StepQueueItem
-from apps.schemas.message import param
-from apps.services.task import TaskManager
+from apps.schemas.message import FlowParams
+from apps.schemas.task import FlowStepHistory
from apps.services.appcenter import AppCenterManager
from apps.services.mcp_service import MCPServiceManager
+from apps.services.task import TaskManager
from apps.services.user import UserManager
+
logger = logging.getLogger(__name__)
+FINAL_TOOL_ID = "FIANL"
class MCPAgentExecutor(BaseExecutor):
"""MCP Agent执行器"""
max_steps: int = Field(default=20, description="最大步数")
- servers_id: list[str] = Field(description="MCP server id")
agent_id: str = Field(default="", description="Agent ID")
- agent_description: str = Field(default="", description="Agent描述")
mcp_list: list[MCPCollection] = Field(description="MCP服务器列表", default=[])
mcp_pool: MCPPool = Field(description="MCP池", default=MCPPool())
tools: dict[str, MCPTool] = Field(
- description="MCP工具列表,key为tool_id", default={}
+ description="MCP工具列表,key为tool_id",
+ default={},
)
tool_list: list[MCPTool] = Field(
- description="MCP工具列表,包含所有MCP工具", default=[]
+ description="MCP工具列表,包含所有MCP工具",
+ default=[],
)
- params: param | bool | None = Field(
- default=None, description="流执行过程中的参数补充", alias="params"
+ params: FlowParams | bool | None = Field(
+ default=None,
+ description="流执行过程中的参数补充",
+ alias="params",
)
resoning_llm: ReasoningLLM = Field(
default=ReasoningLLM(),
description="推理大模型",
)
+ async def init(self) -> None:
+ """初始化Executor"""
+ self.planner = MCPPlanner(self.task.runtime.question, self.resoning_llm)
+ self.host = MCPHost(self.task.runtime.question)
+
async def update_tokens(self) -> None:
"""更新令牌数"""
self.task.tokens.input_tokens = self.resoning_llm.input_tokens
@@ -89,43 +90,52 @@ class MCPAgentExecutor(BaseExecutor):
continue
self.mcp_list.append(mcp_service)
- await self.mcp_pool._init_mcp(mcp_id, self.task.ids.user_sub)
+ await self.mcp_pool.init_mcp(mcp_id, self.task.ids.user_sub)
for tool in mcp_service.tools:
self.tools[tool.id] = tool
self.tool_list.extend(mcp_service.tools)
self.tools[FINAL_TOOL_ID] = MCPTool(
- id=FINAL_TOOL_ID,
- name="Final Tool",
- description="结束流程的工具",
- mcp_id="",
- input_schema={}
+ id=FINAL_TOOL_ID, name="Final Tool", description="结束流程的工具", mcp_id="", input_schema={},
+ )
+ self.tool_list.append(
+ MCPTool(id=FINAL_TOOL_ID, name="Final Tool", description="结束流程的工具", mcp_id="", input_schema={}),
)
- self.tool_list.append(MCPTool(id=FINAL_TOOL_ID, name="Final Tool",
- description="结束流程的工具", mcp_id="", input_schema={}))
- async def get_tool_input_param(self, is_first: bool) -> None:
+ async def get_tool_input_param(self, *, is_first: bool) -> None:
+ """获取工具输入参数"""
if is_first:
# 获取第一个输入参数
mcp_tool = self.tools[self.task.state.tool_id]
- self.task.state.current_input = await MCPHost._get_first_input_params(mcp_tool, self.task.runtime.question, self.task.state.step_description, self.task)
+ self.task.state.current_input = await self.host.get_first_input_params(
+ mcp_tool, self.task.state.step_description, self.task,
+ )
else:
# 获取后续输入参数
- if isinstance(self.params, param):
+ if isinstance(self.params, FlowParams):
params = self.params.content
params_description = self.params.description
else:
params = {}
params_description = ""
mcp_tool = self.tools[self.task.state.tool_id]
- self.task.state.current_input = await MCPHost._fill_params(mcp_tool, self.task.runtime.question, self.task.state.step_description, self.task.state.current_input, self.task.state.error_message, params, params_description)
+ self.task.state.current_input = await self.host.fill_params(
+ mcp_tool,
+ self.task.state.step_description,
+ self.task.state.current_input,
+ self.task.state.error_message,
+ params,
+ params_description,
+ )
async def confirm_before_step(self) -> None:
+ """确认前步骤"""
# 发送确认消息
mcp_tool = self.tools[self.task.state.tool_id]
- confirm_message = await MCPPlanner.get_tool_risk(mcp_tool, self.task.state.current_input, "", self.resoning_llm)
+ confirm_message = await self.planner.get_tool_risk(mcp_tool, self.task.state.current_input, "")
await self.update_tokens()
- await self.push_message(EventType.STEP_WAITING_FOR_START, confirm_message.model_dump(
- exclude_none=True, by_alias=True))
+ await self.push_message(
+ EventType.STEP_WAITING_FOR_START, confirm_message.model_dump(exclude_none=True, by_alias=True),
+ )
await self.push_message(EventType.FLOW_STOP, {})
self.task.state.flow_status = FlowStatus.WAITING
self.task.state.step_status = StepStatus.WAITING
@@ -142,27 +152,30 @@ class MCPAgentExecutor(BaseExecutor):
input_data={},
output_data={},
ex_data=confirm_message.model_dump(exclude_none=True, by_alias=True),
- )
+ ),
)
- async def run_step(self):
+ async def run_step(self) -> None:
"""执行步骤"""
self.task.state.flow_status = FlowStatus.RUNNING
self.task.state.step_status = StepStatus.RUNNING
mcp_tool = self.tools[self.task.state.tool_id]
- mcp_client = (await self.mcp_pool.get(mcp_tool.mcp_id, self.task.ids.user_sub))
+ mcp_client = await self.mcp_pool.get(mcp_tool.mcp_id, self.task.ids.user_sub)
+ if mcp_client is None:
+ logger.error("[MCPAgentExecutor] MCP客户端不存在: %s", mcp_tool.mcp_id)
+ self.task.state.step_status = StepStatus.ERROR
+ return
try:
output_params = await mcp_client.call_tool(mcp_tool.name, self.task.state.current_input)
- except anyio.ClosedResourceError as e:
- import traceback
- logger.error("[MCPAgentExecutor] MCP客户端连接已关闭: %s, 错误: %s", mcp_tool.mcp_id, traceback.format_exc())
+ except anyio.ClosedResourceError:
+ logger.exception("[MCPAgentExecutor] MCP客户端连接已关闭: %s", mcp_tool.mcp_id)
await self.mcp_pool.stop(mcp_tool.mcp_id, self.task.ids.user_sub)
- await self.mcp_pool._init_mcp(mcp_tool.mcp_id, self.task.ids.user_sub)
- logger.error("[MCPAgentExecutor] MCP客户端连接已关闭: %s, 错误: %s", mcp_tool.mcp_id, str(e))
+ await self.mcp_pool.init_mcp(mcp_tool.mcp_id, self.task.ids.user_sub)
self.task.state.step_status = StepStatus.ERROR
return
except Exception as e:
import traceback
+
logger.exception("[MCPAgentExecutor] 执行步骤 %s 时发生错误: %s", mcp_tool.name, traceback.format_exc())
self.task.state.step_status = StepStatus.ERROR
self.task.state.error_message = str(e)
@@ -184,14 +197,8 @@ class MCPAgentExecutor(BaseExecutor):
}
await self.update_tokens()
- await self.push_message(
- EventType.STEP_INPUT,
- self.task.state.current_input
- )
- await self.push_message(
- EventType.STEP_OUTPUT,
- output_params
- )
+ await self.push_message(EventType.STEP_INPUT, self.task.state.current_input)
+ await self.push_message(EventType.STEP_OUTPUT, output_params)
self.task.context.append(
FlowStepHistory(
task_id=self.task.id,
@@ -204,37 +211,26 @@ class MCPAgentExecutor(BaseExecutor):
flow_status=self.task.state.flow_status,
input_data=self.task.state.current_input,
output_data=output_params,
- )
+ ),
)
self.task.state.step_status = StepStatus.SUCCESS
async def generate_params_with_null(self) -> None:
"""生成参数补充"""
mcp_tool = self.tools[self.task.state.tool_id]
- params_with_null = await MCPPlanner.get_missing_param(
- mcp_tool,
- self.task.state.current_input,
- self.task.state.error_message,
- self.resoning_llm
+ params_with_null = await self.planner.get_missing_param(
+ mcp_tool, self.task.state.current_input, self.task.state.error_message,
)
await self.update_tokens()
- error_message = await MCPPlanner.change_err_message_to_description(
+ error_message = await self.planner.change_err_message_to_description(
error_message=self.task.state.error_message,
tool=mcp_tool,
input_params=self.task.state.current_input,
- reasoning_llm=self.resoning_llm
- )
- await self.push_message(
- EventType.STEP_WAITING_FOR_PARAM,
- data={
- "message": error_message,
- "params": params_with_null
- }
)
await self.push_message(
- EventType.FLOW_STOP,
- data={}
+ EventType.STEP_WAITING_FOR_PARAM, data={"message": error_message, "params": params_with_null},
)
+ await self.push_message(EventType.FLOW_STOP, data={})
self.task.state.flow_status = FlowStatus.WAITING
self.task.state.step_status = StepStatus.PARAM
self.task.context.append(
@@ -249,33 +245,31 @@ class MCPAgentExecutor(BaseExecutor):
flow_status=self.task.state.flow_status,
input_data={},
output_data={},
- ex_data={
- "message": error_message,
- "params": params_with_null
- }
- )
+ ex_data={"message": error_message, "params": params_with_null},
+ ),
)
async def get_next_step(self) -> None:
+ """获取下一步"""
if self.task.state.step_cnt < self.max_steps:
self.task.state.step_cnt += 1
history = await MCPHost.assemble_memory(self.task)
max_retry = 3
step = None
- for i in range(max_retry):
- step = await MCPPlanner.create_next_step(self.task.runtime.question, history, self.tool_list)
- if step.tool_id in self.tools.keys():
- break
- if step is None or step.tool_id not in self.tools.keys():
+ for _ in range(max_retry):
+ try:
+ step = await self.planner.create_next_step(history, self.tool_list)
+ if step.tool_id in self.tools:
+ break
+ except Exception as e: # noqa: BLE001
+ logger.warning("[MCPAgentExecutor] 获取下一步失败,重试中: %s", str(e))
+ if step is None or step.tool_id not in self.tools:
step = Step(
tool_id=FINAL_TOOL_ID,
- description=FINAL_TOOL_ID
+ description=FINAL_TOOL_ID,
)
tool_id = step.tool_id
- if tool_id == FINAL_TOOL_ID:
- step_name = FINAL_TOOL_ID
- else:
- step_name = self.tools[tool_id].name
+ step_name = FINAL_TOOL_ID if tool_id == FINAL_TOOL_ID else self.tools[tool_id].name
step_description = step.description
self.task.state.step_id = str(uuid.uuid4())
self.task.state.tool_id = tool_id
@@ -286,16 +280,12 @@ class MCPAgentExecutor(BaseExecutor):
else:
# 没有下一步了,结束流程
self.task.state.tool_id = FINAL_TOOL_ID
- return
async def error_handle_after_step(self) -> None:
"""步骤执行失败后的错误处理"""
self.task.state.step_status = StepStatus.ERROR
self.task.state.flow_status = FlowStatus.ERROR
- await self.push_message(
- EventType.FLOW_FAILED,
- data={}
- )
+ await self.push_message(EventType.FLOW_FAILED, data={})
if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id:
del self.task.context[-1]
self.task.context.append(
@@ -310,18 +300,18 @@ class MCPAgentExecutor(BaseExecutor):
flow_status=self.task.state.flow_status,
input_data={},
output_data={},
- )
+ ),
)
- async def work(self) -> None:
+ async def work(self) -> None: # noqa: C901, PLR0912, PLR0915
"""执行当前步骤"""
if self.task.state.step_status == StepStatus.INIT:
- await self.push_message(
- EventType.STEP_INIT,
- data={}
- )
+ await self.push_message(EventType.STEP_INIT, data={})
await self.get_tool_input_param(is_first=True)
user_info = await UserManager.get_userinfo_by_user_sub(self.task.ids.user_sub)
+ if user_info is None:
+ logger.error("[MCPAgentExecutor] 用户信息不存在: %s", self.task.ids.user_sub)
+ return
if not user_info.auto_execute:
# 等待用户确认
await self.confirm_before_step()
@@ -338,14 +328,8 @@ class MCPAgentExecutor(BaseExecutor):
else:
self.task.state.flow_status = FlowStatus.CANCELLED
self.task.state.step_status = StepStatus.CANCELLED
- await self.push_message(
- EventType.STEP_CANCEL,
- data={}
- )
- await self.push_message(
- EventType.FLOW_CANCEL,
- data={}
- )
+ await self.push_message(EventType.STEP_CANCEL, data={})
+ await self.push_message(EventType.FLOW_CANCEL, data={})
if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id:
self.task.context[-1].step_status = StepStatus.CANCELLED
if self.task.state.step_status == StepStatus.PARAM:
@@ -363,12 +347,15 @@ class MCPAgentExecutor(BaseExecutor):
await self.error_handle_after_step()
else:
user_info = await UserManager.get_userinfo_by_user_sub(self.task.ids.user_sub)
+ if user_info is None:
+ logger.error("[MCPAgentExecutor] 用户信息不存在: %s", self.task.ids.user_sub)
+ return
if user_info.auto_execute:
await self.push_message(
EventType.STEP_ERROR,
data={
"message": self.task.state.error_message,
- }
+ },
)
if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id:
self.task.context[-1].step_status = StepStatus.ERROR
@@ -378,9 +365,8 @@ class MCPAgentExecutor(BaseExecutor):
await self.get_next_step()
else:
mcp_tool = self.tools[self.task.state.tool_id]
- is_param_error = await MCPPlanner.is_param_error(
- self.task.runtime.question,
- await MCPHost.assemble_memory(self.task),
+ is_param_error = await self.planner.is_param_error(
+ await self.host.assemble_memory(self.task),
self.task.state.error_message,
mcp_tool,
self.task.state.step_description,
@@ -394,7 +380,7 @@ class MCPAgentExecutor(BaseExecutor):
EventType.STEP_ERROR,
data={
"message": self.task.state.error_message,
- }
+ },
)
if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id:
self.task.context[-1].step_status = StepStatus.ERROR
@@ -406,18 +392,12 @@ class MCPAgentExecutor(BaseExecutor):
await self.get_next_step()
async def summarize(self) -> None:
- async for chunk in MCPPlanner.generate_answer(
- self.task.runtime.question,
- (await MCPHost.assemble_memory(self.task)),
- self.resoning_llm
- ):
- await self.push_message(
- EventType.TEXT_ADD,
- data=chunk
- )
+ """总结"""
+ async for chunk in self.planner.generate_answer(await self.host.assemble_memory(self.task)):
+ await self.push_message(EventType.TEXT_ADD, data=chunk)
self.task.runtime.answer += chunk
- async def run(self) -> None:
+ async def run(self) -> None: # noqa: C901
"""执行MCP Agent的主逻辑"""
# 初始化MCP服务
await self.load_state()
@@ -426,32 +406,21 @@ class MCPAgentExecutor(BaseExecutor):
# 初始化状态
try:
self.task.state.flow_id = str(uuid.uuid4())
- self.task.state.flow_name = await MCPPlanner.get_flow_name(self.task.runtime.question, self.resoning_llm)
+ self.task.state.flow_name = await self.planner.get_flow_name()
await TaskManager.save_task(self.task.id, self.task)
await self.get_next_step()
except Exception as e:
- import traceback
- logger.error("[MCPAgentExecutor] 初始化失败: %s", traceback.format_exc())
- logger.error("[MCPAgentExecutor] 初始化失败: %s", str(e))
+ logger.exception("[MCPAgentExecutor] 初始化失败")
self.task.state.flow_status = FlowStatus.ERROR
self.task.state.error_message = str(e)
- await self.push_message(
- EventType.FLOW_FAILED,
- data={}
- )
+ await self.push_message(EventType.FLOW_FAILED, data={})
return
self.task.state.flow_status = FlowStatus.RUNNING
- await self.push_message(
- EventType.FLOW_START,
- data={}
- )
+ await self.push_message(EventType.FLOW_START, data={})
if self.task.state.tool_id == FINAL_TOOL_ID:
# 如果已经是最后一步,直接结束
self.task.state.flow_status = FlowStatus.SUCCESS
- await self.push_message(
- EventType.FLOW_SUCCESS,
- data={}
- )
+ await self.push_message(EventType.FLOW_SUCCESS, data={})
await self.summarize()
return
try:
@@ -465,26 +434,15 @@ class MCPAgentExecutor(BaseExecutor):
# 如果已经是最后一步,直接结束
self.task.state.flow_status = FlowStatus.SUCCESS
self.task.state.step_status = StepStatus.SUCCESS
- await self.push_message(
- EventType.FLOW_SUCCESS,
- data={}
- )
+ await self.push_message(EventType.FLOW_SUCCESS, data={})
await self.summarize()
except Exception as e:
- import traceback
- logger.error("[MCPAgentExecutor] 执行过程中发生错误: %s", traceback.format_exc())
- logger.error("[MCPAgentExecutor] 执行过程中发生错误: %s", str(e))
+ logger.exception("[MCPAgentExecutor] 执行过程中发生错误")
self.task.state.flow_status = FlowStatus.ERROR
self.task.state.error_message = str(e)
self.task.state.step_status = StepStatus.ERROR
- await self.push_message(
- EventType.STEP_ERROR,
- data={}
- )
- await self.push_message(
- EventType.FLOW_FAILED,
- data={}
- )
+ await self.push_message(EventType.STEP_ERROR, data={})
+ await self.push_message(EventType.FLOW_FAILED, data={})
if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id:
del self.task.context[-1]
self.task.context.append(
@@ -499,12 +457,11 @@ class MCPAgentExecutor(BaseExecutor):
flow_status=self.task.state.flow_status,
input_data={},
output_data={},
- )
+ ),
)
finally:
for mcp_service in self.mcp_list:
try:
await self.mcp_pool.stop(mcp_service.id, self.task.ids.user_sub)
- except Exception as e:
- import traceback
- logger.error("[MCPAgentExecutor] 停止MCP客户端时发生错误: %s", traceback.format_exc())
+ except Exception:
+ logger.exception("[MCPAgentExecutor] 停止MCP客户端时发生错误")
diff --git a/apps/scheduler/mcp_agent/base.py b/apps/scheduler/mcp_agent/base.py
index f062a551c5b3d797fa0f77ad0efd1a8e135eeb52..b3f71c4a9a9c85b254f6669e02db60451b833d31 100644
--- a/apps/scheduler/mcp_agent/base.py
+++ b/apps/scheduler/mcp_agent/base.py
@@ -1,3 +1,6 @@
+# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved.
+"""MCP基类"""
+
import json
import logging
from typing import Any
@@ -10,9 +13,12 @@ from apps.llm.reasoning import ReasoningLLM
logger = logging.getLogger(__name__)
-class McpBase:
- @staticmethod
- async def get_resoning_result(prompt: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> str:
+class MCPBase:
+ """MCP基类"""
+
+ llm: ReasoningLLM
+
+ async def get_resoning_result(self, prompt: str) -> str:
"""获取推理结果"""
# 调用推理大模型
message = [
@@ -20,7 +26,7 @@ class McpBase:
{"role": "user", "content": prompt},
]
result = ""
- async for chunk in resoning_llm.call(
+ async for chunk in self.llm.call(
message,
streaming=False,
temperature=0.07,
@@ -31,7 +37,10 @@ class McpBase:
return result
@staticmethod
- async def _parse_result(result: str, schema: dict[str, Any], left_str: str = '{', right_str: str = '}') -> str:
+ async def _parse_result(
+ result: str,
+ schema: dict[str, Any], left_str: str = "{", right_str: str = "}",
+ ) -> dict[str, Any]:
"""解析推理结果"""
left_index = result.find(left_str)
right_index = result.rfind(right_str)
@@ -43,21 +52,21 @@ class McpBase:
flag = False
if flag:
try:
- tmp_js = json.loads(result[left_index:right_index + 1])
+ tmp_js = json.loads(result[left_index : right_index + 1])
validate(instance=tmp_js, schema=schema)
- except Exception as e:
- logger.error("[McpBase] 解析结果失败: %s", e)
+ except Exception:
+ logger.exception("[McpBase] 解析结果失败")
flag = False
if not flag:
json_generator = JsonGenerator(
"请提取下面内容中的json\n\n",
[
{"role": "system", "content": "You are a helpful assistant."},
- {"role": "user", "content": "请提取下面内容中的json\n\n"+result},
+ {"role": "user", "content": "请提取下面内容中的json\n\n" + result},
],
schema,
)
json_result = await json_generator.generate()
else:
- json_result = json.loads(result[left_index:right_index + 1])
+ json_result = json.loads(result[left_index : right_index + 1])
return json_result
diff --git a/apps/scheduler/mcp_agent/host.py b/apps/scheduler/mcp_agent/host.py
index fde8e39acb6fa41e474307137a4584bba029fac2..3fc7bfc746a631143fbee5cc5b1036306d41759e 100644
--- a/apps/scheduler/mcp_agent/host.py
+++ b/apps/scheduler/mcp_agent/host.py
@@ -9,31 +9,39 @@ from jinja2 import BaseLoader
from jinja2.sandbox import SandboxedEnvironment
from apps.llm.function import JsonGenerator
+from apps.llm.reasoning import ReasoningLLM
from apps.scheduler.mcp.prompt import MEMORY_TEMPLATE
-from apps.scheduler.mcp_agent.base import McpBase
+from apps.scheduler.mcp_agent.base import MCPBase
from apps.scheduler.mcp_agent.prompt import GEN_PARAMS, REPAIR_PARAMS
from apps.schemas.mcp import MCPTool
from apps.schemas.task import Task
-
-def tojson_filter(value: Any) -> str:
- """将值转换为JSON字符串"""
- return json.dumps(value, ensure_ascii=False, separators=(",", ":"))
-
-
logger = logging.getLogger(__name__)
_env = SandboxedEnvironment(
loader=BaseLoader,
autoescape=False,
trim_blocks=True,
lstrip_blocks=True,
- filters={"tojson": tojson_filter},
)
-class MCPHost(McpBase):
+def tojson_filter(value: dict[str, Any]) -> str:
+ """将字典转换为紧凑JSON字符串"""
+ return json.dumps(value, ensure_ascii=False, separators=(",", ":"))
+
+
+_env.filters["tojson"] = tojson_filter
+
+
+class MCPHost(MCPBase):
"""MCP宿主服务"""
+ def __init__(self, goal: str, llm: ReasoningLLM) -> None:
+ """初始化MCP宿主服务"""
+ super().__init__()
+ self.goal = goal
+ self.llm = llm
+
@staticmethod
async def assemble_memory(task: Task) -> str:
"""组装记忆"""
@@ -41,43 +49,42 @@ class MCPHost(McpBase):
context_list=task.context,
)
- @staticmethod
- async def _get_first_input_params(mcp_tool: MCPTool, goal: str, current_goal: str, task: Task,
- resoning_llm: ReasoningLLM = ReasoningLLM()) -> dict[str, Any]:
+ async def get_first_input_params(
+ self, mcp_tool: MCPTool, current_goal: str, task: Task,
+ ) -> dict[str, Any]:
"""填充工具参数"""
# 更清晰的输入·指令,这样可以调用generate
prompt = _env.from_string(GEN_PARAMS).render(
tool_name=mcp_tool.name,
tool_description=mcp_tool.description,
- goal=goal,
+ goal=self.goal,
current_goal=current_goal,
input_schema=mcp_tool.input_schema,
background_info=await MCPHost.assemble_memory(task),
)
logger.info("[MCPHost] 填充工具参数: %s", prompt)
- result = await MCPHost.get_resoning_result(
- prompt,
- resoning_llm
- )
+ result = await self.get_resoning_result(prompt)
# 使用JsonGenerator解析结果
- result = await MCPHost._parse_result(
+ return await MCPHost._parse_result(
result,
mcp_tool.input_schema,
)
- return result
- @staticmethod
- async def _fill_params( # noqa: PLR0913
- goal: str, current_goal: str,
- mcp_tool: MCPTool, current_input: dict[str, Any],
- error_message: str = "", params: dict[str, Any] | None = None,
- params_description: str = "") -> dict[str, Any]:
+ async def fill_params( # noqa: D102, PLR0913
+ self,
+ mcp_tool: MCPTool,
+ current_goal: str,
+ current_input: dict[str, Any],
+ error_message: str = "",
+ params: dict[str, Any] | None = None,
+ params_description: str = "",
+ ) -> dict[str, Any]:
llm_query = "请生成修复之后的工具参数"
prompt = _env.from_string(REPAIR_PARAMS).render(
tool_name=mcp_tool.name,
- tool_description=mcp_tool.description,
- goal=goal,
+ goal=self.goal,
current_goal=current_goal,
+ tool_description=mcp_tool.description,
input_schema=mcp_tool.input_schema,
current_input=current_input,
error_message=error_message,
diff --git a/apps/scheduler/mcp_agent/plan.py b/apps/scheduler/mcp_agent/plan.py
index 0d707d2d97ad55834942c937e8432fc6fa6834ea..ae4bcf65b02f7f5853aa5f2536b4f784adfec9d0 100644
--- a/apps/scheduler/mcp_agent/plan.py
+++ b/apps/scheduler/mcp_agent/plan.py
@@ -1,44 +1,31 @@
# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved.
"""MCP 用户目标拆解与规划"""
+import logging
from collections.abc import AsyncGenerator
from typing import Any
from jinja2 import BaseLoader
from jinja2.sandbox import SandboxedEnvironment
-from apps.llm.function import JsonGenerator
from apps.llm.reasoning import ReasoningLLM
-from apps.scheduler.mcp_agent.base import McpBase
+from apps.scheduler.mcp_agent.base import MCPBase
from apps.scheduler.mcp_agent.prompt import (
CHANGE_ERROR_MESSAGE_TO_DESCRIPTION,
- CREATE_PLAN,
- EVALUATE_GOAL,
FINAL_ANSWER,
GEN_STEP,
GENERATE_FLOW_NAME,
GET_MISSING_PARAMS,
- GET_REPLAN_START_STEP_INDEX,
IS_PARAM_ERROR,
- RECREATE_PLAN,
RISK_EVALUATE,
- TOOL_EXECUTE_ERROR_TYPE_ANALYSIS,
- TOOL_SKIP,
)
from apps.scheduler.slot.slot import Slot
from apps.schemas.mcp import (
- GoalEvaluationResult,
IsParamError,
- MCPPlan,
- MCPPlanItem,
MCPTool,
- RestartStepIndex,
Step,
- ToolExcutionErrorType,
ToolRisk,
- ToolSkip,
)
-from apps.schemas.task import Task
_env = SandboxedEnvironment(
loader=BaseLoader,
@@ -46,141 +33,33 @@ _env = SandboxedEnvironment(
trim_blocks=True,
lstrip_blocks=True,
)
+logger = logging.getLogger(__name__)
-class MCPPlanner(McpBase):
+class MCPPlanner(MCPBase):
"""MCP 用户目标拆解与规划"""
- @staticmethod
- async def evaluate_goal(
- goal: str,
- tool_list: list[MCPTool],
- resoning_llm: ReasoningLLM = ReasoningLLM()) -> GoalEvaluationResult:
- """评估用户目标的可行性"""
- # 获取推理结果
- result = await MCPPlanner._get_reasoning_evaluation(goal, tool_list, resoning_llm)
-
- # 解析为结构化数据
- evaluation = await MCPPlanner._parse_evaluation_result(result)
-
- # 返回评估结果
- return evaluation
-
- @staticmethod
- async def _get_reasoning_evaluation(
- goal, tool_list: list[MCPTool],
- resoning_llm: ReasoningLLM = ReasoningLLM()) -> str:
- """获取推理大模型的评估结果"""
- template = _env.from_string(EVALUATE_GOAL)
- prompt = template.render(
- goal=goal,
- tools=tool_list,
- )
- result = await MCPPlanner.get_resoning_result(prompt, resoning_llm)
- return result
+ goal: str
+ llm: ReasoningLLM
- @staticmethod
- async def _parse_evaluation_result(result: str) -> GoalEvaluationResult:
- """将推理结果解析为结构化数据"""
- schema = GoalEvaluationResult.model_json_schema()
- evaluation = await MCPPlanner._parse_result(result, schema)
- # 使用GoalEvaluationResult模型解析结果
- return GoalEvaluationResult.model_validate(evaluation)
+ def __init__(self, goal: str, llm: ReasoningLLM) -> None:
+ """初始化MCPPlanner"""
+ super().__init__()
+ self.goal = goal
+ self.llm = llm
- async def get_flow_name(user_goal: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> str:
+ async def get_flow_name(self) -> str:
"""获取当前流程的名称"""
- result = await MCPPlanner._get_reasoning_flow_name(user_goal, resoning_llm)
- return result
-
- @staticmethod
- async def _get_reasoning_flow_name(user_goal: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> str:
- """获取推理大模型的流程名称"""
template = _env.from_string(GENERATE_FLOW_NAME)
- prompt = template.render(goal=user_goal)
- result = await MCPPlanner.get_resoning_result(prompt, resoning_llm)
- return result
+ prompt = template.render(goal=self.goal)
+ return await self.get_resoning_result(prompt)
- @staticmethod
- async def get_replan_start_step_index(
- user_goal: str, error_message: str, current_plan: MCPPlan | None = None,
- history: str = "",
- reasoning_llm: ReasoningLLM = ReasoningLLM()) -> RestartStepIndex:
- """获取重新规划的步骤索引"""
- # 获取推理结果
- template = _env.from_string(GET_REPLAN_START_STEP_INDEX)
- prompt = template.render(
- goal=user_goal,
- error_message=error_message,
- current_plan=current_plan.model_dump(exclude_none=True, by_alias=True),
- history=history,
- )
- result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm)
- # 解析为结构化数据
- schema = RestartStepIndex.model_json_schema()
- schema["properties"]["start_index"]["maximum"] = len(current_plan.plans) - 1
- schema["properties"]["start_index"]["minimum"] = 0
- restart_index = await MCPPlanner._parse_result(result, schema)
- # 使用RestartStepIndex模型解析结果
- return RestartStepIndex.model_validate(restart_index)
-
- @staticmethod
- async def create_plan(
- user_goal: str, is_replan: bool = False, error_message: str = "", current_plan: MCPPlan | None = None,
- tool_list: list[MCPTool] = [],
- max_steps: int = 6, reasoning_llm: ReasoningLLM = ReasoningLLM()) -> MCPPlan:
- """规划下一步的执行流程,并输出"""
- # 获取推理结果
- result = await MCPPlanner._get_reasoning_plan(user_goal, is_replan, error_message, current_plan, tool_list, max_steps, reasoning_llm)
-
- # 解析为结构化数据
- return await MCPPlanner._parse_plan_result(result, max_steps)
-
- @staticmethod
- async def _get_reasoning_plan(
- user_goal: str, is_replan: bool = False, error_message: str = "", current_plan: MCPPlan | None = None,
- tool_list: list[MCPTool] = [],
- max_steps: int = 10, reasoning_llm: ReasoningLLM = ReasoningLLM()) -> str:
- """获取推理大模型的结果"""
- # 格式化Prompt
- tool_ids = [tool.id for tool in tool_list]
- if is_replan:
- template = _env.from_string(RECREATE_PLAN)
- prompt = template.render(
- current_plan=current_plan.model_dump(exclude_none=True, by_alias=True),
- error_message=error_message,
- goal=user_goal,
- tools=tool_list,
- max_num=max_steps,
- )
- else:
- template = _env.from_string(CREATE_PLAN)
- prompt = template.render(
- goal=user_goal,
- tools=tool_list,
- max_num=max_steps,
- )
- result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm)
- return result
-
- @staticmethod
- async def _parse_plan_result(result: str, max_steps: int) -> MCPPlan:
- """将推理结果解析为结构化数据"""
- # 格式化Prompt
- schema = MCPPlan.model_json_schema()
- schema["properties"]["plans"]["maxItems"] = max_steps
- plan = await MCPPlanner._parse_result(result, schema)
- # 使用Function模型解析结果
- return MCPPlan.model_validate(plan)
-
- @staticmethod
- async def create_next_step(
- goal: str, history: str, tools: list[MCPTool],
- reasoning_llm: ReasoningLLM = ReasoningLLM()) -> Step:
+ async def create_next_step(self, history: str, tools: list[MCPTool]) -> Step:
"""创建下一步的执行步骤"""
# 获取推理结果
template = _env.from_string(GEN_STEP)
- prompt = template.render(goal=goal, history=history, tools=tools)
- result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm)
+ prompt = template.render(goal=self.goal, history=history, tools=tools)
+ result = await self.get_resoning_result(prompt)
# 解析为结构化数据
schema = Step.model_json_schema()
@@ -188,54 +67,19 @@ class MCPPlanner(McpBase):
schema["properties"]["tool_id"]["enum"] = []
for tool in tools:
schema["properties"]["tool_id"]["enum"].append(tool.id)
- step = await MCPPlanner._parse_result(result, schema)
+ step = await self._parse_result(result, schema)
+ logger.info("[MCPPlanner] 创建下一步的执行步骤: %s", step)
# 使用Step模型解析结果
return Step.model_validate(step)
- @staticmethod
- async def tool_skip(
- task: Task, step_id: str, step_name: str, step_instruction: str, step_content: str,
- reasoning_llm: ReasoningLLM = ReasoningLLM()) -> ToolSkip:
- """判断当前步骤是否需要跳过"""
- # 获取推理结果
- template = _env.from_string(TOOL_SKIP)
- from apps.scheduler.mcp_agent.host import MCPHost
- history = await MCPHost.assemble_memory(task)
- prompt = template.render(
- step_id=step_id,
- step_name=step_name,
- step_instruction=step_instruction,
- step_content=step_content,
- history=history,
- goal=task.runtime.question
- )
- result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm)
-
- # 解析为结构化数据
- schema = ToolSkip.model_json_schema()
- skip_result = await MCPPlanner._parse_result(result, schema)
- # 使用ToolSkip模型解析结果
- return ToolSkip.model_validate(skip_result)
-
- @staticmethod
async def get_tool_risk(
- tool: MCPTool, input_parm: dict[str, Any],
- additional_info: str = "", resoning_llm: ReasoningLLM = ReasoningLLM()) -> ToolRisk:
+ self,
+ tool: MCPTool,
+ input_param: dict[str, Any],
+ additional_info: str = "",
+ ) -> ToolRisk:
"""获取MCP工具的风险评估结果"""
# 获取推理结果
- result = await MCPPlanner._get_reasoning_risk(tool, input_parm, additional_info, resoning_llm)
-
- # 解析为结构化数据
- risk = await MCPPlanner._parse_risk_result(result)
-
- # 返回风险评估结果
- return risk
-
- @staticmethod
- async def _get_reasoning_risk(
- tool: MCPTool, input_param: dict[str, Any],
- additional_info: str, resoning_llm: ReasoningLLM) -> str:
- """获取推理大模型的风险评估结果"""
template = _env.from_string(RISK_EVALUATE)
prompt = template.render(
tool_name=tool.name,
@@ -243,65 +87,26 @@ class MCPPlanner(McpBase):
input_param=input_param,
additional_info=additional_info,
)
- result = await MCPPlanner.get_resoning_result(prompt, resoning_llm)
- return result
+ result = await self.get_resoning_result(prompt)
- @staticmethod
- async def _parse_risk_result(result: str) -> ToolRisk:
- """将推理结果解析为结构化数据"""
schema = ToolRisk.model_json_schema()
- risk = await MCPPlanner._parse_result(result, schema)
- # 使用ToolRisk模型解析结果
- return ToolRisk.model_validate(risk)
+ risk = await self._parse_result(result, schema)
- @staticmethod
- async def _get_reasoning_tool_execute_error_type(
- user_goal: str, current_plan: MCPPlan,
- tool: MCPTool, input_param: dict[str, Any],
- error_message: str, reasoning_llm: ReasoningLLM = ReasoningLLM()) -> str:
- """获取推理大模型的工具执行错误类型"""
- template = _env.from_string(TOOL_EXECUTE_ERROR_TYPE_ANALYSIS)
- prompt = template.render(
- goal=user_goal,
- current_plan=current_plan.model_dump(exclude_none=True, by_alias=True),
- tool_name=tool.name,
- tool_description=tool.description,
- input_param=input_param,
- error_message=error_message,
- )
- result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm)
- return result
-
- @staticmethod
- async def _parse_tool_execute_error_type_result(result: str) -> ToolExcutionErrorType:
- """将推理结果解析为工具执行错误类型"""
- schema = ToolExcutionErrorType.model_json_schema()
- error_type = await MCPPlanner._parse_result(result, schema)
- # 使用ToolExcutionErrorType模型解析结果
- return ToolExcutionErrorType.model_validate(error_type)
-
- @staticmethod
- async def get_tool_execute_error_type(
- user_goal: str, current_plan: MCPPlan,
- tool: MCPTool, input_param: dict[str, Any],
- error_message: str, reasoning_llm: ReasoningLLM = ReasoningLLM()) -> ToolExcutionErrorType:
- """获取MCP工具执行错误类型"""
- # 获取推理结果
- result = await MCPPlanner._get_reasoning_tool_execute_error_type(
- user_goal, current_plan, tool, input_param, error_message, reasoning_llm)
- error_type = await MCPPlanner._parse_tool_execute_error_type_result(result)
- # 返回工具执行错误类型
- return error_type
+ # 返回风险评估结果
+ return ToolRisk.model_validate(risk)
- @staticmethod
async def is_param_error(
- goal: str, history: str, error_message: str, tool: MCPTool, step_description: str, input_params: dict
- [str, Any],
- reasoning_llm: ReasoningLLM = ReasoningLLM()) -> IsParamError:
+ self,
+ history: str,
+ error_message: str,
+ tool: MCPTool,
+ step_description: str,
+ input_params: dict[str, Any],
+ ) -> IsParamError:
"""判断错误信息是否是参数错误"""
tmplate = _env.from_string(IS_PARAM_ERROR)
prompt = tmplate.render(
- goal=goal,
+ goal=self.goal,
history=history,
step_id=tool.id,
step_name=tool.name,
@@ -309,17 +114,16 @@ class MCPPlanner(McpBase):
input_params=input_params,
error_message=error_message,
)
- result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm)
+ result = await self.get_resoning_result(prompt)
# 解析为结构化数据
schema = IsParamError.model_json_schema()
- is_param_error = await MCPPlanner._parse_result(result, schema)
+ is_param_error = await self._parse_result(result, schema)
# 使用IsParamError模型解析结果
return IsParamError.model_validate(is_param_error)
- @staticmethod
async def change_err_message_to_description(
- error_message: str, tool: MCPTool, input_params: dict[str, Any],
- reasoning_llm: ReasoningLLM = ReasoningLLM()) -> str:
+ self, error_message: str, tool: MCPTool, input_params: dict[str, Any],
+ ) -> str:
"""将错误信息转换为工具描述"""
template = _env.from_string(CHANGE_ERROR_MESSAGE_TO_DESCRIPTION)
prompt = template.render(
@@ -329,14 +133,9 @@ class MCPPlanner(McpBase):
input_schema=tool.input_schema,
input_params=input_params,
)
- result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm)
- return result
+ return await self.get_resoning_result(prompt)
- @staticmethod
- async def get_missing_param(
- tool: MCPTool,
- input_param: dict[str, Any],
- error_message: str, reasoning_llm: ReasoningLLM = ReasoningLLM()) -> list[str]:
+ async def get_missing_param(self, tool: MCPTool, input_param: dict[str, Any], error_message: str) -> dict[str, Any]:
"""获取缺失的参数"""
slot = Slot(schema=tool.input_schema)
template = _env.from_string(GET_MISSING_PARAMS)
@@ -348,22 +147,20 @@ class MCPPlanner(McpBase):
schema=schema_with_null,
error_message=error_message,
)
- result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm)
+ result = await self.get_resoning_result(prompt)
# 解析为结构化数据
- input_param_with_null = await MCPPlanner._parse_result(result, schema_with_null)
- return input_param_with_null
+ return await self._parse_result(result, schema_with_null)
- @staticmethod
async def generate_answer(
- user_goal: str, memory: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> AsyncGenerator[
- str, None]:
+ self, memory: str,
+ ) -> AsyncGenerator[str, None]:
"""生成最终回答"""
template = _env.from_string(FINAL_ANSWER)
prompt = template.render(
memory=memory,
- goal=user_goal,
+ goal=self.goal,
)
- async for chunk in resoning_llm.call(
+ async for chunk in self.llm.call(
[{"role": "user", "content": prompt}],
streaming=True,
temperature=0.07,
diff --git a/apps/scheduler/mcp_agent/prompt.py b/apps/scheduler/mcp_agent/prompt.py
index d51042a9dc4c7ede3cc0151d4fb7b4fcd9bfcfc0..80c95362ad1870333e33a284deceed4801862b91 100644
--- a/apps/scheduler/mcp_agent/prompt.py
+++ b/apps/scheduler/mcp_agent/prompt.py
@@ -3,175 +3,6 @@
from textwrap import dedent
-MCP_SELECT = dedent(r"""
- 你是一个乐于助人的智能助手。
- 你的任务是:根据当前目标,选择最合适的MCP Server。
-
- ## 选择MCP Server时的注意事项:
-
- 1. 确保充分理解当前目标,选择最合适的MCP Server。
- 2. 请在给定的MCP Server列表中选择,不要自己生成MCP Server。
- 3. 请先给出你选择的理由,再给出你的选择。
- 4. 当前目标将在下面给出,MCP Server列表也会在下面给出。
- 请将你的思考过程放在"思考过程"部分,将你的选择放在"选择结果"部分。
- 5. 选择必须是JSON格式,严格按照下面的模板,不要输出任何其他内容:
-
- ```json
- {
- "mcp": "你选择的MCP Server的名称"
- }
- ```
-
- 6. 下面的示例仅供参考,不要将示例中的内容作为选择MCP Server的依据。
-
- ## 示例
-
- ### 目标
-
- 我需要一个MCP Server来完成一个任务。
-
- ### MCP Server列表
-
- - **mcp_1**: "MCP Server 1";MCP Server 1的描述
- - **mcp_2**: "MCP Server 2";MCP Server 2的描述
-
- ### 请一步一步思考:
-
- 因为当前目标需要一个MCP Server来完成一个任务,所以选择mcp_1。
-
- ### 选择结果
-
- ```json
- {
- "mcp": "mcp_1"
- }
- ```
-
- ## 现在开始!
-
- ### 目标
-
- {{goal}}
-
- ### MCP Server列表
-
- {% for mcp in mcp_list %}
- - **{{mcp.id}}**: "{{mcp.name}}";{{mcp.description}}
- {% endfor %}
-
- ### 请一步一步思考:
-
-""")
-
-TOOL_SELECT = dedent(r"""
- 你是一个乐于助人的智能助手。
- 你的任务是:根据当前目标,附加信息,选择最合适的MCP工具。
- ## 选择MCP工具时的注意事项:
- 1. 确保充分理解当前目标,选择实现目标所需的MCP工具。
- 2. 不要选择不存在的工具。
- 3. 可以选择一些辅助工具,但必须确保这些工具与当前目标相关。
- 4. 注意,返回的工具ID必须是MCP工具的ID,而不是名称。
- 5. 可以多选择一些工具,用于应对不同的情况。
- 必须按照以下格式生成选择结果,不要输出任何其他内容:
- ```json
- {
- "tool_ids": ["工具ID1", "工具ID2", ...]
- }
- ```
-
- # 示例
- ## 目标
- 调优mysql性能
- ## MCP工具列表
-
- - mcp_tool_1 MySQL链接池工具;用于优化MySQL链接池
- - mcp_tool_2 MySQL性能调优工具;用于分析MySQL性能瓶颈
- - mcp_tool_3 MySQL查询优化工具;用于优化MySQL查询语句
- - mcp_tool_4 MySQL索引优化工具;用于优化MySQL索引
- - mcp_tool_5 文件存储工具;用于存储文件
- - mcp_tool_6 mongoDB工具;用于操作MongoDB数据库
-
- ## 附加信息
- 1. 当前MySQL数据库的版本是8.0.26
- 2. 当前MySQL数据库的配置文件路径是/etc/my.cnf,并含有以下配置项
- ```json
- {
- "max_connections": 1000,
- "innodb_buffer_pool_size": "1G",
- "query_cache_size": "64M"
- }
- ##输出
- ```json
- {
- "tool_ids": ["mcp_tool_1", "mcp_tool_2", "mcp_tool_3", "mcp_tool_4"]
- }
- ```
- # 现在开始!
- ## 目标
- {{goal}}
- ## MCP工具列表
-
- {% for tool in tools %}
- - {{tool.id}} {{tool.name}};{{tool.description}}
- {% endfor %}
-
- ## 附加信息
- {{additional_info}}
- # 输出
- """)
-
-EVALUATE_GOAL = dedent(r"""
- 你是一个计划评估器。
- 请根据用户的目标和当前的工具集合以及一些附加信息,判断基于当前的工具集合,是否能够完成用户的目标。
- 如果能够完成,请返回`true`,否则返回`false`。
- 推理过程必须清晰明了,能够让人理解你的判断依据。
- 必须按照以下格式回答:
- ```json
- {
- "can_complete": true/false,
- "resoning": "你的推理过程"
- }
- ```
-
- # 样例
- # 目标
- 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优
-
- # 工具集合
- 你可以访问并使用一些工具,这些工具将在 XML标签中给出。
-
- - mysql_analyzer 分析MySQL数据库性能
- - performance_tuner 调优数据库性能
- - Final 结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。
-
-
- # 附加信息
- 1. 当前MySQL数据库的版本是8.0.26
- 2. 当前MySQL数据库的配置文件路径是/etc/my.cnf
-
- ##
- ```json
- {
- "can_complete": true,
- "resoning": "当前的工具集合中包含mysql_analyzer和performance_tuner,能够完成对MySQL数据库的性能分析和调优,因此可以完成用户的目标。"
- }
- ```
-
- # 目标
- {{goal}}
-
- # 工具集合
-
- {% for tool in tools %}
- - {{tool.id}} {{tool.name}};{{tool.description}}
- {% endfor %}
-
-
- # 附加信息
- {{additional_info}}
-
-""")
-
GENERATE_FLOW_NAME = dedent(r"""
你是一个智能助手,你的任务是根据用户的目标,生成一个合适的流程名称。
@@ -192,321 +23,6 @@ GENERATE_FLOW_NAME = dedent(r"""
# 输出
""")
-GET_REPLAN_START_STEP_INDEX = dedent(r"""
- 你是一个智能助手,你的任务是根据用户的目标、报错信息和当前计划和历史,获取重新规划的步骤起始索引。
-
- # 样例
- # 目标
- 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优
- # 报错信息
- 执行端口扫描命令时,出现了错误:`- bash: curl: command not found`。
- # 当前计划
- ```json
- {
- "plans": [
- {
- "step_id": "step_1",
- "content": "生成端口扫描命令",
- "tool": "command_generator",
- "instruction": "生成端口扫描命令:扫描
- },
- {
- "step_id": "step_2",
- "content": "在执行Result[0]生成的命令",
- "tool": "command_executor",
- "instruction": "执行端口扫描命令"
- }
- ]
- }
- # 历史
- [
- {
- id: "0",
- task_id: "task_1",
- flow_id: "flow_1",
- flow_name: "MYSQL性能调优",
- flow_status: "RUNNING",
- step_id: "step_1",
- step_name: "生成端口扫描命令",
- step_description: "生成端口扫描命令:扫描当前MySQL数据库的端口",
- step_status: "FAILED",
- input_data: {
- "command": "nmap -p 3306
- "target": "localhost"
- },
- output_data: {
- "error": "- bash: curl: command not found"
- }
- }
- ]
- # 输出
- {
- "start_index": 0,
- "reasoning": "当前计划的第一步就失败了,报错信息显示curl命令未找到,可能是因为没有安装curl工具,因此需要从第一步重新规划。"
- }
- # 现在开始获取重新规划的步骤起始索引:
- # 目标
- {{goal}}
- # 报错信息
- {{error_message}}
- # 当前计划
- {{current_plan}}
- # 历史
- {{history}}
- # 输出
- """)
-
-CREATE_PLAN = dedent(r"""
- 你是一个计划生成器。
- 请分析用户的目标,并生成一个计划。你后续将根据这个计划,一步一步地完成用户的目标。
-
- # 一个好的计划应该:
-
- 1. 能够成功完成用户的目标
- 2. 计划中的每一个步骤必须且只能使用一个工具。
- 3. 计划中的步骤必须具有清晰和逻辑的步骤,没有冗余或不必要的步骤。
- 4. 不要选择不存在的工具。
- 5. 计划中的最后一步必须是Final工具,以确保计划执行结束。
- 6. 生成的计划必须要覆盖用户的目标,当然需要考虑一些意外情况,可以有一定的冗余步骤。
-
- # 生成计划时的注意事项:
-
- - 每一条计划包含3个部分:
- - 计划内容:描述单个计划步骤的大致内容
- - 工具ID:必须从下文的工具列表中选择
- - 工具指令:改写用户的目标,使其更符合工具的输入要求
- - 必须按照如下格式生成计划,不要输出任何额外数据:
-
- ```json
- {
- "plans": [
- {
- "content": "计划内容",
- "tool": "工具ID",
- "instruction": "工具指令"
- }
- ]
- }
- ```
-
- - 在生成计划之前,请一步一步思考,解析用户的目标,并指导你接下来的生成。
-思考过程应放置在 XML标签中。
- - 计划内容中,可以使用"Result[]"来引用之前计划步骤的结果。例如:"Result[3]"表示引用第三条计划执行后的结果。
- - 计划不得多于{{max_num}}条,且每条计划内容应少于150字。
-
- # 工具
-
- 你可以访问并使用一些工具,这些工具将在 XML标签中给出。
-
-
- {% for tool in tools %}
- - {{tool.id}} {{tool.name}};{{tool.description}}
- {% endfor %}
-
-
- # 样例
-
- # 目标
-
- 在后台运行一个新的alpine: latest容器,将主机/root文件夹挂载至/data,并执行top命令。
-
- # 计划
-
-
- 1. 这个目标需要使用Docker来完成, 首先需要选择合适的MCP Server
- 2. 目标可以拆解为以下几个部分:
- - 运行alpine: latest容器
- - 挂载主机目录
- - 在后台运行
- - 执行top命令
- 3. 需要先选择MCP Server, 然后生成Docker命令, 最后执行命令
- ```json
- {
- "plans": [
- {
- "content": "选择一个支持Docker的MCP Server",
- "tool": "mcp_selector",
- "instruction": "需要一个支持Docker容器运行的MCP Server"
- },
- {
- "content": "使用第一步选择的MCP Server,生成Docker命令",
- "tool": "command_generator",
- "instruction": "生成Docker命令:在后台运行alpine:latest容器,挂载/root到/data,执行top命令"
- },
- {
- "content": "执行第二步生成的Docker命令",
- "tool": "command_executor",
- "instruction": "执行Docker命令"
- },
- {
- "content": "任务执行完成,容器已在后台运行",
- "tool": "Final",
- "instruction": ""
- }
- ]
- }
- ```
-
- # 现在开始生成计划:
-
- # 目标
-
- {{goal}}
-
- # 计划
-""")
-
-RECREATE_PLAN = dedent(r"""
- 你是一个计划重建器。
- 请根据用户的目标、当前计划和运行报错,重新生成一个计划。
-
- # 一个好的计划应该:
-
- 1. 能够成功完成用户的目标
- 2. 计划中的每一个步骤必须且只能使用一个工具。
- 3. 计划中的步骤必须具有清晰和逻辑的步骤,没有冗余或不必要的步骤。
- 4. 你的计划必须避免之前的错误,并且能够成功执行。
- 5. 不要选择不存在的工具。
- 6. 计划中的最后一步必须是Final工具,以确保计划执行结束。
- 7. 生成的计划必须要覆盖用户的目标,当然需要考虑一些意外情况,可以有一定的冗余步骤。
-
- # 生成计划时的注意事项:
-
- - 每一条计划包含3个部分:
- - 计划内容:描述单个计划步骤的大致内容
- - 工具ID:必须从下文的工具列表中选择
- - 工具指令:改写用户的目标,使其更符合工具的输入要求
- - 必须按照如下格式生成计划,不要输出任何额外数据:
-
- ```json
- {
- "plans": [
- {
- "content": "计划内容",
- "tool": "工具ID",
- "instruction": "工具指令"
- }
- ]
- }
- ```
-
- - 在生成计划之前,请一步一步思考,解析用户的目标,并指导你接下来的生成。
-思考过程应放置在 XML标签中。
- - 计划内容中,可以使用"Result[]"来引用之前计划步骤的结果。例如:"Result[3]"表示引用第三条计划执行后的结果。
- - 计划不得多于{{max_num}}条,且每条计划内容应少于150字。
-
- # 样例
-
- # 目标
-
- 请帮我扫描一下192.168.1.1的这台机器的端口,看看有哪些端口开放。
- # 工具
- 你可以访问并使用一些工具,这些工具将在 XML标签中给出。
-
- - command_generator 生成命令行指令
- - tool_selector 选择合适的工具
- - command_executor 执行命令行指令
- - Final 结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。
- # 当前计划
- ```json
- {
- "plans": [
- {
- "content": "生成端口扫描命令",
- "tool": "command_generator",
- "instruction": "生成端口扫描命令:扫描192.168.1.1的开放端口"
- },
- {
- "content": "在执行第一步生成的命令",
- "tool": "command_executor",
- "instruction": "执行端口扫描命令"
- },
- {
- "content": "任务执行完成",
- "tool": "Final",
- "instruction": ""
- }
- ]
- }
- ```
- # 运行报错
- 执行端口扫描命令时,出现了错误:`- bash: curl: command not found`。
- # 重新生成的计划
-
-
- 1. 这个目标需要使用网络扫描工具来完成, 首先需要选择合适的网络扫描工具
- 2. 目标可以拆解为以下几个部分:
- - 生成端口扫描命令
- - 执行端口扫描命令
- 3.但是在执行端口扫描命令时,出现了错误:`- bash: curl: command not found`。
- 4.我将计划调整为:
- - 需要先生成一个命令,查看当前机器支持哪些网络扫描工具
- - 执行这个命令,查看当前机器支持哪些网络扫描工具
- - 然后从中选择一个网络扫描工具
- - 基于选择的网络扫描工具,生成端口扫描命令
- - 执行端口扫描命令
- ```json
- {
- "plans": [
- {
- "content": "需要生成一条命令查看当前机器支持哪些网络扫描工具",
- "tool": "command_generator",
- "instruction": "选择一个前机器支持哪些网络扫描工具"
- },
- {
- "content": "执行第一步中生成的命令,查看当前机器支持哪些网络扫描工具",
- "tool": "command_executor",
- "instruction": "执行第一步中生成的命令"
- },
- {
- "content": "从第二步执行结果中选择一个网络扫描工具,生成端口扫描命令",
- "tool": "tool_selector",
- "instruction": "选择一个网络扫描工具,生成端口扫描命令"
- },
- {
- "content": "基于第三步中选择的网络扫描工具,生成端口扫描命令",
- "tool": "command_generator",
- "instruction": "生成端口扫描命令:扫描192.168.1.1的开放端口"
- },
- {
- "content": "执行第四步中生成的端口扫描命令",
- "tool": "command_executor",
- "instruction": "执行端口扫描命令"
- },
- {
- "content": "任务执行完成",
- "tool": "Final",
- "instruction": ""
- }
- ]
- }
- ```
-
- # 现在开始重新生成计划:
-
- # 目标
-
- {{goal}}
-
- # 工具
-
- 你可以访问并使用一些工具,这些工具将在 XML标签中给出。
-
-
- {% for tool in tools %}
- - {{tool.id}} {{tool.name}};{{tool.description}}
- {% endfor %}
-
-
- # 当前计划
- {{current_plan}}
-
- # 运行报错
- {{error_message}}
-
- # 重新生成的计划
-""")
-
GEN_STEP = dedent(r"""
你是一个计划生成器。
请根据用户的目标、当前计划和历史,生成一个新的步骤。
@@ -534,7 +50,8 @@ GEN_STEP = dedent(r"""
- mcp_tool_1 mysql_analyzer;用于分析数据库性能/description>
- mcp_tool_2 文件存储工具;用于存储文件
- mcp_tool_3 mongoDB工具;用于操作MongoDB数据库
- - Final 结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。
+ - Final 结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。\
+
# 输出
```json
@@ -565,10 +82,13 @@ GEN_STEP = dedent(r"""
- 得到数据:`{"weather": "晴", "temperature": "25°C"}`
# 工具
- - mcp_tool_4 maps_geo_planner;将详细的结构化地址转换为经纬度坐标。支持对地标性名胜景区、建筑物名称解析为经纬度坐标
+ - mcp_tool_4 maps_geo_planner;将详细的结构化地址转换为经纬度坐标。支持对地标性名胜景区、\
+建筑物名称解析为经纬度坐标
- mcp_tool_5 weather_query;天气查询,用于查询天气信息
- - mcp_tool_6 maps_direction_transit_integrated;根据用户起终点经纬度坐标规划综合各类公共(火车、公交、地铁)交通方式的通勤方案,并且返回通勤方案的数据,跨城场景下必须传起点城市与终点城市
- - Final Final;结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。
+ - mcp_tool_6 maps_direction_transit_integrated;根据用户起终点经纬度坐标规划综合各类\
+公共交通方式(火车、公交、地铁)的通勤方案,并且返回通勤方案的数据,跨城场景下必须传起点城市与终点城市
+ - Final Final;结束步骤,当执行到这一步时,表示计划执行结束,\
+所得到的结果将作为最终结果。
# 输出
```json
@@ -590,61 +110,6 @@ GEN_STEP = dedent(r"""
""")
-TOOL_SKIP = dedent(r"""
- 你是一个计划执行器。
- 你的任务是根据当前的计划和用户目标,判断当前步骤是否需要跳过。
- 如果需要跳过,请返回`true`,否则返回`false`。
- 必须按照以下格式回答:
- ```json
- {
- "skip": true/false,
- }
- ```
- 注意:
- 1.你的判断要谨慎,在历史消息中有足够的上下文信息时,才可以判断是否跳过当前步骤。
- # 样例
- # 用户目标
- 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优
- # 历史
- 第1步:生成端口扫描命令
- - 调用工具 `command_generator`,并提供参数 `{"command": "nmap -sS -p--open 192.168.1.1"}`
- - 执行状态:成功
- - 得到数据:`{"command": "nmap -sS -p--open 192.168.1.1"}`
- 第2步:执行端口扫描命令
- - 调用工具 `command_executor`,并提供参数 `{"command": "nmap -sS -p--open 192.168.1.1"}`
- - 执行状态:成功
- - 得到数据:`{"result": "success"}`
- 第3步:分析端口扫描结果
- - 调用工具 `mysql_analyzer`,并提供参数 `{"host": "192.168.1.1", "port": 3306, "username": "root", "password": "password"}`
- - 执行状态:成功
- - 得到数据:`{"performance": "good", "bottleneck": "none"}`
- # 当前步骤
-
- step_4
- command_generator
- 生成MySQL性能调优命令
- 生成MySQL性能调优命令:调优MySQL数据库性能
-
- # 输出
- ```json
- {
- "skip": true
- }
- ```
- # 用户目标
- {{goal}}
- # 历史
- {{history}}
- # 当前步骤
-
- {{step_id}}
- {{step_name}}
- {{step_instruction}}
- {{step_content}}
-
- # 输出
- """)
-
RISK_EVALUATE = dedent(r"""
你是一个工具执行计划评估器。
你的任务是根据当前工具的名称、描述和入参以及附加信息,判断当前工具执行的风险并输出提示。
@@ -678,14 +143,15 @@ RISK_EVALUATE = dedent(r"""
```json
{
"risk": "中",
- "reason": "当前工具将连接到MySQL数据库并分析性能,可能会对数据库性能产生一定影响。请确保在非生产环境中执行此操作。"
+ "reason": "当前工具将连接到MySQL数据库并分析性能,可能会对数据库性能产生一定影响。\
+请确保在非生产环境中执行此操作。"
}
```
# 工具
-
- {{tool_name}}
- {{tool_description}}
-
+ < tool >
+ < name > {{tool_name}} < /name >
+ < description > {{tool_description}} < /description >
+ < / tool >
# 工具入参
{{input_param}}
# 附加信息
@@ -693,76 +159,6 @@ RISK_EVALUATE = dedent(r"""
# 输出
""")
-# 根据当前计划和报错信息决定下一步执行,具体计划有需要用户补充工具入参、重计划当前步骤、重计划接下来的所有计划
-TOOL_EXECUTE_ERROR_TYPE_ANALYSIS = dedent(r"""
- 你是一个计划决策器。
- 你的任务是根据用户目标、当前计划、当前使用的工具、工具入参和工具运行报错,决定下一步执行的操作。
- 请根据以下规则进行判断:
- 1. 仅通过补充工具入参来解决问题的,返回 missing_param;
- 2. 需要重计划当前步骤的,返回 decorrect_plan
- 3.推理过程必须清晰明了,能够让人理解你的判断依据,并且不超过100字。
- 你的输出要以json格式返回,格式如下:
- ```json
- {
- "error_type": "missing_param/decorrect_plan,
- "reason": "你的推理过程"
- }
- ```
- # 样例
- # 用户目标
- 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优
- # 当前计划
- {"plans": [
- {
- "content": "生成端口扫描命令",
- "tool": "command_generator",
- "instruction": "生成端口扫描命令:扫描192.168.1.1的开放端口"
- },
- {
- "content": "在执行Result[0]生成的命令",
- "tool": "command_executor",
- "instruction": "执行端口扫描命令"
- },
- {
- "content": "任务执行完成,端口扫描结果为Result[2]",
- "tool": "Final",
- "instruction": ""
- }
- ]}
- # 当前使用的工具
-
- command_executor
- 执行命令行指令
-
- # 工具入参
- {
- "command": "nmap -sS -p--open 192.168.1.1"
- }
- # 工具运行报错
- 执行端口扫描命令时,出现了错误:`- bash: nmap: command not found`。
- # 输出
- ```json
- {
- "error_type": "decorrect_plan",
- "reason": "当前计划的第二步执行失败,报错信息显示nmap命令未找到,可能是因为没有安装nmap工具,因此需要重计划当前步骤。"
- }
- ```
- # 用户目标
- {{goal}}
- # 当前计划
- {{current_plan}}
- # 当前使用的工具
-
- {{tool_name}}
- {{tool_description}}
-
- # 工具入参
- {{input_param}}
- # 工具运行报错
- {{error_message}}
- # 输出
- """)
-
IS_PARAM_ERROR = dedent(r"""
你是一个计划执行专家,你的任务是判断当前的步骤执行失败是否是因为参数错误导致的,
如果是,请返回`true`,否则返回`false`。
@@ -834,10 +230,10 @@ CHANGE_ERROR_MESSAGE_TO_DESCRIPTION = dedent(r"""
5. 只输出自然语言描述,不要输出其他内容。
# 样例
# 工具信息
-
- port_scanner
- 扫描主机端口
-
+ < tool >
+ < name > port_scanner < /name >
+ < description > 扫描主机端口 < /description >
+ < input_schema >
{
"type": "object",
"properties": {
@@ -860,8 +256,8 @@ CHANGE_ERROR_MESSAGE_TO_DESCRIPTION = dedent(r"""
},
"required": ["host", "port", "username", "password"]
}
-
-
+ < /input_schema >
+ < / tool >
# 工具入参
{
"host": "192.0.0.1",
@@ -875,13 +271,13 @@ CHANGE_ERROR_MESSAGE_TO_DESCRIPTION = dedent(r"""
扫描端口时发生错误:密码不正确。请检查输入的密码是否正确,并重试。
# 现在开始转换报错信息:
# 工具信息
-
- {{tool_name}}
- {{tool_description}}
-
+ < tool >
+ < name > {{tool_name}} < /name >
+ < description > {{tool_description}} < /description >
+ < input_schema >
{{input_schema}}
-
-
+ < /input_schema >
+ < / tool >
# 工具入参
{{input_params}}
# 报错信息
@@ -960,10 +356,10 @@ GET_MISSING_PARAMS = dedent(r"""
}
```
# 工具
-
- {{tool_name}}
- {{tool_description}}
-
+ < tool >
+ < name > {{tool_name}} < /name >
+ < description > {{tool_description}} < /description >
+ < / tool >
# 工具入参
{{input_param}}
# 工具入参schema(部分字段允许为null)
@@ -973,7 +369,6 @@ GET_MISSING_PARAMS = dedent(r"""
# 输出
""")
-
GEN_PARAMS = dedent(r"""
你是一个工具参数生成器。
你的任务是根据总的目标、阶段性的目标、工具信息、工具入参的schema和背景信息生成工具的入参。
@@ -1034,10 +429,10 @@ GEN_PARAMS = dedent(r"""
}
```
# 工具
-
- {{tool_name}}
- {{tool_description}}
-
+ < tool >
+ < name > {{tool_name}} < /name >
+ < description > {{tool_description}} < /description >
+ < / tool >
# 总目标
{{goal}}
# 当前阶段目标
@@ -1050,6 +445,7 @@ GEN_PARAMS = dedent(r"""
""")
REPAIR_PARAMS = dedent(r"""
+ 你是一个工具参数修复器。
你的任务是根据当前的工具信息、目标、工具入参的schema、工具当前的入参、工具的报错、补充的参数和补充的参数描述,修复当前工具的入参。
注意:
@@ -1057,10 +453,10 @@ REPAIR_PARAMS = dedent(r"""
# 样例
# 工具信息
-
- mysql_analyzer
- 分析MySQL数据库性能
-
+ < tool >
+ < name > mysql_analyzer < /name >
+ < description > 分析MySQL数据库性能 < /description >
+ < / tool >
# 总目标
我需要扫描当前mysql数据库,分析性能瓶颈, 并调优
# 当前阶段目标
@@ -1114,10 +510,10 @@ REPAIR_PARAMS = dedent(r"""
}
```
# 工具
-
- {{tool_name}}
- {{tool_description}}
-
+ < tool >
+ < name > {{tool_name}} < /name >
+ < description > {{tool_description}} < /description >
+ < / tool >
# 总目标
{{goal}}
# 当前阶段目标
@@ -1126,6 +522,8 @@ REPAIR_PARAMS = dedent(r"""
{{input_schema}}
# 工具入参
{{input_param}}
+ # 工具描述
+ {{tool_description}}
# 运行报错
{{error_message}}
# 补充的参数
@@ -1148,14 +546,7 @@ FINAL_ANSWER = dedent(r"""
{{memory}}
+
# 现在,请根据以上信息,向用户报告目标的完成情况:
""")
-MEMORY_TEMPLATE = dedent(r"""
- {% for ctx in context_list %}
- - 第{{loop.index}}步:{{ctx.step_description}}
- - 调用工具 `{{ctx.step_id}}`,并提供参数 `{{ctx.input_data}}`
- - 执行状态:{{ctx.step_status}}
- - 得到数据:`{{ctx.output_data}}`
- {% endfor %}
-""")
diff --git a/apps/scheduler/mcp_agent/select.py b/apps/scheduler/mcp_agent/select.py
deleted file mode 100644
index ce289c1f6ca502214ae5517e692da1889d5ff25e..0000000000000000000000000000000000000000
--- a/apps/scheduler/mcp_agent/select.py
+++ /dev/null
@@ -1,97 +0,0 @@
-# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved.
-"""选择MCP Server及其工具"""
-
-import logging
-import random
-
-from jinja2 import BaseLoader
-from jinja2.sandbox import SandboxedEnvironment
-
-from apps.llm.reasoning import ReasoningLLM
-from apps.llm.token import TokenCalculator
-from apps.scheduler.mcp_agent.base import McpBase
-from apps.scheduler.mcp_agent.prompt import TOOL_SELECT
-from apps.schemas.mcp import MCPCollection, MCPTool, MCPToolIdsSelectResult
-
-logger = logging.getLogger(__name__)
-
-_env = SandboxedEnvironment(
- loader=BaseLoader,
- autoescape=True,
- trim_blocks=True,
- lstrip_blocks=True,
-)
-
-FINAL_TOOL_ID = "FIANL"
-SUMMARIZE_TOOL_ID = "SUMMARIZE"
-
-
-class MCPSelector(McpBase):
- """MCP选择器"""
-
- @staticmethod
- async def select_top_tool(
- goal: str, tool_list: list[MCPTool],
- additional_info: str | None = None, top_n: int | None = None,
- reasoning_llm: ReasoningLLM | None = None) -> list[MCPTool]:
- """选择最合适的工具"""
- random.shuffle(tool_list)
- max_tokens = reasoning_llm._config.max_tokens
- template = _env.from_string(TOOL_SELECT)
- token_calculator = TokenCalculator()
- if token_calculator.calculate_token_length(
- messages=[{"role": "user", "content": template.render(
- goal=goal, tools=[], additional_info=additional_info
- )}],
- pure_text=True) > max_tokens:
- logger.warning("[MCPSelector] 工具选择模板长度超过最大令牌数,无法进行选择")
- return []
- current_index = 0
- tool_ids = []
- while current_index < len(tool_list):
- index = current_index
- sub_tools = []
- while index < len(tool_list):
- tool = tool_list[index]
- tokens = token_calculator.calculate_token_length(
- messages=[{"role": "user", "content": template.render(
- goal=goal, tools=[tool],
- additional_info=additional_info
- )}],
- pure_text=True
- )
- if tokens > max_tokens:
- continue
- sub_tools.append(tool)
-
- tokens = token_calculator.calculate_token_length(messages=[{"role": "user", "content": template.render(
- goal=goal, tools=sub_tools, additional_info=additional_info)}, ], pure_text=True)
- if tokens > max_tokens:
- del sub_tools[-1]
- break
- else:
- index += 1
- current_index = index
- if sub_tools:
- schema = MCPToolIdsSelectResult.model_json_schema()
- if "items" not in schema["properties"]["tool_ids"]:
- schema["properties"]["tool_ids"]["items"] = {}
- # 将enum添加到items中,限制数组元素的可选值
- schema["properties"]["tool_ids"]["items"]["enum"] = [tool.id for tool in sub_tools]
- result = await MCPSelector.get_resoning_result(template.render(goal=goal, tools=sub_tools, additional_info="请根据目标选择对应的工具"), reasoning_llm)
- result = await MCPSelector._parse_result(result, schema)
- try:
- result = MCPToolIdsSelectResult.model_validate(result)
- tool_ids.extend(result.tool_ids)
- except Exception:
- logger.exception("[MCPSelector] 解析MCP工具ID选择结果失败")
- continue
- mcp_tools = [tool for tool in tool_list if tool.id in tool_ids]
-
- if top_n is not None:
- mcp_tools = mcp_tools[:top_n]
- mcp_tools.append(MCPTool(id=FINAL_TOOL_ID, name="Final",
- description="终止", mcp_id=FINAL_TOOL_ID, input_schema={}))
- # mcp_tools.append(MCPTool(id=SUMMARIZE_TOOL_ID, name="Summarize",
- # description="总结工具", mcp_id=SUMMARIZE_TOOL_ID, input_schema={}))
- return mcp_tools
diff --git a/apps/schemas/mcp.py b/apps/schemas/mcp.py
index 69b5882654898adb71beb45b33803373ced7f1a2..9f87ae647d8f717113b74fd3b90e9bab77fff80c 100644
--- a/apps/schemas/mcp.py
+++ b/apps/schemas/mcp.py
@@ -59,20 +59,6 @@ class MCPServerConfig(MCPServerItem):
author: str = Field(description="MCP 服务器上传者", default="")
-class GoalEvaluationResult(BaseModel):
- """MCP 目标评估结果"""
-
- can_complete: bool = Field(description="是否可以完成目标")
- reason: str = Field(description="评估原因")
-
-
-class RestartStepIndex(BaseModel):
- """MCP重新规划的步骤索引"""
-
- start_index: int = Field(description="重新规划的起始步骤索引")
- reasoning: str = Field(description="重新规划的原因")
-
-
class Risk(str, Enum):
"""MCP工具风险类型"""
@@ -81,12 +67,6 @@ class Risk(str, Enum):
HIGH = "high"
-class ToolSkip(BaseModel):
- """MCP工具跳过执行结果"""
-
- skip: bool = Field(description="是否跳过当前步骤", default=False)
-
-
class ToolRisk(BaseModel):
"""MCP工具风险评估结果"""
@@ -94,44 +74,17 @@ class ToolRisk(BaseModel):
reason: str = Field(description="风险原因", default="")
-class ErrorType(str, Enum):
- """MCP工具错误类型"""
-
- MISSING_PARAM = "missing_param"
- DECORRECT_PLAN = "decorrect_plan"
-
-
-class ToolExcutionErrorType(BaseModel):
- """MCP工具执行错误"""
-
- type: ErrorType = Field(description="错误类型", default=ErrorType.MISSING_PARAM)
- reason: str = Field(description="错误原因", default="")
-
-
class IsParamError(BaseModel):
"""MCP工具参数错误"""
is_param_error: bool = Field(description="是否是参数错误", default=False)
-
class MCPSelectResult(BaseModel):
"""MCP选择结果"""
mcp_id: str = Field(description="MCP Server的ID")
-class MCPToolSelectResult(BaseModel):
- """MCP工具选择结果"""
-
- name: str = Field(description="工具名称")
-
-
-class MCPToolIdsSelectResult(BaseModel):
- """MCP工具ID选择结果"""
-
- tool_ids: list[str] = Field(description="工具ID列表")
-
-
class MCPPlanItem(BaseModel):
"""MCP 计划"""