diff --git a/apps/llm/embedding.py b/apps/llm/embedding.py
index da1fad1a358ce0759f1c9911ff47f70f90be7243..298cc9ffcb3051340d612c7dbaeb8cec4cd80d55 100644
--- a/apps/llm/embedding.py
+++ b/apps/llm/embedding.py
@@ -82,17 +82,13 @@ class Embedding:
:return: 文本对应的向量(顺序与text一致,也为List)
"""
try:
- if Config().get_config().embedding.type == "openai":
+ if config.embedding.type == "openai":
return await cls._get_openai_embedding(text)
- if Config().get_config().embedding.type == "mindie":
+ if config.embedding.type == "mindie":
return await cls._get_tei_embedding(text)
- err = f"不支持的Embedding API类型: {Config().get_config().embedding.type}"
- raise ValueError(err)
- except Exception as e:
- err = f"获取Embedding失败: {e}"
- logger.error(err)
- rt = []
- for i in range(len(text)):
- rt.append([0.0]*1024)
- return rt
+ logger.error("不支持的Embedding API类型: %s", config.embedding.type)
+ return [[0.0] * 1024 for _ in range(len(text))]
+ except Exception:
+ logger.exception("获取Embedding失败")
+ return [[0.0] * 1024 for _ in range(len(text))]
diff --git a/apps/llm/function.py b/apps/llm/function.py
index 10cd92571f6911d843efc63661af8ad42484821b..cd000c665aa2f03a03fa2fa43eb33daec8b98e76 100644
--- a/apps/llm/function.py
+++ b/apps/llm/function.py
@@ -7,6 +7,8 @@ import re
from textwrap import dedent
from typing import Any
+import ollama
+import openai
from jinja2 import BaseLoader
from jinja2.sandbox import SandboxedEnvironment
from jsonschema import Draft7Validator
@@ -45,29 +47,22 @@ class FunctionLLM:
"messages": [],
}
- if self._config.backend == "ollama":
- import ollama
-
- if not self._config.api_key:
- self._client = ollama.AsyncClient(host=self._config.endpoint)
- else:
- self._client = ollama.AsyncClient(
- host=self._config.endpoint,
- headers={
- "Authorization": f"Bearer {self._config.api_key}",
- },
- )
-
- else:
- import openai
-
- if not self._config.api_key:
- self._client = openai.AsyncOpenAI(base_url=self._config.endpoint)
- else:
- self._client = openai.AsyncOpenAI(
- base_url=self._config.endpoint,
- api_key=self._config.api_key,
- )
+ if self._config.backend == "ollama" and not self._config.api_key:
+ self._client = ollama.AsyncClient(host=self._config.endpoint)
+ elif self._config.backend == "ollama" and self._config.api_key:
+ self._client = ollama.AsyncClient(
+ host=self._config.endpoint,
+ headers={
+ "Authorization": f"Bearer {self._config.api_key}",
+ },
+ )
+ elif self._config.backend != "ollama" and not self._config.api_key:
+ self._client = openai.AsyncOpenAI(base_url=self._config.endpoint)
+ elif self._config.backend != "ollama" and self._config.api_key:
+ self._client = openai.AsyncOpenAI(
+ base_url=self._config.endpoint,
+ api_key=self._config.api_key,
+ )
async def _call_openai(
diff --git a/apps/llm/token.py b/apps/llm/token.py
index bf749ac1e684ad9b3d3c5e883171fcf1a3252dd7..b76cd744683e076a59a790057c9d2d77eca50b3b 100644
--- a/apps/llm/token.py
+++ b/apps/llm/token.py
@@ -3,6 +3,8 @@
import logging
+import tiktoken
+
from apps.common.singleton import SingletonMeta
logger = logging.getLogger(__name__)
@@ -13,7 +15,6 @@ class TokenCalculator(metaclass=SingletonMeta):
def __init__(self) -> None:
"""初始化Tokenizer"""
- import tiktoken
self._encoder = tiktoken.get_encoding("cl100k_base")
diff --git a/apps/models/task.py b/apps/models/task.py
index a1b71f65d5e8671dc7ad4eec54c98825fd38e40c..930a141b74b18f667ca82e2fac577a433b1b374d 100644
--- a/apps/models/task.py
+++ b/apps/models/task.py
@@ -8,7 +8,7 @@ from sqlalchemy import DateTime, Enum, Float, ForeignKey, Integer, String, Text
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.orm import Mapped, mapped_column
-from apps.schemas.enum_var import FlowStatus, StepStatus
+from apps.schemas.enum_var import FlowStatus, Language, StepStatus
from .base import Base
@@ -63,6 +63,8 @@ class TaskRuntime(Base):
"""计划"""
document: Mapped[list[dict[str, Any]]] = mapped_column(JSONB, nullable=False, default={})
"""关联文档"""
+ language: Mapped[Language] = mapped_column(Enum(Language), nullable=False, default=Language.ZH)
+ """语言"""
class ExecutorCheckpoint(Base):
diff --git a/apps/routers/chat.py b/apps/routers/chat.py
index 860e71f2b61713e6f296051c82959a15f0333571..31b381dd0edba4ea7ec62f4cfa0a3492b5256039 100644
--- a/apps/routers/chat.py
+++ b/apps/routers/chat.py
@@ -14,7 +14,7 @@ from apps.dependency import verify_personal_token, verify_session
from apps.scheduler.scheduler import Scheduler
from apps.scheduler.scheduler.context import save_data
from apps.schemas.enum_var import FlowStatus
-from apps.schemas.request_data import RequestData
+from apps.schemas.request_data import RequestData, RequestDataApp
from apps.schemas.response_data import ResponseData
from apps.schemas.task import Task
from apps.services.activity import Activity
@@ -51,12 +51,16 @@ async def init_task(post_body: RequestData, user_sub: str, session_id: str) -> T
await RecordManager.update_record_flow_status_to_cancelled_by_task_ids(task_ids)
task = await TaskManager.init_new_task(user_sub=user_sub, session_id=session_id, post_body=post_body)
task.runtime.question = post_body.question
- task.ids.group_id = post_body.group_id
+ task.state.app_id = post_body.app.app_id if post_body.app else ""
else:
if not post_body.task_id:
err = "[Chat] task_id 不可为空!"
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="task_id cannot be empty")
- task = await TaskManager.get_task_by_conversation_id(post_body.task_id)
+ task = await TaskManager.get_task_by_task_id(post_body.task_id)
+ post_body.app = RequestDataApp(appId=task.state.app_id)
+ post_body.conversation_id = task.ids.conversation_id
+ post_body.language = task.language
+ post_body.question = task.runtime.question
return task
@@ -133,7 +137,9 @@ async def chat(request: Request, post_body: RequestData) -> StreamingResponse:
session_id = request.state.session_id
user_sub = request.state.user_sub
# 问题黑名单检测
- if not await QuestionBlacklistManager.check_blacklisted_questions(input_question=post_body.question):
+ if (post_body.question is not None) and (
+ not await QuestionBlacklistManager.check_blacklisted_questions(input_question=post_body.question)
+ ):
# 用户扣分
await UserBlacklistManager.change_blacklisted_users(user_sub, -10)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="question is blacklisted")
diff --git a/apps/routers/mcp_service.py b/apps/routers/mcp_service.py
index 3007d5609e69e9e6f9c43756c366cdb2bc9a6015..c4adec4179988c9abd0a3d99e4cabada63c9f2f3 100644
--- a/apps/routers/mcp_service.py
+++ b/apps/routers/mcp_service.py
@@ -55,6 +55,7 @@ async def get_mcpservice_list(
keyword: str | None = None,
page: Annotated[int, Query(ge=1)] = 1,
*,
+ is_installed: bool | None = None,
is_active: bool | None = None,
) -> JSONResponse:
"""获取服务列表"""
@@ -65,6 +66,7 @@ async def get_mcpservice_list(
user_sub,
keyword,
page,
+ is_installed=is_installed,
is_active=is_active,
)
except Exception as e:
@@ -134,9 +136,39 @@ async def create_or_update_mcpservice(
).model_dump(exclude_none=True, by_alias=True))
+@router.post("/{serviceId}/install")
+async def install_mcp_service(
+ request: Request,
+ service_id: Annotated[str, Path(..., alias="serviceId", description="服务ID")],
+ *,
+ install: Annotated[bool, Query(..., description="是否安装")] = True,
+) -> JSONResponse:
+ """安装MCP服务"""
+ try:
+ await MCPServiceManager.install_mcpservice(request.state.user_sub, service_id, install)
+ except Exception as e:
+ err = f"[MCPService] 安装mcp服务失败: {e!s}" if install else f"[MCPService] 卸载mcp服务失败: {e!s}"
+ logger.exception(err)
+ return JSONResponse(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ content=ResponseData(
+ code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ message=err,
+ result={},
+ ).model_dump(exclude_none=True, by_alias=True),
+ )
+ return JSONResponse(
+ status_code=status.HTTP_200_OK,
+ content=ResponseData(
+ code=status.HTTP_200_OK,
+ message="OK",
+ result={},
+ ).model_dump(exclude_none=True, by_alias=True),
+ )
+
+
@admin_router.get("/{serviceId}", response_model=GetMCPServiceDetailRsp)
async def get_service_detail(
- request: Request,
service_id: Annotated[str, Path(..., alias="serviceId", description="服务ID")],
*,
edit: Annotated[bool, Query(..., description="是否为编辑模式")] = False,
@@ -194,16 +226,6 @@ async def get_service_detail(
)
-@admin_router.get("/{serviceId}", response_model=GetMCPServiceDetailRsp)
-async def get_service_detail(serviceId: Annotated[str, Path()]) -> JSONResponse: # noqa: N803
- """获取MCP服务详情"""
- try:
- data = await MCPServiceManager.get_mcp_service(serviceId)
- config, icon = await MCPServiceManager.get_mcp_config(serviceId)
- except Exception as e:
- pass
-
-
@admin_router.delete("/{serviceId}", response_model=DeleteMCPServiceRsp)
async def delete_service(serviceId: Annotated[str, Path()]) -> JSONResponse: # noqa: N803
"""删除服务"""
diff --git a/apps/scheduler/executor/agent.py b/apps/scheduler/executor/agent.py
index a72e3b72fe5d269fa134663198f1c70786f55209..e70b229a5fad42f4cf326a3fb0d2a1b51b522145 100644
--- a/apps/scheduler/executor/agent.py
+++ b/apps/scheduler/executor/agent.py
@@ -3,13 +3,11 @@
import logging
import uuid
-from typing import Any
import anyio
from mcp.types import TextContent
from pydantic import Field
-from apps.llm.patterns.rewrite import QuestionRewrite
from apps.llm.reasoning import ReasoningLLM
from apps.scheduler.executor.base import BaseExecutor
from apps.scheduler.mcp_agent.host import MCPHost
@@ -140,6 +138,7 @@ class MCPAgentExecutor(BaseExecutor):
if is_first:
# 获取第一个输入参数
tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool
+ step = self.task.runtime.temporary_plans.plans[self.task.state.step_index]
mcp_tool = self.tools[tool_id]
self.task.state.current_input = await MCPHost._get_first_input_params(mcp_tool, self.task.runtime.question, self.task)
else:
@@ -152,7 +151,7 @@ class MCPAgentExecutor(BaseExecutor):
params_description = ""
tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool
mcp_tool = self.tools[tool_id]
- self.task.state.current_input = await MCPHost._fill_params(mcp_tool, self.task.state.current_input, self.task.state.error_message, params, params_description)
+ self.task.state.current_input = await MCPHost._get_first_input_params(mcp_tool, step.instruction, self.task)
async def reset_step_to_index(self, start_index: int) -> None:
"""重置步骤到开始"""
@@ -274,10 +273,16 @@ class MCPAgentExecutor(BaseExecutor):
self.resoning_llm,
)
await self.update_tokens()
+ error_message = MCPPlanner.change_err_message_to_description(
+ error_message=self.task.state.error_message,
+ tool=mcp_tool,
+ input_params=self.task.state.current_input,
+ reasoning_llm=self.resoning_llm,
+ )
await self.push_message(
EventType.STEP_WAITING_FOR_PARAM,
data={
- "message": "当运行产生如下报错:\n" + self.task.state.error_message,
+ "message": error_message,
"params": params_with_null,
},
)
@@ -300,7 +305,7 @@ class MCPAgentExecutor(BaseExecutor):
input_data={},
output_data={},
ex_data={
- "message": "当运行产生如下报错:\n" + self.task.state.error_message,
+ "message": error_message,
"params": params_with_null,
},
),
diff --git a/apps/scheduler/mcp/prompt.py b/apps/scheduler/mcp/prompt.py
index 4270b1b339dca508d9ce27a29c2faba08ee98e61..0d7b417f5277d59321e420a3a5a92cfaa43e637e 100644
--- a/apps/scheduler/mcp/prompt.py
+++ b/apps/scheduler/mcp/prompt.py
@@ -174,8 +174,8 @@ FINAL_ANSWER = dedent(r"""
MEMORY_TEMPLATE = dedent(r"""
{% for ctx in context_list %}
- 第{{ loop.index }}步:{{ ctx.step_description }}
- - 调用工具 `{{ ctx.step_id }}`,并提供参数 `{{ ctx.input_data }}`
+ - 调用工具 `{{ ctx.step_name }}`,并提供参数 `{{ ctx.input_data | tojson }}`。
- 执行状态:{{ ctx.status }}
- - 得到数据:`{{ ctx.output_data }}`
+ - 得到数据:`{{ ctx.output_data | tojson }}`
{% endfor %}
""")
diff --git a/apps/scheduler/mcp_agent/host.py b/apps/scheduler/mcp_agent/host.py
index 83740699742953d057a1399eb173b21eb6461565..c0f52c7f9f882eba313499b33e7e76d11baeec76 100644
--- a/apps/scheduler/mcp_agent/host.py
+++ b/apps/scheduler/mcp_agent/host.py
@@ -9,7 +9,6 @@ from jinja2 import BaseLoader
from jinja2.sandbox import SandboxedEnvironment
from mcp.types import TextContent
-from apps.common.mongo import MongoDB
from apps.llm.function import JsonGenerator
from apps.scheduler.mcp.prompt import MEMORY_TEMPLATE
from apps.scheduler.mcp_agent.prompt import REPAIR_PARAMS
@@ -20,13 +19,19 @@ from apps.schemas.mcp import MCPPlanItem, MCPTool
from apps.schemas.task import FlowStepHistory, Task
from apps.services.task import TaskManager
-logger = logging.getLogger(__name__)
+def tojson_filter(value: Any) -> str:
+ """将值转换为JSON字符串"""
+ return json.dumps(value, ensure_ascii=False, separators=(",", ":"))
+
+
+logger = logging.getLogger(__name__)
_env = SandboxedEnvironment(
loader=BaseLoader,
- autoescape=True,
+ autoescape=False,
trim_blocks=True,
lstrip_blocks=True,
+ filters={"tojson": tojson_filter},
)
@@ -36,7 +41,6 @@ class MCPHost:
@staticmethod
async def assemble_memory(task: Task) -> str:
"""组装记忆"""
-
return _env.from_string(MEMORY_TEMPLATE).render(
context_list=task.context,
)
diff --git a/apps/scheduler/mcp_agent/plan.py b/apps/scheduler/mcp_agent/plan.py
index 24eb737da420539a37c6d67390d71f559d76bd51..b2f3a35e35a1d6d89d5827b0a11b594600dfb048 100644
--- a/apps/scheduler/mcp_agent/plan.py
+++ b/apps/scheduler/mcp_agent/plan.py
@@ -1,13 +1,16 @@
# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved.
"""MCP 用户目标拆解与规划"""
-from typing import Any, AsyncGenerator
+from collections.abc import AsyncGenerator
+from typing import Any
from jinja2 import BaseLoader
from jinja2.sandbox import SandboxedEnvironment
from apps.llm.function import JsonGenerator
from apps.llm.reasoning import ReasoningLLM
+from apps.scheduler.mcp_agent.base import McpBase
from apps.scheduler.mcp_agent.prompt import (
+ CHANGE_ERROR_MESSAGE_TO_DESCRIPTION,
CREATE_PLAN,
EVALUATE_GOAL,
FINAL_ANSWER,
@@ -29,42 +32,9 @@ _env = SandboxedEnvironment(
)
-class MCPPlanner:
+class MCPPlanner(McpBase):
"""MCP 用户目标拆解与规划"""
- @staticmethod
- async def get_resoning_result(prompt: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> str:
- """获取推理结果"""
- # 调用推理大模型
- message = [
- {"role": "system", "content": "You are a helpful assistant."},
- {"role": "user", "content": prompt},
- ]
- result = ""
- async for chunk in resoning_llm.call(
- message,
- streaming=False,
- temperature=0.07,
- result_only=True,
- ):
- result += chunk
-
- return result
-
- @staticmethod
- async def _parse_result(result: str, schema: dict[str, Any]) -> str:
- """解析推理结果"""
- json_generator = JsonGenerator(
- result,
- [
- {"role": "system", "content": "You are a helpful assistant."},
- {"role": "user", "content": "请提取下面内容中的json\n\n" + result},
- ],
- schema,
- )
- json_result = await json_generator.generate()
- return json_result
-
@staticmethod
async def evaluate_goal(
goal: str,
@@ -261,6 +231,22 @@ class MCPPlanner:
# 返回工具执行错误类型
return error_type
+ @staticmethod
+ async def change_err_message_to_description(
+ error_message: str, tool: MCPTool, input_params: dict[str, Any],
+ reasoning_llm: ReasoningLLM = ReasoningLLM()) -> str:
+ """将错误信息转换为工具描述"""
+ template = _env.from_string(CHANGE_ERROR_MESSAGE_TO_DESCRIPTION)
+ prompt = template.render(
+ error_message=error_message,
+ tool_name=tool.name,
+ tool_description=tool.description,
+ input_schema=tool.input_schema,
+ input_params=input_params,
+ )
+ result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm)
+ return result
+
@staticmethod
async def get_missing_param(
tool: MCPTool,
diff --git a/apps/scheduler/mcp_agent/prompt.py b/apps/scheduler/mcp_agent/prompt.py
index 75ccd57ff3e5c8f5b8945c076408817d85922160..bfe8cf39ed6752d1ffca92c959a2ef3af5781c46 100644
--- a/apps/scheduler/mcp_agent/prompt.py
+++ b/apps/scheduler/mcp_agent/prompt.py
@@ -70,6 +70,8 @@ TOOL_SELECT = dedent(r"""
1. 确保充分理解当前目标,选择实现目标所需的MCP工具。
2. 请在给定的MCP工具列表中选择,不要自己生成MCP工具。
3. 可以选择一些辅助工具,但必须确保这些工具与当前目标相关。
+ 4. 注意,返回的工具ID必须是MCP工具的ID,而不是名称。
+ 5. 不要选择不存在的工具。
必须按照以下格式生成选择结果,不要输出任何其他内容:
```json
{
@@ -507,7 +509,7 @@ RISK_EVALUATE = dedent(r"""
```json
{
"risk": "low/medium/high",
- "message": "提示信息"
+ "reason": "提示信息"
}
```
# 样例
@@ -534,7 +536,7 @@ RISK_EVALUATE = dedent(r"""
```json
{
"risk": "中",
- "message": "当前工具将连接到MySQL数据库并分析性能,可能会对数据库性能产生一定影响。请确保在非生产环境中执行此操作。"
+ "reason": "当前工具将连接到MySQL数据库并分析性能,可能会对数据库性能产生一定影响。请确保在非生产环境中执行此操作。"
}
```
# 工具
@@ -617,8 +619,74 @@ TOOL_EXECUTE_ERROR_TYPE_ANALYSIS = dedent(r"""
# 工具运行报错
{{error_message}}
# 输出
- """,
- )
+ """)
+
+# 将当前程序运行的报错转换为自然语言
+CHANGE_ERROR_MESSAGE_TO_DESCRIPTION = dedent(r"""
+ 你是一个智能助手,你的任务是将当前程序运行的报错转换为自然语言描述。
+ 请根据以下规则进行转换:
+ 1. 将报错信息转换为自然语言描述,描述应该简洁明了,能够让人理解报错的原因和影响。
+ 2. 描述应该包含报错的具体内容和可能的解决方案。
+ 3. 描述应该避免使用过于专业的术语,以便用户能够理解。
+ 4. 描述应该尽量简短,控制在50字以内。
+ 5. 只输出自然语言描述,不要输出其他内容。
+ # 样例
+ # 工具信息
+
+ port_scanner
+ 扫描主机端口
+
+ {
+ "type": "object",
+ "properties": {
+ "host": {
+ "type": "string",
+ "description": "主机地址"
+ },
+ "port": {
+ "type": "integer",
+ "description": "端口号"
+ },
+ "username": {
+ "type": "string",
+ "description": "用户名"
+ },
+ "password": {
+ "type": "string",
+ "description": "密码"
+ }
+ },
+ "required": ["host", "port", "username", "password"]
+ }
+
+
+ # 工具入参
+ {
+ "host": "192.0.0.1",
+ "port": 3306,
+ "username": "root",
+ "password": "password"
+ }
+ # 报错信息
+ 执行端口扫描命令时,出现了错误:`password is not correct`。
+ # 输出
+ 扫描端口时发生错误:密码不正确。请检查输入的密码是否正确,并重试。
+ # 现在开始转换报错信息:
+ # 工具信息
+
+ {{tool_name}}
+ {{tool_description}}
+
+ {{input_schema}}
+
+
+ # 工具入参
+ {{input_params}}
+ # 报错信息
+ {{error_message}}
+ # 输出
+ """)
+
# 获取缺失的参数的json结构体
GET_MISSING_PARAMS = dedent(r"""
你是一个工具参数获取器。
diff --git a/apps/scheduler/mcp_agent/select.py b/apps/scheduler/mcp_agent/select.py
index f917a38ae78b0603c9b09c2d3526af637343a0ce..ce289c1f6ca502214ae5517e692da1889d5ff25e 100644
--- a/apps/scheduler/mcp_agent/select.py
+++ b/apps/scheduler/mcp_agent/select.py
@@ -3,20 +3,15 @@
import logging
import random
-from typing import AsyncGenerator
from jinja2 import BaseLoader
from jinja2.sandbox import SandboxedEnvironment
-from apps.common.config import Config
-from apps.common.lance import LanceDB
-from apps.common.mongo import MongoDB
-from apps.llm.embedding import Embedding
-from apps.llm.function import FunctionLLM
from apps.llm.reasoning import ReasoningLLM
from apps.llm.token import TokenCalculator
+from apps.scheduler.mcp_agent.base import McpBase
from apps.scheduler.mcp_agent.prompt import TOOL_SELECT
-from apps.schemas.mcp import BaseModel, MCPCollection, MCPSelectResult, MCPTool, MCPToolIdsSelectResult
+from apps.schemas.mcp import MCPCollection, MCPTool, MCPToolIdsSelectResult
logger = logging.getLogger(__name__)
@@ -31,25 +26,26 @@ FINAL_TOOL_ID = "FIANL"
SUMMARIZE_TOOL_ID = "SUMMARIZE"
-class MCPSelector:
+class MCPSelector(McpBase):
"""MCP选择器"""
@staticmethod
async def select_top_tool(
goal: str, tool_list: list[MCPTool],
- additional_info: str | None = None, top_n: int | None = None) -> list[MCPTool]:
+ additional_info: str | None = None, top_n: int | None = None,
+ reasoning_llm: ReasoningLLM | None = None) -> list[MCPTool]:
"""选择最合适的工具"""
random.shuffle(tool_list)
- max_tokens = Config().get_config().function_call.max_tokens
+ max_tokens = reasoning_llm._config.max_tokens
template = _env.from_string(TOOL_SELECT)
- if TokenCalculator.calculate_token_length(
+ token_calculator = TokenCalculator()
+ if token_calculator.calculate_token_length(
messages=[{"role": "user", "content": template.render(
goal=goal, tools=[], additional_info=additional_info
)}],
pure_text=True) > max_tokens:
logger.warning("[MCPSelector] 工具选择模板长度超过最大令牌数,无法进行选择")
return []
- llm = FunctionLLM()
current_index = 0
tool_ids = []
while current_index < len(tool_list):
@@ -57,7 +53,7 @@ class MCPSelector:
sub_tools = []
while index < len(tool_list):
tool = tool_list[index]
- tokens = TokenCalculator.calculate_token_length(
+ tokens = token_calculator.calculate_token_length(
messages=[{"role": "user", "content": template.render(
goal=goal, tools=[tool],
additional_info=additional_info
@@ -68,7 +64,7 @@ class MCPSelector:
continue
sub_tools.append(tool)
- tokens = TokenCalculator.calculate_token_length(messages=[{"role": "user", "content": template.render(
+ tokens = token_calculator.calculate_token_length(messages=[{"role": "user", "content": template.render(
goal=goal, tools=sub_tools, additional_info=additional_info)}, ], pure_text=True)
if tokens > max_tokens:
del sub_tools[-1]
@@ -77,13 +73,13 @@ class MCPSelector:
index += 1
current_index = index
if sub_tools:
- message = [
- {"role": "system", "content": "You are a helpful assistant."},
- {"role": "user", "content": template.render(tools=sub_tools)},
- ]
schema = MCPToolIdsSelectResult.model_json_schema()
- schema["properties"]["tool_ids"]["enum"] = [tool.id for tool in sub_tools]
- result = await llm.call(messages=message, schema=schema)
+ if "items" not in schema["properties"]["tool_ids"]:
+ schema["properties"]["tool_ids"]["items"] = {}
+ # 将enum添加到items中,限制数组元素的可选值
+ schema["properties"]["tool_ids"]["items"]["enum"] = [tool.id for tool in sub_tools]
+ result = await MCPSelector.get_resoning_result(template.render(goal=goal, tools=sub_tools, additional_info="请根据目标选择对应的工具"), reasoning_llm)
+ result = await MCPSelector._parse_result(result, schema)
try:
result = MCPToolIdsSelectResult.model_validate(result)
tool_ids.extend(result.tool_ids)
@@ -98,4 +94,4 @@ class MCPSelector:
description="终止", mcp_id=FINAL_TOOL_ID, input_schema={}))
# mcp_tools.append(MCPTool(id=SUMMARIZE_TOOL_ID, name="Summarize",
# description="总结工具", mcp_id=SUMMARIZE_TOOL_ID, input_schema={}))
- return mcp_tools
\ No newline at end of file
+ return mcp_tools
diff --git a/apps/scheduler/pool/loader/app.py b/apps/scheduler/pool/loader/app.py
index 16cc858d2580d1836cde68b190fb856df4922c80..5d2da58339e09cdd560de1d93e5e1d53e5a94be8 100644
--- a/apps/scheduler/pool/loader/app.py
+++ b/apps/scheduler/pool/loader/app.py
@@ -82,6 +82,7 @@ class AppLoader:
# 加载模型
try:
metadata = AgentAppMetadata.model_validate(metadata)
+ logger.info("[AppLoader] Agent应用元数据验证成功: %s", metadata)
except Exception as e:
err = "[AppLoader] Agent应用元数据验证失败"
logger.exception(err)
diff --git a/apps/scheduler/pool/loader/mcp.py b/apps/scheduler/pool/loader/mcp.py
index 16b1f6669a5a9781e07007c570324803e62d2740..e97ef4a02657c4d67ba098910842cc02873a59ce 100644
--- a/apps/scheduler/pool/loader/mcp.py
+++ b/apps/scheduler/pool/loader/mcp.py
@@ -85,37 +85,61 @@ class MCPLoader(metaclass=SingletonMeta):
mcp_id = next(iter(item.mcpServers.keys()))
mcp_config = item.mcpServers[mcp_id]
- if not mcp_config.autoInstall:
- print(f"[Installer] MCP模板无需安装: {mcp_id}") # noqa: T201
-
- elif isinstance(mcp_config, MCPServerStdioConfig):
- print(f"[Installer] Stdio方式的MCP模板,开始自动安装: {mcp_id}") # noqa: T201
- if "uv" in mcp_config.command:
- new_config = await install_uvx(mcp_id, mcp_config)
- elif "npx" in mcp_config.command:
- new_config = await install_npx(mcp_id, mcp_config)
-
- if new_config is None:
- logger.error("[MCPLoader] MCP模板安装失败: %s", mcp_id)
- await MCPLoader.update_template_status(mcp_id, MCPInstallStatus.FAILED)
- return
-
- item.mcpServers[mcp_id] = new_config
-
- # 重新保存config
- template_config = MCP_PATH / "template" / mcp_id / "config.json"
- f = await template_config.open("w+", encoding="utf-8")
- config_data = item.model_dump(by_alias=True, exclude_none=True)
- await f.write(json.dumps(config_data, indent=4, ensure_ascii=False))
- await f.aclose()
+ try:
+ if not mcp_config.autoInstall:
+ print(f"[Installer] MCP模板无需安装: {mcp_id}") # noqa: T201
+
+ elif isinstance(mcp_config, MCPServerStdioConfig):
+ print(f"[Installer] Stdio方式的MCP模板,开始自动安装: {mcp_id}") # noqa: T201
+ if "uv" in mcp_config.command:
+ new_config = await install_uvx(mcp_id, mcp_config)
+ elif "npx" in mcp_config.command:
+ new_config = await install_npx(mcp_id, mcp_config)
+
+ if new_config is None:
+ logger.error("[MCPLoader] MCP模板安装失败: %s", mcp_id)
+ await MCPLoader.update_template_status(mcp_id, MCPInstallStatus.FAILED)
+ return
+
+ mcp_config = new_config
+
+ # 重新保存config
+ template_config = MCP_PATH / "template" / mcp_id / "config.json"
+ f = await template_config.open("w+", encoding="utf-8")
+ config_data = config.model_dump(by_alias=True, exclude_none=True)
+ await f.write(json.dumps(config_data, indent=4, ensure_ascii=False))
+ await f.aclose()
+
+ else:
+ logger.info("[Installer] SSE/StreamableHTTP方式的MCP模板,无需安装: %s", mcp_id)
+ mcp_config.autoInstall = False
+
+ await MCPLoader._insert_template_tool(mcp_id, mcp_config)
+ await MCPLoader.update_template_status(mcp_id, MCPInstallStatus.READY)
+ logger.info("[Installer] MCP模板安装成功: %s", mcp_id)
+ except Exception:
+ logger.exception("[MCPLoader] MCP模板安装失败: %s", mcp_id)
+ await MCPLoader.update_template_status(mcp_id, MCPInstallStatus.FAILED)
+ raise
- else:
- print(f"[Installer] SSE/StreamableHTTP方式的MCP模板,无需安装: {mcp_id}") # noqa: T201
- item.mcpServers[mcp_id].autoInstall = False
- print(f"[Installer] MCP模板安装成功: {mcp_id}") # noqa: T201
- await MCPLoader.update_template_status(mcp_id, MCPInstallStatus.READY)
- await MCPLoader._insert_template_tool(mcp_id, item)
+ @staticmethod
+ async def clear_ready_or_failed_mcp_installation() -> None:
+ """清除状态为ready或failed的MCP安装任务"""
+ mcp_collection = MongoDB().get_collection("mcp")
+ mcp_ids = ProcessHandler.get_all_task_ids()
+ # 检索_id在mcp_ids且状态为ready或者failed的MCP的内容
+ db_service_list = await mcp_collection.find(
+ {"_id": {"$in": mcp_ids}, "status": {"$in": [MCPInstallStatus.READY, MCPInstallStatus.FAILED]}},
+ ).to_list(None)
+ for db_service in db_service_list:
+ try:
+ item = MCPCollection.model_validate(db_service)
+ except Exception as e:
+ logger.error("[MCPLoader] MCP模板数据验证失败: %s, 错误: %s", db_service["_id"], e)
+ continue
+ ProcessHandler.remove_task(item.id)
+ logger.info("[MCPLoader] 删除已完成或失败的MCP安装进程: %s", item.id)
@staticmethod
@@ -127,6 +151,9 @@ class MCPLoader(metaclass=SingletonMeta):
:param MCPServerConfig config: MCP配置
:return: 无
"""
+ # 清除状态为ready或failed的MCP安装任务
+ await MCPLoader.clear_ready_or_failed_mcp_installation()
+
# 如果包含多个MCP Server,报错
if len(config.mcpServers) > 1:
err = f"[MCPLoader] MCP模板“{mcp_id}”包含多个MCP Server,无法初始化"
@@ -147,6 +174,29 @@ class MCPLoader(metaclass=SingletonMeta):
raise RuntimeError(err)
+ @staticmethod
+ async def cancel_all_installing_task() -> None:
+ """取消正在安装的MCP模板任务"""
+ template_path = MCP_PATH / "template"
+ logger.info("[MCPLoader] 初始化所有MCP模板: %s", template_path)
+ mongo = MongoDB()
+ mcp_collection = mongo.get_collection("mcp")
+ # 遍历所有模板
+ mcp_ids = []
+ async for mcp_dir in template_path.iterdir():
+ # 不是目录
+ if not await mcp_dir.is_dir():
+ logger.warning("[MCPLoader] 跳过非目录: %s", mcp_dir.as_posix())
+ continue
+
+ mcp_ids.append(mcp_dir.name)
+ # 更新数据库状态
+ await mcp_collection.update_many(
+ {"_id": {"$in": mcp_ids}, "status": MCPInstallStatus.INSTALLING},
+ {"$set": {"status": MCPInstallStatus.CANCELLED}},
+ )
+
+
@staticmethod
async def _init_all_template() -> None:
"""
@@ -330,7 +380,7 @@ class MCPLoader(metaclass=SingletonMeta):
@staticmethod
- async def user_active_template(user_sub: str, mcp_id: str, mcp_env: dict[str, Any]) -> None:
+ async def user_active_template(user_sub: str, mcp_id: str, mcp_env: dict[str, Any] | None = None) -> None:
"""
用户激活MCP模板
@@ -350,7 +400,6 @@ class MCPLoader(metaclass=SingletonMeta):
raise FileExistsError(err)
mcp_config = await MCPLoader.get_config(mcp_id)
- mcp_config.mcpServers[mcp_id].env.update(mcp_env)
# 拷贝文件
await asyncer.asyncify(shutil.copytree)(
template_path.as_posix(),
@@ -359,17 +408,32 @@ class MCPLoader(metaclass=SingletonMeta):
symlinks=True,
)
- user_config_path = user_path / "config.json"
- # 更新用户配置
- f = await user_config_path.open("w", encoding="utf-8", errors="ignore")
- await f.write(
- json.dumps(
- mcp_config.model_dump(by_alias=True, exclude_none=True),
- indent=4,
- ensure_ascii=False,
- ),
- )
- await f.aclose()
+ if mcp_env is not None:
+ mcp_config.mcpServers[mcp_id].env.update(mcp_env)
+ user_config_path = user_path / "config.json"
+ # 更新用户配置
+ f = await user_config_path.open("w", encoding="utf-8", errors="ignore")
+ await f.write(
+ json.dumps(
+ mcp_config.model_dump(by_alias=True, exclude_none=True),
+ indent=4,
+ ensure_ascii=False,
+ )
+ )
+ await f.aclose()
+ if mcp_config.mcpType == MCPType.STDIO:
+ index = None
+ for i in range(len(mcp_config.config.args)):
+ if mcp_config.config.args[i] == "--directory":
+ index = i + 1
+ break
+ if index is not None:
+ if index < len(mcp_config.config.args):
+ mcp_config.config.args[index] = str(user_path)+'/project'
+ else:
+ mcp_config.config.args.append(str(user_path)+'/project')
+ else:
+ mcp_config.config.args = ["--directory", str(user_path)+'/project'] + mcp_config.config.args
# 更新数据库
async with postgres.session() as session:
await session.merge(MCPActivated(
@@ -425,6 +489,27 @@ class MCPLoader(metaclass=SingletonMeta):
return deleted_mcp_list
+ @staticmethod
+ async def cancel_installing_task(cancel_mcp_list: list[str]) -> None:
+ """
+ 取消正在安装的MCP模板任务
+
+ :param list[str] cancel_mcp_list: 需要取消的MCP列表
+ :return: 无
+ """
+ mongo = MongoDB()
+ mcp_collection = mongo.get_collection("mcp")
+ # 更新数据库状态
+ cancel_mcp_list = await mcp_collection.distinct("_id", {"_id": {"$in": cancel_mcp_list}, "status": MCPInstallStatus.INSTALLING})
+ await mcp_collection.update_many(
+ {"_id": {"$in": cancel_mcp_list}, "status": MCPInstallStatus.INSTALLING},
+ {"$set": {"status": MCPInstallStatus.CANCELLED}},
+ )
+ for mcp_id in cancel_mcp_list:
+ ProcessHandler.remove_task(mcp_id)
+ logger.info("[MCPLoader] 取消这些正在安装的MCP模板任务: %s", cancel_mcp_list)
+
+
@staticmethod
async def remove_deleted_mcp(deleted_mcp_list: list[str]) -> None:
"""
@@ -553,6 +638,7 @@ class MCPLoader(metaclass=SingletonMeta):
# 初始化所有模板
await MCPLoader._init_all_template()
+ await MCPLoader.cancel_all_installing_task()
# 加载用户MCP
await MCPLoader._load_user_mcp()
diff --git a/apps/scheduler/pool/loader/service.py b/apps/scheduler/pool/loader/service.py
index 4297f6e30d64a2ba003b407a056692a802d8a4de..802ec35050ae860727dedb2e354c277853330559 100644
--- a/apps/scheduler/pool/loader/service.py
+++ b/apps/scheduler/pool/loader/service.py
@@ -32,6 +32,9 @@ class ServiceLoader:
"""加载单个Service"""
service_path = BASE_PATH / str(service_id)
# 载入元数据
+ if not await (service_path / "metadata.yaml").exists():
+ logger.error("[ServiceLoader] Service %s 的元数据不存在", service_id)
+ return
metadata = await MetadataLoader().load_one(service_path / "metadata.yaml")
if not isinstance(metadata, ServiceMetadata):
err = f"[ServiceLoader] 元数据类型错误: {service_path}/metadata.yaml"
diff --git a/apps/scheduler/pool/mcp/client.py b/apps/scheduler/pool/mcp/client.py
index 092bac8909635a5e0c846dddef3456d8ad3be43c..297ed3e2e28042687f9a724248e436b927e540cd 100644
--- a/apps/scheduler/pool/mcp/client.py
+++ b/apps/scheduler/pool/mcp/client.py
@@ -29,6 +29,7 @@ class MCPClient:
mcp_id: str
task: asyncio.Task
ready_sign: asyncio.Event
+ error_sign: asyncio.Event
stop_sign: asyncio.Event
client: ClientSession
status: MCPStatus
@@ -54,9 +55,10 @@ class MCPClient:
"""
# 创建Client
if isinstance(config, MCPServerSSEConfig):
+ env = config.env or {}
client = sse_client(
url=config.url,
- headers=config.env,
+ headers=env,
)
elif isinstance(config, MCPServerStdioConfig):
if user_sub:
@@ -72,6 +74,7 @@ class MCPClient:
cwd=cwd.as_posix(),
))
else:
+ self.error_sign.set()
err = f"[MCPClient] MCP {mcp_id}:未知的MCP服务类型“{config.type}”"
logger.error(err)
raise TypeError(err)
@@ -85,6 +88,8 @@ class MCPClient:
# 初始化Client
await session.initialize()
except Exception:
+ self.error_sign.set()
+ self.status = MCPStatus.STOPPED
logger.exception("[MCPClient] MCP %s:初始化失败", mcp_id)
raise
@@ -93,6 +98,7 @@ class MCPClient:
# 等待关闭信号
await self.stop_sign.wait()
+ logger.info("[MCPClient] MCP %s:收到停止信号,正在关闭", mcp_id)
# 关闭Client
try:
@@ -116,13 +122,22 @@ class MCPClient:
# 初始化变量
self.mcp_id = mcp_id
self.ready_sign = asyncio.Event()
+ self.error_sign = asyncio.Event()
self.stop_sign = asyncio.Event()
# 创建协程
self.task = asyncio.create_task(self._main_loop(user_sub, mcp_id, config))
# 等待初始化完成
- await self.ready_sign.wait()
+ done, pending = await asyncio.wait(
+ [asyncio.create_task(self.ready_sign.wait()),
+ asyncio.create_task(self.error_sign.wait())],
+ return_when=asyncio.FIRST_COMPLETED,
+ )
+ if self.error_sign.is_set():
+ self.status = MCPStatus.ERROR
+ logger.error("[MCPClient] MCP %s:初始化失败", mcp_id)
+ raise Exception(f"MCP {mcp_id} 初始化失败")
# 获取工具列表
self.tools = (await self.client.list_tools()).tools
@@ -138,5 +153,5 @@ class MCPClient:
self.stop_sign.set()
try:
await self.task
- except Exception:
- logger.exception("[MCPClient] MCP %s:停止失败", self.mcp_id)
+ except Exception as e: # noqa: BLE001
+ logger.warning("[MCPClient] MCP %s:停止时发生异常:%s", self.mcp_id, e)
diff --git a/apps/scheduler/pool/mcp/install.py b/apps/scheduler/pool/mcp/install.py
index a320714c27534f5e8ec58528cc77af4be71215b4..60a660c88c37a8ba7d748a4def51048a62fc5597 100644
--- a/apps/scheduler/pool/mcp/install.py
+++ b/apps/scheduler/pool/mcp/install.py
@@ -1,6 +1,8 @@
# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved.
"""MCP 安装"""
+import logging
+import shutil
from asyncio import subprocess
from typing import TYPE_CHECKING
@@ -10,6 +12,9 @@ if TYPE_CHECKING:
from apps.schemas.mcp import MCPServerStdioConfig
+logger = logging.getLogger(__name__)
+
+
async def install_uvx(mcp_id: str, config: "MCPServerStdioConfig") -> "MCPServerStdioConfig | None":
"""
安装使用uvx包管理器的MCP服务
@@ -23,29 +28,40 @@ async def install_uvx(mcp_id: str, config: "MCPServerStdioConfig") -> "MCPServer
:rtype: MCPServerStdioConfig
:raises ValueError: 未找到MCP Server对应的Python包
"""
- # 创建文件夹
- mcp_path = MCP_PATH / "template" / mcp_id / "project"
- await mcp_path.mkdir(parents=True, exist_ok=True)
+ uv_path = shutil.which("uv")
+ if uv_path is None:
+ error = "[Installer] 未找到uv命令,请先安装uv包管理器: pip install uv"
+ logger.error(error)
+ return None
# 找到包名
- package = ""
+ package = None
for arg in config.args:
if not arg.startswith("-"):
package = arg
break
-
+ logger.info("[Installer] MCP包名: %s", package)
if not package:
print("[Installer] 未找到包名") # noqa: T201
return None
+ # 创建文件夹
+ mcp_path = MCP_PATH / "template" / mcp_id / "project"
+ logger.info("[Installer] MCP安装路径: %s", mcp_path)
+ await mcp_path.mkdir(parents=True, exist_ok=True)
+
# 如果有pyproject.toml文件,则使用sync
- if await (mcp_path / "pyproject.toml").exists():
+ flag = await (mcp_path / "pyproject.toml").exists()
+ logger.info("[Installer] MCP安装标志: %s", flag)
+ if flag:
+ shell_command = (
+ f"{uv_path} venv; "
+ f"{uv_path} sync --index-url https://pypi.tuna.tsinghua.edu.cn/simple --active "
+ "--no-install-project --no-cache"
+ )
+ logger.info("[Installer] MCP安装命令: %s", shell_command)
pipe = await subprocess.create_subprocess_shell(
- (
- "uv venv; "
- "uv sync --index-url https://pypi.tuna.tsinghua.edu.cn/simple --active "
- "--no-install-project --no-cache"
- ),
+ shell_command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=mcp_path,
@@ -57,21 +73,24 @@ async def install_uvx(mcp_id: str, config: "MCPServerStdioConfig") -> "MCPServer
return None
print(f"[Installer] 检查依赖成功: {mcp_path}; {stdout.decode() if stdout else '(无输出信息)'}") # noqa: T201
- config.command = "uv"
- config.args = ["run", *config.args]
+ config.command = uv_path
+ if "run" not in config.args:
+ config.args = ["run", *config.args]
config.autoInstall = False
+ logger.info("[Installer] MCP安装配置更新成功: %s", config)
return config
# 否则,初始化uv项目
+ shell_command = (
+ f"{uv_path} init; "
+ f"{uv_path} venv; "
+ f"{uv_path} add --index-url https://pypi.tuna.tsinghua.edu.cn/simple {package}; "
+ f"{uv_path} sync --index-url https://pypi.tuna.tsinghua.edu.cn/simple --active --no-install-project --no-cache"
+ )
+ logger.info("[Installer] MCP安装命令: %s", shell_command)
pipe = await subprocess.create_subprocess_shell(
- (
- f"uv init; "
- f"uv venv; "
- f"uv add --index-url https://pypi.tuna.tsinghua.edu.cn/simple {package}; "
- f"uv sync --index-url https://pypi.tuna.tsinghua.edu.cn/simple --active "
- f"--no-install-project --no-cache"
- ),
+ shell_command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=mcp_path,
@@ -84,10 +103,12 @@ async def install_uvx(mcp_id: str, config: "MCPServerStdioConfig") -> "MCPServer
print(f"[Installer] 安装 {package} 成功: {mcp_path}; {stdout.decode() if stdout else '(无输出信息)'}") # noqa: T201
# 更新配置
- config.command = "uv"
- config.args = ["run", *config.args]
+ config.command = uv_path
+ if "run" not in config.args:
+ config.args = ["run", *config.args]
config.autoInstall = False
+ logger.info("[Installer] MCP安装配置更新成功: %s", config)
return config
@@ -103,17 +124,14 @@ async def install_npx(mcp_id: str, config: "MCPServerStdioConfig") -> "MCPServer
:rtype: MCPServerStdioConfig
:raises ValueError: 未找到MCP Server对应的npm包
"""
- mcp_path = MCP_PATH / "template" / mcp_id / "project"
- await mcp_path.mkdir(parents=True, exist_ok=True)
-
- # 如果有node_modules文件夹,则认为已安装
- if await (mcp_path / "node_modules").exists():
- config.command = "npm"
- config.args = ["exec", *config.args]
- return config
+ npm_path = shutil.which("npm")
+ if npm_path is None:
+ error = "[Installer] 未找到npm命令,请先安装Node.js和npm"
+ logger.error(error)
+ return None
# 查找package name
- package = ""
+ package = None
for arg in config.args:
if not arg.startswith("-"):
package = arg
@@ -123,9 +141,19 @@ async def install_npx(mcp_id: str, config: "MCPServerStdioConfig") -> "MCPServer
print("[Installer] 未找到包名") # noqa: T201
return None
+ mcp_path = MCP_PATH / "template" / mcp_id / "project"
+ await mcp_path.mkdir(parents=True, exist_ok=True)
+
+ # 如果有node_modules文件夹,则认为已安装
+ if await (mcp_path / "node_modules").exists():
+ config.command = npm_path
+ if "exec" not in config.args:
+ config.args = ["exec", *config.args]
+ return config
+
# 安装NPM包
pipe = await subprocess.create_subprocess_shell(
- f"npm install {package}",
+ f"{npm_path} install {package}",
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=mcp_path,
@@ -137,8 +165,9 @@ async def install_npx(mcp_id: str, config: "MCPServerStdioConfig") -> "MCPServer
print(f"[Installer] 安装 {package} 成功: {mcp_path}; {stdout.decode() if stdout else '(无输出信息)'}") # noqa: T201
# 更新配置
- config.command = "npm"
- config.args = ["exec", *config.args]
+ config.command = npm_path
+ if "exec" not in config.args:
+ config.args = ["exec", *config.args]
config.autoInstall = False
return config
diff --git a/apps/scheduler/pool/mcp/pool.py b/apps/scheduler/pool/mcp/pool.py
index 9ff5d2e9642581bb4ffdf534a8d740c87f1d3d90..91413db6167da62435d246b2d1264a305afdb9e4 100644
--- a/apps/scheduler/pool/mcp/pool.py
+++ b/apps/scheduler/pool/mcp/pool.py
@@ -24,11 +24,11 @@ class MCPPool(metaclass=SingletonMeta):
async def _init_mcp(self, mcp_id: str, user_sub: str) -> MCPClient | None:
"""初始化MCP池"""
- mcp_math = MCP_USER_PATH / user_sub / mcp_id / "project"
config_path = MCP_USER_PATH / user_sub / mcp_id / "config.json"
- if not await mcp_math.exists() or not await mcp_math.is_dir():
- logger.warning("[MCPPool] 用户 %s 的MCP %s 未激活", user_sub, mcp_id)
+ flag = (await config_path.exists())
+ if not flag:
+ logger.warning("[MCPPool] 用户 %s 的MCP %s 配置文件不存在", user_sub, mcp_id)
return None
config = MCPServerConfig.model_validate_json(await config_path.read_text())
@@ -40,6 +40,9 @@ class MCPPool(metaclass=SingletonMeta):
return None
await client.init(user_sub, mcp_id, config.config)
+ if user_sub not in self.pool:
+ self.pool[user_sub] = {}
+ self.pool[user_sub][mcp_id] = client
return client
diff --git a/apps/scheduler/pool/pool.py b/apps/scheduler/pool/pool.py
index 0504c322f051904c2800481d885ce8730da1d0b7..496b18daf6c1b526f7bc9c30adb3cd3920664740 100644
--- a/apps/scheduler/pool/pool.py
+++ b/apps/scheduler/pool/pool.py
@@ -119,7 +119,11 @@ class Pool:
for app in changed_app:
hash_key = Path("app/" + str(app)).as_posix()
if hash_key in checker.hashes:
- await AppLoader.load(app, checker.hashes[hash_key])
+ try:
+ await AppLoader.load(app, checker.hashes[hash_key])
+ except Exception as e:
+ await AppLoader.delete(app, is_reload=True)
+ logger.warning("[Pool] 加载App %s 失败: %s", app, e)
# 载入MCP
logger.info("[Pool] 载入MCP")
diff --git a/apps/scheduler/scheduler/scheduler.py b/apps/scheduler/scheduler/scheduler.py
index 1dd9b08c08a58494c931986a97688721a95984ea..d5e2e313ce8ac6c11f73435732d158a44ef8ed78 100644
--- a/apps/scheduler/scheduler/scheduler.py
+++ b/apps/scheduler/scheduler/scheduler.py
@@ -128,7 +128,12 @@ class Scheduler:
# 创建用于通信的事件
kill_event = asyncio.Event()
monitor = asyncio.create_task(self._monitor_activity(kill_event, self.task.ids.user_sub))
- if not self.post_body.app or self.post_body.app.app_id == "":
+ rag_method = True
+ if self.post_body.app and self.post_body.app.app_id:
+ rag_method = False
+ if self.task.state.app_id:
+ rag_method = False
+ if rag_method:
llm = await self.get_llm_use_in_chat_with_rag()
kb_ids = await self.get_kb_ids_use_in_chat_with_rag()
self.task = await push_init_message(self.task, self.queue, 3, is_flow=False)
@@ -232,7 +237,7 @@ class Scheduler:
max_tokens=llm.max_tokens,
),
)
- if background.conversation:
+ if background.conversation and self.task.state.flow_status == FlowStatus.INIT:
try:
question_obj = QuestionRewrite()
post_body.question = await question_obj.generate(history=background.conversation, question=post_body.question, llm=reasion_llm)
@@ -296,7 +301,7 @@ class Scheduler:
servers_id=servers_id,
background=background,
agent_id=app_info.app_id,
- params=post_body.app.params,
+ params=post_body.params,
)
# 开始运行
logger.info("[Scheduler] 运行Executor")
diff --git a/apps/schemas/enum_var.py b/apps/schemas/enum_var.py
index f80a0c4a7c49df005d8a9dda24beb7a25ca834b1..8ebc46e3c04240cddb9127ffcc7c3d77e8521183 100644
--- a/apps/schemas/enum_var.py
+++ b/apps/schemas/enum_var.py
@@ -195,3 +195,10 @@ class AgentState(str, Enum):
RUNNING = "RUNNING"
FINISHED = "FINISHED"
ERROR = "ERROR"
+
+
+class Language(str, Enum):
+ """语言"""
+
+ ZH = "zh"
+ EN = "en"
diff --git a/apps/services/mcp_service.py b/apps/services/mcp_service.py
index b77ab65690e78efce4652808ad5cbeb841a6cd66..8c9cc8b9d64351055af3c02d658845f0dc5bf12d 100644
--- a/apps/services/mcp_service.py
+++ b/apps/services/mcp_service.py
@@ -15,6 +15,7 @@ from apps.common.postgres import postgres
from apps.constants import (
ALLOWED_ICON_MIME_TYPES,
ICON_PATH,
+ MCP_PATH,
SERVICE_PAGE_SIZE,
)
from apps.models.mcp import (
@@ -34,6 +35,7 @@ from apps.schemas.mcp import (
)
from apps.schemas.request_data import UpdateMCPServiceRequest
from apps.schemas.response_data import MCPServiceCardItem
+from apps.services.user import UserManager
logger = logging.getLogger(__name__)
MCP_ICON_PATH = ICON_PATH / "mcp"
@@ -96,6 +98,7 @@ class MCPServiceManager:
keyword: str | None,
page: int,
*,
+ is_installed: bool | None = None,
is_active: bool | None = None,
) -> list[MCPServiceCardItem]:
"""
@@ -229,9 +232,9 @@ class MCPServiceManager:
"""
# 检查config
if data.mcp_type == MCPType.SSE:
- config = MCPServerSSEConfig.model_validate_json(data.config)
+ config = MCPServerSSEConfig.model_validate(data.config)
else:
- config = MCPServerStdioConfig.model_validate_json(data.config)
+ config = MCPServerStdioConfig.model_validate(data.config)
# 构造Server
mcp_server = MCPServerConfig(
@@ -252,8 +255,25 @@ class MCPServiceManager:
# 保存并载入配置
logger.info("[MCPServiceManager] 创建mcp:%s", mcp_server.name)
+ mcp_path = MCP_PATH / "template" / mcp_id / "project"
+ index = None
+ if isinstance(config, MCPServerStdioConfig):
+ index = None
+ for i in range(len(config.args)):
+ if config.args[i] != "--directory":
+ continue
+ index = i + 1
+ break
+ if index is not None:
+ if index >= len(config.args):
+ config.args.append(str(mcp_path))
+ else:
+ config.args[index+1] = str(mcp_path)
+ else:
+ config.args += ["--directory", str(mcp_path)]
+ await MCPLoader._insert_template_db(mcp_id=mcp_id, config=mcp_server)
await MCPLoader.save_one(mcp_server.id, mcp_server)
- await MCPLoader.init_one_template(mcp_id=mcp_server.id, config=mcp_server)
+ await MCPLoader.update_template_status(mcp_id, MCPInstallStatus.INIT)
return mcp_server.id
@@ -279,18 +299,21 @@ class MCPServiceManager:
for user_id in db_service.activated:
await MCPServiceManager.deactive_mcpservice(user_sub=user_id, mcp_id=data.service_id)
- await MCPLoader.init_one_template(mcp_id=data.service_id, config=MCPServerConfig(
+ mcp_config = MCPServerConfig(
name=data.name,
overview=data.overview,
description=data.description,
- mcpServers=MCPServerStdioConfig.model_validate_json(
+ mcpServers=MCPServerStdioConfig.model_validate(
data.config,
- ) if data.mcp_type == MCPType.STDIO else MCPServerSSEConfig.model_validate_json(
+ ) if data.mcp_type == MCPType.STDIO else MCPServerSSEConfig.model_validate(
data.config,
),
mcpType=data.mcp_type,
author=user_sub,
- ))
+ )
+ await MCPLoader._insert_template_db(mcp_id=data.service_id, config=mcp_config)
+ await MCPLoader.save_one(mcp_id=data.service_id, config=mcp_config)
+ await MCPLoader.update_template_status(data.service_id, MCPInstallStatus.INIT)
# 返回服务ID
return data.service_id
@@ -434,3 +457,29 @@ class MCPServiceManager:
"""
async with postgres.session() as session:
return list((await session.scalars(select(MCPTools).where(MCPTools.mcpId == mcp_id))).all())
+
+
+ @staticmethod
+ async def install_mcpservice(user_sub: str, service_id: str, install: bool) -> None:
+ """
+ 安装或卸载MCP服务
+
+ :param user_sub: str: 用户ID
+ :param service_id: str: MCP服务ID
+ :param install: bool: 是否安装
+ :return: 无
+ """
+ service_collection = MongoDB().get_collection("mcp")
+ db_service = await service_collection.find_one({"_id": service_id, "author": user_sub})
+ db_service = MCPCollection.model_validate(db_service)
+ if install:
+ if db_service.status == MCPInstallStatus.INSTALLING or db_service.status == MCPInstallStatus.READY:
+ err = "[MCPServiceManager] MCP服务已处于安装中或已准备就绪"
+ raise Exception(err)
+ mcp_config = await MCPLoader.get_config(service_id)
+ await MCPLoader.init_one_template(mcp_id=service_id, config=mcp_config)
+ else:
+ if db_service.status != MCPInstallStatus.INSTALLING:
+ err = "[MCPServiceManager] 只能卸载处于安装中的MCP服务"
+ raise Exception(err)
+ await MCPLoader.cancel_installing_task([service_id])
diff --git a/apps/services/task.py b/apps/services/task.py
index 61131fdd0d8544ab6f21804663a83fceef57f6b8..95afa094165e66fe49a7fe6d6750283f65a98eb0 100644
--- a/apps/services/task.py
+++ b/apps/services/task.py
@@ -53,7 +53,7 @@ class TaskManager:
@staticmethod
- async def get_context_by_task_id(task_id: str, length: int = 0) -> list[FlowStepHistory]:
+ async def get_context_by_task_id(task_id: str, length: int | None = None) -> list[FlowStepHistory]:
"""根据task_id获取flow信息"""
async with postgres.session() as session:
executor_history_collection = session.query(ExecutorHistory)
diff --git a/pyproject.toml b/pyproject.toml
index ab024be57c90be4a7b6daa91ea42c3b880f96346..f02ecaf12f0d3af8622d4e0429da784e09d83753 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -14,10 +14,10 @@ dependencies = [
"jinja2==3.1.6",
"jionlp==1.5.20",
"jsonschema==4.23.0",
- "mcp==1.12.2",
+ "mcp==1.12.4",
"minio==7.2.15",
- "ollama==0.5.1",
- "openai==1.97.1",
+ "ollama==0.5.3",
+ "openai==1.99.6",
"pandas==2.2.3",
"pgvector==0.4.1",
"pillow==10.3.0",