diff --git a/apps/scheduler/executor/agent.py b/apps/scheduler/executor/agent.py
index 4d34b85b6855478d0686c8fa76a1b51433587067..6c80685c9389d65c9703b9892fdf6aaeaf092828 100644
--- a/apps/scheduler/executor/agent.py
+++ b/apps/scheduler/executor/agent.py
@@ -1,38 +1,32 @@
# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved.
"""MCP Agent执行器"""
-import anyio
import logging
import uuid
-from pydantic import Field
-from typing import Any
+
+import anyio
from mcp.types import TextContent
-from apps.llm.patterns.rewrite import QuestionRewrite
+from pydantic import Field
+
from apps.llm.reasoning import ReasoningLLM
from apps.scheduler.executor.base import BaseExecutor
-from apps.schemas.enum_var import EventType, SpecialCallType, FlowStatus, StepStatus
from apps.scheduler.mcp_agent.host import MCPHost
from apps.scheduler.mcp_agent.plan import MCPPlanner
-from apps.scheduler.mcp_agent.select import FINAL_TOOL_ID, MCPSelector
-from apps.scheduler.pool.mcp.client import MCPClient
+from apps.scheduler.mcp_agent.select import FINAL_TOOL_ID
+from apps.scheduler.pool.mcp.pool import MCPPool
+from apps.schemas.enum_var import EventType, FlowStatus, StepStatus
from apps.schemas.mcp import (
- GoalEvaluationResult,
- RestartStepIndex,
- ToolRisk,
- ErrorType,
- ToolExcutionErrorType,
- MCPPlan,
MCPCollection,
MCPTool,
- Step
+ Step,
)
-from apps.scheduler.pool.mcp.pool import MCPPool
-from apps.schemas.task import ExecutorState, FlowStepHistory, StepQueueItem
-from apps.schemas.message import param
-from apps.services.task import TaskManager
+from apps.schemas.message import FlowParams
+from apps.schemas.task import FlowStepHistory
from apps.services.appcenter import AppCenterManager
from apps.services.mcp_service import MCPServiceManager
+from apps.services.task import TaskManager
from apps.services.user import UserManager
+
logger = logging.getLogger(__name__)
@@ -46,13 +40,17 @@ class MCPAgentExecutor(BaseExecutor):
mcp_list: list[MCPCollection] = Field(description="MCP服务器列表", default=[])
mcp_pool: MCPPool = Field(description="MCP池", default=MCPPool())
tools: dict[str, MCPTool] = Field(
- description="MCP工具列表,key为tool_id", default={}
+ description="MCP工具列表,key为tool_id",
+ default={},
)
tool_list: list[MCPTool] = Field(
- description="MCP工具列表,包含所有MCP工具", default=[]
+ description="MCP工具列表,包含所有MCP工具",
+ default=[],
)
- params: param | bool | None = Field(
- default=None, description="流执行过程中的参数补充", alias="params"
+ params: FlowParams | bool | None = Field(
+ default=None,
+ description="流执行过程中的参数补充",
+ alias="params",
)
resoning_llm: ReasoningLLM = Field(
default=ReasoningLLM(),
@@ -89,43 +87,53 @@ class MCPAgentExecutor(BaseExecutor):
continue
self.mcp_list.append(mcp_service)
- await self.mcp_pool._init_mcp(mcp_id, self.task.ids.user_sub)
+ await self.mcp_pool.init_mcp(mcp_id, self.task.ids.user_sub)
for tool in mcp_service.tools:
self.tools[tool.id] = tool
self.tool_list.extend(mcp_service.tools)
self.tools[FINAL_TOOL_ID] = MCPTool(
- id=FINAL_TOOL_ID,
- name="Final Tool",
- description="结束流程的工具",
- mcp_id="",
- input_schema={}
+ id=FINAL_TOOL_ID, name="Final Tool", description="结束流程的工具", mcp_id="", input_schema={},
+ )
+ self.tool_list.append(
+ MCPTool(id=FINAL_TOOL_ID, name="Final Tool", description="结束流程的工具", mcp_id="", input_schema={}),
)
- self.tool_list.append(MCPTool(id=FINAL_TOOL_ID, name="Final Tool",
- description="结束流程的工具", mcp_id="", input_schema={}))
- async def get_tool_input_param(self, is_first: bool) -> None:
+ async def get_tool_input_param(self, *, is_first: bool) -> None:
+ """获取工具输入参数"""
if is_first:
# 获取第一个输入参数
mcp_tool = self.tools[self.task.state.tool_id]
- self.task.state.current_input = await MCPHost._get_first_input_params(mcp_tool, self.task.runtime.question, self.task.state.step_description, self.task)
+ self.task.state.current_input = await MCPHost.get_first_input_params(
+ mcp_tool, self.task.runtime.question, self.task.state.step_description, self.task
+ )
else:
# 获取后续输入参数
- if isinstance(self.params, param):
+ if isinstance(self.params, FlowParams):
params = self.params.content
params_description = self.params.description
else:
params = {}
params_description = ""
mcp_tool = self.tools[self.task.state.tool_id]
- self.task.state.current_input = await MCPHost._fill_params(mcp_tool, self.task.runtime.question, self.task.state.step_description, self.task.state.current_input, self.task.state.error_message, params, params_description)
+ self.task.state.current_input = await MCPHost._fill_params(
+ mcp_tool,
+ self.task.runtime.question,
+ self.task.state.step_description,
+ self.task.state.current_input,
+ self.task.state.error_message,
+ params,
+ params_description,
+ )
async def confirm_before_step(self) -> None:
+ """确认前步骤"""
# 发送确认消息
mcp_tool = self.tools[self.task.state.tool_id]
confirm_message = await MCPPlanner.get_tool_risk(mcp_tool, self.task.state.current_input, "", self.resoning_llm)
await self.update_tokens()
- await self.push_message(EventType.STEP_WAITING_FOR_START, confirm_message.model_dump(
- exclude_none=True, by_alias=True))
+ await self.push_message(
+ EventType.STEP_WAITING_FOR_START, confirm_message.model_dump(exclude_none=True, by_alias=True),
+ )
await self.push_message(EventType.FLOW_STOP, {})
self.task.state.flow_status = FlowStatus.WAITING
self.task.state.step_status = StepStatus.WAITING
@@ -142,27 +150,26 @@ class MCPAgentExecutor(BaseExecutor):
input_data={},
output_data={},
ex_data=confirm_message.model_dump(exclude_none=True, by_alias=True),
- )
+ ),
)
- async def run_step(self):
+ async def run_step(self) -> None:
"""执行步骤"""
self.task.state.flow_status = FlowStatus.RUNNING
self.task.state.step_status = StepStatus.RUNNING
mcp_tool = self.tools[self.task.state.tool_id]
- mcp_client = (await self.mcp_pool.get(mcp_tool.mcp_id, self.task.ids.user_sub))
+ mcp_client = await self.mcp_pool.get(mcp_tool.mcp_id, self.task.ids.user_sub)
try:
output_params = await mcp_client.call_tool(mcp_tool.name, self.task.state.current_input)
- except anyio.ClosedResourceError as e:
- import traceback
- logger.error("[MCPAgentExecutor] MCP客户端连接已关闭: %s, 错误: %s", mcp_tool.mcp_id, traceback.format_exc())
+ except anyio.ClosedResourceError:
+ logger.exception("[MCPAgentExecutor] MCP客户端连接已关闭: %s", mcp_tool.mcp_id)
await self.mcp_pool.stop(mcp_tool.mcp_id, self.task.ids.user_sub)
- await self.mcp_pool._init_mcp(mcp_tool.mcp_id, self.task.ids.user_sub)
- logger.error("[MCPAgentExecutor] MCP客户端连接已关闭: %s, 错误: %s", mcp_tool.mcp_id, str(e))
+ await self.mcp_pool.init_mcp(mcp_tool.mcp_id, self.task.ids.user_sub)
self.task.state.step_status = StepStatus.ERROR
return
except Exception as e:
import traceback
+
logger.exception("[MCPAgentExecutor] 执行步骤 %s 时发生错误: %s", mcp_tool.name, traceback.format_exc())
self.task.state.step_status = StepStatus.ERROR
self.task.state.error_message = str(e)
@@ -184,14 +191,8 @@ class MCPAgentExecutor(BaseExecutor):
}
await self.update_tokens()
- await self.push_message(
- EventType.STEP_INPUT,
- self.task.state.current_input
- )
- await self.push_message(
- EventType.STEP_OUTPUT,
- output_params
- )
+ await self.push_message(EventType.STEP_INPUT, self.task.state.current_input)
+ await self.push_message(EventType.STEP_OUTPUT, output_params)
self.task.context.append(
FlowStepHistory(
task_id=self.task.id,
@@ -204,7 +205,7 @@ class MCPAgentExecutor(BaseExecutor):
flow_status=self.task.state.flow_status,
input_data=self.task.state.current_input,
output_data=output_params,
- )
+ ),
)
self.task.state.step_status = StepStatus.SUCCESS
@@ -212,29 +213,19 @@ class MCPAgentExecutor(BaseExecutor):
"""生成参数补充"""
mcp_tool = self.tools[self.task.state.tool_id]
params_with_null = await MCPPlanner.get_missing_param(
- mcp_tool,
- self.task.state.current_input,
- self.task.state.error_message,
- self.resoning_llm
+ mcp_tool, self.task.state.current_input, self.task.state.error_message, self.resoning_llm,
)
await self.update_tokens()
error_message = await MCPPlanner.change_err_message_to_description(
error_message=self.task.state.error_message,
tool=mcp_tool,
input_params=self.task.state.current_input,
- reasoning_llm=self.resoning_llm
+ reasoning_llm=self.resoning_llm,
)
await self.push_message(
- EventType.STEP_WAITING_FOR_PARAM,
- data={
- "message": error_message,
- "params": params_with_null
- }
- )
- await self.push_message(
- EventType.FLOW_STOP,
- data={}
+ EventType.STEP_WAITING_FOR_PARAM, data={"message": error_message, "params": params_with_null},
)
+ await self.push_message(EventType.FLOW_STOP, data={})
self.task.state.flow_status = FlowStatus.WAITING
self.task.state.step_status = StepStatus.PARAM
self.task.context.append(
@@ -249,33 +240,25 @@ class MCPAgentExecutor(BaseExecutor):
flow_status=self.task.state.flow_status,
input_data={},
output_data={},
- ex_data={
- "message": error_message,
- "params": params_with_null
- }
- )
+ ex_data={"message": error_message, "params": params_with_null},
+ ),
)
async def get_next_step(self) -> None:
+ """获取下一步"""
if self.task.state.step_cnt < self.max_steps:
self.task.state.step_cnt += 1
history = await MCPHost.assemble_memory(self.task)
max_retry = 3
step = None
- for i in range(max_retry):
+ for _ in range(max_retry):
step = await MCPPlanner.create_next_step(self.task.runtime.question, history, self.tool_list)
- if step.tool_id in self.tools.keys():
+ if step.tool_id in self.tools:
break
- if step is None or step.tool_id not in self.tools.keys():
- step = Step(
- tool_id=FINAL_TOOL_ID,
- description=FINAL_TOOL_ID
- )
+ if step is None or step.tool_id not in self.tools:
+ step = Step(tool_id=FINAL_TOOL_ID, description=FINAL_TOOL_ID)
tool_id = step.tool_id
- if tool_id == FINAL_TOOL_ID:
- step_name = FINAL_TOOL_ID
- else:
- step_name = self.tools[tool_id].name
+ step_name = FINAL_TOOL_ID if tool_id == FINAL_TOOL_ID else self.tools[tool_id].name
step_description = step.description
self.task.state.step_id = str(uuid.uuid4())
self.task.state.tool_id = tool_id
@@ -286,16 +269,12 @@ class MCPAgentExecutor(BaseExecutor):
else:
# 没有下一步了,结束流程
self.task.state.tool_id = FINAL_TOOL_ID
- return
async def error_handle_after_step(self) -> None:
"""步骤执行失败后的错误处理"""
self.task.state.step_status = StepStatus.ERROR
self.task.state.flow_status = FlowStatus.ERROR
- await self.push_message(
- EventType.FLOW_FAILED,
- data={}
- )
+ await self.push_message(EventType.FLOW_FAILED, data={})
if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id:
del self.task.context[-1]
self.task.context.append(
@@ -310,18 +289,18 @@ class MCPAgentExecutor(BaseExecutor):
flow_status=self.task.state.flow_status,
input_data={},
output_data={},
- )
+ ),
)
- async def work(self) -> None:
+ async def work(self) -> None: # noqa: C901, PLR0912, PLR0915
"""执行当前步骤"""
if self.task.state.step_status == StepStatus.INIT:
- await self.push_message(
- EventType.STEP_INIT,
- data={}
- )
+ await self.push_message(EventType.STEP_INIT, data={})
await self.get_tool_input_param(is_first=True)
user_info = await UserManager.get_userinfo_by_user_sub(self.task.ids.user_sub)
+ if user_info is None:
+ logger.error("[MCPAgentExecutor] 用户信息不存在: %s", self.task.ids.user_sub)
+ return
if not user_info.auto_execute:
# 等待用户确认
await self.confirm_before_step()
@@ -338,14 +317,8 @@ class MCPAgentExecutor(BaseExecutor):
else:
self.task.state.flow_status = FlowStatus.CANCELLED
self.task.state.step_status = StepStatus.CANCELLED
- await self.push_message(
- EventType.STEP_CANCEL,
- data={}
- )
- await self.push_message(
- EventType.FLOW_CANCEL,
- data={}
- )
+ await self.push_message(EventType.STEP_CANCEL, data={})
+ await self.push_message(EventType.FLOW_CANCEL, data={})
if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id:
self.task.context[-1].step_status = StepStatus.CANCELLED
if self.task.state.step_status == StepStatus.PARAM:
@@ -363,12 +336,15 @@ class MCPAgentExecutor(BaseExecutor):
await self.error_handle_after_step()
else:
user_info = await UserManager.get_userinfo_by_user_sub(self.task.ids.user_sub)
+ if user_info is None:
+ logger.error("[MCPAgentExecutor] 用户信息不存在: %s", self.task.ids.user_sub)
+ return
if user_info.auto_execute:
await self.push_message(
EventType.STEP_ERROR,
data={
"message": self.task.state.error_message,
- }
+ },
)
if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id:
self.task.context[-1].step_status = StepStatus.ERROR
@@ -394,7 +370,7 @@ class MCPAgentExecutor(BaseExecutor):
EventType.STEP_ERROR,
data={
"message": self.task.state.error_message,
- }
+ },
)
if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id:
self.task.context[-1].step_status = StepStatus.ERROR
@@ -406,18 +382,14 @@ class MCPAgentExecutor(BaseExecutor):
await self.get_next_step()
async def summarize(self) -> None:
+ """总结"""
async for chunk in MCPPlanner.generate_answer(
- self.task.runtime.question,
- (await MCPHost.assemble_memory(self.task)),
- self.resoning_llm
+ self.task.runtime.question, (await MCPHost.assemble_memory(self.task)), self.resoning_llm,
):
- await self.push_message(
- EventType.TEXT_ADD,
- data=chunk
- )
+ await self.push_message(EventType.TEXT_ADD, data=chunk)
self.task.runtime.answer += chunk
- async def run(self) -> None:
+ async def run(self) -> None: # noqa: C901
"""执行MCP Agent的主逻辑"""
# 初始化MCP服务
await self.load_state()
@@ -426,32 +398,23 @@ class MCPAgentExecutor(BaseExecutor):
# 初始化状态
try:
self.task.state.flow_id = str(uuid.uuid4())
- self.task.state.flow_name = await MCPPlanner.get_flow_name(self.task.runtime.question, self.resoning_llm)
+ self.task.state.flow_name = await MCPPlanner.get_flow_name(
+ self.task.runtime.question, self.resoning_llm,
+ )
await TaskManager.save_task(self.task.id, self.task)
await self.get_next_step()
except Exception as e:
- import traceback
- logger.error("[MCPAgentExecutor] 初始化失败: %s", traceback.format_exc())
- logger.error("[MCPAgentExecutor] 初始化失败: %s", str(e))
+ logger.exception("[MCPAgentExecutor] 初始化失败")
self.task.state.flow_status = FlowStatus.ERROR
self.task.state.error_message = str(e)
- await self.push_message(
- EventType.FLOW_FAILED,
- data={}
- )
+ await self.push_message(EventType.FLOW_FAILED, data={})
return
self.task.state.flow_status = FlowStatus.RUNNING
- await self.push_message(
- EventType.FLOW_START,
- data={}
- )
+ await self.push_message(EventType.FLOW_START, data={})
if self.task.state.tool_id == FINAL_TOOL_ID:
# 如果已经是最后一步,直接结束
self.task.state.flow_status = FlowStatus.SUCCESS
- await self.push_message(
- EventType.FLOW_SUCCESS,
- data={}
- )
+ await self.push_message(EventType.FLOW_SUCCESS, data={})
await self.summarize()
return
try:
@@ -465,26 +428,15 @@ class MCPAgentExecutor(BaseExecutor):
# 如果已经是最后一步,直接结束
self.task.state.flow_status = FlowStatus.SUCCESS
self.task.state.step_status = StepStatus.SUCCESS
- await self.push_message(
- EventType.FLOW_SUCCESS,
- data={}
- )
+ await self.push_message(EventType.FLOW_SUCCESS, data={})
await self.summarize()
except Exception as e:
- import traceback
- logger.error("[MCPAgentExecutor] 执行过程中发生错误: %s", traceback.format_exc())
- logger.error("[MCPAgentExecutor] 执行过程中发生错误: %s", str(e))
+ logger.exception("[MCPAgentExecutor] 执行过程中发生错误")
self.task.state.flow_status = FlowStatus.ERROR
self.task.state.error_message = str(e)
self.task.state.step_status = StepStatus.ERROR
- await self.push_message(
- EventType.STEP_ERROR,
- data={}
- )
- await self.push_message(
- EventType.FLOW_FAILED,
- data={}
- )
+ await self.push_message(EventType.STEP_ERROR, data={})
+ await self.push_message(EventType.FLOW_FAILED, data={})
if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id:
del self.task.context[-1]
self.task.context.append(
@@ -499,12 +451,11 @@ class MCPAgentExecutor(BaseExecutor):
flow_status=self.task.state.flow_status,
input_data={},
output_data={},
- )
+ ),
)
finally:
for mcp_service in self.mcp_list:
try:
await self.mcp_pool.stop(mcp_service.id, self.task.ids.user_sub)
- except Exception as e:
- import traceback
- logger.error("[MCPAgentExecutor] 停止MCP客户端时发生错误: %s", traceback.format_exc())
+ except Exception:
+ logger.exception("[MCPAgentExecutor] 停止MCP客户端时发生错误")
diff --git a/apps/scheduler/mcp_agent/base.py b/apps/scheduler/mcp_agent/base.py
index ac3829b418be54bfc5df6ae3d61db82fe0fc129e..081a70ab85a3aa4d7da180e61d1208f1ce359811 100644
--- a/apps/scheduler/mcp_agent/base.py
+++ b/apps/scheduler/mcp_agent/base.py
@@ -1,14 +1,21 @@
-from typing import Any
+# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved.
+"""MCP基类"""
+
import json
-from jsonschema import validate
import logging
+from typing import Any
+
+from jsonschema import validate
+
from apps.llm.function import JsonGenerator
from apps.llm.reasoning import ReasoningLLM
logger = logging.getLogger(__name__)
-class McpBase:
+class MCPBase:
+ """MCP基类"""
+
@staticmethod
async def get_resoning_result(prompt: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> str:
"""获取推理结果"""
@@ -29,7 +36,7 @@ class McpBase:
return result
@staticmethod
- async def _parse_result(result: str, schema: dict[str, Any], left_str: str = '{', right_str: str = '}') -> str:
+ async def _parse_result(result: str, schema: dict[str, Any], left_str: str = "{", right_str: str = "}") -> str:
"""解析推理结果"""
left_index = result.find(left_str)
right_index = result.rfind(right_str)
@@ -41,21 +48,21 @@ class McpBase:
flag = False
if flag:
try:
- tmp_js = json.loads(result[left_index:right_index + 1])
+ tmp_js = json.loads(result[left_index : right_index + 1])
validate(instance=tmp_js, schema=schema)
- except Exception as e:
- logger.error("[McpBase] 解析结果失败: %s", e)
+ except Exception:
+ logger.exception("[McpBase] 解析结果失败")
flag = False
if not flag:
json_generator = JsonGenerator(
"请提取下面内容中的json\n\n",
[
{"role": "system", "content": "You are a helpful assistant."},
- {"role": "user", "content": "请提取下面内容中的json\n\n"+result},
+ {"role": "user", "content": "请提取下面内容中的json\n\n" + result},
],
schema,
)
json_result = await json_generator.generate()
else:
- json_result = json.loads(result[left_index:right_index + 1])
+ json_result = json.loads(result[left_index : right_index + 1])
return json_result
diff --git a/apps/scheduler/mcp_agent/host.py b/apps/scheduler/mcp_agent/host.py
index f43506769772d9f1e573c6a862f68e1011075115..445ee83716c801510aa68475fcc08e51583431b1 100644
--- a/apps/scheduler/mcp_agent/host.py
+++ b/apps/scheduler/mcp_agent/host.py
@@ -7,20 +7,14 @@ from typing import Any
from jinja2 import BaseLoader
from jinja2.sandbox import SandboxedEnvironment
-from mcp.types import TextContent
-from apps.common.mongo import MongoDB
-from apps.llm.reasoning import ReasoningLLM
from apps.llm.function import JsonGenerator
-from apps.scheduler.mcp_agent.base import McpBase
+from apps.llm.reasoning import ReasoningLLM
from apps.scheduler.mcp.prompt import MEMORY_TEMPLATE
-from apps.scheduler.pool.mcp.client import MCPClient
-from apps.scheduler.pool.mcp.pool import MCPPool
+from apps.scheduler.mcp_agent.base import MCPBase
from apps.scheduler.mcp_agent.prompt import GEN_PARAMS, REPAIR_PARAMS
-from apps.schemas.enum_var import StepStatus
-from apps.schemas.mcp import MCPPlanItem, MCPTool
-from apps.schemas.task import Task, FlowStepHistory
-from apps.services.task import TaskManager
+from apps.schemas.mcp import MCPTool
+from apps.schemas.task import Task
logger = logging.getLogger(__name__)
@@ -33,25 +27,26 @@ _env = SandboxedEnvironment(
def tojson_filter(value):
- return json.dumps(value, ensure_ascii=False, separators=(',', ':'))
+ return json.dumps(value, ensure_ascii=False, separators=(",", ":"))
-_env.filters['tojson'] = tojson_filter
+_env.filters["tojson"] = tojson_filter
-class MCPHost(McpBase):
+class MCPHost(MCPBase):
"""MCP宿主服务"""
@staticmethod
async def assemble_memory(task: Task) -> str:
"""组装记忆"""
-
return _env.from_string(MEMORY_TEMPLATE).render(
context_list=task.context,
)
- async def _get_first_input_params(mcp_tool: MCPTool, goal: str, current_goal: str, task: Task,
- resoning_llm: ReasoningLLM = ReasoningLLM()) -> dict[str, Any]:
+ async def get_first_input_params(
+ self, mcp_tool: MCPTool, goal: str, current_goal: str,
+ task: Task, resoning_llm: ReasoningLLM = ReasoningLLM(),
+ ) -> dict[str, Any]:
"""填充工具参数"""
# 更清晰的输入·指令,这样可以调用generate
prompt = _env.from_string(GEN_PARAMS).render(
@@ -63,10 +58,7 @@ class MCPHost(McpBase):
background_info=await MCPHost.assemble_memory(task),
)
logger.info("[MCPHost] 填充工具参数: %s", prompt)
- result = await MCPHost.get_resoning_result(
- prompt,
- resoning_llm
- )
+ result = await MCPHost.get_resoning_result(prompt, resoning_llm)
# 使用JsonGenerator解析结果
result = await MCPHost._parse_result(
result,
@@ -74,12 +66,16 @@ class MCPHost(McpBase):
)
return result
- async def _fill_params(mcp_tool: MCPTool,
- goal: str,
- current_goal: str,
- current_input: dict[str, Any],
- error_message: str = "", params: dict[str, Any] = {},
- params_description: str = "") -> dict[str, Any]:
+ async def _fill_params(
+ self,
+ mcp_tool: MCPTool,
+ goal: str,
+ current_goal: str,
+ current_input: dict[str, Any],
+ error_message: str = "",
+ params: dict[str, Any] = {},
+ params_description: str = "",
+ ) -> dict[str, Any]:
llm_query = "请生成修复之后的工具参数"
prompt = _env.from_string(REPAIR_PARAMS).render(
tool_name=mcp_tool.name,
diff --git a/apps/scheduler/mcp_agent/plan.py b/apps/scheduler/mcp_agent/plan.py
index b539482d97aa511a928361bffd5e3b0e63f97ec7..3ef84d26f0ef393483fe2368f1f30c7e543255aa 100644
--- a/apps/scheduler/mcp_agent/plan.py
+++ b/apps/scheduler/mcp_agent/plan.py
@@ -1,41 +1,43 @@
# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved.
"""MCP 用户目标拆解与规划"""
-from typing import Any, AsyncGenerator
+
+import logging
+from collections.abc import AsyncGenerator
+from typing import Any
+
from jinja2 import BaseLoader
from jinja2.sandbox import SandboxedEnvironment
-import logging
+
from apps.llm.reasoning import ReasoningLLM
-from apps.llm.function import JsonGenerator
-from apps.scheduler.mcp_agent.base import McpBase
+from apps.scheduler.mcp_agent.base import MCPBase
from apps.scheduler.mcp_agent.prompt import (
+ CHANGE_ERROR_MESSAGE_TO_DESCRIPTION,
+ CREATE_PLAN,
EVALUATE_GOAL,
+ FINAL_ANSWER,
+ GEN_STEP,
GENERATE_FLOW_NAME,
+ GET_MISSING_PARAMS,
GET_REPLAN_START_STEP_INDEX,
- CREATE_PLAN,
+ IS_PARAM_ERROR,
RECREATE_PLAN,
- GEN_STEP,
- TOOL_SKIP,
RISK_EVALUATE,
TOOL_EXECUTE_ERROR_TYPE_ANALYSIS,
- IS_PARAM_ERROR,
- CHANGE_ERROR_MESSAGE_TO_DESCRIPTION,
- GET_MISSING_PARAMS,
- FINAL_ANSWER
+ TOOL_SKIP,
)
-from apps.schemas.task import Task
+from apps.scheduler.slot.slot import Slot
from apps.schemas.mcp import (
GoalEvaluationResult,
- RestartStepIndex,
- ToolSkip,
- ToolRisk,
IsParamError,
- ToolExcutionErrorType,
MCPPlan,
+ MCPTool,
+ RestartStepIndex,
Step,
- MCPPlanItem,
- MCPTool
+ ToolExcutionErrorType,
+ ToolRisk,
+ ToolSkip,
)
-from apps.scheduler.slot.slot import Slot
+from apps.schemas.task import Task
_env = SandboxedEnvironment(
loader=BaseLoader,
@@ -45,36 +47,32 @@ _env = SandboxedEnvironment(
)
logger = logging.getLogger(__name__)
-class MCPPlanner(McpBase):
+
+class MCPPlanner(MCPBase):
"""MCP 用户目标拆解与规划"""
@staticmethod
async def evaluate_goal(
- goal: str,
- tool_list: list[MCPTool],
- resoning_llm: ReasoningLLM = ReasoningLLM()) -> GoalEvaluationResult:
+ goal: str, tool_list: list[MCPTool], resoning_llm: ReasoningLLM = ReasoningLLM()
+ ) -> GoalEvaluationResult:
"""评估用户目标的可行性"""
# 获取推理结果
result = await MCPPlanner._get_reasoning_evaluation(goal, tool_list, resoning_llm)
- # 解析为结构化数据
- evaluation = await MCPPlanner._parse_evaluation_result(result)
-
# 返回评估结果
- return evaluation
+ return await MCPPlanner._parse_evaluation_result(result)
@staticmethod
async def _get_reasoning_evaluation(
- goal, tool_list: list[MCPTool],
- resoning_llm: ReasoningLLM = ReasoningLLM()) -> str:
+ goal, tool_list: list[MCPTool], resoning_llm: ReasoningLLM = ReasoningLLM()
+ ) -> str:
"""获取推理大模型的评估结果"""
template = _env.from_string(EVALUATE_GOAL)
prompt = template.render(
goal=goal,
tools=tool_list,
)
- result = await MCPPlanner.get_resoning_result(prompt, resoning_llm)
- return result
+ return await MCPPlanner.get_resoning_result(prompt, resoning_llm)
@staticmethod
async def _parse_evaluation_result(result: str) -> GoalEvaluationResult:
@@ -86,22 +84,23 @@ class MCPPlanner(McpBase):
async def get_flow_name(user_goal: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> str:
"""获取当前流程的名称"""
- result = await MCPPlanner._get_reasoning_flow_name(user_goal, resoning_llm)
- return result
+ return await MCPPlanner._get_reasoning_flow_name(user_goal, resoning_llm)
@staticmethod
async def _get_reasoning_flow_name(user_goal: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> str:
"""获取推理大模型的流程名称"""
template = _env.from_string(GENERATE_FLOW_NAME)
prompt = template.render(goal=user_goal)
- result = await MCPPlanner.get_resoning_result(prompt, resoning_llm)
- return result
+ return await MCPPlanner.get_resoning_result(prompt, resoning_llm)
@staticmethod
async def get_replan_start_step_index(
- user_goal: str, error_message: str, current_plan: MCPPlan | None = None,
- history: str = "",
- reasoning_llm: ReasoningLLM = ReasoningLLM()) -> RestartStepIndex:
+ user_goal: str,
+ error_message: str,
+ current_plan: MCPPlan | None = None,
+ history: str = "",
+ reasoning_llm: ReasoningLLM = ReasoningLLM(),
+ ) -> RestartStepIndex:
"""获取重新规划的步骤索引"""
# 获取推理结果
template = _env.from_string(GET_REPLAN_START_STEP_INDEX)
@@ -122,21 +121,33 @@ class MCPPlanner(McpBase):
@staticmethod
async def create_plan(
- user_goal: str, is_replan: bool = False, error_message: str = "", current_plan: MCPPlan | None = None,
- tool_list: list[MCPTool] = [],
- max_steps: int = 6, reasoning_llm: ReasoningLLM = ReasoningLLM()) -> MCPPlan:
+ user_goal: str,
+ is_replan: bool = False,
+ error_message: str = "",
+ current_plan: MCPPlan | None = None,
+ tool_list: list[MCPTool] = [],
+ max_steps: int = 6,
+ reasoning_llm: ReasoningLLM = ReasoningLLM(),
+ ) -> MCPPlan:
"""规划下一步的执行流程,并输出"""
# 获取推理结果
- result = await MCPPlanner._get_reasoning_plan(user_goal, is_replan, error_message, current_plan, tool_list, max_steps, reasoning_llm)
+ result = await MCPPlanner._get_reasoning_plan(
+ user_goal, is_replan, error_message, current_plan, tool_list, max_steps, reasoning_llm,
+ )
# 解析为结构化数据
return await MCPPlanner._parse_plan_result(result, max_steps)
@staticmethod
async def _get_reasoning_plan(
- user_goal: str, is_replan: bool = False, error_message: str = "", current_plan: MCPPlan | None = None,
- tool_list: list[MCPTool] = [],
- max_steps: int = 10, reasoning_llm: ReasoningLLM = ReasoningLLM()) -> str:
+ user_goal: str,
+ is_replan: bool = False,
+ error_message: str = "",
+ current_plan: MCPPlan | None = None,
+ tool_list: list[MCPTool] = [],
+ max_steps: int = 10,
+ reasoning_llm: ReasoningLLM = ReasoningLLM(),
+ ) -> str:
"""获取推理大模型的结果"""
# 格式化Prompt
tool_ids = [tool.id for tool in tool_list]
@@ -156,8 +167,7 @@ class MCPPlanner(McpBase):
tools=tool_list,
max_num=max_steps,
)
- result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm)
- return result
+ return await MCPPlanner.get_resoning_result(prompt, reasoning_llm)
@staticmethod
async def _parse_plan_result(result: str, max_steps: int) -> MCPPlan:
@@ -171,8 +181,8 @@ class MCPPlanner(McpBase):
@staticmethod
async def create_next_step(
- goal: str, history: str, tools: list[MCPTool],
- reasoning_llm: ReasoningLLM = ReasoningLLM()) -> Step:
+ goal: str, history: str, tools: list[MCPTool], reasoning_llm: ReasoningLLM = ReasoningLLM()
+ ) -> Step:
"""创建下一步的执行步骤"""
# 获取推理结果
template = _env.from_string(GEN_STEP)
@@ -192,12 +202,18 @@ class MCPPlanner(McpBase):
@staticmethod
async def tool_skip(
- task: Task, step_id: str, step_name: str, step_instruction: str, step_content: str,
- reasoning_llm: ReasoningLLM = ReasoningLLM()) -> ToolSkip:
+ task: Task,
+ step_id: str,
+ step_name: str,
+ step_instruction: str,
+ step_content: str,
+ reasoning_llm: ReasoningLLM = ReasoningLLM(),
+ ) -> ToolSkip:
"""判断当前步骤是否需要跳过"""
# 获取推理结果
template = _env.from_string(TOOL_SKIP)
from apps.scheduler.mcp_agent.host import MCPHost
+
history = await MCPHost.assemble_memory(task)
prompt = template.render(
step_id=step_id,
@@ -205,7 +221,7 @@ class MCPPlanner(McpBase):
step_instruction=step_instruction,
step_content=step_content,
history=history,
- goal=task.runtime.question
+ goal=task.runtime.question,
)
result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm)
@@ -217,22 +233,22 @@ class MCPPlanner(McpBase):
@staticmethod
async def get_tool_risk(
- tool: MCPTool, input_parm: dict[str, Any],
- additional_info: str = "", resoning_llm: ReasoningLLM = ReasoningLLM()) -> ToolRisk:
+ tool: MCPTool,
+ input_parm: dict[str, Any],
+ additional_info: str = "",
+ resoning_llm: ReasoningLLM = ReasoningLLM(),
+ ) -> ToolRisk:
"""获取MCP工具的风险评估结果"""
# 获取推理结果
result = await MCPPlanner._get_reasoning_risk(tool, input_parm, additional_info, resoning_llm)
- # 解析为结构化数据
- risk = await MCPPlanner._parse_risk_result(result)
-
# 返回风险评估结果
- return risk
+ return await MCPPlanner._parse_risk_result(result)
@staticmethod
async def _get_reasoning_risk(
- tool: MCPTool, input_param: dict[str, Any],
- additional_info: str, resoning_llm: ReasoningLLM) -> str:
+ tool: MCPTool, input_param: dict[str, Any], additional_info: str, resoning_llm: ReasoningLLM,
+ ) -> str:
"""获取推理大模型的风险评估结果"""
template = _env.from_string(RISK_EVALUATE)
prompt = template.render(
@@ -241,8 +257,7 @@ class MCPPlanner(McpBase):
input_param=input_param,
additional_info=additional_info,
)
- result = await MCPPlanner.get_resoning_result(prompt, resoning_llm)
- return result
+ return await MCPPlanner.get_resoning_result(prompt, resoning_llm)
@staticmethod
async def _parse_risk_result(result: str) -> ToolRisk:
@@ -254,9 +269,13 @@ class MCPPlanner(McpBase):
@staticmethod
async def _get_reasoning_tool_execute_error_type(
- user_goal: str, current_plan: MCPPlan,
- tool: MCPTool, input_param: dict[str, Any],
- error_message: str, reasoning_llm: ReasoningLLM = ReasoningLLM()) -> str:
+ user_goal: str,
+ current_plan: MCPPlan,
+ tool: MCPTool,
+ input_param: dict[str, Any],
+ error_message: str,
+ reasoning_llm: ReasoningLLM = ReasoningLLM(),
+ ) -> str:
"""获取推理大模型的工具执行错误类型"""
template = _env.from_string(TOOL_EXECUTE_ERROR_TYPE_ANALYSIS)
prompt = template.render(
@@ -267,8 +286,7 @@ class MCPPlanner(McpBase):
input_param=input_param,
error_message=error_message,
)
- result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm)
- return result
+ return await MCPPlanner.get_resoning_result(prompt, reasoning_llm)
@staticmethod
async def _parse_tool_execute_error_type_result(result: str) -> ToolExcutionErrorType:
@@ -280,22 +298,32 @@ class MCPPlanner(McpBase):
@staticmethod
async def get_tool_execute_error_type(
- user_goal: str, current_plan: MCPPlan,
- tool: MCPTool, input_param: dict[str, Any],
- error_message: str, reasoning_llm: ReasoningLLM = ReasoningLLM()) -> ToolExcutionErrorType:
+ user_goal: str,
+ current_plan: MCPPlan,
+ tool: MCPTool,
+ input_param: dict[str, Any],
+ error_message: str,
+ reasoning_llm: ReasoningLLM = ReasoningLLM(),
+ ) -> ToolExcutionErrorType:
"""获取MCP工具执行错误类型"""
# 获取推理结果
result = await MCPPlanner._get_reasoning_tool_execute_error_type(
- user_goal, current_plan, tool, input_param, error_message, reasoning_llm)
- error_type = await MCPPlanner._parse_tool_execute_error_type_result(result)
+ user_goal, current_plan, tool, input_param, error_message, reasoning_llm,
+ )
# 返回工具执行错误类型
- return error_type
+ return await MCPPlanner._parse_tool_execute_error_type_result(result)
+
@staticmethod
async def is_param_error(
- goal: str, history: str, error_message: str, tool: MCPTool, step_description: str, input_params: dict
- [str, Any],
- reasoning_llm: ReasoningLLM = ReasoningLLM()) -> IsParamError:
+ goal: str,
+ history: str,
+ error_message: str,
+ tool: MCPTool,
+ step_description: str,
+ input_params: dict[str, Any],
+ reasoning_llm: ReasoningLLM = ReasoningLLM(),
+ ) -> IsParamError:
"""判断错误信息是否是参数错误"""
tmplate = _env.from_string(IS_PARAM_ERROR)
prompt = tmplate.render(
@@ -316,8 +344,8 @@ class MCPPlanner(McpBase):
@staticmethod
async def change_err_message_to_description(
- error_message: str, tool: MCPTool, input_params: dict[str, Any],
- reasoning_llm: ReasoningLLM = ReasoningLLM()) -> str:
+ error_message: str, tool: MCPTool, input_params: dict[str, Any], reasoning_llm: ReasoningLLM = ReasoningLLM()
+ ) -> str:
"""将错误信息转换为工具描述"""
template = _env.from_string(CHANGE_ERROR_MESSAGE_TO_DESCRIPTION)
prompt = template.render(
@@ -332,9 +360,8 @@ class MCPPlanner(McpBase):
@staticmethod
async def get_missing_param(
- tool: MCPTool,
- input_param: dict[str, Any],
- error_message: str, reasoning_llm: ReasoningLLM = ReasoningLLM()) -> list[str]:
+ tool: MCPTool, input_param: dict[str, Any], error_message: str, reasoning_llm: ReasoningLLM = ReasoningLLM()
+ ) -> list[str]:
"""获取缺失的参数"""
slot = Slot(schema=tool.input_schema)
template = _env.from_string(GET_MISSING_PARAMS)
@@ -353,8 +380,8 @@ class MCPPlanner(McpBase):
@staticmethod
async def generate_answer(
- user_goal: str, memory: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> AsyncGenerator[
- str, None]:
+ user_goal: str, memory: str, resoning_llm: ReasoningLLM = ReasoningLLM()
+ ) -> AsyncGenerator[str, None]:
"""生成最终回答"""
template = _env.from_string(FINAL_ANSWER)
prompt = template.render(
diff --git a/apps/scheduler/mcp_agent/prompt.py b/apps/scheduler/mcp_agent/prompt.py
index 824ece8a35dfb3b02afb0e5bd5602d6f5d0d1342..25dbaff737804b00cec63479ed5e9ae8a5b9decd 100644
--- a/apps/scheduler/mcp_agent/prompt.py
+++ b/apps/scheduler/mcp_agent/prompt.py
@@ -62,6 +62,7 @@ MCP_SELECT = dedent(r"""
### 请一步一步思考:
""")
+
TOOL_SELECT = dedent(r"""
你是一个乐于助人的智能助手。
你的任务是:根据当前目标,附加信息,选择最合适的MCP工具。
@@ -117,8 +118,7 @@ TOOL_SELECT = dedent(r"""
## 附加信息
{{additional_info}}
# 输出
- """
- )
+ """)
EVALUATE_GOAL = dedent(r"""
你是一个计划评估器。
@@ -142,7 +142,8 @@ EVALUATE_GOAL = dedent(r"""
- mysql_analyzer 分析MySQL数据库性能
- performance_tuner 调优数据库性能
- - Final 结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。
+ - Final 结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。\
+
# 附加信息
@@ -153,7 +154,8 @@ EVALUATE_GOAL = dedent(r"""
```json
{
"can_complete": true,
- "resoning": "当前的工具集合中包含mysql_analyzer和performance_tuner,能够完成对MySQL数据库的性能分析和调优,因此可以完成用户的目标。"
+ "resoning": "当前的工具集合中包含mysql_analyzer和performance_tuner,能够完成对MySQL数据库的性能分析和调优,\
+因此可以完成用户的目标。"
}
```
@@ -171,6 +173,7 @@ EVALUATE_GOAL = dedent(r"""
{{additional_info}}
""")
+
GENERATE_FLOW_NAME = dedent(r"""
你是一个智能助手,你的任务是根据用户的目标,生成一个合适的流程名称。
@@ -190,6 +193,7 @@ GENERATE_FLOW_NAME = dedent(r"""
{{goal}}
# 输出
""")
+
GET_REPLAN_START_STEP_INDEX = dedent(r"""
你是一个智能助手,你的任务是根据用户的目标、报错信息和当前计划和历史,获取重新规划的步骤起始索引。
@@ -240,7 +244,8 @@ GET_REPLAN_START_STEP_INDEX = dedent(r"""
# 输出
{
"start_index": 0,
- "reasoning": "当前计划的第一步就失败了,报错信息显示curl命令未找到,可能是因为没有安装curl工具,因此需要从第一步重新规划。"
+ "reasoning": "当前计划的第一步就失败了,报错信息显示curl命令未找到,可能是因为没有安装curl工具,\
+因此需要从第一步重新规划。"
}
# 现在开始获取重新规划的步骤起始索引:
# 目标
@@ -353,6 +358,7 @@ CREATE_PLAN = dedent(r"""
# 计划
""")
+
RECREATE_PLAN = dedent(r"""
你是一个计划重建器。
请根据用户的目标、当前计划和运行报错,重新生成一个计划。
@@ -402,7 +408,8 @@ RECREATE_PLAN = dedent(r"""
- command_generator 生成命令行指令
- tool_selector 选择合适的工具
- command_executor 执行命令行指令
- - Final 结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。
+ - Final 结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。\
+
# 当前计划
```json
{
@@ -502,6 +509,7 @@ RECREATE_PLAN = dedent(r"""
# 重新生成的计划
""")
+
GEN_STEP = dedent(r"""
你是一个计划生成器。
请根据用户的目标、当前计划和历史,生成一个新的步骤。
@@ -529,7 +537,8 @@ GEN_STEP = dedent(r"""
- mcp_tool_1 mysql_analyzer;用于分析数据库性能/description>
- mcp_tool_2 文件存储工具;用于存储文件
- mcp_tool_3 mongoDB工具;用于操作MongoDB数据库
- - Final 结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。
+ - Final 结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。\
+
# 输出
```json
@@ -560,10 +569,13 @@ GEN_STEP = dedent(r"""
- 得到数据:`{"weather": "晴", "temperature": "25°C"}`
# 工具
- - mcp_tool_4 maps_geo_planner;将详细的结构化地址转换为经纬度坐标。支持对地标性名胜景区、建筑物名称解析为经纬度坐标
+ - mcp_tool_4 maps_geo_planner;将详细的结构化地址转换为经纬度坐标。支持对地标性名胜景区、\
+建筑物名称解析为经纬度坐标
- mcp_tool_5 weather_query;天气查询,用于查询天气信息
- - mcp_tool_6 maps_direction_transit_integrated;根据用户起终点经纬度坐标规划综合各类公共(火车、公交、地铁)交通方式的通勤方案,并且返回通勤方案的数据,跨城场景下必须传起点城市与终点城市
- - Final Final;结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。
+ - mcp_tool_6 maps_direction_transit_integrated;根据用户起终点经纬度坐标规划综合各类\
+公共交通方式(火车、公交、地铁)的通勤方案,并且返回通勤方案的数据,跨城场景下必须传起点城市与终点城市
+ - Final Final;结束步骤,当执行到这一步时,表示计划执行结束,\
+所得到的结果将作为最终结果。
# 输出
```json
@@ -610,7 +622,8 @@ TOOL_SKIP = dedent(r"""
- 执行状态:成功
- 得到数据:`{"result": "success"}`
第3步:分析端口扫描结果
- - 调用工具 `mysql_analyzer`,并提供参数 `{"host": "192.168.1.1", "port": 3306, "username": "root", "password": "password"}`
+ - 调用工具 `mysql_analyzer`,并提供参数 `{"host": "192.168.1.1", "port": 3306, "username": "root",\
+ "password": "password"}`
- 执行状态:成功
- 得到数据:`{"performance": "good", "bottleneck": "none"}`
# 当前步骤
@@ -638,8 +651,8 @@ TOOL_SKIP = dedent(r"""
{{step_content}}
# 输出
- """
- )
+ """)
+
RISK_EVALUATE = dedent(r"""
你是一个工具执行计划评估器。
你的任务是根据当前工具的名称、描述和入参以及附加信息,判断当前工具执行的风险并输出提示。
@@ -673,7 +686,8 @@ RISK_EVALUATE = dedent(r"""
```json
{
"risk": "中",
- "reason": "当前工具将连接到MySQL数据库并分析性能,可能会对数据库性能产生一定影响。请确保在非生产环境中执行此操作。"
+ "reason": "当前工具将连接到MySQL数据库并分析性能,可能会对数据库性能产生一定影响。\
+请确保在非生产环境中执行此操作。"
}
```
# 工具
@@ -686,8 +700,8 @@ RISK_EVALUATE = dedent(r"""
# 附加信息
{{additional_info}}
# 输出
- """
- )
+ """)
+
# 根据当前计划和报错信息决定下一步执行,具体计划有需要用户补充工具入参、重计划当前步骤、重计划接下来的所有计划
TOOL_EXECUTE_ERROR_TYPE_ANALYSIS = dedent(r"""
你是一个计划决策器。
@@ -739,7 +753,8 @@ TOOL_EXECUTE_ERROR_TYPE_ANALYSIS = dedent(r"""
```json
{
"error_type": "decorrect_plan",
- "reason": "当前计划的第二步执行失败,报错信息显示nmap命令未找到,可能是因为没有安装nmap工具,因此需要重计划当前步骤。"
+ "reason": "当前计划的第二步执行失败,报错信息显示nmap命令未找到,可能是因为没有安装nmap工具,\
+因此需要重计划当前步骤。"
}
```
# 用户目标
@@ -756,8 +771,8 @@ TOOL_EXECUTE_ERROR_TYPE_ANALYSIS = dedent(r"""
# 工具运行报错
{{error_message}}
# 输出
- """
- )
+ """)
+
IS_PARAM_ERROR = dedent(r"""
你是一个计划执行专家,你的任务是判断当前的步骤执行失败是否是因为参数错误导致的,
如果是,请返回`true`,否则返回`false`。
@@ -816,8 +831,8 @@ IS_PARAM_ERROR = dedent(r"""
# 工具运行报错
{{error_message}}
# 输出
- """
- )
+ """)
+
# 将当前程序运行的报错转换为自然语言
CHANGE_ERROR_MESSAGE_TO_DESCRIPTION = dedent(r"""
你是一个智能助手,你的任务是将当前程序运行的报错转换为自然语言描述。
@@ -883,6 +898,7 @@ CHANGE_ERROR_MESSAGE_TO_DESCRIPTION = dedent(r"""
{{error_message}}
# 输出
""")
+
# 获取缺失的参数的json结构体
GET_MISSING_PARAMS = dedent(r"""
你是一个工具参数获取器。
@@ -965,8 +981,8 @@ GET_MISSING_PARAMS = dedent(r"""
# 运行报错
{{error_message}}
# 输出
- """
- )
+ """)
+
GEN_PARAMS = dedent(r"""
你是一个工具参数生成器。
你的任务是根据总的目标、阶段性的目标、工具信息、工具入参的schema和背景信息生成工具的入参。
@@ -1040,8 +1056,7 @@ GEN_PARAMS = dedent(r"""
# 背景信息
{{background_info}}
# 输出
- """
- )
+ """)
REPAIR_PARAMS = dedent(r"""
你是一个工具参数修复器。
@@ -1130,8 +1145,8 @@ REPAIR_PARAMS = dedent(r"""
# 补充的参数描述
{{params_description}}
# 输出
- """
- )
+ """)
+
FINAL_ANSWER = dedent(r"""
综合理解计划执行结果和背景信息,向用户报告目标的完成情况。
diff --git a/apps/scheduler/mcp_agent/select.py b/apps/scheduler/mcp_agent/select.py
index 075e08f00cb2b5a2976d5372c623193b04c42412..d287b1139d334cb19faacbedaba5eaefa0ec7eb3 100644
--- a/apps/scheduler/mcp_agent/select.py
+++ b/apps/scheduler/mcp_agent/select.py
@@ -3,28 +3,16 @@
import logging
import random
+
from jinja2 import BaseLoader
from jinja2.sandbox import SandboxedEnvironment
-from typing import AsyncGenerator, Any
-from apps.llm.function import JsonGenerator
-from apps.llm.reasoning import ReasoningLLM
-from apps.common.lance import LanceDB
-from apps.common.mongo import MongoDB
-from apps.llm.embedding import Embedding
-from apps.llm.function import FunctionLLM
from apps.llm.reasoning import ReasoningLLM
from apps.llm.token import TokenCalculator
-from apps.scheduler.mcp_agent.base import McpBase
+from apps.scheduler.mcp_agent.base import MCPBase
from apps.scheduler.mcp_agent.prompt import TOOL_SELECT
-from apps.schemas.mcp import (
- BaseModel,
- MCPCollection,
- MCPSelectResult,
- MCPTool,
- MCPToolIdsSelectResult
-)
-from apps.common.config import Config
+from apps.schemas.mcp import MCPTool, MCPToolIdsSelectResult
+
logger = logging.getLogger(__name__)
_env = SandboxedEnvironment(
@@ -38,7 +26,7 @@ FINAL_TOOL_ID = "FIANL"
SUMMARIZE_TOOL_ID = "SUMMARIZE"
-class MCPSelector(McpBase):
+class MCPSelector(MCPBase):
"""MCP选择器"""
@staticmethod
@@ -68,9 +56,9 @@ class MCPSelector(McpBase):
tokens = token_calculator.calculate_token_length(
messages=[{"role": "user", "content": template.render(
goal=goal, tools=[tool],
- additional_info=additional_info
+ additional_info=additional_info,
)}],
- pure_text=True
+ pure_text=True,
)
if tokens > max_tokens:
continue
@@ -90,7 +78,11 @@ class MCPSelector(McpBase):
schema["properties"]["tool_ids"]["items"] = {}
# 将enum添加到items中,限制数组元素的可选值
schema["properties"]["tool_ids"]["items"]["enum"] = [tool.id for tool in sub_tools]
- result = await MCPSelector.get_resoning_result(template.render(goal=goal, tools=sub_tools, additional_info="请根据目标选择对应的工具"), reasoning_llm)
+ result = await MCPSelector.get_resoning_result(
+ template.render(
+ goal=goal, tools=sub_tools, additional_info="请根据目标选择对应的工具",
+ ), reasoning_llm,
+ )
result = await MCPSelector._parse_result(result, schema)
try:
result = MCPToolIdsSelectResult.model_validate(result)
diff --git a/apps/scheduler/pool/mcp/client.py b/apps/scheduler/pool/mcp/client.py
index 0ced05e8cc68bb773cfe61eb9268a4f8baab8fa5..562a1056e2f9355e1cab82f9f0a30bb859bb74bd 100644
--- a/apps/scheduler/pool/mcp/client.py
+++ b/apps/scheduler/pool/mcp/client.py
@@ -129,12 +129,13 @@ class MCPClient:
done, pending = await asyncio.wait(
[asyncio.create_task(self.ready_sign.wait()),
asyncio.create_task(self.error_sign.wait())],
- return_when=asyncio.FIRST_COMPLETED
+ return_when=asyncio.FIRST_COMPLETED,
)
if self.error_sign.is_set():
self.status = MCPStatus.ERROR
- logger.error("[MCPClient] MCP %s:初始化失败", mcp_id)
- raise Exception(f"MCP {mcp_id} 初始化失败")
+ error = f"MCP {mcp_id} 初始化失败"
+ logger.error("[MCPClient] %s", error)
+ raise RuntimeError(error)
# 获取工具列表
self.tools = (await self.client.list_tools()).tools
@@ -148,5 +149,5 @@ class MCPClient:
self.stop_sign.set()
try:
await self.task
- except Exception as e:
+ except Exception as e: # noqa: BLE001
logger.warning("[MCPClient] MCP %s:停止时发生异常:%s", self.mcp_id, e)
diff --git a/apps/scheduler/pool/mcp/install.py b/apps/scheduler/pool/mcp/install.py
index 2b15cd690bfd8b1939326728647e27ab517f89ff..b694eff35fe96471556238ddd206b4844c611552 100644
--- a/apps/scheduler/pool/mcp/install.py
+++ b/apps/scheduler/pool/mcp/install.py
@@ -1,11 +1,11 @@
# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved.
"""MCP 安装"""
-from asyncio import subprocess
-from typing import TYPE_CHECKING
import logging
-import os
import shutil
+from asyncio import subprocess
+from typing import TYPE_CHECKING
+
from apps.constants import MCP_PATH
if TYPE_CHECKING:
@@ -27,31 +27,31 @@ async def install_uvx(mcp_id: str, config: "MCPServerStdioConfig") -> "MCPServer
:rtype: MCPServerStdioConfig
:raises ValueError: 未找到MCP Server对应的Python包
"""
- uv_path = shutil.which('uv')
+ uv_path = shutil.which("uv")
if uv_path is None:
error = "[Installer] 未找到uv命令,请先安装uv包管理器: pip install uv"
- logging.error(error)
- raise Exception(error)
+ logger.error(error)
+ raise RuntimeError(error)
# 找到包名
package = None
for arg in config.args:
if not arg.startswith("-"):
package = arg
break
- logger.error(f"[Installer] MCP包名: {package}")
+ logger.error("[Installer] MCP包名: %s", package)
if not package:
print("[Installer] 未找到包名") # noqa: T201
return None
# 创建文件夹
mcp_path = MCP_PATH / "template" / mcp_id / "project"
- logger.error(f"[Installer] MCP安装路径: {mcp_path}")
+ logger.error("[Installer] MCP安装路径: %s", mcp_path)
await mcp_path.mkdir(parents=True, exist_ok=True)
# 如果有pyproject.toml文件,则使用sync
flag = await (mcp_path / "pyproject.toml").exists()
- logger.error(f"[Installer] MCP安装标志: {flag}")
+ logger.error("[Installer] MCP安装标志: %s", flag)
if await (mcp_path / "pyproject.toml").exists():
shell_command = f"{uv_path} venv; {uv_path} sync --index-url https://pypi.tuna.tsinghua.edu.cn/simple --active --no-install-project --no-cache"
- logger.error(f"[Installer] MCP安装命令: {shell_command}")
+ logger.error("[Installer] MCP安装命令: %s", shell_command)
pipe = await subprocess.create_subprocess_shell(
(
f"{uv_path} venv; "
@@ -73,7 +73,7 @@ async def install_uvx(mcp_id: str, config: "MCPServerStdioConfig") -> "MCPServer
if "run" not in config.args:
config.args = ["run", *config.args]
config.auto_install = False
- logger.error(f"[Installer] MCP安装配置更新成功: {config}")
+ logger.error("[Installer] MCP安装配置更新成功: %s", config)
return config
# 否则,初始化uv项目
@@ -117,11 +117,11 @@ async def install_npx(mcp_id: str, config: "MCPServerStdioConfig") -> "MCPServer
:rtype: MCPServerStdioConfig
:raises ValueError: 未找到MCP Server对应的npm包
"""
- npm_path = shutil.which('npm')
+ npm_path = shutil.which("npm")
if npm_path is None:
error = "[Installer] 未找到npm命令,请先安装Node.js和npm"
- logging.error(error)
- raise Exception(error)
+ logger.error(error)
+ raise RuntimeError(error)
# 查找package name
package = None
for arg in config.args:
diff --git a/apps/scheduler/pool/mcp/pool.py b/apps/scheduler/pool/mcp/pool.py
index bf0320f429a9ef45864ba6548b1bb28e3d874b59..cb76864c68823d3f3a0ce69a36b446c7de3747c0 100644
--- a/apps/scheduler/pool/mcp/pool.py
+++ b/apps/scheduler/pool/mcp/pool.py
@@ -21,7 +21,7 @@ class MCPPool(metaclass=SingletonMeta):
"""初始化MCP池"""
self.pool = {}
- async def _init_mcp(self, mcp_id: str, user_sub: str) -> MCPClient | None:
+ async def init_mcp(self, mcp_id: str, user_sub: str) -> MCPClient | None:
"""初始化MCP池"""
config_path = MCP_USER_PATH / user_sub / mcp_id / "config.json"
flag = (await config_path.exists())
@@ -69,7 +69,7 @@ class MCPPool(metaclass=SingletonMeta):
return None
# 初始化进程
- item = await self._init_mcp(mcp_id, user_sub)
+ item = await self.init_mcp(mcp_id, user_sub)
if item is None:
return None
diff --git a/apps/schemas/message.py b/apps/schemas/message.py
index 5b465ee54321bbd4c649753911025bff41840186..17a569ca2cf710b206ddbc1afb655bf4675af7fd 100644
--- a/apps/schemas/message.py
+++ b/apps/schemas/message.py
@@ -1,16 +1,18 @@
# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved.
"""队列中的消息结构"""
-from typing import Any
from datetime import UTC, datetime
+from typing import Any
+
from pydantic import BaseModel, Field
from apps.schemas.enum_var import EventType, FlowStatus, StepStatus
from apps.schemas.record import RecordMetadata
-class param(BaseModel):
+class FlowParams(BaseModel):
"""流执行过程中的参数补充"""
+
content: dict[str, Any] = Field(default={}, description="流执行过程中的参数补充内容")
description: str = Field(default="", description="流执行过程中的参数补充描述")
diff --git a/apps/schemas/request_data.py b/apps/schemas/request_data.py
index 3fd5a67fa29af6e1685d55285ee1ebd458cabce2..d04f6fd8a31b18e231578056d7de94a81bb11980 100644
--- a/apps/schemas/request_data.py
+++ b/apps/schemas/request_data.py
@@ -10,7 +10,7 @@ from apps.schemas.appcenter import AppData
from apps.schemas.enum_var import CommentType
from apps.schemas.flow_topology import FlowItem
from apps.schemas.mcp import MCPType
-from apps.schemas.message import param
+from apps.schemas.message import FlowParams
class RequestDataApp(BaseModel):
@@ -47,7 +47,7 @@ class RequestData(BaseModel):
app: RequestDataApp | None = Field(default=None, description="应用")
debug: bool = Field(default=False, description="是否调试")
task_id: str | None = Field(default=None, alias="taskId", description="任务ID")
- params: param | bool | None = Field(default=None, description="流执行过程中的参数补充", alias="params")
+ params: FlowParams | bool | None = Field(default=None, description="流执行过程中的参数补充", alias="params")
class QuestionBlacklistRequest(BaseModel):