diff --git a/apps/llm/embedding.py b/apps/llm/embedding.py index da1fad1a358ce0759f1c9911ff47f70f90be7243..298cc9ffcb3051340d612c7dbaeb8cec4cd80d55 100644 --- a/apps/llm/embedding.py +++ b/apps/llm/embedding.py @@ -82,17 +82,13 @@ class Embedding: :return: 文本对应的向量(顺序与text一致,也为List) """ try: - if Config().get_config().embedding.type == "openai": + if config.embedding.type == "openai": return await cls._get_openai_embedding(text) - if Config().get_config().embedding.type == "mindie": + if config.embedding.type == "mindie": return await cls._get_tei_embedding(text) - err = f"不支持的Embedding API类型: {Config().get_config().embedding.type}" - raise ValueError(err) - except Exception as e: - err = f"获取Embedding失败: {e}" - logger.error(err) - rt = [] - for i in range(len(text)): - rt.append([0.0]*1024) - return rt + logger.error("不支持的Embedding API类型: %s", config.embedding.type) + return [[0.0] * 1024 for _ in range(len(text))] + except Exception: + logger.exception("获取Embedding失败") + return [[0.0] * 1024 for _ in range(len(text))] diff --git a/apps/llm/function.py b/apps/llm/function.py index 10cd92571f6911d843efc63661af8ad42484821b..cd000c665aa2f03a03fa2fa43eb33daec8b98e76 100644 --- a/apps/llm/function.py +++ b/apps/llm/function.py @@ -7,6 +7,8 @@ import re from textwrap import dedent from typing import Any +import ollama +import openai from jinja2 import BaseLoader from jinja2.sandbox import SandboxedEnvironment from jsonschema import Draft7Validator @@ -45,29 +47,22 @@ class FunctionLLM: "messages": [], } - if self._config.backend == "ollama": - import ollama - - if not self._config.api_key: - self._client = ollama.AsyncClient(host=self._config.endpoint) - else: - self._client = ollama.AsyncClient( - host=self._config.endpoint, - headers={ - "Authorization": f"Bearer {self._config.api_key}", - }, - ) - - else: - import openai - - if not self._config.api_key: - self._client = openai.AsyncOpenAI(base_url=self._config.endpoint) - else: - self._client = openai.AsyncOpenAI( - base_url=self._config.endpoint, - api_key=self._config.api_key, - ) + if self._config.backend == "ollama" and not self._config.api_key: + self._client = ollama.AsyncClient(host=self._config.endpoint) + elif self._config.backend == "ollama" and self._config.api_key: + self._client = ollama.AsyncClient( + host=self._config.endpoint, + headers={ + "Authorization": f"Bearer {self._config.api_key}", + }, + ) + elif self._config.backend != "ollama" and not self._config.api_key: + self._client = openai.AsyncOpenAI(base_url=self._config.endpoint) + elif self._config.backend != "ollama" and self._config.api_key: + self._client = openai.AsyncOpenAI( + base_url=self._config.endpoint, + api_key=self._config.api_key, + ) async def _call_openai( diff --git a/apps/llm/token.py b/apps/llm/token.py index bf749ac1e684ad9b3d3c5e883171fcf1a3252dd7..b76cd744683e076a59a790057c9d2d77eca50b3b 100644 --- a/apps/llm/token.py +++ b/apps/llm/token.py @@ -3,6 +3,8 @@ import logging +import tiktoken + from apps.common.singleton import SingletonMeta logger = logging.getLogger(__name__) @@ -13,7 +15,6 @@ class TokenCalculator(metaclass=SingletonMeta): def __init__(self) -> None: """初始化Tokenizer""" - import tiktoken self._encoder = tiktoken.get_encoding("cl100k_base") diff --git a/apps/models/task.py b/apps/models/task.py index a1b71f65d5e8671dc7ad4eec54c98825fd38e40c..930a141b74b18f667ca82e2fac577a433b1b374d 100644 --- a/apps/models/task.py +++ b/apps/models/task.py @@ -8,7 +8,7 @@ from sqlalchemy import DateTime, Enum, Float, ForeignKey, Integer, String, Text from sqlalchemy.dialects.postgresql import JSONB, UUID from sqlalchemy.orm import Mapped, mapped_column -from apps.schemas.enum_var import FlowStatus, StepStatus +from apps.schemas.enum_var import FlowStatus, Language, StepStatus from .base import Base @@ -63,6 +63,8 @@ class TaskRuntime(Base): """计划""" document: Mapped[list[dict[str, Any]]] = mapped_column(JSONB, nullable=False, default={}) """关联文档""" + language: Mapped[Language] = mapped_column(Enum(Language), nullable=False, default=Language.ZH) + """语言""" class ExecutorCheckpoint(Base): diff --git a/apps/routers/chat.py b/apps/routers/chat.py index 860e71f2b61713e6f296051c82959a15f0333571..31b381dd0edba4ea7ec62f4cfa0a3492b5256039 100644 --- a/apps/routers/chat.py +++ b/apps/routers/chat.py @@ -14,7 +14,7 @@ from apps.dependency import verify_personal_token, verify_session from apps.scheduler.scheduler import Scheduler from apps.scheduler.scheduler.context import save_data from apps.schemas.enum_var import FlowStatus -from apps.schemas.request_data import RequestData +from apps.schemas.request_data import RequestData, RequestDataApp from apps.schemas.response_data import ResponseData from apps.schemas.task import Task from apps.services.activity import Activity @@ -51,12 +51,16 @@ async def init_task(post_body: RequestData, user_sub: str, session_id: str) -> T await RecordManager.update_record_flow_status_to_cancelled_by_task_ids(task_ids) task = await TaskManager.init_new_task(user_sub=user_sub, session_id=session_id, post_body=post_body) task.runtime.question = post_body.question - task.ids.group_id = post_body.group_id + task.state.app_id = post_body.app.app_id if post_body.app else "" else: if not post_body.task_id: err = "[Chat] task_id 不可为空!" raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="task_id cannot be empty") - task = await TaskManager.get_task_by_conversation_id(post_body.task_id) + task = await TaskManager.get_task_by_task_id(post_body.task_id) + post_body.app = RequestDataApp(appId=task.state.app_id) + post_body.conversation_id = task.ids.conversation_id + post_body.language = task.language + post_body.question = task.runtime.question return task @@ -133,7 +137,9 @@ async def chat(request: Request, post_body: RequestData) -> StreamingResponse: session_id = request.state.session_id user_sub = request.state.user_sub # 问题黑名单检测 - if not await QuestionBlacklistManager.check_blacklisted_questions(input_question=post_body.question): + if (post_body.question is not None) and ( + not await QuestionBlacklistManager.check_blacklisted_questions(input_question=post_body.question) + ): # 用户扣分 await UserBlacklistManager.change_blacklisted_users(user_sub, -10) raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="question is blacklisted") diff --git a/apps/routers/mcp_service.py b/apps/routers/mcp_service.py index 3007d5609e69e9e6f9c43756c366cdb2bc9a6015..c4adec4179988c9abd0a3d99e4cabada63c9f2f3 100644 --- a/apps/routers/mcp_service.py +++ b/apps/routers/mcp_service.py @@ -55,6 +55,7 @@ async def get_mcpservice_list( keyword: str | None = None, page: Annotated[int, Query(ge=1)] = 1, *, + is_installed: bool | None = None, is_active: bool | None = None, ) -> JSONResponse: """获取服务列表""" @@ -65,6 +66,7 @@ async def get_mcpservice_list( user_sub, keyword, page, + is_installed=is_installed, is_active=is_active, ) except Exception as e: @@ -134,9 +136,39 @@ async def create_or_update_mcpservice( ).model_dump(exclude_none=True, by_alias=True)) +@router.post("/{serviceId}/install") +async def install_mcp_service( + request: Request, + service_id: Annotated[str, Path(..., alias="serviceId", description="服务ID")], + *, + install: Annotated[bool, Query(..., description="是否安装")] = True, +) -> JSONResponse: + """安装MCP服务""" + try: + await MCPServiceManager.install_mcpservice(request.state.user_sub, service_id, install) + except Exception as e: + err = f"[MCPService] 安装mcp服务失败: {e!s}" if install else f"[MCPService] 卸载mcp服务失败: {e!s}" + logger.exception(err) + return JSONResponse( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content=ResponseData( + code=status.HTTP_500_INTERNAL_SERVER_ERROR, + message=err, + result={}, + ).model_dump(exclude_none=True, by_alias=True), + ) + return JSONResponse( + status_code=status.HTTP_200_OK, + content=ResponseData( + code=status.HTTP_200_OK, + message="OK", + result={}, + ).model_dump(exclude_none=True, by_alias=True), + ) + + @admin_router.get("/{serviceId}", response_model=GetMCPServiceDetailRsp) async def get_service_detail( - request: Request, service_id: Annotated[str, Path(..., alias="serviceId", description="服务ID")], *, edit: Annotated[bool, Query(..., description="是否为编辑模式")] = False, @@ -194,16 +226,6 @@ async def get_service_detail( ) -@admin_router.get("/{serviceId}", response_model=GetMCPServiceDetailRsp) -async def get_service_detail(serviceId: Annotated[str, Path()]) -> JSONResponse: # noqa: N803 - """获取MCP服务详情""" - try: - data = await MCPServiceManager.get_mcp_service(serviceId) - config, icon = await MCPServiceManager.get_mcp_config(serviceId) - except Exception as e: - pass - - @admin_router.delete("/{serviceId}", response_model=DeleteMCPServiceRsp) async def delete_service(serviceId: Annotated[str, Path()]) -> JSONResponse: # noqa: N803 """删除服务""" diff --git a/apps/scheduler/executor/agent.py b/apps/scheduler/executor/agent.py index a72e3b72fe5d269fa134663198f1c70786f55209..e70b229a5fad42f4cf326a3fb0d2a1b51b522145 100644 --- a/apps/scheduler/executor/agent.py +++ b/apps/scheduler/executor/agent.py @@ -3,13 +3,11 @@ import logging import uuid -from typing import Any import anyio from mcp.types import TextContent from pydantic import Field -from apps.llm.patterns.rewrite import QuestionRewrite from apps.llm.reasoning import ReasoningLLM from apps.scheduler.executor.base import BaseExecutor from apps.scheduler.mcp_agent.host import MCPHost @@ -140,6 +138,7 @@ class MCPAgentExecutor(BaseExecutor): if is_first: # 获取第一个输入参数 tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool + step = self.task.runtime.temporary_plans.plans[self.task.state.step_index] mcp_tool = self.tools[tool_id] self.task.state.current_input = await MCPHost._get_first_input_params(mcp_tool, self.task.runtime.question, self.task) else: @@ -152,7 +151,7 @@ class MCPAgentExecutor(BaseExecutor): params_description = "" tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool mcp_tool = self.tools[tool_id] - self.task.state.current_input = await MCPHost._fill_params(mcp_tool, self.task.state.current_input, self.task.state.error_message, params, params_description) + self.task.state.current_input = await MCPHost._get_first_input_params(mcp_tool, step.instruction, self.task) async def reset_step_to_index(self, start_index: int) -> None: """重置步骤到开始""" @@ -274,10 +273,16 @@ class MCPAgentExecutor(BaseExecutor): self.resoning_llm, ) await self.update_tokens() + error_message = MCPPlanner.change_err_message_to_description( + error_message=self.task.state.error_message, + tool=mcp_tool, + input_params=self.task.state.current_input, + reasoning_llm=self.resoning_llm, + ) await self.push_message( EventType.STEP_WAITING_FOR_PARAM, data={ - "message": "当运行产生如下报错:\n" + self.task.state.error_message, + "message": error_message, "params": params_with_null, }, ) @@ -300,7 +305,7 @@ class MCPAgentExecutor(BaseExecutor): input_data={}, output_data={}, ex_data={ - "message": "当运行产生如下报错:\n" + self.task.state.error_message, + "message": error_message, "params": params_with_null, }, ), diff --git a/apps/scheduler/mcp/prompt.py b/apps/scheduler/mcp/prompt.py index 4270b1b339dca508d9ce27a29c2faba08ee98e61..0d7b417f5277d59321e420a3a5a92cfaa43e637e 100644 --- a/apps/scheduler/mcp/prompt.py +++ b/apps/scheduler/mcp/prompt.py @@ -174,8 +174,8 @@ FINAL_ANSWER = dedent(r""" MEMORY_TEMPLATE = dedent(r""" {% for ctx in context_list %} - 第{{ loop.index }}步:{{ ctx.step_description }} - - 调用工具 `{{ ctx.step_id }}`,并提供参数 `{{ ctx.input_data }}` + - 调用工具 `{{ ctx.step_name }}`,并提供参数 `{{ ctx.input_data | tojson }}`。 - 执行状态:{{ ctx.status }} - - 得到数据:`{{ ctx.output_data }}` + - 得到数据:`{{ ctx.output_data | tojson }}` {% endfor %} """) diff --git a/apps/scheduler/mcp_agent/host.py b/apps/scheduler/mcp_agent/host.py index 83740699742953d057a1399eb173b21eb6461565..c0f52c7f9f882eba313499b33e7e76d11baeec76 100644 --- a/apps/scheduler/mcp_agent/host.py +++ b/apps/scheduler/mcp_agent/host.py @@ -9,7 +9,6 @@ from jinja2 import BaseLoader from jinja2.sandbox import SandboxedEnvironment from mcp.types import TextContent -from apps.common.mongo import MongoDB from apps.llm.function import JsonGenerator from apps.scheduler.mcp.prompt import MEMORY_TEMPLATE from apps.scheduler.mcp_agent.prompt import REPAIR_PARAMS @@ -20,13 +19,19 @@ from apps.schemas.mcp import MCPPlanItem, MCPTool from apps.schemas.task import FlowStepHistory, Task from apps.services.task import TaskManager -logger = logging.getLogger(__name__) +def tojson_filter(value: Any) -> str: + """将值转换为JSON字符串""" + return json.dumps(value, ensure_ascii=False, separators=(",", ":")) + + +logger = logging.getLogger(__name__) _env = SandboxedEnvironment( loader=BaseLoader, - autoescape=True, + autoescape=False, trim_blocks=True, lstrip_blocks=True, + filters={"tojson": tojson_filter}, ) @@ -36,7 +41,6 @@ class MCPHost: @staticmethod async def assemble_memory(task: Task) -> str: """组装记忆""" - return _env.from_string(MEMORY_TEMPLATE).render( context_list=task.context, ) diff --git a/apps/scheduler/mcp_agent/plan.py b/apps/scheduler/mcp_agent/plan.py index 24eb737da420539a37c6d67390d71f559d76bd51..b2f3a35e35a1d6d89d5827b0a11b594600dfb048 100644 --- a/apps/scheduler/mcp_agent/plan.py +++ b/apps/scheduler/mcp_agent/plan.py @@ -1,13 +1,16 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """MCP 用户目标拆解与规划""" -from typing import Any, AsyncGenerator +from collections.abc import AsyncGenerator +from typing import Any from jinja2 import BaseLoader from jinja2.sandbox import SandboxedEnvironment from apps.llm.function import JsonGenerator from apps.llm.reasoning import ReasoningLLM +from apps.scheduler.mcp_agent.base import McpBase from apps.scheduler.mcp_agent.prompt import ( + CHANGE_ERROR_MESSAGE_TO_DESCRIPTION, CREATE_PLAN, EVALUATE_GOAL, FINAL_ANSWER, @@ -29,42 +32,9 @@ _env = SandboxedEnvironment( ) -class MCPPlanner: +class MCPPlanner(McpBase): """MCP 用户目标拆解与规划""" - @staticmethod - async def get_resoning_result(prompt: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> str: - """获取推理结果""" - # 调用推理大模型 - message = [ - {"role": "system", "content": "You are a helpful assistant."}, - {"role": "user", "content": prompt}, - ] - result = "" - async for chunk in resoning_llm.call( - message, - streaming=False, - temperature=0.07, - result_only=True, - ): - result += chunk - - return result - - @staticmethod - async def _parse_result(result: str, schema: dict[str, Any]) -> str: - """解析推理结果""" - json_generator = JsonGenerator( - result, - [ - {"role": "system", "content": "You are a helpful assistant."}, - {"role": "user", "content": "请提取下面内容中的json\n\n" + result}, - ], - schema, - ) - json_result = await json_generator.generate() - return json_result - @staticmethod async def evaluate_goal( goal: str, @@ -261,6 +231,22 @@ class MCPPlanner: # 返回工具执行错误类型 return error_type + @staticmethod + async def change_err_message_to_description( + error_message: str, tool: MCPTool, input_params: dict[str, Any], + reasoning_llm: ReasoningLLM = ReasoningLLM()) -> str: + """将错误信息转换为工具描述""" + template = _env.from_string(CHANGE_ERROR_MESSAGE_TO_DESCRIPTION) + prompt = template.render( + error_message=error_message, + tool_name=tool.name, + tool_description=tool.description, + input_schema=tool.input_schema, + input_params=input_params, + ) + result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm) + return result + @staticmethod async def get_missing_param( tool: MCPTool, diff --git a/apps/scheduler/mcp_agent/prompt.py b/apps/scheduler/mcp_agent/prompt.py index 75ccd57ff3e5c8f5b8945c076408817d85922160..bfe8cf39ed6752d1ffca92c959a2ef3af5781c46 100644 --- a/apps/scheduler/mcp_agent/prompt.py +++ b/apps/scheduler/mcp_agent/prompt.py @@ -70,6 +70,8 @@ TOOL_SELECT = dedent(r""" 1. 确保充分理解当前目标,选择实现目标所需的MCP工具。 2. 请在给定的MCP工具列表中选择,不要自己生成MCP工具。 3. 可以选择一些辅助工具,但必须确保这些工具与当前目标相关。 + 4. 注意,返回的工具ID必须是MCP工具的ID,而不是名称。 + 5. 不要选择不存在的工具。 必须按照以下格式生成选择结果,不要输出任何其他内容: ```json { @@ -507,7 +509,7 @@ RISK_EVALUATE = dedent(r""" ```json { "risk": "low/medium/high", - "message": "提示信息" + "reason": "提示信息" } ``` # 样例 @@ -534,7 +536,7 @@ RISK_EVALUATE = dedent(r""" ```json { "risk": "中", - "message": "当前工具将连接到MySQL数据库并分析性能,可能会对数据库性能产生一定影响。请确保在非生产环境中执行此操作。" + "reason": "当前工具将连接到MySQL数据库并分析性能,可能会对数据库性能产生一定影响。请确保在非生产环境中执行此操作。" } ``` # 工具 @@ -617,8 +619,74 @@ TOOL_EXECUTE_ERROR_TYPE_ANALYSIS = dedent(r""" # 工具运行报错 {{error_message}} # 输出 - """, - ) + """) + +# 将当前程序运行的报错转换为自然语言 +CHANGE_ERROR_MESSAGE_TO_DESCRIPTION = dedent(r""" + 你是一个智能助手,你的任务是将当前程序运行的报错转换为自然语言描述。 + 请根据以下规则进行转换: + 1. 将报错信息转换为自然语言描述,描述应该简洁明了,能够让人理解报错的原因和影响。 + 2. 描述应该包含报错的具体内容和可能的解决方案。 + 3. 描述应该避免使用过于专业的术语,以便用户能够理解。 + 4. 描述应该尽量简短,控制在50字以内。 + 5. 只输出自然语言描述,不要输出其他内容。 + # 样例 + # 工具信息 + + port_scanner + 扫描主机端口 + + { + "type": "object", + "properties": { + "host": { + "type": "string", + "description": "主机地址" + }, + "port": { + "type": "integer", + "description": "端口号" + }, + "username": { + "type": "string", + "description": "用户名" + }, + "password": { + "type": "string", + "description": "密码" + } + }, + "required": ["host", "port", "username", "password"] + } + + + # 工具入参 + { + "host": "192.0.0.1", + "port": 3306, + "username": "root", + "password": "password" + } + # 报错信息 + 执行端口扫描命令时,出现了错误:`password is not correct`。 + # 输出 + 扫描端口时发生错误:密码不正确。请检查输入的密码是否正确,并重试。 + # 现在开始转换报错信息: + # 工具信息 + + {{tool_name}} + {{tool_description}} + + {{input_schema}} + + + # 工具入参 + {{input_params}} + # 报错信息 + {{error_message}} + # 输出 + """) + # 获取缺失的参数的json结构体 GET_MISSING_PARAMS = dedent(r""" 你是一个工具参数获取器。 diff --git a/apps/scheduler/mcp_agent/select.py b/apps/scheduler/mcp_agent/select.py index f917a38ae78b0603c9b09c2d3526af637343a0ce..ce289c1f6ca502214ae5517e692da1889d5ff25e 100644 --- a/apps/scheduler/mcp_agent/select.py +++ b/apps/scheduler/mcp_agent/select.py @@ -3,20 +3,15 @@ import logging import random -from typing import AsyncGenerator from jinja2 import BaseLoader from jinja2.sandbox import SandboxedEnvironment -from apps.common.config import Config -from apps.common.lance import LanceDB -from apps.common.mongo import MongoDB -from apps.llm.embedding import Embedding -from apps.llm.function import FunctionLLM from apps.llm.reasoning import ReasoningLLM from apps.llm.token import TokenCalculator +from apps.scheduler.mcp_agent.base import McpBase from apps.scheduler.mcp_agent.prompt import TOOL_SELECT -from apps.schemas.mcp import BaseModel, MCPCollection, MCPSelectResult, MCPTool, MCPToolIdsSelectResult +from apps.schemas.mcp import MCPCollection, MCPTool, MCPToolIdsSelectResult logger = logging.getLogger(__name__) @@ -31,25 +26,26 @@ FINAL_TOOL_ID = "FIANL" SUMMARIZE_TOOL_ID = "SUMMARIZE" -class MCPSelector: +class MCPSelector(McpBase): """MCP选择器""" @staticmethod async def select_top_tool( goal: str, tool_list: list[MCPTool], - additional_info: str | None = None, top_n: int | None = None) -> list[MCPTool]: + additional_info: str | None = None, top_n: int | None = None, + reasoning_llm: ReasoningLLM | None = None) -> list[MCPTool]: """选择最合适的工具""" random.shuffle(tool_list) - max_tokens = Config().get_config().function_call.max_tokens + max_tokens = reasoning_llm._config.max_tokens template = _env.from_string(TOOL_SELECT) - if TokenCalculator.calculate_token_length( + token_calculator = TokenCalculator() + if token_calculator.calculate_token_length( messages=[{"role": "user", "content": template.render( goal=goal, tools=[], additional_info=additional_info )}], pure_text=True) > max_tokens: logger.warning("[MCPSelector] 工具选择模板长度超过最大令牌数,无法进行选择") return [] - llm = FunctionLLM() current_index = 0 tool_ids = [] while current_index < len(tool_list): @@ -57,7 +53,7 @@ class MCPSelector: sub_tools = [] while index < len(tool_list): tool = tool_list[index] - tokens = TokenCalculator.calculate_token_length( + tokens = token_calculator.calculate_token_length( messages=[{"role": "user", "content": template.render( goal=goal, tools=[tool], additional_info=additional_info @@ -68,7 +64,7 @@ class MCPSelector: continue sub_tools.append(tool) - tokens = TokenCalculator.calculate_token_length(messages=[{"role": "user", "content": template.render( + tokens = token_calculator.calculate_token_length(messages=[{"role": "user", "content": template.render( goal=goal, tools=sub_tools, additional_info=additional_info)}, ], pure_text=True) if tokens > max_tokens: del sub_tools[-1] @@ -77,13 +73,13 @@ class MCPSelector: index += 1 current_index = index if sub_tools: - message = [ - {"role": "system", "content": "You are a helpful assistant."}, - {"role": "user", "content": template.render(tools=sub_tools)}, - ] schema = MCPToolIdsSelectResult.model_json_schema() - schema["properties"]["tool_ids"]["enum"] = [tool.id for tool in sub_tools] - result = await llm.call(messages=message, schema=schema) + if "items" not in schema["properties"]["tool_ids"]: + schema["properties"]["tool_ids"]["items"] = {} + # 将enum添加到items中,限制数组元素的可选值 + schema["properties"]["tool_ids"]["items"]["enum"] = [tool.id for tool in sub_tools] + result = await MCPSelector.get_resoning_result(template.render(goal=goal, tools=sub_tools, additional_info="请根据目标选择对应的工具"), reasoning_llm) + result = await MCPSelector._parse_result(result, schema) try: result = MCPToolIdsSelectResult.model_validate(result) tool_ids.extend(result.tool_ids) @@ -98,4 +94,4 @@ class MCPSelector: description="终止", mcp_id=FINAL_TOOL_ID, input_schema={})) # mcp_tools.append(MCPTool(id=SUMMARIZE_TOOL_ID, name="Summarize", # description="总结工具", mcp_id=SUMMARIZE_TOOL_ID, input_schema={})) - return mcp_tools \ No newline at end of file + return mcp_tools diff --git a/apps/scheduler/pool/loader/app.py b/apps/scheduler/pool/loader/app.py index 16cc858d2580d1836cde68b190fb856df4922c80..5d2da58339e09cdd560de1d93e5e1d53e5a94be8 100644 --- a/apps/scheduler/pool/loader/app.py +++ b/apps/scheduler/pool/loader/app.py @@ -82,6 +82,7 @@ class AppLoader: # 加载模型 try: metadata = AgentAppMetadata.model_validate(metadata) + logger.info("[AppLoader] Agent应用元数据验证成功: %s", metadata) except Exception as e: err = "[AppLoader] Agent应用元数据验证失败" logger.exception(err) diff --git a/apps/scheduler/pool/loader/mcp.py b/apps/scheduler/pool/loader/mcp.py index 16b1f6669a5a9781e07007c570324803e62d2740..e97ef4a02657c4d67ba098910842cc02873a59ce 100644 --- a/apps/scheduler/pool/loader/mcp.py +++ b/apps/scheduler/pool/loader/mcp.py @@ -85,37 +85,61 @@ class MCPLoader(metaclass=SingletonMeta): mcp_id = next(iter(item.mcpServers.keys())) mcp_config = item.mcpServers[mcp_id] - if not mcp_config.autoInstall: - print(f"[Installer] MCP模板无需安装: {mcp_id}") # noqa: T201 - - elif isinstance(mcp_config, MCPServerStdioConfig): - print(f"[Installer] Stdio方式的MCP模板,开始自动安装: {mcp_id}") # noqa: T201 - if "uv" in mcp_config.command: - new_config = await install_uvx(mcp_id, mcp_config) - elif "npx" in mcp_config.command: - new_config = await install_npx(mcp_id, mcp_config) - - if new_config is None: - logger.error("[MCPLoader] MCP模板安装失败: %s", mcp_id) - await MCPLoader.update_template_status(mcp_id, MCPInstallStatus.FAILED) - return - - item.mcpServers[mcp_id] = new_config - - # 重新保存config - template_config = MCP_PATH / "template" / mcp_id / "config.json" - f = await template_config.open("w+", encoding="utf-8") - config_data = item.model_dump(by_alias=True, exclude_none=True) - await f.write(json.dumps(config_data, indent=4, ensure_ascii=False)) - await f.aclose() + try: + if not mcp_config.autoInstall: + print(f"[Installer] MCP模板无需安装: {mcp_id}") # noqa: T201 + + elif isinstance(mcp_config, MCPServerStdioConfig): + print(f"[Installer] Stdio方式的MCP模板,开始自动安装: {mcp_id}") # noqa: T201 + if "uv" in mcp_config.command: + new_config = await install_uvx(mcp_id, mcp_config) + elif "npx" in mcp_config.command: + new_config = await install_npx(mcp_id, mcp_config) + + if new_config is None: + logger.error("[MCPLoader] MCP模板安装失败: %s", mcp_id) + await MCPLoader.update_template_status(mcp_id, MCPInstallStatus.FAILED) + return + + mcp_config = new_config + + # 重新保存config + template_config = MCP_PATH / "template" / mcp_id / "config.json" + f = await template_config.open("w+", encoding="utf-8") + config_data = config.model_dump(by_alias=True, exclude_none=True) + await f.write(json.dumps(config_data, indent=4, ensure_ascii=False)) + await f.aclose() + + else: + logger.info("[Installer] SSE/StreamableHTTP方式的MCP模板,无需安装: %s", mcp_id) + mcp_config.autoInstall = False + + await MCPLoader._insert_template_tool(mcp_id, mcp_config) + await MCPLoader.update_template_status(mcp_id, MCPInstallStatus.READY) + logger.info("[Installer] MCP模板安装成功: %s", mcp_id) + except Exception: + logger.exception("[MCPLoader] MCP模板安装失败: %s", mcp_id) + await MCPLoader.update_template_status(mcp_id, MCPInstallStatus.FAILED) + raise - else: - print(f"[Installer] SSE/StreamableHTTP方式的MCP模板,无需安装: {mcp_id}") # noqa: T201 - item.mcpServers[mcp_id].autoInstall = False - print(f"[Installer] MCP模板安装成功: {mcp_id}") # noqa: T201 - await MCPLoader.update_template_status(mcp_id, MCPInstallStatus.READY) - await MCPLoader._insert_template_tool(mcp_id, item) + @staticmethod + async def clear_ready_or_failed_mcp_installation() -> None: + """清除状态为ready或failed的MCP安装任务""" + mcp_collection = MongoDB().get_collection("mcp") + mcp_ids = ProcessHandler.get_all_task_ids() + # 检索_id在mcp_ids且状态为ready或者failed的MCP的内容 + db_service_list = await mcp_collection.find( + {"_id": {"$in": mcp_ids}, "status": {"$in": [MCPInstallStatus.READY, MCPInstallStatus.FAILED]}}, + ).to_list(None) + for db_service in db_service_list: + try: + item = MCPCollection.model_validate(db_service) + except Exception as e: + logger.error("[MCPLoader] MCP模板数据验证失败: %s, 错误: %s", db_service["_id"], e) + continue + ProcessHandler.remove_task(item.id) + logger.info("[MCPLoader] 删除已完成或失败的MCP安装进程: %s", item.id) @staticmethod @@ -127,6 +151,9 @@ class MCPLoader(metaclass=SingletonMeta): :param MCPServerConfig config: MCP配置 :return: 无 """ + # 清除状态为ready或failed的MCP安装任务 + await MCPLoader.clear_ready_or_failed_mcp_installation() + # 如果包含多个MCP Server,报错 if len(config.mcpServers) > 1: err = f"[MCPLoader] MCP模板“{mcp_id}”包含多个MCP Server,无法初始化" @@ -147,6 +174,29 @@ class MCPLoader(metaclass=SingletonMeta): raise RuntimeError(err) + @staticmethod + async def cancel_all_installing_task() -> None: + """取消正在安装的MCP模板任务""" + template_path = MCP_PATH / "template" + logger.info("[MCPLoader] 初始化所有MCP模板: %s", template_path) + mongo = MongoDB() + mcp_collection = mongo.get_collection("mcp") + # 遍历所有模板 + mcp_ids = [] + async for mcp_dir in template_path.iterdir(): + # 不是目录 + if not await mcp_dir.is_dir(): + logger.warning("[MCPLoader] 跳过非目录: %s", mcp_dir.as_posix()) + continue + + mcp_ids.append(mcp_dir.name) + # 更新数据库状态 + await mcp_collection.update_many( + {"_id": {"$in": mcp_ids}, "status": MCPInstallStatus.INSTALLING}, + {"$set": {"status": MCPInstallStatus.CANCELLED}}, + ) + + @staticmethod async def _init_all_template() -> None: """ @@ -330,7 +380,7 @@ class MCPLoader(metaclass=SingletonMeta): @staticmethod - async def user_active_template(user_sub: str, mcp_id: str, mcp_env: dict[str, Any]) -> None: + async def user_active_template(user_sub: str, mcp_id: str, mcp_env: dict[str, Any] | None = None) -> None: """ 用户激活MCP模板 @@ -350,7 +400,6 @@ class MCPLoader(metaclass=SingletonMeta): raise FileExistsError(err) mcp_config = await MCPLoader.get_config(mcp_id) - mcp_config.mcpServers[mcp_id].env.update(mcp_env) # 拷贝文件 await asyncer.asyncify(shutil.copytree)( template_path.as_posix(), @@ -359,17 +408,32 @@ class MCPLoader(metaclass=SingletonMeta): symlinks=True, ) - user_config_path = user_path / "config.json" - # 更新用户配置 - f = await user_config_path.open("w", encoding="utf-8", errors="ignore") - await f.write( - json.dumps( - mcp_config.model_dump(by_alias=True, exclude_none=True), - indent=4, - ensure_ascii=False, - ), - ) - await f.aclose() + if mcp_env is not None: + mcp_config.mcpServers[mcp_id].env.update(mcp_env) + user_config_path = user_path / "config.json" + # 更新用户配置 + f = await user_config_path.open("w", encoding="utf-8", errors="ignore") + await f.write( + json.dumps( + mcp_config.model_dump(by_alias=True, exclude_none=True), + indent=4, + ensure_ascii=False, + ) + ) + await f.aclose() + if mcp_config.mcpType == MCPType.STDIO: + index = None + for i in range(len(mcp_config.config.args)): + if mcp_config.config.args[i] == "--directory": + index = i + 1 + break + if index is not None: + if index < len(mcp_config.config.args): + mcp_config.config.args[index] = str(user_path)+'/project' + else: + mcp_config.config.args.append(str(user_path)+'/project') + else: + mcp_config.config.args = ["--directory", str(user_path)+'/project'] + mcp_config.config.args # 更新数据库 async with postgres.session() as session: await session.merge(MCPActivated( @@ -425,6 +489,27 @@ class MCPLoader(metaclass=SingletonMeta): return deleted_mcp_list + @staticmethod + async def cancel_installing_task(cancel_mcp_list: list[str]) -> None: + """ + 取消正在安装的MCP模板任务 + + :param list[str] cancel_mcp_list: 需要取消的MCP列表 + :return: 无 + """ + mongo = MongoDB() + mcp_collection = mongo.get_collection("mcp") + # 更新数据库状态 + cancel_mcp_list = await mcp_collection.distinct("_id", {"_id": {"$in": cancel_mcp_list}, "status": MCPInstallStatus.INSTALLING}) + await mcp_collection.update_many( + {"_id": {"$in": cancel_mcp_list}, "status": MCPInstallStatus.INSTALLING}, + {"$set": {"status": MCPInstallStatus.CANCELLED}}, + ) + for mcp_id in cancel_mcp_list: + ProcessHandler.remove_task(mcp_id) + logger.info("[MCPLoader] 取消这些正在安装的MCP模板任务: %s", cancel_mcp_list) + + @staticmethod async def remove_deleted_mcp(deleted_mcp_list: list[str]) -> None: """ @@ -553,6 +638,7 @@ class MCPLoader(metaclass=SingletonMeta): # 初始化所有模板 await MCPLoader._init_all_template() + await MCPLoader.cancel_all_installing_task() # 加载用户MCP await MCPLoader._load_user_mcp() diff --git a/apps/scheduler/pool/loader/service.py b/apps/scheduler/pool/loader/service.py index 4297f6e30d64a2ba003b407a056692a802d8a4de..802ec35050ae860727dedb2e354c277853330559 100644 --- a/apps/scheduler/pool/loader/service.py +++ b/apps/scheduler/pool/loader/service.py @@ -32,6 +32,9 @@ class ServiceLoader: """加载单个Service""" service_path = BASE_PATH / str(service_id) # 载入元数据 + if not await (service_path / "metadata.yaml").exists(): + logger.error("[ServiceLoader] Service %s 的元数据不存在", service_id) + return metadata = await MetadataLoader().load_one(service_path / "metadata.yaml") if not isinstance(metadata, ServiceMetadata): err = f"[ServiceLoader] 元数据类型错误: {service_path}/metadata.yaml" diff --git a/apps/scheduler/pool/mcp/client.py b/apps/scheduler/pool/mcp/client.py index 092bac8909635a5e0c846dddef3456d8ad3be43c..297ed3e2e28042687f9a724248e436b927e540cd 100644 --- a/apps/scheduler/pool/mcp/client.py +++ b/apps/scheduler/pool/mcp/client.py @@ -29,6 +29,7 @@ class MCPClient: mcp_id: str task: asyncio.Task ready_sign: asyncio.Event + error_sign: asyncio.Event stop_sign: asyncio.Event client: ClientSession status: MCPStatus @@ -54,9 +55,10 @@ class MCPClient: """ # 创建Client if isinstance(config, MCPServerSSEConfig): + env = config.env or {} client = sse_client( url=config.url, - headers=config.env, + headers=env, ) elif isinstance(config, MCPServerStdioConfig): if user_sub: @@ -72,6 +74,7 @@ class MCPClient: cwd=cwd.as_posix(), )) else: + self.error_sign.set() err = f"[MCPClient] MCP {mcp_id}:未知的MCP服务类型“{config.type}”" logger.error(err) raise TypeError(err) @@ -85,6 +88,8 @@ class MCPClient: # 初始化Client await session.initialize() except Exception: + self.error_sign.set() + self.status = MCPStatus.STOPPED logger.exception("[MCPClient] MCP %s:初始化失败", mcp_id) raise @@ -93,6 +98,7 @@ class MCPClient: # 等待关闭信号 await self.stop_sign.wait() + logger.info("[MCPClient] MCP %s:收到停止信号,正在关闭", mcp_id) # 关闭Client try: @@ -116,13 +122,22 @@ class MCPClient: # 初始化变量 self.mcp_id = mcp_id self.ready_sign = asyncio.Event() + self.error_sign = asyncio.Event() self.stop_sign = asyncio.Event() # 创建协程 self.task = asyncio.create_task(self._main_loop(user_sub, mcp_id, config)) # 等待初始化完成 - await self.ready_sign.wait() + done, pending = await asyncio.wait( + [asyncio.create_task(self.ready_sign.wait()), + asyncio.create_task(self.error_sign.wait())], + return_when=asyncio.FIRST_COMPLETED, + ) + if self.error_sign.is_set(): + self.status = MCPStatus.ERROR + logger.error("[MCPClient] MCP %s:初始化失败", mcp_id) + raise Exception(f"MCP {mcp_id} 初始化失败") # 获取工具列表 self.tools = (await self.client.list_tools()).tools @@ -138,5 +153,5 @@ class MCPClient: self.stop_sign.set() try: await self.task - except Exception: - logger.exception("[MCPClient] MCP %s:停止失败", self.mcp_id) + except Exception as e: # noqa: BLE001 + logger.warning("[MCPClient] MCP %s:停止时发生异常:%s", self.mcp_id, e) diff --git a/apps/scheduler/pool/mcp/install.py b/apps/scheduler/pool/mcp/install.py index a320714c27534f5e8ec58528cc77af4be71215b4..60a660c88c37a8ba7d748a4def51048a62fc5597 100644 --- a/apps/scheduler/pool/mcp/install.py +++ b/apps/scheduler/pool/mcp/install.py @@ -1,6 +1,8 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """MCP 安装""" +import logging +import shutil from asyncio import subprocess from typing import TYPE_CHECKING @@ -10,6 +12,9 @@ if TYPE_CHECKING: from apps.schemas.mcp import MCPServerStdioConfig +logger = logging.getLogger(__name__) + + async def install_uvx(mcp_id: str, config: "MCPServerStdioConfig") -> "MCPServerStdioConfig | None": """ 安装使用uvx包管理器的MCP服务 @@ -23,29 +28,40 @@ async def install_uvx(mcp_id: str, config: "MCPServerStdioConfig") -> "MCPServer :rtype: MCPServerStdioConfig :raises ValueError: 未找到MCP Server对应的Python包 """ - # 创建文件夹 - mcp_path = MCP_PATH / "template" / mcp_id / "project" - await mcp_path.mkdir(parents=True, exist_ok=True) + uv_path = shutil.which("uv") + if uv_path is None: + error = "[Installer] 未找到uv命令,请先安装uv包管理器: pip install uv" + logger.error(error) + return None # 找到包名 - package = "" + package = None for arg in config.args: if not arg.startswith("-"): package = arg break - + logger.info("[Installer] MCP包名: %s", package) if not package: print("[Installer] 未找到包名") # noqa: T201 return None + # 创建文件夹 + mcp_path = MCP_PATH / "template" / mcp_id / "project" + logger.info("[Installer] MCP安装路径: %s", mcp_path) + await mcp_path.mkdir(parents=True, exist_ok=True) + # 如果有pyproject.toml文件,则使用sync - if await (mcp_path / "pyproject.toml").exists(): + flag = await (mcp_path / "pyproject.toml").exists() + logger.info("[Installer] MCP安装标志: %s", flag) + if flag: + shell_command = ( + f"{uv_path} venv; " + f"{uv_path} sync --index-url https://pypi.tuna.tsinghua.edu.cn/simple --active " + "--no-install-project --no-cache" + ) + logger.info("[Installer] MCP安装命令: %s", shell_command) pipe = await subprocess.create_subprocess_shell( - ( - "uv venv; " - "uv sync --index-url https://pypi.tuna.tsinghua.edu.cn/simple --active " - "--no-install-project --no-cache" - ), + shell_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=mcp_path, @@ -57,21 +73,24 @@ async def install_uvx(mcp_id: str, config: "MCPServerStdioConfig") -> "MCPServer return None print(f"[Installer] 检查依赖成功: {mcp_path}; {stdout.decode() if stdout else '(无输出信息)'}") # noqa: T201 - config.command = "uv" - config.args = ["run", *config.args] + config.command = uv_path + if "run" not in config.args: + config.args = ["run", *config.args] config.autoInstall = False + logger.info("[Installer] MCP安装配置更新成功: %s", config) return config # 否则,初始化uv项目 + shell_command = ( + f"{uv_path} init; " + f"{uv_path} venv; " + f"{uv_path} add --index-url https://pypi.tuna.tsinghua.edu.cn/simple {package}; " + f"{uv_path} sync --index-url https://pypi.tuna.tsinghua.edu.cn/simple --active --no-install-project --no-cache" + ) + logger.info("[Installer] MCP安装命令: %s", shell_command) pipe = await subprocess.create_subprocess_shell( - ( - f"uv init; " - f"uv venv; " - f"uv add --index-url https://pypi.tuna.tsinghua.edu.cn/simple {package}; " - f"uv sync --index-url https://pypi.tuna.tsinghua.edu.cn/simple --active " - f"--no-install-project --no-cache" - ), + shell_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=mcp_path, @@ -84,10 +103,12 @@ async def install_uvx(mcp_id: str, config: "MCPServerStdioConfig") -> "MCPServer print(f"[Installer] 安装 {package} 成功: {mcp_path}; {stdout.decode() if stdout else '(无输出信息)'}") # noqa: T201 # 更新配置 - config.command = "uv" - config.args = ["run", *config.args] + config.command = uv_path + if "run" not in config.args: + config.args = ["run", *config.args] config.autoInstall = False + logger.info("[Installer] MCP安装配置更新成功: %s", config) return config @@ -103,17 +124,14 @@ async def install_npx(mcp_id: str, config: "MCPServerStdioConfig") -> "MCPServer :rtype: MCPServerStdioConfig :raises ValueError: 未找到MCP Server对应的npm包 """ - mcp_path = MCP_PATH / "template" / mcp_id / "project" - await mcp_path.mkdir(parents=True, exist_ok=True) - - # 如果有node_modules文件夹,则认为已安装 - if await (mcp_path / "node_modules").exists(): - config.command = "npm" - config.args = ["exec", *config.args] - return config + npm_path = shutil.which("npm") + if npm_path is None: + error = "[Installer] 未找到npm命令,请先安装Node.js和npm" + logger.error(error) + return None # 查找package name - package = "" + package = None for arg in config.args: if not arg.startswith("-"): package = arg @@ -123,9 +141,19 @@ async def install_npx(mcp_id: str, config: "MCPServerStdioConfig") -> "MCPServer print("[Installer] 未找到包名") # noqa: T201 return None + mcp_path = MCP_PATH / "template" / mcp_id / "project" + await mcp_path.mkdir(parents=True, exist_ok=True) + + # 如果有node_modules文件夹,则认为已安装 + if await (mcp_path / "node_modules").exists(): + config.command = npm_path + if "exec" not in config.args: + config.args = ["exec", *config.args] + return config + # 安装NPM包 pipe = await subprocess.create_subprocess_shell( - f"npm install {package}", + f"{npm_path} install {package}", stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=mcp_path, @@ -137,8 +165,9 @@ async def install_npx(mcp_id: str, config: "MCPServerStdioConfig") -> "MCPServer print(f"[Installer] 安装 {package} 成功: {mcp_path}; {stdout.decode() if stdout else '(无输出信息)'}") # noqa: T201 # 更新配置 - config.command = "npm" - config.args = ["exec", *config.args] + config.command = npm_path + if "exec" not in config.args: + config.args = ["exec", *config.args] config.autoInstall = False return config diff --git a/apps/scheduler/pool/mcp/pool.py b/apps/scheduler/pool/mcp/pool.py index 9ff5d2e9642581bb4ffdf534a8d740c87f1d3d90..91413db6167da62435d246b2d1264a305afdb9e4 100644 --- a/apps/scheduler/pool/mcp/pool.py +++ b/apps/scheduler/pool/mcp/pool.py @@ -24,11 +24,11 @@ class MCPPool(metaclass=SingletonMeta): async def _init_mcp(self, mcp_id: str, user_sub: str) -> MCPClient | None: """初始化MCP池""" - mcp_math = MCP_USER_PATH / user_sub / mcp_id / "project" config_path = MCP_USER_PATH / user_sub / mcp_id / "config.json" - if not await mcp_math.exists() or not await mcp_math.is_dir(): - logger.warning("[MCPPool] 用户 %s 的MCP %s 未激活", user_sub, mcp_id) + flag = (await config_path.exists()) + if not flag: + logger.warning("[MCPPool] 用户 %s 的MCP %s 配置文件不存在", user_sub, mcp_id) return None config = MCPServerConfig.model_validate_json(await config_path.read_text()) @@ -40,6 +40,9 @@ class MCPPool(metaclass=SingletonMeta): return None await client.init(user_sub, mcp_id, config.config) + if user_sub not in self.pool: + self.pool[user_sub] = {} + self.pool[user_sub][mcp_id] = client return client diff --git a/apps/scheduler/pool/pool.py b/apps/scheduler/pool/pool.py index 0504c322f051904c2800481d885ce8730da1d0b7..496b18daf6c1b526f7bc9c30adb3cd3920664740 100644 --- a/apps/scheduler/pool/pool.py +++ b/apps/scheduler/pool/pool.py @@ -119,7 +119,11 @@ class Pool: for app in changed_app: hash_key = Path("app/" + str(app)).as_posix() if hash_key in checker.hashes: - await AppLoader.load(app, checker.hashes[hash_key]) + try: + await AppLoader.load(app, checker.hashes[hash_key]) + except Exception as e: + await AppLoader.delete(app, is_reload=True) + logger.warning("[Pool] 加载App %s 失败: %s", app, e) # 载入MCP logger.info("[Pool] 载入MCP") diff --git a/apps/scheduler/scheduler/scheduler.py b/apps/scheduler/scheduler/scheduler.py index 1dd9b08c08a58494c931986a97688721a95984ea..d5e2e313ce8ac6c11f73435732d158a44ef8ed78 100644 --- a/apps/scheduler/scheduler/scheduler.py +++ b/apps/scheduler/scheduler/scheduler.py @@ -128,7 +128,12 @@ class Scheduler: # 创建用于通信的事件 kill_event = asyncio.Event() monitor = asyncio.create_task(self._monitor_activity(kill_event, self.task.ids.user_sub)) - if not self.post_body.app or self.post_body.app.app_id == "": + rag_method = True + if self.post_body.app and self.post_body.app.app_id: + rag_method = False + if self.task.state.app_id: + rag_method = False + if rag_method: llm = await self.get_llm_use_in_chat_with_rag() kb_ids = await self.get_kb_ids_use_in_chat_with_rag() self.task = await push_init_message(self.task, self.queue, 3, is_flow=False) @@ -232,7 +237,7 @@ class Scheduler: max_tokens=llm.max_tokens, ), ) - if background.conversation: + if background.conversation and self.task.state.flow_status == FlowStatus.INIT: try: question_obj = QuestionRewrite() post_body.question = await question_obj.generate(history=background.conversation, question=post_body.question, llm=reasion_llm) @@ -296,7 +301,7 @@ class Scheduler: servers_id=servers_id, background=background, agent_id=app_info.app_id, - params=post_body.app.params, + params=post_body.params, ) # 开始运行 logger.info("[Scheduler] 运行Executor") diff --git a/apps/schemas/enum_var.py b/apps/schemas/enum_var.py index f80a0c4a7c49df005d8a9dda24beb7a25ca834b1..8ebc46e3c04240cddb9127ffcc7c3d77e8521183 100644 --- a/apps/schemas/enum_var.py +++ b/apps/schemas/enum_var.py @@ -195,3 +195,10 @@ class AgentState(str, Enum): RUNNING = "RUNNING" FINISHED = "FINISHED" ERROR = "ERROR" + + +class Language(str, Enum): + """语言""" + + ZH = "zh" + EN = "en" diff --git a/apps/services/mcp_service.py b/apps/services/mcp_service.py index b77ab65690e78efce4652808ad5cbeb841a6cd66..8c9cc8b9d64351055af3c02d658845f0dc5bf12d 100644 --- a/apps/services/mcp_service.py +++ b/apps/services/mcp_service.py @@ -15,6 +15,7 @@ from apps.common.postgres import postgres from apps.constants import ( ALLOWED_ICON_MIME_TYPES, ICON_PATH, + MCP_PATH, SERVICE_PAGE_SIZE, ) from apps.models.mcp import ( @@ -34,6 +35,7 @@ from apps.schemas.mcp import ( ) from apps.schemas.request_data import UpdateMCPServiceRequest from apps.schemas.response_data import MCPServiceCardItem +from apps.services.user import UserManager logger = logging.getLogger(__name__) MCP_ICON_PATH = ICON_PATH / "mcp" @@ -96,6 +98,7 @@ class MCPServiceManager: keyword: str | None, page: int, *, + is_installed: bool | None = None, is_active: bool | None = None, ) -> list[MCPServiceCardItem]: """ @@ -229,9 +232,9 @@ class MCPServiceManager: """ # 检查config if data.mcp_type == MCPType.SSE: - config = MCPServerSSEConfig.model_validate_json(data.config) + config = MCPServerSSEConfig.model_validate(data.config) else: - config = MCPServerStdioConfig.model_validate_json(data.config) + config = MCPServerStdioConfig.model_validate(data.config) # 构造Server mcp_server = MCPServerConfig( @@ -252,8 +255,25 @@ class MCPServiceManager: # 保存并载入配置 logger.info("[MCPServiceManager] 创建mcp:%s", mcp_server.name) + mcp_path = MCP_PATH / "template" / mcp_id / "project" + index = None + if isinstance(config, MCPServerStdioConfig): + index = None + for i in range(len(config.args)): + if config.args[i] != "--directory": + continue + index = i + 1 + break + if index is not None: + if index >= len(config.args): + config.args.append(str(mcp_path)) + else: + config.args[index+1] = str(mcp_path) + else: + config.args += ["--directory", str(mcp_path)] + await MCPLoader._insert_template_db(mcp_id=mcp_id, config=mcp_server) await MCPLoader.save_one(mcp_server.id, mcp_server) - await MCPLoader.init_one_template(mcp_id=mcp_server.id, config=mcp_server) + await MCPLoader.update_template_status(mcp_id, MCPInstallStatus.INIT) return mcp_server.id @@ -279,18 +299,21 @@ class MCPServiceManager: for user_id in db_service.activated: await MCPServiceManager.deactive_mcpservice(user_sub=user_id, mcp_id=data.service_id) - await MCPLoader.init_one_template(mcp_id=data.service_id, config=MCPServerConfig( + mcp_config = MCPServerConfig( name=data.name, overview=data.overview, description=data.description, - mcpServers=MCPServerStdioConfig.model_validate_json( + mcpServers=MCPServerStdioConfig.model_validate( data.config, - ) if data.mcp_type == MCPType.STDIO else MCPServerSSEConfig.model_validate_json( + ) if data.mcp_type == MCPType.STDIO else MCPServerSSEConfig.model_validate( data.config, ), mcpType=data.mcp_type, author=user_sub, - )) + ) + await MCPLoader._insert_template_db(mcp_id=data.service_id, config=mcp_config) + await MCPLoader.save_one(mcp_id=data.service_id, config=mcp_config) + await MCPLoader.update_template_status(data.service_id, MCPInstallStatus.INIT) # 返回服务ID return data.service_id @@ -434,3 +457,29 @@ class MCPServiceManager: """ async with postgres.session() as session: return list((await session.scalars(select(MCPTools).where(MCPTools.mcpId == mcp_id))).all()) + + + @staticmethod + async def install_mcpservice(user_sub: str, service_id: str, install: bool) -> None: + """ + 安装或卸载MCP服务 + + :param user_sub: str: 用户ID + :param service_id: str: MCP服务ID + :param install: bool: 是否安装 + :return: 无 + """ + service_collection = MongoDB().get_collection("mcp") + db_service = await service_collection.find_one({"_id": service_id, "author": user_sub}) + db_service = MCPCollection.model_validate(db_service) + if install: + if db_service.status == MCPInstallStatus.INSTALLING or db_service.status == MCPInstallStatus.READY: + err = "[MCPServiceManager] MCP服务已处于安装中或已准备就绪" + raise Exception(err) + mcp_config = await MCPLoader.get_config(service_id) + await MCPLoader.init_one_template(mcp_id=service_id, config=mcp_config) + else: + if db_service.status != MCPInstallStatus.INSTALLING: + err = "[MCPServiceManager] 只能卸载处于安装中的MCP服务" + raise Exception(err) + await MCPLoader.cancel_installing_task([service_id]) diff --git a/apps/services/task.py b/apps/services/task.py index 61131fdd0d8544ab6f21804663a83fceef57f6b8..95afa094165e66fe49a7fe6d6750283f65a98eb0 100644 --- a/apps/services/task.py +++ b/apps/services/task.py @@ -53,7 +53,7 @@ class TaskManager: @staticmethod - async def get_context_by_task_id(task_id: str, length: int = 0) -> list[FlowStepHistory]: + async def get_context_by_task_id(task_id: str, length: int | None = None) -> list[FlowStepHistory]: """根据task_id获取flow信息""" async with postgres.session() as session: executor_history_collection = session.query(ExecutorHistory) diff --git a/pyproject.toml b/pyproject.toml index ab024be57c90be4a7b6daa91ea42c3b880f96346..f02ecaf12f0d3af8622d4e0429da784e09d83753 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,10 +14,10 @@ dependencies = [ "jinja2==3.1.6", "jionlp==1.5.20", "jsonschema==4.23.0", - "mcp==1.12.2", + "mcp==1.12.4", "minio==7.2.15", - "ollama==0.5.1", - "openai==1.97.1", + "ollama==0.5.3", + "openai==1.99.6", "pandas==2.2.3", "pgvector==0.4.1", "pillow==10.3.0",