diff --git a/apps/constants.py b/apps/constants.py
index 1303e3d66e9c91e26f2c4e4272aca54ece18434b..9c02fd8da5a09d1e699b5dbdb03f914a3292a727 100644
--- a/apps/constants.py
+++ b/apps/constants.py
@@ -51,5 +51,11 @@ ALLOWED_ICON_MIME_TYPES = [
 MCP_PATH = Path(config.deploy.data_dir) / "semantics" / "mcp"
 # 项目路径
 PROJ_PATH = Path(__file__).parent.parent
-# 图标存储
+# 图标存储位置
 ICON_PATH = PROJ_PATH / "static" / "icons"
+# MCP Agent 最大重试次数
+AGENT_MAX_RETRY_TIMES = 3
+# MCP Agent 最大步骤数
+AGENT_MAX_STEPS = 25
+# MCP Agent 最终步骤名称
+AGENT_FINAL_STEP_NAME = "FIANL"
diff --git a/apps/llm/function.py b/apps/llm/function.py
index b6766e49d4fcd9a679ddb5bc9d679bfec05323fa..491087916fb6aad8f1283be1843777f399157c4a 100644
--- a/apps/llm/function.py
+++ b/apps/llm/function.py
@@ -25,6 +25,7 @@ class FunctionLLM:
     """用于FunctionCall的模型"""
 
     timeout: float = 30.0
+    config: LLMData
 
     def __init__(self, llm_config: LLMData | None = None) -> None:
         """
@@ -43,34 +44,34 @@ class FunctionLLM:
             logger.error(err)
             raise RuntimeError(err)
 
-        self._config: LLMData = llm_config
+        self.config: LLMData = llm_config
         self._params = {
-            "model": self._config.modelName,
+            "model": self.config.modelName,
             "messages": [],
         }
 
-        if self._config.functionCallBackend == FunctionCallBackend.OLLAMA and not self._config.openaiAPIKey:
+        if self.config.functionCallBackend == FunctionCallBackend.OLLAMA and not self.config.openaiAPIKey:
             self._client = ollama.AsyncClient(
-                host=self._config.openaiBaseUrl,
+                host=self.config.openaiBaseUrl,
                 timeout=self.timeout,
             )
-        elif self._config.functionCallBackend == FunctionCallBackend.OLLAMA and self._config.openaiAPIKey:
+        elif self.config.functionCallBackend == FunctionCallBackend.OLLAMA and self.config.openaiAPIKey:
             self._client = ollama.AsyncClient(
-                host=self._config.openaiBaseUrl,
+                host=self.config.openaiBaseUrl,
                 headers={
-                    "Authorization": f"Bearer {self._config.openaiAPIKey}",
+                    "Authorization": f"Bearer {self.config.openaiAPIKey}",
                 },
                 timeout=self.timeout,
             )
-        elif self._config.functionCallBackend != FunctionCallBackend.OLLAMA and not self._config.openaiAPIKey:
+        elif self.config.functionCallBackend != FunctionCallBackend.OLLAMA and not self.config.openaiAPIKey:
             self._client = openai.AsyncOpenAI(
-                base_url=self._config.openaiBaseUrl,
+                base_url=self.config.openaiBaseUrl,
                 timeout=self.timeout,
             )
-        elif self._config.functionCallBackend != FunctionCallBackend.OLLAMA and self._config.openaiAPIKey:
+        elif self.config.functionCallBackend != FunctionCallBackend.OLLAMA and self.config.openaiAPIKey:
             self._client = openai.AsyncOpenAI(
-                base_url=self._config.openaiBaseUrl,
-                api_key=self._config.openaiAPIKey,
+                base_url=self.config.openaiBaseUrl,
+                api_key=self.config.openaiAPIKey,
                 timeout=self.timeout,
             )
 
@@ -98,14 +99,14 @@ class FunctionLLM:
             "temperature": temperature,
         })
 
-        if self._config.functionCallBackend == FunctionCallBackend.VLLM:
+        if self.config.functionCallBackend == FunctionCallBackend.VLLM:
             self._params["extra_body"] = {"guided_json": schema}
 
-        elif self._config.functionCallBackend == FunctionCallBackend.JSON_MODE:
+        elif self.config.functionCallBackend == FunctionCallBackend.JSON_MODE:
             logger.warning("[FunctionCall] json_mode无法确保输出格式符合要求,使用效果将受到影响")
             self._params["response_format"] = {"type": "json_object"}
 
-        elif self._config.functionCallBackend == FunctionCallBackend.STRUCTURED_OUTPUT:
+        elif self.config.functionCallBackend == FunctionCallBackend.STRUCTURED_OUTPUT:
             self._params["response_format"] = {
                 "type": "json_schema",
                 "json_schema": {
@@ -116,7 +117,7 @@ class FunctionLLM:
                 },
             }
 
-        elif self._config.functionCallBackend == FunctionCallBackend.FUNCTION_CALL:
+        elif self.config.functionCallBackend == FunctionCallBackend.FUNCTION_CALL:
             logger.warning("[FunctionCall] function_call无法确保一定调用工具,使用效果将受到影响")
             self._params["tools"] = [
                 {
@@ -220,14 +221,14 @@ class FunctionLLM:
         """
         # 检查max_tokens和temperature是否设置
         if max_tokens is None:
-            max_tokens = self._config.maxToken
+            max_tokens = self.config.maxToken
         if temperature is None:
-            temperature = self._config.temperature
+            temperature = self.config.temperature
 
-        if self._config.functionCallBackend == FunctionCallBackend.OLLAMA:
+        if self.config.functionCallBackend == FunctionCallBackend.OLLAMA:
             json_str = await self._call_ollama(messages, schema, max_tokens, temperature)
-        elif self._config.functionCallBackend in [
+        elif self.config.functionCallBackend in [
             FunctionCallBackend.FUNCTION_CALL,
             FunctionCallBackend.JSON_MODE,
             FunctionCallBackend.STRUCTURED_OUTPUT,
@@ -306,12 +307,14 @@ class JsonGenerator:
 
         return {}
 
-    def __init__(self, config: LLMData, query: str, conversation: list[dict[str, str]], schema: dict[str, Any]) -> None:
+    def __init__(
+        self, llm: FunctionLLM, query: str, conversation: list[dict[str, str]], schema: dict[str, Any],
+    ) -> None:
         """初始化JSON生成器"""
         self._query = query
         self._conversation = conversation
         self._schema = schema
-        self._config = config
+        self._llm = llm
         self._trial = {}
         self._count = 0
 
@@ -327,7 +330,7 @@ class JsonGenerator:
     async def _assemble_message(self) -> str:
         """组装消息"""
         # 检查类型
-        function_call = self._config.functionCallBackend == FunctionCallBackend.FUNCTION_CALL
+        function_call = self._llm.config.functionCallBackend == FunctionCallBackend.FUNCTION_CALL
 
         # 渲染模板
         template = self._env.from_string(JSON_GEN_BASIC)
@@ -347,8 +350,7 @@ class JsonGenerator:
             {"role": "system", "content": "You are a helpful assistant."},
             {"role": "user", "content": prompt},
         ]
-        function = FunctionLLM()
-        return await function.call(messages, self._schema, max_tokens, temperature)
+        return await self._llm.call(messages, self._schema, max_tokens, temperature)
 
 
     async def generate(self) -> dict[str, Any]:
diff --git a/apps/llm/patterns/__init__.py b/apps/llm/patterns/__init__.py
index 156953c901eb3388fce75f6e08547ee5ffb2fcc7..8a29c93cc645ac06dd121423c88de28e8f0d2a05 100644
--- a/apps/llm/patterns/__init__.py
+++ b/apps/llm/patterns/__init__.py
@@ -4,13 +4,9 @@
 from apps.llm.patterns.core import CorePattern
 from apps.llm.patterns.executor import (
     ExecutorSummary,
-    ExecutorThought,
 )
-from apps.llm.patterns.select import Select
 
 __all__ = [
     "CorePattern",
     "ExecutorSummary",
-    "ExecutorThought",
-    "Select",
 ]
diff --git a/apps/llm/patterns/executor.py b/apps/llm/patterns/executor.py
index ecb514a6ea0770cb05a81d7a7e2d153c0721949e..81861ab798326e023a1c2939da930c244ed65d2e 100644
--- a/apps/llm/patterns/executor.py
+++ b/apps/llm/patterns/executor.py
@@ -1,7 +1,7 @@
 # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved.
"""使用大模型生成Executor的思考内容""" -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING from apps.llm.reasoning import ReasoningLLM from apps.llm.snippet import convert_context_to_prompt, facts_to_prompt @@ -13,121 +13,6 @@ if TYPE_CHECKING: from apps.schemas.scheduler import ExecutorBackground -class ExecutorThought(CorePattern): - """通过大模型生成Executor的思考内容""" - - @staticmethod - def _default() -> tuple[dict[LanguageType, str], dict[LanguageType, str]]: - """默认的Prompt内容""" - return { - LanguageType.CHINESE: r"You are a helpful assistant.", - LanguageType.ENGLISH: r"You are a helpful assistant.", - }, { - LanguageType.CHINESE: r""" - - - 你是一个可以使用工具的智能助手。 - 在回答用户的问题时,你为了获取更多的信息,使用了一个工具。 - 请简明扼要地总结工具的使用过程,提供你的见解,并给出下一步的行动。 - - 注意: - 工具的相关信息在标签中给出。 - 为了使你更好的理解发生了什么,你之前的思考过程在标签中给出。 - 输出时请不要包含XML标签,输出时请保持简明和清晰。 - - - - - {tool_name} - {tool_description} - {tool_output} - - - - {last_thought} - - - - 你当前需要解决的问题是: - {user_question} - - - 请综合以上信息,再次一步一步地进行思考,并给出见解和行动: - """, - LanguageType.ENGLISH: r""" - - - You are an intelligent assistant who can use tools. - When answering user questions, you use a tool to get more information. - Please summarize the process of using the tool briefly, provide your insights, \ -and give the next action. - - Note: - The information about the tool is given in the tag. - To help you better understand what happened, your previous thought process is given in the \ - tag. - Do not include XML tags in the output, and keep the output brief and clear. - - - - - {tool_name} - {tool_description} - {tool_output} - - - - {last_thought} - - - - The question you need to solve is: - {user_question} - - - Please integrate the above information, think step by step again, provide insights, and give actions: - """, - } - - def __init__( - self, - system_prompt: dict[LanguageType, str] | None = None, - user_prompt: dict[LanguageType, str] | None = None, - ) -> None: - """处理Prompt""" - super().__init__(system_prompt, user_prompt) - - async def generate(self, **kwargs) -> str: # noqa: ANN003 - """调用大模型,生成对话总结""" - last_thought: str = kwargs["last_thought"] - user_question: str = kwargs["user_question"] - tool_info: dict[str, Any] = kwargs["tool_info"] - language: LanguageType = kwargs.get("language", LanguageType.CHINESE) - - messages = [ - {"role": "system", "content": "You are a helpful assistant."}, - { - "role": "user", - "content": self.user_prompt[language].format( - last_thought=last_thought, - user_question=user_question, - tool_name=tool_info["name"], - tool_description=tool_info["description"], - tool_output=tool_info["output"], - ), - }, - ] - - llm = ReasoningLLM() - result = "" - async for chunk in llm.call(messages, streaming=False, temperature=0.7): - result += chunk - self.input_tokens = llm.input_tokens - self.output_tokens = llm.output_tokens - - return result - - class ExecutorSummary(CorePattern): """使用大模型进行生成Executor初始背景""" diff --git a/apps/llm/patterns/rewrite.py b/apps/llm/patterns/rewrite.py deleted file mode 100644 index f4e83230fab8e9dbf8c5d568ed731e297c45f20c..0000000000000000000000000000000000000000 --- a/apps/llm/patterns/rewrite.py +++ /dev/null @@ -1,218 +0,0 @@ -# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. 
-"""问题改写""" - -import logging -from textwrap import dedent - -from jinja2 import BaseLoader -from jinja2.sandbox import SandboxedEnvironment -from pydantic import BaseModel, Field - -from apps.llm.function import JsonGenerator -from apps.llm.reasoning import ReasoningLLM -from apps.llm.token import TokenCalculator -from apps.schemas.enum_var import LanguageType - -from .core import CorePattern - -logger = logging.getLogger(__name__) -_env = SandboxedEnvironment( - loader=BaseLoader, - autoescape=False, - trim_blocks=True, - lstrip_blocks=True, -) - - -class QuestionRewriteResult(BaseModel): - """问题补全与重写结果""" - - question: str = Field(description="补全后的问题") - - -class QuestionRewrite(CorePattern): - """问题补全与重写""" - - @staticmethod - def _default() -> tuple[dict[LanguageType, str], dict[LanguageType, str]]: - """默认的Prompt内容""" - return { - LanguageType.CHINESE: r"You are a helpful assistant.", - LanguageType.ENGLISH: r"You are a helpful assistant.", - }, { - LanguageType.CHINESE: dedent(r""" - - - 根据历史对话,推断用户的实际意图并补全用户的提问内容,历史对话被包含在标签中,用户意图被包含在标签中。 - 要求: - 1. 请使用JSON格式输出,参考下面给出的样例;不要包含任何XML标签,不要包含任何解释说明; - 2. 若用户当前提问内容与对话上文不相关,或你认为用户的提问内容已足够完整,请直接输出用户的提问内容。 - 3. 补全内容必须精准、恰当,不要编造任何内容。 - 4. 请输出补全后的问题,不要输出其他内容。 - 输出格式样例: - ```json - { - "question": "补全后的问题" - } - ``` - - - - - - - openEuler的优势有哪些? - - - openEuler的优势包括开源、社区支持、以及对云计算和边缘计算的优化。 - - - - - - 详细点? - - - ```json - { - "question": "详细说明openEuler操作系统的优势和应用场景" - } - ``` - - - - - - {{history}} - - - {{question}} - - - 现在,请输出补全后的问题: - - """).strip("\n"), - LanguageType.ENGLISH: dedent(r""" - - - Based on the historical dialogue, infer the user's actual intent and complete the user's question. \ -The historical dialogue is contained within the tags, and the user's intent is contained within the \ - tags. - Requirements: - 1. Please output in JSON format, referring to the example provided below; do not include any XML \ -tags or any explanatory notes; - 2. If the user's current question is unrelated to the previous dialogue or you believe the \ -user's question is already complete enough, directly output the user's question. - 3. The completed content must be precise and appropriate; do not fabricate any content. - 4. Output only the completed question; do not include any other content. - Example output format: - ```json - { - "question": "The completed question" - } - ``` - - - - - - - What are the features of openEuler? - - - Compared to other operating systems, openEuler's features include support for multiple \ -hardware architectures and providing a stable, secure, and efficient operating system platform. - - - - - What are the advantages of openEuler? - - - The advantages of openEuler include being open-source, having community support, \ -and optimizations for cloud and edge computing. - - - - - - More details? - - - ```json - { - "question": "What are the features of openEuler? Please elaborate on its advantages and \ -application scenarios." 
-                    }
-                    ```
-
-
-                {{history}}
-
-
-                {{question}}
-
-            """).strip("\n"),
-        }
-
-    async def generate(self, **kwargs) -> str:  # noqa: ANN003
-        """问题补全与重写"""
-        history = kwargs.get("history", [])
-        question = kwargs["question"]
-        language = kwargs.get("language", LanguageType.CHINESE)
-
-        messages = [
-            {"role": "system", "content": self.system_prompt[language]},
-            {"role": "user", "content": _env.from_string(
-                self.user_prompt[language],
-            ).render(history="", question=question)},
-        ]
-        llm = kwargs.get("llm")
-        if not llm:
-            llm = ReasoningLLM()
-        leave_tokens = llm.config.max_tokens
-        leave_tokens -= TokenCalculator().calculate_token_length(messages)
-        if leave_tokens <= 0:
-            logger.error("[QuestionRewrite] 大模型上下文窗口不足,无法进行问题补全与重写")
-            return question
-        index = 0
-        qa = ""
-        while index < len(history) - 1 and leave_tokens > 0:
-            q = history[index - 1].get("content", "")
-            a = history[index].get("content", "")
-            sub_qa = f"\n\n{q}\n\n\n{a}\n\n"
-            leave_tokens -= TokenCalculator().calculate_token_length(
-                messages=[
-                    {"role": "user", "content": sub_qa},
-                ],
-                pure_text=True,
-            )
-            if leave_tokens >= 0:
-                qa = sub_qa + qa
-            index += 2
-
-        messages[1]["content"] = self.user_prompt[language].format(history=qa, question=question)
-        result = ""
-        async for chunk in llm.call(messages, streaming=False):
-            result += chunk
-        self.input_tokens = llm.input_tokens
-        self.output_tokens = llm.output_tokens
-
-        tmp_js = await JsonGenerator.parse_result_by_stack(result, QuestionRewriteResult.model_json_schema())
-        if tmp_js is not None:
-            return tmp_js["question"]
-        messages += [{"role": "assistant", "content": result}]
-        json_gen = JsonGenerator(
-            query="根据给定的背景信息,生成预测问题",
-            conversation=messages,
-            schema=QuestionRewriteResult.model_json_schema(),
-        )
-        try:
-            question_dict = QuestionRewriteResult.model_validate(await json_gen.generate())
-        except Exception:
-            logger.exception("[QuestionRewrite] 问题补全与重写失败")
-            return question
-
-        return question_dict.question
diff --git a/apps/llm/patterns/select.py b/apps/llm/patterns/select.py
deleted file mode 100644
index d28a1c84b8c01a5ab4a7303eed405e61f0a67ade..0000000000000000000000000000000000000000
--- a/apps/llm/patterns/select.py
+++ /dev/null
@@ -1,223 +0,0 @@
-# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved.
-"""使用大模型多轮投票,选择最优选项""" - -import asyncio -import json -import logging -from collections import Counter -from typing import Any - -from apps.llm.function import JsonGenerator -from apps.llm.reasoning import ReasoningLLM -from apps.llm.snippet import choices_to_prompt -from apps.schemas.enum_var import LanguageType - -from .core import CorePattern - -logger = logging.getLogger(__name__) - - -class Select(CorePattern): - """通过投票选择最佳答案""" - - @staticmethod - def _default() -> tuple[dict[LanguageType, str], dict[LanguageType, str]]: - """默认的Prompt内容""" - return { - LanguageType.CHINESE: r"You are a helpful assistant.", - LanguageType.ENGLISH: r"You are a helpful assistant.", - }, { - LanguageType.CHINESE: r""" - - - 根据历史对话(包括工具调用结果)和用户问题,从给出的选项列表中,选出最符合要求的那一项。 - 在输出之前,请先思考,并使用“”标签给出思考过程。 - 结果需要使用JSON格式输出,输出格式为:{{ "choice": "选项名称" }} - - - - - 使用天气API,查询明天杭州的天气信息 - - - - API - HTTP请求,获得返回的JSON数据 - - - SQL - 查询数据库,获得数据库表中的数据 - - - - - - API 工具可以通过 API 来获取外部数据,而天气信息可能就存储在外部数据中,由于用户说明中明确 \ -提到了天气 API 的使用,因此应该优先使用 API 工具。SQL 工具用于从数据库中获取信息,考虑到天气数据的可变性和动态性\ -,不太可能存储在数据库中,因此 SQL 工具的优先级相对较低,最佳选择似乎是“API:请求特定 API,获取返回的 JSON 数据”。 - - - - {{ "choice": "API" }} - - - - - - - {question} - - - - {choice_list} - - - - - 让我们一步一步思考。 - """, - LanguageType.ENGLISH: r""" - - - Based on the historical dialogue (including tool call results) and user question, select the \ -most suitable option from the given option list. - Before outputting, please think carefully and use the "" tag to give the thinking \ -process. - The output needs to be in JSON format, the output format is: {{ "choice": "option name" }} - - - - - Use the weather API to query the weather information of Hangzhou \ -tomorrow - - - - API - HTTP request, get the returned JSON data - - - SQL - Query the database, get the data in the database table - - - - - - The API tool can get external data through API, and the weather information may be stored \ -in external data. Since the user clearly mentioned the use of weather API, it should be given priority to the API \ -tool. The SQL tool is used to get information from the database, considering the variability and dynamism of weather \ -data, it is unlikely to be stored in the database, so the priority of the SQL tool is relatively low, \ -The best choice seems to be "API: request a specific API, get the returned JSON data". - - - - {{ "choice": "API" }} - - - - - - {question} - - - - {choice_list} - - - - - Let's think step by step. 
- - - """, - } - - def __init__( - self, - system_prompt: dict[LanguageType, str] | None = None, - user_prompt: dict[LanguageType, str] | None = None, - slot_schema: dict[str, Any] | None = None, - ) -> None: - """初始化Prompt""" - super().__init__(system_prompt, user_prompt) - if slot_schema is not None: - self.slot_schema = slot_schema - else: - self.slot_schema = { - "type": "object", - "properties": { - "choice": { - "type": "string", - "description": "The choice of the option.", - }, - }, - "required": ["choice"], - } - - - async def _generate_single_attempt(self, user_input: str, choice_list: list[str]) -> str: - """使用ReasoningLLM进行单次尝试""" - logger.info("[Select] 单次选择尝试: %s", user_input) - messages = [ - {"role": "system", "content": self.system_prompt}, - {"role": "user", "content": user_input}, - ] - result = "" - llm = ReasoningLLM() - async for chunk in llm.call(messages, streaming=False): - result += chunk - self.input_tokens = llm.input_tokens - self.output_tokens = llm.output_tokens - logger.info("[Select] 选择结果: %s", result) - - # 使用FunctionLLM进行参数提取 - schema = self.slot_schema - schema["properties"]["choice"]["enum"] = choice_list - - messages += [{"role": "assistant", "content": result}] - json_gen = JsonGenerator( - query="根据给定的背景信息,生成预测问题", - conversation=messages, - schema=schema, - ) - function_result = await json_gen.generate() - return function_result["choice"] - - - async def generate(self, **kwargs) -> str: # noqa: ANN003 - """使用大模型做出选择""" - logger.info("[Select] 使用LLM选择") - max_try = 3 - result_list = [] - - background = kwargs.get("background", "无背景信息。") - data_str = json.dumps(kwargs.get("data", {}), ensure_ascii=False) - language = kwargs.get("language", LanguageType.CHINESE) - - choice_prompt, choices_list = choices_to_prompt(kwargs["choices"]) - - if not choices_list: - error_msg = "[Select] 选项列表不能为空" - logger.error(error_msg) - raise ValueError(error_msg) - if len(choices_list) == 1: - logger.info("[Select] 选项列表只有一个选项,直接返回") - return choices_list[0] - - logger.info("[Select] 选项列表: %s", choice_prompt) - user_input = self.user_prompt[language].format( - question=kwargs["question"], - background=background, - data=data_str, - choice_list=choice_prompt, - ) - - result_coroutine = [self._generate_single_attempt(user_input, choices_list) for _ in range(max_try)] - result_list = await asyncio.gather(*result_coroutine) - - count = Counter(result_list) - selected_choice = count.most_common(1)[0][0] - - logger.info("[Select] 选择结果: %s", selected_choice) - return selected_choice diff --git a/apps/scheduler/call/graph/graph.py b/apps/scheduler/call/graph/graph.py index dc5986c03e6b67668ef9994b0ef788880b705bc2..fcd7f2f69443ef92e1a9d0071fe7a02ef4c4d82b 100644 --- a/apps/scheduler/call/graph/graph.py +++ b/apps/scheduler/call/graph/graph.py @@ -6,6 +6,8 @@ from collections.abc import AsyncGenerator from typing import Any from anyio import Path +from jinja2 import BaseLoader +from jinja2.sandbox import SandboxedEnvironment from pydantic import Field from apps.scheduler.call.core import CoreCall @@ -18,7 +20,12 @@ from apps.schemas.scheduler import ( ) from .prompt import GENERATE_STYLE_PROMPT -from .schema import RenderFormat, RenderInput, RenderOutput +from .schema import ( + RenderFormat, + RenderInput, + RenderOutput, + RenderStyleResult, +) class Graph(CoreCall, input_model=RenderInput, output_model=RenderOutput): @@ -39,6 +46,13 @@ class Graph(CoreCall, input_model=RenderInput, output_model=RenderOutput): async def _init(self, call_vars: CallVars) -> RenderInput: 
"""初始化Render Call,校验参数,读取option模板""" + self._env = SandboxedEnvironment( + loader=BaseLoader(), + autoescape=False, + trim_blocks=True, + lstrip_blocks=True, + ) + try: option_location = Path(__file__).parent / "option.json" f = await Path(option_location).open(encoding="utf-8") @@ -88,10 +102,18 @@ class Graph(CoreCall, input_model=RenderInput, output_model=RenderOutput): self._option_template["dataset"]["source"] = processed_data try: - llm_output = await style_obj.generate(question=data.question) - - add_style = llm_output.get("additional_style", "") - self._parse_options(column_num, llm_output["chart_type"], add_style, llm_output["scale_type"]) + style_obj = self._env.from_string(GENERATE_STYLE_PROMPT[self._sys_vars.language]) + style_prompt = style_obj.render(question=data.question) + + result = "" + async for chunk in self._llm(messages=[ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": style_prompt}, + ], streaming=True): + result += chunk + llm_output = RenderStyleResult.model_validate_json(result) + + self._parse_options(column_num, llm_output) except Exception as e: raise CallError(message=f"图表生成失败:{e!s}", data={"data": data}) from e @@ -120,25 +142,25 @@ class Graph(CoreCall, input_model=RenderInput, output_model=RenderOutput): return result - def _parse_options(self, column_num: int, chart_style: str, additional_style: str, scale_style: str) -> None: + def _parse_options(self, column_num: int, style: RenderStyleResult) -> None: """解析LLM做出的图表样式选择""" series_template = {} - if chart_style == "line": + if style.chart_type == "line": series_template["type"] = "line" - elif chart_style == "scatter": + elif style.chart_type == "scatter": series_template["type"] = "scatter" - elif chart_style == "pie": + elif style.chart_type == "pie": column_num = 1 series_template["type"] = "pie" - if additional_style == "ring": + if style.additional_style == "ring": series_template["radius"] = ["40%", "70%"] else: series_template["type"] = "bar" - if additional_style == "stacked": + if style.additional_style == "stacked": series_template["stack"] = "total" - if scale_style == "log": + if style.scale_type == "log": self._option_template["yAxis"]["type"] = "log" for _ in range(column_num): diff --git a/apps/scheduler/call/graph/prompt.py b/apps/scheduler/call/graph/prompt.py index 521a432de1f984d747b561152cff6a2dc9164f79..8eec1f85f925600656bd1651aab3e7f8832e69c4 100644 --- a/apps/scheduler/call/graph/prompt.py +++ b/apps/scheduler/call/graph/prompt.py @@ -43,7 +43,7 @@ GENERATE_STYLE_PROMPT: dict[LanguageType, str] = { ## 问题 - {question} + {{question}} ## 思考 让我们一步步思考。 @@ -92,7 +92,7 @@ should be `bar`, i.e. a bar chart; the chart style should be `stacked`, i.e. 
a s ## Question - {question} + {{question}} ## Thought diff --git a/apps/scheduler/call/mcp/mcp.py b/apps/scheduler/call/mcp/mcp.py index 3805fd4a8f9a43823f3fc8d4610081dedfa4c092..eb2e3330214291a5b513e902f78d5d452196c548 100644 --- a/apps/scheduler/call/mcp/mcp.py +++ b/apps/scheduler/call/mcp/mcp.py @@ -86,11 +86,8 @@ class MCP(CoreCall, input_model=MCPInput, output_model=MCPOutput): """初始化MCP""" # 获取MCP交互类 self._host = MCPHost( - call_vars.ids.user_sub, call_vars.ids.task_id, call_vars.ids.executor_id, - self.description, - language=self._sys_vars.language, ) self._tool_list = await self._host.get_tool_list(self.mcp_list) self._call_vars = call_vars diff --git a/apps/scheduler/call/rag/prompt.py b/apps/scheduler/call/rag/prompt.py index 0a32ec4c4321d968c1fe1390e8adc5600323e685..b574555b246abc6cf176aadfaf29423d60d99a4f 100644 --- a/apps/scheduler/call/rag/prompt.py +++ b/apps/scheduler/call/rag/prompt.py @@ -1,7 +1,128 @@ """RAG工具的提示词""" +from textwrap import dedent + from apps.schemas.enum_var import LanguageType +QUESTION_REWRITE: dict[LanguageType, str] = { + LanguageType.CHINESE: dedent(r""" + + + 根据历史对话,推断用户的实际意图并补全用户的提问内容,历史对话被包含在标签中,用户意图被包含在标签中。 + 要求: + 1. 请使用JSON格式输出,参考下面给出的样例;不要包含任何XML标签,不要包含任何解释说明; + 2. 若用户当前提问内容与对话上文不相关,或你认为用户的提问内容已足够完整,请直接输出用户的提问内容。 + 3. 补全内容必须精准、恰当,不要编造任何内容。 + 4. 请输出补全后的问题,不要输出其他内容。 + 输出格式样例: + ```json + { + "question": "补全后的问题" + } + ``` + + + + + + + openEuler的优势有哪些? + + + openEuler的优势包括开源、社区支持、以及对云计算和边缘计算的优化。 + + + + + + 详细点? + + + ```json + { + "question": "详细说明openEuler操作系统的优势和应用场景" + } + ``` + + + + + + {{history}} + + + {{question}} + + + 现在,请输出补全后的问题: + + """).strip("\n"), + LanguageType.ENGLISH: dedent(r""" + + + Based on the historical dialogue, infer the user's actual intent and complete the user's question. \ +The historical dialogue is contained within the tags, and the user's intent is contained within the \ + tags. + Requirements: + 1. Please output in JSON format, referring to the example provided below; do not include any XML \ +tags or any explanatory notes; + 2. If the user's current question is unrelated to the previous dialogue or you believe the \ +user's question is already complete enough, directly output the user's question. + 3. The completed content must be precise and appropriate; do not fabricate any content. + 4. Output only the completed question; do not include any other content. + Example output format: + ```json + { + "question": "The completed question" + } + ``` + + + + + + + What are the features of openEuler? + + + Compared to other operating systems, openEuler's features include support for multiple \ +hardware architectures and providing a stable, secure, and efficient operating system platform. + + + + + What are the advantages of openEuler? + + + The advantages of openEuler include being open-source, having community support, \ +and optimizations for cloud and edge computing. + + + + + + More details? + + + ```json + { + "question": "What are the features of openEuler? Please elaborate on its advantages and \ +application scenarios." 
+            }
+            ```
+
+
+        {{history}}
+
+
+        {{question}}
+
+    """).strip("\n"),
+}
+
 GEN_RAG_ANSWER: dict[LanguageType, str] = {
     LanguageType.CHINESE: r"""
diff --git a/apps/scheduler/call/rag/rag.py b/apps/scheduler/call/rag/rag.py
index 740110900f8140f26408a2609c25ff466931918b..16f99d5ae9c94abcfa7d4aac981c6ff8dbd42dac 100644
--- a/apps/scheduler/call/rag/rag.py
+++ b/apps/scheduler/call/rag/rag.py
@@ -10,7 +10,6 @@ from fastapi import status
 from pydantic import Field
 
 from apps.common.config import config
-from apps.llm.patterns.rewrite import QuestionRewrite
 from apps.scheduler.call.core import CoreCall
 from apps.schemas.enum_var import CallOutputType, LanguageType
 from apps.schemas.scheduler import (
@@ -20,6 +19,7 @@ from apps.schemas.scheduler import (
     CallVars,
 )
 
+from .prompt import QUESTION_REWRITE
 from .schema import RAGInput, RAGOutput, SearchMethod
 
 logger = logging.getLogger(__name__)
diff --git a/apps/scheduler/call/search/__init__.py b/apps/scheduler/call/search/__init__.py
deleted file mode 100644
index cd72c95eaaeb74c6c9778889ee330978b81db4f4..0000000000000000000000000000000000000000
--- a/apps/scheduler/call/search/__init__.py
+++ /dev/null
@@ -1,2 +0,0 @@
-# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved.
-"""搜索工具"""
diff --git a/apps/scheduler/call/search/schema.py b/apps/scheduler/call/search/schema.py
deleted file mode 100644
index 26f8e24a33125575b40824a832a02bc41ba406f3..0000000000000000000000000000000000000000
--- a/apps/scheduler/call/search/schema.py
+++ /dev/null
@@ -1,19 +0,0 @@
-"""Search Call的输入和输出"""
-
-from typing import Any
-
-from pydantic import Field
-
-from apps.scheduler.call.core import DataBase
-
-
-class SearchInput(DataBase):
-    """搜索工具输入"""
-
-    query: list[str] = Field(description="搜索关键词")
-
-
-class SearchOutput(DataBase):
-    """搜索工具返回值"""
-
-    data: list[dict[str, Any]] = Field(description="搜索结果")
diff --git a/apps/scheduler/call/search/search.py b/apps/scheduler/call/search/search.py
deleted file mode 100644
index b32bd53e679dd6c3f5ca9225f318e7f68ac44797..0000000000000000000000000000000000000000
--- a/apps/scheduler/call/search/search.py
+++ /dev/null
@@ -1,39 +0,0 @@
-"""搜索工具"""
-
-from collections.abc import AsyncGenerator
-from typing import Any
-
-from apps.scheduler.call.core import CoreCall
-from apps.schemas.enum_var import LanguageType
-from apps.schemas.scheduler import (
-    CallError,
-    CallInfo,
-    CallOutputChunk,
-    CallVars,
-)
-
-from .schema import SearchInput, SearchOutput
-
-
-class Search(CoreCall, input_model=SearchInput, output_model=SearchOutput):
-    """搜索工具"""
-
-    @classmethod
-    def info(cls, language: LanguageType = LanguageType.CHINESE) -> CallInfo:
-        """返回Call的名称和描述"""
-        i18n_info = {
-            LanguageType.CHINESE: CallInfo(name="搜索", description="获取搜索引擎的结果"),
-            LanguageType.ENGLISH: CallInfo(name="Search", description="Get the results of the search engine."),
-        }
-        return i18n_info[language]
-
-
-    async def _init(self, call_vars: CallVars) -> SearchInput:
-        """初始化工具"""
-        pass
-
-
-    async def _exec(self, input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]:
-        """执行工具"""
-        pass
-
diff --git a/apps/scheduler/executor/agent.py b/apps/scheduler/executor/agent.py
index cbfd34cb38274bddbe8c2493f55dfe2ff8b8c424..992cf71c3ad4f3fe7728d48a5e2f7a9ed8c0de7c 100644
--- a/apps/scheduler/executor/agent.py
+++ b/apps/scheduler/executor/agent.py
@@ -8,9 +8,10 @@ import anyio
 from mcp.types import TextContent
 from pydantic import Field
 
-from apps.llm.reasoning import ReasoningLLM
-from apps.models.mcp import MCPInfo, MCPTools
+from apps.constants import AGENT_FINAL_STEP_NAME, AGENT_MAX_RETRY_TIMES, AGENT_MAX_STEPS
+from apps.models.mcp import MCPTools
 from apps.models.task import ExecutorHistory
+from apps.scheduler.call.slot.slot import Slot
 from apps.scheduler.executor.base import BaseExecutor
 from apps.scheduler.mcp_agent.host import MCPHost
 from apps.scheduler.mcp_agent.plan import MCPPlanner
@@ -20,26 +21,19 @@ from apps.schemas.mcp import Step
 from apps.schemas.message import FlowParams
 from apps.services.appcenter import AppCenterManager
 from apps.services.mcp_service import MCPServiceManager
-from apps.services.task import TaskManager
 from apps.services.user import UserManager
 
 _logger = logging.getLogger(__name__)
-_FINAL_TOOL_ID = "FIANL"
 
 
 class MCPAgentExecutor(BaseExecutor):
     """MCP Agent执行器"""
 
-    max_steps: int = Field(default=40, description="最大步数")
     agent_id: uuid.UUID = Field(default=uuid.uuid4(), description="App ID作为Agent ID")
     agent_description: str = Field(default="", description="Agent描述")
     tools: dict[str, MCPTools] = Field(
         description="MCP工具列表,key为tool_id",
         default={},
     )
-    tool_list: list[MCPTools] = Field(
-        description="MCP工具列表,包含所有MCP工具",
-        default=[],
-    )
     params: FlowParams | bool | None = Field(
         default=None,
         description="流执行过程中的参数补充",
@@ -50,10 +44,19 @@ class MCPAgentExecutor(BaseExecutor):
         """初始化MCP Agent"""
         # 初始化必要变量
         self._step_cnt = 0
+        self._retry_times = 0
         self._mcp_pool = MCPPool()
+        self._mcp_list = []
+        self._current_input = {}
         # 初始化MCP Host相关对象
-        self.planner = MCPPlanner(self.task.runtime.userInput, self.llm, self.task.runtime.language)
-        self.host = MCPHost(self.task.metadata.userSub, self.llm)
+        self._planner = MCPPlanner(self.task.runtime.userInput, self.llm, self.task.runtime.language)
+        self._host = MCPHost(self.task.metadata.userSub, self.llm)
+        user = await UserManager.get_user(self.task.metadata.userSub)
+        if not user:
+            err = "[MCPAgentExecutor] 用户不存在: %s"
+            _logger.error(err)
+            raise RuntimeError(err)
+        self._user = user
 
     async def load_mcp(self) -> None:
         """加载MCP服务器列表"""
@@ -62,8 +65,7 @@ class MCPAgentExecutor(BaseExecutor):
         app = await AppCenterManager.fetch_app_data_by_id(self.agent_id)
         mcp_ids = app.mcp_service
         for mcp_id in mcp_ids:
-            mcp_service = await MCPServiceManager.get_mcp_service(mcp_id)
-            if self.task.metadata.userSub not in mcp_service.activated:
+            if not await MCPServiceManager.is_user_actived(self.task.metadata.userSub, mcp_id):
                 _logger.warning(
                     "[MCPAgentExecutor] 用户 %s 未启用MCP %s",
                     self.task.metadata.userSub,
@@ -71,19 +73,17 @@ class MCPAgentExecutor(BaseExecutor):
                 )
                 continue
 
-            self.mcp_list.append(mcp_service)
-            await self.mcp_pool.init_mcp(mcp_id, self.task.metadata.userSub)
-            for tool in mcp_service.tools:
-                self.tools[tool.id] = tool
-            self.tool_list.extend(mcp_service.tools)
-        self.tools[_FINAL_TOOL_ID] = MCPTools(
-            id=_FINAL_TOOL_ID, mcpId="", toolName="Final Tool", description="结束流程的工具",
+            mcp_service = await MCPServiceManager.get_mcp_service(mcp_id)
+            if mcp_service:
+                self._mcp_list.append(mcp_service)
+
+            for tool in await MCPServiceManager.get_mcp_tools(mcp_id):
+                self.tools[tool.id] = tool
+
+        self.tools[AGENT_FINAL_STEP_NAME] = MCPTools(
+            id=AGENT_FINAL_STEP_NAME, mcpId="", toolName="Final Tool", description="结束流程的工具",
             inputSchema={}, outputSchema={},
         )
-        self.tool_list.append(MCPTools(
-            id=_FINAL_TOOL_ID, mcpId="", toolName="Final Tool", description="结束流程的工具",
-            inputSchema={}, outputSchema={}),
-        )
 
     async def get_tool_input_param(self, *, is_first: bool) -> None:
         """获取工具输入参数"""
@@ -95,7 +95,7 @@ class MCPAgentExecutor(BaseExecutor):
         if is_first:
             # 获取第一个输入参数
            mcp_tool = self.tools[self.task.state.stepName]
-            self.task.state.currentInput = await self.host.get_first_input_params(
+            self._current_input = await self._host.get_first_input_params(
                 mcp_tool, self.task.runtime.userInput, self.task,
             )
         else:
@@ -107,7 +107,7 @@ class MCPAgentExecutor(BaseExecutor):
             params = {}
             params_description = ""
             mcp_tool = self.tools[self.task.state.stepName]
-            self.task.state.currentInput = await self.host.fill_params(
+            self.task.state.currentInput = await self._host.fill_params(
                 mcp_tool,
                 self.task.runtime.userInput,
                 self.task.state.currentInput,
@@ -126,10 +126,9 @@ class MCPAgentExecutor(BaseExecutor):
 
         # 发送确认消息
         mcp_tool = self.tools[self.task.state.stepName]
-        confirm_message = await self.planner.get_tool_risk(
-            mcp_tool, self.task.state.currentInput, "", self.resoning_llm, self.task.runtime.language,
+        confirm_message = await self._planner.get_tool_risk(
+            mcp_tool, self._current_input, "", self.llm, self.task.runtime.language,
         )
-        await self.update_tokens()
         await self._push_message(
             EventType.STEP_WAITING_FOR_START, confirm_message.model_dump(exclude_none=True, by_alias=True),
         )
@@ -162,27 +161,41 @@ class MCPAgentExecutor(BaseExecutor):
         self.task.state.executorStatus = ExecutorStatus.RUNNING
         self.task.state.stepStatus = StepStatus.RUNNING
         mcp_tool = self.tools[self.task.state.stepName]
-        mcp_client = (await self.mcp_pool.get(mcp_tool.mcp_id, self.task.metadata.userSub))
+        mcp_client = await self._mcp_pool.get(mcp_tool.mcpId, self.task.metadata.userSub)
+        if not mcp_client:
+            _logger.exception("[MCPAgentExecutor] MCP客户端不存在: %s", mcp_tool.mcpId)
+            self.task.state.stepStatus = StepStatus.ERROR
+            self.task.state.errorMessage = {
+                "err_msg": f"MCP客户端不存在: {mcp_tool.mcpId}",
+                "data": self._current_input,
+            }
+            return
+
         try:
-            output_params = await mcp_client.call_tool(mcp_tool.name, self.task.state.currentInput)
-        except anyio.ClosedResourceError:
-            _logger.exception("[MCPAgentExecutor] MCP客户端连接已关闭: %s", mcp_tool.mcp_id)
-            await self.mcp_pool.stop(mcp_tool.mcp_id, self.task.metadata.userSub)
-            await self.mcp_pool.init_mcp(mcp_tool.mcp_id, self.task.metadata.userSub)
+            output_data = await mcp_client.call_tool(mcp_tool.name, self._current_input)
+        except anyio.ClosedResourceError as e:
+            _logger.exception("[MCPAgentExecutor] MCP客户端连接已关闭: %s", mcp_tool.mcpId)
+            # 停止当前用户MCP进程
+            await self._mcp_pool.stop(mcp_tool.mcpId, self.task.metadata.userSub)
             self.task.state.stepStatus = StepStatus.ERROR
+            self.task.state.errorMessage = {
+                "err_msg": str(e),
+                "data": self._current_input,
+            }
             return
         except Exception as e:
-            _logger.exception("[MCPAgentExecutor] 执行步骤 %s 时发生错误", mcp_tool.name)
+            _logger.exception("[MCPAgentExecutor] 执行步骤 %s 时发生错误", self.task.state.stepName)
             self.task.state.stepStatus = StepStatus.ERROR
             self.task.state.errorMessage = {
                 "err_msg": str(e),
-                "data": self.task.state.currentInput,
+                "data": self._current_input,
             }
             return
-        _logger.error(f"当前工具名称: {mcp_tool.name}, 输出参数: {output_params}")
-        if output_params.isError:
+
+        _logger.error("当前工具名称: %s, 输出参数: %s", self.task.state.stepName, output_data)
+        if output_data.isError:
             err = ""
-            for output in output_params.content:
+            for output in output_data.content:
                 if isinstance(output, TextContent):
                     err += output.text
             self.task.state.stepStatus = StepStatus.ERROR
@@ -191,17 +204,17 @@ class MCPAgentExecutor(BaseExecutor):
                 "data": {},
             }
             return
+
         message = ""
-        for output in output_params.content:
+        for output in output_data.content:
             if isinstance(output, TextContent):
                 message += output.text
-        output_params = {
+        output_data = {
             "message": message,
         }
-        await self.update_tokens()
-        await self._push_message(EventType.STEP_INPUT, self.task.state.currentInput)
-        await self._push_message(EventType.STEP_OUTPUT, output_params)
+        await self._push_message(EventType.STEP_INPUT, self._current_input)
+        await self._push_message(EventType.STEP_OUTPUT, output_data)
         self.task.context.append(
             ExecutorHistory(
                 taskId=self.task.metadata.id,
@@ -212,8 +225,8 @@ class MCPAgentExecutor(BaseExecutor):
                 executorId=self.task.state.executorId,
                 executorName=self.task.state.executorName,
                 executorStatus=self.task.state.executorStatus,
-                inputData=self.task.state.currentInput,
-                outputData=output_params,
+                inputData=self._current_input,
+                outputData=output_data,
             ),
         )
         self.task.state.stepStatus = StepStatus.SUCCESS
@@ -226,16 +239,15 @@ class MCPAgentExecutor(BaseExecutor):
             raise RuntimeError(err)
 
         mcp_tool = self.tools[self.task.state.stepName]
-        params_with_null = await self.planner.get_missing_param(
+        params_with_null = await self._planner.get_missing_param(
             mcp_tool,
-            self.task.state.currentInput,
+            self._current_input,
             self.task.state.errorMessage,
         )
-        await self.update_tokens()
-        error_message = await self.planner.change_err_message_to_description(
+        error_message = await self._planner.change_err_message_to_description(
             error_message=self.task.state.errorMessage,
             tool=mcp_tool,
-            input_params=self.task.state.currentInput,
+            input_params=self._current_input,
         )
         await self._push_message(
             EventType.STEP_WAITING_FOR_PARAM, data={"message": error_message, "params": params_with_null},
@@ -269,32 +281,31 @@ class MCPAgentExecutor(BaseExecutor):
             _logger.error(err)
             raise RuntimeError(err)
 
-        if self.step_cnt < self.max_steps:
-            self.step_cnt += 1
-            history = await self.host.assemble_memory(self.task.runtime, self.task.context)
+        if self._step_cnt < AGENT_MAX_STEPS:
+            self._step_cnt += 1
+            history = await self._host.assemble_memory(self.task.runtime, self.task.context)
             max_retry = 3
             step = None
             for _ in range(max_retry):
                 try:
-                    step = await self.planner.create_next_step(history, self.tool_list)
+                    step = await self._planner.create_next_step(history, self.tool_list)
                     if step.tool_id in self.tools:
                         break
                 except Exception:
                     _logger.exception("[MCPAgentExecutor] 获取下一步失败,重试中...")
             if step is None or step.tool_id not in self.tools:
                 step = Step(
-                    tool_id=_FINAL_TOOL_ID,
-                    description=_FINAL_TOOL_ID,
+                    tool_id=AGENT_FINAL_STEP_NAME,
+                    description=AGENT_FINAL_STEP_NAME,
                 )
             step_description = step.description
             self.task.state.stepId = uuid.uuid4()
             self.task.state.stepName = step.tool_id
             self.task.state.stepDescription = step_description
             self.task.state.stepStatus = StepStatus.INIT
-            self.task.state.currentInput = {}
         else:
             # 没有下一步了,结束流程
-            self.task.state.toolId = _FINAL_TOOL_ID
+            self.task.state.toolId = AGENT_FINAL_STEP_NAME
 
     async def error_handle_after_step(self) -> None:
         """步骤执行失败后的错误处理"""
@@ -326,7 +337,7 @@ class MCPAgentExecutor(BaseExecutor):
             ),
         )
 
-    async def work(self) -> None:
+    async def work(self) -> None:  # noqa: C901, PLR0912, PLR0915
         """执行当前步骤"""
         if not self.task.state:
             err = "[MCPAgentExecutor] 任务状态不存在"
@@ -339,8 +350,7 @@ class MCPAgentExecutor(BaseExecutor):
                 data={},
             )
             await self.get_tool_input_param(is_first=True)
-            user_info = await UserManager.get_user(self.task.metadata.userSub)
-            if not user_info.auto_execute:
+            if not self._user.autoExecute:
                 # 等待用户确认
                 await self.confirm_before_step()
                 return
@@ -376,10 +386,10 @@ class MCPAgentExecutor(BaseExecutor):
                 break
             elif self.task.state.stepStatus == StepStatus.ERROR:
                 # 错误处理
-                if self.task.state.retry_times >= 3:
+                if self._retry_times >= AGENT_MAX_RETRY_TIMES:
                     await self.error_handle_after_step()
                 else:
-                    user_info = await UserManager.get_userinfo_by_user_sub(self.task.ids.user_sub)
+                    user_info = await UserManager.get_user(self.task.metadata.userSub)
                     if user_info.auto_execute:
                         await self._push_message(
                             EventType.STEP_ERROR,
@@ -412,9 +422,9 @@ class MCPAgentExecutor(BaseExecutor):
                 await self.get_next_step()
         else:
             mcp_tool = self.tools[self.task.state.toolId]
-            is_param_error = await self.planner.is_param_error(
-                self.task.runtime.question,
-                await self.host.assemble_memory(self.task.runtime, self.task.context),
+            is_param_error = await self._planner.is_param_error(
+                self.task.runtime.userInput,
+                await self._host.assemble_memory(self.task.runtime, self.task.context),
                 self.task.state.errorMessage,
                 mcp_tool,
                 self.task.state.stepDescription,
@@ -459,10 +469,10 @@ class MCPAgentExecutor(BaseExecutor):
 
     async def summarize(self) -> None:
         """总结"""
-        async for chunk in self.planner.generate_answer(
+        async for chunk in self._planner.generate_answer(
             self.task.runtime.userInput,
-            (await self.host.assemble_memory(self.task.runtime, self.task.context)),
-            self.resoning_llm,
+            (await self._host.assemble_memory(self.task.runtime, self.task.context)),
+            self.llm,
             self.task.runtime.language,
         ):
             await self._push_message(
@@ -471,7 +481,7 @@ class MCPAgentExecutor(BaseExecutor):
             )
             self.task.runtime.fullAnswer += chunk
 
-    async def run(self) -> None:
+    async def run(self) -> None:  # noqa: C901
         """执行MCP Agent的主逻辑"""
         if not self.task.state:
             err = "[MCPAgentExecutor] 任务状态不存在"
@@ -479,68 +489,48 @@ class MCPAgentExecutor(BaseExecutor):
             raise RuntimeError(err)
 
         # 初始化MCP服务
-        await self.load_state()
         await self.load_mcp()
+        data = {}
         if self.task.state.executorStatus == ExecutorStatus.INIT:
             # 初始化状态
-            try:
-                self.task.state.executorId = str(uuid.uuid4())
-                self.task.state.executorName = (await self.planner.get_flow_name(
-                    self.task.runtime.question, self.resoning_llm, self.task.runtime.language
-                )).flow_name
-                await TaskManager.save_task(self.task.metadata.id, self.task)
-                await self.get_next_step()
-            except Exception as e:
-                _logger.exception("[MCPAgentExecutor] 初始化失败")
-                self.task.state.executorStatus = ExecutorStatus.ERROR
-                self.task.state.errorMessage = str(e)
-                await self._push_message(
-                    EventType.FLOW_FAILED,
-                    data={},
-                )
-                return
+            self.task.state.executorId = str(uuid.uuid4())
+            self.task.state.executorName = (await self._planner.get_flow_name()).flow_name
+            flow_risk = await self._planner.get_flow_excute_risk(self.tool_list, self.task.language)
+            if self._user.autoExecute:
+                data = flow_risk.model_dump(exclude_none=True, by_alias=True)
+            await self.get_next_step()
+
         self.task.state.executorStatus = ExecutorStatus.RUNNING
-        await self._push_message(
-            EventType.FLOW_START,
-            data={},
-        )
-        if self.task.state.toolId == _FINAL_TOOL_ID:
+        await self._push_message(EventType.FLOW_START, data=data)
+
+        if self.task.state.stepName == AGENT_FINAL_STEP_NAME:
             # 如果已经是最后一步,直接结束
-            self.state.executorStatus = ExecutorStatus.SUCCESS
-            await self._push_message(
-                EventType.FLOW_SUCCESS,
-                data={},
-            )
+            self.task.state.executorStatus = ExecutorStatus.SUCCESS
+            await self._push_message(EventType.FLOW_SUCCESS, data={})
             await self.summarize()
             return
+
         try:
             while self.task.state.executorStatus == ExecutorStatus.RUNNING:
-                if self.state.toolId == _FINAL_TOOL_ID:
-                    break
                 await self.work()
-                await TaskManager.save_task(self.task.metadata.id, self.task)
-            if self.state.toolId == _FINAL_TOOL_ID:
+
+            if self.task.state.stepName == AGENT_FINAL_STEP_NAME:
                 # 如果已经是最后一步,直接结束
                 self.task.state.executorStatus = ExecutorStatus.SUCCESS
                 self.task.state.stepStatus = StepStatus.SUCCESS
-                await self._push_message(
-                    EventType.FLOW_SUCCESS,
-                    data={},
-                )
+                await self._push_message(EventType.FLOW_SUCCESS, data={})
                 await self.summarize()
         except Exception as e:
             _logger.exception("[MCPAgentExecutor] 执行过程中发生错误")
             self.task.state.executorName = ExecutorStatus.ERROR
-            self.task.state.errorMessage = str(e)
+            self.task.state.errorMessage = {
+                "err_msg": str(e),
+                "data": {},
+            }
             self.task.state.stepStatus = StepStatus.ERROR
-            await self._push_message(
-                EventType.STEP_ERROR,
-                data={},
-            )
-            await self._push_message(
-                EventType.FLOW_FAILED,
-                data={},
-            )
+            await self._push_message(EventType.STEP_ERROR, data={})
+            await self._push_message(EventType.FLOW_FAILED, data={})
+
             if len(self.task.context) and self.task.context[-1].stepId == self.task.state.stepId:
                 del self.task.context[-1]
             self.task.context.append(
@@ -558,8 +548,8 @@ class MCPAgentExecutor(BaseExecutor):
                 ),
             )
         finally:
-            for mcp_service in self.mcp_list:
+            for mcp_service in self._mcp_list:
                 try:
-                    await self.mcp_pool.stop(mcp_service.id, self.task.metadata.userSub)
+                    await self._mcp_pool.stop(mcp_service.id, self.task.metadata.userSub)
                 except Exception:
                     _logger.exception("[MCPAgentExecutor] 停止MCP客户端时发生错误")
diff --git a/apps/scheduler/executor/flow.py b/apps/scheduler/executor/flow.py
index 2b3f25a49fa9cd3473afd9a74e985641de8d1986..e94431a4a54ef4259b95842bde28d9fef95be9b2 100644
--- a/apps/scheduler/executor/flow.py
+++ b/apps/scheduler/executor/flow.py
@@ -79,8 +79,6 @@ class FlowExecutor(BaseExecutor):
             self.state and
             self.state.executorStatus not in [ExecutorStatus.INIT, ExecutorStatus.UNKNOWN]
         ):
-            self.context = await TaskManager.get_context_by_task_id(self.task.id)
-        else:
             # 创建ExecutorState
             self.state = ExecutorCheckpoint(
                 taskId=self.task.id,
diff --git a/apps/scheduler/executor/prompt.py b/apps/scheduler/executor/prompt.py
new file mode 100644
index 0000000000000000000000000000000000000000..8aea17705303f89d1b2f5f122249817d6c6cd779
--- /dev/null
+++ b/apps/scheduler/executor/prompt.py
@@ -0,0 +1,70 @@
+"""Executor相关大模型提示词"""
+
+from apps.schemas.enum_var import LanguageType
+
+EXECUTOR_REASONING: dict[LanguageType, str] = {
+    LanguageType.CHINESE: r"""
+
+        你是一个可以使用工具的智能助手。
+        在回答用户的问题时,你为了获取更多的信息,使用了一个工具。
+        请简明扼要地总结工具的使用过程,提供你的见解,并给出下一步的行动。
+
+        注意:
+        工具的相关信息在标签中给出。
+        为了使你更好的理解发生了什么,你之前的思考过程在标签中给出。
+        输出时请不要包含XML标签,输出时请保持简明和清晰。
+
+
+            {tool_name}
+            {tool_description}
+            {tool_output}
+
+
+            {last_thought}
+
+
+        你当前需要解决的问题是:
+        {user_question}
+
+
+        请综合以上信息,再次一步一步地进行思考,并给出见解和行动:
+    """,
+    LanguageType.ENGLISH: r"""
+
+        You are an intelligent assistant who can use tools.
+        When answering user questions, you use a tool to get more information.
+        Please summarize the process of using the tool briefly, provide your insights, \
+and give the next action.
+
+        Note:
+        The information about the tool is given in the tag.
+        To help you better understand what happened, your previous thought process is given in the \
+tag.
+        Do not include XML tags in the output, and keep the output brief and clear.
+
+
+            {tool_name}
+            {tool_description}
+            {tool_output}
+
+
+            {last_thought}
+
+
+        The question you need to solve is:
+        {user_question}
+
+
+        Please integrate the above information, think step by step again, provide insights, and give actions:
+    """,
+}
diff --git a/apps/scheduler/executor/qa.py b/apps/scheduler/executor/qa.py
index d5d8190d90cf39d9e4592a319fe4ce822298452f..3ff995e703554da9a8daa35cff9d329183d2ae2c 100644
--- a/apps/scheduler/executor/qa.py
+++ b/apps/scheduler/executor/qa.py
@@ -1,11 +1,12 @@
 """用于执行智能问答的Executor"""
+
 import logging
 import uuid
 from datetime import UTC, datetime
 from textwrap import dedent
 
 from apps.models.document import Document
-from apps.schemas.enum_var import EventType
+from apps.schemas.enum_var import EventType, ExecutorStatus
 from apps.schemas.message import DocumentAddContent, TextAddContent
 from apps.schemas.rag_data import RAGEventData
 from apps.schemas.record import RecordDocument
@@ -76,7 +77,9 @@ class QAExecutor(BaseExecutor):
 
         full_answer = ""
         try:
-            async for chunk in RAG.chat_with_llm_base_on_rag(user_sub, llm, history, doc_ids, rag_data):
+            async for chunk in RAG.chat_with_llm_base_on_rag(
+                user_sub, llm, history, doc_ids, rag_data
+            ):
                 task, content_obj = await self._push_rag_chunk(task, queue, chunk)
                 if not content_obj:
                     continue
@@ -87,9 +90,8 @@ class QAExecutor(BaseExecutor):
                     task.runtime.documents.append(content_obj.content)
             task.state.flow_status = ExecutorStatus.SUCCESS
         except Exception as e:
-            logger.error(f"[Scheduler] RAG服务发生错误: {e}")
+            _logger.error(f"[Scheduler] RAG服务发生错误: {e}")
             task.state.flow_status = ExecutorStatus.ERROR
 
         # 保存答案
-        task.runtime.answer = full_answer
-        task.tokens.full_time = round(datetime.now(UTC).timestamp(), 2) - task.tokens.time
-        await TaskManager.save_task(task.id, task)
+        self.task.runtime.fullAnswer = full_answer
+        self.task.runtime.fullTime = round(datetime.now(UTC).timestamp(), 2) - self.task.runtime.time
diff --git a/apps/scheduler/mcp/host.py b/apps/scheduler/mcp/host.py
index ec2e004d55a4bfdbf66f67aedeb81b68bf9ba02a..de4d39df5f0c9eab1d2f170fd652077a65d21df5 100644
--- a/apps/scheduler/mcp/host.py
+++ b/apps/scheduler/mcp/host.py
@@ -18,6 +18,7 @@ from apps.scheduler.pool.mcp.client import MCPClient
 from apps.scheduler.pool.mcp.pool import MCPPool
 from apps.schemas.enum_var import ExecutorStatus, LanguageType, StepStatus
 from apps.schemas.mcp import MCPPlanItem
+from apps.schemas.scheduler import LLMConfig
 from apps.services.mcp_service import MCPServiceManager
 from apps.services.task import TaskManager
 
@@ -27,13 +28,10 @@ logger = logging.getLogger(__name__)
 class MCPHost:
     """MCP宿主服务"""
 
-    def __init__(
-        self, user_sub: str, task_id: uuid.UUID, runtime_id: str, runtime_name: str,
-        language: LanguageType,
-    ) -> None:
+    def __init__(self, user_sub: str, task_id: uuid.UUID, llm: LLMConfig) -> None:
         """初始化MCP宿主"""
-        self._user_sub = user_sub
         self._task_id = task_id
+        self._user_sub = user_sub
         # 注意:runtime在工作流中是flow_id和step_description,在Agent中可为标识Agent的id和description
         self._runtime_id = runtime_id
         self._runtime_name = runtime_name
@@ -44,7 +42,9 @@ class MCPHost:
             trim_blocks=True,
             lstrip_blocks=True,
         )
-        self.language = language
+
+    async def init(self) -> None:
+        """初始化MCP宿主"""
 
 
     async def get_client(self, mcp_id: str) -> MCPClient | None:
@@ -123,7 +123,6 @@ class MCPHost:
             return {}
         self._context_list.append(context.id)
         context.append(context)
-        await TaskManager.save_task(self._task_id, task)
 
         return output_data
 
diff --git a/apps/scheduler/mcp/plan.py b/apps/scheduler/mcp/plan.py
index 78885687a8c88200295e2a41442108aa058eaf7e..448a4081e3c9171fc1fddac3fba86b5a076ef397 100644
--- a/apps/scheduler/mcp/plan.py
+++ b/apps/scheduler/mcp/plan.py
@@ -1,33 +1,35 @@
 # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved.
 """MCP 用户目标拆解与规划"""
 
+import logging
+
 from jinja2 import BaseLoader
 from jinja2.sandbox import SandboxedEnvironment
 
 from apps.llm.function import JsonGenerator
-from apps.llm.reasoning import ReasoningLLM
 from apps.models.mcp import MCPTools
 from apps.schemas.enum_var import LanguageType
 from apps.schemas.mcp import MCPPlan
+from apps.schemas.scheduler import LLMConfig
 
 from .prompt import CREATE_PLAN, FINAL_ANSWER
 
+_logger = logging.getLogger(__name__)
+
 
 class MCPPlanner:
     """MCP 用户目标拆解与规划"""
 
-    def __init__(self, user_goal: str, language: LanguageType) -> None:
+    def __init__(self, user_goal: str, language: LanguageType, llm: LLMConfig) -> None:
         """初始化MCP规划器"""
-        self.user_goal = user_goal
+        self._user_goal = user_goal
         self._env = SandboxedEnvironment(
             loader=BaseLoader,
             autoescape=True,
             trim_blocks=True,
             lstrip_blocks=True,
         )
-        self.input_tokens = 0
-        self.output_tokens = 0
-        self.language = language
+        self._language = language
+        self._llm = llm
 
 
     async def create_plan(self, tool_list: list[MCPTools], max_steps: int = 6) -> MCPPlan:
@@ -42,9 +44,9 @@ class MCPPlanner:
     async def _get_reasoning_plan(self, tool_list: list[MCPTools], max_steps: int) -> str:
         """获取推理大模型的结果"""
         # 格式化Prompt
-        template = self._env.from_string(CREATE_PLAN[self.language])
+        template = self._env.from_string(CREATE_PLAN[self._language])
         prompt = template.render(
-            goal=self.user_goal,
+            goal=self._user_goal,
             tools=tool_list,
             max_num=max_steps,
         )
@@ -54,31 +56,31 @@ class MCPPlanner:
             {"role": "system", "content": "You are a helpful assistant."},
             {"role": "user", "content": prompt},
         ]
-        reasoning_llm = ReasoningLLM()
         result = ""
-        async for chunk in reasoning_llm.call(
+        async for chunk in self._llm.reasoning.call(
             message,
             streaming=False,
             temperature=0.07,
             result_only=True,
         ):
             result += chunk
-
-        # 保存token用量
-        self.input_tokens = reasoning_llm.input_tokens
-        self.output_tokens = reasoning_llm.output_tokens
-
         return result
 
     async def _parse_plan_result(self, result: str, max_steps: int) -> MCPPlan:
         """将推理结果解析为结构化数据"""
+        if not self._llm.function:
+            err = "[MCPPlanner] 未设置Function模型"
+            _logger.error(err)
+            raise RuntimeError(err)
+
         # 格式化Prompt
         schema = MCPPlan.model_json_schema()
         schema["properties"]["plans"]["maxItems"] = max_steps
 
         # 使用Function模型解析结果
         json_generator = JsonGenerator(
+            self._llm.function,
            result,
             [
                 {"role": "system", "content": "You are a helpful assistant."},
@@ -92,23 +94,19 @@ class MCPPlanner:
 
     async def generate_answer(self, plan: MCPPlan, memory: str) -> str:
         """生成最终回答"""
-        template = self._env.from_string(FINAL_ANSWER[self.language])
+        template = self._env.from_string(FINAL_ANSWER[self._language])
         prompt = template.render(
             plan=plan,
             memory=memory,
-            goal=self.user_goal,
+            goal=self._user_goal,
         )
 
-        llm = ReasoningLLM()
         result = ""
-        async for chunk in llm.call(
+        async for chunk in self._llm.reasoning.call(
             [{"role": "user", "content": prompt}],
             streaming=False,
             temperature=0.07,
         ):
             result += chunk
 
-        self.input_tokens = llm.input_tokens
-        self.output_tokens = llm.output_tokens
-
         return result
diff --git a/apps/scheduler/mcp/select.py b/apps/scheduler/mcp/select.py
index e0ae86e10931b67de5431161a7ab2692f3566aa9..3594b9d891034392197e66f7cc51195c399ca20c 100644
--- a/apps/scheduler/mcp/select.py
+++ b/apps/scheduler/mcp/select.py
@@ -6,12 +6,9 @@
 import logging
from sqlalchemy import select from apps.common.postgres import postgres -from apps.llm.embedding import Embedding -from apps.llm.function import FunctionLLM -from apps.llm.reasoning import ReasoningLLM from apps.models.mcp import MCPTools -from apps.models.vectors import MCPToolVector from apps.schemas.mcp import MCPSelectResult +from apps.schemas.scheduler import LLMConfig from apps.services.mcp_service import MCPServiceManager logger = logging.getLogger(__name__) @@ -20,32 +17,31 @@ logger = logging.getLogger(__name__) class MCPSelector: """MCP选择器""" - def __init__(self) -> None: - """初始化助手类""" - self.input_tokens = 0 - self.output_tokens = 0 - + def __init__(self, llm: LLMConfig) -> None: + """初始化MCP选择器""" + self._llm = llm async def _call_reasoning(self, prompt: str) -> str: """调用大模型进行推理""" - logger.info("[MCPHelper] 调用推理大模型") - llm = ReasoningLLM() + logger.info("[MCPSelector] 调用推理大模型") message = [ {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": prompt}, ] result = "" - async for chunk in llm.call(message): + async for chunk in self._llm.reasoning.call(message): result += chunk - self.input_tokens += llm.input_tokens - self.output_tokens += llm.output_tokens return result async def _call_function_mcp(self, reasoning_result: str, mcp_ids: list[str]) -> MCPSelectResult: """调用结构化输出小模型提取JSON""" - logger.info("[MCPHelper] 调用结构化输出小模型") - llm = FunctionLLM() + if not self._llm.function: + err = "[MCPSelector] 未设置Function模型" + logger.error(err) + raise RuntimeError(err) + + logger.info("[MCPSelector] 调用结构化输出小模型") message = [ {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": reasoning_result}, @@ -53,23 +49,27 @@ class MCPSelector: schema = MCPSelectResult.model_json_schema() # schema中加入选项 schema["properties"]["mcp_id"]["enum"] = mcp_ids - result = await llm.call(messages=message, schema=schema) + result = await self._llm.function.call(messages=message, schema=schema) try: result = MCPSelectResult.model_validate(result) except Exception: - logger.exception("[MCPHelper] 解析MCP Select Result失败") + logger.exception("[MCPSelector] 解析MCP Select Result失败") raise return result - @staticmethod - async def select_top_tool(query: str, mcp_list: list[str], top_n: int = 10) -> list[MCPTools]: + async def select_top_tool(self, query: str, mcp_list: list[str], top_n: int = 10) -> list[MCPTools]: """选择最合适的工具""" - query_embedding = await Embedding.get_embedding([query]) + if not self._llm.embedding: + err = "[MCPSelector] 未设置Embedding模型" + logger.error(err) + raise RuntimeError(err) + + query_embedding = await self._llm.embedding.get_embedding([query]) async with postgres.session() as session: tool_vecs = await session.scalars( - select(MCPToolVector).where(MCPToolVector.mcpId.in_(mcp_list)) - .order_by(MCPToolVector.embedding.cosine_distance(query_embedding)).limit(top_n), + select(self._llm.embedding.MCPToolVector).where(self._llm.embedding.MCPToolVector.mcpId.in_(mcp_list)) + .order_by(self._llm.embedding.MCPToolVector.embedding.cosine_distance(query_embedding)).limit(top_n), ) # 拿到工具 diff --git a/apps/scheduler/mcp_agent/base.py b/apps/scheduler/mcp_agent/base.py index cad4256bae6d481bd496cddf1a5b98a983d198f8..03c0ad4aa53b017e0ffecedaf70cbfcc5180e920 100644 --- a/apps/scheduler/mcp_agent/base.py +++ b/apps/scheduler/mcp_agent/base.py @@ -5,22 +5,27 @@ import logging from typing import Any from apps.llm.function import JsonGenerator -from apps.models.llm import LLMData +from apps.schemas.enum_var import LanguageType +from 
diff --git a/apps/scheduler/mcp_agent/base.py b/apps/scheduler/mcp_agent/base.py
index cad4256bae6d481bd496cddf1a5b98a983d198f8..03c0ad4aa53b017e0ffecedaf70cbfcc5180e920 100644
--- a/apps/scheduler/mcp_agent/base.py
+++ b/apps/scheduler/mcp_agent/base.py
@@ -5,22 +5,27 @@ import logging
 from typing import Any
 
 from apps.llm.function import JsonGenerator
-from apps.models.llm import LLMData
+from apps.schemas.enum_var import LanguageType
+from apps.schemas.scheduler import LLMConfig
+from apps.schemas.task import TaskData
 
-logger = logging.getLogger(__name__)
+_logger = logging.getLogger(__name__)
 
 
 class MCPBase:
     """MCP基类"""
 
-    user_sub: str
+    _user_sub: str
+    _llm: LLMConfig
+    _goal: str
+    _language: LanguageType
 
-    def __init__(self, user_sub: str) -> None:
+    def __init__(self, task: TaskData, llm: LLMConfig) -> None:
         """初始化MCP基类"""
-        self.user_sub = user_sub
-
-    async def init(self) -> None:
-        pass
+        self._user_sub = task.metadata.userSub
+        self._llm = llm
+        self._goal = task.runtime.userInput
+        self._language = task.runtime.language
 
     async def get_resoning_result(self, prompt: str) -> str:
         """获取推理结果"""
@@ -30,7 +35,7 @@ class MCPBase:
             {"role": "user", "content": "Please provide a JSON response based on the above information and schema."},
         ]
         result = ""
-        async for chunk in self.llm.call(
+        async for chunk in self._llm.reasoning.call(
             message,
             streaming=False,
             temperature=0.07,
@@ -40,13 +45,18 @@
 
         return result
 
-    @staticmethod
-    async def _parse_result(result: str, schema: dict[str, Any]) -> dict[str, Any]:
+    async def _parse_result(self, result: str, schema: dict[str, Any]) -> dict[str, Any]:
         """解析推理结果"""
+        if not self._llm.function:
+            err = "[MCPBase] 未找到函数调用模型"
+            _logger.error(err)
+            raise RuntimeError(err)
+
         json_result = await JsonGenerator.parse_result_by_stack(result, schema)
         if json_result is not None:
             return json_result
 
         json_generator = JsonGenerator(
+            self._llm.function,
             "Please provide a JSON response based on the above information and schema.\n\n",
             [
                 {"role": "system", "content": "You are a helpful assistant."},
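_parse_result now encodes a two-stage strategy: try the cheap stack-based extraction first, and only fall back to a model-backed JsonGenerator when nothing parseable is found. A condensed sketch of that control flow — the conversation payload is illustrative, and the (llm, query, conversation, schema) argument order follows how JsonGenerator is called in this patch:

from typing import Any

from apps.llm.function import JsonGenerator

async def parse_structured(llm_config, raw: str, schema: dict[str, Any]) -> dict[str, Any]:
    if not llm_config.function:  # the guard this hunk introduces
        raise RuntimeError("no function-call model configured")
    parsed = await JsonGenerator.parse_result_by_stack(raw, schema)
    if parsed is not None:  # fast path: the reasoning output already contained valid JSON
        return parsed
    generator = JsonGenerator(  # slow path: have the function-call model produce it
        llm_config.function,
        "Please provide a JSON response based on the above information and schema.\n\n",
        [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": raw},
        ],
        schema,
    )
    return await generator.generate()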
diff --git a/apps/scheduler/mcp_agent/host.py b/apps/scheduler/mcp_agent/host.py
index 3ba5a8666eedb669698459eaf77c426669bec825..b41b837223c0e86622d09006af962daddf905ec5 100644
--- a/apps/scheduler/mcp_agent/host.py
+++ b/apps/scheduler/mcp_agent/host.py
@@ -9,7 +9,6 @@ from jinja2 import BaseLoader
 from jinja2.sandbox import SandboxedEnvironment
 
 from apps.llm.function import JsonGenerator
-from apps.llm.reasoning import ReasoningLLM
 from apps.models.mcp import MCPTools
 from apps.models.task import ExecutorHistory, TaskRuntime
 from apps.scheduler.mcp.prompt import MEMORY_TEMPLATE
@@ -17,7 +16,7 @@
 from apps.scheduler.mcp_agent.base import MCPBase
 from apps.scheduler.mcp_agent.prompt import GEN_PARAMS, REPAIR_PARAMS
 from apps.schemas.enum_var import LanguageType
 
-logger = logging.getLogger(__name__)
+_logger = logging.getLogger(__name__)
 
 _env = SandboxedEnvironment(
     loader=BaseLoader,
     autoescape=False,
@@ -32,7 +31,7 @@ def tojson_filter(value: dict[str, Any]) -> str:
 
 _env.filters["tojson"] = tojson_filter
 
-LLM_QUERY_FIX = {
+_LLM_QUERY_FIX = {
     LanguageType.CHINESE: "请生成修复之后的工具参数",
     LanguageType.ENGLISH: "Please generate the tool parameters after repair",
 }
@@ -41,12 +40,6 @@ LLM_QUERY_FIX = {
 
 class MCPHost(MCPBase):
     """MCP宿主服务"""
 
-    def __init__(self, goal: str, llm: ReasoningLLM) -> None:
-        """初始化MCP宿主服务"""
-        super().__init__()
-        self.goal = goal
-        self.llm = llm
-
     @staticmethod
     async def assemble_memory(runtime: TaskRuntime, context: list[ExecutorHistory]) -> str:
         """组装记忆"""
@@ -58,19 +51,19 @@
         self, mcp_tool: MCPTools, current_goal: str, runtime: TaskRuntime, context: list[ExecutorHistory],
     ) -> dict[str, Any]:
         """填充工具参数"""
-        # 更清晰的输入·指令,这样可以调用generate
+        # 更清晰的输入指令,这样可以调用generate
         prompt = _env.from_string(GEN_PARAMS[runtime.language]).render(
             tool_name=mcp_tool.toolName,
             tool_description=mcp_tool.description,
-            goal=self.goal,
+            goal=self._goal,
             current_goal=current_goal,
             input_schema=mcp_tool.inputSchema,
             background_info=await self.assemble_memory(runtime, context),
         )
-        logger.info("[MCPHost] 填充工具参数: %s", prompt)
+        _logger.info("[MCPHost] 填充工具参数: %s", prompt)
         result = await self.get_resoning_result(prompt)
 
         # 使用JsonGenerator解析结果
-        return await MCPHost._parse_result(
+        return await self._parse_result(
             result,
             mcp_tool.inputSchema,
         )
@@ -85,10 +78,15 @@
         params: dict[str, Any] | None = None,
         params_description: str = "",
     ) -> dict[str, Any]:
-        llm_query = LLM_QUERY_FIX[language]
+        if not self._llm.function:
+            err = "[MCPHost] 未找到函数调用模型"
+            _logger.error(err)
+            raise RuntimeError(err)
+
+        llm_query = _LLM_QUERY_FIX[language]
         prompt = _env.from_string(REPAIR_PARAMS[language]).render(
             tool_name=mcp_tool.toolName,
-            goal=self.goal,
+            goal=self._goal,
             current_goal=current_goal,
             tool_description=mcp_tool.description,
             input_schema=mcp_tool.inputSchema,
@@ -99,6 +97,7 @@
         )
 
         json_generator = JsonGenerator(
+            self._llm.function,
             llm_query,
             [
                 {"role": "system", "content": "You are a helpful assistant."},
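host.py keeps rendering its prompts through a module-level SandboxedEnvironment with a custom tojson filter. The hunk only shows the filter's registration, so the body below is an assumption (json.dumps with ensure_ascii=False is one plausible implementation); the sketch just makes the registration pattern concrete:

import json
from typing import Any

from jinja2 import BaseLoader
from jinja2.sandbox import SandboxedEnvironment

env = SandboxedEnvironment(loader=BaseLoader(), autoescape=False)

def tojson_filter(value: dict[str, Any]) -> str:
    # assumed body: serialize dicts for prompt embedding, keeping CJK text readable
    return json.dumps(value, ensure_ascii=False)

env.filters["tojson"] = tojson_filter
print(env.from_string("{{ schema | tojson }}").render(schema={"type": "object"}))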
diff --git a/apps/scheduler/mcp_agent/plan.py b/apps/scheduler/mcp_agent/plan.py
index 33c626acf4c09d444ee40aa158c4591e4b8db5bc..b060d6ddf28e7db42a67546f195953eb4b7ae536 100644
--- a/apps/scheduler/mcp_agent/plan.py
+++ b/apps/scheduler/mcp_agent/plan.py
@@ -8,13 +8,13 @@ from typing import Any
 from jinja2 import BaseLoader
 from jinja2.sandbox import SandboxedEnvironment
 
-from apps.llm.reasoning import ReasoningLLM
 from apps.models.mcp import MCPTools
 from apps.scheduler.mcp_agent.base import MCPBase
 from apps.scheduler.mcp_agent.prompt import (
     CHANGE_ERROR_MESSAGE_TO_DESCRIPTION,
     FINAL_ANSWER,
     GEN_STEP,
+    GENERATE_FLOW_EXCUTE_RISK,
     GENERATE_FLOW_NAME,
     GET_MISSING_PARAMS,
     IS_PARAM_ERROR,
@@ -24,6 +24,7 @@ from apps.scheduler.slot.slot import Slot
 from apps.schemas.enum_var import LanguageType
 from apps.schemas.mcp import (
     FlowName,
+    FlowRisk,
     IsParamError,
     Step,
     ToolRisk,
@@ -41,19 +42,10 @@ logger = logging.getLogger(__name__)
 class MCPPlanner(MCPBase):
     """MCP 用户目标拆解与规划"""
 
-    goal: str
-    llm: ReasoningLLM
-
-    def __init__(self, user_sub: str, goal: str, language: LanguageType) -> None:
-        """初始化MCPPlanner"""
-        super().__init__(user_sub)
-        self.goal = goal
-        self.language = language
-
     async def get_flow_name(self) -> FlowName:
         """获取当前流程的名称"""
-        template = _env.from_string(GENERATE_FLOW_NAME[self.language])
-        prompt = template.render(goal=self.goal)
+        template = _env.from_string(GENERATE_FLOW_NAME[self._language])
+        prompt = template.render(goal=self._goal)
         result = await self.get_resoning_result(prompt)
         result = await self._parse_result(result, FlowName.model_json_schema())
         return FlowName.model_validate(result)
@@ -61,8 +53,8 @@ class MCPPlanner(MCPBase):
     async def create_next_step(self, history: str, tools: list[MCPTools]) -> Step:
         """创建下一步的执行步骤"""
         # 获取推理结果
-        template = _env.from_string(GEN_STEP[self.language])
-        prompt = template.render(goal=self.goal, history=history, tools=tools)
+        template = _env.from_string(GEN_STEP[self._language])
+        prompt = template.render(goal=self._goal, history=history, tools=tools)
         result = await self.get_resoning_result(prompt)
 
         # 解析为结构化数据
@@ -76,6 +68,22 @@ class MCPPlanner(MCPBase):
         # 使用Step模型解析结果
         return Step.model_validate(step)
 
+    async def get_flow_excute_risk(
+        self,
+        tools: list[MCPTools],
+        language: LanguageType = LanguageType.CHINESE,
+    ) -> FlowRisk:
+        """获取当前流程的风险评估结果"""
+        template = _env.from_string(GENERATE_FLOW_EXCUTE_RISK[language])
+        prompt = template.render(
+            goal=self._goal,
+            tools=tools,
+        )
+        result = await self.get_resoning_result(prompt)
+        result = await self._parse_result(result, FlowRisk.model_json_schema())
+        # 使用FlowRisk模型解析结果
+        return FlowRisk.model_validate(result)
+
     async def get_tool_risk(
         self,
         tool: MCPTools,
@@ -84,7 +92,7 @@ class MCPPlanner(MCPBase):
     ) -> ToolRisk:
         """获取MCP工具的风险评估结果"""
         # 获取推理结果
-        template = _env.from_string(RISK_EVALUATE[self.language])
+        template = _env.from_string(RISK_EVALUATE[self._language])
         prompt = template.render(
             tool_name=tool.toolName,
             tool_description=tool.description,
@@ -108,9 +116,9 @@ class MCPPlanner(MCPBase):
         input_params: dict[str, Any],
     ) -> IsParamError:
         """判断错误信息是否是参数错误"""
-        tmplate = _env.from_string(IS_PARAM_ERROR[self.language])
+        tmplate = _env.from_string(IS_PARAM_ERROR[self._language])
         prompt = tmplate.render(
-            goal=self.goal,
+            goal=self._goal,
             history=history,
             step_id=tool.id,
             step_name=tool.toolName,
@@ -129,7 +137,7 @@ class MCPPlanner(MCPBase):
         self, error_message: str, tool: MCPTools, input_params: dict[str, Any],
     ) -> str:
         """将错误信息转换为工具描述"""
-        template = _env.from_string(CHANGE_ERROR_MESSAGE_TO_DESCRIPTION[self.language])
+        template = _env.from_string(CHANGE_ERROR_MESSAGE_TO_DESCRIPTION[self._language])
         prompt = template.render(
             error_message=error_message,
             tool_name=tool.toolName,
@@ -140,11 +148,11 @@ class MCPPlanner(MCPBase):
         return await self.get_resoning_result(prompt)
 
     async def get_missing_param(
-        self, tool: MCPTools, input_param: dict[str, Any], error_message: str,
+        self, tool: MCPTools, input_param: dict[str, Any], error_message: dict[str, Any],
     ) -> dict[str, Any]:
         """获取缺失的参数"""
         slot = Slot(schema=tool.inputSchema)
-        template = _env.from_string(GET_MISSING_PARAMS[self.language])
+        template = _env.from_string(GET_MISSING_PARAMS[self._language])
         schema_with_null = slot.add_null_to_basic_types()
         prompt = template.render(
             tool_name=tool.toolName,
@@ -161,12 +169,12 @@ class MCPPlanner(MCPBase):
         self, memory: str,
     ) -> AsyncGenerator[str, None]:
         """生成最终回答"""
-        template = _env.from_string(FINAL_ANSWER[self.language])
+        template = _env.from_string(FINAL_ANSWER[self._language])
         prompt = template.render(
             memory=memory,
-            goal=self.goal,
+            goal=self._goal,
         )
-        async for chunk in self.llm.call(
+        async for chunk in self._llm.reasoning.call(
             [{"role": "user", "content": prompt}],
             streaming=True,
             temperature=0.07,
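get_flow_excute_risk repeats the planner's schema-constrained loop: render the prompt, collect free-form reasoning, parse it against FlowRisk.model_json_schema(), then validate. The FlowRisk contract itself is added in the apps/schemas/mcp.py hunk further below; its round trip looks like this (the sample payload is invented):

from apps.schemas.mcp import FlowRisk, Risk

schema = FlowRisk.model_json_schema()  # the target shape handed to _parse_result
raw = {"risk": "high", "reason": "performance_tuner may affect DB stability"}  # hypothetical model output
flow_risk = FlowRisk.model_validate(raw)
assert flow_risk.risk is Risk.HIGH  # Risk is a str-Enum, so "high" coerces to Risk.HIGH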
diff --git a/apps/scheduler/mcp_agent/prompt.py b/apps/scheduler/mcp_agent/prompt.py
index b1a9d16fb845914115c0fd21982c9664071d8c38..63233cfade0ebc2995861efb594fbf896ea7e63a 100644
--- a/apps/scheduler/mcp_agent/prompt.py
+++ b/apps/scheduler/mcp_agent/prompt.py
@@ -55,6 +55,73 @@ user's goal.
     ),
 }
 
+GENERATE_FLOW_EXCUTE_RISK: dict[LanguageType, str] = {
+    LanguageType.CHINESE: dedent(
+        r"""
+        你是一个智能助手,你的任务是根据用户的目标和当前的工具集合,评估当前流程的风险。
+
+        # 样例
+        ## 目标
+        我需要扫描当前mysql数据库,分析性能瓶颈,并调优
+        ## 工具集合
+        你可以访问并使用一些工具,这些工具将在 XML标签中给出。
+
+        - mysql_analyzer 分析MySQL数据库性能
+        - performance_tuner 调优数据库性能
+
+        ## 输出
+        {
+            "risk": "high",
+            "reason": "当前目标实现带来的风险较高,因为需要通过performance_tuner工具对MySQL数据库进行调优,而该\
+工具可能会对数据库的性能和稳定性产生较大的影响,因此风险评估为高。"
+        }
+
+        # 现在开始评估当前流程的风险:
+        ## 目标
+        {{goal}}
+        ## 工具集合
+
+        {% for tool in tools %}
+        - {{tool.id}} {{tool.name}};{{tool.description}}
+        {% endfor %}
+
+        ## 输出
+        """,
+    ),
+    LanguageType.ENGLISH: dedent(
+        r"""
+        You are an intelligent assistant. Your task is to evaluate the risk of the current process based on the \
+user's goal and the current tool set.
+
+        # Example
+        ## Goal
+        I need to scan the current MySQL database, analyze performance bottlenecks, and optimize it.
+        ## Tool Set
+        You can access and use some tools, which will be given in the  XML tag.
+
+        - mysql_analyzer Analyze MySQL database performance
+        - performance_tuner Tune database performance
+
+        ## Output
+        {
+            "risk": "high",
+            "reason": "This goal carries a relatively high risk, because it \
+requires tuning the MySQL database through the performance_tuner tool, which may have a significant impact on \
+the performance and stability of the database. Therefore, the risk is assessed as high."
+        }
+
+        # Now start evaluating the risk of the current process:
+        ## Goal
+        {{goal}}
+        ## Tool Set
+
+        {% for tool in tools %}
+        - {{tool.id}} {{tool.name}}; {{tool.description}}
+        {% endfor %}
+
+        ## Output
+        """,
+    ),
+}
+
 GEN_STEP: dict[LanguageType, str] = {
     LanguageType.CHINESE: dedent(
         r"""
@@ -803,7 +870,7 @@ GEN_PARAMS: dict[LanguageType, str] = {
 
     # 样例
     # 工具信息
-    < tool >
+    < name >
         mysql_analyzer
        < /name >
        < description >
        分析MySQL数据库性能
        < /description >
     < / tool >
diff --git a/apps/scheduler/pool/mcp/pool.py b/apps/scheduler/pool/mcp/pool.py
index fdd3e53fd7d4c8263ae47e0c91d55df740efa927..a130b08ae88752c29c53c0865f9f2cbd5941dea4 100644
--- a/apps/scheduler/pool/mcp/pool.py
+++ b/apps/scheduler/pool/mcp/pool.py
@@ -22,7 +22,7 @@ class MCPPool(metaclass=SingletonMeta):
 
     def __init__(self) -> None:
         """初始化MCP池"""
-        self.pool = {}
+        self.pool: dict[str, dict[str, MCPClient]] = {}
 
     async def init_mcp(self, mcp_id: str, user_sub: str) -> MCPClient | None:
diff --git a/apps/scheduler/scheduler/prompt.py b/apps/scheduler/scheduler/prompt.py
new file mode 100644
index 0000000000000000000000000000000000000000..73551d7a3f2d7bda25fcd08e2dcaa930b41752db
--- /dev/null
+++ b/apps/scheduler/scheduler/prompt.py
@@ -0,0 +1,110 @@
+"""Scheduler相关的大模型提示词"""
+
+from apps.schemas.enum_var import LanguageType
+
+FLOW_SELECT: dict[LanguageType, str] = {
+    LanguageType.CHINESE: r"""
+
+
+        根据历史对话(包括工具调用结果)和用户问题,从给出的选项列表中,选出最符合要求的那一项。
+        在输出之前,请先思考,并使用“”标签给出思考过程。
+        结果需要使用JSON格式输出,输出格式为:{{ "choice": "选项名称" }}
+
+
+
+
+            使用天气API,查询明天杭州的天气信息
+
+
+
+                API
+                HTTP请求,获得返回的JSON数据
+
+
+                SQL
+                查询数据库,获得数据库表中的数据
+
+
+
+
+
+            API 工具可以通过 API 来获取外部数据,而天气信息可能就存储在外部数据中,由于用户说明中明确 \
+提到了天气 API 的使用,因此应该优先使用 API 工具。SQL 工具用于从数据库中获取信息,考虑到天气数据的可变性和动态性\
+,不太可能存储在数据库中,因此 SQL 工具的优先级相对较低,最佳选择似乎是“API:请求特定 API,获取返回的 JSON 数据”。
+
+
+
+            {{ "choice": "API" }}
+
+
+
+
+
+
+    {question}
+
+
+
+    {choice_list}
+
+
+
+
+    让我们一步一步思考。
+    """,
+    LanguageType.ENGLISH: r"""
+
+
+        Based on the historical dialogue (including tool call results) and user question, select the \
+most suitable option from the given option list.
+        Before outputting, please think carefully and use the "" tag to give the thinking \
+process.
+        The output needs to be in JSON format; the output format is: {{ "choice": "option name" }}
+
+
+
+
+            Use the weather API to query the weather information of Hangzhou \
+tomorrow
+
+
+
+                API
+                HTTP request, get the returned JSON data
+
+
+                SQL
+                Query the database, get the data in the database table
+
+
+
+
+
+            The API tool can fetch external data through an API, and the weather information is likely stored \
+in such external data. Since the user explicitly mentioned using the weather API, the API tool should be \
+given priority. The SQL tool retrieves information from a database; given the variability and dynamism of weather \
+data, it is unlikely to be stored in a database, so the SQL tool has a relatively low priority. \
+The best choice seems to be "API: request a specific API, get the returned JSON data".
+
+
+
+            {{ "choice": "API" }}
+
+
+
+
+
+    {question}
+
+
+
+    {choice_list}
+
+
+
+
+    Let's think step by step.
+
+
+    """,
+}
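Unlike the Jinja templates above, FLOW_SELECT reads as a str.format-style template: {question} and {choice_list} are format fields, which is why every literal JSON brace in it is doubled ({{ ... }}). Assuming it is filled with str.format (the call site is not part of this diff), usage would look like:

from apps.scheduler.scheduler.prompt import FLOW_SELECT
from apps.schemas.enum_var import LanguageType

# the choice_list layout here is illustrative; the real caller defines its own formatting
prompt = FLOW_SELECT[LanguageType.CHINESE].format(
    question="查询明天杭州的天气",
    choice_list="API: HTTP请求,获得返回的JSON数据\nSQL: 查询数据库,获得数据库表中的数据",
)
assert '{{ "choice"' not in prompt  # doubled braces collapse to single literal braces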
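One behavioral nuance worth noting in the apps/schemas/mcp.py hunk below: the SSE URL pattern is relaxed from ^https?://.+$ to ^https?://.*$, so a bare scheme such as "http://" now passes validation. A quick check of the difference:

import re

OLD, NEW = r"^https?://.+$", r"^https?://.*$"
assert re.match(OLD, "http://") is None         # old pattern required at least one character after //
assert re.match(NEW, "http://") is not None     # new pattern accepts an empty remainder
assert re.match(NEW, "http://example.com/sse") is not None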
diff --git a/apps/scheduler/scheduler/scheduler.py b/apps/scheduler/scheduler/scheduler.py
index 3de6392a32a44095e388d4206050b0829dbff133..6bd3eff6dc00ad83a666193f82c491816f678306 100644
--- a/apps/scheduler/scheduler/scheduler.py
+++ b/apps/scheduler/scheduler/scheduler.py
@@ -9,7 +9,6 @@ from datetime import UTC, datetime
 from apps.common.queue import MessageQueue
 from apps.llm.embedding import Embedding
 from apps.llm.function import FunctionLLM
-from apps.llm.patterns.rewrite import QuestionRewrite
 from apps.llm.reasoning import ReasoningLLM
 from apps.models.task import Task, TaskRuntime
 from apps.models.user import User
@@ -76,6 +75,7 @@ class Scheduler:
                 metadata=Task(
                     id=task_id,
                     userSub=user_sub,
+                    conversationId=self.post_body.conversation_id,
                 ),
                 runtime=TaskRuntime(
                     taskId=task_id,
@@ -272,14 +272,6 @@ class Scheduler:
             logger.error("[Scheduler] 未找到Agent应用")
             return
 
-        if background.conversation and self.task.state.flow_status == ExecutorStatus.INIT:
-            try:
-                question_obj = QuestionRewrite()
-                post_body.question = await question_obj.generate(
-                    history=background.conversation, question=post_body.question, llm=reasion_llm,
-                )
-            except Exception:
-                logger.exception("[Scheduler] 问题重写失败")
         if app_metadata.app_type == AppType.FLOW.value:
             logger.info("[Scheduler] 获取工作流元数据")
             flow_info = await Pool().get_flow_metadata(app_info.app_id)
diff --git a/apps/schemas/mcp.py b/apps/schemas/mcp.py
index 8b94684d13f12809d6d1b1638a6d789ff93da0c0..45280b0c7855f9414b178392fae1539a7d83e0f1 100644
--- a/apps/schemas/mcp.py
+++ b/apps/schemas/mcp.py
@@ -37,7 +37,7 @@ class MCPServerStdioConfig(MCPBasicConfig):
 class MCPServerSSEConfig(MCPBasicConfig):
     """MCP 服务器配置"""
 
-    url: str = Field(description="MCP 服务器地址", default="http://example.com/sse", pattern=r"^https?://.+$")
+    url: str = Field(description="MCP 服务器地址", default="http://example.com/sse", pattern=r"^https?://.*$")
 
 
 class MCPServerItem(BaseModel):
@@ -60,18 +60,25 @@ class MCPServerConfig(MCPServerItem):
     author: str = Field(description="MCP 服务器上传者", default="")
 
 
+class Risk(str, Enum):
+    """MCP工具风险类型"""
+
+    LOW = "low"
+    MEDIUM = "medium"
+    HIGH = "high"
+
+
 class FlowName(BaseModel):
     """MCP 流程名称"""
 
     flow_name: str = Field(description="MCP 流程名称", default="")
 
 
-class Risk(str, Enum):
-    """MCP工具风险类型"""
+class FlowRisk(BaseModel):
+    """MCP 流程风险评估结果"""
 
-    LOW = "low"
-    MEDIUM = "medium"
-    HIGH = "high"
+    risk: Risk = Field(description="风险类型", default=Risk.LOW)
+    reason: str = Field(description="风险原因", default="")
 
 
 class ToolRisk(BaseModel):
diff --git a/apps/services/rag.py b/apps/services/rag.py
index 43e35de8b0bd69acd7705edb8a27032f82d5acc3..77492da5eaa661116f51da5aea4a5ecb4e02835d 100644
--- a/apps/services/rag.py
+++ b/apps/services/rag.py
@@ -12,7 +12,6 @@ import httpx
 from fastapi import status
 
 from apps.common.config import config
-from apps.llm.patterns.rewrite import QuestionRewrite
 from apps.llm.reasoning import ReasoningLLM
 from apps.llm.token import TokenCalculator
 from apps.schemas.enum_var import EventType, LanguageType