diff --git a/apps/common/queue.py b/apps/common/queue.py index 58731bcc49992c01a29ab8675e7fe580f04ccec6..a916c4607fb67646f2ad78c377d1d515058bc4b8 100644 --- a/apps/common/queue.py +++ b/apps/common/queue.py @@ -8,7 +8,6 @@ from collections.abc import AsyncGenerator from datetime import UTC, datetime from typing import Any -from apps.models.task import Task from apps.schemas.enum_var import EventType from apps.schemas.message import ( HeartbeatData, @@ -16,6 +15,8 @@ from apps.schemas.message import ( MessageFlow, MessageMetadata, ) +from apps.schemas.scheduler import LLMConfig +from apps.schemas.task import TaskData logger = logging.getLogger(__name__) @@ -35,43 +36,42 @@ class MessageQueue: self._close = False self._heartbeat_task = asyncio.get_event_loop().create_task(self._heartbeat()) - async def push_output(self, task: Task, event_type: str, data: dict[str, Any]) -> None: + async def push_output(self, task: TaskData, llm: LLMConfig, event_type: str, data: dict[str, Any]) -> None: """组装用于向用户(前端/Shell端)输出的消息""" if event_type == EventType.DONE.value: await self._queue.put("[DONE]") return # 计算当前Step时间 - step_time = round((datetime.now(UTC).timestamp() - task.tokens.time), 3) + step_time = round((datetime.now(UTC).timestamp() - task.runtime.time), 3) step_time = max(step_time, 0) metadata = MessageMetadata( timeCost=step_time, - inputTokens=task.tokens.input_tokens, - outputTokens=task.tokens.output_tokens, + inputTokens=llm.reasoning.input_tokens, + outputTokens=llm.reasoning.output_tokens, ) if task.state: # 如果使用了Flow flow = MessageFlow( - appId=task.state.app_id, - flowId=task.state.flow_id, - flowName=task.state.flow_name, - flowStatus=task.state.flow_status, - stepId=task.state.step_id, - stepName=task.state.step_name, - stepDescription=task.state.step_description, - stepStatus=task.state.step_status, + appId=task.state.appId, + flowId=task.state.executorId, + flowName=task.state.executorName, + flowStatus=task.state.executorStatus, + 
stepId=task.state.stepId, + stepName=task.state.stepName, + stepDescription=task.state.stepDescription, + stepStatus=task.state.stepStatus, ) else: flow = None message = MessageBase( event=event_type, - id=task.ids.record_id, - groupId=task.ids.group_id, - conversationId=task.ids.conversation_id, - taskId=task.id, + id=task.metadata.record_id, + conversationId=task.metadata.conversationId, + taskId=task.metadata.id, metadata=metadata, flow=flow, content=data, diff --git a/apps/constants.py b/apps/constants.py index 9c02fd8da5a09d1e699b5dbdb03f914a3292a727..53b1be8115e73213fc45aeee436f4ea7674ddba2 100644 --- a/apps/constants.py +++ b/apps/constants.py @@ -16,8 +16,10 @@ SLIDE_WINDOW_TIME = 15 OIDC_ACCESS_TOKEN_EXPIRE_TIME = 30 # OIDC 刷新Token 过期时间(分钟) OIDC_REFRESH_TOKEN_EXPIRE_TIME = 180 -# 滑动窗口限流 最大请求数 -SLIDE_WINDOW_QUESTION_COUNT = 10 +# 滑动窗口限流 最大请求数(按单个用户统计) +SLIDE_WINDOW_QUESTION_COUNT = 5 +# 全局同时运行任务上限(与用户无关) +MAX_CONCURRENT_TASKS = 30 # API Call 最大返回值长度(字符) MAX_API_RESPONSE_LENGTH = 8192 # Session时间,单位为分钟 diff --git a/apps/exceptions.py b/apps/exceptions.py index 1831246e3169f051c1e9dcf5c5d18ca62b74ed56..d546520f0f4f9da3ee3a898e4a586e627cab482f 100644 --- a/apps/exceptions.py +++ b/apps/exceptions.py @@ -21,9 +21,5 @@ class FlowEdgeValidationError(Exception): """service.flow 流程边验证错误""" -class LoginSettingsError(Exception): - """manager.session 登录设置错误""" - - class ActivityError(Exception): """service.activity 活动错误""" diff --git a/apps/llm/patterns/__init__.py b/apps/llm/patterns/__init__.py deleted file mode 100644 index 8a29c93cc645ac06dd121423c88de28e8f0d2a05..0000000000000000000000000000000000000000 --- a/apps/llm/patterns/__init__.py +++ /dev/null @@ -1,12 +0,0 @@ -# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. 
-"""LLM大模型Prompt模板""" - -from apps.llm.patterns.core import CorePattern -from apps.llm.patterns.executor import ( - ExecutorSummary, -) - -__all__ = [ - "CorePattern", - "ExecutorSummary", -] diff --git a/apps/llm/patterns/core.py b/apps/llm/patterns/core.py deleted file mode 100644 index dec94f5d2ac4282d46a6ef349f3c72a6c0e98713..0000000000000000000000000000000000000000 --- a/apps/llm/patterns/core.py +++ /dev/null @@ -1,46 +0,0 @@ -# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. -"""基础大模型范式抽象类""" - -from abc import ABC, abstractmethod -from textwrap import dedent - -from apps.schemas.enum_var import LanguageType - - -class CorePattern(ABC): - """基础大模型范式抽象类""" - - @staticmethod - def _default() -> tuple[dict[LanguageType, str], dict[LanguageType, str]]: - """默认的Prompt内容;分别返回系统提示词和用户提示词""" - return {}, {} - - def __init__( - self, - system_prompt: dict[LanguageType, str] | None = None, - user_prompt: dict[LanguageType, str] | None = None, - ) -> None: - """ - 检查是否已经自定义了Prompt;有的话就用自定义的;同时对Prompt进行空格清除 - - :param system_prompt: 系统提示词,f-string格式 - :param user_prompt: 用户提示词,f-string格式 - """ - self.input_tokens = 0 - self.output_tokens = 0 - - self.system_prompt, self.user_prompt = self._default() - - if system_prompt is not None: - self.system_prompt = system_prompt - - if user_prompt is not None: - self.user_prompt = user_prompt - - self.system_prompt = {lang: dedent(prompt).strip("\n") for lang, prompt in self.system_prompt.items()} - self.user_prompt = {lang: dedent(prompt).strip("\n") for lang, prompt in self.user_prompt.items()} - - @abstractmethod - async def generate(self, **kwargs): # noqa: ANN003, ANN201 - """调用大模型,生成结果""" - raise NotImplementedError diff --git a/apps/llm/patterns/executor.py b/apps/llm/patterns/executor.py deleted file mode 100644 index 81861ab798326e023a1c2939da930c244ed65d2e..0000000000000000000000000000000000000000 --- a/apps/llm/patterns/executor.py +++ /dev/null @@ -1,98 +0,0 @@ -# Copyright (c) Huawei 
Technologies Co., Ltd. 2023-2025. All rights reserved. -"""使用大模型生成Executor的思考内容""" - -from typing import TYPE_CHECKING - -from apps.llm.reasoning import ReasoningLLM -from apps.llm.snippet import convert_context_to_prompt, facts_to_prompt -from apps.schemas.enum_var import LanguageType - -from .core import CorePattern - -if TYPE_CHECKING: - from apps.schemas.scheduler import ExecutorBackground - - -class ExecutorSummary(CorePattern): - """使用大模型进行生成Executor初始背景""" - - @staticmethod - def _default() -> tuple[dict[LanguageType, str], dict[LanguageType, str]]: - """默认的Prompt内容""" - return { - LanguageType.CHINESE: r"You are a helpful assistant.", - LanguageType.ENGLISH: r"You are a helpful assistant.", - }, { - LanguageType.CHINESE: r""" - - 根据给定的对话记录和关键事实,生成一个三句话背景总结。这个总结将用于后续对话的上下文理解。 - - 生成总结的要求如下: - 1. 突出重要信息点,例如时间、地点、人物、事件等。 - 2. “关键事实”中的内容可在生成总结时作为已知信息。 - 3. 输出时请不要包含XML标签,确保信息准确性,不得编造信息。 - 4. 总结应少于3句话,应少于300个字。 - - 对话记录将在标签中给出,关键事实将在标签中给出。 - - - {conversation} - - - {facts} - - - 现在,请开始生成背景总结: - """, - LanguageType.ENGLISH: r""" - - Based on the given conversation records and key facts, generate a three-sentence background \ -summary.This summary will be used for context understanding in subsequent conversations. - - The requirements for generating the summary are as follows: - 1. Highlight important information points, such as time, location, people, events, etc. - 2. The content in the "key facts" can be used as known information when generating the summary. - 3. Do not include XML tags in the output, ensure the accuracy of the information, and do not \ -make up information. - 4. The summary should be less than 3 sentences and less than 300 words. - - The conversation records will be given in the tag, and the key facts will be given \ -in the tag. 
- - - {conversation} - - - {facts} - - - Now, please start generating the background summary: - """, - } - - async def generate(self, **kwargs) -> str: # noqa: ANN003 - """进行初始背景生成""" - background: ExecutorBackground = kwargs["background"] - conversation_str = convert_context_to_prompt(background.conversation) - facts_str = facts_to_prompt(background.facts) - language = kwargs.get("language", LanguageType.CHINESE) - - messages = [ - {"role": "system", "content": "You are a helpful assistant."}, - { - "role": "user", - "content": self.user_prompt[language].format( - facts=facts_str, - conversation=conversation_str, - ), - }, - ] - - result = "" - llm = ReasoningLLM() - async for chunk in llm.call(messages, streaming=False, temperature=0.7): - result += chunk - self.input_tokens = llm.input_tokens - self.output_tokens = llm.output_tokens - - return result.strip().strip("\n") diff --git a/apps/llm/patterns/rewoo.py b/apps/llm/patterns/rewoo.py deleted file mode 100644 index 561e7d0926ef7c837ca903ff3a03cc782a672570..0000000000000000000000000000000000000000 --- a/apps/llm/patterns/rewoo.py +++ /dev/null @@ -1,354 +0,0 @@ -# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. 
-"""规划生成命令行""" - -from apps.llm.reasoning import ReasoningLLM -from apps.schemas.enum_var import LanguageType - -from .core import CorePattern - - -class InitPlan(CorePattern): - """规划生成命令行""" - - @staticmethod - def _default() -> tuple[dict[LanguageType, str], dict[LanguageType, str]]: - """默认的Prompt内容""" - return { - LanguageType.CHINESE: r"You are a helpful assistant.", - LanguageType.ENGLISH: r"You are a helpful assistant.", - }, { - LanguageType.CHINESE: r""" - - - 你是一个计划生成器。对于给定的目标,**制定一个简单的计划**,该计划可以逐步生成合适的命令行参数和标志。 - - 你会收到一个"命令前缀",这是已经确定和生成的命令部分。你需要基于这个前缀使用标志和参数来完成命令。 - - 在每一步中,指明使用哪个外部工具以及工具输入来获取证据。 - - 工具可以是以下之一: - (1) Option["指令"]:查询最相似的命令行标志。只接受一个输入参数,"指令"必须是搜索字符串。\ -搜索字符串应该详细且包含必要的数据。 - (2) Argument[名称]<值>:将任务中的数据放置到命令行的特定位置。接受两个输入参数。 - - 所有步骤必须以"Plan: "开头,且少于150个单词。 - 不要添加任何多余的步骤。 - 确保每个步骤都包含所需的所有信息 - 不要跳过步骤。 - 不要在证据后面添加任何额外数据。 - - - - - 开始示例 - - 任务:在后台运行一个新的alpine:latest容器,将主机/root文件夹挂载至/data,并执行top命令。 - 前缀:`docker run` - 用法:`docker run ${OPTS} ${image} ${command}`。这是一个Python模板字符串。OPTS是所有标志的\ -占位符。参数必须是 ["image", "command"] 其中之一。 - 前缀描述:二进制程序`docker`的描述为"Docker容器平台",`run`子命令的描述为"从镜像创建并运行一个新的容器"。 - - Plan: 我需要一个标志使容器在后台运行。 #E1 = Option[在后台运行单个容器] - Plan: 我需要一个标志,将主机/root目录挂载至容器内/data目录。 #E2 = \ -Option[挂载主机/root目录至/data目录] - Plan: 我需要从任务中解析出镜像名称。 #E3 = Argument[image] - Plan: 我需要指定容器中运行的命令。 #E4 = Argument[command] - Final: 组装上述线索,生成最终命令。 #F - - - - - 任务:{instruction} - 前缀:`{binary_name} {subcmd_name}` - 用法:`{subcmd_usage}`。这是一个Python模板字符串。OPTS是所有标志的占位符。参数必须是 {argument_list} \ -其中之一。 - 前缀描述:二进制程序`{binary_name}`的描述为"{binary_description}",`{subcmd_name}`子命令的描述为\ - "{subcmd_description}"。 - - 现在生成相应的计划: - """, - LanguageType.ENGLISH: r""" - - - You are a plan generator. For a given goal, **draft a simple plan** that can step-by-step \ -generate the appropriate command line arguments and flags. - - You will receive a "command prefix", which is the already determined and generated command \ -part. 
You need to use the flags and arguments based on this prefix to complete the command. - - In each step, specify which external tool to use and the tool input to get the evidence. - - The tool can be one of the following: - (1) Option["instruction"]: Query the most similar command line flag. Only accepts one input \ -parameter, "instruction" must be a search string. The search string should be detailed and contain necessary data. - (2) Argument["name"]: Place the data from the task into a specific position in the \ -command line. Accepts two input parameters. - - All steps must start with "Plan: " and be less than 150 words. - Do not add any extra steps. - Ensure each step contains all the required information - do not skip steps. - Do not add any extra data after the evidence. - - - - Task: Run a new alpine:latest container in the background, mount the host /root folder to \ -/data, and execute the top command. - Prefix: `docker run` - Usage: `docker run ${OPTS} ${image} ${command}`. This is a Python template string. OPTS is \ -a placeholder for all flags. The arguments must be one of ["image", "command"]. - Prefix description: The description of binary program `docker` is "Docker container platform"\ -, and the description of `run` subcommand is "Create and run a new container from an image". - - Plan: I need a flag to make the container run in the background. #E1 = Option[Run a single \ -container in the background] - Plan: I need a flag to mount the host /root directory to /data directory in the \ -container. #E2 = Option[Mounthost /root directory to /data directory] - Plan: I need to parse the image name from the task. #E3 = Argument[image] - Plan: I need to specify the command to be run in the container. #E4 = Argument[command] - Final: Assemble the above clues to generate the final command. #F - - - - Task: {instruction} - Prefix: `{binary_name} {subcmd_name}` - Usage: `{subcmd_usage}`. This is a Python template string. OPTS is a placeholder for all flags. 
\ -The arguments must be one of {argument_list}. - Prefix description: The description of binary program `{binary_name}` is "{binary_description}", \ -and the description of `{subcmd_name}` subcommand is "{subcmd_description}". - - Generate the corresponding plan now: - """, - } - - - async def generate(self, **kwargs) -> str: # noqa: ANN003 - """生成命令行evidence""" - spec = kwargs["spec"] - binary_name = kwargs["binary_name"] - subcmd_name = kwargs["subcmd_name"] - binary_description = spec[binary_name][0] - subcmd_usage = spec[binary_name][2][subcmd_name][1] - subcmd_description = spec[binary_name][2][subcmd_name][0] - language = kwargs.get("language", LanguageType.CHINESE) - - argument_list = [] - for key in spec[binary_name][2][subcmd_name][3]: - argument_list += [key] - - messages = [ - {"role": "system", "content": self.system_prompt[language]}, - {"role": "user", "content": self.user_prompt[language].format( - instruction=kwargs["instruction"], - binary_name=binary_name, - subcmd_name=subcmd_name, - binary_description=binary_description, - subcmd_description=subcmd_description, - subcmd_usage=subcmd_usage, - argument_list=argument_list, - )}, - ] - - result = "" - llm = ReasoningLLM() - async for chunk in llm.call(messages, streaming=False): - result += chunk - self.input_tokens = llm.input_tokens - self.output_tokens = llm.output_tokens - - return result - - -class PlanEvaluator(CorePattern): - """计划评估器""" - - @staticmethod - def _default() -> tuple[dict[LanguageType, str], dict[LanguageType, str]]: - """默认的Prompt内容""" - return { - LanguageType.CHINESE: r"You are a helpful assistant.", - LanguageType.ENGLISH: r"You are a helpful assistant.", - }, { - LanguageType.CHINESE: r""" - - - 你是一个计划评估器。你的任务是评估给定的计划是否合理和完整。 - - 一个好的计划应该: - 1. 涵盖原始任务的所有要求 - 2. 使用适当的工具收集必要的信息 - 3. 具有清晰和逻辑的步骤 - 4. 没有冗余或不必要的步骤 - - 对于计划中的每个步骤,评估: - 1. 工具选择是否适当 - 2. 输入参数是否清晰和充分 - 3. 
该步骤是否有助于实现最终目标 - - 请回复: - "VALID" - 如果计划良好且完整 - "INVALID: <原因>" - 如果计划有问题,请解释原因 - - - - 任务:{instruction} - 计划:{plan} - - 现在评估计划,并回复"VALID"或"INVALID: <原因>": - """, - LanguageType.ENGLISH: r""" - - - You are a plan replanner. When the plan is evaluated as invalid, you need to generate a new, \ -improved plan. - - The new plan should: - 1. Solve all problems mentioned in the evaluation - 2. Keep the same format as the original plan - 3. Be more precise and complete - 4. Use appropriate tools for each step - - Follow the same format as the original plan: - - Each step should start with "Plan: " - - Include tool usage with appropriate parameters - - Keep steps concise and focused - - End with the "Final" step - - - - Task: {instruction} - Original Plan: {plan} - Evaluation: {evaluation} - - Now evaluate the plan and reply "VALID" or "INVALID: ": - """, - } - - async def generate(self, **kwargs) -> str: # noqa: ANN003 - """生成计划评估结果""" - language = kwargs.get("language", LanguageType.CHINESE) - - messages = [ - {"role": "system", "content": self.system_prompt[language]}, - {"role": "user", "content": self.user_prompt[language].format( - instruction=kwargs["instruction"], - plan=kwargs["plan"], - )}, - ] - - result = "" - llm = ReasoningLLM() - async for chunk in llm.call(messages, streaming=False): - result += chunk - self.input_tokens = llm.input_tokens - self.output_tokens = llm.output_tokens - - return result - - -class RePlanner(CorePattern): - """重新规划器""" - - system_prompt: str = r""" - 你是一个计划重新规划器。当计划被评估为无效时,你需要生成一个新的、改进的计划。 - - 新计划应该: - 1. 解决评估中提到的所有问题 - 2. 保持与原始计划相同的格式 - 3. 更加精确和完整 - 4. 
为每个步骤使用适当的工具 - - 遵循与原始计划相同的格式: - - 每个步骤应以"Plan: "开头 - - 包含带有适当参数的工具使用 - - 保持步骤简洁和重点突出 - - 以"Final"步骤结束 - """ - """系统提示词""" - - user_prompt: str = r""" - 任务:{instruction} - 原始计划:{plan} - 评估:{evaluation} - - 生成一个新的、改进的计划,解决评估中提到的所有问题。 - """ - """用户提示词""" - - @staticmethod - def _default() -> tuple[dict[LanguageType, str], dict[LanguageType, str]]: - """默认的Prompt内容""" - return { - LanguageType.CHINESE: r"You are a helpful assistant.", - LanguageType.ENGLISH: r"You are a helpful assistant.", - }, { - LanguageType.CHINESE: r""" - - - 你是一个计划重新规划器。当计划被评估为无效时,你需要生成一个新的、改进的计划。 - - 新计划应该: - 1. 解决评估中提到的所有问题 - 2. 保持与原始计划相同的格式 - 3. 更加精确和完整 - 4. 为每个步骤使用适当的工具 - - 遵循与原始计划相同的格式: - - 每个步骤应以"Plan: "开头 - - 包含带有适当参数的工具使用 - - 保持步骤简洁和重点突出 - - 以"Final"步骤结束 - - - - 任务:{instruction} - 原始计划:{plan} - 评估:{evaluation} - - 生成一个新的、改进的计划,解决评估中提到的所有问题: - """, - LanguageType.ENGLISH: r""" - - - You are a plan replanner. When the plan is evaluated as invalid, you need to generate a new, \ -improved plan. - - The new plan should: - 1. Solve all problems mentioned in the evaluation - 2. Keep the same format as the original plan - 3. Be more precise and complete - 4. 
Use appropriate tools for each step - - Follow the same format as the original plan: - - Each step should start with "Plan: " - - Include tool usage with appropriate parameters - - Keep steps concise and focused - - End with the "Final" step - - - - Task: {instruction} - Original Plan: {plan} - Evaluation: {evaluation} - - Now, generate a new, improved plan that solves all problems mentioned in the evaluation: - """, - } - - async def generate(self, **kwargs) -> str: # noqa: ANN003 - """生成重新规划结果""" - messages = [ - {"role": "system", "content": self.system_prompt}, - {"role": "user", "content": self.user_prompt.format( - instruction=kwargs["instruction"], - plan=kwargs["plan"], - evaluation=kwargs["evaluation"], - )}, - ] - - result = "" - llm = ReasoningLLM() - async for chunk in llm.call(messages, streaming=False): - result += chunk - self.input_tokens = llm.input_tokens - self.output_tokens = llm.output_tokens - - return result diff --git a/apps/llm/snippet.py b/apps/llm/snippet.py deleted file mode 100644 index 45d53336017ca5f78ece15a17ad48dcba8dc3087..0000000000000000000000000000000000000000 --- a/apps/llm/snippet.py +++ /dev/null @@ -1,34 +0,0 @@ -# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. 
-"""上下文转提示词""" - -from typing import Any - - -def convert_context_to_prompt(context: list[dict[str, str]]) -> str: - """上下文转提示词""" - prompt = "\n" - for item in context: - prompt += f"<{item['role']}>\n{item['content']}\n\n" - prompt += "\n" - return prompt - - -def facts_to_prompt(facts: list[str]) -> str: - """事实转提示词""" - prompt = "\n" - for item in facts: - prompt += f"- {item}\n" - prompt += "\n" - return prompt - - -def choices_to_prompt(choices: list[dict[str, Any]]) -> tuple[str, list[str]]: - """将选项转换为Prompt""" - choices_list = [item["name"] for item in choices] - - prompt = "\n" - for item in choices: - prompt += f"{item['name']}{item['description']}\n" - prompt += "\n" - - return prompt, choices_list diff --git a/apps/routers/appcenter.py b/apps/routers/appcenter.py index 906291792bb398e6f3b76d2bad4b0fddb583db12..3c35ffd862d2efdd9038fc97b662dd90b2bd9131 100644 --- a/apps/routers/appcenter.py +++ b/apps/routers/appcenter.py @@ -10,9 +10,8 @@ from fastapi.responses import JSONResponse from apps.dependency.user import verify_personal_token, verify_session from apps.exceptions import InstancePermissionError -from apps.schemas.appcenter import AppFlowInfo, AppPermissionData +from apps.schemas.appcenter import AppFlowInfo, AppPermissionData, ChangeFavouriteAppRequest, CreateAppRequest from apps.schemas.enum_var import AppFilterType, AppType -from apps.schemas.request_data import ChangeFavouriteAppRequest, CreateAppRequest from apps.schemas.response_data import ( BaseAppOperationMsg, BaseAppOperationRsp, diff --git a/apps/routers/blacklist.py b/apps/routers/blacklist.py index 1e0af9f5c2db4e663a22e412415a32853460aca4..e51ad1853ad129af93cd630fd227f5b0fbdee1f6 100644 --- a/apps/routers/blacklist.py +++ b/apps/routers/blacklist.py @@ -5,17 +5,17 @@ from fastapi import APIRouter, Depends, Request, status from fastapi.responses import JSONResponse from apps.dependency.user import verify_admin, verify_personal_token, verify_session -from apps.schemas.request_data 
import ( +from apps.schemas.blacklist import ( AbuseProcessRequest, AbuseRequest, - QuestionBlacklistRequest, - UserBlacklistRequest, -) -from apps.schemas.response_data import ( GetBlacklistQuestionMsg, GetBlacklistQuestionRsp, GetBlacklistUserMsg, GetBlacklistUserRsp, + QuestionBlacklistRequest, + UserBlacklistRequest, +) +from apps.schemas.response_data import ( ResponseData, ) from apps.services.blacklist import ( diff --git a/apps/routers/chat.py b/apps/routers/chat.py index 0c5ab7e92cff8c9509bc80fa1cedb6e685b4a42a..791936f770947f418eaba119dc1ade909d00911b 100644 --- a/apps/routers/chat.py +++ b/apps/routers/chat.py @@ -15,7 +15,7 @@ from apps.common.wordscheck import WordsCheck from apps.dependency import verify_personal_token, verify_session from apps.scheduler.scheduler import Scheduler from apps.scheduler.scheduler.context import save_data -from apps.schemas.enum_var import ExecutorStatus, LanguageType +from apps.schemas.enum_var import ExecutorStatus from apps.schemas.request_data import RequestData, RequestDataApp from apps.schemas.response_data import ResponseData from apps.services.activity import Activity diff --git a/apps/routers/comment.py b/apps/routers/comment.py index 93a9edb636f3b069defef38e554d963afdbc9abd..4b69ba47e4a47adc11fb64297d0f0132f50d13ed 100644 --- a/apps/routers/comment.py +++ b/apps/routers/comment.py @@ -8,8 +8,8 @@ from fastapi import APIRouter, Depends, Request, status from fastapi.responses import JSONResponse from apps.dependency import verify_personal_token, verify_session +from apps.schemas.comment import AddCommentData from apps.schemas.record import RecordComment -from apps.schemas.request_data import AddCommentData from apps.schemas.response_data import ResponseData from apps.services.comment import CommentManager diff --git a/apps/routers/conversation.py b/apps/routers/conversation.py index 65799a8f4d122121b7c272d4be862d221a24d99e..9cedd85e4b125f6cbba9b9bce40410fa3e289e15 100644 --- a/apps/routers/conversation.py +++ 
b/apps/routers/conversation.py @@ -11,7 +11,7 @@ from fastapi.responses import JSONResponse from apps.dependency import verify_personal_token, verify_session from apps.models.conversation import Conversation -from apps.schemas.request_data import ( +from apps.schemas.conversation import ( ChangeConversationData, DeleteConversationData, ) diff --git a/apps/routers/mcp_service.py b/apps/routers/mcp_service.py index 9389587b53b681c8b3c4efaf5e48bfa926524feb..0f88e23336911b174899652aaff32ac2be602173 100644 --- a/apps/routers/mcp_service.py +++ b/apps/routers/mcp_service.py @@ -1,7 +1,6 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved. """FastAPI 语义接口中心相关路由""" -import json import logging from typing import Annotated @@ -10,7 +9,7 @@ from fastapi.responses import JSONResponse from apps.dependency.user import verify_admin, verify_personal_token, verify_session from apps.schemas.enum_var import SearchType -from apps.schemas.request_data import ActiveMCPServiceRequest, UpdateMCPServiceRequest +from apps.schemas.mcp import ActiveMCPServiceRequest, UpdateMCPServiceRequest from apps.schemas.response_data import ( ActiveMCPServiceRsp, BaseMCPServiceOperationMsg, diff --git a/apps/routers/service.py b/apps/routers/service.py index 7e10541401f2a5fa8e4775fadd04c66f92d9fd20..7fd40269d0e227297936421a5cd17202a2c038ba 100644 --- a/apps/routers/service.py +++ b/apps/routers/service.py @@ -11,7 +11,6 @@ from fastapi.responses import JSONResponse from apps.dependency.user import verify_personal_token, verify_session from apps.exceptions import InstancePermissionError from apps.schemas.enum_var import SearchType -from apps.schemas.request_data import ChangeFavouriteServiceRequest, UpdateServiceRequest from apps.schemas.response_data import ( BaseServiceOperationMsg, ChangeFavouriteServiceMsg, @@ -25,6 +24,7 @@ from apps.schemas.response_data import ( UpdateServiceMsg, UpdateServiceRsp, ) +from apps.schemas.service import ChangeFavouriteServiceRequest, 
UpdateServiceRequest from apps.services.service import ServiceCenterManager logger = logging.getLogger(__name__) diff --git a/apps/scheduler/call/cmd/assembler.py b/apps/scheduler/call/cmd/assembler.py index 54c1d2ee6f7735eb4839d03f9c778288d3e7aee8..6ac19edb0f3751b36c028f0d4f52682c6ba1248b 100644 --- a/apps/scheduler/call/cmd/assembler.py +++ b/apps/scheduler/call/cmd/assembler.py @@ -4,8 +4,6 @@ import string from typing import Any, Literal, Optional -from apps.llm.patterns.select import Select - class CommandlineAssembler: """命令行组装器""" diff --git a/apps/scheduler/call/cmd/prompt.py b/apps/scheduler/call/cmd/prompt.py new file mode 100644 index 0000000000000000000000000000000000000000..90c062d3c19cd4f6df9ca2277a08ccc3f5075b79 --- /dev/null +++ b/apps/scheduler/call/cmd/prompt.py @@ -0,0 +1,218 @@ +"""命令行生成器相关提示词""" + +from textwrap import dedent + +from apps.schemas.enum_var import LanguageType + +CREATE: dict[LanguageType, str] = { + LanguageType.CHINESE: dedent(r""" + + + 你是一个计划生成器。对于给定的目标,**制定一个简单的计划**,该计划可以逐步生成合适的命令行参数和标志。 + + 你会收到一个"命令前缀",这是已经确定和生成的命令部分。你需要基于这个前缀使用标志和参数来完成命令。 + + 在每一步中,指明使用哪个外部工具以及工具输入来获取证据。 + + 工具可以是以下之一: + (1) Option["指令"]:查询最相似的命令行标志。只接受一个输入参数,"指令"必须是搜索字符串。\ +搜索字符串应该详细且包含必要的数据。 + (2) Argument[名称]<值>:将任务中的数据放置到命令行的特定位置。接受两个输入参数。 + + 所有步骤必须以"Plan: "开头,且少于150个单词。 + 不要添加任何多余的步骤。 + 确保每个步骤都包含所需的所有信息 - 不要跳过步骤。 + 不要在证据后面添加任何额外数据。 + + + + + 开始示例 + + 任务:在后台运行一个新的alpine:latest容器,将主机/root文件夹挂载至/data,并执行top命令。 + 前缀:`docker run` + 用法:`docker run ${OPTS} ${image} ${command}`。这是一个Python模板字符串。OPTS是所有标志的\ +占位符。参数必须是 ["image", "command"] 其中之一。 + 前缀描述:二进制程序`docker`的描述为"Docker容器平台",`run`子命令的描述为"从镜像创建并运行一个新的容器"。 + + Plan: 我需要一个标志使容器在后台运行。 #E1 = Option[在后台运行单个容器] + Plan: 我需要一个标志,将主机/root目录挂载至容器内/data目录。 #E2 = \ +Option[挂载主机/root目录至/data目录] + Plan: 我需要从任务中解析出镜像名称。 #E3 = Argument[image] + Plan: 我需要指定容器中运行的命令。 #E4 = Argument[command] + Final: 组装上述线索,生成最终命令。 #F + + + + + 任务:{{instruction}} + 前缀:`{{binary_name}} {{subcmd_name}}` + 
用法:`{{subcmd_usage}}`。这是一个Python模板字符串。OPTS是所有标志的占位符。参数必须是 {{argument_list}} \ +其中之一。 + 前缀描述:二进制程序`{{binary_name}}`的描述为"{{binary_description}}",`{{subcmd_name}}`子命令的描述为\ + "{{subcmd_description}}"。 + + 现在生成相应的计划: + """), + LanguageType.ENGLISH: dedent(r""" + + + You are a plan generator. For a given goal, **draft a simple plan** that can step-by-step \ +generate the appropriate command line arguments and flags. + + You will receive a "command prefix", which is the already determined and generated command \ +part. You need to use the flags and arguments based on this prefix to complete the command. + + In each step, specify which external tool to use and the tool input to get the evidence. + + The tool can be one of the following: + (1) Option["instruction"]: Query the most similar command line flag. Only accepts one input \ +parameter, "instruction" must be a search string. The search string should be detailed and contain necessary data. + (2) Argument["name"]: Place the data from the task into a specific position in the \ +command line. Accepts two input parameters. + + All steps must start with "Plan: " and be less than 150 words. + Do not add any extra steps. + Ensure each step contains all the required information - do not skip steps. + Do not add any extra data after the evidence. + + + + Task: Run a new alpine:latest container in the background, mount the host /root folder to \ +/data, and execute the top command. + Prefix: `docker run` + Usage: `docker run ${OPTS} ${image} ${command}`. This is a Python template string. OPTS is \ +a placeholder for all flags. The arguments must be one of ["image", "command"]. + Prefix description: The description of binary program `docker` is "Docker container platform"\ +, and the description of `run` subcommand is "Create and run a new container from an image". + + Plan: I need a flag to make the container run in the background. 
#E1 = Option[Run a single \ +container in the background] + Plan: I need a flag to mount the host /root directory to /data directory in the \ +container. #E2 = Option[Mount host /root directory to /data directory] + Plan: I need to parse the image name from the task. #E3 = Argument[image] + Plan: I need to specify the command to be run in the container. #E4 = Argument[command] + Final: Assemble the above clues to generate the final command. #F + + + + Task: {{instruction}} + Prefix: `{{binary_name}} {{subcmd_name}}` + Usage: `{{subcmd_usage}}`. This is a Python template string. OPTS is a placeholder for all flags. \ +The arguments must be one of {{argument_list}}. + Prefix description: The description of binary program `{{binary_name}}` is "{{binary_description}}", \ +and the description of `{{subcmd_name}}` subcommand is "{{subcmd_description}}". + + Generate the corresponding plan now: + """), +} + +EVALUATE: dict[LanguageType, str] = { + LanguageType.CHINESE: dedent(r""" + + + 你是一个计划评估器。你的任务是评估给定的计划是否合理和完整。 + + 一个好的计划应该: + 1. 涵盖原始任务的所有要求 + 2. 使用适当的工具收集必要的信息 + 3. 具有清晰和逻辑的步骤 + 4. 没有冗余或不必要的步骤 + + 对于计划中的每个步骤,评估: + 1. 工具选择是否适当 + 2. 输入参数是否清晰和充分 + 3. 该步骤是否有助于实现最终目标 + + 请回复: + "VALID" - 如果计划良好且完整 + "INVALID: <原因>" - 如果计划有问题,请解释原因 + + + + 任务:{{instruction}} + 计划:{{plan}} + + 现在评估计划,并回复"VALID"或"INVALID: <原因>": + """), + LanguageType.ENGLISH: dedent(r""" + + + You are a plan evaluator. Your task is to evaluate whether the given plan is reasonable and complete. + + A good plan should: + 1. Cover all requirements of the original task + 2. Use appropriate tools to collect the necessary information + 3. Have clear and logical steps + 4. 
Have no redundant or unnecessary steps + + For each step in the plan, evaluate: + 1. Whether the tool selection is appropriate + 2. Whether the input parameters are clear and sufficient + 3. Whether the step helps achieve the final goal + + Reply "VALID" if the plan is good and complete. + Reply "INVALID: <reason>" if the plan has problems, and explain the reason. + + + + Task: {{instruction}} + Plan: {{plan}} + + Now evaluate the plan and reply "VALID" or "INVALID: <reason>": + """), +} + +REPLAN: dict[LanguageType, str] = { + LanguageType.CHINESE: dedent(r""" + + + 你是一个计划重新规划器。当计划被评估为无效时,你需要生成一个新的、改进的计划。 + + 新计划应该: + 1. 解决评估中提到的所有问题 + 2. 保持与原始计划相同的格式 + 3. 更加精确和完整 + 4. 为每个步骤使用适当的工具 + + 遵循与原始计划相同的格式: + - 每个步骤应以"Plan: "开头 + - 包含带有适当参数的工具使用 + - 保持步骤简洁和重点突出 + - 以"Final"步骤结束 + + + + 任务:{{instruction}} + 原始计划:{{plan}} + 评估:{{evaluation}} + + 生成一个新的、改进的计划,解决评估中提到的所有问题: + """), + LanguageType.ENGLISH: dedent(r""" + + + You are a plan replanner. When the plan is evaluated as invalid, you need to generate a new, \ +improved plan. + + The new plan should: + 1. Solve all problems mentioned in the evaluation + 2. Keep the same format as the original plan + 3. Be more precise and complete + 4. 
Use appropriate tools for each step + + Follow the same format as the original plan: + - Each step should start with "Plan: " + - Include tool usage with appropriate parameters + - Keep steps concise and focused + - End with the "Final" step + + + + Task: {{instruction}} + Original Plan: {{plan}} + Evaluation: {{evaluation}} + + Now, generate a new, improved plan that solves all problems mentioned in the evaluation: + """), +} diff --git a/apps/scheduler/call/cmd/schema.py b/apps/scheduler/call/cmd/schema.py index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..039e00ae361645a4d9c1b91914fc95689e4dbf04 100644 --- a/apps/scheduler/call/cmd/schema.py +++ b/apps/scheduler/call/cmd/schema.py @@ -0,0 +1 @@ +"""命令行生成工具 数据结构""" diff --git a/apps/scheduler/call/cmd/solver.py b/apps/scheduler/call/cmd/solver.py index 7eeb0b6d8387dd4cb58dee134fc9e9b68b44f76b..49cdd1714a6a3ee4814dc076b7b8f1f2ad32652a 100644 --- a/apps/scheduler/call/cmd/solver.py +++ b/apps/scheduler/call/cmd/solver.py @@ -5,7 +5,6 @@ import copy import re from typing import Any -from apps.llm.patterns.json_gen import Json from apps.scheduler.call.cmd.assembler import CommandlineAssembler diff --git a/apps/scheduler/call/rag/rag.py b/apps/scheduler/call/rag/rag.py index 16f99d5ae9c94abcfa7d4aac981c6ff8dbd42dac..d69fbe1cebf8273105e4a3a3ce64ce8d58c30920 100644 --- a/apps/scheduler/call/rag/rag.py +++ b/apps/scheduler/call/rag/rag.py @@ -3,10 +3,13 @@ import logging from collections.abc import AsyncGenerator +from datetime import UTC, datetime from typing import Any import httpx from fastapi import status +from jinja2 import BaseLoader +from jinja2.sandbox import SandboxedEnvironment from pydantic import Field from apps.common.config import config @@ -20,9 +23,15 @@ from apps.schemas.scheduler import ( ) from .prompt import QUESTION_REWRITE -from .schema import RAGInput, RAGOutput, SearchMethod +from .schema import ( + CHUNK_ELEMENT_TOKENS, + QuestionRewriteOutput, + RAGInput, + RAGOutput, + SearchMethod, +) -logger = 
logging.getLogger(__name__) +_logger = logging.getLogger(__name__) class RAG(CoreCall, input_model=RAGInput, output_model=RAGOutput): @@ -56,7 +65,7 @@ class RAG(CoreCall, input_model=RAGInput, output_model=RAGOutput): """初始化RAG工具""" if not call_vars.ids.session_id: err = "[RAG] 未设置Session ID" - logger.error(err) + _logger.error(err) raise CallError(message=err, data={}) return RAGInput( @@ -73,12 +82,130 @@ class RAG(CoreCall, input_model=RAGInput, output_model=RAGOutput): tokensLimit=self.tokens_limit, ) + async def _assemble_doc_info( + self, + doc_chunk_list: list[dict[str, Any]], + max_tokens: int, + ) -> tuple[str, list[dict[str, Any]]]: + """组装文档信息""" + bac_info = "" + doc_info_list = [] + doc_cnt = 0 + doc_id_map = {} + remaining_tokens = max_tokens * 0.8 + + for doc_chunk in doc_chunk_list: + if doc_chunk["docId"] not in doc_id_map: + doc_cnt += 1 + t = doc_chunk.get("docCreatedAt", None) + if isinstance(t, str): + t = datetime.strptime(t, "%Y-%m-%d %H:%M").replace( + tzinfo=UTC, + ) + t = round(t.replace(tzinfo=UTC).timestamp(), 3) + else: + t = round(datetime.now(UTC).timestamp(), 3) + doc_info_list.append({ + "id": doc_chunk["docId"], + "order": doc_cnt, + "name": doc_chunk.get("docName", ""), + "author": doc_chunk.get("docAuthor", ""), + "extension": doc_chunk.get("docExtension", ""), + "abstract": doc_chunk.get("docAbstract", ""), + "size": doc_chunk.get("docSize", 0), + "created_at": t, + }) + doc_id_map[doc_chunk["docId"]] = doc_cnt + doc_index = doc_id_map[doc_chunk["docId"]] + + if bac_info: + bac_info += "\n\n" + bac_info += f"""""" + + for chunk in doc_chunk["chunks"]: + if remaining_tokens <= CHUNK_ELEMENT_TOKENS: + break + chunk_text = chunk["text"] + chunk_text = TokenCalculator().get_k_tokens_words_from_content( + content=chunk_text, k=remaining_tokens) + remaining_tokens -= TokenCalculator().calculate_token_length(messages=[ + {"role": "user", "content": ""}, + {"role": "user", "content": chunk_text}, + {"role": "user", "content": ""}, + ], 
pure_text=True) + bac_info += f""" + + {chunk_text} + + """ + bac_info += "" + return bac_info, doc_info_list + + async def _get_doc_info(self, doc_ids: list[str], data: RAGInput) -> AsyncGenerator[CallOutputChunk, None]: + """获取文档信息""" + url = config.rag.rag_service.rstrip("/") + "/chunk/search" + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {self._sys_vars.ids.session_id}", + } + doc_chunk_list = [] + if doc_ids: + default_kb_id = "00000000-0000-0000-0000-000000000000" + tmp_data = RAGQueryReq( + kbIds=[default_kb_id], + query=data.query, + topK=data.top_k, + docIds=doc_ids, + searchMethod=data.search_method, + isRelatedSurrounding=data.is_related_surrounding, + isClassifyByDoc=data.is_classify_by_doc, + isRerank=data.is_rerank, + tokensLimit=max_tokens, + ) + try: + async with httpx.AsyncClient(timeout=30) as client: + data_json = tmp_data.model_dump(exclude_none=True, by_alias=True) + response = await client.post(url, headers=headers, json=data_json) + if response.status_code == status.HTTP_200_OK: + result = response.json() + doc_chunk_list += result["result"]["docChunks"] + except Exception: + _logger.exception("[RAG] 获取文档分片失败") + if data.kb_ids: + try: + async with httpx.AsyncClient(timeout=30) as client: + data_json = data.model_dump(exclude_none=True, by_alias=True) + response = await client.post(url, headers=headers, json=data_json) + # 检查响应状态码 + if response.status_code == status.HTTP_200_OK: + result = response.json() + doc_chunk_list += result["result"]["docChunks"] + except Exception: + _logger.exception("[RAG] 获取文档分片失败") + return doc_chunk_list + async def _exec(self, input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]: """调用RAG工具""" data = RAGInput(**input_data) - question_obj = QuestionRewrite() - question = await question_obj.generate(question=data.question) - data.question = question + # 使用Jinja2渲染问题重写模板,并用JsonGenerator解析结果 + try: + env = SandboxedEnvironment( + loader=BaseLoader(), + 
autoescape=False, + trim_blocks=True, + lstrip_blocks=True, + ) + tmpl = env.from_string(QUESTION_REWRITE[self._sys_vars.language]) + prompt = tmpl.render(history="", question=data.question) + + # 使用_json方法直接获取JSON结果 + json_result = await self._json([ + {"role": "user", "content": prompt}, + ], schema=QuestionRewriteOutput.model_json_schema()) + # 直接使用解析后的JSON结果 + data.question = QuestionRewriteOutput.model_validate(json_result).question + except Exception: + _logger.exception("[RAG] 问题重写失败,使用原始问题") url = config.rag.rag_service.rstrip("/") + "/chunk/search" headers = { @@ -112,7 +239,7 @@ class RAG(CoreCall, input_model=RAGInput, output_model=RAGOutput): return text = response.text - logger.error("[RAG] 调用失败:%s", text) + _logger.error("[RAG] 调用失败:%s", text) raise CallError( message=f"rag调用失败:{text}", diff --git a/apps/scheduler/call/rag/schema.py b/apps/scheduler/call/rag/schema.py index c0535d46c985b54370502d817324ec75d3cb85bd..f3a6ec69bcb645ba2add574cb24e2c7c54cfa983 100644 --- a/apps/scheduler/call/rag/schema.py +++ b/apps/scheduler/call/rag/schema.py @@ -3,10 +3,12 @@ from enum import Enum -from pydantic import Field +from pydantic import BaseModel, Field from apps.scheduler.call.core import DataBase +CHUNK_ELEMENT_TOKENS = 5 + class SearchMethod(str, Enum): """搜索方法""" @@ -19,6 +21,12 @@ class SearchMethod(str, Enum): ENHANCED_BY_LLM = "enhanced_by_llm" +class QuestionRewriteOutput(BaseModel): + """问题重写工具的输出""" + + question: str = Field(description="用户输入") + + class RAGOutput(DataBase): """RAG工具的输出""" diff --git a/apps/scheduler/call/summary/prompt.py b/apps/scheduler/call/summary/prompt.py new file mode 100644 index 0000000000000000000000000000000000000000..dc58749d66680a87e0fe8990585c1104c4e2471f --- /dev/null +++ b/apps/scheduler/call/summary/prompt.py @@ -0,0 +1,62 @@ +"""总结工具的提示词""" +from textwrap import dedent + +from apps.schemas.enum_var import LanguageType + +SUMMARY_PROMPT: dict[LanguageType, str] = { + LanguageType.CHINESE: dedent( + r""" + + 
根据给定的对话记录和关键事实,生成一个三句话背景总结。这个总结将用于后续对话的上下文理解。 + + 生成总结的要求如下: + 1. 突出重要信息点,例如时间、地点、人物、事件等。 + 2. “关键事实”中的内容可在生成总结时作为已知信息。 + 3. 输出时请不要包含XML标签,确保信息准确性,不得编造信息。 + 4. 总结不得超过3句话,且应少于300个字。 + + 对话记录将在标签中给出,关键事实将在标签中给出。 + + + {% for item in conversation %} + <{{item.role}}> + {{item.content}} + + {% endfor %} + + + {% for fact in facts %}{{fact}}{% if not loop.last %}\n{% endif %}{% endfor %} + + + 现在,请开始生成背景总结: + """), + LanguageType.ENGLISH: dedent( + r""" + + Based on the given conversation records and key facts, generate a three-sentence background \ +summary. This summary will be used for context understanding in subsequent conversations. + + The requirements for generating the summary are as follows: + 1. Highlight important information points, such as time, location, people, events, etc. + 2. The content in the "key facts" can be used as known information when generating the summary. + 3. Do not include XML tags in the output, ensure the accuracy of the information, and do not \ +make up information. + 4. The summary should be no more than 3 sentences and less than 300 words. + + The conversation records will be given in the tag, and the key facts will be given \ +in the tag. 
+ + + {% for item in conversation %} + <{{item.role}}> + {{item.content}} + + {% endfor %} + + + {% for fact in facts %}{{fact}}{% if not loop.last %}\n{% endif %}{% endfor %} + + + Now, please start generating the background summary: + """), +} diff --git a/apps/scheduler/call/summary/summary.py b/apps/scheduler/call/summary/summary.py index e0abafa45f6e1d235d6cd90f9e1b60e6ba71d8ea..af95d93ca5660962fdcaf955d1f2ab043d59f24e 100644 --- a/apps/scheduler/call/summary/summary.py +++ b/apps/scheduler/call/summary/summary.py @@ -4,9 +4,10 @@ from collections.abc import AsyncGenerator from typing import TYPE_CHECKING, Any, Self +from jinja2 import BaseLoader +from jinja2.sandbox import SandboxedEnvironment from pydantic import Field -from apps.llm.patterns.executor import ExecutorSummary from apps.models.node import NodeInfo from apps.scheduler.call.core import CoreCall, DataBase from apps.schemas.enum_var import CallOutputType, LanguageType @@ -17,6 +18,7 @@ from apps.schemas.scheduler import ( ExecutorBackground, ) +from .prompt import SUMMARY_PROMPT from .schema import SummaryOutput if TYPE_CHECKING: @@ -62,8 +64,28 @@ class Summary(CoreCall, input_model=DataBase, output_model=SummaryOutput): async def _exec(self, _input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]: """执行工具""" - summary_obj = ExecutorSummary() - summary = await summary_obj.generate(background=self.context, language=self._sys_vars.language) + # 创建 Jinja2 环境 + env = SandboxedEnvironment( + loader=BaseLoader(), + autoescape=True, + trim_blocks=True, + lstrip_blocks=True, + ) + + # 使用模板生成提示词 + template = env.from_string(SUMMARY_PROMPT[self._sys_vars.language]) + prompt = template.render( + conversation=self.context.conversation, + facts=self.context.facts, + ) + + # 调用 LLM 生成总结 + summary = "" + async for chunk in self._llm([ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": prompt}, + ]): + summary += chunk yield 
CallOutputChunk(type=CallOutputType.TEXT, content=summary) diff --git a/apps/scheduler/executor/__init__.py b/apps/scheduler/executor/__init__.py index 2e31bb1df8c8f8c411ac2b66abda789947aa4dc6..636c687fc39cedace0816245d52e8baa2f1f61d8 100644 --- a/apps/scheduler/executor/__init__.py +++ b/apps/scheduler/executor/__init__.py @@ -1,2 +1,8 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """Executor模块""" + +from .agent import MCPAgentExecutor +from .flow import FlowExecutor +from .qa import QAExecutor + +__all__ = ["FlowExecutor", "MCPAgentExecutor", "QAExecutor"] diff --git a/apps/scheduler/executor/base.py b/apps/scheduler/executor/base.py index 19daeb6a6cd9f09f2cf9387a0224ab2d8cfafcc7..5ccda250c382872b1f108cc5603e002cefdcc346 100644 --- a/apps/scheduler/executor/base.py +++ b/apps/scheduler/executor/base.py @@ -73,7 +73,8 @@ class BaseExecutor(BaseModel, ABC): data = TextAddContent(text=data).model_dump(exclude_none=True, by_alias=True) await self.msg_queue.push_output( - self.task.metadata, + self.task, + self.llm, event_type=event_type, data=data, ) diff --git a/apps/scheduler/executor/flow.py b/apps/scheduler/executor/flow.py index e94431a4a54ef4259b95842bde28d9fef95be9b2..3362c918dcecca0ab168148b6dff1244e9316e72 100644 --- a/apps/scheduler/executor/flow.py +++ b/apps/scheduler/executor/flow.py @@ -14,7 +14,6 @@ from apps.schemas.enum_var import EventType, ExecutorStatus, LanguageType, Speci from apps.schemas.flow import Flow, Step from apps.schemas.request_data import RequestDataApp from apps.schemas.task import StepQueueItem -from apps.services.task import TaskManager from .base import BaseExecutor from .step import StepExecutor @@ -62,18 +61,14 @@ class FlowExecutor(BaseExecutor): flow: Flow flow_id: str = Field(description="Flow ID") - question: str = Field(description="用户输入") post_body_app: RequestDataApp = Field(description="请求体中的app信息") - current_step: StepQueueItem | None = Field( - description="当前执行的步骤", - exclude=True, - 
default=None, - ) async def init(self) -> None: """初始化FlowExecutor""" logger.info("[FlowExecutor] 加载Executor状态") + await self._load_history() + # 尝试恢复State if ( self.state @@ -81,7 +76,7 @@ class FlowExecutor(BaseExecutor): ): # 创建ExecutorState self.state = ExecutorCheckpoint( - taskId=self.task.id, + taskId=self.task.metadata.id, appId=self.post_body_app.app_id, executorId=str(self.flow_id), executorName=self.flow.name, @@ -144,7 +139,7 @@ class FlowExecutor(BaseExecutor): return [] if self.current_step.step.type == SpecialCallType.CHOICE.value: # 如果是choice节点,获取分支ID - branch_id = self.context[-1].outputData["branch_id"] + branch_id = self.task.context[-1].outputData["branch_id"] if branch_id: next_steps = await self._find_next_id(str(self.state.stepId) + "." + branch_id) logger.info("[FlowExecutor] 分支ID:%s", branch_id) diff --git a/apps/scheduler/executor/prompt.py b/apps/scheduler/executor/prompt.py index 8aea17705303f89d1b2f5f122249817d6c6cd779..4200461ff760a2f556b717ad4b868b5d3250b449 100644 --- a/apps/scheduler/executor/prompt.py +++ b/apps/scheduler/executor/prompt.py @@ -18,18 +18,18 @@ EXECUTOR_REASONING: dict[LanguageType, str] = { - {tool_name} - {tool_description} - {tool_output} + {{tool_name}} + {{tool_description}} + {{tool_output}} - {last_thought} + {{last_thought}} 你当前需要解决的问题是: - {user_question} + {{user_question}} 请综合以上信息,再次一步一步地进行思考,并给出见解和行动: @@ -51,18 +51,18 @@ and give the next action. 
- {tool_name} - {tool_description} - {tool_output} + {{tool_name}} + {{tool_description}} + {{tool_output}} - {last_thought} + {{last_thought}} The question you need to solve is: - {user_question} + {{user_question}} Please integrate the above information, think step by step again, provide insights, and give actions: diff --git a/apps/scheduler/executor/qa.py b/apps/scheduler/executor/qa.py index 3ff995e703554da9a8daa35cff9d329183d2ae2c..1fc185664ceb673199906b59e19c554356bd7ef0 100644 --- a/apps/scheduler/executor/qa.py +++ b/apps/scheduler/executor/qa.py @@ -46,14 +46,12 @@ class QAExecutor(BaseExecutor): # 推送消息 if content_obj.event_type == EventType.TEXT_ADD.value: - await self.msg_queue.push_output( - task=self.task, + await self._push_message( event_type=content_obj.event_type, data=TextAddContent(text=content_obj.content).model_dump(exclude_none=True, by_alias=True), ) elif content_obj.event_type == EventType.DOCUMENT_ADD.value: - await self.msg_queue.push_output( - task=self.task, + await self._push_message( event_type=content_obj.event_type, data=DocumentAddContent( documentId=content_obj.content.get("id", ""), @@ -89,8 +87,8 @@ class QAExecutor(BaseExecutor): elif content_obj.event_type == EventType.DOCUMENT_ADD.value: task.runtime.documents.append(content_obj.content) task.state.flow_status = ExecutorStatus.SUCCESS - except Exception as e: - _logger.error(f"[Scheduler] RAG服务发生错误: {e}") + except Exception: + _logger.exception("[Scheduler] RAG服务发生错误 ") task.state.flow_status = ExecutorStatus.ERROR # 保存答案 self.task.runtime.fullAnswer = full_answer diff --git a/apps/scheduler/mcp/prompt.py b/apps/scheduler/mcp/prompt.py index 78b3c77264e0d5b27686ecfc4c711d5c75bc6b7a..476094f4f8836701b31aeaf14648fc383c742cf5 100644 --- a/apps/scheduler/mcp/prompt.py +++ b/apps/scheduler/mcp/prompt.py @@ -12,7 +12,6 @@ MCP_SELECT: dict[LanguageType, str] = { 请分析用户的目标,并生成一个计划。你后续将根据这个计划,一步一步地完成用户的目标。 # 一个好的计划应该: - 1. 能够成功完成用户的目标 2. 计划中的每一个步骤必须且只能使用一个工具。 3. 
计划中的步骤必须具有清晰和逻辑的步骤,没有冗余或不必要的步骤。 @@ -20,7 +19,6 @@ MCP_SELECT: dict[LanguageType, str] = { 5.生成的计划必须要覆盖用户的目标,不能遗漏任何用户目标中的内容。 # 生成计划时的注意事项: - - 每一条计划包含3个部分: - 计划内容:描述单个计划步骤的大致内容 - 工具ID:必须从下文的工具列表中选择 @@ -45,7 +43,6 @@ MCP_SELECT: dict[LanguageType, str] = { - 计划不得多于{{ max_num }}条,且每条计划内容应少于150字。 # 工具 - 你可以访问并使用一些工具,这些工具将在 XML标签中给出。 @@ -57,13 +54,10 @@ MCP_SELECT: dict[LanguageType, str] = { # 样例 - ## 目标 - 在后台运行一个新的alpine:latest容器,将主机/root文件夹挂载至/data,并执行top命令。 ## 计划 - 1. 这个目标需要使用Docker来完成,首先需要选择合适的MCP Server 2. 目标可以拆解为以下几个部分: @@ -102,9 +96,7 @@ MCP_SELECT: dict[LanguageType, str] = { ``` # 现在开始生成计划: - ## 目标 - {{goal}} # 计划 @@ -116,7 +108,6 @@ MCP_SELECT: dict[LanguageType, str] = { Your task is to select the most appropriate MCP server based on your current goals. ## Things to note when selecting an MCP server: - 1. Ensure you fully understand your current goals and select the most appropriate MCP server. 2. Please select from the provided list of MCP servers; do not generate your own. 3. Please provide the rationale for your choice before making your selection. @@ -135,22 +126,17 @@ additional content: 6. The following example is for reference only. Do not use it as a basis for selecting an MCP server. ## Example - ### Goal - I need an MCP server to complete a task. ### MCP Server List - - **mcp_1**: "MCP Server 1"; Description of MCP Server 1 - **mcp_2**: "MCP Server 2"; Description of MCP Server 2 ### Think step by step: - Because the current goal requires an MCP server to complete a task, select mcp_1. ### Select Result - ```json { "mcp": "mcp_1" @@ -158,19 +144,15 @@ additional content: ``` ## Let's get started! - ### Goal - {{goal}} ### MCP Server List - {% for mcp in mcp_list %} - **{{mcp.id}}**: "{{mcp.name}}"; {{mcp.description}} {% endfor %} ### Think step by step: - """, ), } @@ -182,7 +164,6 @@ CREATE_PLAN: dict[str, str] = { 请分析用户的目标,并生成一个计划。你后续将根据这个计划,一步一步地完成用户的目标。 # 一个好的计划应该: - 1. 能够成功完成用户的目标 2. 计划中的每一个步骤必须且只能使用一个工具。 3. 
计划中的步骤必须具有清晰和逻辑的步骤,没有冗余或不必要的步骤。 @@ -190,7 +171,6 @@ CREATE_PLAN: dict[str, str] = { 5.生成的计划必须要覆盖用户的目标,不能遗漏任何用户目标中的内容。 # 生成计划时的注意事项: - - 每一条计划包含3个部分: - 计划内容:描述单个计划步骤的大致内容 - 工具ID:必须从下文的工具列表中选择 @@ -215,7 +195,6 @@ CREATE_PLAN: dict[str, str] = { - 计划不得多于{{ max_num }}条,且每条计划内容应少于150字。 # 工具 - 你可以访问并使用一些工具,这些工具将在 XML标签中给出。 @@ -227,13 +206,10 @@ CREATE_PLAN: dict[str, str] = { # 样例 - ## 目标 - 在后台运行一个新的alpine:latest容器,将主机/root文件夹挂载至/data,并执行top命令。 ## 计划 - 1. 这个目标需要使用Docker来完成,首先需要选择合适的MCP Server 2. 目标可以拆解为以下几个部分: @@ -272,9 +248,7 @@ CREATE_PLAN: dict[str, str] = { ``` # 现在开始生成计划: - ## 目标 - {{goal}} # 计划 @@ -287,14 +261,12 @@ CREATE_PLAN: dict[str, str] = { goal step by step. # A good plan should: - 1. Be able to successfully achieve the user's goal. 2. Each step in the plan must use only one tool. 3. The steps in the plan must have clear and logical steps, without redundant or unnecessary steps. 4. The last step in the plan must be a Final tool to ensure that the plan is executed. # Things to note when generating plans: - - Each plan contains three parts: - Plan content: Describes the general content of a single plan step - Tool ID: Must be selected from the tool list below @@ -322,7 +294,6 @@ For example: "Result[3]" refers to the result after the third plan is executed. 150 words. # Tools - You can access and use some tools, which will be given in the XML tags. @@ -335,14 +306,11 @@ result. # Example - ## Target - Run a new alpine:latest container in the background, mount the host/root folder to /data, and execute the \ top command. ## Plan - 1. This goal needs to be completed using Docker. First, you need to select a suitable MCP Server. 2. The goal can be broken down into the following parts: @@ -384,9 +352,7 @@ is Result[2]", ``` # Now start generating the plan: - ## Goal - {{goal}} # Plan @@ -401,26 +367,22 @@ EVALUATE_PLAN: dict[LanguageType, str] = { 请根据给定的计划,和当前计划执行的实际情况,分析当前计划是否合理和完整,并生成改进后的计划。 # 一个好的计划应该: - 1. 能够成功完成用户的目标 2. 计划中的每一个步骤必须且只能使用一个工具。 3. 
计划中的步骤必须具有清晰和逻辑的步骤,没有冗余或不必要的步骤。 4. 计划中的最后一步必须是Final工具,以确保计划执行结束。 # 你此前的计划是: - {{ plan }} # 这个计划的执行情况是: - 计划的执行情况将放置在 XML标签中。 - {{ memory }} + {{ memory }} # 进行评估时的注意事项: - - 请一步一步思考,解析用户的目标,并指导你接下来的生成。思考过程应放置在 XML标签中。 - 评估结果分为两个部分: - 计划评估的结论 @@ -450,26 +412,22 @@ EVALUATE_PLAN: dict[LanguageType, str] = { reasonable and complete, and generate an improved plan. # A good plan should: - 1. Be able to successfully achieve the user's goal. 2. Each step in the plan must use only one tool. 3. The steps in the plan must have clear and logical steps, without redundant or unnecessary steps. 4. The last step in the plan must be a Final tool to ensure the completion of the plan execution. # Your previous plan was: - {{ plan }} # The execution status of this plan is: - The execution status of the plan will be placed in the XML tags. - {{ memory }} + {{ memory }} # Notes when conducting the evaluation: - - Please think step by step, analyze the user's goal, and guide your subsequent generation. The thinking \ process should be placed in the XML tags. - The evaluation results are divided into two parts: @@ -491,7 +449,6 @@ process should be placed in the XML tags. ``` # Start evaluating the plan now: - """, ), } @@ -501,21 +458,17 @@ FINAL_ANSWER: dict[str, str] = { 综合理解计划执行结果和背景信息,向用户报告目标的完成情况。 # 用户目标 - {{ goal }} # 计划执行情况 - 为了完成上述目标,你实施了以下计划: {{ memory }} # 其他背景信息: - {{ status }} # 现在,请根据以上信息,向用户报告目标的完成情况: - """, ), LanguageType.ENGLISH: dedent( @@ -524,21 +477,17 @@ FINAL_ANSWER: dict[str, str] = { the completion status of the goal. 
# User goal - {{ goal }} # Plan execution status - To achieve the above goal, you implemented the following plan: {{ memory }} # Other background information: - {{ status }} # Now, based on the above information, please report to the user the completion status of the goal: - """, ), } diff --git a/apps/scheduler/mcp_agent/prompt.py b/apps/scheduler/mcp_agent/prompt.py index 63233cfade0ebc2995861efb594fbf896ea7e63a..68a1f0a5745cbcf2e9261c51cbc1e37555455237 100644 --- a/apps/scheduler/mcp_agent/prompt.py +++ b/apps/scheduler/mcp_agent/prompt.py @@ -66,8 +66,8 @@ GENERATE_FLOW_EXCUTE_RISK: dict[LanguageType, str] = { ## 工具集合 你可以访问并使用一些工具,这些工具将在 XML标签中给出。 - - mysql_analyzer 分析MySQL数据库性能 - - performance_tuner 调优数据库性能 + - mysql_analyzer 分析MySQL数据库性能 + - performance_tuner 调优数据库性能 ## 输出 { @@ -82,7 +82,7 @@ GENERATE_FLOW_EXCUTE_RISK: dict[LanguageType, str] = { ## 工具集合 {% for tool in tools %} - - {{tool.id}} {{tool.name}};{{tool.description}} + - {{tool.id}} {{tool.name}};{{tool.description}} {% endfor %} ## 输出 @@ -98,8 +98,8 @@ user's goal and the current tool set. # Tool Set You can access and use some tools, which will be given in the XML tag. - - mysql_analyzer Analyze MySQL database performance - - performance_tuner Tune database performance + - mysql_analyzer Analyze MySQL database performance + - performance_tuner Tune database performance # Output { @@ -114,7 +114,7 @@ the performance and stability of the database. 
Therefore, the risk assessment is # Tool Set {% for tool in tools %} - - {{tool.id}} {{tool.name}}; {{tool.description}} + - {{tool.id}} {{tool.name}}; {{tool.description}} {% endfor %} # Output @@ -134,7 +134,7 @@ GEN_STEP: dict[LanguageType, str] = { 3.不要选择不存在的工具。 4.如果你认为当前已经达成了用户的目标,可以直接返回Final工具,表示计划执行结束。 5.tool_id中的工具ID必须是当前工具集合中存在的工具ID,而不是工具的名称。 - 6.工具在 XML标签中给出,工具的id在 下的 XML标签中给出。 + 6.工具在 XML标签中给出,工具的id在 下的 XML标签中给出。 # 样例 1 # 目标 @@ -325,18 +325,19 @@ RISK_EVALUATE: dict[LanguageType, str] = { } ``` # 样例 - # 工具名称 - mysql_analyzer - # 工具描述 - 分析MySQL数据库性能 - # 工具入参 + ## 工具 + + mysql_analyzer + 分析MySQL数据库性能 + + ## 工具入参 { "host": "192.0.0.1", "port": 3306, "username": "root", "password": "password" } - # 附加信息 + ## 附加信息 1. 当前MySQL数据库的版本是8.0.26 2. 当前MySQL数据库的配置文件路径是/etc/my.cnf,并含有以下配置项 ```ini @@ -344,7 +345,7 @@ RISK_EVALUATE: dict[LanguageType, str] = { innodb_buffer_pool_size=1G innodb_log_file_size=256M ``` - # 输出 + ## 输出 ```json { "risk": "中", @@ -352,16 +353,18 @@ RISK_EVALUATE: dict[LanguageType, str] = { 请确保在非生产环境中执行此操作。" } ``` - # 工具 + + # 现在开始评估工具执行风险: + ## 工具 {{tool_name}} {{tool_description}} - # 工具入参 + ## 工具入参 {{input_param}} - # 附加信息 + ## 附加信息 {{additional_info}} - # 输出 + ## 输出 """, ), LanguageType.ENGLISH: dedent( @@ -376,18 +379,19 @@ input parameters, and additional information, and output a warning. } ``` # Example - # Tool name - mysql_analyzer - # Tool description - Analyzes MySQL database performance - # Tool input + ## Tool + + mysql_analyzer + Analyzes MySQL database performance + + ## Tool input { "host": "192.0.0.1", "port": 3306, "username": "root", "password": "password" } - # Additional information + ## Additional information 1. The current MySQL database version is 8.0.26 2. 
The current MySQL database configuration file path is /etc/my.cnf and contains the following \ configuration items @@ -396,7 +400,7 @@ configuration items innodb_buffer_pool_size=1G innodb_log_file_size=256M ``` - # Output + ## Output ```json { "risk": "medium", @@ -404,17 +408,18 @@ configuration items database performance. This operation should only be performed in a non-production environment." } ``` - # Tool + + # Now start evaluating the tool execution risk: + ## Tool {{tool_name}} {{tool_description}} - # Tool Input Parameters + ## Tool Input Parameters {{input_param}} - # Additional Information + ## Additional Information {{additional_info}} - # Output - + ## Output """, ), } @@ -430,10 +435,12 @@ IS_PARAM_ERROR: dict[LanguageType, str] = { "is_param_error": true/false, } ``` + # 样例 - # 用户目标 + ## 用户目标 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优 - # 历史 + + ## 历史 第1步:生成端口扫描命令 - 调用工具 `command_generator`,并提供参数 `{"command": "nmap -sS -p--open 192.168.1.1"}` - 执行状态:成功 @@ -442,43 +449,53 @@ IS_PARAM_ERROR: dict[LanguageType, str] = { - 调用工具 `command_executor`,并提供参数 `{"command": "nmap -sS -p--open 192.168.1.1"}` - 执行状态:成功 - 得到数据:`{"result": "success"}` - # 当前步骤 + + ## 当前步骤 - step_3 - mysql_analyzer - 分析MySQL数据库性能 + step_3 + mysql_analyzer + 分析MySQL数据库性能 - # 工具入参 + + ## 工具入参 { "host": "192.0.0.1", "port": 3306, "username": "root", "password": "password" } - # 工具运行报错 + + ## 工具运行报错 执行MySQL性能分析命令时,出现了错误:`host is not correct`。 - # 输出 + ## 输出 ```json { "is_param_error": true } ``` - # 用户目标 + + # 现在开始判断工具执行失败是否是因为参数错误导致的: + ## 用户目标 {{goal}} - # 历史 + + ## 历史 {{history}} - # 当前步骤 + + ## 当前步骤 {{step_id}} {{step_name}} {{step_instruction}} - # 工具入参 + + ## 工具入参 {{input_param}} - # 工具运行报错 + + ## 工具运行报错 {{error_message}} - # 输出 + + ## 输出 """, ), LanguageType.ENGLISH: dedent( @@ -492,10 +509,12 @@ due to parameter errors. "is_param_error": true/false, } ``` + # Example - # User Goal + ## User Goal I need to scan the current MySQL database, analyze performance bottlenecks, and optimize it. 
- # History + + ## History Step 1: Generate a port scan command - Call the `command_generator` tool and provide `{"command": "nmap -sS -p--open 192.168.1.1"}` - Execution Status: Success @@ -504,43 +523,53 @@ due to parameter errors. - Call the `command_executor` tool and provide `{"command": "nmap -sS -p--open 192.168.1.1"}` - Execution Status: Success - Result: `{"result": "success"}` - # Current step + + ## Current step - step_3 - mysql_analyzer - Analyze MySQL database performance + step_3 + mysql_analyzer + Analyze MySQL database performance - # Tool input parameters + + ## Tool input parameters { "host": "192.0.0.1", "port": 3306, "username": "root", "password": "password" } - # Tool execution error + + ## Tool execution error When executing the MySQL performance analysis command, an error occurred: `host is not correct`. - # Output + ## Output ```json { "is_param_error": true } ``` - # User goal + + # Now start judging whether the tool execution failure is due to parameter errors: + ## User goal {{goal}} - # History + + ## History {{history}} - # Current step + + ## Current step - {{step_id}} - {{step_name}} - {{step_instruction}} + {{step_id}} + {{step_name}} + {{step_instruction}} - # Tool input parameters + + ## Tool input parameters {{input_param}} - # Tool error + + ## Tool error {{error_message}} - # Output + + ## Output """, ), } @@ -556,11 +585,12 @@ CHANGE_ERROR_MESSAGE_TO_DESCRIPTION: dict[LanguageType, str] = { 3. 描述应该避免使用过于专业的术语,以便用户能够理解。 4. 描述应该尽量简短,控制在50字以内。 5. 
只输出自然语言描述,不要输出其他内容。 + # 样例 - # 工具信息 + ## 工具信息 - port_scanner - 扫描主机端口 + port_scanner + 扫描主机端口 { "type": "object", @@ -586,31 +616,38 @@ CHANGE_ERROR_MESSAGE_TO_DESCRIPTION: dict[LanguageType, str] = { } - # 工具入参 + + ## 工具入参 { "host": "192.0.0.1", "port": 3306, "username": "root", "password": "password" } - # 报错信息 + + ## 报错信息 执行端口扫描命令时,出现了错误:`password is not correct`。 - # 输出 + + ## 输出 扫描端口时发生错误:密码不正确。请检查输入的密码是否正确,并重试。 + # 现在开始转换报错信息: - # 工具信息 + ## 工具信息 - {{tool_name}} - {{tool_description}} + {{tool_name}} + {{tool_description}} - {{input_schema}} + {{input_schema}} + # 工具入参 {{input_params}} + # 报错信息 {{error_message}} - # 输出 + + ## 输出 """, ), LanguageType.ENGLISH: dedent( @@ -624,11 +661,12 @@ and clear, allowing users to understand the cause and impact of the error. 3. The description should avoid using overly technical terms so that users can understand it. 4. The description should be as brief as possible, within 50 words. 5. Only output the natural language description, do not output other content. + # Example - # Tool Information + ## Tool Information - port_scanner - Scan host ports + port_scanner + Scan host ports { "type": "object", @@ -654,32 +692,39 @@ and clear, allowing users to understand the cause and impact of the error. } - # Tool input + + ## Tool input { "host": "192.0.0.1", "port": 3306, "username": "root", "password": "password" } - # Error message + + ## Error message An error occurred while executing the port scan command: `password is not correct`. - # Output + + ## Output An error occurred while scanning the port: The password is incorrect. Please check that the password \ you entered is correct and try again. 
+ # Now start converting the error message: - # Tool information + ## Tool information - {{tool_name}} - {{tool_description}} + {{tool_name}} + {{tool_description}} - {{input_schema}} + {{input_schema}} - # Tool input parameters + + ## Tool input parameters {{input_params}} - # Error message + + ## Error message {{error_message}} - # Output + + ## Output """, ), } @@ -698,20 +743,24 @@ GET_MISSING_PARAMS: dict[LanguageType, str] = { "password": "请补充密码" } ``` + # 样例 - # 工具名称 - mysql_analyzer - # 工具描述 - 分析MySQL数据库性能 - # 工具入参 + ## 工具 + + mysql_analyzer + 分析MySQL数据库性能 + + + ## 工具入参 { "host": "192.0.0.1", "port": 3306, "username": "root", "password": "password" } - # 工具入参schema - { + + ## 工具入参schema + { "type": "object", "properties": { "host": { @@ -745,9 +794,11 @@ GET_MISSING_PARAMS: dict[LanguageType, str] = { }, "required": ["host", "port", "username", "password"] } - # 运行报错 + + #$ 运行报错 {"err_msg": "执行端口扫描命令时,出现了错误:`password is not correct`。", "data": {} } - # 输出 + + ## 输出 ```json { "host": "192.0.0.1", @@ -756,18 +807,24 @@ GET_MISSING_PARAMS: dict[LanguageType, str] = { "password": null } ``` - # 工具 + + # 现在开始获取缺失的参数: + ## 工具 - {{tool_name}} - {{tool_description}} + {{tool_name}} + {{tool_description}} - # 工具入参 + + ## 工具入参 {{input_param}} - # 工具入参schema(部分字段允许为null) + + ## 工具入参schema(部分字段允许为null) {{input_schema}} - # 运行报错 + + ## 运行报错 {{error_message}} - # 输出 + + ## 输出 """, ), LanguageType.ENGLISH: dedent( @@ -783,19 +840,26 @@ input parameters, input parameter schema, and runtime errors, and output a JSON- "password": "Please provide the password" } ``` + # Example - # Tool Name - mysql_analyzer - # Tool Description - Analyze MySQL database performance - # Tool Input Parameters + ## Tool + + mysql_analyzer + Analyze MySQL database performance + + + ## Tool Input Parameters + ```json { "host": "192.0.0.1", "port": 3306, "username": "root", "password": "password" } - # Tool Input Parameter Schema + ``` + + ## Tool Input Parameter Schema + ```json { "type": 
"object", "properties": { @@ -830,10 +894,12 @@ input parameters, input parameter schema, and runtime errors, and output a JSON- }, "required": ["host", "port", "username", "password"] } - # Error info + + ## Error info {"err_msg": "When executing the port scan command, an error occurred: `password is not correct`.", \ "data": {} } - # Output + + ## Output ```json { "host": "192.0.0.1", @@ -842,18 +908,24 @@ input parameters, input parameter schema, and runtime errors, and output a JSON- "password": null } ``` - # Tool + + # Now start getting the missing parameters: + ## Tool - {{tool_name}} - {{tool_description}} + {{tool_name}} + {{tool_description}} - # Tool input parameters + + ## Tool input parameters {{input_param}} - # Tool input parameter schema (some fields can be null) + + ## Tool input parameter schema (some fields can be null) {{input_schema}} - # Error info + + ## Error info {{error_message}} - # Output + + ## Output """, ), } @@ -869,17 +941,20 @@ GEN_PARAMS: dict[LanguageType, str] = { 3.生成的参数必须符合阶段性目标。 # 样例 - # 工具信息 + ## 工具信息 - < name > mysql_analyzer < /name > - < description > 分析MySQL数据库性能 < /description > - < / tool > - # 总目标 + mysql_analyzer + 分析MySQL数据库性能 + + + ## 总目标 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优,ip地址是192.168.1.1,端口是3306,用户名是root,\ 密码是password。 - # 当前阶段目标 + + ## 当前阶段目标 我要连接MySQL数据库,分析性能瓶颈,并调优。 - # 工具入参的schema + + ## 工具入参的schema { "type": "object", "properties": { @@ -902,7 +977,8 @@ GEN_PARAMS: dict[LanguageType, str] = { }, "required": ["host", "port", "username", "password"] } - # 背景信息 + + ## 背景信息 第1步:生成端口扫描命令 - 调用工具 `command_generator`,并提供参数 `帮我生成一个mysql端口扫描命令` - 执行状态:成功 @@ -911,7 +987,8 @@ GEN_PARAMS: dict[LanguageType, str] = { - 调用工具 `command_executor`,并提供参数 `{"command": "nmap -sS -p--open 192.168.1.1"}` - 执行状态:成功 - 得到数据:`{"result": "success"}` - # 输出 + + ## 输出 ```json { "host": "192.168.1.1", @@ -920,19 +997,26 @@ GEN_PARAMS: dict[LanguageType, str] = { "password": "password" } ``` - # 工具 - < tool > - < name > {{tool_name}} < /name > - < 
description > {{tool_description}} < /description > - < / tool > + + # 现在开始生成工具入参: + ## 工具 + + {{tool_name}} + {{tool_description}} + + # 总目标 {{goal}} + # 当前阶段目标 {{current_goal}} + # 工具入参scheme {{input_schema}} + # 背景信息 {{background_info}} + # 输出 """, ).strip("\n"), @@ -948,17 +1032,20 @@ generate tool input parameters. 3. The generated parameters must conform to the phased goals. # Example - # Tool Information - < tool > - < name >mysql_analyzer < /name > - < description > Analyze MySQL Database Performance < /description > - < / tool > - # Overall Goal + ## Tool Information + + mysql_analyzer + Analyze MySQL Database Performance + + + ## Overall Goal I need to scan the current MySQL database, analyze performance bottlenecks, and optimize it. The IP \ address is 192.168.1.1, the port is 3306, the username is root, and the password is password. - # Current Phase Goal + + ## Current Phase Goal I need to connect to the MySQL database, analyze performance bottlenecks, and optimize it. - # Tool input schema + + ## Tool input schema { "type": "object", "properties": { @@ -981,7 +1068,8 @@ address is 192.168.1.1, the port is 3306, the username is root, and the password }, "required": ["host", "port", "username", "password"] } - # Background information + + ## Background information Step 1: Generate a port scan command - Call the `command_generator` tool and provide the `Help me generate a MySQL port scan \ command` parameter @@ -993,7 +1081,8 @@ command` parameter 192.168.1.1"}` - Execution status: Success - Received data: `{"result": "success"}` - # Output + + ## Output ```json { "host": "192.168.1.1", @@ -1002,20 +1091,27 @@ command` parameter "password": "password" } ``` - # Tool - < tool > - < name > {{tool_name}} < /name > - < description > {{tool_description}} < /description > - < / tool > - # Overall goal + + # Now start generating tool input parameters: + ## Tool + + {{tool_name}} + {{tool_description}} + + + ## Overall goal {{goal}} - # Current stage goal + + ## 
Current stage goal {{current_goal}} - # Tool input scheme + + ## Tool input scheme {{input_schema}} - # Background information + + ## Background information {{background_info}} - # Output + + ## Output """, ), } @@ -1030,16 +1126,19 @@ REPAIR_PARAMS: dict[LanguageType, str] = { 1.最终修复的参数要符合目标和工具入参的schema。 # 样例 - # 工具信息 + ## 工具信息 - mysql_analyzer - 分析MySQL数据库性能 + mysql_analyzer + 分析MySQL数据库性能 - # 总目标 + + ## 总目标 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优 - # 当前阶段目标 + + ## 当前阶段目标 我要连接MySQL数据库,分析性能瓶颈,并调优。 - # 工具入参的schema + + ## 工具入参的schema { "type": "object", "properties": { @@ -1062,23 +1161,28 @@ REPAIR_PARAMS: dict[LanguageType, str] = { }, "required": ["host", "port", "username", "password"] } - # 工具当前的入参 + + ## 工具当前的入参 { "host": "192.0.0.1", "port": 3306, "username": "root", "password": "password" } - # 工具的报错 + + ## 工具的报错 执行端口扫描命令时,出现了错误:`password is not correct`。 - # 补充的参数 + + ## 补充的参数 { "username": "admin", "password": "admin123" } - # 补充的参数描述 + + ## 补充的参数描述 用户希望使用admin用户和admin123密码来连接MySQL数据库。 - # 输出 + + ## 输出 ```json { "host": "192.0.0.1", @@ -1087,26 +1191,36 @@ REPAIR_PARAMS: dict[LanguageType, str] = { "password": "admin123" } ``` - # 工具 + + # 现在开始修复工具入参: + ## 工具 - {{tool_name}} - {{tool_description}} + {{tool_name}} + {{tool_description}} - # 总目标 + + ## 总目标 {{goal}} - # 当前阶段目标 + + ## 当前阶段目标 {{current_goal}} - # 工具入参scheme + + ## 工具入参Schema {{input_schema}} - # 工具入参 + + ## 工具入参 {{input_param}} - # 运行报错 + + ## 运行报错 {{error_message}} - # 补充的参数 + + ## 补充的参数 {{params}} - # 补充的参数描述 + + ## 补充的参数描述 {{params_description}} - # 输出 + + ## 输出 """, ), LanguageType.ENGLISH: dedent( @@ -1117,12 +1231,13 @@ parameter schema, tool current input parameters, tool error, supplemented parame parameter descriptions. 
# Example - # Tool information + ## Tool information - mysql_analyzer - Analyze MySQL database performance + mysql_analyzer + Analyze MySQL database performance - # Tool input parameter schema + + ## Tool input parameter schema { "type": "object", "properties": { @@ -1145,23 +1260,28 @@ parameter descriptions. }, "required": ["host", "port", "username", "password"] } - # Current tool input parameters + + ## Current tool input parameters { "host": "192.0.0.1", "port": 3306, "username": "root", "password": "password" } - # Tool error + + ## Tool error When executing the port scan command, an error occurred: `password is not correct`. - # Supplementary parameters + + ## Supplementary parameters { "username": "admin", "password": "admin123" } - # Supplementary parameter description + + ## Supplementary parameter description The user wants to use the admin user and the admin123 password to connect to the MySQL database. - # Output + + ## Output ```json { "host": "192.0.0.1", @@ -1170,22 +1290,30 @@ parameter descriptions. "password": "admin123" } ``` - # Tool + + # Now start fixing tool input parameters: + ## Tool {{tool_name}} {{tool_description}} - # Tool input schema + + ## Tool input schema {{input_schema}} - # Tool input parameters + + ## Tool input parameters {{input_param}} - # Runtime error + + ## Runtime error {{error_message}} - # Supplementary parameters + + ## Supplementary parameters {{params}} - # Supplementary parameter descriptions + + ## Supplementary parameter descriptions {{params_description}} - # Output + + ## Output """, ), } @@ -1196,17 +1324,14 @@ FINAL_ANSWER: dict[LanguageType, str] = { 综合理解计划执行结果和背景信息,向用户报告目标的完成情况。 # 用户目标 - {{goal}} # 计划执行情况 - 为了完成上述目标,你实施了以下计划: {{memory}} # 其他背景信息: - {{status}} # 现在,请根据以上信息,向用户报告目标的完成情况: @@ -1219,17 +1344,14 @@ FINAL_ANSWER: dict[LanguageType, str] = { completion status to the user. 
# User Goal - {{goal}} # Plan Execution Status - To achieve the above goal, you implemented the following plan: {{memory}} # Additional Background Information: - {{status}} # Now, based on the above information, report the goal completion status to the user: diff --git a/apps/scheduler/pool/pool.py b/apps/scheduler/pool/pool.py index e41363e917102d582db62d0fefe32cb4e7b39ece..355edcef252f3b51ed38c7a5f351d7a7c9b96935 100644 --- a/apps/scheduler/pool/pool.py +++ b/apps/scheduler/pool/pool.py @@ -130,6 +130,11 @@ class Pool: await MCPLoader.init() + async def set_vector(self) -> None: + """向数据库中写入向量化数据""" + await CallLoader().set_vector() + + async def get_flow_metadata(self, app_id: uuid.UUID) -> list[FlowInfo]: """从数据库中获取特定App的全部Flow的元数据""" async with postgres.session() as session: diff --git a/apps/scheduler/scheduler/__init__.py b/apps/scheduler/scheduler/__init__.py index a2f9eed40bd90e3bf9a3fbd6d1458dcb3c834978..7785a9cb47d73e5780e4f8616a86df6cafe09c95 100644 --- a/apps/scheduler/scheduler/__init__.py +++ b/apps/scheduler/scheduler/__init__.py @@ -1,6 +1,6 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. 
"""调度器模块""" -from apps.scheduler.scheduler.scheduler import Scheduler +from .scheduler import Scheduler __all__ = ["Scheduler"] diff --git a/apps/scheduler/scheduler/context.py b/apps/scheduler/scheduler/context.py index 986bbc44f89f873617cd3786e0d8957755c81911..653b4beabead7a8466f115a612b2a47f2990f000 100644 --- a/apps/scheduler/scheduler/context.py +++ b/apps/scheduler/scheduler/context.py @@ -7,12 +7,10 @@ from datetime import UTC, datetime from apps.common.security import Security from apps.models.record import Record, RecordMetadata -from apps.scheduler.scheduler import Scheduler from apps.schemas.enum_var import StepStatus from apps.schemas.record import ( FlowHistory, RecordContent, - RecordGroupDocument, ) from apps.services.appcenter import AppCenterManager from apps.services.document import DocumentManager diff --git a/apps/scheduler/scheduler/flow.py b/apps/scheduler/scheduler/flow.py deleted file mode 100644 index 62041c1d7375ffbf8479bd0b8466ef1af9b772ea..0000000000000000000000000000000000000000 --- a/apps/scheduler/scheduler/flow.py +++ /dev/null @@ -1,40 +0,0 @@ -# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. 
-"""Scheduler中,关于Flow的逻辑""" - -import logging -import uuid - -from apps.llm.patterns import Select -from apps.scheduler.pool.pool import Pool -from apps.schemas.request_data import RequestDataApp - -logger = logging.getLogger(__name__) - - -class FlowChooser: - """Flow选择器""" - - def __init__(self, task_id: uuid.UUID, question: str, user_selected: RequestDataApp | None = None) -> None: - """初始化Flow选择器""" - self.task_id = task_id - self._question = question - self._user_selected = user_selected - - - async def get_top_flow(self) -> str: - """获取Top1 Flow""" - # 获取所选应用的所有Flow - if not self._user_selected or not self._user_selected.app_id: - return "KnowledgeBase" - - flow_list = await Pool().get_flow_metadata(self._user_selected.app_id) - if not flow_list: - return "KnowledgeBase" - - logger.info("[FlowChooser] 选择任务 %s 最合适的Flow", self.task_id) - choices = [{ - "name": flow.id, - "description": f"{flow.name}, {flow.description}", - } for flow in flow_list] - select_obj = Select() - return await select_obj.generate(question=self._question, choices=choices) diff --git a/apps/scheduler/scheduler/prompt.py b/apps/scheduler/scheduler/prompt.py index 73551d7a3f2d7bda25fcd08e2dcaa930b41752db..f95c273a4b896814a28bf4f8963ab9a3536b18ec 100644 --- a/apps/scheduler/scheduler/prompt.py +++ b/apps/scheduler/scheduler/prompt.py @@ -41,11 +41,11 @@ FLOW_SELECT: dict[LanguageType, str] = { - {question} + {{question}} - {choice_list} + {{choice_list}} @@ -94,11 +94,11 @@ The best choice seems to be "API: request a specific API, get the returned JSON - {question} + {{question}} - {choice_list} + {{choice_list}} diff --git a/apps/scheduler/scheduler/scheduler.py b/apps/scheduler/scheduler/scheduler.py index 6bd3eff6dc00ad83a666193f82c491816f678306..bf1b3185da7d7474176b116425dfd1e436944991 100644 --- a/apps/scheduler/scheduler/scheduler.py +++ b/apps/scheduler/scheduler/scheduler.py @@ -6,24 +6,25 @@ import logging import uuid from datetime import UTC, datetime +from jinja2 import 
BaseLoader +from jinja2.sandbox import SandboxedEnvironment + from apps.common.queue import MessageQueue from apps.llm.embedding import Embedding -from apps.llm.function import FunctionLLM +from apps.llm.function import FunctionLLM, JsonGenerator from apps.llm.reasoning import ReasoningLLM from apps.models.task import Task, TaskRuntime from apps.models.user import User -from apps.scheduler.executor.agent import MCPAgentExecutor -from apps.scheduler.executor.flow import FlowExecutor +from apps.scheduler.executor import FlowExecutor, MCPAgentExecutor, QAExecutor from apps.scheduler.pool.pool import Pool -from apps.scheduler.scheduler.flow import FlowChooser -from apps.schemas.enum_var import AppType, EventType, ExecutorStatus +from apps.schemas.enum_var import AppType, EventType, ExecutorStatus, LanguageType from apps.schemas.message import ( InitContent, InitContentFeature, ) from apps.schemas.rag_data import RAGQueryReq from apps.schemas.request_data import RequestData -from apps.schemas.scheduler import ExecutorBackground, LLMConfig +from apps.schemas.scheduler import ExecutorBackground, LLMConfig, TopFlow from apps.schemas.task import TaskData from apps.services.activity import Activity from apps.services.appcenter import AppCenterManager @@ -32,6 +33,8 @@ from apps.services.llm import LLMManager from apps.services.task import TaskManager from apps.services.user import UserManager +from .prompt import FLOW_SELECT + logger = logging.getLogger(__name__) @@ -85,8 +88,19 @@ class Scheduler: ) self.task = task + # Jinja2 + self._env = SandboxedEnvironment( + loader=BaseLoader(), + autoescape=False, + trim_blocks=True, + lstrip_blocks=True, + ) + + # LLM + await self._get_scheduler_llm(post_body.llm_id) + - async def push_init_message( + async def _push_init_message( self, context_num: int, *, is_flow: bool = False, ) -> None: """推送初始化消息""" @@ -112,7 +126,8 @@ class Scheduler: # 推送初始化消息 await self.queue.push_output( - task=self.task, + self.task, + self.llm, 
event_type=EventType.INIT.value, data=InitContent(feature=feature, createdAt=created_at).model_dump(exclude_none=True, by_alias=True), ) @@ -140,7 +155,7 @@ class Scheduler: kill_event.set() - async def get_scheduler_llm(self, reasoning_llm_id: str) -> LLMConfig: + async def _get_scheduler_llm(self, reasoning_llm_id: str) -> LLMConfig: """获取RAG大模型""" # 获取当前会话使用的大模型 reasoning_llm = await LLMManager.get_llm(reasoning_llm_id) @@ -184,6 +199,49 @@ class Scheduler: ) + async def get_top_flow(self) -> str: + """获取Top1 Flow""" + if not self.llm.function: + err = "[Scheduler] 未设置Function模型" + logger.error(err) + raise RuntimeError(err) + + # 获取所选应用的所有Flow + if not self.post_body.app or not self.post_body.app.app_id: + err = "[Scheduler] 未选择应用" + logger.error(err) + raise RuntimeError(err) + + flow_list = await Pool().get_flow_metadata(self.post_body.app.app_id) + if not flow_list: + err = "[Scheduler] 未找到应用中合法的Flow" + logger.error(err) + raise RuntimeError(err) + + logger.info("[Scheduler] 选择应用 %s 最合适的Flow", self.post_body.app.app_id) + choices = [{ + "name": flow.id, + "description": f"{flow.name}, {flow.description}", + } for flow in flow_list] + + # 根据用户语言选择模板 + template = self._env.from_string(FLOW_SELECT[self.task.runtime.language]) + # 渲染模板 + prompt = template.render( + template, + question=self.post_body.question, + choice_list=choices, + ) + schema = TopFlow.model_json_schema() + schema["properties"]["choice"]["enum"] = [choice["name"] for choice in choices] + result_str = await JsonGenerator(self.llm.function, self.post_body.question, [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": prompt}, + ], schema).generate() + result = TopFlow.model_validate(result_str) + return result.choice + + async def run(self) -> None: """运行调度器""" # 如果是智能问答,直接执行 @@ -198,14 +256,14 @@ class Scheduler: if self.task.state.appId: rag_method = False + if rag_method: - llm = await self.get_scheduler_llm() - kb_ids = await 
KnowledgeBaseManager.get_selected_kb(self.task.ids.user_sub) - self.task = await push_init_message(self.task, self.queue, 3, is_flow=False) + kb_ids = await KnowledgeBaseManager.get_selected_kb(self.task.metadata.userSub) + await self._push_init_message(3, is_flow=False) rag_data = RAGQueryReq( kbIds=kb_ids, query=self.post_body.question, - tokensLimit=llm.max_tokens, + tokensLimit=self.llm.reasoning.config.maxToken, ) # 启动监控任务和主任务 main_task = asyncio.create_task(push_rag_message( @@ -219,7 +277,6 @@ class Scheduler: return # 获取上下文 - context, facts = await get_context(self.task.ids.user_sub, self.post_body, app_data.history_len) if app_data.app_type == AppType.FLOW: # 需要执行Flow is_flow = True @@ -228,13 +285,8 @@ class Scheduler: is_flow = False # 需要执行Flow self.task = await push_init_message(self.task, self.queue, app_data.history_len, is_flow=is_flow) - # 组装上下文 - executor_background = ExecutorBackground( - conversation=context, - facts=facts, - ) # 启动监控任务和主任务 - main_task = asyncio.create_task(self.run_executor(self.queue, self.post_body, executor_background)) + main_task = asyncio.create_task(self._run_agent(self.queue, self.post_body, executor_background)) # 等待任一任务完成 done, pending = await asyncio.wait( [main_task, monitor], @@ -261,79 +313,81 @@ class Scheduler: return - async def run_executor( - self, queue: MessageQueue, post_body: RequestData, background: ExecutorBackground, - ) -> None: - """构造Executor并执行""" - # 获取agent信息 - app_collection = MongoDB().get_collection("app") - app_metadata = AppPool.model_validate(await app_collection.find_one({"_id": app_info.app_id})) - if not app_metadata: - logger.error("[Scheduler] 未找到Agent应用") - return - if app_metadata.app_type == AppType.FLOW.value: - logger.info("[Scheduler] 获取工作流元数据") - flow_info = await Pool().get_flow_metadata(app_info.app_id) + async def _run_qa(self) -> None: + pass - # 如果flow_info为空,则直接返回 - if not flow_info: - logger.error("[Scheduler] 未找到工作流元数据") - return - # 如果用户选了特定的Flow - if app_info.flow_id: - 
logger.info("[Scheduler] 获取工作流定义") - flow_id = app_info.flow_id - flow_data = await Pool().get_flow(app_info.app_id, flow_id) - else: - # 如果用户没有选特定的Flow,则根据语义选择一个Flow - logger.info("[Scheduler] 选择最合适的流") - flow_chooser = FlowChooser(self.task, post_body.question, app_info) - flow_id = await flow_chooser.get_top_flow() - self.task = flow_chooser.task - logger.info("[Scheduler] 获取工作流定义") - flow_data = await Pool().get_flow(app_info.app_id, flow_id) - - # 如果flow_data为空,则直接返回 - if not flow_data: - logger.error("[Scheduler] 未找到工作流定义") - return + async def _run_flow(self) -> None: + logger.info("[Scheduler] 获取工作流元数据") + flow_info = await Pool().get_flow_metadata(app_info.app_id) - # 初始化Executor - logger.info("[Scheduler] 初始化Executor") - - flow_exec = FlowExecutor( - flow_id=flow_id, - flow=flow_data, - task=self.task, - msg_queue=queue, - question=post_body.question, - post_body_app=app_info, - background=background, - ) + # 如果flow_info为空,则直接返回 + if not flow_info: + logger.error("[Scheduler] 未找到工作流元数据") + return - # 开始运行 - logger.info("[Scheduler] 运行Executor") - await flow_exec.init() - await flow_exec.run() - self.task = flow_exec.task - elif app_metadata.app_type == AppType.AGENT.value: - # 初始化Executor - agent_exec = MCPAgentExecutor( - task=self.task, - msg_queue=queue, - question=post_body.question, - history_len=app_metadata.history_len, - background=background, - agent_id=app_info.app_id, - params=post_body.params, - ) - # 开始运行 - logger.info("[Scheduler] 运行Executor") - await agent_exec.run() - self.task = agent_exec.task + # 如果用户选了特定的Flow + if app_info.flow_id: + logger.info("[Scheduler] 获取工作流定义") + flow_id = app_info.flow_id + flow_data = await Pool().get_flow(app_info.app_id, flow_id) else: - logger.error("[Scheduler] 无效的应用类型") + # 如果用户没有选特定的Flow,则根据语义选择一个Flow + logger.info("[Scheduler] 选择最合适的流") + flow_chooser = FlowChooser(self.task, post_body.question, app_info) + flow_id = await flow_chooser.get_top_flow() + self.task = flow_chooser.task + 
logger.info("[Scheduler] 获取工作流定义") + flow_data = await Pool().get_flow(app_info.app_id, flow_id) + + # 如果flow_data为空,则直接返回 + if not flow_data: + logger.error("[Scheduler] 未找到工作流定义") + return - return + # 初始化Executor + logger.info("[Scheduler] 初始化Executor") + + flow_exec = FlowExecutor( + flow_id=flow_id, + flow=flow_data, + task=self.task, + msg_queue=queue, + question=post_body.question, + post_body_app=app_info, + background=background, + llm=self.llm, + ) + + # 开始运行 + logger.info("[Scheduler] 运行Executor") + await flow_exec.init() + await flow_exec.run() + self.task = flow_exec.task + + + async def _run_agent( + self, queue: MessageQueue, post_body: RequestData, background: ExecutorBackground, + ) -> None: + """构造Executor并执行""" + # 初始化Executor + agent_exec = MCPAgentExecutor( + task=self.task, + msg_queue=queue, + question=post_body.question, + history_len=app_metadata.history_len, + background=background, + agent_id=app_info.app_id, + params=post_body.params, + llm=self.llm, + ) + # 开始运行 + logger.info("[Scheduler] 运行Executor") + await agent_exec.run() + self.task = agent_exec.task + + + async def _save_task(self) -> None: + """保存Task""" + await TaskManager.save_task(self.task) diff --git a/apps/scheduler/slot/slot.py b/apps/scheduler/slot/slot.py index c14422238af8efa4aca619d6bce82f9c4bc8e1c3..565261dce7744bb362fad06112d201c3e264e9f4 100644 --- a/apps/scheduler/slot/slot.py +++ b/apps/scheduler/slot/slot.py @@ -281,45 +281,49 @@ class Slot: """从JSON Schema中提取类型描述""" return self._extract_type_desc(self._schema) - def get_params_node_from_schema(self, root: str = "") -> ParamsNode: + def _extract_params_node_recursive( # noqa: C901 + self, schema_node: dict[str, Any], name: str = "", path: str = "", + ) -> ParamsNode | None: + """递归提取ParamsNode""" + if "type" not in schema_node: + return None + + param_type = schema_node["type"] + if isinstance(param_type, list): + return None # 不支持多类型 + if param_type == "object": + param_type = Type.DICT + elif param_type == 
"array": + param_type = Type.LIST + elif param_type == "string": + param_type = Type.STRING + elif param_type in ["number", "integer"]: + param_type = Type.NUMBER + elif param_type == "boolean": + param_type = Type.BOOL + else: + err = f"[Slot] 不支持的参数类型: {param_type}" + logger.warning(err) + return None + sub_params = [] + + if param_type == Type.DICT and "properties" in schema_node: + for key, value in schema_node["properties"].items(): + sub_param = self._extract_params_node_recursive(value, name=key, path=f"{path}/{key}") + if sub_param: + sub_params.append(sub_param) + else: + # 对于非对象类型,直接返回空子参数 + sub_params = None + return ParamsNode(paramName=name, + paramPath=path, + paramType=param_type, + subParams=sub_params) + + def get_params_node_from_schema(self, root: str = "") -> ParamsNode | None: """从JSON Schema中提取ParamsNode""" - def _extract_params_node(schema_node: dict[str, Any], name: str = "", path: str = "") -> ParamsNode: - """递归提取ParamsNode""" - if "type" not in schema_node: - return None - - param_type = schema_node["type"] - if isinstance(param_type, list): - return None # 不支持多类型 - if param_type == "object": - param_type = Type.DICT - elif param_type == "array": - param_type = Type.LIST - elif param_type == "string": - param_type = Type.STRING - elif param_type in ["number", "integer"]: - param_type = Type.NUMBER - elif param_type == "boolean": - param_type = Type.BOOL - else: - logger.warning(f"[Slot] 不支持的参数类型: {param_type}") - return None - sub_params = [] - - if param_type == Type.DICT and "properties" in schema_node: - for key, value in schema_node["properties"].items(): - sub_param = _extract_params_node(value, name=key, path=f"{path}/{key}") - if sub_param: - sub_params.append(sub_param) - else: - # 对于非对象类型,直接返回空子参数 - sub_params = None - return ParamsNode(paramName=name, - paramPath=path, - paramType=param_type, - subParams=sub_params) try: - return _extract_params_node(self._schema, name=root, path=root) + return 
self._extract_params_node_recursive(self._schema, name=root, path=root) except Exception: logger.exception("[Slot] 提取ParamsNode失败") return None @@ -477,46 +481,47 @@ class Slot: def add_null_to_basic_types(self) -> dict[str, Any]: """递归地为 JSON Schema 中的基础类型(bool、number等)添加 null 选项""" - def add_null_to_basic_types(schema: dict[str, Any]) -> dict[str, Any]: - """ - 递归地为 JSON Schema 中的基础类型(bool、number等)添加 null 选项 - - :param schema: 原始 JSON Schema - :return: 修改后的 JSON Schema - """ - # 如果不是字典类型(schema),直接返回 - if not isinstance(schema, dict): - return schema - - # 处理当前节点的 type 字段 - if "type" in schema: - # 处理单一类型字符串 - if isinstance(schema["type"], str): - if schema["type"] in ["boolean", "number", "string", "integer"]: - schema["type"] = [schema["type"], "null"] - - # 处理类型数组 - elif isinstance(schema["type"], list): - for i, t in enumerate(schema["type"]): - if isinstance(t, str) and t in ["boolean", "number", "string", "integer"]: - if "null" not in schema["type"]: - schema["type"].append("null") - break - - # 递归处理 properties 字段(对象类型) - if "properties" in schema: - for prop, prop_schema in schema["properties"].items(): - schema["properties"][prop] = add_null_to_basic_types(prop_schema) - - # 递归处理 items 字段(数组类型) - if "items" in schema: - schema["items"] = add_null_to_basic_types(schema["items"]) - - # 递归处理 anyOf, oneOf, allOf 字段 - for keyword in ["anyOf", "oneOf", "allOf"]: - if keyword in schema: - schema[keyword] = [add_null_to_basic_types(sub_schema) for sub_schema in schema[keyword]] - - return schema schema_copy = copy.deepcopy(self._schema) - return add_null_to_basic_types(schema_copy) + return add_null_to_basic_types_func(schema_copy) + +def add_null_to_basic_types_func(schema: dict[str, Any]) -> dict[str, Any]: # noqa: C901, PLR0912 + """ + 递归地为 JSON Schema 中的基础类型(bool、number等)添加 null 选项 + + :param schema: 原始 JSON Schema + :return: 修改后的 JSON Schema + """ + # 如果不是字典类型(schema),直接返回 + if not isinstance(schema, dict): + return schema + + # 处理当前节点的 type 字段 + if "type" 
in schema: + # 处理单一类型字符串 + if isinstance(schema["type"], str): + if schema["type"] in ["boolean", "number", "string", "integer"]: + schema["type"] = [schema["type"], "null"] + + # 处理类型数组 + elif isinstance(schema["type"], list): + for t in schema["type"]: + if isinstance(t, str) and t in ["boolean", "number", "string", "integer"]: + if "null" not in schema["type"]: + schema["type"].append("null") + break + + # 递归处理 properties 字段(对象类型) + if "properties" in schema: + for prop, prop_schema in schema["properties"].items(): + schema["properties"][prop] = add_null_to_basic_types_func(prop_schema) + + # 递归处理 items 字段(数组类型) + if "items" in schema: + schema["items"] = add_null_to_basic_types_func(schema["items"]) + + # 递归处理 anyOf, oneOf, allOf 字段 + for keyword in ["anyOf", "oneOf", "allOf"]: + if keyword in schema: + schema[keyword] = [add_null_to_basic_types_func(sub_schema) for sub_schema in schema[keyword]] + + return schema diff --git a/apps/schemas/appcenter.py b/apps/schemas/appcenter.py index bdda4481bec17d9fd0b4aa47bd868c66285086bf..deb8c20fa14c557455d147a3bd8df9baf2cc96fc 100644 --- a/apps/schemas/appcenter.py +++ b/apps/schemas/appcenter.py @@ -67,3 +67,15 @@ class AppData(BaseModel): default_factory=lambda: AppPermissionData(authorizedUsers=None), description="权限配置") workflows: list[AppFlowInfo] = Field(default=[], description="工作流信息列表") mcp_service: list[str] = Field(default=[], alias="mcpService", description="MCP服务id列表") + + +class CreateAppRequest(AppData): + """POST /api/app 请求数据结构""" + + app_id: str | None = Field(None, alias="appId", description="应用ID") + + +class ChangeFavouriteAppRequest(BaseModel): + """PUT /api/app/{appId} 请求数据结构""" + + favorited: bool = Field(..., description="是否收藏") diff --git a/apps/schemas/blacklist.py b/apps/schemas/blacklist.py new file mode 100644 index 0000000000000000000000000000000000000000..ec8a8e061517c5ba4792f21758d55a973387794a --- /dev/null +++ b/apps/schemas/blacklist.py @@ -0,0 +1,64 @@ +# Copyright (c) Huawei 
Technologies Co., Ltd. 2023-2025. All rights reserved. +"""黑名单相关数据结构""" + +import uuid + +from pydantic import BaseModel + +from apps.models.blacklist import Blacklist +from apps.schemas.response_data import ResponseData + + +class QuestionBlacklistRequest(BaseModel): + """POST /api/blacklist/question 请求数据结构""" + + id: str + question: str + answer: str + is_deletion: int + + +class UserBlacklistRequest(BaseModel): + """POST /api/blacklist/user 请求数据结构""" + + user_sub: str + is_ban: int + + +class AbuseRequest(BaseModel): + """POST /api/blacklist/complaint 请求数据结构""" + + record_id: uuid.UUID + reason: str + reason_type: str + + +class AbuseProcessRequest(BaseModel): + """POST /api/blacklist/abuse 请求数据结构""" + + id: uuid.UUID + is_deletion: int + + +class GetBlacklistUserMsg(BaseModel): + """GET /api/blacklist/user Result数据结构""" + + user_subs: list[str] + + +class GetBlacklistUserRsp(ResponseData): + """GET /api/blacklist/user 返回数据结构""" + + result: GetBlacklistUserMsg + + +class GetBlacklistQuestionMsg(BaseModel): + """GET /api/blacklist/question Result数据结构""" + + question_list: list[Blacklist] + + +class GetBlacklistQuestionRsp(ResponseData): + """GET /api/blacklist/question 返回数据结构""" + + result: GetBlacklistQuestionMsg diff --git a/apps/schemas/comment.py b/apps/schemas/comment.py new file mode 100644 index 0000000000000000000000000000000000000000..f3668a87af013efb4e8991780640f256acb7fadf --- /dev/null +++ b/apps/schemas/comment.py @@ -0,0 +1,14 @@ +from apps.schemas.enum_var import CommentType + + +from pydantic import BaseModel, Field + + +class AddCommentData(BaseModel): + """添加评论""" + + record_id: str + comment: CommentType + dislike_reason: str = Field(default="", max_length=200) + reason_link: str = Field(default="", max_length=200) + reason_description: str = Field(default="", max_length=500) \ No newline at end of file diff --git a/apps/schemas/config.py b/apps/schemas/config.py index 
d81900a9025a05845e28cf936c42c404d6d60c70..dba8fab9c8a43b6a0b551d039c28e1a1c6c5ab29 100644 --- a/apps/schemas/config.py +++ b/apps/schemas/config.py @@ -38,15 +38,6 @@ class LoginConfig(BaseModel): settings: OIDCConfig | FixedUserConfig = Field(description="OIDC 配置") -class EmbeddingConfig(BaseModel): - """Embedding配置""" - - type: str = Field(description="Embedding接口类型", default="openai") - endpoint: str = Field(description="Embedding模型地址") - api_key: str = Field(description="Embedding模型API Key") - model: str = Field(description="Embedding模型名称") - - class RAGConfig(BaseModel): """RAG配置""" @@ -68,16 +59,6 @@ class MinioConfig(BaseModel): secure: bool = Field(description="MinIO是否启用SSL", default=False) -class MongoDBConfig(BaseModel): - """MongoDB配置""" - - host: str = Field(description="MongoDB主机名") - port: int = Field(description="MongoDB端口号", default=27017) - user: str = Field(description="MongoDB用户名") - password: str = Field(description="MongoDB密码") - database: str = Field(description="MongoDB数据库名") - - class PostgresConfig(BaseModel): """Postgres配置""" @@ -115,11 +96,9 @@ class ConfigModel(BaseModel): deploy: DeployConfig login: LoginConfig - embedding: EmbeddingConfig rag: RAGConfig fastapi: FastAPIConfig minio: MinioConfig - mongodb: MongoDBConfig postgres: PostgresConfig security: SecurityConfig check: CheckConfig diff --git a/apps/schemas/conversation.py b/apps/schemas/conversation.py new file mode 100644 index 0000000000000000000000000000000000000000..ec9db6e69a37b00133d2b0df63c6812cdb7041cf --- /dev/null +++ b/apps/schemas/conversation.py @@ -0,0 +1,13 @@ +from pydantic import BaseModel, Field + + +class ChangeConversationData(BaseModel): + """修改会话信息""" + + title: str = Field(..., min_length=1, max_length=2000) + + +class DeleteConversationData(BaseModel): + """删除会话""" + + conversation_list: list[uuid.UUID] = Field(alias="conversationList") \ No newline at end of file diff --git a/apps/schemas/mcp.py b/apps/schemas/mcp.py index 
45280b0c7855f9414b178392fae1539a7d83e0f1..fda1044fe37fba55309119a9a4a3f006967b0a2c 100644 --- a/apps/schemas/mcp.py +++ b/apps/schemas/mcp.py @@ -119,3 +119,21 @@ class Step(BaseModel): tool_id: str = Field(description="工具ID") description: str = Field(description="步骤描述") + + +class UpdateMCPServiceRequest(BaseModel): + """POST /api/mcpservice 请求数据结构""" + + service_id: str | None = Field(None, alias="serviceId", description="服务ID(更新时传递)") + name: str = Field(..., description="MCP服务名称") + description: str = Field(..., description="MCP服务描述") + overview: str = Field(..., description="MCP服务概述") + config: dict[str, Any] = Field(..., description="MCP服务配置") + mcp_type: MCPType = Field(description="MCP传输协议(Stdio/SSE/Streamable)", default=MCPType.STDIO, alias="mcpType") + + +class ActiveMCPServiceRequest(BaseModel): + """POST /api/mcp/{serviceId} 请求数据结构""" + + active: bool = Field(description="是否激活mcp服务") + mcp_env: dict[str, Any] | None = Field(default=None, description="MCP服务环境变量", alias="mcpEnv") diff --git a/apps/schemas/message.py b/apps/schemas/message.py index 629915b21670ec98ad572f3d4cd2ffcf9b83e9bc..b8c137c76e5405d021adab1f91df47fa208e836c 100644 --- a/apps/schemas/message.py +++ b/apps/schemas/message.py @@ -30,11 +30,11 @@ class HeartbeatData(BaseModel): class MessageFlow(BaseModel): """消息中有关Flow信息的部分""" - app_id: str = Field(description="插件ID", alias="appId") + app_id: uuid.UUID = Field(description="插件ID", alias="appId") flow_id: str = Field(description="Flow ID", alias="flowId") flow_name: str = Field(description="Flow名称", alias="flowName") flow_status: ExecutorStatus = Field(description="Flow状态", alias="flowStatus", default=ExecutorStatus.UNKNOWN) - step_id: str = Field(description="当前步骤ID", alias="stepId") + step_id: uuid.UUID = Field(description="当前步骤ID", alias="stepId") step_name: str = Field(description="当前步骤名称", alias="stepName") sub_step_id: str | None = Field(description="当前子步骤ID", alias="subStepId", default=None) sub_step_name: str | None = 
Field(description="当前子步骤名称", alias="subStepName", default=None) @@ -101,9 +101,8 @@ class MessageBase(HeartbeatData): """基础消息事件结构""" id: str = Field(min_length=36, max_length=36) - group_id: str = Field(min_length=36, max_length=36, alias="groupId") conversation_id: uuid.UUID = Field(min_length=36, max_length=36, alias="conversationId") - task_id: str = Field(min_length=36, max_length=36, alias="taskId") + task_id: uuid.UUID = Field(min_length=36, max_length=36, alias="taskId") flow: MessageFlow | None = None content: Any | None = Field(default=None, description="消息内容") metadata: MessageMetadata diff --git a/apps/schemas/pool.py b/apps/schemas/pool.py deleted file mode 100644 index c3b105c8c18963e78f634440ff928985642add45..0000000000000000000000000000000000000000 --- a/apps/schemas/pool.py +++ /dev/null @@ -1,28 +0,0 @@ -# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. -"""App和Service等数据库内数据结构""" - -from datetime import UTC, datetime -from typing import Any - -from pydantic import BaseModel, Field - - -class ServiceApiInfo(BaseModel): - """外部服务API信息""" - - filename: str = Field(description="OpenAPI文件名") - description: str = Field(description="OpenAPI中关于API的Summary") - path: str = Field(description="OpenAPI文件路径") - - -class Node(BaseModel): - """Node合并后的信息(不存库)""" - - id: str = Field(alias="_id") - name: str - description: str - created_at: float = Field(default_factory=lambda: round(datetime.now(tz=UTC).timestamp(), 3)) - service_id: str | None = Field(description="Node所属的Service ID", default=None) - call_id: str = Field(description="所使用的Call的ID") - params_schema: dict[str, Any] = Field(description="Node的参数schema", default={}) - output_schema: dict[str, Any] = Field(description="Node输出的完整Schema", default={}) diff --git a/apps/schemas/rag_data.py b/apps/schemas/rag_data.py index ed097c903bf349ad43176df4762b96ac123b8539..605559e49fc5b6b4f698af0fadaab03c466d92cb 100644 --- a/apps/schemas/rag_data.py +++ b/apps/schemas/rag_data.py @@ -1,6 
+1,7 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """请求RAG相关接口时,使用的数据类型""" +import uuid from typing import Any, Literal from pydantic import BaseModel, Field @@ -9,7 +10,7 @@ from pydantic import BaseModel, Field class RAGQueryReq(BaseModel): """查询RAG时的POST请求体""" - kb_ids: list[str] = Field(default=[], description="资产id", alias="kbIds") + kb_ids: list[uuid.UUID] = Field(default=[], description="资产id", alias="kbIds") query: str = Field(default="", description="查询内容") top_k: int = Field(default=5, description="返回的结果数量", alias="topK") doc_ids: list[str] | None = Field(default=None, description="文档id", alias="docIds") diff --git a/apps/schemas/record.py b/apps/schemas/record.py index d8e237e763591df7ce5c26a01a92570e2ea791c6..27673e51e60f4957fa0bd0d3d7ab8972b2e12545 100644 --- a/apps/schemas/record.py +++ b/apps/schemas/record.py @@ -92,20 +92,6 @@ class RecordData(BaseModel): created_at: float = Field(alias="createdAt") -class RecordGroupDocument(BaseModel): - """RecordGroup关联的文件""" - - id: str = Field(default_factory=lambda: str(uuid.uuid4()), alias="_id") - order: int = Field(default=0, description="文档顺序") - author: str = Field(default="", description="文档作者") - name: str = Field(description="文档名称") - abstract: str = Field(description="文档摘要", default="") - extension: str = Field(description="文档扩展名", default="") - size: int = Field(description="文档大小,单位是KB", default=0) - associated: Literal["question", "answer"] - created_at: float = Field(default=0.0, description="文档创建时间") - - class FlowHistory(BaseModel): """Flow执行历史""" diff --git a/apps/schemas/request_data.py b/apps/schemas/request_data.py index 2ca0bf8c8f992b943fdc8e56277d7967a03649b6..6b08614f8822ee583d414c9171009490c3d74ed5 100644 --- a/apps/schemas/request_data.py +++ b/apps/schemas/request_data.py @@ -2,14 +2,11 @@ """FastAPI 请求体""" import uuid -from typing import Any from pydantic import BaseModel, Field -from .appcenter import AppData -from .enum_var import CommentType, 
LanguageType +from .enum_var import LanguageType from .flow_topology import FlowItem -from .mcp import MCPType from .message import FlowParams @@ -33,102 +30,7 @@ class RequestData(BaseModel): app: RequestDataApp | None = Field(default=None, description="应用") debug: bool = Field(default=False, description="是否调试") task_id: str | None = Field(default=None, alias="taskId", description="任务ID") - - -class QuestionBlacklistRequest(BaseModel): - """POST /api/blacklist/question 请求数据结构""" - - id: str - question: str - answer: str - is_deletion: int - - -class UserBlacklistRequest(BaseModel): - """POST /api/blacklist/user 请求数据结构""" - - user_sub: str - is_ban: int - - -class AbuseRequest(BaseModel): - """POST /api/blacklist/complaint 请求数据结构""" - - record_id: uuid.UUID - reason: str - reason_type: str - - -class AbuseProcessRequest(BaseModel): - """POST /api/blacklist/abuse 请求数据结构""" - - id: uuid.UUID - is_deletion: int - - -class CreateAppRequest(AppData): - """POST /api/app 请求数据结构""" - - app_id: str | None = Field(None, alias="appId", description="应用ID") - - -class ChangeFavouriteAppRequest(BaseModel): - """PUT /api/app/{appId} 请求数据结构""" - - favorited: bool = Field(..., description="是否收藏") - - -class UpdateMCPServiceRequest(BaseModel): - """POST /api/mcpservice 请求数据结构""" - - service_id: str | None = Field(None, alias="serviceId", description="服务ID(更新时传递)") - name: str = Field(..., description="MCP服务名称") - description: str = Field(..., description="MCP服务描述") - overview: str = Field(..., description="MCP服务概述") - config: dict[str, Any] = Field(..., description="MCP服务配置") - mcp_type: MCPType = Field(description="MCP传输协议(Stdio/SSE/Streamable)", default=MCPType.STDIO, alias="mcpType") - - -class ActiveMCPServiceRequest(BaseModel): - """POST /api/mcp/{serviceId} 请求数据结构""" - - active: bool = Field(description="是否激活mcp服务") - mcp_env: dict[str, Any] | None = Field(default=None, description="MCP服务环境变量", alias="mcpEnv") - - -class UpdateServiceRequest(BaseModel): - """POST /api/service 
请求数据结构""" - - service_id: uuid.UUID | None = Field(None, alias="serviceId", description="服务ID(更新时传递)") - data: dict[str, Any] = Field(..., description="对应 YAML 内容的数据对象") - - -class ChangeFavouriteServiceRequest(BaseModel): - """PUT /api/service/{serviceId} 请求数据结构""" - - favorited: bool = Field(..., description="是否收藏") - - -class ChangeConversationData(BaseModel): - """修改会话信息""" - - title: str = Field(..., min_length=1, max_length=2000) - - -class DeleteConversationData(BaseModel): - """删除会话""" - - conversation_list: list[uuid.UUID] = Field(alias="conversationList") - - -class AddCommentData(BaseModel): - """添加评论""" - - record_id: str - comment: CommentType - dislike_reason: str = Field(default="", max_length=200) - reason_link: str = Field(default="", max_length=200) - reason_description: str = Field(default="", max_length=500) + llm_id: str = Field(alias="llmId", description="大模型ID") class PostTagData(BaseModel): diff --git a/apps/schemas/response_data.py b/apps/schemas/response_data.py index ac630b3fdc211d3380dbab9c889fea50c2af5459..ce1c003cdb15a6960cff86cb5c334e49be0af7c3 100644 --- a/apps/schemas/response_data.py +++ b/apps/schemas/response_data.py @@ -6,7 +6,6 @@ from typing import Any from pydantic import BaseModel, Field -from apps.models.blacklist import Blacklist from apps.models.mcp import MCPInstallStatus, MCPTools from .appcenter import AppCenterCardItem, AppData @@ -57,37 +56,6 @@ class HealthCheckRsp(BaseModel): status: str -class GetBlacklistUserMsg(BaseModel): - """GET /api/blacklist/user Result数据结构""" - - user_subs: list[str] - - -class GetBlacklistUserRsp(ResponseData): - """GET /api/blacklist/user 返回数据结构""" - - result: GetBlacklistUserMsg - - -class GetBlacklistQuestionMsg(BaseModel): - """GET /api/blacklist/question Result数据结构""" - - question_list: list[Blacklist] - - -class GetBlacklistQuestionRsp(ResponseData): - """GET /api/blacklist/question 返回数据结构""" - - result: GetBlacklistQuestionMsg - - -class KbIteam(BaseModel): - """GET 
/api/conversation Result数据结构""" - - kb_id: str = Field(alias="kbId") - kb_name: str = Field(alias="kbName") - - class ConversationListItem(BaseModel): """GET /api/conversation Result数据结构""" @@ -97,7 +65,6 @@ class ConversationListItem(BaseModel): created_time: str = Field(alias="createdTime") app_id: str = Field(alias="appId") debug: bool = Field(alias="debug") - kb_list: list[KbIteam] = Field(alias="kbList", default=[]) class ConversationListMsg(BaseModel): diff --git a/apps/schemas/scheduler.py b/apps/schemas/scheduler.py index 068d03cef2c446ffdf0569ac94f61796b7e90652..3dc97b9850ed9d4fdec1ce2e834ef7ebf3d40ce7 100644 --- a/apps/schemas/scheduler.py +++ b/apps/schemas/scheduler.py @@ -71,3 +71,9 @@ class CallOutputChunk(BaseModel): type: CallOutputType = Field(description="输出类型") content: str | dict[str, Any] = Field(description="输出内容") + + +class TopFlow(BaseModel): + """最匹配用户输入的Flow""" + + choice: str = Field(description="最匹配用户输入的Flow的名称") diff --git a/apps/schemas/service.py b/apps/schemas/service.py new file mode 100644 index 0000000000000000000000000000000000000000..b7c803ad08c7d5bd70cfdcd16cc0ee7aaba34bfe --- /dev/null +++ b/apps/schemas/service.py @@ -0,0 +1,18 @@ +from pydantic import BaseModel, Field + + +import uuid +from typing import Any + + +class UpdateServiceRequest(BaseModel): + """POST /api/service 请求数据结构""" + + service_id: uuid.UUID | None = Field(None, alias="serviceId", description="服务ID(更新时传递)") + data: dict[str, Any] = Field(..., description="对应 YAML 内容的数据对象") + + +class ChangeFavouriteServiceRequest(BaseModel): + """PUT /api/service/{serviceId} 请求数据结构""" + + favorited: bool = Field(..., description="是否收藏") \ No newline at end of file diff --git a/apps/services/activity.py b/apps/services/activity.py index be41a238d68c10daed5e32ea0e3655d4a846fb45..475ac5e9fd69c5342b396e912fb04536d4dddfe5 100644 --- a/apps/services/activity.py +++ b/apps/services/activity.py @@ -6,51 +6,48 @@ from datetime import UTC, datetime, timedelta from sqlalchemy import 
delete, func, select from apps.common.postgres import postgres -from apps.constants import SLIDE_WINDOW_QUESTION_COUNT, SLIDE_WINDOW_TIME +from apps.constants import MAX_CONCURRENT_TASKS, SLIDE_WINDOW_QUESTION_COUNT, SLIDE_WINDOW_TIME from apps.exceptions import ActivityError from apps.models.session import SessionActivity class Activity: - """用户活动控制,限制单用户同一时间只能提问一个问题""" + """活动控制:全局并发限制,同时最多有 n 个任务在执行(与用户无关)""" - # TODO:改为同一时间整个系统最多有n个task在执行,与用户无关 @staticmethod async def is_active(user_sub: str) -> bool: """ - 判断当前用户是否正在提问(占用GPU资源) + 判断系统是否达到全局并发上限 - :param user_sub: 用户实体ID - :return: 判断结果,正在提问则返回True + :param user_sub: 用户实体ID(兼容现有接口签名) + :return: 达到并发上限返回 True,否则 False """ + _ = user_sub time = datetime.now(tz=UTC) async with postgres.session() as session: - # 检查窗口内总请求数 + # 单用户滑动窗口限流:统计该用户在窗口内的请求数 count = (await session.scalars(select(func.count(SessionActivity.id)).where( + SessionActivity.userSub == user_sub, SessionActivity.timestamp >= time - timedelta(seconds=SLIDE_WINDOW_TIME), SessionActivity.timestamp <= time, ))).one() if count >= SLIDE_WINDOW_QUESTION_COUNT: return True - # 检查用户是否正在提问 - active = (await session.scalars(select(SessionActivity).where( - SessionActivity.userSub == user_sub, - ))).one_or_none() - return bool(active) + # 全局并发检查:当前活跃任务数量是否达到上限 + current_active = (await session.scalars(select(func.count(SessionActivity.id)))).one() + return current_active >= MAX_CONCURRENT_TASKS @staticmethod async def set_active(user_sub: str) -> None: - """设置用户的活跃标识""" + """设置活跃标识:当未超过全局并发上限时登记一个活动任务""" time = datetime.now(UTC) - # 设置用户活跃状态 async with postgres.session() as session: - active = ( - await session.scalars(select(SessionActivity).where(SessionActivity.userSub == user_sub)) - ).one_or_none() - if active: - err = "用户正在提问" + # 并发上限校验 + current_active = (await session.scalars(select(func.count(SessionActivity.id)))).one() + if current_active >= MAX_CONCURRENT_TASKS: + err = "系统并发已达上限" raise ActivityError(err) await 
session.merge(SessionActivity(userSub=user_sub, timestamp=time)) await session.commit() @@ -59,11 +56,10 @@ class Activity: @staticmethod async def remove_active(user_sub: str) -> None: """ - 清除用户的活跃标识,释放GPU资源 + 释放一个活动任务名额(按发起者标识清除对应记录) :param user_sub: 用户实体ID """ - # 清除用户当前活动标识 async with postgres.session() as session: await session.execute( delete(SessionActivity).where(SessionActivity.userSub == user_sub), diff --git a/apps/services/flow.py b/apps/services/flow.py index 4b57065c452358324c59e7de6dcacb0e68f60a9b..8e8678026ef872e08aa22e6d4346cf18a3a2f983 100644 --- a/apps/services/flow.py +++ b/apps/services/flow.py @@ -179,7 +179,7 @@ class FlowManager: raise ValueError(err) flow_config = await FlowLoader.load(app_id, flow_id) - focus_point = flow_config.focusPoint or PositionItem(x=0, y=0) + focus_point = flow_config.basicConfig.focusPoint or PositionItem(x=0, y=0) flow_item = FlowItem( flowId=flow_id, name=flow_config.name, @@ -188,8 +188,8 @@ class FlowManager: nodes=[], edges=[], focusPoint=focus_point, - connectivity=flow_config.connectivity, - debug=flow_config.debug, + connectivity=flow_config.checkStatus.connectivity, + debug=flow_config.checkStatus.debug, ) for node_id, node_config in flow_config.steps.items(): @@ -214,7 +214,7 @@ class FlowManager: for edge_config in flow_config.edges: edge_from = edge_config.edge_from branch_id = "" - tmp_list = edge_config.edge_from.split(".") + tmp_list = edge_config.edge_from if len(tmp_list) == 0 or len(tmp_list) > FLOW_SPLIT_LEN: logger.error("[FlowManager] Flow中边的格式错误") continue @@ -338,8 +338,8 @@ class FlowManager: # 检查是否是修改动作;检查修改前后是否等价 old_flow_config = await FlowLoader.load(app_id, flow_id) - if old_flow_config and old_flow_config.debug: - flow_config.debug = await FlowManager.is_flow_config_equal(old_flow_config, flow_config) + if old_flow_config and old_flow_config.checkStatus.debug: + flow_config.checkStatus.debug = await FlowManager.is_flow_config_equal(old_flow_config, flow_config) await 
FlowLoader.save(app_id, flow_id, flow_config) @@ -389,7 +389,7 @@ class FlowManager: if flow is None: return False - flow.debug = debug + flow.checkStatus.debug = debug # 保存到文件系统 await FlowLoader.save(app_id=app_id, flow_id=flow_id, flow=flow) return True diff --git a/apps/services/mcp_service.py b/apps/services/mcp_service.py index 59b22420ca60f6374400bab9e0434bde37362204..b77801e3a16bd4066472970911ddfa95d1702bfb 100644 --- a/apps/services/mcp_service.py +++ b/apps/services/mcp_service.py @@ -32,8 +32,8 @@ from apps.schemas.mcp import ( MCPServerConfig, MCPServerSSEConfig, MCPServerStdioConfig, + UpdateMCPServiceRequest, ) -from apps.schemas.request_data import UpdateMCPServiceRequest from apps.schemas.response_data import MCPServiceCardItem logger = logging.getLogger(__name__) @@ -110,7 +110,7 @@ class MCPServiceManager: :return: MCP服务列表 """ mcpservice_pools = await MCPServiceManager._search_mcpservice( - search_type, keyword, page, is_active=is_active, is_installed=is_install, + search_type, keyword, page, user_sub, is_active=is_active, is_installed=is_install, ) return [ MCPServiceCardItem( diff --git a/apps/services/rag.py b/apps/services/rag.py index 77492da5eaa661116f51da5aea4a5ecb4e02835d..f7cd42c9a05c1bfe09265f387681c5389fafb124 100644 --- a/apps/services/rag.py +++ b/apps/services/rag.py @@ -5,133 +5,17 @@ import json import logging import re from collections.abc import AsyncGenerator -from datetime import UTC, datetime -from typing import Any -import httpx -from fastapi import status - -from apps.common.config import config -from apps.llm.reasoning import ReasoningLLM from apps.llm.token import TokenCalculator from apps.schemas.enum_var import EventType, LanguageType from apps.schemas.rag_data import RAGQueryReq -from apps.services.llm import LLMManager -from apps.services.session import SessionManager logger = logging.getLogger(__name__) -CHUNK_ELEMENT_TOKENS = 5 class RAG: """调用RAG服务,获取知识库答案""" - @staticmethod - async def get_doc_info_from_rag( - 
user_sub: str, max_tokens: int | None, doc_ids: list[str], data: RAGQueryReq, - ) -> list[dict[str, Any]]: - """获取RAG服务的文档信息""" - session_id = await SessionManager.get_session_by_user_sub(user_sub) - url = config.rag.rag_service.rstrip("/") + "/chunk/search" - headers = { - "Content-Type": "application/json", - "Authorization": f"Bearer {session_id}", - } - doc_chunk_list = [] - if doc_ids: - default_kb_id = "00000000-0000-0000-0000-000000000000" - tmp_data = RAGQueryReq( - kbIds=[default_kb_id], - query=data.query, - topK=data.top_k, - docIds=doc_ids, - searchMethod=data.search_method, - isRelatedSurrounding=data.is_related_surrounding, - isClassifyByDoc=data.is_classify_by_doc, - isRerank=data.is_rerank, - tokensLimit=max_tokens, - ) - try: - async with httpx.AsyncClient(timeout=30) as client: - data_json = tmp_data.model_dump(exclude_none=True, by_alias=True) - response = await client.post(url, headers=headers, json=data_json) - if response.status_code == status.HTTP_200_OK: - result = response.json() - doc_chunk_list += result["result"]["docChunks"] - except Exception: - logger.exception("[RAG] 获取文档分片失败") - if data.kb_ids: - try: - async with httpx.AsyncClient(timeout=30) as client: - data_json = data.model_dump(exclude_none=True, by_alias=True) - response = await client.post(url, headers=headers, json=data_json) - # 检查响应状态码 - if response.status_code == status.HTTP_200_OK: - result = response.json() - doc_chunk_list += result["result"]["docChunks"] - except Exception: - logger.exception("[RAG] 获取文档分片失败") - return doc_chunk_list - - @staticmethod - async def assemble_doc_info( - doc_chunk_list: list[dict[str, Any]], - max_tokens: int, - ) -> tuple[str, list[dict[str, Any]]]: - """组装文档信息""" - bac_info = "" - doc_info_list = [] - doc_cnt = 0 - doc_id_map = {} - remaining_tokens = max_tokens * 0.8 - - for doc_chunk in doc_chunk_list: - if doc_chunk["docId"] not in doc_id_map: - doc_cnt += 1 - t = doc_chunk.get("docCreatedAt", None) - if isinstance(t, str): - t = 
datetime.strptime(t, "%Y-%m-%d %H:%M").replace( - tzinfo=UTC, - ) - t = round(t.replace(tzinfo=UTC).timestamp(), 3) - else: - t = round(datetime.now(UTC).timestamp(), 3) - doc_info_list.append({ - "id": doc_chunk["docId"], - "order": doc_cnt, - "name": doc_chunk.get("docName", ""), - "author": doc_chunk.get("docAuthor", ""), - "extension": doc_chunk.get("docExtension", ""), - "abstract": doc_chunk.get("docAbstract", ""), - "size": doc_chunk.get("docSize", 0), - "created_at": t, - }) - doc_id_map[doc_chunk["docId"]] = doc_cnt - doc_index = doc_id_map[doc_chunk["docId"]] - - if bac_info: - bac_info += "\n\n" - bac_info += f"""""" - - for chunk in doc_chunk["chunks"]: - if remaining_tokens <= CHUNK_ELEMENT_TOKENS: - break - chunk_text = chunk["text"] - chunk_text = TokenCalculator().get_k_tokens_words_from_content( - content=chunk_text, k=remaining_tokens) - remaining_tokens -= TokenCalculator().calculate_token_length(messages=[ - {"role": "user", "content": ""}, - {"role": "user", "content": chunk_text}, - {"role": "user", "content": ""}, - ], pure_text=True) - bac_info += f""" - - {chunk_text} - - """ - bac_info += "" - return bac_info, doc_info_list - @staticmethod async def chat_with_llm_base_on_rag( # noqa: C901, PLR0913 user_sub: str, @@ -142,21 +26,8 @@ class RAG: language: LanguageType = LanguageType.CHINESE, ) -> AsyncGenerator[str, None]: """获取RAG服务的结果""" - llm_config = await LLMManager.get_llm(llm_id) - if not llm_config: - err = "[RAG] 未设置问答所用LLM" - logger.error(err) - raise RuntimeError(err) - reasion_llm = ReasoningLLM(llm_config) - if history: - try: - question_obj = QuestionRewrite() - data.query = await question_obj.generate( - history=history, question=data.query, llm=reasion_llm, language=language, - ) - except Exception: - logger.exception("[RAG] 问题重写失败") + pass doc_chunk_list = await RAG.get_doc_info_from_rag( user_sub=user_sub, max_tokens=llm_config.maxToken, doc_ids=doc_ids, data=data, ) diff --git a/apps/services/task.py b/apps/services/task.py 
index 3be87900f52d73d218fe4d08e27260a94ebd3c52..ee794a6278b3b201d33d9f7803d94316abe52209 100644 --- a/apps/services/task.py +++ b/apps/services/task.py @@ -64,8 +64,6 @@ class TaskManager: if not runtime: runtime = TaskRuntime( taskId=task_id, - inputToken=0, - outputToken=0, ) state = (await session.scalars( diff --git a/apps/services/user.py b/apps/services/user.py index 7159584126c5a761b547b6d1cc1c157b614fcd04..8a74d24672494a4e438a585ad949b340d7e47e31 100644 --- a/apps/services/user.py +++ b/apps/services/user.py @@ -4,7 +4,7 @@ import logging from datetime import UTC, datetime -from sqlalchemy import select +from sqlalchemy import func, select from apps.common.postgres import postgres from apps.models.user import User @@ -18,7 +18,7 @@ class UserManager: """用户相关操作""" @staticmethod - async def list_user(n: int = 10, page: int = 1) -> list[User]: + async def list_user(n: int = 10, page: int = 1) -> tuple[list[User], int]: """ 获取所有用户 @@ -27,8 +27,9 @@ class UserManager: :return: 所有用户列表 """ async with postgres.session() as session: + count = await session.scalar(select(func.count(User.id))) users = (await session.scalars(select(User).offset((page - 1) * n).limit(n))).all() - return list(users) + return list(users), count or 0 @staticmethod