From 56318ea987d9c8479eec012da97aa57e8a37f296 Mon Sep 17 00:00:00 2001 From: zxstty Date: Sun, 12 Oct 2025 10:52:56 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84mcp=E6=A3=80=E7=B4=A2?= =?UTF-8?q?=E5=92=8Cagent=E4=BD=BF=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/scheduler/executor/agent.py | 49 ++++++++++++++++++++------------ apps/services/mcp_service.py | 2 +- 2 files changed, 32 insertions(+), 19 deletions(-) diff --git a/apps/scheduler/executor/agent.py b/apps/scheduler/executor/agent.py index 4b3038b7..145fc219 100644 --- a/apps/scheduler/executor/agent.py +++ b/apps/scheduler/executor/agent.py @@ -1,6 +1,6 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """MCP Agent执行器""" -from datetime import datetime,UTC +from datetime import datetime, UTC import logging import uuid @@ -57,6 +57,7 @@ class MCPAgentExecutor(BaseExecutor): default=ReasoningLLM(), description="推理大模型", ) + app_owner: str = Field(default="", description="应用所有者") async def update_tokens(self) -> None: """更新令牌数""" @@ -79,11 +80,12 @@ class MCPAgentExecutor(BaseExecutor): mcp_ids = app.mcp_service for mcp_id in mcp_ids: mcp_service = await MCPServiceManager.get_mcp_service(mcp_id) - if self.task.ids.user_sub not in mcp_service.activated: - await MCPServiceManager.active_mcpservice(self.task.ids.user_sub, mcp_id) - + if self.app_owner not in mcp_service.activated: + logger.warning("[MCPAgentExecutor] MCP服务 %s 未被应用所有者 %s 激活,跳过", + mcp_service.name, self.app_owner) + continue self.mcp_list.append(mcp_service) - await self.mcp_pool._init_mcp(mcp_id, self.task.ids.user_sub) + await self.mcp_pool._init_mcp(mcp_id, self.app_owner) for tool in mcp_service.tools: self.tools[tool.id] = tool self.tool_list.extend(mcp_service.tools) @@ -92,7 +94,8 @@ class MCPAgentExecutor(BaseExecutor): id=FINAL_TOOL_ID, name="Final Tool", description="结束流程的工具", mcp_id="", input_schema={}, ) self.tool_list.append( - MCPTool(id=FINAL_TOOL_ID, name="Final Tool", description="结束流程的工具", mcp_id="", input_schema={}), + MCPTool(id=FINAL_TOOL_ID, name="Final Tool", + description="结束流程的工具", mcp_id="", input_schema={}), ) self.tools[SELF_DESC_TOOL_ID] = MCPTool( id=SELF_DESC_TOOL_ID, @@ -174,7 +177,8 @@ class MCPAgentExecutor(BaseExecutor): ) await self.update_tokens() await self.push_message( - EventType.STEP_WAITING_FOR_START, confirm_message.model_dump(exclude_none=True, by_alias=True), + EventType.STEP_WAITING_FOR_START, confirm_message.model_dump( + exclude_none=True, by_alias=True), ) await self.push_message(EventType.FLOW_STOP, {}) self.task.state.flow_status = FlowStatus.WAITING @@ -191,7 +195,8 @@ class MCPAgentExecutor(BaseExecutor): flow_status=self.task.state.flow_status, input_data={}, output_data={}, - ex_data=confirm_message.model_dump(exclude_none=True, by_alias=True), + ex_data=confirm_message.model_dump( + exclude_none=True, by_alias=True), ) ) @@ -212,17 +217,19 @@ class MCPAgentExecutor(BaseExecutor): } result_exchange = False else: - mcp_client = (await self.mcp_pool.get(mcp_tool.mcp_id, self.task.ids.user_sub)) + mcp_client = (await self.mcp_pool.get(mcp_tool.mcp_id, self.app_owner)) output_params = await mcp_client.call_tool(mcp_tool.name, self.task.state.current_input) except anyio.ClosedResourceError: - logger.exception("[MCPAgentExecutor] MCP客户端连接已关闭: %s", mcp_tool.mcp_id) - await self.mcp_pool.stop(mcp_tool.mcp_id, self.task.ids.user_sub) - await self.mcp_pool._init_mcp(mcp_tool.mcp_id, self.task.ids.user_sub) + logger.exception( + "[MCPAgentExecutor] MCP客户端连接已关闭: %s", mcp_tool.mcp_id) + await self.mcp_pool.stop(mcp_tool.mcp_id, self.app_owner) + await self.mcp_pool._init_mcp(mcp_tool.mcp_id, self.app_owner) self.task.state.step_status = StepStatus.ERROR return except Exception as e: import traceback - logger.exception("[MCPAgentExecutor] 执行步骤 %s 时发生错误: %s", mcp_tool.name, traceback.format_exc()) + logger.exception("[MCPAgentExecutor] 执行步骤 %s 时发生错误: %s", + mcp_tool.name, traceback.format_exc()) self.task.state.step_status = StepStatus.ERROR self.task.state.error_message = str(e) return @@ -281,7 +288,8 @@ class MCPAgentExecutor(BaseExecutor): language=self.task.language, ) await self.push_message( - EventType.STEP_WAITING_FOR_PARAM, data={"message": error_message, "params": params_with_null} + EventType.STEP_WAITING_FOR_PARAM, data={ + "message": error_message, "params": params_with_null} ) await self.push_message(EventType.FLOW_STOP, data={}) self.task.state.flow_status = FlowStatus.WAITING @@ -320,7 +328,8 @@ class MCPAgentExecutor(BaseExecutor): if step.tool_id in self.tools.keys(): break except Exception as e: - logger.warning("[MCPAgentExecutor] 获取下一步失败,重试中: %s", str(e)) + logger.warning( + "[MCPAgentExecutor] 获取下一步失败,重试中: %s", str(e)) if step is None or step.tool_id not in self.tools.keys(): step = Step( tool_id=FINAL_TOOL_ID, @@ -514,6 +523,7 @@ class MCPAgentExecutor(BaseExecutor): async def run(self) -> None: """执行MCP Agent的主逻辑""" # 初始化MCP服务 + self.app_owner = (await AppCenterManager.fetch_app_data_by_id(self.agent_id)).author await self.load_state() await self.load_mcp() data = {} @@ -529,7 +539,8 @@ class MCPAgentExecutor(BaseExecutor): ) user_info = await UserManager.get_userinfo_by_user_sub(self.task.ids.user_sub) if user_info.auto_execute: - data = flow_risk.model_dump(exclude_none=True, by_alias=True) + data = flow_risk.model_dump( + exclude_none=True, by_alias=True) await TaskManager.save_task(self.task.id, self.task) await self.get_next_step() except Exception as e: @@ -603,7 +614,9 @@ class MCPAgentExecutor(BaseExecutor): finally: for mcp_service in self.mcp_list: try: - await self.mcp_pool.stop(mcp_service.id, self.task.ids.user_sub) + owner_sub = (await AppCenterManager.fetch_app_data_by_id(self.agent_id)).author + await self.mcp_pool.stop(mcp_service.id, self.app_owner) except Exception as e: import traceback - logger.error("[MCPAgentExecutor] 停止MCP客户端时发生错误: %s", traceback.format_exc()) + logger.error( + "[MCPAgentExecutor] 停止MCP客户端时发生错误: %s", traceback.format_exc()) diff --git a/apps/services/mcp_service.py b/apps/services/mcp_service.py index f6fec0eb..a80c5f7a 100644 --- a/apps/services/mcp_service.py +++ b/apps/services/mcp_service.py @@ -183,7 +183,7 @@ class MCPServiceManager: mcpservice_collection = MongoDB().get_collection("mcp") # 分页查询 skip = (page - 1) * SERVICE_PAGE_SIZE - db_mcpservices = await mcpservice_collection.find(search_conditions).skip(skip).limit( + db_mcpservices = await mcpservice_collection.find(search_conditions).sort("_id", -1).skip(skip).limit( SERVICE_PAGE_SIZE, ).to_list() # 如果未找到,返回空列表 -- Gitee