diff --git a/apps/constants.py b/apps/constants.py index 58158b33c3aa4ea5aa3595f5642bf6444e7d76cb..710060b3dd29e50ea0eb05aa10069ca834e224dc 100644 --- a/apps/constants.py +++ b/apps/constants.py @@ -11,7 +11,7 @@ from apps.common.config import Config # 新对话默认标题 NEW_CHAT = "新对话" # 滑动窗口限流 默认窗口期 -SLIDE_WINDOW_TIME = 60 +SLIDE_WINDOW_TIME = 600 # OIDC 访问Token 过期时间(分钟) OIDC_ACCESS_TOKEN_EXPIRE_TIME = 30 # OIDC 刷新Token 过期时间(分钟) diff --git a/apps/main.py b/apps/main.py index 3c869d3a558e8dd3192ddc20ac77143f12471999..547cb6810828582114b59d6e2d74c77b1d2450f2 100644 --- a/apps/main.py +++ b/apps/main.py @@ -107,6 +107,16 @@ async def add_no_auth_user() -> None: logging.warning(f"添加无认证用户失败: {e}") +async def clear_user_activity() -> None: + """清除所有用户的活跃状态""" + from apps.services.activity import Activity + from apps.common.mongo import MongoDB + mongo = MongoDB() + activity_collection = mongo.get_collection("activity") + await activity_collection.delete_many({}) + logging.info("清除所有用户活跃状态完成") + + async def init_resources() -> None: """初始化必要资源""" WordsCheck() @@ -115,7 +125,7 @@ async def init_resources() -> None: TokenCalculator() if Config().get_config().no_auth.enable: await add_no_auth_user() - + await clear_user_activity() # 运行 if __name__ == "__main__": # 初始化必要资源 diff --git a/apps/routers/appcenter.py b/apps/routers/appcenter.py index 88df7cb110a97dd5464980ecb1f277c31fa4f81b..36815743349c2436564e30e4cb319caa8843dd99 100644 --- a/apps/routers/appcenter.py +++ b/apps/routers/appcenter.py @@ -9,10 +9,11 @@ from fastapi.responses import JSONResponse from apps.dependency.user import get_user, verify_user from apps.exceptions import InstancePermissionError -from apps.schemas.appcenter import AppFlowInfo, AppMcpServiceInfo, AppPermissionData +from apps.schemas.appcenter import AppFlowInfo, AppPermissionData from apps.schemas.enum_var import AppFilterType, AppType from apps.schemas.request_data import CreateAppRequest, ModFavAppRequest from apps.schemas.response_data import ( + AppMcpServiceInfo, LLMIteam, BaseAppOperationMsg, BaseAppOperationRsp, diff --git a/apps/routers/chat.py b/apps/routers/chat.py index 5e03547587cd86e5671cb09014449c882b15587b..9a45c2d5af4fd8862373cb078d34bad2df0784d1 100644 --- a/apps/routers/chat.py +++ b/apps/routers/chat.py @@ -16,7 +16,7 @@ from apps.dependency import get_session, get_user from apps.schemas.enum_var import FlowStatus from apps.scheduler.scheduler import Scheduler from apps.scheduler.scheduler.context import save_data -from apps.schemas.request_data import RequestData +from apps.schemas.request_data import RequestData, RequestDataApp from apps.schemas.response_data import ResponseData from apps.schemas.task import Task from apps.services.activity import Activity @@ -54,11 +54,17 @@ async def init_task(post_body: RequestData, user_sub: str, session_id: str) -> T task = await TaskManager.init_new_task(user_sub=user_sub, session_id=session_id, post_body=post_body) task.runtime.question = post_body.question task.ids.group_id = post_body.group_id + task.state.app_id = post_body.app.app_id if post_body.app else "" else: if not post_body.task_id: err = "[Chat] task_id 不可为空!" raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="task_id cannot be empty") - task = await TaskManager.get_task_by_conversation_id(post_body.task_id) + task = await TaskManager.get_task_by_task_id(post_body.task_id) + post_body.app = RequestDataApp(appId=task.state.app_id) + post_body.group_id = task.ids.group_id + post_body.conversation_id = task.ids.conversation_id + post_body.language = task.language + post_body.question = task.runtime.question return task @@ -137,7 +143,7 @@ async def chat( ) -> StreamingResponse: """LLM流式对话接口""" # 问题黑名单检测 - if not await QuestionBlacklistManager.check_blacklisted_questions(input_question=post_body.question): + if post_body.question is not None and not await QuestionBlacklistManager.check_blacklisted_questions(input_question=post_body.question): # 用户扣分 await UserBlacklistManager.change_blacklisted_users(user_sub, -10) raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="question is blacklisted") diff --git a/apps/scheduler/executor/agent.py b/apps/scheduler/executor/agent.py index 9ced57f909056d8c3fcbf74db6e0a3ed253b33bb..632cf9e3819be5c4169d22817986b7750e64d11f 100644 --- a/apps/scheduler/executor/agent.py +++ b/apps/scheduler/executor/agent.py @@ -1,10 +1,12 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """MCP Agent执行器""" +import anyio import logging import uuid from pydantic import Field from typing import Any +from mcp.types import TextContent from apps.llm.patterns.rewrite import QuestionRewrite from apps.llm.reasoning import ReasoningLLM from apps.scheduler.executor.base import BaseExecutor @@ -23,6 +25,7 @@ from apps.schemas.mcp import ( MCPCollection, MCPTool ) +from apps.scheduler.pool.mcp.pool import MCPPool from apps.schemas.task import ExecutorState, FlowStepHistory, StepQueueItem from apps.schemas.message import param from apps.services.task import TaskManager @@ -40,9 +43,7 @@ class MCPAgentExecutor(BaseExecutor): agent_id: str = Field(default="", description="Agent ID") agent_description: str = Field(default="", description="Agent描述") mcp_list: list[MCPCollection] = Field(description="MCP服务器列表", default=[]) - mcp_client: dict[str, MCPClient] = Field( - description="MCP客户端列表,key为mcp_id", default={} - ) + mcp_pool: MCPPool = Field(description="MCP池", default=MCPPool()) tools: dict[str, MCPTool] = Field( description="MCP工具列表,key为tool_id", default={} ) @@ -84,7 +85,7 @@ class MCPAgentExecutor(BaseExecutor): continue self.mcp_list.append(mcp_service) - self.mcp_client[mcp_id] = await MCPHost.get_client(self.task.ids.user_sub, mcp_id) + await self.mcp_pool._init_mcp(mcp_id, self.task.ids.user_sub) for tool in mcp_service.tools: self.tools[tool.id] = tool @@ -93,9 +94,9 @@ class MCPAgentExecutor(BaseExecutor): error_message = "之前的计划遇到以下报错\n\n"+self.task.state.error_message else: error_message = "初始化计划" - tools = MCPSelector.select_top_tool( + tools = await MCPSelector.select_top_tool( self.task.runtime.question, list(self.tools.values()), - additional_info=error_message, top_n=40) + additional_info=error_message, top_n=40, reasoning_llm=self.resoning_llm) if is_replan: logger.info("[MCPAgentExecutor] 重新规划流程") if not start_index: @@ -103,7 +104,8 @@ class MCPAgentExecutor(BaseExecutor): self.task.state.error_message, self.task.runtime.temporary_plans, self.resoning_llm) - current_plan = self.task.runtime.temporary_plans.plans[start_index:] + start_index = start_index.start_index + current_plan = MCPPlan(plans=self.task.runtime.temporary_plans.plans[start_index:]) error_message = self.task.state.error_message temporary_plans = await MCPPlanner.create_plan(self.task.runtime.question, is_replan=is_replan, @@ -113,17 +115,20 @@ class MCPAgentExecutor(BaseExecutor): max_steps=self.max_steps-start_index-1, reasoning_llm=self.resoning_llm ) - self.update_tokens() - self.push_message( + await self.update_tokens() + await self.push_message( EventType.STEP_CANCEL, data={} ) if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id: self.task.context[-1].step_status = StepStatus.CANCELLED - self.task.runtime.temporary_plans = self.task.runtime.temporary_plans.plans[:start_index] + temporary_plans.plans + self.task.runtime.temporary_plans.plans = self.task.runtime.temporary_plans.plans[ + : start_index] + temporary_plans.plans self.task.state.step_index = start_index else: start_index = 0 + logger.error( + f"各个字段的类型: {type(self.task.runtime.question)}, {type(tools)}, {type(self.max_steps)}, {type(self.resoning_llm)}") self.task.runtime.temporary_plans = await MCPPlanner.create_plan(self.task.runtime.question, tool_list=tools, max_steps=self.max_steps, reasoning_llm=self.resoning_llm) for i in range(start_index, len(self.task.runtime.temporary_plans.plans)): self.task.runtime.temporary_plans.plans[i].step_id = str(uuid.uuid4()) @@ -131,7 +136,9 @@ class MCPAgentExecutor(BaseExecutor): async def get_tool_input_param(self, is_first: bool) -> None: if is_first: # 获取第一个输入参数 - self.task.state.current_input = await MCPHost._get_first_input_params(self.tools[self.task.state.step_id], self.task.runtime.question, self.task) + tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool + mcp_tool = self.tools[tool_id] + self.task.state.current_input = await MCPHost._get_first_input_params(mcp_tool, self.task.runtime.question, self.task) else: # 获取后续输入参数 if isinstance(self.params, param): @@ -140,35 +147,40 @@ class MCPAgentExecutor(BaseExecutor): else: params = {} params_description = "" - self.task.state.current_input = await MCPHost._fill_params(self.tools[self.task.state.step_id], self.task.state.current_input, self.task.state.error_message, params, params_description) + tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool + mcp_tool = self.tools[tool_id] + self.task.state.current_input = await MCPHost._fill_params(mcp_tool, self.task.state.current_input, self.task.state.error_message, params, params_description) async def reset_step_to_index(self, start_index: int) -> None: """重置步骤到开始""" logger.info("[MCPAgentExecutor] 重置步骤到索引 %d", start_index) - if self.task.runtime.temporary_plans: + + if start_index < len(self.task.runtime.temporary_plans.plans): self.task.state.flow_status = FlowStatus.RUNNING self.task.state.step_id = self.task.runtime.temporary_plans.plans[start_index].step_id self.task.state.step_index = 0 - self.task.state.step_name = self.task.runtime.temporary_plans.plans[start_index].tool + self.task.state.step_name = self.tools[self.task.runtime.temporary_plans.plans[start_index].tool].name self.task.state.step_description = self.task.runtime.temporary_plans.plans[start_index].content - self.task.state.step_status = StepStatus.RUNNING + self.task.state.step_status = StepStatus.INIT self.task.state.retry_times = 0 else: - self.task.state.flow_status = FlowStatus.SUCCESS self.task.state.step_id = FINAL_TOOL_ID async def confirm_before_step(self) -> None: logger.info("[MCPAgentExecutor] 等待用户确认步骤 %d", self.task.state.step_index) # 发送确认消息 - confirm_message = await MCPPlanner.get_tool_risk(self.tools[self.task.state.step_id], self.task.state.current_input, "", self.resoning_llm) - self.update_tokens() - self.push_message(EventType.STEP_WAITING_FOR_START, confirm_message.model_dump( + tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool + mcp_tool = self.tools[tool_id] + confirm_message = await MCPPlanner.get_tool_risk(mcp_tool, self.task.state.current_input, "", self.resoning_llm) + await self.update_tokens() + await self.push_message(EventType.STEP_WAITING_FOR_START, confirm_message.model_dump( exclude_none=True, by_alias=True)) - self.push_message(EventType.FLOW_STOP, {}) + await self.push_message(EventType.FLOW_STOP, {}) self.task.state.flow_status = FlowStatus.WAITING self.task.state.step_status = StepStatus.WAITING self.task.context.append( FlowStepHistory( + task_id=self.task.id, step_id=self.task.state.step_id, step_name=self.task.state.step_name, step_description=self.task.state.step_description, @@ -188,62 +200,85 @@ class MCPAgentExecutor(BaseExecutor): self.task.state.step_status = StepStatus.RUNNING logger.info("[MCPAgentExecutor] 执行步骤 %d", self.task.state.step_index) # 获取MCP客户端 - mcp_tool = self.tools[self.task.state.step_id] - mcp_client = self.mcp_client[mcp_tool.mcp_id] - if not mcp_client: - logger.error("[MCPAgentExecutor] MCP客户端未找到: %s", mcp_tool.mcp_id) - self.task.state.flow_status = FlowStatus.ERROR - error = "[MCPAgentExecutor] MCP客户端未找到: {}".format(mcp_tool.mcp_id) - self.task.state.error_message = error + tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool + mcp_tool = self.tools[tool_id] + mcp_client = (await self.mcp_pool.get(mcp_tool.mcp_id, self.task.ids.user_sub)) try: output_params = await mcp_client.call_tool(mcp_tool.name, self.task.state.current_input) - self.update_tokens() - self.push_message( - EventType.STEP_INPUT, - self.task.state.current_input - ) - self.push_message( - EventType.STEP_OUTPUT, - output_params - ) - self.task.context.append( - FlowStepHistory( - step_id=self.task.state.step_id, - step_name=self.task.state.step_name, - step_description=self.task.state.step_description, - step_status=StepStatus.SUCCESS, - flow_id=self.task.state.flow_id, - flow_name=self.task.state.flow_name, - flow_status=self.task.state.flow_status, - input_data=self.task.state.current_input, - output_data=output_params, - ) - ) - self.task.state.step_status = StepStatus.SUCCESS + except anyio.ClosedResourceError as e: + import traceback + logger.error("[MCPAgentExecutor] MCP客户端连接已关闭: %s, 错误: %s", mcp_tool.mcp_id, traceback.format_exc()) + await self.mcp_pool.stop(mcp_tool.mcp_id, self.task.ids.user_sub) + await self.mcp_pool._init_mcp(mcp_tool.mcp_id, self.task.ids.user_sub) + logger.error("[MCPAgentExecutor] MCP客户端连接已关闭: %s, 错误: %s", mcp_tool.mcp_id, str(e)) + self.task.state.step_status = StepStatus.ERROR + return except Exception as e: - logger.warning("[MCPAgentExecutor] 执行步骤 %s 失败: %s", mcp_tool.name, str(e)) import traceback - self.task.state.error_message = traceback.format_exc() + logger.exception("[MCPAgentExecutor] 执行步骤 %s 时发生错误: %s", mcp_tool.name, traceback.format_exc()) self.task.state.step_status = StepStatus.ERROR + self.task.state.error_message = str(e) + return + if output_params.isError: + err = "" + for output in output_params.content: + if isinstance(output, TextContent): + err += output.text + self.task.state.step_status = StepStatus.ERROR + self.task.state.error_message = err + return + message = "" + for output in output_params.content: + if isinstance(output, TextContent): + message += output.text + output_params = { + "message": message, + } + + await self.update_tokens() + await self.push_message( + EventType.STEP_INPUT, + self.task.state.current_input + ) + await self.push_message( + EventType.STEP_OUTPUT, + output_params + ) + self.task.context.append( + FlowStepHistory( + task_id=self.task.id, + step_id=self.task.state.step_id, + step_name=self.task.state.step_name, + step_description=self.task.state.step_description, + step_status=StepStatus.SUCCESS, + flow_id=self.task.state.flow_id, + flow_name=self.task.state.flow_name, + flow_status=self.task.state.flow_status, + input_data=self.task.state.current_input, + output_data=output_params, + ) + ) + self.task.state.step_status = StepStatus.SUCCESS async def generate_params_with_null(self) -> None: """生成参数补充""" - mcp_tool = self.tools[self.task.state.step_id] + tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool + mcp_tool = self.tools[tool_id] params_with_null = await MCPPlanner.get_missing_param( mcp_tool, self.task.state.current_input, self.task.state.error_message, self.resoning_llm ) - self.update_tokens() - self.push_message( + await self.update_tokens() + await self.push_message( EventType.STEP_WAITING_FOR_PARAM, data={ "message": "当运行产生如下报错:\n" + self.task.state.error_message, "params": params_with_null } ) - self.push_message( + await self.push_message( EventType.FLOW_STOP, data={} ) @@ -251,6 +286,7 @@ class MCPAgentExecutor(BaseExecutor): self.task.state.step_status = StepStatus.PARAM self.task.context.append( FlowStepHistory( + task_id=self.task.id, step_id=self.task.state.step_id, step_name=self.task.state.step_name, step_description=self.task.state.step_description, @@ -269,40 +305,26 @@ class MCPAgentExecutor(BaseExecutor): async def get_next_step(self) -> None: self.task.state.step_index += 1 - if self.task.state.step_index < len(self.task.runtime.temporary_plans): - if self.task.runtime.temporary_plans.plans[self.task.state.step_index].step_id == FINAL_TOOL_ID: - # 最后一步 - self.task.state.flow_status = FlowStatus.SUCCESS - self.task.state.step_status = StepStatus.SUCCESS - self.push_message( - EventType.FLOW_SUCCESS, - data={} - ) + if self.task.state.step_index < len(self.task.runtime.temporary_plans.plans): + tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool + if tool_id == FINAL_TOOL_ID: return self.task.state.step_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].step_id - self.task.state.step_name = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool + self.task.state.step_name = self.tools[self.task.runtime.temporary_plans.plans + [self.task.state.step_index].tool].name self.task.state.step_description = self.task.runtime.temporary_plans.plans[self.task.state.step_index].content self.task.state.step_status = StepStatus.INIT self.task.state.current_input = {} - self.push_message( - EventType.STEP_INIT, - data={} - ) else: # 没有下一步了,结束流程 - self.task.state.flow_status = FlowStatus.SUCCESS - self.task.state.step_status = StepStatus.SUCCESS - self.push_message( - EventType.FLOW_SUCCESS, - data={} - ) + self.task.state.step_id = FINAL_TOOL_ID return async def error_handle_after_step(self) -> None: """步骤执行失败后的错误处理""" self.task.state.step_status = StepStatus.ERROR self.task.state.flow_status = FlowStatus.ERROR - self.push_message( + await self.push_message( EventType.FLOW_FAILED, data={} ) @@ -310,6 +332,7 @@ class MCPAgentExecutor(BaseExecutor): del self.task.context[-1] self.task.context.append( FlowStepHistory( + task_id=self.task.id, step_id=self.task.state.step_id, step_name=self.task.state.step_name, step_description=self.task.state.step_description, @@ -325,6 +348,10 @@ class MCPAgentExecutor(BaseExecutor): async def work(self) -> None: """执行当前步骤""" if self.task.state.step_status == StepStatus.INIT: + await self.push_message( + EventType.STEP_INIT, + data={} + ) await self.get_tool_input_param(is_first=True) user_info = await UserManager.get_userinfo_by_user_sub(self.task.ids.user_sub) if not user_info.auto_execute: @@ -333,21 +360,21 @@ class MCPAgentExecutor(BaseExecutor): return self.task.state.step_status = StepStatus.RUNNING elif self.task.state.step_status in [StepStatus.PARAM, StepStatus.WAITING, StepStatus.RUNNING]: - if self.task.context[-1].step_status == StepStatus.PARAM: + if self.task.state.step_status == StepStatus.PARAM: if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id: del self.task.context[-1] elif self.task.state.step_status == StepStatus.WAITING: - if self.params.content: + if self.params: if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id: del self.task.context[-1] else: self.task.state.flow_status = FlowStatus.CANCELLED self.task.state.step_status = StepStatus.CANCELLED - self.push_message( + await self.push_message( EventType.STEP_CANCEL, data={} ) - self.push_message( + await self.push_message( EventType.FLOW_CANCEL, data={} ) @@ -368,7 +395,8 @@ class MCPAgentExecutor(BaseExecutor): await self.error_handle_after_step() else: user_info = await UserManager.get_userinfo_by_user_sub(self.task.ids.user_sub) - mcp_tool = self.tools[self.task.state.step_id] + tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool + mcp_tool = self.tools[tool_id] error_type = await MCPPlanner.get_tool_execute_error_type( self.task.runtime.question, self.task.runtime.temporary_plans, @@ -392,7 +420,7 @@ class MCPAgentExecutor(BaseExecutor): (await MCPHost.assemble_memory(self.task)), self.resoning_llm ): - self.push_message( + await self.push_message( EventType.TEXT_ADD, data=chunk ) @@ -401,45 +429,72 @@ class MCPAgentExecutor(BaseExecutor): async def run(self) -> None: """执行MCP Agent的主逻辑""" # 初始化MCP服务 - self.load_state() - self.load_mcp() + await self.load_state() + await self.load_mcp() if self.task.state.flow_status == FlowStatus.INIT: # 初始化状态 try: self.task.state.flow_id = str(uuid.uuid4()) self.task.state.flow_name = await MCPPlanner.get_flow_name(self.task.runtime.question, self.resoning_llm) await self.plan(is_replan=False) - self.reset_step_to_index(0) - TaskManager.save_task(self.task.id, self.task) + await self.reset_step_to_index(0) + await TaskManager.save_task(self.task.id, self.task) except Exception as e: + import traceback + logger.error("[MCPAgentExecutor] 初始化失败: %s", traceback.format_exc()) logger.error("[MCPAgentExecutor] 初始化失败: %s", str(e)) self.task.state.flow_status = FlowStatus.ERROR self.task.state.error_message = str(e) - self.push_message( + await self.push_message( EventType.FLOW_FAILED, data={} ) return self.task.state.flow_status = FlowStatus.RUNNING - self.push_message( + await self.push_message( EventType.FLOW_START, data={} ) + if self.task.state.step_id == FINAL_TOOL_ID: + # 如果已经是最后一步,直接结束 + self.task.state.flow_status = FlowStatus.SUCCESS + await self.push_message( + EventType.FLOW_SUCCESS, + data={} + ) + await self.summarize() + return try: - while self.task.state.step_index < len(self.task.runtime.temporary_plans) and \ + while len(self.task.runtime.temporary_plans.plans) and \ + self.task.state.step_index < len(self.task.runtime.temporary_plans.plans) and \ self.task.state.flow_status == FlowStatus.RUNNING: + tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool + if tool_id == FINAL_TOOL_ID: + break await self.work() - TaskManager.save_task(self.task.id, self.task) + await TaskManager.save_task(self.task.id, self.task) + tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool + if tool_id == FINAL_TOOL_ID: + # 如果已经是最后一步,直接结束 + self.task.state.flow_status = FlowStatus.SUCCESS + self.task.state.step_status = StepStatus.SUCCESS + await self.push_message( + EventType.FLOW_SUCCESS, + data={} + ) + await self.summarize() except Exception as e: + import traceback + logger.error("[MCPAgentExecutor] 执行过程中发生错误: %s", traceback.format_exc()) logger.error("[MCPAgentExecutor] 执行过程中发生错误: %s", str(e)) self.task.state.flow_status = FlowStatus.ERROR self.task.state.error_message = str(e) self.task.state.step_status = StepStatus.ERROR - self.push_message( + await self.push_message( EventType.STEP_ERROR, data={} ) - self.push_message( + await self.push_message( EventType.FLOW_FAILED, data={} ) @@ -447,6 +502,7 @@ class MCPAgentExecutor(BaseExecutor): del self.task.context[-1] self.task.context.append( FlowStepHistory( + task_id=self.task.id, step_id=self.task.state.step_id, step_name=self.task.state.step_name, step_description=self.task.state.step_description, @@ -459,5 +515,9 @@ class MCPAgentExecutor(BaseExecutor): ) ) finally: - for client in self.mcp_client.values(): - await client.stop() + for mcp_service in self.mcp_list: + try: + await self.mcp_pool.stop(mcp_service.id, self.task.ids.user_sub) + except Exception as e: + import traceback + logger.error("[MCPAgentExecutor] 停止MCP客户端时发生错误: %s", traceback.format_exc()) diff --git a/apps/scheduler/mcp_agent/host.py b/apps/scheduler/mcp_agent/host.py index ced175ef9b56135caf8c67c4fa71c42406683ec6..2701477c4f212d006867cdd5447eb0a26ac7310c 100644 --- a/apps/scheduler/mcp_agent/host.py +++ b/apps/scheduler/mcp_agent/host.py @@ -33,25 +33,6 @@ _env = SandboxedEnvironment( class MCPHost: """MCP宿主服务""" - @staticmethod - async def get_client(user_sub, mcp_id: str) -> MCPClient | None: - """获取MCP客户端""" - mongo = MongoDB() - mcp_collection = mongo.get_collection("mcp") - - # 检查用户是否启用了这个mcp - mcp_db_result = await mcp_collection.find_one({"_id": mcp_id, "activated": user_sub}) - if not mcp_db_result: - logger.warning("用户 %s 未启用MCP %s", user_sub, mcp_id) - return None - - # 获取MCP配置 - try: - return await MCPPool().get(mcp_id, user_sub) - except KeyError: - logger.warning("用户 %s 的MCP %s 没有运行中的实例,请检查环境", user_sub, mcp_id) - return None - @staticmethod async def assemble_memory(task: Task) -> str: """组装记忆""" @@ -104,32 +85,3 @@ class MCPHost: mcp_tool.input_schema, ) return await json_generator.generate() - - async def call_tool(user_sub: str, tool: MCPTool, plan_item: MCPPlanItem) -> list[dict[str, Any]]: - """调用工具""" - # 拿到Client - client = await MCPPool().get(tool.mcp_id, user_sub) - if client is None: - err = f"[MCPHost] MCP Server不合法: {tool.mcp_id}" - logger.error(err) - raise ValueError(err) - - # 填充参数 - params = await MCPHost._fill_params(tool, plan_item.instruction) - # 调用工具 - result = await client.call_tool(tool.name, params) - # 保存记忆 - processed_result = [] - for item in result.content: - if not isinstance(item, TextContent): - logger.error("MCP结果类型不支持: %s", item) - continue - result = item.text - try: - json_result = json.loads(result) - except Exception as e: - logger.error("MCP结果解析失败: %s, 错误: %s", result, e) - continue - processed_result.append(json_result) - - return processed_result diff --git a/apps/scheduler/mcp_agent/plan.py b/apps/scheduler/mcp_agent/plan.py index 2d68dd595363764d39c916d78b8d71ece3671e54..977dfda109207f69933bf5d81fbcf7d552a19f54 100644 --- a/apps/scheduler/mcp_agent/plan.py +++ b/apps/scheduler/mcp_agent/plan.py @@ -63,7 +63,7 @@ class MCPPlanner: result, [ {"role": "system", "content": "You are a helpful assistant."}, - {"role": "user", "content": result}, + {"role": "user", "content": "请提取下面内容中的json\n\n"+result}, ], schema, ) @@ -123,7 +123,7 @@ class MCPPlanner: async def get_replan_start_step_index( user_goal: str, error_message: str, current_plan: MCPPlan | None = None, history: str = "", - reasoning_llm: ReasoningLLM = ReasoningLLM()) -> MCPPlan: + reasoning_llm: ReasoningLLM = ReasoningLLM()) -> RestartStepIndex: """获取重新规划的步骤索引""" # 获取推理结果 template = _env.from_string(GET_REPLAN_START_STEP_INDEX) @@ -301,7 +301,7 @@ class MCPPlanner: async for chunk in resoning_llm.call( [{"role": "user", "content": prompt}], - streaming=False, + streaming=True, temperature=0.07, ): yield chunk diff --git a/apps/scheduler/mcp_agent/prompt.py b/apps/scheduler/mcp_agent/prompt.py index 01d5dbf915ca91e46cee08d42f6bad127e0cf210..139e8e3720be24fe56af4ff8341dd5ae9095f4fc 100644 --- a/apps/scheduler/mcp_agent/prompt.py +++ b/apps/scheduler/mcp_agent/prompt.py @@ -69,6 +69,7 @@ TOOL_SELECT = dedent(r""" 1. 确保充分理解当前目标,选择实现目标所需的MCP工具。 2. 请在给定的MCP工具列表中选择,不要自己生成MCP工具。 3. 可以选择一些辅助工具,但必须确保这些工具与当前目标相关。 + 4. 注意,返回的工具ID必须是MCP工具的ID,而不是名称。 必须按照以下格式生成选择结果,不要输出任何其他内容: ```json { @@ -503,7 +504,7 @@ RISK_EVALUATE = dedent(r""" ```json { "risk": "low/medium/high", - "message": "提示信息" + "reason": "提示信息" } ``` # 样例 @@ -530,7 +531,7 @@ RISK_EVALUATE = dedent(r""" ```json { "risk": "中", - "message": "当前工具将连接到MySQL数据库并分析性能,可能会对数据库性能产生一定影响。请确保在非生产环境中执行此操作。" + "reason": "当前工具将连接到MySQL数据库并分析性能,可能会对数据库性能产生一定影响。请确保在非生产环境中执行此操作。" } ``` # 工具 diff --git a/apps/scheduler/mcp_agent/select.py b/apps/scheduler/mcp_agent/select.py index 933527c38bb94a05632b2586c8b49f4487bbd3fd..0ae54dca92eb98cc32ab9ffad3ef260b225ed048 100644 --- a/apps/scheduler/mcp_agent/select.py +++ b/apps/scheduler/mcp_agent/select.py @@ -5,8 +5,9 @@ import logging import random from jinja2 import BaseLoader from jinja2.sandbox import SandboxedEnvironment -from typing import AsyncGenerator +from typing import AsyncGenerator, Any +from apps.llm.function import JsonGenerator from apps.llm.reasoning import ReasoningLLM from apps.common.lance import LanceDB from apps.common.mongo import MongoDB @@ -38,23 +39,56 @@ SUMMARIZE_TOOL_ID = "SUMMARIZE" class MCPSelector: """MCP选择器""" + @staticmethod + async def get_resoning_result(prompt: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> str: + """获取推理结果""" + # 调用推理大模型 + message = [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": prompt}, + ] + result = "" + async for chunk in resoning_llm.call( + message, + streaming=False, + temperature=0.07, + result_only=True, + ): + result += chunk + + return result + + @staticmethod + async def _parse_result(result: str, schema: dict[str, Any]) -> str: + """解析推理结果""" + json_generator = JsonGenerator( + result, + [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "请提取下面内容中的json\n\n"+result}, + ], + schema, + ) + json_result = await json_generator.generate() + return json_result @staticmethod async def select_top_tool( goal: str, tool_list: list[MCPTool], - additional_info: str | None = None, top_n: int | None = None) -> list[MCPTool]: + additional_info: str | None = None, top_n: int | None = None, + reasoning_llm: ReasoningLLM | None = None) -> list[MCPTool]: """选择最合适的工具""" random.shuffle(tool_list) - max_tokens = Config().get_config().function_call.max_tokens + max_tokens = reasoning_llm._config.max_tokens template = _env.from_string(TOOL_SELECT) - if TokenCalculator.calculate_token_length( + token_calculator = TokenCalculator() + if token_calculator.calculate_token_length( messages=[{"role": "user", "content": template.render( goal=goal, tools=[], additional_info=additional_info )}], pure_text=True) > max_tokens: logger.warning("[MCPSelector] 工具选择模板长度超过最大令牌数,无法进行选择") return [] - llm = FunctionLLM() current_index = 0 tool_ids = [] while current_index < len(tool_list): @@ -62,7 +96,7 @@ class MCPSelector: sub_tools = [] while index < len(tool_list): tool = tool_list[index] - tokens = TokenCalculator.calculate_token_length( + tokens = token_calculator.calculate_token_length( messages=[{"role": "user", "content": template.render( goal=goal, tools=[tool], additional_info=additional_info @@ -73,7 +107,7 @@ class MCPSelector: continue sub_tools.append(tool) - tokens = TokenCalculator.calculate_token_length(messages=[{"role": "user", "content": template.render( + tokens = token_calculator.calculate_token_length(messages=[{"role": "user", "content": template.render( goal=goal, tools=sub_tools, additional_info=additional_info)}, ], pure_text=True) if tokens > max_tokens: del sub_tools[-1] @@ -82,13 +116,13 @@ class MCPSelector: index += 1 current_index = index if sub_tools: - message = [ - {"role": "system", "content": "You are a helpful assistant."}, - {"role": "user", "content": template.render(tools=sub_tools)}, - ] schema = MCPToolIdsSelectResult.model_json_schema() - schema["properties"]["tool_ids"]["enum"] = [tool.id for tool in sub_tools] - result = await llm.call(messages=message, schema=schema) + if "items" not in schema["properties"]["tool_ids"]: + schema["properties"]["tool_ids"]["items"] = {} + # 将enum添加到items中,限制数组元素的可选值 + schema["properties"]["tool_ids"]["items"]["enum"] = [tool.id for tool in sub_tools] + result = await MCPSelector.get_resoning_result(template.render(goal=goal, tools=sub_tools, additional_info="请根据目标选择对应的工具"), reasoning_llm) + result = await MCPSelector._parse_result(result, schema) try: result = MCPToolIdsSelectResult.model_validate(result) tool_ids.extend(result.tool_ids) diff --git a/apps/scheduler/pool/loader/app.py b/apps/scheduler/pool/loader/app.py index a85395bfc7723af9d009a1945fe4617cf390b938..49407ba95358d4442c714087fd48b1aaca219f8d 100644 --- a/apps/scheduler/pool/loader/app.py +++ b/apps/scheduler/pool/loader/app.py @@ -78,6 +78,7 @@ class AppLoader: # 加载模型 try: metadata = AgentAppMetadata.model_validate(metadata) + logger.info(f"[AppLoader] Agent应用元数据验证成功: {metadata}") except Exception as e: err = "[AppLoader] Agent应用元数据验证失败" logger.exception(err) @@ -102,7 +103,6 @@ class AppLoader: await file_checker.diff_one(app_path) await self.load(app_id, file_checker.hashes[f"app/{app_id}"]) - @staticmethod async def delete(app_id: str, *, is_reload: bool = False) -> None: """ @@ -157,5 +157,7 @@ class AppLoader: }, upsert=True, ) + app_pool = await app_collection.find_one({"_id": metadata.id}) + logger.error(f"[AppLoader] 更新 MongoDB 成功: {app_pool}") except Exception: logger.exception("[AppLoader] 更新 MongoDB 失败") diff --git a/apps/scheduler/pool/loader/mcp.py b/apps/scheduler/pool/loader/mcp.py index 2be0fe5d562b0561efe3c9fafa60fa76f665b609..bf007fc862455905321d044a27dd67b12962f6b0 100644 --- a/apps/scheduler/pool/loader/mcp.py +++ b/apps/scheduler/pool/loader/mcp.py @@ -434,6 +434,19 @@ class MCPLoader(metaclass=SingletonMeta): ) ) await f.aclose() + if mcp_config.type == MCPType.STDIO: + index = None + for i in range(len(mcp_config.config.args)): + if mcp_config.config.args[i] == "--directory": + index = i + 1 + break + if index is not None: + if index < len(mcp_config.config.args): + mcp_config.config.args[index] = str(user_path)+'/project' + else: + mcp_config.config.args.append(str(user_path)+'/project') + else: + mcp_config.config.args = ["--directory", str(user_path)+'/project'] + mcp_config.config.args # 更新数据库 mongo = MongoDB() mcp_collection = mongo.get_collection("mcp") diff --git a/apps/scheduler/pool/mcp/client.py b/apps/scheduler/pool/mcp/client.py index f2fd4400affc8369fcae59c616c3c497216e6e91..0ced05e8cc68bb773cfe61eb9268a4f8baab8fa5 100644 --- a/apps/scheduler/pool/mcp/client.py +++ b/apps/scheduler/pool/mcp/client.py @@ -55,9 +55,10 @@ class MCPClient: """ # 创建Client if isinstance(config, MCPServerSSEConfig): + env = config.env or {} client = sse_client( url=config.url, - headers=config.env, + headers=env, ) elif isinstance(config, MCPServerStdioConfig): if user_sub: @@ -93,9 +94,9 @@ class MCPClient: self.ready_sign.set() self.status = MCPStatus.RUNNING - # 等待关闭信号 await self.stop_sign.wait() + logger.error("[MCPClient] MCP %s:收到停止信号,正在关闭", mcp_id) # 关闭Client try: @@ -147,5 +148,5 @@ class MCPClient: self.stop_sign.set() try: await self.task - except Exception: - logger.exception("[MCPClient] MCP %s:停止失败", self.mcp_id) + except Exception as e: + logger.warning("[MCPClient] MCP %s:停止时发生异常:%s", self.mcp_id, e) diff --git a/apps/scheduler/pool/mcp/pool.py b/apps/scheduler/pool/mcp/pool.py index 91cde4d9d6c83fd5088328e12cfbb6bf06d3c7cb..bf0320f429a9ef45864ba6548b1bb28e3d874b59 100644 --- a/apps/scheduler/pool/mcp/pool.py +++ b/apps/scheduler/pool/mcp/pool.py @@ -21,16 +21,13 @@ class MCPPool(metaclass=SingletonMeta): """初始化MCP池""" self.pool = {} - async def _init_mcp(self, mcp_id: str, user_sub: str) -> MCPClient | None: """初始化MCP池""" - mcp_math = MCP_USER_PATH / user_sub / mcp_id / "project" config_path = MCP_USER_PATH / user_sub / mcp_id / "config.json" - - if not await mcp_math.exists() or not await mcp_math.is_dir(): - logger.warning("[MCPPool] 用户 %s 的MCP %s 未激活", user_sub, mcp_id) + flag = (await config_path.exists()) + if not flag: + logger.warning("[MCPPool] 用户 %s 的MCP %s 配置文件不存在", user_sub, mcp_id) return None - config = MCPServerConfig.model_validate_json(await config_path.read_text()) if config.type in (MCPType.SSE, MCPType.STDIO): @@ -40,9 +37,11 @@ class MCPPool(metaclass=SingletonMeta): return None await client.init(user_sub, mcp_id, config.config) + if user_sub not in self.pool: + self.pool[user_sub] = {} + self.pool[user_sub][mcp_id] = client return client - async def _get_from_dict(self, mcp_id: str, user_sub: str) -> MCPClient | None: """从字典中获取MCP客户端""" if user_sub not in self.pool: @@ -53,7 +52,6 @@ class MCPPool(metaclass=SingletonMeta): return self.pool[user_sub][mcp_id] - async def _validate_user(self, mcp_id: str, user_sub: str) -> bool: """验证用户是否已激活""" mongo = MongoDB() @@ -61,7 +59,6 @@ class MCPPool(metaclass=SingletonMeta): mcp_db_result = await mcp_collection.find_one({"_id": mcp_id, "activated": user_sub}) return mcp_db_result is not None - async def get(self, mcp_id: str, user_sub: str) -> MCPClient | None: """获取MCP客户端""" item = await self._get_from_dict(mcp_id, user_sub) @@ -83,7 +80,6 @@ class MCPPool(metaclass=SingletonMeta): return item - async def stop(self, mcp_id: str, user_sub: str) -> None: """停止MCP客户端""" await self.pool[user_sub][mcp_id].stop() diff --git a/apps/scheduler/scheduler/context.py b/apps/scheduler/scheduler/context.py index dc35d4bd05d564bc96f50aaca537ded94caefae7..6e737314822c7d9c5a9671c8670bd5a38db40e3d 100644 --- a/apps/scheduler/scheduler/context.py +++ b/apps/scheduler/scheduler/context.py @@ -217,7 +217,6 @@ async def save_data(task: Task, user_sub: str, post_body: RequestData) -> None: if post_body.app and post_body.app.app_id: # 更新最近使用的应用 await AppCenterManager.update_recent_app(user_sub, post_body.app.app_id) - # 若状态为成功,删除Task if not task.state or task.state.flow_status == StepStatus.SUCCESS or task.state.flow_status == StepStatus.ERROR or task.state.flow_status == StepStatus.CANCELLED: await TaskManager.delete_task_by_task_id(task.id) diff --git a/apps/scheduler/scheduler/scheduler.py b/apps/scheduler/scheduler/scheduler.py index 7731dd85df82066aaedc9f02c718cfcf93306d4a..a5e14b7fadcb023a60106e554ae9b81710e7d2d1 100644 --- a/apps/scheduler/scheduler/scheduler.py +++ b/apps/scheduler/scheduler/scheduler.py @@ -126,7 +126,12 @@ class Scheduler: # 创建用于通信的事件 kill_event = asyncio.Event() monitor = asyncio.create_task(self._monitor_activity(kill_event, self.task.ids.user_sub)) - if not self.post_body.app or self.post_body.app.app_id == "": + rag_method = True + if self.post_body.app and self.post_body.app.app_id: + rag_method = False + if self.task.state.app_id: + rag_method = False + if rag_method: llm = await self.get_llm_use_in_chat_with_rag() kb_ids = await self.get_kb_ids_use_in_chat_with_rag() self.task = await push_init_message(self.task, self.queue, 3, is_flow=False) @@ -232,7 +237,7 @@ class Scheduler: max_tokens=llm.max_tokens, ) ) - if background.conversation: + if background.conversation and self.task.state.flow_status == FlowStatus.INIT: try: question_obj = QuestionRewrite() post_body.question = await question_obj.generate(history=background.conversation, question=post_body.question, llm=reasion_llm) diff --git a/apps/schemas/appcenter.py b/apps/schemas/appcenter.py index 773e41cd1a9b801a0c39681e2f3a84d49eebb262..e3bb896eba2361e0c28ee914b84f387d7da76b87 100644 --- a/apps/schemas/appcenter.py +++ b/apps/schemas/appcenter.py @@ -50,14 +50,6 @@ class AppFlowInfo(BaseModel): debug: bool = Field(default=False, description="是否经过调试") -class AppMcpServiceInfo(BaseModel): - """应用关联的MCP服务信息""" - - id: str = Field(..., description="MCP服务ID") - name: str = Field(default="", description="MCP服务名称") - description: str = Field(default="", description="MCP服务简介") - - class AppData(BaseModel): """应用信息数据结构""" diff --git a/apps/schemas/pool.py b/apps/schemas/pool.py index 7df6dab8d98c6e1098271ce2adb93a5045de5fba..3532b6e2ffd31739e25e53414245b5d54fd395d0 100644 --- a/apps/schemas/pool.py +++ b/apps/schemas/pool.py @@ -109,7 +109,7 @@ class AppPool(BaseData): permission: Permission = Field(description="应用权限配置", default=Permission()) flows: list[AppFlow] = Field(description="Flow列表", default=[]) hashes: dict[str, str] = Field(description="关联文件的hash值", default={}) - mcp_service: list[str] = Field(default=[], alias="mcpService", description="MCP服务id列表") + mcp_service: list[str] = Field(default=[], description="MCP服务id列表") llm_id: str = Field( - default="empty", alias="llmId", description="应用使用的大模型ID(如果有的话)" + default="empty", description="应用使用的大模型ID(如果有的话)" ) diff --git a/apps/schemas/task.py b/apps/schemas/task.py index 336bfedc508e7558e92c73fac55e9e44c8092f3f..197f439d3bc16fb1663388dd92489c512adfacbd 100644 --- a/apps/schemas/task.py +++ b/apps/schemas/task.py @@ -98,6 +98,7 @@ class Task(BaseModel): state: ExecutorState = Field(description="Flow的状态", default=ExecutorState()) tokens: TaskTokens = Field(description="Token信息") runtime: TaskRuntime = Field(description="任务运行时数据") + language: str = Field(description="语言", default="zh") created_at: float = Field(default_factory=lambda: round(datetime.now(tz=UTC).timestamp(), 3)) diff --git a/apps/services/appcenter.py b/apps/services/appcenter.py index 6b6e31714f2c0e05ed17eb2f52d81bb328415045..4e0d00d06254973d284b149faaa632d969700981 100644 --- a/apps/services/appcenter.py +++ b/apps/services/appcenter.py @@ -83,6 +83,7 @@ class AppCenterManager: if app_type is not None: filters["app_type"] = app_type.value # 获取应用列表 + logger.error(f"[AppCenterManager] 搜索条件: {filters}, 页码: {page}, 每页大小: {SERVICE_PAGE_SIZE}") apps, total_apps = await AppCenterManager._search_apps_by_filter(filters, page, SERVICE_PAGE_SIZE) # 构建返回的应用卡片列表 @@ -406,7 +407,6 @@ class AppCenterManager: "name": source.name, "description": source.description, "history_len": source.history_len, - "llm_id": source.llm_id, } @staticmethod @@ -485,10 +485,10 @@ class AppCenterManager: # 处理llm_id字段 if data is not None and hasattr(data, "llm") and data.llm: # 创建应用场景,验证传入的 llm_id 状态 (create_app) - metadata.llm_id = data.llm.llm_id if data.llm.llm_id else "empty" + metadata.llm_id = data.llm if data.llm else "empty" elif data is not None and hasattr(data, "llm_id"): # 更新应用场景,使用 data 中的 llm_id (update_app) - metadata.llm_id = data.llm_id if data.llm_id else "empty" + metadata.llm_id = data.llm if data.llm else "empty" elif app_data is not None and hasattr(app_data, "llm_id"): # 更新应用发布状态场景,使用 app_data 中的 llm_id (update_app_publish_status) metadata.llm_id = app_data.llm_id if app_data.llm_id else "empty" diff --git a/apps/services/mcp_service.py b/apps/services/mcp_service.py index d15ca7a480a4d6bc29b834dbaa065ff12482574b..434c875036675e63e7e4994d41bca210abe94a41 100644 --- a/apps/services/mcp_service.py +++ b/apps/services/mcp_service.py @@ -98,7 +98,7 @@ class MCPServiceManager: else: filters["activated"] = {"$nin": [user_sub]} if not is_installed: - user_info = await UserManager.get_user_info(user_sub) + user_info = await UserManager.get_userinfo_by_user_sub(user_sub) if not user_info.is_admin: filters["status"] = MCPInstallStatus.READY.value else: @@ -247,13 +247,20 @@ class MCPServiceManager: # 保存并载入配置 logger.info("[MCPServiceManager] 创建mcp:%s", mcp_server.name) mcp_path = MCP_PATH / "template" / mcp_id / "project" - index = None - for i in range(len(config.args)): - if not config.args[i].startswith("-"): - index = i + if isinstance(config, MCPServerStdioConfig): + index = None + for i in range(len(config.args)): + if not config.args[i] == "--directory": + continue + index = i + 1 break - if index is not None: - config.args[index] = str(mcp_path) + if index is not None: + if index >= len(config.args): + config.args.append(str(mcp_path)) + else: + config.args[index+1] = str(mcp_path) + else: + config.args += ["--directory", str(mcp_path)] await MCPLoader._insert_template_db(mcp_id=mcp_id, config=mcp_server) await MCPLoader.save_one(mcp_id, mcp_server) await MCPLoader.update_template_status(mcp_id, MCPInstallStatus.INIT) diff --git a/apps/services/task.py b/apps/services/task.py index eec4e197afeb8b82c1ad90effe7be8ec961d7039..ad20f51f8ce38bf0d784e47bf10c6e237e9ec3a0 100644 --- a/apps/services/task.py +++ b/apps/services/task.py @@ -94,19 +94,18 @@ class TaskManager: return flow_context_list @staticmethod - async def get_context_by_task_id(task_id: str, length: int = 0) -> list[FlowStepHistory]: + async def get_context_by_task_id(task_id: str, length: int | None = None) -> list[FlowStepHistory]: """根据task_id获取flow信息""" flow_context_collection = MongoDB().get_collection("flow_context") flow_context = [] try: - async for history in flow_context_collection.find( - {"task_id": task_id}, - ).sort( - "created_at", -1, - ).limit(length): - for i in range(len(flow_context)): - flow_context.append(FlowStepHistory.model_validate(history)) + if length is None: + async for context in flow_context_collection.find({"task_id": task_id}): + flow_context.append(FlowStepHistory.model_validate(context)) + else: + async for context in flow_context_collection.find({"task_id": task_id}).limit(length): + flow_context.append(FlowStepHistory.model_validate(context)) except Exception: logger.exception("[TaskManager] 获取task_id的flow信息失败") return []