diff --git a/apps/common/oidc_provider/openeuler.py b/apps/common/oidc_provider/openeuler.py index 86bdf372bf4a73d01518e46aa48cc8826636d5a6..789fd5d97d96524ea289091b98347b6ce5613693 100644 --- a/apps/common/oidc_provider/openeuler.py +++ b/apps/common/oidc_provider/openeuler.py @@ -86,10 +86,6 @@ class OpenEulerOIDCProvider(OIDCProviderBase): logger.info("[OpenEuler] 获取OIDC用户成功: %s", resp.text) result = resp.json() - if not result["phone_number_verified"]: - err = "Could not validate credentials." - raise RuntimeError(err) - return { "user_sub": result["sub"], "user_name": result.get("name", result.get("preferred_username", result.get("nickname", ""))), diff --git a/apps/routers/appcenter.py b/apps/routers/appcenter.py index ca2d93d5d0e11f69713a4130262de08d8191fd20..fad287f53aaa41e21e1832610e7d6a2ed5873c96 100644 --- a/apps/routers/appcenter.py +++ b/apps/routers/appcenter.py @@ -41,10 +41,14 @@ router = APIRouter( async def get_applications( # noqa: PLR0913 user_sub: Annotated[str, Depends(get_user)], *, - my_app: Annotated[bool, Query(..., alias="createdByMe", description="筛选我创建的")] = False, - my_fav: Annotated[bool, Query(..., alias="favorited", description="筛选我收藏的")] = False, - keyword: Annotated[str | None, Query(..., alias="keyword", description="搜索关键字")] = None, - app_type: Annotated[AppType | None, Query(..., alias="appType", description="应用类型")] = None, + my_app: Annotated[bool, Query(..., alias="createdByMe", + description="筛选我创建的")] = False, + my_fav: Annotated[bool, + Query(..., alias="favorited", description="筛选我收藏的")] = False, + keyword: Annotated[str | None, + Query(..., alias="keyword", description="搜索关键字")] = None, + app_type: Annotated[AppType | None, + Query(..., alias="appType", description="应用类型")] = None, page: Annotated[int, Query(..., alias="page", ge=1, description="页码")] = 1, ) -> JSONResponse: """获取应用列表""" @@ -58,7 +62,8 @@ async def get_applications( # noqa: PLR0913 ).model_dump(exclude_none=True, by_alias=True), ) try: - filter_type = AppFilterType.USER if my_app else (AppFilterType.FAVORITE if my_fav else AppFilterType.ALL) + filter_type = AppFilterType.USER if my_app else ( + AppFilterType.FAVORITE if my_fav else AppFilterType.ALL) app_cards, total_apps = await AppCenterManager.fetch_apps( user_sub, keyword, @@ -230,7 +235,7 @@ async def get_application( if app_data.llm_id == "empty": llm_item = LLMIteam() else: - llm_collection = await LLMManager.get_llm_by_id(user_sub, app_data.llm_id) + llm_collection = await LLMManager.get_llm_by_id(app_data.llm_id) llm_item = LLMIteam( llmId=llm_collection.id, modelName=llm_collection.model_name, diff --git a/apps/routers/mcp_service.py b/apps/routers/mcp_service.py index b238a5cda115e880d75ea91f6aa90a34a0360815..9317dbf46d755ca38a4c57e2c1a1e2cb03c6a021 100644 --- a/apps/routers/mcp_service.py +++ b/apps/routers/mcp_service.py @@ -172,9 +172,6 @@ async def get_service_detail( edit: Annotated[bool, Query(..., description="是否为编辑模式")] = False, ) -> JSONResponse: """获取MCP服务详情""" - # 检查用户权限 - if edit: - await _check_user_admin(user_sub) # 获取MCP服务详情 try: diff --git a/apps/scheduler/executor/agent.py b/apps/scheduler/executor/agent.py index c34ad1eb3e9647bc706b759f996f9468eafda90a..7ae534a905d7fb7673c45437f55dc71d2c4d2736 100644 --- a/apps/scheduler/executor/agent.py +++ b/apps/scheduler/executor/agent.py @@ -1,6 +1,6 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """MCP Agent执行器""" -from datetime import datetime,UTC +from datetime import datetime, UTC import logging import uuid @@ -39,7 +39,7 @@ class MCPAgentExecutor(BaseExecutor): agent_id: str = Field(default="", description="Agent ID") agent_description: str = Field(default="", description="Agent描述") mcp_list: list[MCPCollection] = Field(description="MCP服务器列表", default=[]) - mcp_pool: MCPPool = Field(description="MCP池", default_factory=MCPPool) + mcp_pool: MCPPool = Field(description="MCP池", default=MCPPool()) tools: dict[str, MCPTool] = Field( description="MCP工具列表,key为tool_id", default={}, @@ -54,9 +54,10 @@ class MCPAgentExecutor(BaseExecutor): alias="params", ) resoning_llm: ReasoningLLM = Field( - default_factory=ReasoningLLM, + default=ReasoningLLM(), description="推理大模型", ) + app_owner: str = Field(default="", description="应用所有者") async def update_tokens(self) -> None: """更新令牌数""" @@ -79,16 +80,12 @@ class MCPAgentExecutor(BaseExecutor): mcp_ids = app.mcp_service for mcp_id in mcp_ids: mcp_service = await MCPServiceManager.get_mcp_service(mcp_id) - if self.task.ids.user_sub not in mcp_service.activated: - logger.warning( - "[MCPAgentExecutor] 用户 %s 未启用MCP %s", - self.task.ids.user_sub, - mcp_id, - ) + if self.app_owner not in mcp_service.activated: + logger.warning("[MCPAgentExecutor] MCP服务 %s 未被应用所有者 %s 激活,跳过", + mcp_service.name, self.app_owner) continue - self.mcp_list.append(mcp_service) - await self.mcp_pool._init_mcp(mcp_id, self.task.ids.user_sub) + await self.mcp_pool._init_mcp(mcp_id, self.app_owner) for tool in mcp_service.tools: self.tools[tool.id] = tool self.tool_list.extend(mcp_service.tools) @@ -97,7 +94,8 @@ class MCPAgentExecutor(BaseExecutor): id=FINAL_TOOL_ID, name="Final Tool", description="结束流程的工具", mcp_id="", input_schema={}, ) self.tool_list.append( - MCPTool(id=FINAL_TOOL_ID, name="Final Tool", description="结束流程的工具", mcp_id="", input_schema={}), + MCPTool(id=FINAL_TOOL_ID, name="Final Tool", + description="结束流程的工具", mcp_id="", input_schema={}), ) self.tools[SELF_DESC_TOOL_ID] = MCPTool( id=SELF_DESC_TOOL_ID, @@ -179,7 +177,8 @@ class MCPAgentExecutor(BaseExecutor): ) await self.update_tokens() await self.push_message( - EventType.STEP_WAITING_FOR_START, confirm_message.model_dump(exclude_none=True, by_alias=True), + EventType.STEP_WAITING_FOR_START, confirm_message.model_dump( + exclude_none=True, by_alias=True), ) await self.push_message(EventType.FLOW_STOP, {}) self.task.state.flow_status = FlowStatus.WAITING @@ -196,7 +195,8 @@ class MCPAgentExecutor(BaseExecutor): flow_status=self.task.state.flow_status, input_data={}, output_data={}, - ex_data=confirm_message.model_dump(exclude_none=True, by_alias=True), + ex_data=confirm_message.model_dump( + exclude_none=True, by_alias=True), ) ) @@ -217,17 +217,19 @@ class MCPAgentExecutor(BaseExecutor): } result_exchange = False else: - mcp_client = (await self.mcp_pool.get(mcp_tool.mcp_id, self.task.ids.user_sub)) + mcp_client = (await self.mcp_pool.get(mcp_tool.mcp_id, self.app_owner)) output_params = await mcp_client.call_tool(mcp_tool.name, self.task.state.current_input) except anyio.ClosedResourceError: - logger.exception("[MCPAgentExecutor] MCP客户端连接已关闭: %s", mcp_tool.mcp_id) - await self.mcp_pool.stop(mcp_tool.mcp_id, self.task.ids.user_sub) - await self.mcp_pool._init_mcp(mcp_tool.mcp_id, self.task.ids.user_sub) + logger.exception( + "[MCPAgentExecutor] MCP客户端连接已关闭: %s", mcp_tool.mcp_id) + await self.mcp_pool.stop(mcp_tool.mcp_id, self.app_owner) + await self.mcp_pool._init_mcp(mcp_tool.mcp_id, self.app_owner) self.task.state.step_status = StepStatus.ERROR return except Exception as e: import traceback - logger.exception("[MCPAgentExecutor] 执行步骤 %s 时发生错误: %s", mcp_tool.name, traceback.format_exc()) + logger.exception("[MCPAgentExecutor] 执行步骤 %s 时发生错误: %s", + mcp_tool.name, traceback.format_exc()) self.task.state.step_status = StepStatus.ERROR self.task.state.error_message = str(e) return @@ -286,7 +288,8 @@ class MCPAgentExecutor(BaseExecutor): language=self.task.language, ) await self.push_message( - EventType.STEP_WAITING_FOR_PARAM, data={"message": error_message, "params": params_with_null} + EventType.STEP_WAITING_FOR_PARAM, data={ + "message": error_message, "params": params_with_null} ) await self.push_message(EventType.FLOW_STOP, data={}) self.task.state.flow_status = FlowStatus.WAITING @@ -312,7 +315,7 @@ class MCPAgentExecutor(BaseExecutor): async def get_next_step(self) -> None: """获取下一步""" - self.task.tokens.time=datetime.now(UTC).timestamp() + self.task.tokens.time = datetime.now(UTC).timestamp() self.task.state.retry_times = 0 if self.task.state.step_cnt < self.max_steps: self.task.state.step_cnt += 1 @@ -325,7 +328,8 @@ class MCPAgentExecutor(BaseExecutor): if step.tool_id in self.tools.keys(): break except Exception as e: - logger.warning("[MCPAgentExecutor] 获取下一步失败,重试中: %s", str(e)) + logger.warning( + "[MCPAgentExecutor] 获取下一步失败,重试中: %s", str(e)) if step is None or step.tool_id not in self.tools.keys(): step = Step( tool_id=FINAL_TOOL_ID, @@ -519,6 +523,7 @@ class MCPAgentExecutor(BaseExecutor): async def run(self) -> None: """执行MCP Agent的主逻辑""" # 初始化MCP服务 + self.app_owner = (await AppCenterManager.fetch_app_data_by_id(self.agent_id)).author await self.load_state() await self.load_mcp() data = {} @@ -534,7 +539,8 @@ class MCPAgentExecutor(BaseExecutor): ) user_info = await UserManager.get_userinfo_by_user_sub(self.task.ids.user_sub) if user_info.auto_execute: - data = flow_risk.model_dump(exclude_none=True, by_alias=True) + data = flow_risk.model_dump( + exclude_none=True, by_alias=True) await TaskManager.save_task(self.task.id, self.task) await self.get_next_step() except Exception as e: @@ -608,7 +614,8 @@ class MCPAgentExecutor(BaseExecutor): finally: for mcp_service in self.mcp_list: try: - await self.mcp_pool.stop(mcp_service.id, self.task.ids.user_sub) + await self.mcp_pool.stop(mcp_service.id, self.app_owner) except Exception as e: import traceback - logger.error("[MCPAgentExecutor] 停止MCP客户端时发生错误: %s", traceback.format_exc()) \ No newline at end of file + logger.error( + "[MCPAgentExecutor] 停止MCP客户端时发生错误: %s", traceback.format_exc()) diff --git a/apps/scheduler/pool/loader/openapi.py b/apps/scheduler/pool/loader/openapi.py index 98801c96dfd291928729f152fec1f33f7027a78d..99cf6b5201185d8919028ef41fadf977201b72fa 100644 --- a/apps/scheduler/pool/loader/openapi.py +++ b/apps/scheduler/pool/loader/openapi.py @@ -134,7 +134,7 @@ class OpenAPILoader: nodes = [] for api_endpoint in spec.endpoints: # 通过算法生成唯一的标识符 - identifier = shake_128(f"openapi::{yaml_filename}::{api_endpoint.uri}".encode()).hexdigest(16) + identifier = shake_128(f"{service_id}/{yaml_filename}/{api_endpoint.name}".encode()).hexdigest(16) # 组装新的NodePool item node = APINode( _id=identifier, diff --git a/apps/scheduler/scheduler/scheduler.py b/apps/scheduler/scheduler/scheduler.py index ac27e5f709186958ce0c58e3e923f9c27e65a6ad..c4bb6749cdb82e506ac6218b213f2f5c5548ac0a 100644 --- a/apps/scheduler/scheduler/scheduler.py +++ b/apps/scheduler/scheduler/scheduler.py @@ -59,7 +59,8 @@ class Scheduler: # 检查用户活动状态 is_active = await Activity.is_active(self.task.ids.active_id) if not is_active: - logger.warning("[Scheduler] 用户 %s 不活跃,终止工作流", self.task.ids.user_sub) + logger.warning("[Scheduler] 用户 %s 不活跃,终止工作流", + self.task.ids.user_sub) kill_event.set() break @@ -71,7 +72,7 @@ class Scheduler: logger.error(f"[Scheduler] 活动监控过程中发生错误: {e}") async def get_llm_use_in_chat_with_rag(self) -> LLM: - """获取RAG大模型""" + """获取RAG大模型""" try: # 获取当前会话使用的大模型 llm_id = await LLMManager.get_llm_id_by_conversation_id( @@ -91,7 +92,7 @@ class Scheduler: ) return llm else: - llm = await LLMManager.get_llm_by_id(self.task.ids.user_sub, llm_id) + llm = await LLMManager.get_llm_by_user_sub_and_id(self.task.ids.user_sub, llm_id) if not llm: logger.error("[Scheduler] 获取大模型失败") return None @@ -152,7 +153,8 @@ class Scheduler: # 查找对应的App元数据 app_data = await AppCenterManager.fetch_app_data_by_id(self.post_body.app.app_id) if not app_data: - logger.error("[Scheduler] App %s 不存在", self.post_body.app.app_id) + logger.error("[Scheduler] App %s 不存在", + self.post_body.app.app_id) await self.queue.close() return @@ -173,7 +175,8 @@ class Scheduler: ) # 启动监控任务和主任务 - main_task = asyncio.create_task(self.run_executor(self.queue, self.post_body, executor_background)) + main_task = asyncio.create_task(self.run_executor( + self.queue, self.post_body, executor_background)) # 等待任一任务完成 done, pending = await asyncio.wait( [main_task, monitor], @@ -184,7 +187,8 @@ class Scheduler: if kill_event.is_set(): logger.warning("[Scheduler] 用户活动状态检测不活跃,正在终止工作流执行...") main_task.cancel() - need_change_cancel_flow_state = [FlowStatus.RUNNING, FlowStatus.WAITING] + need_change_cancel_flow_state = [ + FlowStatus.RUNNING, FlowStatus.WAITING] if self.task.state.flow_status in need_change_cancel_flow_state: self.task.state.flow_status = FlowStatus.CANCELLED try: @@ -225,9 +229,7 @@ class Scheduler: max_tokens=Config().get_config().llm.max_tokens, ) else: - llm = await LLMManager.get_llm_by_id( - self.task.ids.user_sub, app_metadata.llm_id, - ) + llm = await LLMManager.get_llm_by_id(app_metadata.llm_id) if not llm: logger.error("[Scheduler] 获取大模型失败") await self.queue.close() @@ -268,7 +270,8 @@ class Scheduler: else: # 如果用户没有选特定的Flow,则根据语义选择一个Flow logger.info("[Scheduler] 选择最合适的流") - flow_chooser = FlowChooser(self.task, post_body.question, app_info) + flow_chooser = FlowChooser( + self.task, post_body.question, app_info) flow_id = await flow_chooser.get_top_flow() self.task = flow_chooser.task logger.info("[Scheduler] 获取工作流定义") @@ -317,4 +320,4 @@ class Scheduler: else: logger.error("[Scheduler] 无效的应用类型") - return \ No newline at end of file + return diff --git a/apps/services/conversation.py b/apps/services/conversation.py index ee15e389995f5859871d08e5ea68ce515786ba10..093a6fabdba0dd92afcfd66303f650f5c45131ca 100644 --- a/apps/services/conversation.py +++ b/apps/services/conversation.py @@ -40,7 +40,7 @@ class ConversationManager: @staticmethod async def add_conversation_by_user_sub( - title: str, + title: str, user_sub: str, app_id: str, llm_id: str, kb_ids: list[str], *, debug: bool) -> Conversation | None: """通过用户ID新建对话""" @@ -51,7 +51,7 @@ class ConversationManager: icon=llm_provider_dict["ollama"]["icon"], ) else: - llm = await LLMManager.get_llm_by_id(user_sub, llm_id) + llm = await LLMManager.get_llm_by_user_sub_and_id(user_sub, llm_id) if llm is None: logger.error("[ConversationManager] 获取大模型失败") return None @@ -131,13 +131,13 @@ class ConversationManager: # 🔑 修正:获取所有需要清理的文件ID files_to_cleanup = [] - + # 1. 获取未转为历史记录的文件(unused_docs) conversation_data = await conv_collection.find_one({"_id": conversation_id, "user_sub": user_sub}) if conversation_data: unused_docs = conversation_data.get("unused_docs", []) files_to_cleanup.extend(unused_docs) - + # 2. 获取历史记录中绑定的文件 async for record_group in record_group_collection.find({"conversation_id": conversation_id}): docs = record_group.get("docs", []) @@ -165,7 +165,8 @@ class ConversationManager: # 去重文件ID unique_file_ids = list(set(files_to_cleanup)) await DocumentManager.delete_document(user_sub, unique_file_ids) - logger.info(f"已清理对话 {conversation_id} 的 {len(unique_file_ids)} 个文件") + logger.info( + f"已清理对话 {conversation_id} 的 {len(unique_file_ids)} 个文件") except Exception as e: logger.error(f"清理对话文件失败: {e}") @@ -173,12 +174,12 @@ class ConversationManager: try: from apps.scheduler.variable.pool_manager import get_pool_manager pool_manager = await get_pool_manager() - + # 获取对话变量池中的文件变量,清理其引用的文件 conversation_pool = await pool_manager.get_conversation_pool(conversation_id) if conversation_pool: await _cleanup_transient_file_variables_in_pool(conversation_pool, user_sub, files_to_cleanup) - + # 移除对话变量池 await pool_manager.remove_conversation_pool(conversation_id) logger.info(f"已清理对话变量池: {conversation_id}") @@ -193,10 +194,10 @@ async def _cleanup_transient_file_variables_in_pool(pool, user_sub: str, already try: from apps.scheduler.variable.type import VariableType from apps.services.document import DocumentManager - + variables = await pool.list_variables() file_ids_to_cleanup = [] - + for variable in variables: if variable.metadata.var_type in [VariableType.FILE, VariableType.ARRAY_FILE]: if isinstance(variable.value, dict): @@ -209,12 +210,12 @@ async def _cleanup_transient_file_variables_in_pool(pool, user_sub: str, already for file_id in file_ids: if file_id not in already_cleaned_files: file_ids_to_cleanup.append(file_id) - + # 批量删除文件(排除重复) unique_file_ids = list(set(file_ids_to_cleanup)) if unique_file_ids: await DocumentManager.delete_document(user_sub, unique_file_ids) logger.info(f"已清理变量池中的 {len(unique_file_ids)} 个暂态文件") - + except Exception as e: logger.error(f"清理变量池暂态文件失败: {e}") diff --git a/apps/services/llm.py b/apps/services/llm.py index 1ce87bdfa9952b184c072368329add0a4e96a9f5..fb2d2de17db5d476b1979bdd47bef48cdd9e2e00 100644 --- a/apps/services/llm.py +++ b/apps/services/llm.py @@ -53,7 +53,23 @@ class LLMManager: return result.get("llm", {}).get("llm_id", "") @staticmethod - async def get_llm_by_id(user_sub: str, llm_id: str) -> LLM: + async def get_llm_by_id(llm_id: str) -> LLM: + """ + 通过ID获取大模型 + + :param llm_id: 大模型ID + :return: 大模型对象 + """ + llm_collection = MongoDB().get_collection("llm") + result = await llm_collection.find_one({"_id": llm_id}) + if not result: + err = f"[LLMManager] LLM {llm_id} 不存在" + logger.error(err) + raise ValueError(err) + return LLM.model_validate(result) + + @staticmethod + async def get_llm_by_user_sub_and_id(user_sub: str, llm_id: str) -> LLM: """ 通过ID获取大模型