From 3147f1a3c82f6a123cd4d96b7af18d5b9977b729 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Tue, 5 Aug 2025 17:55:21 +0800 Subject: [PATCH] =?UTF-8?q?=E5=9B=9E=E5=90=88agent=E5=88=86=E6=94=AF?= =?UTF-8?q?=E8=87=B3=20!597?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/scheduler/call/__init__.py | 2 + apps/scheduler/call/choice/choice.py | 146 ++++++++- .../call/choice/condition_handler.py | 308 ++++++++++++++++++ apps/scheduler/call/choice/schema.py | 50 +++ apps/scheduler/call/core.py | 13 +- apps/scheduler/call/mcp/mcp.py | 2 +- apps/scheduler/executor/agent.py | 30 +- apps/scheduler/executor/flow.py | 20 +- apps/scheduler/executor/step.py | 14 +- apps/scheduler/mcp/host.py | 3 +- apps/scheduler/mcp_agent/__init__.py | 8 + apps/scheduler/mcp_agent/agent/base.py | 196 ----------- apps/scheduler/mcp_agent/agent/mcp.py | 82 ----- apps/scheduler/mcp_agent/agent/react.py | 36 -- apps/scheduler/mcp_agent/agent/toolcall.py | 238 -------------- apps/scheduler/mcp_agent/host.py | 190 +++++++++++ apps/scheduler/mcp_agent/plan.py | 110 +++++++ apps/scheduler/mcp_agent/prompt.py | 240 ++++++++++++++ apps/scheduler/mcp_agent/schema.py | 148 --------- apps/scheduler/mcp_agent/select.py | 185 +++++++++++ apps/scheduler/mcp_agent/tool/__init__.py | 9 - apps/scheduler/mcp_agent/tool/base.py | 73 ----- apps/scheduler/mcp_agent/tool/terminate.py | 25 -- .../mcp_agent/tool/tool_collection.py | 55 ---- apps/scheduler/scheduler/context.py | 5 +- apps/scheduler/scheduler/message.py | 11 +- apps/scheduler/scheduler/scheduler.py | 2 +- apps/scheduler/slot/slot.py | 42 +++ apps/schemas/enum_var.py | 13 + apps/schemas/record.py | 1 + apps/schemas/task.py | 13 +- tests/common/test_queue.py | 2 +- 32 files changed, 1347 insertions(+), 925 deletions(-) create mode 100644 apps/scheduler/call/choice/condition_handler.py create mode 100644 apps/scheduler/mcp_agent/__init__.py delete mode 100644 apps/scheduler/mcp_agent/agent/base.py delete mode 100644 apps/scheduler/mcp_agent/agent/mcp.py delete mode 100644 apps/scheduler/mcp_agent/agent/react.py delete mode 100644 apps/scheduler/mcp_agent/agent/toolcall.py create mode 100644 apps/scheduler/mcp_agent/host.py create mode 100644 apps/scheduler/mcp_agent/plan.py create mode 100644 apps/scheduler/mcp_agent/prompt.py delete mode 100644 apps/scheduler/mcp_agent/schema.py create mode 100644 apps/scheduler/mcp_agent/select.py delete mode 100644 apps/scheduler/mcp_agent/tool/__init__.py delete mode 100644 apps/scheduler/mcp_agent/tool/base.py delete mode 100644 apps/scheduler/mcp_agent/tool/terminate.py delete mode 100644 apps/scheduler/mcp_agent/tool/tool_collection.py diff --git a/apps/scheduler/call/__init__.py b/apps/scheduler/call/__init__.py index 2ee8b862..0fc7106d 100644 --- a/apps/scheduler/call/__init__.py +++ b/apps/scheduler/call/__init__.py @@ -2,6 +2,7 @@ """Agent工具部分""" from apps.scheduler.call.api.api import API +from apps.scheduler.call.choice.choice import Choice from apps.scheduler.call.graph.graph import Graph from apps.scheduler.call.llm.llm import LLM from apps.scheduler.call.mcp.mcp import MCP @@ -16,6 +17,7 @@ __all__ = [ "MCP", "RAG", "SQL", + "Choice", "Graph", "Suggestion", ] diff --git a/apps/scheduler/call/choice/choice.py b/apps/scheduler/call/choice/choice.py index 574b3ceb..0d9ace60 100644 --- a/apps/scheduler/call/choice/choice.py +++ b/apps/scheduler/call/choice/choice.py @@ -1,20 +1,150 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """使用大模型或使用程序做出判断""" -from enum import Enum +import ast +import copy +import logging +from collections.abc import AsyncGenerator +from typing import Any +from pydantic import Field + +from apps.scheduler.call.choice.condition_handler import ConditionHandler +from apps.scheduler.call.choice.schema import ( + ChoiceBranch, + ChoiceInput, + ChoiceOutput, + Condition, + Logic, +) from apps.scheduler.call.core import CoreCall +from apps.schemas.enum_var import CallOutputType +from apps.schemas.parameters import Type +from apps.schemas.scheduler import ( + CallError, + CallInfo, + CallOutputChunk, + CallVars, +) + +logger = logging.getLogger(__name__) -from .schema import ChoiceInput, ChoiceOutput +class Choice(CoreCall, input_model=ChoiceInput, output_model=ChoiceOutput): + """Choice工具""" -class Operator(str, Enum): - """Choice工具支持的运算符""" + to_user: bool = Field(default=False) + choices: list[ChoiceBranch] = Field(description="分支", default=[ChoiceBranch(), + ChoiceBranch(conditions=[Condition()], is_default=False)]) - pass + @classmethod + def info(cls) -> CallInfo: + """返回Call的名称和描述""" + return CallInfo(name="选择器", description="使用大模型或使用程序做出判断") + async def _prepare_message(self, call_vars: CallVars) -> list[dict[str, Any]]: + """替换choices中的系统变量""" + valid_choices = [] -class Choice(CoreCall, input_model=ChoiceInput, output_model=ChoiceOutput): - """Choice工具""" + for choice in self.choices: + try: + # 验证逻辑运算符 + if choice.logic not in [Logic.AND, Logic.OR]: + msg = f"无效的逻辑运算符: {choice.logic}" + logger.warning(f"[Choice] 分支 {choice.branch_id} 条件处理失败: {msg}") + continue + + valid_conditions = [] + for i in range(len(choice.conditions)): + condition = copy.deepcopy(choice.conditions[i]) + # 处理左值 + if condition.left.step_id is not None: + condition.left.value = self._extract_history_variables( + condition.left.step_id+'/'+condition.left.value, call_vars.history) + # 检查历史变量是否成功提取 + if condition.left.value is None: + msg = f"步骤 {condition.left.step_id} 的历史变量不存在" + logger.warning(f"[Choice] 分支 {choice.branch_id} 条件处理失败: {msg}") + continue + if not ConditionHandler.check_value_type( + condition.left.value, condition.left.type): + msg = f"左值类型不匹配: {condition.left.value} 应为 {condition.left.type.value}" + logger.warning(f"[Choice] 分支 {choice.branch_id} 条件处理失败: {msg}") + continue + else: + msg = "左侧变量缺少step_id" + logger.warning(f"[Choice] 分支 {choice.branch_id} 条件处理失败: {msg}") + continue + # 处理右值 + if condition.right.step_id is not None: + condition.right.value = self._extract_history_variables( + condition.right.step_id+'/'+condition.right.value, call_vars.history) + # 检查历史变量是否成功提取 + if condition.right.value is None: + msg = f"步骤 {condition.right.step_id} 的历史变量不存在" + logger.warning(f"[Choice] 分支 {choice.branch_id} 条件处理失败: {msg}") + continue + if not ConditionHandler.check_value_type( + condition.right.value, condition.right.type): + msg = f"右值类型不匹配: {condition.right.value} 应为 {condition.right.type.value}" + logger.warning(f"[Choice] 分支 {choice.branch_id} 条件处理失败: {msg}") + continue + else: + # 如果右值没有step_id,尝试从call_vars中获取 + right_value_type = await ConditionHandler.get_value_type_from_operate( + condition.operate) + if right_value_type is None: + msg = f"不支持的运算符: {condition.operate}" + logger.warning(f"[Choice] 分支 {choice.branch_id} 条件处理失败: {msg}") + continue + if condition.right.type != right_value_type: + msg = f"右值类型不匹配: {condition.right.value} 应为 {right_value_type.value}" + logger.warning(f"[Choice] 分支 {choice.branch_id} 条件处理失败: {msg}") + continue + if right_value_type == Type.STRING: + condition.right.value = str(condition.right.value) + else: + condition.right.value = ast.literal_eval(condition.right.value) + if not ConditionHandler.check_value_type( + condition.right.value, condition.right.type): + msg = f"右值类型不匹配: {condition.right.value} 应为 {condition.right.type.value}" + logger.warning(f"[Choice] 分支 {choice.branch_id} 条件处理失败: {msg}") + continue + valid_conditions.append(condition) + + # 如果所有条件都无效,抛出异常 + if not valid_conditions: + msg = "分支没有有效条件" + logger.warning(f"[Choice] 分支 {choice.branch_id} 条件处理失败: {msg}") + continue + + # 更新有效条件 + choice.conditions = valid_conditions + valid_choices.append(choice) + + except ValueError as e: + logger.warning("分支 %s 处理失败: %s,已跳过", choice.branch_id, str(e)) + continue + + return valid_choices + + async def _init(self, call_vars: CallVars) -> ChoiceInput: + """初始化Choice工具""" + return ChoiceInput( + choices=await self._prepare_message(call_vars), + ) - pass + async def _exec( + self, input_data: dict[str, Any] + ) -> AsyncGenerator[CallOutputChunk, None]: + """执行Choice工具""" + # 解析输入数据 + data = ChoiceInput(**input_data) + try: + branch_id = ConditionHandler.handler(data.choices) + yield CallOutputChunk( + type=CallOutputType.DATA, + content=ChoiceOutput(branch_id=branch_id).model_dump(exclude_none=True, by_alias=True), + ) + except Exception as e: + raise CallError(message=f"选择工具调用失败:{e!s}", data={}) from e \ No newline at end of file diff --git a/apps/scheduler/call/choice/condition_handler.py b/apps/scheduler/call/choice/condition_handler.py new file mode 100644 index 00000000..f8a9e40e --- /dev/null +++ b/apps/scheduler/call/choice/condition_handler.py @@ -0,0 +1,308 @@ +# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. +"""处理条件分支的工具""" + + +import logging + +from pydantic import BaseModel + +from apps.scheduler.call.choice.schema import ChoiceBranch, Condition, Logic, Value +from apps.schemas.parameters import ( + BoolOperate, + DictOperate, + ListOperate, + NumberOperate, + StringOperate, + Type, +) + +logger = logging.getLogger(__name__) + + +class ConditionHandler(BaseModel): + """条件分支处理器""" + @staticmethod + async def get_value_type_from_operate(operate: NumberOperate | StringOperate | ListOperate | + BoolOperate | DictOperate) -> Type: + """获取右值的类型""" + if isinstance(operate, NumberOperate): + return Type.NUMBER + if operate in [ + StringOperate.EQUAL, StringOperate.NOT_EQUAL, StringOperate.CONTAINS, StringOperate.NOT_CONTAINS, + StringOperate.STARTS_WITH, StringOperate.ENDS_WITH, StringOperate.REGEX_MATCH]: + return Type.STRING + if operate in [StringOperate.LENGTH_EQUAL, StringOperate.LENGTH_GREATER_THAN, + StringOperate.LENGTH_GREATER_THAN_OR_EQUAL, StringOperate.LENGTH_LESS_THAN, + StringOperate.LENGTH_LESS_THAN_OR_EQUAL]: + return Type.NUMBER + if operate in [ListOperate.EQUAL, ListOperate.NOT_EQUAL]: + return Type.LIST + if operate in [ListOperate.CONTAINS, ListOperate.NOT_CONTAINS]: + return Type.STRING + if operate in [ListOperate.LENGTH_EQUAL, ListOperate.LENGTH_GREATER_THAN, + ListOperate.LENGTH_GREATER_THAN_OR_EQUAL, ListOperate.LENGTH_LESS_THAN, + ListOperate.LENGTH_LESS_THAN_OR_EQUAL]: + return Type.NUMBER + if operate in [BoolOperate.EQUAL, BoolOperate.NOT_EQUAL]: + return Type.BOOL + if operate in [DictOperate.EQUAL, DictOperate.NOT_EQUAL]: + return Type.DICT + if operate in [DictOperate.CONTAINS_KEY, DictOperate.NOT_CONTAINS_KEY]: + return Type.STRING + return None + + @staticmethod + def check_value_type(value: Value, expected_type: Type) -> bool: + """检查值的类型是否符合预期""" + if expected_type == Type.STRING and isinstance(value.value, str): + return True + if expected_type == Type.NUMBER and isinstance(value.value, (int, float)): + return True + if expected_type == Type.LIST and isinstance(value.value, list): + return True + if expected_type == Type.DICT and isinstance(value.value, dict): + return True + if expected_type == Type.BOOL and isinstance(value.value, bool): + return True + return False + + @staticmethod + def handler(choices: list[ChoiceBranch]) -> str: + """处理条件""" + default_branch = [c for c in choices if c.is_default] + + for block_judgement in choices: + results = [] + if block_judgement.is_default: + continue + for condition in block_judgement.conditions: + result = ConditionHandler._judge_condition(condition) + results.append(result) + if block_judgement.logic == Logic.AND: + final_result = all(results) + elif block_judgement.logic == Logic.OR: + final_result = any(results) + + if final_result: + return block_judgement.branch_id + + # 如果没有匹配的分支,选择默认分支 + if default_branch: + return default_branch[0].branch_id + return "" + + @staticmethod + def _judge_condition(condition: Condition) -> bool: + """ + 判断条件是否成立。 + + Args: + condition (Condition): 'left', 'operate', 'right', 'type' + + Returns: + bool + + """ + left = condition.left + operate = condition.operate + right = condition.right + value_type = condition.type + + result = None + if value_type == Type.STRING: + result = ConditionHandler._judge_string_condition(left, operate, right) + elif value_type == Type.NUMBER: + result = ConditionHandler._judge_int_condition(left, operate, right) + elif value_type == Type.BOOL: + result = ConditionHandler._judge_bool_condition(left, operate, right) + elif value_type == Type.LIST: + result = ConditionHandler._judge_list_condition(left, operate, right) + elif value_type == Type.DICT: + result = ConditionHandler._judge_dict_condition(left, operate, right) + else: + logger.error("不支持的数据类型: %s", value_type) + msg = f"不支持的数据类型: {value_type}" + raise ValueError(msg) + return result + + @staticmethod + def _judge_string_condition(left: Value, operate: StringOperate, right: Value) -> bool: + """ + 判断字符串类型的条件。 + + Args: + left (Value): 左值,包含 'value' 键。 + operate (Operate): 操作符 + right (Value): 右值,包含 'value' 键。 + + Returns: + bool + + """ + left_value = left.value + if not isinstance(left_value, str): + logger.error("左值不是字符串类型: %s", left_value) + msg = "左值必须是字符串类型" + raise TypeError(msg) + right_value = right.value + result = False + if operate == StringOperate.EQUAL: + return left_value == right_value + elif operate == StringOperate.NOT_EQUAL: + return left_value != right_value + elif operate == StringOperate.CONTAINS: + return right_value in left_value + elif operate == StringOperate.NOT_CONTAINS: + return right_value not in left_value + elif operate == StringOperate.STARTS_WITH: + return left_value.startswith(right_value) + elif operate == StringOperate.ENDS_WITH: + return left_value.endswith(right_value) + elif operate == StringOperate.REGEX_MATCH: + import re + return bool(re.match(right_value, left_value)) + elif operate == StringOperate.LENGTH_EQUAL: + return len(left_value) == right_value + elif operate == StringOperate.LENGTH_GREATER_THAN: + return len(left_value) > right_value + elif operate == StringOperate.LENGTH_GREATER_THAN_OR_EQUAL: + return len(left_value) >= right_value + elif operate == StringOperate.LENGTH_LESS_THAN: + return len(left_value) < right_value + elif operate == StringOperate.LENGTH_LESS_THAN_OR_EQUAL: + return len(left_value) <= right_value + return False + + @staticmethod + def _judge_number_condition(left: Value, operate: NumberOperate, right: Value) -> bool: # noqa: PLR0911 + """ + 判断数字类型的条件。 + + Args: + left (Value): 左值,包含 'value' 键。 + operate (Operate): 操作符 + right (Value): 右值,包含 'value' 键。 + + Returns: + bool + + """ + left_value = left.value + if not isinstance(left_value, (int, float)): + logger.error("左值不是数字类型: %s", left_value) + msg = "左值必须是数字类型" + raise TypeError(msg) + right_value = right.value + if operate == NumberOperate.EQUAL: + return left_value == right_value + elif operate == NumberOperate.NOT_EQUAL: + return left_value != right_value + elif operate == NumberOperate.GREATER_THAN: + return left_value > right_value + elif operate == NumberOperate.LESS_THAN: # noqa: PLR2004 + return left_value < right_value + elif operate == NumberOperate.GREATER_THAN_OR_EQUAL: + return left_value >= right_value + elif operate == NumberOperate.LESS_THAN_OR_EQUAL: + return left_value <= right_value + return False + + @staticmethod + def _judge_bool_condition(left: Value, operate: BoolOperate, right: Value) -> bool: + """ + 判断布尔类型的条件。 + + Args: + left (Value): 左值,包含 'value' 键。 + operate (Operate): 操作符 + right (Value): 右值,包含 'value' 键。 + + Returns: + bool + + """ + left_value = left.value + if not isinstance(left_value, bool): + logger.error("左值不是布尔类型: %s", left_value) + msg = "左值必须是布尔类型" + raise TypeError(msg) + right_value = right.value + if operate == BoolOperate.EQUAL: + return left_value == right_value + elif operate == BoolOperate.NOT_EQUAL: + return left_value != right_value + elif operate == BoolOperate.IS_EMPTY: + return not left_value + elif operate == BoolOperate.NOT_EMPTY: + return left_value + return False + + @staticmethod + def _judge_list_condition(left: Value, operate: ListOperate, right: Value): + """ + 判断列表类型的条件。 + + Args: + left (Value): 左值,包含 'value' 键。 + operate (Operate): 操作符 + right (Value): 右值,包含 'value' 键。 + + Returns: + bool + + """ + left_value = left.value + if not isinstance(left_value, list): + logger.error("左值不是列表类型: %s", left_value) + msg = "左值必须是列表类型" + raise TypeError(msg) + right_value = right.value + if operate == ListOperate.EQUAL: + return left_value == right_value + elif operate == ListOperate.NOT_EQUAL: + return left_value != right_value + elif operate == ListOperate.CONTAINS: + return right_value in left_value + elif operate == ListOperate.NOT_CONTAINS: + return right_value not in left_value + elif operate == ListOperate.LENGTH_EQUAL: + return len(left_value) == right_value + elif operate == ListOperate.LENGTH_GREATER_THAN: + return len(left_value) > right_value + elif operate == ListOperate.LENGTH_GREATER_THAN_OR_EQUAL: + return len(left_value) >= right_value + elif operate == ListOperate.LENGTH_LESS_THAN: + return len(left_value) < right_value + elif operate == ListOperate.LENGTH_LESS_THAN_OR_EQUAL: + return len(left_value) <= right_value + return False + + @staticmethod + def _judge_dict_condition(left: Value, operate: DictOperate, right: Value): + """ + 判断字典类型的条件。 + + Args: + left (Value): 左值,包含 'value' 键。 + operate (Operate): 操作符 + right (Value): 右值,包含 'value' 键。 + + Returns: + bool + + """ + left_value = left.value + if not isinstance(left_value, dict): + logger.error("左值不是字典类型: %s", left_value) + msg = "左值必须是字典类型" + raise TypeError(msg) + right_value = right.value + if operate == DictOperate.EQUAL: + return left_value == right_value + elif operate == DictOperate.NOT_EQUAL: + return left_value != right_value + elif operate == DictOperate.CONTAINS_KEY: + return right_value in left_value + elif operate == DictOperate.NOT_CONTAINS_KEY: + return right_value not in left_value + return False \ No newline at end of file diff --git a/apps/scheduler/call/choice/schema.py b/apps/scheduler/call/choice/schema.py index 60b62d09..9e886c57 100644 --- a/apps/scheduler/call/choice/schema.py +++ b/apps/scheduler/call/choice/schema.py @@ -1,12 +1,62 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """Choice Call的输入和输出""" +import uuid +from enum import Enum + +from pydantic import BaseModel, Field from apps.scheduler.call.core import DataBase +from apps.schemas.parameters import ( + BoolOperate, + DictOperate, + ListOperate, + NumberOperate, + StringOperate, + Type, +) + + +class Logic(str, Enum): + """Choice 工具支持的逻辑运算符""" + + AND = "and" + OR = "or" + + +class Value(DataBase): + """值的结构""" + + step_id: str | None = Field(description="步骤id", default=None) + type: Type | None = Field(description="值的类型", default=None) + value: str | float | int | bool | list | dict | None = Field(description="值", default=None) + + +class Condition(DataBase): + """单个条件""" + + left: Value = Field(description="左值", default=Value()) + right: Value = Field(description="右值", default=Value()) + operate: NumberOperate | StringOperate | ListOperate | BoolOperate | DictOperate | None = Field( + description="运算符", default=None) + id: str = Field(description="条件ID", default_factory=lambda: str(uuid.uuid4())) + + +class ChoiceBranch(DataBase): + """子分支""" + + branch_id: str = Field(description="分支ID", default_factory=lambda: str(uuid.uuid4())) + logic: Logic = Field(description="逻辑运算符", default=Logic.AND) + conditions: list[Condition] = Field(description="条件列表", default=[]) + is_default: bool = Field(description="是否为默认分支", default=True) class ChoiceInput(DataBase): """Choice Call的输入""" + choices: list[ChoiceBranch] = Field(description="分支", default=[]) + class ChoiceOutput(DataBase): """Choice Call的输出""" + + branch_id: str = Field(description="分支ID", default="") \ No newline at end of file diff --git a/apps/scheduler/call/core.py b/apps/scheduler/call/core.py index 7edfeb13..91f22409 100644 --- a/apps/scheduler/call/core.py +++ b/apps/scheduler/call/core.py @@ -131,18 +131,17 @@ class CoreCall(BaseModel): :return: 变量 """ split_path = path.split("/") + if len(split_path) < 2: + err = f"[CoreCall] 路径格式错误: {path}" + logger.error(err) + return None if split_path[0] not in history: err = f"[CoreCall] 步骤{split_path[0]}不存在" logger.error(err) - raise CallError( - message=err, - data={ - "step_id": split_path[0], - }, - ) + return None data = history[split_path[0]].output_data - for key in split_path[1:]: + for key in split_path[2:]: if key not in data: err = f"[CoreCall] 输出Key {key} 不存在" logger.error(err) diff --git a/apps/scheduler/call/mcp/mcp.py b/apps/scheduler/call/mcp/mcp.py index e84b56cb..6b4b7958 100644 --- a/apps/scheduler/call/mcp/mcp.py +++ b/apps/scheduler/call/mcp/mcp.py @@ -32,7 +32,7 @@ class MCP(CoreCall, input_model=MCPInput, output_model=MCPOutput): """MCP工具""" mcp_list: list[str] = Field(description="MCP Server ID列表", max_length=5, min_length=1) - max_steps: int = Field(description="最大步骤数", default=6) + max_steps: int = Field(description="最大步骤数", default=20) text_output: bool = Field(description="是否将结果以文本形式返回", default=True) to_user: bool = Field(description="是否将结果返回给用户", default=True) diff --git a/apps/scheduler/executor/agent.py b/apps/scheduler/executor/agent.py index f6814dd3..cb8e183e 100644 --- a/apps/scheduler/executor/agent.py +++ b/apps/scheduler/executor/agent.py @@ -6,7 +6,9 @@ import logging from pydantic import Field from apps.scheduler.executor.base import BaseExecutor -from apps.scheduler.mcp_agent.agent.mcp import MCPAgent +from apps.scheduler.mcp_agent import host, plan, select +from apps.schemas.task import ExecutorState, StepQueueItem +from apps.services.task import TaskManager logger = logging.getLogger(__name__) @@ -15,26 +17,14 @@ class MCPAgentExecutor(BaseExecutor): """MCP Agent执行器""" question: str = Field(description="用户输入") - max_steps: int = Field(default=10, description="最大步数") + max_steps: int = Field(default=20, description="最大步数") servers_id: list[str] = Field(description="MCP server id") agent_id: str = Field(default="", description="Agent ID") agent_description: str = Field(default="", description="Agent描述") - async def run(self) -> None: - """运行MCP Agent""" - agent = await MCPAgent.create( - servers_id=self.servers_id, - max_steps=self.max_steps, - task=self.task, - msg_queue=self.msg_queue, - question=self.question, - agent_id=self.agent_id, - description=self.agent_description, - ) - - try: - answer = await agent.run(self.question) - self.task = agent.task - self.task.runtime.answer = answer - except Exception as e: - logger.error(f"Error: {str(e)}") + async def load_state(self) -> None: + """从数据库中加载FlowExecutor的状态""" + logger.info("[FlowExecutor] 加载Executor状态") + # 尝试恢复State + if self.task.state: + self.task.context = await TaskManager.get_context_by_task_id(self.task.id) diff --git a/apps/scheduler/executor/flow.py b/apps/scheduler/executor/flow.py index 8a3b21d9..89f8326b 100644 --- a/apps/scheduler/executor/flow.py +++ b/apps/scheduler/executor/flow.py @@ -9,7 +9,7 @@ from datetime import UTC, datetime from pydantic import Field from apps.scheduler.call.llm.prompt import LLM_ERROR_PROMPT -from apps.schemas.enum_var import EventType, SpecialCallType, StepStatus +from apps.schemas.enum_var import EventType, FlowStatus, SpecialCallType, StepStatus from apps.schemas.flow import Flow, Step from apps.schemas.request_data import RequestDataApp from apps.schemas.task import ExecutorState, StepQueueItem @@ -60,8 +60,9 @@ class FlowExecutor(BaseExecutor): self.task.state = ExecutorState( flow_id=str(self.flow_id), flow_name=self.flow.name, + flow_status=FlowStatus.RUNNING, description=str(self.flow.description), - status=StepStatus.RUNNING, + step_status=StepStatus.RUNNING, app_id=str(self.post_body_app.app_id), step_id="start", step_name="开始", @@ -118,6 +119,15 @@ class FlowExecutor(BaseExecutor): # 如果当前步骤为结束,则直接返回 if self.task.state.step_id == "end" or not self.task.state.step_id: # type: ignore[arg-type] return [] + if self.task.state.step_name == "Choice": + # 如果是choice节点,获取分支ID + branch_id = self.task.context[-1]["output_data"]["branch_id"] + if branch_id: + self.task.state.step_id = self.task.state.step_id + "." + branch_id + logger.info("[FlowExecutor] 分支ID:%s", branch_id) + else: + logger.warning("[FlowExecutor] 没有找到分支ID,返回空列表") + return [] next_steps = await self._find_next_id(self.task.state.step_id) # type: ignore[arg-type] # 如果step没有任何出边,直接跳到end @@ -167,11 +177,12 @@ class FlowExecutor(BaseExecutor): # 插入首个步骤 self.step_queue.append(first_step) + self.task.state.flow_status = FlowStatus.RUNNING # type: ignore[arg-type] # 运行Flow(未达终点) while not self._reached_end: # 如果当前步骤出错,执行错误处理步骤 - if self.task.state.status == StepStatus.ERROR: # type: ignore[arg-type] + if self.task.state.step_status == StepStatus.ERROR: # type: ignore[arg-type] logger.warning("[FlowExecutor] Executor出错,执行错误处理步骤") self.step_queue.clear() self.step_queue.appendleft(StepQueueItem( @@ -191,6 +202,7 @@ class FlowExecutor(BaseExecutor): enable_filling=False, to_user=False, )) + self.task.state.flow_status = FlowStatus.ERROR # type: ignore[arg-type] # 错误处理后结束 self._reached_end = True @@ -217,3 +229,5 @@ class FlowExecutor(BaseExecutor): self.task.tokens.time = round(datetime.now(UTC).timestamp(), 2) - self.task.tokens.full_time # 推送Flow停止消息 await self.push_message(EventType.FLOW_STOP.value) + # 更新Flow状态 + self.task.state.flow_status = FlowStatus.SUCCESS # type: ignore[arg-type] diff --git a/apps/scheduler/executor/step.py b/apps/scheduler/executor/step.py index 00613fde..c9d30e54 100644 --- a/apps/scheduler/executor/step.py +++ b/apps/scheduler/executor/step.py @@ -134,7 +134,7 @@ class StepExecutor(BaseExecutor): # 更新State self.task.state.step_id = str(uuid.uuid4()) # type: ignore[arg-type] self.task.state.step_name = "自动参数填充" # type: ignore[arg-type] - self.task.state.status = StepStatus.RUNNING # type: ignore[arg-type] + self.task.state.step_status = StepStatus.RUNNING # type: ignore[arg-type] self.task.tokens.time = round(datetime.now(UTC).timestamp(), 2) # 初始化填参 @@ -157,9 +157,9 @@ class StepExecutor(BaseExecutor): # 如果没有填全,则状态设置为待填参 if result.remaining_schema: - self.task.state.status = StepStatus.PARAM # type: ignore[arg-type] + self.task.state.step_status = StepStatus.PARAM # type: ignore[arg-type] else: - self.task.state.status = StepStatus.SUCCESS # type: ignore[arg-type] + self.task.state.step_status = StepStatus.SUCCESS # type: ignore[arg-type] await self.push_message(EventType.STEP_OUTPUT.value, result.model_dump(by_alias=True, exclude_none=True)) # 更新输入 @@ -213,7 +213,7 @@ class StepExecutor(BaseExecutor): await self._run_slot_filling() # 更新状态 - self.task.state.status = StepStatus.RUNNING # type: ignore[arg-type] + self.task.state.step_status = StepStatus.RUNNING # type: ignore[arg-type] self.task.tokens.time = round(datetime.now(UTC).timestamp(), 2) # 推送输入 await self.push_message(EventType.STEP_INPUT.value, self.obj.input) @@ -225,7 +225,7 @@ class StepExecutor(BaseExecutor): content = await self._process_chunk(iterator, to_user=self.obj.to_user) except Exception as e: logger.exception("[StepExecutor] 运行步骤失败,进行异常处理步骤") - self.task.state.status = StepStatus.ERROR # type: ignore[arg-type] + self.task.state.step_status = StepStatus.ERROR # type: ignore[arg-type] await self.push_message(EventType.STEP_OUTPUT.value, {}) if isinstance(e, CallError): self.task.state.error_info = { # type: ignore[arg-type] @@ -240,7 +240,7 @@ class StepExecutor(BaseExecutor): return # 更新执行状态 - self.task.state.status = StepStatus.SUCCESS # type: ignore[arg-type] + self.task.state.step_status = StepStatus.SUCCESS # type: ignore[arg-type] self.task.tokens.input_tokens += self.obj.tokens.input_tokens self.task.tokens.output_tokens += self.obj.tokens.output_tokens self.task.tokens.full_time += round(datetime.now(UTC).timestamp(), 2) - self.task.tokens.time @@ -259,7 +259,7 @@ class StepExecutor(BaseExecutor): step_id=self.step.step_id, step_name=self.step.step.name, step_description=self.step.step.description, - status=self.task.state.status, # type: ignore[arg-type] + step_status=self.task.state.step_status, input_data=self.obj.input, output_data=output_data, ) diff --git a/apps/scheduler/mcp/host.py b/apps/scheduler/mcp/host.py index 4b104e9d..941e8822 100644 --- a/apps/scheduler/mcp/host.py +++ b/apps/scheduler/mcp/host.py @@ -101,11 +101,12 @@ class MCPHost: task_id=self._task_id, flow_id=self._runtime_id, flow_name=self._runtime_name, + flow_status=StepStatus.SUCCESS, step_id=tool.id, step_name=tool.toolName, # description是规划的实际内容 step_description=plan_item.content, - status=StepStatus.SUCCESS, + step_status=StepStatus.SUCCESS, input_data=input_data, output_data=output_data, ) diff --git a/apps/scheduler/mcp_agent/__init__.py b/apps/scheduler/mcp_agent/__init__.py new file mode 100644 index 00000000..12f5cb68 --- /dev/null +++ b/apps/scheduler/mcp_agent/__init__.py @@ -0,0 +1,8 @@ +# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. +"""Scheduler MCP 模块""" + +from apps.scheduler.mcp.host import MCPHost +from apps.scheduler.mcp.plan import MCPPlanner +from apps.scheduler.mcp.select import MCPSelector + +__all__ = ["MCPHost", "MCPPlanner", "MCPSelector"] diff --git a/apps/scheduler/mcp_agent/agent/base.py b/apps/scheduler/mcp_agent/agent/base.py deleted file mode 100644 index 69b06bc3..00000000 --- a/apps/scheduler/mcp_agent/agent/base.py +++ /dev/null @@ -1,196 +0,0 @@ -"""MCP Agent基类""" -import logging -from abc import ABC, abstractmethod -from contextlib import asynccontextmanager - -from pydantic import BaseModel, Field, model_validator - -from apps.common.queue import MessageQueue -from apps.llm.reasoning import ReasoningLLM -from apps.scheduler.mcp_agent.schema import Memory, Message, Role -from apps.schemas.enum_var import AgentState -from apps.schemas.task import Task -from apps.services.activity import Activity - -logger = logging.getLogger(__name__) - - -class BaseAgent(BaseModel, ABC): - """ - 用于管理代理状态和执行的抽象基类。 - - 为状态转换、内存管理、 - 以及分步执行循环。子类必须实现`step`方法。 - """ - - msg_queue: MessageQueue - task: Task - name: str = Field(..., description="Agent名称") - agent_id: str = Field(default="", description="Agent ID") - description: str = Field(default="", description="Agent描述") - question: str - # Prompts - next_step_prompt: str | None = Field( - None, description="判断下一步动作的提示" - ) - - # Dependencies - llm: ReasoningLLM = Field(default_factory=ReasoningLLM, description="大模型实例") - memory: Memory = Field(default_factory=Memory, description="Agent记忆库") - state: AgentState = Field( - default=AgentState.IDLE, description="Agent状态" - ) - servers_id: list[str] = Field(default_factory=list, description="MCP server id") - - # Execution control - max_steps: int = Field(default=10, description="终止前的最大步长") - current_step: int = Field(default=0, description="执行中的当前步骤") - - duplicate_threshold: int = 2 - - user_prompt: str = r""" - 当前步骤:{step} 工具输出结果:{result} - 请总结当前正在执行的步骤和对应的工具输出结果,内容包括当前步骤是多少,执行的工具是什么,输出是什么。 - 最终以报告的形式展示。 - 如果工具输出结果中执行的工具为terminate,请按照状态输出本次交互过程最终结果并完成对整个报告的总结,不需要输出你的分析过程。 - """ - """用户提示词""" - - class Config: - arbitrary_types_allowed = True - extra = "allow" # Allow extra fields for flexibility in subclasses - - @model_validator(mode="after") - def initialize_agent(self) -> "BaseAgent": - """初始化Agent""" - if self.llm is None or not isinstance(self.llm, ReasoningLLM): - self.llm = ReasoningLLM() - if not isinstance(self.memory, Memory): - self.memory = Memory() - return self - - @asynccontextmanager - async def state_context(self, new_state: AgentState): - """ - Agent状态转换上下文管理器 - - Args: - new_state: 要转变的状态 - - :return: None - :raise ValueError: 如果new_state无效 - """ - if not isinstance(new_state, AgentState): - raise ValueError(f"无效状态: {new_state}") - - previous_state = self.state - self.state = new_state - try: - yield - except Exception as e: - self.state = AgentState.ERROR # Transition to ERROR on failure - raise e - finally: - self.state = previous_state # Revert to previous state - - def update_memory( - self, - role: Role, - content: str, - **kwargs, - ) -> None: - """添加信息到Agent的memory中""" - message_map = { - "user": Message.user_message, - "system": Message.system_message, - "assistant": Message.assistant_message, - "tool": lambda content, **kw: Message.tool_message(content, **kw), - } - - if role not in message_map: - raise ValueError(f"不支持的消息角色: {role}") - - # Create message with appropriate parameters based on role - kwargs = {**(kwargs if role == "tool" else {})} - self.memory.add_message(message_map[role](content, **kwargs)) - - async def run(self, request: str | None = None) -> str: - """异步执行Agent的主循环""" - self.task.runtime.question = request - if self.state != AgentState.IDLE: - raise RuntimeError(f"无法从以下状态运行智能体: {self.state}") - - if request: - self.update_memory("user", request) - - results: list[str] = [] - async with self.state_context(AgentState.RUNNING): - while ( - self.current_step < self.max_steps and self.state != AgentState.FINISHED - ): - if not await Activity.is_active(self.task.ids.user_sub): - logger.info("用户终止会话,任务停止!") - return "" - self.current_step += 1 - logger.info(f"执行步骤{self.current_step}/{self.max_steps}") - step_result = await self.step() - - # Check for stuck state - if self.is_stuck(): - self.handle_stuck_state() - result = f"Step {self.current_step}: {step_result}" - results.append(result) - - if self.current_step >= self.max_steps: - self.current_step = 0 - self.state = AgentState.IDLE - result = f"任务终止: 已达到最大步数 ({self.max_steps})" - await self.msg_queue.push_output( - self.task, - event_type="text.add", - data={"text": result}, # type: ignore[arg-type] - ) - results.append(result) - return "\n".join(results) if results else "未执行任何步骤" - - @abstractmethod - async def step(self) -> str: - """ - 执行代理工作流程中的单个步骤。 - - 必须由子类实现,以定义具体的行为。 - """ - - def handle_stuck_state(self): - """通过添加更改策略的提示来处理卡住状态""" - stuck_prompt = "\ - 观察到重复响应。考虑新策略,避免重复已经尝试过的无效路径" - self.next_step_prompt = f"{stuck_prompt}\n{self.next_step_prompt}" - logger.warning(f"检测到智能体处于卡住状态。新增提示:{stuck_prompt}") - - def is_stuck(self) -> bool: - """通过检测重复内容来检查代理是否卡在循环中""" - if len(self.memory.messages) < 2: - return False - - last_message = self.memory.messages[-1] - if not last_message.content: - return False - - duplicate_count = sum( - 1 - for msg in reversed(self.memory.messages[:-1]) - if msg.role == "assistant" and msg.content == last_message.content - ) - - return duplicate_count >= self.duplicate_threshold - - @property - def messages(self) -> list[Message]: - """从Agent memory中检索消息列表""" - return self.memory.messages - - @messages.setter - def messages(self, value: list[Message]) -> None: - """设置Agent memory的消息列表""" - self.memory.messages = value diff --git a/apps/scheduler/mcp_agent/agent/mcp.py b/apps/scheduler/mcp_agent/agent/mcp.py deleted file mode 100644 index f3092263..00000000 --- a/apps/scheduler/mcp_agent/agent/mcp.py +++ /dev/null @@ -1,82 +0,0 @@ -"""MCP Agent""" -import logging - -from pydantic import Field - -from apps.scheduler.mcp.host import MCPHost -from apps.scheduler.mcp_agent.tool import Terminate, ToolCollection - -from .toolcall import ToolCallAgent - -logger = logging.getLogger(__name__) - - -class MCPAgent(ToolCallAgent): - """ - 用于与MCP(模型上下文协议)服务器交互。 - - 使用SSE或stdio传输连接到MCP服务器 - 并使服务器的工具 - """ - - name: str = "MCPAgent" - description: str = "一个多功能的智能体,能够使用多种工具(包括基于MCP的工具)解决各种任务" - - # Add general-purpose tools to the tool collection - available_tools: ToolCollection = Field( - default_factory=lambda: ToolCollection( - Terminate(), - ), - ) - - special_tool_names: list[str] = Field(default_factory=lambda: [Terminate().name]) - - _initialized: bool = False - - @classmethod - async def create(cls, **kwargs) -> "MCPAgent": # noqa: ANN003 - """创建并初始化MCP Agent实例""" - instance = cls(**kwargs) - await instance.initialize_mcp_servers() - instance._initialized = True - return instance - - async def initialize_mcp_servers(self) -> None: - """初始化与已配置的MCP服务器的连接""" - mcp_host = MCPHost( - self.task.ids.user_sub, - self.task.id, - self.agent_id, - self.description, - ) - mcps = {} - for mcp_id in self.servers_id: - client = await mcp_host.get_client(mcp_id) - if client: - mcps[mcp_id] = client - - for mcp_id, mcp_client in mcps.items(): - new_tools = [] - for tool in mcp_client.tools: - original_name = tool.name - # Always prefix with server_id to ensure uniqueness - tool_name = f"mcp_{mcp_id}_{original_name}" - - server_tool = MCPClientTool( - name=tool_name, - description=tool.description, - parameters=tool.inputSchema, - session=mcp_client.session, - server_id=mcp_id, - original_name=original_name, - ) - new_tools.append(server_tool) - self.available_tools.add_tools(*new_tools) - - async def think(self) -> bool: - """使用适当的上下文处理当前状态并决定下一步操作""" - if not self._initialized: - await self.initialize_mcp_servers() - self._initialized = True - - return await super().think() diff --git a/apps/scheduler/mcp_agent/agent/react.py b/apps/scheduler/mcp_agent/agent/react.py deleted file mode 100644 index 15dcaac3..00000000 --- a/apps/scheduler/mcp_agent/agent/react.py +++ /dev/null @@ -1,36 +0,0 @@ -from abc import ABC, abstractmethod - -from pydantic import Field - -from apps.llm.reasoning import ReasoningLLM -from apps.scheduler.mcp_agent.schema import Memory -from apps.schemas.enum_var import AgentState - -from .base import BaseAgent - - -class ReActAgent(BaseAgent, ABC): - name: str - description: str | None = None - - system_prompt: str | None = None - next_step_prompt: str | None = None - - llm: ReasoningLLM | None = Field(default_factory=ReasoningLLM) - memory: Memory = Field(default_factory=Memory) - state: AgentState = AgentState.IDLE - - @abstractmethod - async def think(self) -> bool: - """处理当前状态并决定下一步操作""" - - @abstractmethod - async def act(self) -> str: - """执行已决定的行动""" - - async def step(self) -> str: - """执行一个步骤:思考和行动""" - should_act = await self.think() - if not should_act: - return "思考完成-无需采取任何行动" - return await self.act() diff --git a/apps/scheduler/mcp_agent/agent/toolcall.py b/apps/scheduler/mcp_agent/agent/toolcall.py deleted file mode 100644 index dc00ebef..00000000 --- a/apps/scheduler/mcp_agent/agent/toolcall.py +++ /dev/null @@ -1,238 +0,0 @@ -import asyncio -import json -import logging -from typing import Any, Optional - -from pydantic import Field - -from apps.llm.function import JsonGenerator -from apps.llm.patterns import Select -from apps.scheduler.mcp_agent.agent.react import ReActAgent -from apps.scheduler.mcp_agent.schema import Function, Message, ToolCall -from apps.scheduler.mcp_agent.tool import Terminate, ToolCollection -from apps.schemas.enum_var import AgentState - -logger = logging.getLogger(__name__) - - -class ToolCallAgent(ReActAgent): - """用于处理工具/函数调用的基本Agent类""" - - name: str = "toolcall" - description: str = "可以执行工具调用的智能体" - - available_tools: ToolCollection = ToolCollection( - Terminate(), - ) - tool_choices: str = "auto" - special_tool_names: list[str] = Field(default_factory=lambda: [Terminate().name]) - - tool_calls: list[ToolCall] = Field(default_factory=list) - _current_base64_image: str | None = None - - max_observe: int | bool | None = None - - async def think(self) -> bool: - """使用工具处理当前状态并决定下一步行动""" - messages = [] - for message in self.messages: - if isinstance(message, Message): - message = message.to_dict() - messages.append(message) - try: - # 通过工具获得响应 - select_obj = Select() - choices = [] - for available_tool in self.available_tools.to_params(): - choices.append(available_tool.get("function")) - - tool = await select_obj.generate(question=self.question, choices=choices) - if tool in self.available_tools.tool_map: - schema = self.available_tools.tool_map[tool].parameters - json_generator = JsonGenerator( - query="根据跟定的信息,获取工具参数", - conversation=messages, - schema=schema, - ) # JsonGenerator - parameters = await json_generator.generate() - - else: - raise ValueError(f"尝试调用不存在的工具: {tool}") - except Exception as e: - raise - self.tool_calls = tool_calls = [ToolCall(id=tool, function=Function(name=tool, arguments=parameters))] - content = f"选择的执行工具为:{tool}, 参数为{parameters}" - - logger.info( - f"{self.name} 选择 {len(tool_calls) if tool_calls else 0}个工具执行" - ) - if tool_calls: - logger.info( - f"准备使用的工具: {[call.function.name for call in tool_calls]}" - ) - logger.info(f"工具参数: {tool_calls[0].function.arguments}") - - try: - - assistant_msg = ( - Message.from_tool_calls(content=content, tool_calls=self.tool_calls) - if self.tool_calls - else Message.assistant_message(content) - ) - self.memory.add_message(assistant_msg) - - if not self.tool_calls: - return bool(content) - - return bool(self.tool_calls) - except Exception as e: - logger.error(f"{self.name}的思考过程遇到了问题:: {e}") - self.memory.add_message( - Message.assistant_message( - f"处理时遇到错误: {str(e)}" - ) - ) - return False - - async def act(self) -> str: - """执行工具调用并处理其结果""" - if not self.tool_calls: - # 如果没有工具调用,则返回最后的消息内容 - return self.messages[-1].content or "没有要执行的内容或命令" - - results = [] - for command in self.tool_calls: - await self.msg_queue.push_output( - self.task, - event_type="text.add", - data={"text": f"正在执行工具{command.function.name}"} - ) - - self._current_base64_image = None - - result = await self.execute_tool(command) - - if self.max_observe: - result = result[: self.max_observe] - - push_result = "" - async for chunk in self.llm.call( - messages=[{"role": "system", "content": "You are a helpful asistant."}, - {"role": "user", "content": self.user_prompt.format( - step=self.current_step, - result=result, - )}, ], streaming=False - ): - push_result += chunk - self.task.tokens.input_tokens += self.llm.input_tokens - self.task.tokens.output_tokens += self.llm.output_tokens - await self.msg_queue.push_output( - self.task, - event_type="text.add", - data={"text": push_result}, # type: ignore[arg-type] - ) - - await self.msg_queue.push_output( - self.task, - event_type="text.add", - data={"text": f"工具{command.function.name}执行完成"}, # type: ignore[arg-type] - ) - - logger.info( - f"工具'{command.function.name}'执行完成! 执行结果为: {result}" - ) - - # 将工具响应添加到内存 - tool_msg = Message.tool_message( - content=result, - tool_call_id=command.id, - name=command.function.name, - ) - self.memory.add_message(tool_msg) - results.append(result) - self.question += ( - f"\n已执行工具{command.function.name}, " - f"作用是{self.available_tools.tool_map[command.function.name].description},结果为{result}" - ) - - return "\n\n".join(results) - - async def execute_tool(self, command: ToolCall) -> str: - """执行单个工具调用""" - if not command or not command.function or not command.function.name: - return "错误:无效的命令格式" - - name = command.function.name - if name not in self.available_tools.tool_map: - return f"错误:未知工具 '{name}'" - - try: - # 解析参数 - args = command.function.arguments - # 执行工具 - logger.info(f"激活工具:'{name}'...") - result = await self.available_tools.execute(name=name, tool_input=args) - - # 执行特殊工具 - await self._handle_special_tool(name=name, result=result) - - # 格式化结果 - observation = ( - f"观察到执行的工具 `{name}`的输出:\n{str(result)}" - if result - else f"工具 `{name}` 已完成,无输出" - ) - - return observation - except json.JSONDecodeError: - error_msg = f"解析{name}的参数时出错:JSON格式无效" - logger.error( - f"{name}”的参数没有意义-无效的JSON,参数:{command.function.arguments}" - ) - return f"错误: {error_msg}" - except Exception as e: - error_msg = f"工具 '{name}' 遇到问题: {str(e)}" - logger.exception(error_msg) - return f"错误: {error_msg}" - - async def _handle_special_tool(self, name: str, result: Any, **kwargs): - """处理特殊工具的执行和状态变化""" - if not self._is_special_tool(name): - return - - if self._should_finish_execution(name=name, result=result, **kwargs): - # 将智能体状态设为finished - logger.info(f"特殊工具'{name}'已完成任务!") - self.state = AgentState.FINISHED - - @staticmethod - def _should_finish_execution(**kwargs) -> bool: - """确定工具执行是否应完成""" - return True - - def _is_special_tool(self, name: str) -> bool: - """检查工具名称是否在特殊工具列表中""" - return name.lower() in [n.lower() for n in self.special_tool_names] - - async def cleanup(self): - """清理Agent工具使用的资源。""" - logger.info(f"正在清理智能体的资源'{self.name}'...") - for tool_name, tool_instance in self.available_tools.tool_map.items(): - if hasattr(tool_instance, "cleanup") and asyncio.iscoroutinefunction( - tool_instance.cleanup - ): - try: - logger.debug(f"清理工具: {tool_name}") - await tool_instance.cleanup() - except Exception as e: - logger.error( - f"清理工具时发生错误'{tool_name}': {e}", exc_info=True - ) - logger.info(f"智能体清理完成'{self.name}'.") - - async def run(self, request: Optional[str] = None) -> str: - """运行Agent""" - try: - return await super().run(request) - finally: - await self.cleanup() diff --git a/apps/scheduler/mcp_agent/host.py b/apps/scheduler/mcp_agent/host.py new file mode 100644 index 00000000..acdd4871 --- /dev/null +++ b/apps/scheduler/mcp_agent/host.py @@ -0,0 +1,190 @@ +# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. +"""MCP宿主""" + +import json +import logging +from typing import Any + +from jinja2 import BaseLoader +from jinja2.sandbox import SandboxedEnvironment +from mcp.types import TextContent + +from apps.common.mongo import MongoDB +from apps.llm.function import JsonGenerator +from apps.scheduler.mcp.prompt import MEMORY_TEMPLATE +from apps.scheduler.pool.mcp.client import MCPClient +from apps.scheduler.pool.mcp.pool import MCPPool +from apps.schemas.enum_var import StepStatus +from apps.schemas.mcp import MCPPlanItem, MCPTool +from apps.schemas.task import FlowStepHistory +from apps.services.task import TaskManager + +logger = logging.getLogger(__name__) + + +class MCPHost: + """MCP宿主服务""" + + def __init__(self, user_sub: str, task_id: str, runtime_id: str, runtime_name: str) -> None: + """初始化MCP宿主""" + self._user_sub = user_sub + self._task_id = task_id + # 注意:runtime在工作流中是flow_id和step_description,在Agent中可为标识Agent的id和description + self._runtime_id = runtime_id + self._runtime_name = runtime_name + self._context_list = [] + self._env = SandboxedEnvironment( + loader=BaseLoader(), + autoescape=False, + trim_blocks=True, + lstrip_blocks=True, + ) + + async def get_client(self, mcp_id: str) -> MCPClient | None: + """获取MCP客户端""" + mongo = MongoDB() + mcp_collection = mongo.get_collection("mcp") + + # 检查用户是否启用了这个mcp + mcp_db_result = await mcp_collection.find_one({"_id": mcp_id, "activated": self._user_sub}) + if not mcp_db_result: + logger.warning("用户 %s 未启用MCP %s", self._user_sub, mcp_id) + return None + + # 获取MCP配置 + try: + return await MCPPool().get(mcp_id, self._user_sub) + except KeyError: + logger.warning("用户 %s 的MCP %s 没有运行中的实例,请检查环境", self._user_sub, mcp_id) + return None + + async def assemble_memory(self) -> str: + """组装记忆""" + task = await TaskManager.get_task_by_task_id(self._task_id) + if not task: + logger.error("任务 %s 不存在", self._task_id) + return "" + + context_list = [] + for ctx_id in self._context_list: + context = next((ctx for ctx in task.context if ctx["_id"] == ctx_id), None) + if not context: + continue + context_list.append(context) + + return self._env.from_string(MEMORY_TEMPLATE).render( + context_list=context_list, + ) + + async def _save_memory( + self, + tool: MCPTool, + plan_item: MCPPlanItem, + input_data: dict[str, Any], + result: str, + ) -> dict[str, Any]: + """保存记忆""" + try: + output_data = json.loads(result) + except Exception: # noqa: BLE001 + logger.warning("[MCPHost] 得到的数据不是dict格式!尝试转换为str") + output_data = { + "message": result, + } + + if not isinstance(output_data, dict): + output_data = { + "message": result, + } + + # 创建context;注意用法 + context = FlowStepHistory( + task_id=self._task_id, + flow_id=self._runtime_id, + flow_name=self._runtime_name, + flow_status=StepStatus.SUCCESS, + step_id=tool.name, + step_name=tool.name, + # description是规划的实际内容 + step_description=plan_item.content, + step_status=StepStatus.SUCCESS, + input_data=input_data, + output_data=output_data, + ) + + # 保存到task + task = await TaskManager.get_task_by_task_id(self._task_id) + if not task: + logger.error("任务 %s 不存在", self._task_id) + return {} + self._context_list.append(context.id) + task.context.append(context.model_dump(by_alias=True, exclude_none=True)) + await TaskManager.save_task(self._task_id, task) + + return output_data + + async def _fill_params(self, tool: MCPTool, query: str) -> dict[str, Any]: + """填充工具参数""" + # 更清晰的输入·指令,这样可以调用generate + llm_query = rf""" + 请使用参数生成工具,生成满足以下目标的工具参数: + + {query} + """ + + # 进行生成 + json_generator = JsonGenerator( + llm_query, + [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": await self.assemble_memory()}, + ], + tool.input_schema, + ) + return await json_generator.generate() + + async def call_tool(self, tool: MCPTool, plan_item: MCPPlanItem) -> list[dict[str, Any]]: + """调用工具""" + # 拿到Client + client = await MCPPool().get(tool.mcp_id, self._user_sub) + if client is None: + err = f"[MCPHost] MCP Server不合法: {tool.mcp_id}" + logger.error(err) + raise ValueError(err) + + # 填充参数 + params = await self._fill_params(tool, plan_item.instruction) + # 调用工具 + result = await client.call_tool(tool.name, params) + # 保存记忆 + processed_result = [] + for item in result.content: + if not isinstance(item, TextContent): + logger.error("MCP结果类型不支持: %s", item) + continue + processed_result.append(await self._save_memory(tool, plan_item, params, item.text)) + + return processed_result + + async def get_tool_list(self, mcp_id_list: list[str]) -> list[MCPTool]: + """获取工具列表""" + mongo = MongoDB() + mcp_collection = mongo.get_collection("mcp") + + # 获取工具列表 + tool_list = [] + for mcp_id in mcp_id_list: + # 检查用户是否启用了这个mcp + mcp_db_result = await mcp_collection.find_one({"_id": mcp_id, "activated": self._user_sub}) + if not mcp_db_result: + logger.warning("用户 %s 未启用MCP %s", self._user_sub, mcp_id) + continue + # 获取MCP工具配置 + try: + for tool in mcp_db_result["tools"]: + tool_list.extend([MCPTool.model_validate(tool)]) + except KeyError: + logger.warning("用户 %s 的MCP Tool %s 配置错误", self._user_sub, mcp_id) + continue + + return tool_list diff --git a/apps/scheduler/mcp_agent/plan.py b/apps/scheduler/mcp_agent/plan.py new file mode 100644 index 00000000..cd4f5975 --- /dev/null +++ b/apps/scheduler/mcp_agent/plan.py @@ -0,0 +1,110 @@ +# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. +"""MCP 用户目标拆解与规划""" + +from jinja2 import BaseLoader +from jinja2.sandbox import SandboxedEnvironment + +from apps.llm.function import JsonGenerator +from apps.llm.reasoning import ReasoningLLM +from apps.scheduler.mcp.prompt import CREATE_PLAN, FINAL_ANSWER +from apps.schemas.mcp import MCPPlan, MCPTool + + +class MCPPlanner: + """MCP 用户目标拆解与规划""" + + def __init__(self, user_goal: str) -> None: + """初始化MCP规划器""" + self.user_goal = user_goal + self._env = SandboxedEnvironment( + loader=BaseLoader, + autoescape=True, + trim_blocks=True, + lstrip_blocks=True, + ) + self.input_tokens = 0 + self.output_tokens = 0 + + + async def create_plan(self, tool_list: list[MCPTool], max_steps: int = 6) -> MCPPlan: + """规划下一步的执行流程,并输出""" + # 获取推理结果 + result = await self._get_reasoning_plan(tool_list, max_steps) + + # 解析为结构化数据 + return await self._parse_plan_result(result, max_steps) + + + async def _get_reasoning_plan(self, tool_list: list[MCPTool], max_steps: int) -> str: + """获取推理大模型的结果""" + # 格式化Prompt + template = self._env.from_string(CREATE_PLAN) + prompt = template.render( + goal=self.user_goal, + tools=tool_list, + max_num=max_steps, + ) + + # 调用推理大模型 + message = [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": prompt}, + ] + reasoning_llm = ReasoningLLM() + result = "" + async for chunk in reasoning_llm.call( + message, + streaming=False, + temperature=0.07, + result_only=True, + ): + result += chunk + + # 保存token用量 + self.input_tokens = reasoning_llm.input_tokens + self.output_tokens = reasoning_llm.output_tokens + + return result + + + async def _parse_plan_result(self, result: str, max_steps: int) -> MCPPlan: + """将推理结果解析为结构化数据""" + # 格式化Prompt + schema = MCPPlan.model_json_schema() + schema["properties"]["plans"]["maxItems"] = max_steps + + # 使用Function模型解析结果 + json_generator = JsonGenerator( + result, + [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": result}, + ], + schema, + ) + plan = await json_generator.generate() + return MCPPlan.model_validate(plan) + + + async def generate_answer(self, plan: MCPPlan, memory: str) -> str: + """生成最终回答""" + template = self._env.from_string(FINAL_ANSWER) + prompt = template.render( + plan=plan, + memory=memory, + goal=self.user_goal, + ) + + llm = ReasoningLLM() + result = "" + async for chunk in llm.call( + [{"role": "user", "content": prompt}], + streaming=False, + temperature=0.07, + ): + result += chunk + + self.input_tokens = llm.input_tokens + self.output_tokens = llm.output_tokens + + return result diff --git a/apps/scheduler/mcp_agent/prompt.py b/apps/scheduler/mcp_agent/prompt.py new file mode 100644 index 00000000..b322fb08 --- /dev/null +++ b/apps/scheduler/mcp_agent/prompt.py @@ -0,0 +1,240 @@ +# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. +"""MCP相关的大模型Prompt""" + +from textwrap import dedent + +MCP_SELECT = dedent(r""" + 你是一个乐于助人的智能助手。 + 你的任务是:根据当前目标,选择最合适的MCP Server。 + + ## 选择MCP Server时的注意事项: + + 1. 确保充分理解当前目标,选择最合适的MCP Server。 + 2. 请在给定的MCP Server列表中选择,不要自己生成MCP Server。 + 3. 请先给出你选择的理由,再给出你的选择。 + 4. 当前目标将在下面给出,MCP Server列表也会在下面给出。 + 请将你的思考过程放在"思考过程"部分,将你的选择放在"选择结果"部分。 + 5. 选择必须是JSON格式,严格按照下面的模板,不要输出任何其他内容: + + ```json + { + "mcp": "你选择的MCP Server的名称" + } + ``` + + 6. 下面的示例仅供参考,不要将示例中的内容作为选择MCP Server的依据。 + + ## 示例 + + ### 目标 + + 我需要一个MCP Server来完成一个任务。 + + ### MCP Server列表 + + - **mcp_1**: "MCP Server 1";MCP Server 1的描述 + - **mcp_2**: "MCP Server 2";MCP Server 2的描述 + + ### 请一步一步思考: + + 因为当前目标需要一个MCP Server来完成一个任务,所以选择mcp_1。 + + ### 选择结果 + + ```json + { + "mcp": "mcp_1" + } + ``` + + ## 现在开始! + + ### 目标 + + {{goal}} + + ### MCP Server列表 + + {% for mcp in mcp_list %} + - **{{mcp.id}}**: "{{mcp.name}}";{{mcp.description}} + {% endfor %} + + ### 请一步一步思考: + +""") +CREATE_PLAN = dedent(r""" + 你是一个计划生成器。 + 请分析用户的目标,并生成一个计划。你后续将根据这个计划,一步一步地完成用户的目标。 + + # 一个好的计划应该: + + 1. 能够成功完成用户的目标 + 2. 计划中的每一个步骤必须且只能使用一个工具。 + 3. 计划中的步骤必须具有清晰和逻辑的步骤,没有冗余或不必要的步骤。 + 4. 计划中的最后一步必须是Final工具,以确保计划执行结束。 + + # 生成计划时的注意事项: + + - 每一条计划包含3个部分: + - 计划内容:描述单个计划步骤的大致内容 + - 工具ID:必须从下文的工具列表中选择 + - 工具指令:改写用户的目标,使其更符合工具的输入要求 + - 必须按照如下格式生成计划,不要输出任何额外数据: + + ```json + { + "plans": [ + { + "content": "计划内容", + "tool": "工具ID", + "instruction": "工具指令" + } + ] + } + ``` + + - 在生成计划之前,请一步一步思考,解析用户的目标,并指导你接下来的生成。\ +思考过程应放置在 XML标签中。 + - 计划内容中,可以使用"Result[]"来引用之前计划步骤的结果。例如:"Result[3]"表示引用第三条计划执行后的结果。 + - 计划不得多于{{ max_num }}条,且每条计划内容应少于150字。 + + # 工具 + + 你可以访问并使用一些工具,这些工具将在 XML标签中给出。 + + + {% for tool in tools %} + - {{ tool.id }}{{tool.name}};{{ tool.description }} + {% endfor %} + - Final结束步骤,当执行到这一步时,\ +表示计划执行结束,所得到的结果将作为最终结果。 + + + # 样例 + + ## 目标 + + 在后台运行一个新的alpine:latest容器,将主机/root文件夹挂载至/data,并执行top命令。 + + ## 计划 + + + 1. 这个目标需要使用Docker来完成,首先需要选择合适的MCP Server + 2. 目标可以拆解为以下几个部分: + - 运行alpine:latest容器 + - 挂载主机目录 + - 在后台运行 + - 执行top命令 + 3. 需要先选择MCP Server,然后生成Docker命令,最后执行命令 + + + ```json + { + "plans": [ + { + "content": "选择一个支持Docker的MCP Server", + "tool": "mcp_selector", + "instruction": "需要一个支持Docker容器运行的MCP Server" + }, + { + "content": "使用Result[0]中选择的MCP Server,生成Docker命令", + "tool": "command_generator", + "instruction": "生成Docker命令:在后台运行alpine:latest容器,挂载/root到/data,执行top命令" + }, + { + "content": "在Result[0]的MCP Server上执行Result[1]生成的命令", + "tool": "command_executor", + "instruction": "执行Docker命令" + }, + { + "content": "任务执行完成,容器已在后台运行,结果为Result[2]", + "tool": "Final", + "instruction": "" + } + ] + } + ``` + + # 现在开始生成计划: + + ## 目标 + + {{goal}} + + # 计划 +""") +EVALUATE_PLAN = dedent(r""" + 你是一个计划评估器。 + 请根据给定的计划,和当前计划执行的实际情况,分析当前计划是否合理和完整,并生成改进后的计划。 + + # 一个好的计划应该: + + 1. 能够成功完成用户的目标 + 2. 计划中的每一个步骤必须且只能使用一个工具。 + 3. 计划中的步骤必须具有清晰和逻辑的步骤,没有冗余或不必要的步骤。 + 4. 计划中的最后一步必须是Final工具,以确保计划执行结束。 + + # 你此前的计划是: + + {{ plan }} + + # 这个计划的执行情况是: + + 计划的执行情况将放置在 XML标签中。 + + + {{ memory }} + + + # 进行评估时的注意事项: + + - 请一步一步思考,解析用户的目标,并指导你接下来的生成。思考过程应放置在 XML标签中。 + - 评估结果分为两个部分: + - 计划评估的结论 + - 改进后的计划 + - 请按照以下JSON格式输出评估结果: + + ```json + { + "evaluation": "评估结果", + "plans": [ + { + "content": "改进后的计划内容", + "tool": "工具ID", + "instruction": "工具指令" + } + ] + } + ``` + + # 现在开始评估计划: + +""") +FINAL_ANSWER = dedent(r""" + 综合理解计划执行结果和背景信息,向用户报告目标的完成情况。 + + # 用户目标 + + {{ goal }} + + # 计划执行情况 + + 为了完成上述目标,你实施了以下计划: + + {{ memory }} + + # 其他背景信息: + + {{ status }} + + # 现在,请根据以上信息,向用户报告目标的完成情况: + +""") +MEMORY_TEMPLATE = dedent(r""" + {% for ctx in context_list %} + - 第{{ loop.index }}步:{{ ctx.step_description }} + - 调用工具 `{{ ctx.step_id }}`,并提供参数 `{{ ctx.input_data }}` + - 执行状态:{{ ctx.status }} + - 得到数据:`{{ ctx.output_data }}` + {% endfor %} +""") diff --git a/apps/scheduler/mcp_agent/schema.py b/apps/scheduler/mcp_agent/schema.py deleted file mode 100644 index 61413907..00000000 --- a/apps/scheduler/mcp_agent/schema.py +++ /dev/null @@ -1,148 +0,0 @@ -"""MCP Agent执行数据结构""" -from typing import Any, Self - -from pydantic import BaseModel, Field - -from apps.schemas.enum_var import Role - - -class Function(BaseModel): - """工具函数""" - - name: str - arguments: dict[str, Any] - - -class ToolCall(BaseModel): - """Represents a tool/function call in a message""" - - id: str - type: str = "function" - function: Function - - -class Message(BaseModel): - """Represents a chat message in the conversation""" - - role: Role = Field(...) - content: str | None = Field(default=None) - tool_calls: list[ToolCall] | None = Field(default=None) - name: str | None = Field(default=None) - tool_call_id: str | None = Field(default=None) - - def __add__(self, other) -> list["Message"]: - """支持 Message + list 或 Message + Message 的操作""" - if isinstance(other, list): - return [self] + other - elif isinstance(other, Message): - return [self, other] - else: - raise TypeError( - f"unsupported operand type(s) for +: '{type(self).__name__}' and '{type(other).__name__}'" - ) - - def __radd__(self, other) -> list["Message"]: - """支持 list + Message 的操作""" - if isinstance(other, list): - return other + [self] - else: - raise TypeError( - f"unsupported operand type(s) for +: '{type(other).__name__}' and '{type(self).__name__}'" - ) - - def to_dict(self) -> dict: - """Convert message to dictionary format""" - message = {"role": self.role} - if self.content is not None: - message["content"] = self.content - if self.tool_calls is not None: - message["tool_calls"] = [tool_call.dict() for tool_call in self.tool_calls] - if self.name is not None: - message["name"] = self.name - if self.tool_call_id is not None: - message["tool_call_id"] = self.tool_call_id - return message - - @classmethod - def user_message(cls, content: str) -> Self: - """Create a user message""" - return cls(role=Role.USER, content=content) - - @classmethod - def system_message(cls, content: str) -> Self: - """Create a system message""" - return cls(role=Role.SYSTEM, content=content) - - @classmethod - def assistant_message( - cls, content: str | None = None, - ) -> Self: - """Create an assistant message""" - return cls(role=Role.ASSISTANT, content=content) - - @classmethod - def tool_message( - cls, content: str, name: str, tool_call_id: str, - ) -> Self: - """Create a tool message""" - return cls( - role=Role.TOOL, - content=content, - name=name, - tool_call_id=tool_call_id, - ) - - @classmethod - def from_tool_calls( - cls, - tool_calls: list[Any], - content: str | list[str] = "", - **kwargs, # noqa: ANN003 - ) -> Self: - """Create ToolCallsMessage from raw tool calls. - - Args: - tool_calls: Raw tool calls from LLM - content: Optional message content - """ - formatted_calls = [ - {"id": call.id, "function": call.function.model_dump(), "type": "function"} - for call in tool_calls - ] - return cls( - role=Role.ASSISTANT, - content=content, - tool_calls=formatted_calls, - **kwargs, - ) - - -class Memory(BaseModel): - messages: list[Message] = Field(default_factory=list) - max_messages: int = Field(default=100) - - def add_message(self, message: Message) -> None: - """Add a message to memory""" - self.messages.append(message) - # Optional: Implement message limit - if len(self.messages) > self.max_messages: - self.messages = self.messages[-self.max_messages:] - - def add_messages(self, messages: list[Message]) -> None: - """Add multiple messages to memory""" - self.messages.extend(messages) - # Optional: Implement message limit - if len(self.messages) > self.max_messages: - self.messages = self.messages[-self.max_messages:] - - def clear(self) -> None: - """Clear all messages""" - self.messages.clear() - - def get_recent_messages(self, n: int) -> list[Message]: - """Get n most recent messages""" - return self.messages[-n:] - - def to_dict_list(self) -> list[dict]: - """Convert messages to list of dicts""" - return [msg.to_dict() for msg in self.messages] diff --git a/apps/scheduler/mcp_agent/select.py b/apps/scheduler/mcp_agent/select.py new file mode 100644 index 00000000..2ff50344 --- /dev/null +++ b/apps/scheduler/mcp_agent/select.py @@ -0,0 +1,185 @@ +# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. +"""选择MCP Server及其工具""" + +import logging + +from jinja2 import BaseLoader +from jinja2.sandbox import SandboxedEnvironment + +from apps.common.lance import LanceDB +from apps.common.mongo import MongoDB +from apps.llm.embedding import Embedding +from apps.llm.function import FunctionLLM +from apps.llm.reasoning import ReasoningLLM +from apps.scheduler.mcp.prompt import ( + MCP_SELECT, +) +from apps.schemas.mcp import ( + MCPCollection, + MCPSelectResult, + MCPTool, +) + +logger = logging.getLogger(__name__) + + +class MCPSelector: + """MCP选择器""" + + def __init__(self) -> None: + """初始化助手类""" + self.input_tokens = 0 + self.output_tokens = 0 + + @staticmethod + def _assemble_sql(mcp_list: list[str]) -> str: + """组装SQL""" + sql = "(" + for mcp_id in mcp_list: + sql += f"'{mcp_id}', " + return sql.rstrip(", ") + ")" + + + async def _get_top_mcp_by_embedding( + self, + query: str, + mcp_list: list[str], + ) -> list[dict[str, str]]: + """通过向量检索获取Top5 MCP Server""" + logger.info("[MCPHelper] 查询MCP Server向量: %s, %s", query, mcp_list) + mcp_table = await LanceDB().get_table("mcp") + query_embedding = await Embedding.get_embedding([query]) + mcp_vecs = await (await mcp_table.search( + query=query_embedding, + vector_column_name="embedding", + )).where(f"id IN {MCPSelector._assemble_sql(mcp_list)}").limit(5).to_list() + + # 拿到名称和description + logger.info("[MCPHelper] 查询MCP Server名称和描述: %s", mcp_vecs) + mcp_collection = MongoDB().get_collection("mcp") + llm_mcp_list: list[dict[str, str]] = [] + for mcp_vec in mcp_vecs: + mcp_id = mcp_vec["id"] + mcp_data = await mcp_collection.find_one({"_id": mcp_id}) + if not mcp_data: + logger.warning("[MCPHelper] 查询MCP Server名称和描述失败: %s", mcp_id) + continue + mcp_data = MCPCollection.model_validate(mcp_data) + llm_mcp_list.extend([{ + "id": mcp_id, + "name": mcp_data.name, + "description": mcp_data.description, + }]) + return llm_mcp_list + + + async def _get_mcp_by_llm( + self, + query: str, + mcp_list: list[dict[str, str]], + mcp_ids: list[str], + ) -> MCPSelectResult: + """通过LLM选择最合适的MCP Server""" + # 初始化jinja2环境 + env = SandboxedEnvironment( + loader=BaseLoader, + autoescape=True, + trim_blocks=True, + lstrip_blocks=True, + ) + template = env.from_string(MCP_SELECT) + # 渲染模板 + mcp_prompt = template.render( + mcp_list=mcp_list, + goal=query, + ) + + # 调用大模型进行推理 + result = await self._call_reasoning(mcp_prompt) + + # 使用小模型提取JSON + return await self._call_function_mcp(result, mcp_ids) + + + async def _call_reasoning(self, prompt: str) -> str: + """调用大模型进行推理""" + logger.info("[MCPHelper] 调用推理大模型") + llm = ReasoningLLM() + message = [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": prompt}, + ] + result = "" + async for chunk in llm.call(message): + result += chunk + self.input_tokens += llm.input_tokens + self.output_tokens += llm.output_tokens + return result + + + async def _call_function_mcp(self, reasoning_result: str, mcp_ids: list[str]) -> MCPSelectResult: + """调用结构化输出小模型提取JSON""" + logger.info("[MCPHelper] 调用结构化输出小模型") + llm = FunctionLLM() + message = [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": reasoning_result}, + ] + schema = MCPSelectResult.model_json_schema() + # schema中加入选项 + schema["properties"]["mcp_id"]["enum"] = mcp_ids + result = await llm.call(messages=message, schema=schema) + try: + result = MCPSelectResult.model_validate(result) + except Exception: + logger.exception("[MCPHelper] 解析MCP Select Result失败") + raise + return result + + + async def select_top_mcp( + self, + query: str, + mcp_list: list[str], + ) -> MCPSelectResult: + """ + 选择最合适的MCP Server + + 先通过Embedding选择Top5,然后通过LLM选择Top 1 + """ + # 通过向量检索获取Top5 + llm_mcp_list = await self._get_top_mcp_by_embedding(query, mcp_list) + + # 通过LLM选择最合适的 + return await self._get_mcp_by_llm(query, llm_mcp_list, mcp_list) + + + @staticmethod + async def select_top_tool(query: str, mcp_list: list[str], top_n: int = 10) -> list[MCPTool]: + """选择最合适的工具""" + tool_vector = await LanceDB().get_table("mcp_tool") + query_embedding = await Embedding.get_embedding([query]) + tool_vecs = await (await tool_vector.search( + query=query_embedding, + vector_column_name="embedding", + )).where(f"mcp_id IN {MCPSelector._assemble_sql(mcp_list)}").limit(top_n).to_list() + + # 拿到工具 + tool_collection = MongoDB().get_collection("mcp") + llm_tool_list = [] + + for tool_vec in tool_vecs: + # 到MongoDB里找对应的工具 + logger.info("[MCPHelper] 查询MCP Tool名称和描述: %s", tool_vec["mcp_id"]) + tool_data = await tool_collection.aggregate([ + {"$match": {"_id": tool_vec["mcp_id"]}}, + {"$unwind": "$tools"}, + {"$match": {"tools.id": tool_vec["id"]}}, + {"$project": {"_id": 0, "tools": 1}}, + {"$replaceRoot": {"newRoot": "$tools"}}, + ]) + async for tool in tool_data: + tool_obj = MCPTool.model_validate(tool) + llm_tool_list.append(tool_obj) + + return llm_tool_list diff --git a/apps/scheduler/mcp_agent/tool/__init__.py b/apps/scheduler/mcp_agent/tool/__init__.py deleted file mode 100644 index 4593f317..00000000 --- a/apps/scheduler/mcp_agent/tool/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -from apps.scheduler.mcp_agent.tool.base import BaseTool -from apps.scheduler.mcp_agent.tool.terminate import Terminate -from apps.scheduler.mcp_agent.tool.tool_collection import ToolCollection - -__all__ = [ - "BaseTool", - "Terminate", - "ToolCollection", -] diff --git a/apps/scheduler/mcp_agent/tool/base.py b/apps/scheduler/mcp_agent/tool/base.py deleted file mode 100644 index 04ad45c4..00000000 --- a/apps/scheduler/mcp_agent/tool/base.py +++ /dev/null @@ -1,73 +0,0 @@ -from abc import ABC, abstractmethod -from typing import Any, Dict, Optional - -from pydantic import BaseModel, Field - - -class BaseTool(ABC, BaseModel): - name: str - description: str - parameters: Optional[dict] = None - - class Config: - arbitrary_types_allowed = True - - async def __call__(self, **kwargs) -> Any: - return await self.execute(**kwargs) - - @abstractmethod - async def execute(self, **kwargs) -> Any: - """使用给定的参数执行工具""" - - def to_param(self) -> Dict: - """将工具转换为函数调用格式""" - return { - "type": "function", - "function": { - "name": self.name, - "description": self.description, - "parameters": self.parameters, - }, - } - - -class ToolResult(BaseModel): - """表示工具执行的结果""" - - output: Any = Field(default=None) - error: Optional[str] = Field(default=None) - system: Optional[str] = Field(default=None) - - class Config: - arbitrary_types_allowed = True - - def __bool__(self): - return any(getattr(self, field) for field in self.__fields__) - - def __add__(self, other: "ToolResult"): - def combine_fields( - field: Optional[str], other_field: Optional[str], concatenate: bool = True - ): - if field and other_field: - if concatenate: - return field + other_field - raise ValueError("Cannot combine tool results") - return field or other_field - - return ToolResult( - output=combine_fields(self.output, other.output), - error=combine_fields(self.error, other.error), - system=combine_fields(self.system, other.system), - ) - - def __str__(self): - return f"Error: {self.error}" if self.error else self.output - - def replace(self, **kwargs): - """返回一个新的ToolResult,其中替换了给定的字段""" - # return self.copy(update=kwargs) - return type(self)(**{**self.dict(), **kwargs}) - - -class ToolFailure(ToolResult): - """表示失败的ToolResult""" diff --git a/apps/scheduler/mcp_agent/tool/terminate.py b/apps/scheduler/mcp_agent/tool/terminate.py deleted file mode 100644 index 84aa1203..00000000 --- a/apps/scheduler/mcp_agent/tool/terminate.py +++ /dev/null @@ -1,25 +0,0 @@ -from apps.scheduler.mcp_agent.tool.base import BaseTool - - -_TERMINATE_DESCRIPTION = """当请求得到满足或助理无法继续处理任务时,终止交互。 -当您完成所有任务后,调用此工具结束工作。""" - - -class Terminate(BaseTool): - name: str = "terminate" - description: str = _TERMINATE_DESCRIPTION - parameters: dict = { - "type": "object", - "properties": { - "status": { - "type": "string", - "description": "交互的完成状态", - "enum": ["success", "failure"], - } - }, - "required": ["status"], - } - - async def execute(self, status: str) -> str: - """Finish the current execution""" - return f"交互已完成,状态为: {status}" diff --git a/apps/scheduler/mcp_agent/tool/tool_collection.py b/apps/scheduler/mcp_agent/tool/tool_collection.py deleted file mode 100644 index 95bda317..00000000 --- a/apps/scheduler/mcp_agent/tool/tool_collection.py +++ /dev/null @@ -1,55 +0,0 @@ -"""用于管理多个工具的集合类""" -import logging -from typing import Any - -from apps.scheduler.mcp_agent.tool.base import BaseTool, ToolFailure, ToolResult - -logger = logging.getLogger(__name__) - - -class ToolCollection: - """定义工具的集合""" - - class Config: - arbitrary_types_allowed = True - - def __init__(self, *tools: BaseTool): - self.tools = tools - self.tool_map = {tool.name: tool for tool in tools} - - def __iter__(self): - return iter(self.tools) - - def to_params(self) -> list[dict[str, Any]]: - return [tool.to_param() for tool in self.tools] - - async def execute( - self, *, name: str, tool_input: dict[str, Any] = None - ) -> ToolResult: - tool = self.tool_map.get(name) - if not tool: - return ToolFailure(error=f"Tool {name} is invalid") - try: - result = await tool(**tool_input) - return result - except Exception as e: - return ToolFailure(error=f"Failed to execute tool {name}: {e}") - - def add_tool(self, tool: BaseTool): - """ - 将单个工具添加到集合中。 - - 如果已存在同名工具,则将跳过该工具并记录警告。 - """ - if tool.name in self.tool_map: - logger.warning(f"Tool {tool.name} already exists in collection, skipping") - return self - - self.tools += (tool,) - self.tool_map[tool.name] = tool - return self - - def add_tools(self, *tools: BaseTool): - for tool in tools: - self.add_tool(tool) - return self diff --git a/apps/scheduler/scheduler/context.py b/apps/scheduler/scheduler/context.py index f9182773..61be756b 100644 --- a/apps/scheduler/scheduler/context.py +++ b/apps/scheduler/scheduler/context.py @@ -112,11 +112,14 @@ async def save_data(task: Task, user_sub: str, post_body: RequestData) -> None: used_docs.append( RecordGroupDocument( _id=docs["id"], + author=docs.get("author", ""), + order=docs.get("order", 0), name=docs["name"], abstract=docs.get("abstract", ""), extension=docs.get("extension", ""), size=docs.get("size", 0), associated="answer", + created_at=docs.get("created_at", round(datetime.now(UTC).timestamp(), 3)), ) ) if docs.get("order") is not None: @@ -197,7 +200,7 @@ async def save_data(task: Task, user_sub: str, post_body: RequestData) -> None: await AppCenterManager.update_recent_app(user_sub, post_body.app.app_id) # 若状态为成功,删除Task - if not task.state or task.state.status == StepStatus.SUCCESS: + if not task.state or task.state.flow_status == StepStatus.SUCCESS or task.state.flow_status == StepStatus.ERROR or task.state.flow_status == StepStatus.CANCELLED: await TaskManager.delete_task_by_task_id(task.id) else: # 更新Task diff --git a/apps/scheduler/scheduler/message.py b/apps/scheduler/scheduler/message.py index 929d7639..bbb02d3e 100644 --- a/apps/scheduler/scheduler/message.py +++ b/apps/scheduler/scheduler/message.py @@ -71,14 +71,7 @@ async def push_rag_message( # 如果是文本消息,直接拼接到答案中 full_answer += content_obj.content elif content_obj.event_type == EventType.DOCUMENT_ADD.value: - task.runtime.documents.append({ - "id": content_obj.content.get("id", ""), - "order": content_obj.content.get("order", 0), - "name": content_obj.content.get("name", ""), - "abstract": content_obj.content.get("abstract", ""), - "extension": content_obj.content.get("extension", ""), - "size": content_obj.content.get("size", 0), - }) + task.runtime.documents.append(content_obj.content) # 保存答案 task.runtime.answer = full_answer @@ -116,10 +109,12 @@ async def _push_rag_chunk(task: Task, queue: MessageQueue, content: str) -> tupl data=DocumentAddContent( documentId=content_obj.content.get("id", ""), documentOrder=content_obj.content.get("order", 0), + documentAuthor=content_obj.content.get("author", ""), documentName=content_obj.content.get("name", ""), documentAbstract=content_obj.content.get("abstract", ""), documentType=content_obj.content.get("extension", ""), documentSize=content_obj.content.get("size", 0), + createdAt=round(datetime.now(tz=UTC).timestamp(), 3), ).model_dump(exclude_none=True, by_alias=True), ) except Exception: diff --git a/apps/scheduler/scheduler/scheduler.py b/apps/scheduler/scheduler/scheduler.py index 17f551f0..ef0a747d 100644 --- a/apps/scheduler/scheduler/scheduler.py +++ b/apps/scheduler/scheduler/scheduler.py @@ -203,7 +203,7 @@ class Scheduler: task=self.task, msg_queue=queue, question=post_body.question, - max_steps=app_metadata.history_len, + history_len=app_metadata.history_len, servers_id=servers_id, background=background, agent_id=app_info.app_id, diff --git a/apps/scheduler/slot/slot.py b/apps/scheduler/slot/slot.py index 1b372e9e..bb2fde49 100644 --- a/apps/scheduler/slot/slot.py +++ b/apps/scheduler/slot/slot.py @@ -12,6 +12,9 @@ from jsonschema.exceptions import ValidationError from jsonschema.protocols import Validator from jsonschema.validators import extend +from apps.scheduler.call.choice.schema import Type +from apps.schemas.response_data import ParamsNode + from .parser import ( SlotConstParser, SlotDateParser, @@ -221,6 +224,45 @@ class Slot: return data return _extract_type_desc(self._schema) + def get_params_node_from_schema(self, root: str = "") -> ParamsNode: + """从JSON Schema中提取ParamsNode""" + def _extract_params_node(schema_node: dict[str, Any], name: str = "", path: str = "") -> ParamsNode: + """递归提取ParamsNode""" + if "type" not in schema_node: + return None + + param_type = schema_node["type"] + if param_type == "object": + param_type = Type.DICT + elif param_type == "array": + param_type = Type.LIST + elif param_type == "string": + param_type = Type.STRING + elif param_type == "number": + param_type = Type.NUMBER + elif param_type == "boolean": + param_type = Type.BOOL + else: + logger.warning(f"[Slot] 不支持的参数类型: {param_type}") + return None + sub_params = [] + + if param_type == "object" and "properties" in schema_node: + for key, value in schema_node["properties"].items(): + sub_params.append(_extract_params_node(value, name=key, path=f"{path}/{key}")) + else: + # 对于非对象类型,直接返回空子参数 + sub_params = None + return ParamsNode(paramName=name, + paramPath=path, + paramType=param_type, + subParams=sub_params) + try: + return _extract_params_node(self._schema, name=root, path=root) + except Exception as e: + logger.error(f"[Slot] 提取ParamsNode失败: {e!s}\n{traceback.format_exc()}") + return None + def _flatten_schema(self, schema: dict[str, Any]) -> tuple[dict[str, Any], list[str]]: """将JSON Schema扁平化""" result = {} diff --git a/apps/schemas/enum_var.py b/apps/schemas/enum_var.py index 6970781e..a6a2d812 100644 --- a/apps/schemas/enum_var.py +++ b/apps/schemas/enum_var.py @@ -20,6 +20,17 @@ class StepStatus(str, Enum): SUCCESS = "success" ERROR = "error" PARAM = "param" + CANCELLED = "cancelled" + + +class FlowStatus(str, Enum): + """Flow状态""" + + WAITING = "waiting" + RUNNING = "running" + SUCCESS = "success" + ERROR = "error" + CANCELLED = "cancelled" class DocumentStatus(str, Enum): @@ -45,6 +56,8 @@ class EventType(str, Enum): STEP_INPUT = "step.input" STEP_OUTPUT = "step.output" FLOW_STOP = "flow.stop" + FLOW_FAILED = "flow.failed" + FLOW_CANCELLED = "flow.cancelled" DONE = "done" diff --git a/apps/schemas/record.py b/apps/schemas/record.py index 7f4f3103..1ea12c39 100644 --- a/apps/schemas/record.py +++ b/apps/schemas/record.py @@ -41,6 +41,7 @@ class RecordFlow(BaseModel): id: str record_id: str = Field(alias="recordId") flow_id: str = Field(alias="flowId") + flow_status: StepStatus = Field(alias="flowStatus", default=StepStatus.SUCCESS) step_num: int = Field(alias="stepNum") steps: list[RecordFlowStep] diff --git a/apps/schemas/task.py b/apps/schemas/task.py index afb445d1..4d49e1c1 100644 --- a/apps/schemas/task.py +++ b/apps/schemas/task.py @@ -7,7 +7,8 @@ from typing import Any from pydantic import BaseModel, Field -from .enum_var import StepStatus +from apps.schemas.enum_var import FlowStatus, StepStatus + from .flow import Step from .mcp import MCPPlan @@ -23,10 +24,11 @@ class FlowStepHistory(BaseModel): task_id: str = Field(description="任务ID") flow_id: str = Field(description="FlowID") flow_name: str = Field(description="Flow名称") + flow_status: FlowStatus = Field(description="Flow状态") step_id: str = Field(description="当前步骤名称") step_name: str = Field(description="当前步骤名称") - step_description: str = Field(description="当前步骤描述") - status: StepStatus = Field(description="当前步骤状态") + step_description: str = Field(description="当前步骤描述", default="") + step_status: StepStatus = Field(description="当前步骤状态") input_data: dict[str, Any] = Field(description="当前Step执行的输入", default={}) output_data: dict[str, Any] = Field(description="当前Step执行后的结果", default={}) created_at: float = Field(default_factory=lambda: round(datetime.now(tz=UTC).timestamp(), 3)) @@ -39,10 +41,11 @@ class ExecutorState(BaseModel): flow_id: str = Field(description="Flow ID") flow_name: str = Field(description="Flow名称") description: str = Field(description="Flow描述") - status: StepStatus = Field(description="Flow执行状态") - # 附加信息 + flow_status: FlowStatus = Field(description="Flow状态") + # 任务级数据 step_id: str = Field(description="当前步骤ID") step_name: str = Field(description="当前步骤名称") + step_status: StepStatus = Field(description="当前步骤状态") step_description: str = Field(description="当前步骤描述", default="") app_id: str = Field(description="应用ID") slot: dict[str, Any] = Field(description="待填充参数的JSON Schema", default={}) diff --git a/tests/common/test_queue.py b/tests/common/test_queue.py index 5375180a..e1517277 100644 --- a/tests/common/test_queue.py +++ b/tests/common/test_queue.py @@ -74,7 +74,7 @@ async def test_push_output_with_flow(message_queue, mock_task): mock_task.state.flow_id = "flow_id" mock_task.state.step_id = "step_id" mock_task.state.step_name = "step_name" - mock_task.state.status = "running" + mock_task.state.step_status = "running" await message_queue.init("test_task") await message_queue.push_output(mock_task, EventType.TEXT_ADD, {}) -- Gitee