From 4068ee1c1234262d05244cefbd11912a8cb21ca6 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Mon, 11 Aug 2025 11:51:36 +0800 Subject: [PATCH] =?UTF-8?q?=E5=9B=9E=E5=90=88agent=E5=88=86=E6=94=AF?= =?UTF-8?q?=E6=94=B9=E5=8A=A8=E5=88=B0=20!640=20(1)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/constants.py | 2 +- apps/llm/embedding.py | 26 ++- apps/models/mcp.py | 4 +- apps/models/user.py | 2 +- apps/routers/appcenter.py | 3 +- apps/routers/user.py | 6 +- apps/scheduler/executor/agent.py | 289 ++++++++++++++++++----------- apps/scheduler/mcp_agent/base.py | 63 +++++++ apps/scheduler/mcp_agent/host.py | 50 +---- apps/scheduler/mcp_agent/plan.py | 8 +- apps/scheduler/mcp_agent/prompt.py | 86 +++++---- apps/schemas/appcenter.py | 16 +- apps/schemas/mcp.py | 2 + apps/schemas/message.py | 2 +- apps/schemas/request_data.py | 4 +- apps/schemas/response_data.py | 11 +- 16 files changed, 336 insertions(+), 238 deletions(-) create mode 100644 apps/scheduler/mcp_agent/base.py diff --git a/apps/constants.py b/apps/constants.py index 320888bd..1407f561 100644 --- a/apps/constants.py +++ b/apps/constants.py @@ -11,7 +11,7 @@ from .common.config import config # 新对话默认标题 NEW_CHAT = "新对话" # 滑动窗口限流 默认窗口期 -SLIDE_WINDOW_TIME = 60 +SLIDE_WINDOW_TIME = 600 # OIDC 访问Token 过期时间(分钟) OIDC_ACCESS_TOKEN_EXPIRE_TIME = 30 # OIDC 刷新Token 过期时间(分钟) diff --git a/apps/llm/embedding.py b/apps/llm/embedding.py index 89c3cf48..da1fad1a 100644 --- a/apps/llm/embedding.py +++ b/apps/llm/embedding.py @@ -1,9 +1,13 @@ """Embedding模型""" +import logging + import httpx from apps.common.config import config +logger = logging.getLogger(__name__) + class Embedding: """Embedding模型""" @@ -42,6 +46,7 @@ class Embedding: json = response.json() return [item["embedding"] for item in json["data"]] + @classmethod async def _get_tei_embedding(cls, text: list[str]) -> list[list[float]]: """访问TEI兼容的Embedding API,获得向量化数据""" @@ -67,6 +72,7 @@ class Embedding: return result + @classmethod async def get_embedding(cls, text: list[str]) -> list[list[float]]: """ @@ -75,10 +81,18 @@ class Embedding: :param text: 待向量化文本(多条文本组成List) :return: 文本对应的向量(顺序与text一致,也为List) """ - if config.embedding.type == "openai": - return await cls._get_openai_embedding(text) - if config.embedding.type == "mindie": - return await cls._get_tei_embedding(text) + try: + if Config().get_config().embedding.type == "openai": + return await cls._get_openai_embedding(text) + if Config().get_config().embedding.type == "mindie": + return await cls._get_tei_embedding(text) - err = f"不支持的Embedding API类型: {config.embedding.type}" - raise ValueError(err) + err = f"不支持的Embedding API类型: {Config().get_config().embedding.type}" + raise ValueError(err) + except Exception as e: + err = f"获取Embedding失败: {e}" + logger.error(err) + rt = [] + for i in range(len(text)): + rt.append([0.0]*1024) + return rt diff --git a/apps/models/mcp.py b/apps/models/mcp.py index 984cd049..c573585d 100644 --- a/apps/models/mcp.py +++ b/apps/models/mcp.py @@ -16,7 +16,9 @@ from .base import Base class MCPInstallStatus(str, PyEnum): """MCP 服务状态""" + INIT = "init" INSTALLING = "installing" + CANCELLED = "cancelled" READY = "ready" FAILED = "failed" @@ -52,7 +54,7 @@ class MCPInfo(Base): ) """MCP 更新时间""" status: Mapped[MCPInstallStatus] = mapped_column( - Enum(MCPInstallStatus), default=MCPInstallStatus.INSTALLING, nullable=False, + Enum(MCPInstallStatus), default=MCPInstallStatus.INIT, nullable=False, ) """MCP 状态""" mcpType: Mapped[MCPType] = mapped_column( # noqa: N815 diff --git a/apps/models/user.py b/apps/models/user.py index 56b352ac..22e5fcba 100644 --- a/apps/models/user.py +++ b/apps/models/user.py @@ -38,7 +38,7 @@ class User(Base): """用户选择的知识库的ID""" defaultLLM: Mapped[uuid.UUID | None] = mapped_column(UUID(as_uuid=True), default=None, nullable=True) # noqa: N815 """用户选择的大模型ID""" - autoExecute: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False) # noqa: N815 + autoExecute: Mapped[bool | None] = mapped_column(Boolean, default=False, nullable=True) # noqa: N815 """Agent是否自动执行""" diff --git a/apps/routers/appcenter.py b/apps/routers/appcenter.py index d0893a08..e60981c9 100644 --- a/apps/routers/appcenter.py +++ b/apps/routers/appcenter.py @@ -10,10 +10,11 @@ from fastapi.responses import JSONResponse from apps.dependency.user import verify_personal_token, verify_session from apps.exceptions import InstancePermissionError -from apps.schemas.appcenter import AppFlowInfo, AppMcpServiceInfo, AppPermissionData +from apps.schemas.appcenter import AppFlowInfo, AppPermissionData from apps.schemas.enum_var import AppFilterType, AppType from apps.schemas.request_data import ChangeFavouriteAppRequest, CreateAppRequest from apps.schemas.response_data import ( + AppMcpServiceInfo, BaseAppOperationMsg, BaseAppOperationRsp, ChangeFavouriteAppMsg, diff --git a/apps/routers/user.py b/apps/routers/user.py index 6451c4b6..1ddc3d3f 100644 --- a/apps/routers/user.py +++ b/apps/routers/user.py @@ -18,10 +18,10 @@ router = APIRouter( @router.get("") async def list_user( - request: Request, + request: Request, page_size: int = 10, page_num: int = 1, ) -> JSONResponse: """查询不包含当前用户的所有用户信息,返回给前端,用以进行应用权限设置""" - user_list = await UserManager.list_user() + user_list, total = await UserManager.list_user(page_size, page_num) user_info_list = [] for user in user_list: if user.userSub == request.state.user_sub: @@ -37,7 +37,7 @@ async def list_user( content=UserGetRsp( code=status.HTTP_200_OK, message="用户数据详细信息获取成功", - result=UserGetMsp(userInfoList=user_info_list), + result=UserGetMsp(userInfoList=user_info_list, total=total), ).model_dump(exclude_none=True, by_alias=True), ) diff --git a/apps/scheduler/executor/agent.py b/apps/scheduler/executor/agent.py index 421e2db2..a72e3b72 100644 --- a/apps/scheduler/executor/agent.py +++ b/apps/scheduler/executor/agent.py @@ -3,16 +3,21 @@ import logging import uuid +from typing import Any +import anyio +from mcp.types import TextContent from pydantic import Field +from apps.llm.patterns.rewrite import QuestionRewrite from apps.llm.reasoning import ReasoningLLM from apps.scheduler.executor.base import BaseExecutor from apps.scheduler.mcp_agent.host import MCPHost from apps.scheduler.mcp_agent.plan import MCPPlanner from apps.scheduler.mcp_agent.select import FINAL_TOOL_ID, MCPSelector from apps.scheduler.pool.mcp.client import MCPClient -from apps.schemas.enum_var import EventType, FlowStatus, StepStatus +from apps.scheduler.pool.mcp.pool import MCPPool +from apps.schemas.enum_var import EventType, FlowStatus, SpecialCallType, StepStatus from apps.schemas.mcp import ( ErrorType, GoalEvaluationResult, @@ -24,7 +29,7 @@ from apps.schemas.mcp import ( ToolRisk, ) from apps.schemas.message import param -from apps.schemas.task import FlowStepHistory +from apps.schemas.task import ExecutorState, FlowStepHistory, StepQueueItem from apps.services.appcenter import AppCenterManager from apps.services.mcp_service import MCPServiceManager from apps.services.task import TaskManager @@ -41,13 +46,11 @@ class MCPAgentExecutor(BaseExecutor): agent_id: str = Field(default="", description="Agent ID") agent_description: str = Field(default="", description="Agent描述") mcp_list: list[MCPCollection] = Field(description="MCP服务器列表", default=[]) - mcp_client: dict[str, MCPClient] = Field( - description="MCP客户端列表,key为mcp_id", default={}, - ) + mcp_pool: MCPPool = Field(description="MCP池", default=MCPPool()) tools: dict[str, MCPTool] = Field( description="MCP工具列表,key为tool_id", default={}, ) - params: param | None = Field( + params: param | bool | None = Field( default=None, description="流执行过程中的参数补充", alias="params", ) resoning_llm: ReasoningLLM = Field( @@ -85,7 +88,7 @@ class MCPAgentExecutor(BaseExecutor): continue self.mcp_list.append(mcp_service) - self.mcp_client[mcp_id] = await MCPHost.get_client(self.task.ids.user_sub, mcp_id) + await self.mcp_pool._init_mcp(mcp_id, self.task.ids.user_sub) for tool in mcp_service.tools: self.tools[tool.id] = tool @@ -94,9 +97,9 @@ class MCPAgentExecutor(BaseExecutor): error_message = "之前的计划遇到以下报错\n\n"+self.task.state.error_message else: error_message = "初始化计划" - tools = MCPSelector.select_top_tool( + tools = await MCPSelector.select_top_tool( self.task.runtime.question, list(self.tools.values()), - additional_info=error_message, top_n=40) + additional_info=error_message, top_n=40, reasoning_llm=self.resoning_llm) if is_replan: logger.info("[MCPAgentExecutor] 重新规划流程") if not start_index: @@ -104,7 +107,8 @@ class MCPAgentExecutor(BaseExecutor): self.task.state.error_message, self.task.runtime.temporary_plans, self.resoning_llm) - current_plan = self.task.runtime.temporary_plans.plans[start_index:] + start_index = start_index.start_index + current_plan = MCPPlan(plans=self.task.runtime.temporary_plans.plans[start_index:]) error_message = self.task.state.error_message temporary_plans = await MCPPlanner.create_plan(self.task.runtime.question, is_replan=is_replan, @@ -114,17 +118,20 @@ class MCPAgentExecutor(BaseExecutor): max_steps=self.max_steps-start_index-1, reasoning_llm=self.resoning_llm ) - self.update_tokens() - self.push_message( + await self.update_tokens() + await self.push_message( EventType.STEP_CANCEL, - data={} + data={}, ) if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id: self.task.context[-1].step_status = StepStatus.CANCELLED - self.task.runtime.temporary_plans = self.task.runtime.temporary_plans.plans[:start_index] + temporary_plans.plans + self.task.runtime.temporary_plans.plans = self.task.runtime.temporary_plans.plans[ + : start_index] + temporary_plans.plans self.task.state.step_index = start_index else: start_index = 0 + logger.error( + f"各个字段的类型: {type(self.task.runtime.question)}, {type(tools)}, {type(self.max_steps)}, {type(self.resoning_llm)}") self.task.runtime.temporary_plans = await MCPPlanner.create_plan(self.task.runtime.question, tool_list=tools, max_steps=self.max_steps, reasoning_llm=self.resoning_llm) for i in range(start_index, len(self.task.runtime.temporary_plans.plans)): self.task.runtime.temporary_plans.plans[i].step_id = str(uuid.uuid4()) @@ -132,7 +139,9 @@ class MCPAgentExecutor(BaseExecutor): async def get_tool_input_param(self, is_first: bool) -> None: if is_first: # 获取第一个输入参数 - self.task.state.current_input = await MCPHost._get_first_input_params(self.tools[self.task.state.step_id], self.task.runtime.question, self.task) + tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool + mcp_tool = self.tools[tool_id] + self.task.state.current_input = await MCPHost._get_first_input_params(mcp_tool, self.task.runtime.question, self.task) else: # 获取后续输入参数 if isinstance(self.params, param): @@ -141,35 +150,40 @@ class MCPAgentExecutor(BaseExecutor): else: params = {} params_description = "" - self.task.state.current_input = await MCPHost._fill_params(self.tools[self.task.state.step_id], self.task.state.current_input, self.task.state.error_message, params, params_description) + tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool + mcp_tool = self.tools[tool_id] + self.task.state.current_input = await MCPHost._fill_params(mcp_tool, self.task.state.current_input, self.task.state.error_message, params, params_description) async def reset_step_to_index(self, start_index: int) -> None: """重置步骤到开始""" logger.info("[MCPAgentExecutor] 重置步骤到索引 %d", start_index) - if self.task.runtime.temporary_plans: + + if start_index < len(self.task.runtime.temporary_plans.plans): self.task.state.flow_status = FlowStatus.RUNNING self.task.state.step_id = self.task.runtime.temporary_plans.plans[start_index].step_id self.task.state.step_index = 0 - self.task.state.step_name = self.task.runtime.temporary_plans.plans[start_index].tool + self.task.state.step_name = self.tools[self.task.runtime.temporary_plans.plans[start_index].tool].name self.task.state.step_description = self.task.runtime.temporary_plans.plans[start_index].content - self.task.state.step_status = StepStatus.RUNNING + self.task.state.step_status = StepStatus.INIT self.task.state.retry_times = 0 else: - self.task.state.flow_status = FlowStatus.SUCCESS self.task.state.step_id = FINAL_TOOL_ID async def confirm_before_step(self) -> None: logger.info("[MCPAgentExecutor] 等待用户确认步骤 %d", self.task.state.step_index) # 发送确认消息 - confirm_message = await MCPPlanner.get_tool_risk(self.tools[self.task.state.step_id], self.task.state.current_input, "", self.resoning_llm) - self.update_tokens() - self.push_message(EventType.STEP_WAITING_FOR_START, confirm_message.model_dump( + tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool + mcp_tool = self.tools[tool_id] + confirm_message = await MCPPlanner.get_tool_risk(mcp_tool, self.task.state.current_input, "", self.resoning_llm) + await self.update_tokens() + await self.push_message(EventType.STEP_WAITING_FOR_START, confirm_message.model_dump( exclude_none=True, by_alias=True)) - self.push_message(EventType.FLOW_STOP, {}) + await self.push_message(EventType.FLOW_STOP, {}) self.task.state.flow_status = FlowStatus.WAITING self.task.state.step_status = StepStatus.WAITING self.task.context.append( FlowStepHistory( + task_id=self.task.id, step_id=self.task.state.step_id, step_name=self.task.state.step_name, step_description=self.task.state.step_description, @@ -180,7 +194,7 @@ class MCPAgentExecutor(BaseExecutor): input_data={}, output_data={}, ex_data=confirm_message.model_dump(exclude_none=True, by_alias=True), - ) + ), ) async def run_step(self): @@ -189,69 +203,93 @@ class MCPAgentExecutor(BaseExecutor): self.task.state.step_status = StepStatus.RUNNING logger.info("[MCPAgentExecutor] 执行步骤 %d", self.task.state.step_index) # 获取MCP客户端 - mcp_tool = self.tools[self.task.state.step_id] - mcp_client = self.mcp_client[mcp_tool.mcp_id] - if not mcp_client: - logger.error("[MCPAgentExecutor] MCP客户端未找到: %s", mcp_tool.mcp_id) - self.task.state.flow_status = FlowStatus.ERROR - error = "[MCPAgentExecutor] MCP客户端未找到: {}".format(mcp_tool.mcp_id) - self.task.state.error_message = error + tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool + mcp_tool = self.tools[tool_id] + mcp_client = (await self.mcp_pool.get(mcp_tool.mcp_id, self.task.ids.user_sub)) try: output_params = await mcp_client.call_tool(mcp_tool.name, self.task.state.current_input) - self.update_tokens() - self.push_message( - EventType.STEP_INPUT, - self.task.state.current_input - ) - self.push_message( - EventType.STEP_OUTPUT, - output_params - ) - self.task.context.append( - FlowStepHistory( - step_id=self.task.state.step_id, - step_name=self.task.state.step_name, - step_description=self.task.state.step_description, - step_status=StepStatus.SUCCESS, - flow_id=self.task.state.flow_id, - flow_name=self.task.state.flow_name, - flow_status=self.task.state.flow_status, - input_data=self.task.state.current_input, - output_data=output_params, - ) - ) - self.task.state.step_status = StepStatus.SUCCESS + except anyio.ClosedResourceError as e: + import traceback + logger.error("[MCPAgentExecutor] MCP客户端连接已关闭: %s, 错误: %s", mcp_tool.mcp_id, traceback.format_exc()) + await self.mcp_pool.stop(mcp_tool.mcp_id, self.task.ids.user_sub) + await self.mcp_pool._init_mcp(mcp_tool.mcp_id, self.task.ids.user_sub) + logger.error("[MCPAgentExecutor] MCP客户端连接已关闭: %s, 错误: %s", mcp_tool.mcp_id, str(e)) + self.task.state.step_status = StepStatus.ERROR + return except Exception as e: - logger.warning("[MCPAgentExecutor] 执行步骤 %s 失败: %s", mcp_tool.name, str(e)) import traceback - self.task.state.error_message = traceback.format_exc() + logger.exception("[MCPAgentExecutor] 执行步骤 %s 时发生错误: %s", mcp_tool.name, traceback.format_exc()) + self.task.state.step_status = StepStatus.ERROR + self.task.state.error_message = str(e) + return + if output_params.isError: + err = "" + for output in output_params.content: + if isinstance(output, TextContent): + err += output.text self.task.state.step_status = StepStatus.ERROR + self.task.state.error_message = err + return + message = "" + for output in output_params.content: + if isinstance(output, TextContent): + message += output.text + output_params = { + "message": message, + } + + await self.update_tokens() + await self.push_message( + EventType.STEP_INPUT, + self.task.state.current_input, + ) + await self.push_message( + EventType.STEP_OUTPUT, + output_params, + ) + self.task.context.append( + FlowStepHistory( + task_id=self.task.id, + step_id=self.task.state.step_id, + step_name=self.task.state.step_name, + step_description=self.task.state.step_description, + step_status=StepStatus.SUCCESS, + flow_id=self.task.state.flow_id, + flow_name=self.task.state.flow_name, + flow_status=self.task.state.flow_status, + input_data=self.task.state.current_input, + output_data=output_params, + ), + ) + self.task.state.step_status = StepStatus.SUCCESS async def generate_params_with_null(self) -> None: """生成参数补充""" - mcp_tool = self.tools[self.task.state.step_id] + tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool + mcp_tool = self.tools[tool_id] params_with_null = await MCPPlanner.get_missing_param( mcp_tool, self.task.state.current_input, self.task.state.error_message, - self.resoning_llm + self.resoning_llm, ) - self.update_tokens() - self.push_message( + await self.update_tokens() + await self.push_message( EventType.STEP_WAITING_FOR_PARAM, data={ "message": "当运行产生如下报错:\n" + self.task.state.error_message, - "params": params_with_null - } + "params": params_with_null, + }, ) - self.push_message( + await self.push_message( EventType.FLOW_STOP, - data={} + data={}, ) self.task.state.flow_status = FlowStatus.WAITING self.task.state.step_status = StepStatus.PARAM self.task.context.append( FlowStepHistory( + task_id=self.task.id, step_id=self.task.state.step_id, step_name=self.task.state.step_name, step_description=self.task.state.step_description, @@ -263,47 +301,33 @@ class MCPAgentExecutor(BaseExecutor): output_data={}, ex_data={ "message": "当运行产生如下报错:\n" + self.task.state.error_message, - "params": params_with_null - } - ) + "params": params_with_null, + }, + ), ) async def get_next_step(self) -> None: self.task.state.step_index += 1 - if self.task.state.step_index < len(self.task.runtime.temporary_plans): - if self.task.runtime.temporary_plans.plans[self.task.state.step_index].step_id == FINAL_TOOL_ID: - # 最后一步 - self.task.state.flow_status = FlowStatus.SUCCESS - self.task.state.step_status = StepStatus.SUCCESS - self.push_message( - EventType.FLOW_SUCCESS, - data={} - ) + if self.task.state.step_index < len(self.task.runtime.temporary_plans.plans): + tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool + if tool_id == FINAL_TOOL_ID: return self.task.state.step_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].step_id - self.task.state.step_name = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool + self.task.state.step_name = self.tools[self.task.runtime.temporary_plans.plans + [self.task.state.step_index].tool].name self.task.state.step_description = self.task.runtime.temporary_plans.plans[self.task.state.step_index].content self.task.state.step_status = StepStatus.INIT self.task.state.current_input = {} - self.push_message( - EventType.STEP_INIT, - data={} - ) else: # 没有下一步了,结束流程 - self.task.state.flow_status = FlowStatus.SUCCESS - self.task.state.step_status = StepStatus.SUCCESS - self.push_message( - EventType.FLOW_SUCCESS, - data={} - ) + self.task.state.step_id = FINAL_TOOL_ID return async def error_handle_after_step(self) -> None: """步骤执行失败后的错误处理""" self.task.state.step_status = StepStatus.ERROR self.task.state.flow_status = FlowStatus.ERROR - self.push_message( + await self.push_message( EventType.FLOW_FAILED, data={} ) @@ -311,6 +335,7 @@ class MCPAgentExecutor(BaseExecutor): del self.task.context[-1] self.task.context.append( FlowStepHistory( + task_id=self.task.id, step_id=self.task.state.step_id, step_name=self.task.state.step_name, step_description=self.task.state.step_description, @@ -326,6 +351,10 @@ class MCPAgentExecutor(BaseExecutor): async def work(self) -> None: """执行当前步骤""" if self.task.state.step_status == StepStatus.INIT: + await self.push_message( + EventType.STEP_INIT, + data={} + ) await self.get_tool_input_param(is_first=True) user_info = await UserManager.get_userinfo_by_user_sub(self.task.ids.user_sub) if not user_info.auto_execute: @@ -334,21 +363,21 @@ class MCPAgentExecutor(BaseExecutor): return self.task.state.step_status = StepStatus.RUNNING elif self.task.state.step_status in [StepStatus.PARAM, StepStatus.WAITING, StepStatus.RUNNING]: - if self.task.context[-1].step_status == StepStatus.PARAM: + if self.task.state.step_status == StepStatus.PARAM: if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id: del self.task.context[-1] elif self.task.state.step_status == StepStatus.WAITING: - if self.params.content: + if self.params: if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id: del self.task.context[-1] else: self.task.state.flow_status = FlowStatus.CANCELLED self.task.state.step_status = StepStatus.CANCELLED - self.push_message( + await self.push_message( EventType.STEP_CANCEL, data={} ) - self.push_message( + await self.push_message( EventType.FLOW_CANCEL, data={} ) @@ -369,7 +398,8 @@ class MCPAgentExecutor(BaseExecutor): await self.error_handle_after_step() else: user_info = await UserManager.get_userinfo_by_user_sub(self.task.ids.user_sub) - mcp_tool = self.tools[self.task.state.step_id] + tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool + mcp_tool = self.tools[tool_id] error_type = await MCPPlanner.get_tool_execute_error_type( self.task.runtime.question, self.task.runtime.temporary_plans, @@ -393,7 +423,7 @@ class MCPAgentExecutor(BaseExecutor): (await MCPHost.assemble_memory(self.task)), self.resoning_llm ): - self.push_message( + await self.push_message( EventType.TEXT_ADD, data=chunk ) @@ -402,35 +432,72 @@ class MCPAgentExecutor(BaseExecutor): async def run(self) -> None: """执行MCP Agent的主逻辑""" # 初始化MCP服务 - self.load_state() - self.load_mcp() + await self.load_state() + await self.load_mcp() if self.task.state.flow_status == FlowStatus.INIT: # 初始化状态 - self.task.state.flow_id = str(uuid.uuid4()) - self.task.state.flow_name = await MCPPlanner.get_flow_name(self.task.runtime.question, self.resoning_llm) - await self.plan(is_replan=False) - self.reset_step_to_index(0) - TaskManager.save_task(self.task.id, self.task) + try: + self.task.state.flow_id = str(uuid.uuid4()) + self.task.state.flow_name = await MCPPlanner.get_flow_name(self.task.runtime.question, self.resoning_llm) + await self.plan(is_replan=False) + await self.reset_step_to_index(0) + await TaskManager.save_task(self.task.id, self.task) + except Exception as e: + import traceback + logger.error("[MCPAgentExecutor] 初始化失败: %s", traceback.format_exc()) + logger.error("[MCPAgentExecutor] 初始化失败: %s", str(e)) + self.task.state.flow_status = FlowStatus.ERROR + self.task.state.error_message = str(e) + await self.push_message( + EventType.FLOW_FAILED, + data={} + ) + return self.task.state.flow_status = FlowStatus.RUNNING - self.push_message( + await self.push_message( EventType.FLOW_START, data={} ) + if self.task.state.step_id == FINAL_TOOL_ID: + # 如果已经是最后一步,直接结束 + self.task.state.flow_status = FlowStatus.SUCCESS + await self.push_message( + EventType.FLOW_SUCCESS, + data={} + ) + await self.summarize() + return try: - while self.task.state.step_index < len(self.task.runtime.temporary_plans) and \ + while len(self.task.runtime.temporary_plans.plans) and \ + self.task.state.step_index < len(self.task.runtime.temporary_plans.plans) and \ self.task.state.flow_status == FlowStatus.RUNNING: + tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool + if tool_id == FINAL_TOOL_ID: + break await self.work() - TaskManager.save_task(self.task.id, self.task) + await TaskManager.save_task(self.task.id, self.task) + tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool + if tool_id == FINAL_TOOL_ID: + # 如果已经是最后一步,直接结束 + self.task.state.flow_status = FlowStatus.SUCCESS + self.task.state.step_status = StepStatus.SUCCESS + await self.push_message( + EventType.FLOW_SUCCESS, + data={} + ) + await self.summarize() except Exception as e: + import traceback + logger.error("[MCPAgentExecutor] 执行过程中发生错误: %s", traceback.format_exc()) logger.error("[MCPAgentExecutor] 执行过程中发生错误: %s", str(e)) self.task.state.flow_status = FlowStatus.ERROR self.task.state.error_message = str(e) self.task.state.step_status = StepStatus.ERROR - self.push_message( + await self.push_message( EventType.STEP_ERROR, data={} ) - self.push_message( + await self.push_message( EventType.FLOW_FAILED, data={} ) @@ -438,6 +505,7 @@ class MCPAgentExecutor(BaseExecutor): del self.task.context[-1] self.task.context.append( FlowStepHistory( + task_id=self.task.id, step_id=self.task.state.step_id, step_name=self.task.state.step_name, step_description=self.task.state.step_description, @@ -449,3 +517,10 @@ class MCPAgentExecutor(BaseExecutor): output_data={}, ) ) + finally: + for mcp_service in self.mcp_list: + try: + await self.mcp_pool.stop(mcp_service.id, self.task.ids.user_sub) + except Exception as e: + import traceback + logger.error("[MCPAgentExecutor] 停止MCP客户端时发生错误: %s", traceback.format_exc()) diff --git a/apps/scheduler/mcp_agent/base.py b/apps/scheduler/mcp_agent/base.py new file mode 100644 index 00000000..f062a551 --- /dev/null +++ b/apps/scheduler/mcp_agent/base.py @@ -0,0 +1,63 @@ +import json +import logging +from typing import Any + +from jsonschema import validate + +from apps.llm.function import JsonGenerator +from apps.llm.reasoning import ReasoningLLM + +logger = logging.getLogger(__name__) + + +class McpBase: + @staticmethod + async def get_resoning_result(prompt: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> str: + """获取推理结果""" + # 调用推理大模型 + message = [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": prompt}, + ] + result = "" + async for chunk in resoning_llm.call( + message, + streaming=False, + temperature=0.07, + result_only=True, + ): + result += chunk + + return result + + @staticmethod + async def _parse_result(result: str, schema: dict[str, Any], left_str: str = '{', right_str: str = '}') -> str: + """解析推理结果""" + left_index = result.find(left_str) + right_index = result.rfind(right_str) + flag = True + if left_str == -1 or right_str == -1: + flag = False + + if left_index > right_index: + flag = False + if flag: + try: + tmp_js = json.loads(result[left_index:right_index + 1]) + validate(instance=tmp_js, schema=schema) + except Exception as e: + logger.error("[McpBase] 解析结果失败: %s", e) + flag = False + if not flag: + json_generator = JsonGenerator( + "请提取下面内容中的json\n\n", + [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "请提取下面内容中的json\n\n"+result}, + ], + schema, + ) + json_result = await json_generator.generate() + else: + json_result = json.loads(result[left_index:right_index + 1]) + return json_result diff --git a/apps/scheduler/mcp_agent/host.py b/apps/scheduler/mcp_agent/host.py index a56ce03b..83740699 100644 --- a/apps/scheduler/mcp_agent/host.py +++ b/apps/scheduler/mcp_agent/host.py @@ -33,25 +33,6 @@ _env = SandboxedEnvironment( class MCPHost: """MCP宿主服务""" - @staticmethod - async def get_client(user_sub, mcp_id: str) -> MCPClient | None: - """获取MCP客户端""" - mongo = MongoDB() - mcp_collection = mongo.get_collection("mcp") - - # 检查用户是否启用了这个mcp - mcp_db_result = await mcp_collection.find_one({"_id": mcp_id, "activated": user_sub}) - if not mcp_db_result: - logger.warning("用户 %s 未启用MCP %s", user_sub, mcp_id) - return None - - # 获取MCP配置 - try: - return await MCPPool().get(mcp_id, user_sub) - except KeyError: - logger.warning("用户 %s 的MCP %s 没有运行中的实例,请检查环境", user_sub, mcp_id) - return None - @staticmethod async def assemble_memory(task: Task) -> str: """组装记忆""" @@ -103,33 +84,4 @@ class MCPHost: ], mcp_tool.input_schema, ) - return await json_generator.generate() - - async def call_tool(user_sub: str, tool: MCPTool, plan_item: MCPPlanItem) -> list[dict[str, Any]]: - """调用工具""" - # 拿到Client - client = await MCPPool().get(tool.mcp_id, user_sub) - if client is None: - err = f"[MCPHost] MCP Server不合法: {tool.mcp_id}" - logger.error(err) - raise ValueError(err) - - # 填充参数 - params = await MCPHost._fill_params(tool, plan_item.instruction) - # 调用工具 - result = await client.call_tool(tool.name, params) - # 保存记忆 - processed_result = [] - for item in result.content: - if not isinstance(item, TextContent): - logger.error("MCP结果类型不支持: %s", item) - continue - result = item.text - try: - json_result = json.loads(result) - except Exception as e: - logger.error("MCP结果解析失败: %s, 错误: %s", result, e) - continue - processed_result.append(json_result) - - return processed_result \ No newline at end of file + return await json_generator.generate() \ No newline at end of file diff --git a/apps/scheduler/mcp_agent/plan.py b/apps/scheduler/mcp_agent/plan.py index f2d5665a..24eb737d 100644 --- a/apps/scheduler/mcp_agent/plan.py +++ b/apps/scheduler/mcp_agent/plan.py @@ -58,7 +58,7 @@ class MCPPlanner: result, [ {"role": "system", "content": "You are a helpful assistant."}, - {"role": "user", "content": result}, + {"role": "user", "content": "请提取下面内容中的json\n\n" + result}, ], schema, ) @@ -118,7 +118,7 @@ class MCPPlanner: async def get_replan_start_step_index( user_goal: str, error_message: str, current_plan: MCPPlan | None = None, history: str = "", - reasoning_llm: ReasoningLLM = ReasoningLLM()) -> MCPPlan: + reasoning_llm: ReasoningLLM = ReasoningLLM()) -> RestartStepIndex: """获取重新规划的步骤索引""" # 获取推理结果 template = _env.from_string(GET_REPLAN_START_STEP_INDEX) @@ -296,7 +296,7 @@ class MCPPlanner: async for chunk in resoning_llm.call( [{"role": "user", "content": prompt}], - streaming=False, + streaming=True, temperature=0.07, ): - yield chunk \ No newline at end of file + yield chunk diff --git a/apps/scheduler/mcp_agent/prompt.py b/apps/scheduler/mcp_agent/prompt.py index b5bc085c..75ccd57f 100644 --- a/apps/scheduler/mcp_agent/prompt.py +++ b/apps/scheduler/mcp_agent/prompt.py @@ -62,6 +62,7 @@ MCP_SELECT = dedent(r""" ### 请一步一步思考: """) + TOOL_SELECT = dedent(r""" 你是一个乐于助人的智能助手。 你的任务是:根据当前目标,附加信息,选择最合适的MCP工具。 @@ -115,8 +116,7 @@ TOOL_SELECT = dedent(r""" ## 附加信息 {{additional_info}} # 输出 - """ - ) + """) EVALUATE_GOAL = dedent(r""" 你是一个计划评估器。 @@ -138,9 +138,9 @@ EVALUATE_GOAL = dedent(r""" # 工具集合 你可以访问并使用一些工具,这些工具将在 XML标签中给出。 - - mysql_analyzer 分析MySQL数据库性能 - - performance_tuner 调优数据库性能 - - Final 结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。 + - mysql_analyzer 分析MySQL数据库性能 + - performance_tuner 调优数据库性能 + - Final 结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。 # 附加信息 @@ -160,15 +160,16 @@ EVALUATE_GOAL = dedent(r""" # 工具集合 - { % for tool in tools % } - - {{tool.id}} {{tool.name}};{{tool.description}} - { % endfor % } + {% for tool in tools %} + - {{tool.id}} {{tool.name}};{{tool.description}} + {% endfor %} # 附加信息 {{additional_info}} """) + GENERATE_FLOW_NAME = dedent(r""" 你是一个智能助手,你的任务是根据用户的目标,生成一个合适的流程名称。 @@ -188,6 +189,7 @@ GENERATE_FLOW_NAME = dedent(r""" {{goal}} # 输出 """) + GET_REPLAN_START_STEP_INDEX = dedent(r""" 你是一个智能助手,你的任务是根据用户的目标、报错信息和当前计划和历史,获取重新规划的步骤起始索引。 @@ -293,9 +295,9 @@ CREATE_PLAN = dedent(r""" 你可以访问并使用一些工具,这些工具将在 XML标签中给出。 - { % for tool in tools % } - - {{tool.id}} {{tool.name}};{{tool.description}} - { % endfor % } + {% for tool in tools %} + - {{tool.id}} {{tool.name}};{{tool.description}} + {% endfor %} # 样例 @@ -349,6 +351,7 @@ CREATE_PLAN = dedent(r""" # 计划 """) + RECREATE_PLAN = dedent(r""" 你是一个计划重建器。 请根据用户的目标、当前计划和运行报错,重新生成一个计划。 @@ -392,12 +395,12 @@ RECREATE_PLAN = dedent(r""" 请帮我扫描一下192.168.1.1的这台机器的端口,看看有哪些端口开放。 # 工具 - 你可以访问并使用一些工具,这些工具将在 XML标签中给出。 + 你可以访问并使用一些工具,这些工具将在 XML标签中给出。 - - command_generator 生成命令行指令 - - tool_selector 选择合适的工具 - - command_executor 执行命令行指令 - - Final 结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。 + - command_generator 生成命令行指令 + - tool_selector 选择合适的工具 + - command_executor 执行命令行指令 + - Final 结束步骤,当执行到这一步时,表示计划执行结束,所得到的结果将作为最终结果。 # 当前计划 ```json { @@ -481,12 +484,12 @@ RECREATE_PLAN = dedent(r""" # 工具 - 你可以访问并使用一些工具,这些工具将在 XML标签中给出。 + 你可以访问并使用一些工具,这些工具将在 XML标签中给出。 - { % for tool in tools % } - - {{tool.id}} {{tool.name}};{{tool.description}} - { % endfor % } + {% for tool in tools %} + - {{tool.id}} {{tool.name}};{{tool.description}} + {% endfor %} # 当前计划 @@ -497,6 +500,7 @@ RECREATE_PLAN = dedent(r""" # 重新生成的计划 """) + RISK_EVALUATE = dedent(r""" 你是一个工具执行计划评估器。 你的任务是根据当前工具的名称、描述和入参以及附加信息,判断当前工具执行的风险并输出提示。 @@ -535,16 +539,16 @@ RISK_EVALUATE = dedent(r""" ``` # 工具 - {{tool_name}} - {{tool_description}} + {{tool_name}} + {{tool_description}} # 工具入参 {{input_param}} # 附加信息 {{additional_info}} # 输出 - """ - ) + """) + # 根据当前计划和报错信息决定下一步执行,具体计划有需要用户补充工具入参、重计划当前步骤、重计划接下来的所有计划 TOOL_EXECUTE_ERROR_TYPE_ANALYSIS = dedent(r""" 你是一个计划决策器。 @@ -583,8 +587,8 @@ TOOL_EXECUTE_ERROR_TYPE_ANALYSIS = dedent(r""" ]} # 当前使用的工具 - command_executor - 执行命令行指令 + command_executor + 执行命令行指令 # 工具入参 { @@ -605,15 +609,15 @@ TOOL_EXECUTE_ERROR_TYPE_ANALYSIS = dedent(r""" {{current_plan}} # 当前使用的工具 - {{tool_name}} - {{tool_description}} + {{tool_name}} + {{tool_description}} # 工具入参 {{input_param}} # 工具运行报错 {{error_message}} # 输出 - """ + """, ) # 获取缺失的参数的json结构体 GET_MISSING_PARAMS = dedent(r""" @@ -687,8 +691,8 @@ GET_MISSING_PARAMS = dedent(r""" ``` # 工具 - {{tool_name}} - {{tool_description}} + {{tool_name}} + {{tool_description}} # 工具入参 {{input_param}} @@ -697,8 +701,8 @@ GET_MISSING_PARAMS = dedent(r""" # 运行报错 {{error_message}} # 输出 - """ - ) + """) + REPAIR_PARAMS = dedent(r""" 你是一个工具参数修复器。 你的任务是根据当前的工具信息、工具入参的schema、工具当前的入参、工具的报错、补充的参数和补充的参数描述,修复当前工具的入参。 @@ -706,8 +710,8 @@ REPAIR_PARAMS = dedent(r""" # 样例 # 工具信息 - mysql_analyzer - 分析MySQL数据库性能 + mysql_analyzer + 分析MySQL数据库性能 # 工具入参的schema { @@ -759,8 +763,8 @@ REPAIR_PARAMS = dedent(r""" ``` # 工具 - {{tool_name}} - {{tool_description}} + {{tool_name}} + {{tool_description}} # 工具入参scheme {{input_schema}} @@ -773,8 +777,8 @@ REPAIR_PARAMS = dedent(r""" # 补充的参数描述 {{params_description}} # 输出 - """ - ) + """) + FINAL_ANSWER = dedent(r""" 综合理解计划执行结果和背景信息,向用户报告目标的完成情况。 @@ -796,10 +800,10 @@ FINAL_ANSWER = dedent(r""" """) MEMORY_TEMPLATE = dedent(r""" - {% for ctx in context_list % } + {% for ctx in context_list %} - 第{{loop.index}}步:{{ctx.step_description}} - 调用工具 `{{ctx.step_id}}`,并提供参数 `{{ctx.input_data}}` - 执行状态:{{ctx.status}} - 得到数据:`{{ctx.output_data}}` - {% endfor % } + {% endfor %} """) diff --git a/apps/schemas/appcenter.py b/apps/schemas/appcenter.py index f6ccd609..bdda4481 100644 --- a/apps/schemas/appcenter.py +++ b/apps/schemas/appcenter.py @@ -47,17 +47,9 @@ class AppFlowInfo(BaseModel): """应用工作流数据结构""" id: str = Field(..., description="工作流ID") - name: str = Field(..., description="工作流名称") - description: str = Field(..., description="工作流简介") - debug: bool = Field(..., description="是否经过调试") - - -class AppMcpServiceInfo(BaseModel): - """应用关联的MCP服务信息""" - - id: str = Field(..., description="MCP服务ID") - name: str = Field(..., description="MCP服务名称") - description: str = Field(..., description="MCP服务简介") + name: str = Field(default="", description="工作流名称") + description: str = Field(default="", description="工作流简介") + debug: bool = Field(default=False, description="是否经过调试") class AppData(BaseModel): @@ -74,4 +66,4 @@ class AppData(BaseModel): permission: AppPermissionData = Field( default_factory=lambda: AppPermissionData(authorizedUsers=None), description="权限配置") workflows: list[AppFlowInfo] = Field(default=[], description="工作流信息列表") - mcp_service: list[AppMcpServiceInfo] = Field(default=[], alias="mcpService", description="MCP服务id列表") + mcp_service: list[str] = Field(default=[], alias="mcpService", description="MCP服务id列表") diff --git a/apps/schemas/mcp.py b/apps/schemas/mcp.py index 226d6849..afb8d292 100644 --- a/apps/schemas/mcp.py +++ b/apps/schemas/mcp.py @@ -14,6 +14,7 @@ class MCPStatus(str, Enum): UNINITIALIZED = "uninitialized" RUNNING = "running" STOPPED = "stopped" + ERROR = "error" class MCPBasicConfig(BaseModel): @@ -22,6 +23,7 @@ class MCPBasicConfig(BaseModel): env: dict[str, str] = Field(description="MCP 服务器环境变量", default={}) autoApprove: list[str] = Field(description="自动批准的MCP权限列表", default=[]) # noqa: N815 autoInstall: bool = Field(description="是否自动安装MCP服务器", default=True) # noqa: N815 + timeout: int = Field(description="MCP 服务器超时时间(秒)", default=60, alias="timeout") class MCPServerStdioConfig(MCPBasicConfig): diff --git a/apps/schemas/message.py b/apps/schemas/message.py index 0b9ef518..8a9b35eb 100644 --- a/apps/schemas/message.py +++ b/apps/schemas/message.py @@ -15,7 +15,7 @@ from .record import RecordMetadata class param(BaseModel): """流执行过程中的参数补充""" - content: dict[str, Any] | bool = Field(default={}, description="流执行过程中的参数补充内容") + content: dict[str, Any] = Field(default={}, description="流执行过程中的参数补充内容") description: str = Field(default="", description="流执行过程中的参数补充描述") diff --git a/apps/schemas/request_data.py b/apps/schemas/request_data.py index f271f6ee..6e79def6 100644 --- a/apps/schemas/request_data.py +++ b/apps/schemas/request_data.py @@ -85,7 +85,7 @@ class UpdateMCPServiceRequest(BaseModel): name: str = Field(..., description="MCP服务名称") description: str = Field(..., description="MCP服务描述") overview: str = Field(..., description="MCP服务概述") - config: str = Field(..., description="MCP服务配置") + config: dict[str, Any] = Field(..., description="MCP服务配置") mcp_type: MCPType = Field(description="MCP传输协议(Stdio/SSE/Streamable)", default=MCPType.STDIO, alias="mcpType") @@ -93,7 +93,7 @@ class ActiveMCPServiceRequest(BaseModel): """POST /api/mcp/{serviceId} 请求数据结构""" active: bool = Field(description="是否激活mcp服务") - mcp_env: dict[str, Any] = Field(default={}, description="MCP服务环境变量", alias="mcpEnv") + mcp_env: dict[str, Any] | None = Field(default=None, description="MCP服务环境变量", alias="mcpEnv") class UpdateServiceRequest(BaseModel): diff --git a/apps/schemas/response_data.py b/apps/schemas/response_data.py index 4d74a3d5..b37c28a2 100644 --- a/apps/schemas/response_data.py +++ b/apps/schemas/response_data.py @@ -82,14 +82,6 @@ class GetBlacklistQuestionRsp(ResponseData): result: GetBlacklistQuestionMsg -class LLMIteam(BaseModel): - """GET /api/conversation Result数据结构""" - - icon: str = Field(default=llm_provider_dict["ollama"]["icon"]) - llm_id: str = Field(alias="llmId", default="empty") - model_name: str = Field(alias="modelName", default="Ollama LLM") - - class KbIteam(BaseModel): """GET /api/conversation Result数据结构""" @@ -106,7 +98,6 @@ class ConversationListItem(BaseModel): created_time: str = Field(alias="createdTime") app_id: str = Field(alias="appId") debug: bool = Field(alias="debug") - llm: LLMIteam | None = Field(alias="llm", default=None) kb_list: list[KbIteam] = Field(alias="kbList", default=[]) @@ -257,6 +248,7 @@ class GetAppPropertyMsg(AppData): app_id: str = Field(..., alias="appId", description="应用ID") published: bool = Field(..., description="是否已发布") + mcp_service: list[AppMcpServiceInfo] = Field(default=[], alias="mcpService", description="MCP服务信息列表") class GetAppPropertyRsp(ResponseData): @@ -541,6 +533,7 @@ class FlowStructureDeleteRsp(ResponseData): class UserGetMsp(BaseModel): """GET /api/user result""" + total: int = Field(default=0) user_info_list: list[UserInfo] = Field(alias="userInfoList", default=[]) -- Gitee