From 5176ae8da541eb68abb7cdfb3ffab263da4dd5bf Mon Sep 17 00:00:00 2001 From: Loshawn <2428333123@qq.com> Date: Fri, 15 Aug 2025 19:16:20 +0800 Subject: [PATCH] =?UTF-8?q?=E5=9B=BD=E9=99=85=E5=8C=96-=E8=81=8A=E5=A4=A9?= =?UTF-8?q?=E9=83=A8=E5=88=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/llm/patterns/core.py | 17 ++- apps/llm/patterns/executor.py | 108 +++++++++++--- apps/llm/patterns/facts.py | 58 +++++++- apps/llm/patterns/rewoo.py | 207 +++++++++++++++++++++----- apps/llm/patterns/rewrite.py | 78 +++++++++- apps/llm/patterns/select.py | 82 ++++++++-- apps/routers/chat.py | 3 + apps/routers/flow.py | 4 +- apps/scheduler/executor/agent.py | 127 ++++++++-------- apps/scheduler/executor/flow.py | 112 +++++++++----- apps/scheduler/executor/step.py | 2 +- apps/scheduler/pool/loader/flow.py | 33 ++-- apps/scheduler/scheduler/message.py | 15 +- apps/scheduler/scheduler/scheduler.py | 7 +- apps/schemas/enum_var.py | 7 + apps/schemas/message.py | 6 +- apps/schemas/request_data.py | 9 +- apps/schemas/task.py | 4 +- apps/services/flow.py | 39 ++++- apps/services/rag.py | 180 ++++++++++++++-------- 20 files changed, 811 insertions(+), 287 deletions(-) diff --git a/apps/llm/patterns/core.py b/apps/llm/patterns/core.py index 4ef8133a..f80c275d 100644 --- a/apps/llm/patterns/core.py +++ b/apps/llm/patterns/core.py @@ -3,22 +3,26 @@ from abc import ABC, abstractmethod from textwrap import dedent +from apps.schemas.enum_var import LanguageType class CorePattern(ABC): """基础大模型范式抽象类""" - system_prompt: str = "" + system_prompt: dict[LanguageType, str] = {} """系统提示词""" - user_prompt: str = "" + user_prompt: dict[LanguageType, str] = {} """用户提示词""" input_tokens: int = 0 """输入Token数量""" output_tokens: int = 0 """输出Token数量""" - - def __init__(self, system_prompt: str | None = None, user_prompt: str | None = None) -> None: + def __init__( + self, + system_prompt: dict[LanguageType, str] | None = None, + user_prompt: dict[LanguageType, str] | None = None, + ) -> None: """ 检查是否已经自定义了Prompt;有的话就用自定义的;同时对Prompt进行空格清除 @@ -35,8 +39,9 @@ class CorePattern(ABC): err = "必须设置用户提示词!" raise ValueError(err) - self.system_prompt = dedent(self.system_prompt).strip("\n") - self.user_prompt = dedent(self.user_prompt).strip("\n") + self.system_prompt = {lang: dedent(prompt).strip("\n") for lang, prompt in self.system_prompt.items()} + + self.user_prompt = {lang: dedent(prompt).strip("\n") for lang, prompt in self.user_prompt.items()} @abstractmethod async def generate(self, **kwargs): # noqa: ANN003, ANN201 diff --git a/apps/llm/patterns/executor.py b/apps/llm/patterns/executor.py index f872fd2a..05225ad5 100644 --- a/apps/llm/patterns/executor.py +++ b/apps/llm/patterns/executor.py @@ -6,7 +6,7 @@ from typing import TYPE_CHECKING, Any from apps.llm.patterns.core import CorePattern from apps.llm.reasoning import ReasoningLLM from apps.llm.snippet import convert_context_to_prompt, facts_to_prompt - +from apps.schemas.enum_var import LanguageType if TYPE_CHECKING: from apps.schemas.scheduler import ExecutorBackground @@ -14,7 +14,8 @@ if TYPE_CHECKING: class ExecutorThought(CorePattern): """通过大模型生成Executor的思考内容""" - user_prompt: str = r""" + user_prompt: dict[LanguageType, str] = { + LanguageType.CHINESE: r""" 你是一个可以使用工具的智能助手。 @@ -44,10 +45,46 @@ class ExecutorThought(CorePattern): 请综合以上信息,再次一步一步地进行思考,并给出见解和行动: - """ + """, + LanguageType.ENGLISH: r""" + + + You are an intelligent assistant who can use tools. 
+ When answering user questions, you use a tool to get more information. + Please summarize the process of using the tool briefly, provide your insights, and give the next action. + + Note: + The information about the tool is given in the tag. + To help you better understand what happened, your previous thought process is given in the tag. + Do not include XML tags in the output, and keep the output brief and clear. + + + + + {tool_name} + {tool_description} + {tool_output} + + + + {last_thought} + + + + The question you need to solve is: + {user_question} + + + Please integrate the above information, think step by step again, provide insights, and give actions: + """, + } """用户提示词""" - def __init__(self, system_prompt: str | None = None, user_prompt: str | None = None) -> None: + def __init__( + self, + system_prompt: dict[LanguageType, str] | None = None, + user_prompt: dict[LanguageType, str] | None = None, + ) -> None: """处理Prompt""" super().__init__(system_prompt, user_prompt) @@ -57,19 +94,23 @@ class ExecutorThought(CorePattern): last_thought: str = kwargs["last_thought"] user_question: str = kwargs["user_question"] tool_info: dict[str, Any] = kwargs["tool_info"] + language: LanguageType = kwargs.get("language", LanguageType.CHINESE) except Exception as e: err = "参数不正确!" raise ValueError(err) from e messages = [ {"role": "system", "content": "You are a helpful assistant."}, - {"role": "user", "content": self.user_prompt.format( - last_thought=last_thought, - user_question=user_question, - tool_name=tool_info["name"], - tool_description=tool_info["description"], - tool_output=tool_info["output"], - )}, + { + "role": "user", + "content": self.user_prompt[language].format( + last_thought=last_thought, + user_question=user_question, + tool_name=tool_info["name"], + tool_description=tool_info["description"], + tool_output=tool_info["output"], + ), + }, ] llm = ReasoningLLM() @@ -85,7 +126,8 @@ class ExecutorThought(CorePattern): class ExecutorSummary(CorePattern): """使用大模型进行生成Executor初始背景""" - user_prompt: str = r""" + user_prompt: dict[LanguageType, str] = { + LanguageType.CHINESE: r""" 根据给定的对话记录和关键事实,生成一个三句话背景总结。这个总结将用于后续对话的上下文理解。 @@ -105,10 +147,36 @@ class ExecutorSummary(CorePattern): 现在,请开始生成背景总结: - """ + """, + LanguageType.ENGLISH: r""" + + Based on the given conversation records and key facts, generate a three-sentence background summary. This summary will be used for context understanding in subsequent conversations. + + The requirements for generating the summary are as follows: + 1. Highlight important information points, such as time, location, people, events, etc. + 2. The content in the "key facts" can be used as known information when generating the summary. + 3. Do not include XML tags in the output, ensure the accuracy of the information, and do not make up information. + 4. The summary should be less than 3 sentences and less than 300 words. + + The conversation records will be given in the tag, and the key facts will be given in the tag. 
+ + + {conversation} + + + {facts} + + + Now, please start generating the background summary: + """, + } """用户提示词""" - def __init__(self, system_prompt: str | None = None, user_prompt: str | None = None) -> None: + def __init__( + self, + system_prompt: dict[LanguageType, str] | None = None, + user_prompt: dict[LanguageType, str] | None = None, + ) -> None: """初始化Background模式""" super().__init__(system_prompt, user_prompt) @@ -117,13 +185,17 @@ class ExecutorSummary(CorePattern): background: ExecutorBackground = kwargs["background"] conversation_str = convert_context_to_prompt(background.conversation) facts_str = facts_to_prompt(background.facts) + language = kwargs.get("language", LanguageType.CHINESE) messages = [ {"role": "system", "content": "You are a helpful assistant."}, - {"role": "user", "content": self.user_prompt.format( - facts=facts_str, - conversation=conversation_str, - )}, + { + "role": "user", + "content": self.user_prompt[language].format( + facts=facts_str, + conversation=conversation_str, + ), + }, ] result = "" diff --git a/apps/llm/patterns/facts.py b/apps/llm/patterns/facts.py index 0b0381ff..ac833f51 100644 --- a/apps/llm/patterns/facts.py +++ b/apps/llm/patterns/facts.py @@ -9,6 +9,7 @@ from apps.llm.function import JsonGenerator from apps.llm.patterns.core import CorePattern from apps.llm.reasoning import ReasoningLLM from apps.llm.snippet import convert_context_to_prompt +from apps.schemas.enum_var import LanguageType logger = logging.getLogger(__name__) @@ -25,7 +26,8 @@ class Facts(CorePattern): system_prompt: str = "You are a helpful assistant." """系统提示词(暂不使用)""" - user_prompt: str = r""" + user_prompt: dict[LanguageType, str] = { + LanguageType.CHINESE: r""" 从对话中提取关键信息,并将它们组织成独一无二的、易于理解的事实,包含用户偏好、关系、实体等有用信息。 @@ -63,21 +65,65 @@ class Facts(CorePattern): {conversation} - """ - """用户提示词""" + """, + LanguageType.ENGLISH: r""" + + + Extract key information from the conversation and organize it into unique, easily understandable facts that include user preferences, relationships, entities, etc. + The following are the types of information to be paid attention to and detailed instructions on how to handle the input data. + + **Types of information to be paid attention to** + 1. Entities: Entities involved in the conversation. For example: names, locations, organizations, events, etc. + 2. Preferences: Attitudes towards entities. For example: like, dislike, etc. + 3. Relationships: Relationships between the user and entities, or between two entities. For example: include, parallel, exclusive, etc. + 4. Actions: Specific actions that affect entities. For example: query, search, browse, click, etc. + + **Requirements** + 1. Facts must be accurate and can only be extracted from the conversation. Do not include information from the sample in the output. + 2. Facts must be clear, concise, and easy to understand. Must be less than 30 words. + 3. Output in the following JSON format: + + {{ + "facts": ["fact1", "fact2", "fact3"] + }} + + + + + What are the attractions in West Lake, Hangzhou? + West Lake in Hangzhou is a famous scenic spot in Hangzhou, Zhejiang Province, China, famous for its beautiful natural scenery and rich cultural heritage. There are many famous attractions around West Lake, including the famous Su Causeway, Bai Causeway, Broken Bridge, Three Pools Mirroring the Moon, etc. West Lake is famous for its clear water and surrounding mountains, and is one of the most famous lakes in China. 
+
+
+
+            {{
+                "facts": ["West Lake has famous attractions such as the Su Causeway, Bai Causeway, Broken Bridge, and Three Pools Mirroring the Moon."]
+            }}
+
+
+
+            {conversation}
+
+        """,
+    }
+    """用户提示词"""

-    def __init__(self, system_prompt: str | None = None, user_prompt: str | None = None) -> None:
+    def __init__(
+        self,
+        system_prompt: dict[LanguageType, str] | None = None,
+        user_prompt: dict[LanguageType, str] | None = None,
+    ) -> None:
         """初始化Prompt"""
         super().__init__(system_prompt, user_prompt)
-
     async def generate(self, **kwargs) -> list[str]:  # noqa: ANN003
         """事实提取"""
         conversation = convert_context_to_prompt(kwargs["conversation"])
+        language = kwargs.get("language", LanguageType.CHINESE)
+
         messages = [
             {"role": "system", "content": self.system_prompt},
-            {"role": "user", "content": self.user_prompt.format(conversation=conversation)},
+            {"role": "user", "content": self.user_prompt[language].format(conversation=conversation)},
         ]
         result = ""
         llm = ReasoningLLM()
diff --git a/apps/llm/patterns/rewoo.py b/apps/llm/patterns/rewoo.py
index ef78d926..cc4a44d7 100644
--- a/apps/llm/patterns/rewoo.py
+++ b/apps/llm/patterns/rewoo.py
@@ -3,12 +3,14 @@
 
 from apps.llm.patterns.core import CorePattern
 from apps.llm.reasoning import ReasoningLLM
+from apps.schemas.enum_var import LanguageType
 
 
 class InitPlan(CorePattern):
     """规划生成命令行"""
 
-    system_prompt: str = r"""
+    system_prompt: dict[LanguageType, str] = {
+        LanguageType.CHINESE: r"""
         你是一个计划生成器。对于给定的目标,**制定一个简单的计划**,该计划可以逐步生成合适的命令行参数和标志。
 
         你会收到一个"命令前缀",这是已经确定和生成的命令部分。你需要基于这个前缀使用标志和参数来完成命令。
@@ -41,10 +43,54 @@
         示例结束
 
         让我们开始!
-    """
+    """,
+        LanguageType.ENGLISH: r"""
+        You are a plan generator. For a given goal, **draft a simple plan** that can step-by-step generate the \
+        appropriate command line arguments and flags.
+
+        You will receive a "command prefix", which is the already determined and generated part of the command. \
+        You need to complete the command with flags and arguments based on this prefix.
+
+        In each step, specify which external tool to use and the tool input to get the evidence.
+
+        The tool can be one of the following:
+        (1) Option["instruction"]: Query the most similar command line flag. Only accepts one input parameter; \
+        "instruction" must be a search string. The search string should be detailed and contain necessary data.
+        (2) Argument["name"]: Place the data from the task into a specific position in the command line. \
+        Accepts two input parameters.
+
+        All steps must start with "Plan: " and be less than 150 words.
+        Do not add any extra steps.
+        Ensure each step contains all the required information - do not skip steps.
+        Do not add any extra data after the evidence.
+
+        Start example
+
+        Task: Run a new alpine:latest container in the background, mount the host /root folder to /data, and execute \
+        the top command.
+        Prefix: `docker run`
+        Usage: `docker run ${OPTS} ${image} ${command}`. This is a Python template string. OPTS is a placeholder for all \
+        flags. The arguments must be one of ["image", "command"].
+        Prefix description: The description of binary program `docker` is "Docker container platform", and the \
+        description of the `run` subcommand is "Create and run a new container from an image".
+
+        Plan: I need a flag to make the container run in the background. #E1 = Option[Run a single container in the \
+        background]
+        Plan: I need a flag to mount the host /root directory to /data directory in the container.
#E2 = Option[Mount \ + host /root directory to /data directory] + Plan: I need to parse the image name from the task. #E3 = Argument[image] + Plan: I need to specify the command to be run in the container. #E4 = Argument[command] + Final: Assemble the above clues to generate the final command. #F + + End example + + Let's get started! + """, + } """系统提示词""" - user_prompt: str = r""" + user_prompt: dict[LanguageType, str] = { + LanguageType.CHINESE: r""" 任务:{instruction} 前缀:`{binary_name} {subcmd_name}` 用法:`{subcmd_usage}`。这是一个Python模板字符串。OPTS是所有标志的占位符。参数必须是 {argument_list} 其中之一。 @@ -52,10 +98,25 @@ class InitPlan(CorePattern): "{subcmd_description}"。 请生成相应的计划。 - """ + """, + LanguageType.ENGLISH: r""" + Task: {instruction} + Prefix: `{binary_name} {subcmd_name}` + Usage: `{subcmd_usage}`. This is a Python template string. OPTS is a placeholder for all flags. The arguments \ + must be one of {argument_list}. + Prefix description: The description of binary program `{binary_name}` is "{binary_description}", and the \ + description of `{subcmd_name}` subcommand is "{subcmd_description}". + + Please generate the corresponding plan. + """, + } """用户提示词""" - def __init__(self, system_prompt: str | None = None, user_prompt: str | None = None) -> None: + def __init__( + self, + system_prompt: dict[LanguageType, str] | None = None, + user_prompt: dict[LanguageType, str] | None = None, + ) -> None: """处理Prompt""" super().__init__(system_prompt, user_prompt) @@ -64,6 +125,7 @@ class InitPlan(CorePattern): spec = kwargs["spec"] binary_name = kwargs["binary_name"] subcmd_name = kwargs["subcmd_name"] + language = kwargs.get("language", LanguageType.CHINESE) binary_description = spec[binary_name][0] subcmd_usage = spec[binary_name][2][subcmd_name][1] subcmd_description = spec[binary_name][2][subcmd_name][0] @@ -73,16 +135,19 @@ class InitPlan(CorePattern): argument_list += [key] messages = [ - {"role": "system", "content": self.system_prompt}, - {"role": "user", "content": self.user_prompt.format( - instruction=kwargs["instruction"], - binary_name=binary_name, - subcmd_name=subcmd_name, - binary_description=binary_description, - subcmd_description=subcmd_description, - subcmd_usage=subcmd_usage, - argument_list=argument_list, - )}, + {"role": "system", "content": self.system_prompt[language]}, + { + "role": "user", + "content": self.user_prompt[language].format( + instruction=kwargs["instruction"], + binary_name=binary_name, + subcmd_name=subcmd_name, + binary_description=binary_description, + subcmd_description=subcmd_description, + subcmd_usage=subcmd_usage, + argument_list=argument_list, + ), + }, ] result = "" @@ -98,7 +163,8 @@ class InitPlan(CorePattern): class PlanEvaluator(CorePattern): """计划评估器""" - system_prompt: str = r""" + system_prompt: dict[LanguageType, str] = { + LanguageType.CHINESE: r""" 你是一个计划评估器。你的任务是评估给定的计划是否合理和完整。 一个好的计划应该: @@ -115,29 +181,64 @@ class PlanEvaluator(CorePattern): 请回复: "VALID" - 如果计划良好且完整 "INVALID: <原因>" - 如果计划有问题,请解释原因 - """ + """, + LanguageType.ENGLISH: r""" + You are a plan evaluator. Your task is to evaluate whether the given plan is reasonable and complete. + + A good plan should: + 1. Cover all requirements of the original task + 2. Use appropriate tools to collect necessary information + 3. Have clear and logical steps + 4. Have no redundant or unnecessary steps + + For each step in the plan, evaluate: + 1. Whether the tool selection is appropriate + 2. Whether the input parameters are clear and sufficient + 3. 
Whether this step helps achieve the final goal
+
+        Please reply:
+        "VALID" - If the plan is good and complete
+        "INVALID: <reason>" - If the plan has problems, please explain the reason
+    """,
+    }
     """系统提示词"""
 
-    user_prompt: str = r"""
+    user_prompt: dict[LanguageType, str] = {
+        LanguageType.CHINESE: r"""
         任务:{instruction}
         计划:{plan}
 
         评估计划并回复"VALID"或"INVALID: <原因>"。
-    """
+    """,
+        LanguageType.ENGLISH: r"""
+        Task: {instruction}
+        Plan: {plan}
+
+        Evaluate the plan and reply with "VALID" or "INVALID: <reason>".
+    """,
+    }
     """用户提示词"""
 
-    def __init__(self, system_prompt: str | None = None, user_prompt: str | None = None) -> None:
+    def __init__(
+        self,
+        system_prompt: dict[LanguageType, str] | None = None,
+        user_prompt: dict[LanguageType, str] | None = None,
+    ) -> None:
         """初始化Prompt"""
         super().__init__(system_prompt, user_prompt)
 
     async def generate(self, **kwargs) -> str:
         """生成计划评估结果"""
+        language = kwargs.get("language", LanguageType.CHINESE)
         messages = [
-            {"role": "system", "content": self.system_prompt},
-            {"role": "user", "content": self.user_prompt.format(
-                instruction=kwargs["instruction"],
-                plan=kwargs["plan"],
-            )},
+            {"role": "system", "content": self.system_prompt[language]},
+            {
+                "role": "user",
+                "content": self.user_prompt[language].format(
+                    instruction=kwargs["instruction"],
+                    plan=kwargs["plan"],
+                ),
+            },
         ]
 
         result = ""
@@ -153,7 +254,8 @@ class PlanEvaluator(CorePattern):
 class RePlanner(CorePattern):
     """重新规划器"""
 
-    system_prompt: str = r"""
+    system_prompt: dict[LanguageType, str] = {
+        LanguageType.CHINESE: r"""
         你是一个计划重新规划器。当计划被评估为无效时,你需要生成一个新的、改进的计划。
 
         新计划应该:
@@ -167,31 +269,64 @@
         - 包含带有适当参数的工具使用
         - 保持步骤简洁和重点突出
         - 以"Final"步骤结束
-    """
+    """,
+        LanguageType.ENGLISH: r"""
+        You are a plan replanner. When a plan is evaluated as invalid, you need to generate a new, improved plan.
+
+        The new plan should:
+        1. Solve all problems mentioned in the evaluation
+        2. Keep the same format as the original plan
+        3. Be more precise and complete
+        4. Use appropriate tools for each step
+
+        Follow the same format as the original plan:
+        - Each step should start with "Plan: "
+        - Include tool usage with appropriate parameters
+        - Keep steps concise and focused
+        - End with the "Final" step
+    """,
+    }
     """系统提示词"""
 
-    user_prompt: str = r"""
+    user_prompt: dict[LanguageType, str] = {
+        LanguageType.CHINESE: r"""
         任务:{instruction}
         原始计划:{plan}
         评估:{evaluation}
 
         生成一个新的、改进的计划,解决评估中提到的所有问题。
-    """
+    """,
+        LanguageType.ENGLISH: r"""
+        Task: {instruction}
+        Original Plan: {plan}
+        Evaluation: {evaluation}
+
+        Generate a new, improved plan that solves all problems mentioned in the evaluation.
+ """, + } """用户提示词""" - def __init__(self, system_prompt: str | None = None, user_prompt: str | None = None) -> None: + def __init__( + self, + system_prompt: dict[LanguageType, str] | None = None, + user_prompt: dict[LanguageType, str] | None = None, + ) -> None: """初始化Prompt""" super().__init__(system_prompt, user_prompt) async def generate(self, **kwargs) -> str: """生成重新规划结果""" + language = kwargs.get("language", LanguageType.CHINESE) messages = [ - {"role": "system", "content": self.system_prompt}, - {"role": "user", "content": self.user_prompt.format( - instruction=kwargs["instruction"], - plan=kwargs["plan"], - evaluation=kwargs["evaluation"], - )}, + {"role": "system", "content": self.system_prompt[language]}, + { + "role": "user", + "content": self.user_prompt[language].format( + instruction=kwargs["instruction"], + plan=kwargs["plan"], + evaluation=kwargs["evaluation"], + ), + }, ] result = "" diff --git a/apps/llm/patterns/rewrite.py b/apps/llm/patterns/rewrite.py index 15d52ab2..6cf36e47 100644 --- a/apps/llm/patterns/rewrite.py +++ b/apps/llm/patterns/rewrite.py @@ -4,11 +4,13 @@ import logging from pydantic import BaseModel, Field +from textwrap import dedent from apps.llm.function import JsonGenerator from apps.llm.patterns.core import CorePattern from apps.llm.reasoning import ReasoningLLM from apps.llm.token import TokenCalculator +from apps.schemas.enum_var import LanguageType logger = logging.getLogger(__name__) @@ -21,7 +23,8 @@ class QuestionRewriteResult(BaseModel): class QuestionRewrite(CorePattern): """问题补全与重写""" - system_prompt: str = r""" + system_prompt: dict[LanguageType, str] = { + LanguageType.CHINESE: r""" 根据历史对话,推断用户的实际意图并补全用户的提问内容,历史对话被包含在标签中,用户意图被包含在标签中。 @@ -72,26 +75,87 @@ class QuestionRewrite(CorePattern): {question} + """, + LanguageType.ENGLISH: r""" + + + Based on the historical dialogue, infer the user's actual intent and complete the user's question. The historical dialogue is contained within the tags, and the user's intent is contained within the tags. + Requirements: + 1. Please output in JSON format, referring to the example provided below; do not include any XML tags or any explanatory notes; + 2. If the user's current question is unrelated to the previous dialogue or you believe the user's question is already complete enough, directly output the user's question. + 3. The completed content must be precise and appropriate; do not fabricate any content. + 4. Output only the completed question; do not include any other content. + Example output format: + {{ + "question": "The completed question" + }} + + + + + + + What are the features of openEuler? + + + Compared to other operating systems, openEuler's features include support for multiple hardware architectures and providing a stable, secure, and efficient operating system platform. + + + + + What are the advantages of openEuler? + + + The advantages of openEuler include being open-source, having community support, and optimizations for cloud and edge computing. + + + + + + More details? + + + {{ + "question": "What are the features of openEuler? Please elaborate on its advantages and application scenarios." 
+ }} + + + + + {history} + + + {question} + """ + } + """用户提示词""" - user_prompt: str = """ + user_prompt: dict[LanguageType, str] = { + LanguageType.CHINESE: r""" 请输出补全后的问题 - """ + """, + LanguageType.ENGLISH: r""" + + Please output the completed question + + """} async def generate(self, **kwargs) -> str: # noqa: ANN003 """问题补全与重写""" history = kwargs.get("history", []) question = kwargs["question"] llm = kwargs.get("llm", None) + language = kwargs.get("language", LanguageType.CHINESE) if not llm: llm = ReasoningLLM() leave_tokens = llm._config.max_tokens leave_tokens -= TokenCalculator().calculate_token_length( messages=[ - {"role": "system", "content": self.system_prompt.format(history="", question=question)}, - {"role": "user", "content": self.user_prompt} + {"role": "system", "content": self.system_prompt[language].format(history="", question=question)}, + {"role": "user", "content": self.user_prompt[language]} ] ) if leave_tokens <= 0: @@ -113,8 +177,8 @@ class QuestionRewrite(CorePattern): qa = sub_qa + qa index += 2 messages = [ - {"role": "system", "content": self.system_prompt.format(history=qa, question=question)}, - {"role": "user", "content": self.user_prompt} + {"role": "system", "content": self.system_prompt[language].format(history=qa, question=question)}, + {"role": "user", "content": self.user_prompt[language]} ] result = "" async for chunk in llm.call(messages, streaming=False): diff --git a/apps/llm/patterns/select.py b/apps/llm/patterns/select.py index a6c496bd..924eeba3 100644 --- a/apps/llm/patterns/select.py +++ b/apps/llm/patterns/select.py @@ -11,6 +11,7 @@ from apps.llm.function import JsonGenerator from apps.llm.patterns.core import CorePattern from apps.llm.reasoning import ReasoningLLM from apps.llm.snippet import choices_to_prompt +from apps.schemas.enum_var import LanguageType logger = logging.getLogger(__name__) @@ -18,10 +19,14 @@ logger = logging.getLogger(__name__) class Select(CorePattern): """通过投票选择最佳答案""" - system_prompt: str = "You are a helpful assistant." + system_prompt: dict[LanguageType, str] = { + LanguageType.CHINESE: "你是一个有用的助手。", + LanguageType.ENGLISH: "You are a helpful assistant.", + } """系统提示词""" - user_prompt: str = r""" + user_prompt: dict[LanguageType, str] = { + LanguageType.CHINESE: r""" 根据历史对话(包括工具调用结果)和用户问题,从给出的选项列表中,选出最符合要求的那一项。 @@ -71,7 +76,63 @@ class Select(CorePattern): 让我们一步一步思考。 - """ + """, + LanguageType.ENGLISH: r""" + + + Based on the historical dialogue (including tool call results) and user question, select the most \ + suitable option from the given option list. + Before outputting, please think carefully and use the "" tag to give the thinking process. + The output needs to be in JSON format, the output format is: {{ "choice": "option name" }} + + + + + Use the weather API to query the weather information of Hangzhou tomorrow + + + + API + HTTP request, get the returned JSON data + + + SQL + Query the database, get the data in the database table + + + + + + The API tool can get external data through API, and the weather information may be stored in \ + external data. Since the user clearly mentioned the use of weather API, it should be given \ + priority to the API tool.\ + The SQL tool is used to get information from the database, considering the variability and \ + dynamism of weather data, it is unlikely to be stored in the database, so the priority of \ + the SQL tool is relatively low, \ + The best choice seems to be "API: request a specific API, get the returned JSON data". 
+ + + + {{ "choice": "API" }} + + + + + + {question} + + + + {choice_list} + + + + + Let's think step by step. + + + """, + } """用户提示词""" slot_schema: ClassVar[dict[str, Any]] = { @@ -86,17 +147,19 @@ class Select(CorePattern): } """最终输出的JSON Schema""" - - def __init__(self, system_prompt: str | None = None, user_prompt: str | None = None) -> None: - """初始化Prompt""" + def __init__( + self, + system_prompt: dict[LanguageType, str] | None = None, + user_prompt: dict[str, str] | None = None, + ) -> None: + """处理Prompt""" super().__init__(system_prompt, user_prompt) - async def _generate_single_attempt(self, user_input: str, choice_list: list[str]) -> str: """使用ReasoningLLM进行单次尝试""" logger.info("[Select] 单次选择尝试: %s", user_input) messages = [ - {"role": "system", "content": self.system_prompt}, + {"role": "system", "content": self.system_prompt[self.language]}, {"role": "user", "content": user_input}, ] result = "" @@ -128,6 +191,7 @@ class Select(CorePattern): result_list = [] background = kwargs.get("background", "无背景信息。") + self.language = kwargs.get("language", LanguageType.CHINESE) data_str = json.dumps(kwargs.get("data", {}), ensure_ascii=False) choice_prompt, choices_list = choices_to_prompt(kwargs["choices"]) @@ -141,7 +205,7 @@ class Select(CorePattern): return choices_list[0] logger.info("[Select] 选项列表: %s", choice_prompt) - user_input = self.user_prompt.format( + user_input = self.user_prompt[self.language].format( question=kwargs["question"], background=background, data=data_str, diff --git a/apps/routers/chat.py b/apps/routers/chat.py index 888fbd04..462b10ab 100644 --- a/apps/routers/chat.py +++ b/apps/routers/chat.py @@ -18,6 +18,7 @@ from apps.scheduler.scheduler import Scheduler from apps.scheduler.scheduler.context import save_data from apps.schemas.request_data import RequestData, RequestDataApp from apps.schemas.response_data import ResponseData +from apps.schemas.enum_var import LanguageType from apps.schemas.task import Task from apps.services.activity import Activity from apps.services.blacklist import QuestionBlacklistManager, UserBlacklistManager @@ -65,6 +66,7 @@ async def init_task(post_body: RequestData, user_sub: str, session_id: str) -> T post_body.conversation_id = task.ids.conversation_id post_body.language = task.language post_body.question = task.runtime.question + task.language = post_body.language return task @@ -140,6 +142,7 @@ async def chat( session_id: Annotated[str, Depends(get_session)], ) -> StreamingResponse: """LLM流式对话接口""" + post_body.language = LanguageType.CHINESE if post_body.language in {"zh", LanguageType.CHINESE} else LanguageType.ENGLISH # 前端 Flow-Debug 传输为“zh" # 问题黑名单检测 if post_body.question is not None and not await QuestionBlacklistManager.check_blacklisted_questions(input_question=post_body.question): # 用户扣分 diff --git a/apps/routers/flow.py b/apps/routers/flow.py index 68c0c9f3..1de75aa8 100644 --- a/apps/routers/flow.py +++ b/apps/routers/flow.py @@ -21,6 +21,7 @@ from apps.schemas.response_data import ( NodeServiceListRsp, ResponseData, ) +from apps.schemas.enum_var import LanguageType from apps.services.appcenter import AppCenterManager from apps.services.application import AppManager from apps.services.flow import FlowManager @@ -46,9 +47,10 @@ router = APIRouter( ) async def get_services( user_sub: Annotated[str, Depends(get_user)], + language: LanguageType = Query(LanguageType.CHINESE, description="语言参数,默认为中文") ) -> NodeServiceListRsp: """获取用户可访问的节点元数据所在服务的信息""" - services = await FlowManager.get_service_by_user_id(user_sub) + 
services = await FlowManager.get_service_by_user_id(user_sub, language) if services is None: return NodeServiceListRsp( code=status.HTTP_404_NOT_FOUND, diff --git a/apps/scheduler/executor/agent.py b/apps/scheduler/executor/agent.py index 9f714cc7..fd44aacf 100644 --- a/apps/scheduler/executor/agent.py +++ b/apps/scheduler/executor/agent.py @@ -1,38 +1,33 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """MCP Agent执行器""" -import anyio import logging import uuid -from pydantic import Field -from typing import Any + +import anyio from mcp.types import TextContent -from apps.llm.patterns.rewrite import QuestionRewrite +from pydantic import Field + from apps.llm.reasoning import ReasoningLLM from apps.scheduler.executor.base import BaseExecutor -from apps.schemas.enum_var import EventType, SpecialCallType, FlowStatus, StepStatus +from apps.schemas.enum_var import LanguageType from apps.scheduler.mcp_agent.host import MCPHost from apps.scheduler.mcp_agent.plan import MCPPlanner -from apps.scheduler.mcp_agent.select import FINAL_TOOL_ID, MCPSelector -from apps.scheduler.pool.mcp.client import MCPClient +from apps.scheduler.mcp_agent.select import FINAL_TOOL_ID +from apps.scheduler.pool.mcp.pool import MCPPool +from apps.schemas.enum_var import EventType, FlowStatus, StepStatus from apps.schemas.mcp import ( - GoalEvaluationResult, - RestartStepIndex, - ToolRisk, - ErrorType, - ToolExcutionErrorType, - MCPPlan, MCPCollection, MCPTool, - Step + Step, ) -from apps.scheduler.pool.mcp.pool import MCPPool -from apps.schemas.task import ExecutorState, FlowStepHistory, StepQueueItem -from apps.schemas.message import param -from apps.services.task import TaskManager +from apps.schemas.message import FlowParams +from apps.schemas.task import FlowStepHistory from apps.services.appcenter import AppCenterManager from apps.services.mcp_service import MCPServiceManager +from apps.services.task import TaskManager from apps.services.user import UserManager + logger = logging.getLogger(__name__) @@ -46,13 +41,17 @@ class MCPAgentExecutor(BaseExecutor): mcp_list: list[MCPCollection] = Field(description="MCP服务器列表", default=[]) mcp_pool: MCPPool = Field(description="MCP池", default=MCPPool()) tools: dict[str, MCPTool] = Field( - description="MCP工具列表,key为tool_id", default={} + description="MCP工具列表,key为tool_id", + default={}, ) tool_list: list[MCPTool] = Field( - description="MCP工具列表,包含所有MCP工具", default=[] + description="MCP工具列表,包含所有MCP工具", + default=[], ) - params: param | bool | None = Field( - default=None, description="流执行过程中的参数补充", alias="params" + params: FlowParams | bool | None = Field( + default=None, + description="流执行过程中的参数补充", + alias="params", ) resoning_llm: ReasoningLLM = Field( default=ReasoningLLM(), @@ -107,25 +106,40 @@ class MCPAgentExecutor(BaseExecutor): if is_first: # 获取第一个输入参数 mcp_tool = self.tools[self.task.state.tool_id] - self.task.state.current_input = await MCPHost._get_first_input_params(mcp_tool, self.task.runtime.question, self.task.state.step_description, self.task) + self.task.state.current_input = await MCPHost._get_first_input_params( + mcp_tool, self.task.runtime.question, self.task.state.step_description, self.task + ) else: # 获取后续输入参数 - if isinstance(self.params, param): + if isinstance(self.params, FlowParams): params = self.params.content params_description = self.params.description else: params = {} params_description = "" mcp_tool = self.tools[self.task.state.tool_id] - self.task.state.current_input = await MCPHost._fill_params(mcp_tool, 
self.task.runtime.question, self.task.state.step_description, self.task.state.current_input, self.task.state.error_message, params, params_description) + self.task.state.current_input = await MCPHost._fill_params( + mcp_tool, + self.task.runtime.question, + self.task.state.step_description, + self.task.state.current_input, + self.task.state.error_message, + params, + params_description, + self.task.language, + ) async def confirm_before_step(self) -> None: + """确认前步骤""" # 发送确认消息 mcp_tool = self.tools[self.task.state.tool_id] - confirm_message = await MCPPlanner.get_tool_risk(mcp_tool, self.task.state.current_input, "", self.resoning_llm) + confirm_message = await MCPPlanner.get_tool_risk( + mcp_tool, self.task.state.current_input, "", self.resoning_llm, self.task.language + ) await self.update_tokens() - await self.push_message(EventType.STEP_WAITING_FOR_START, confirm_message.model_dump( - exclude_none=True, by_alias=True)) + await self.push_message( + EventType.STEP_WAITING_FOR_START, confirm_message.model_dump(exclude_none=True, by_alias=True), + ) await self.push_message(EventType.FLOW_STOP, {}) self.task.state.flow_status = FlowStatus.WAITING self.task.state.step_status = StepStatus.WAITING @@ -145,7 +159,7 @@ class MCPAgentExecutor(BaseExecutor): ) ) - async def run_step(self): + async def run_step(self) -> None: """执行步骤""" self.task.state.flow_status = FlowStatus.RUNNING self.task.state.step_status = StepStatus.RUNNING @@ -153,12 +167,10 @@ class MCPAgentExecutor(BaseExecutor): mcp_client = (await self.mcp_pool.get(mcp_tool.mcp_id, self.task.ids.user_sub)) try: output_params = await mcp_client.call_tool(mcp_tool.name, self.task.state.current_input) - except anyio.ClosedResourceError as e: - import traceback - logger.error("[MCPAgentExecutor] MCP客户端连接已关闭: %s, 错误: %s", mcp_tool.mcp_id, traceback.format_exc()) + except anyio.ClosedResourceError: + logger.exception("[MCPAgentExecutor] MCP客户端连接已关闭: %s", mcp_tool.mcp_id) await self.mcp_pool.stop(mcp_tool.mcp_id, self.task.ids.user_sub) - await self.mcp_pool._init_mcp(mcp_tool.mcp_id, self.task.ids.user_sub) - logger.error("[MCPAgentExecutor] MCP客户端连接已关闭: %s, 错误: %s", mcp_tool.mcp_id, str(e)) + await self.mcp_pool.init_mcp(mcp_tool.mcp_id, self.task.ids.user_sub) self.task.state.step_status = StepStatus.ERROR return except Exception as e: @@ -184,14 +196,8 @@ class MCPAgentExecutor(BaseExecutor): } await self.update_tokens() - await self.push_message( - EventType.STEP_INPUT, - self.task.state.current_input - ) - await self.push_message( - EventType.STEP_OUTPUT, - output_params - ) + await self.push_message(EventType.STEP_INPUT, self.task.state.current_input) + await self.push_message(EventType.STEP_OUTPUT, output_params) self.task.context.append( FlowStepHistory( task_id=self.task.id, @@ -215,26 +221,21 @@ class MCPAgentExecutor(BaseExecutor): mcp_tool, self.task.state.current_input, self.task.state.error_message, - self.resoning_llm + self.resoning_llm, + self.task.language, ) await self.update_tokens() error_message = await MCPPlanner.change_err_message_to_description( error_message=self.task.state.error_message, tool=mcp_tool, input_params=self.task.state.current_input, - reasoning_llm=self.resoning_llm - ) - await self.push_message( - EventType.STEP_WAITING_FOR_PARAM, - data={ - "message": error_message, - "params": params_with_null - } + reasoning_llm=self.resoning_llm, + language=self.task.language, ) await self.push_message( - EventType.FLOW_STOP, - data={} + EventType.STEP_WAITING_FOR_PARAM, data={"message": error_message, 
"params": params_with_null} ) + await self.push_message(EventType.FLOW_STOP, data={}) self.task.state.flow_status = FlowStatus.WAITING self.task.state.step_status = StepStatus.PARAM self.task.context.append( @@ -257,6 +258,7 @@ class MCPAgentExecutor(BaseExecutor): ) async def get_next_step(self) -> None: + """获取下一步""" if self.task.state.step_cnt < self.max_steps: self.task.state.step_cnt += 1 history = await MCPHost.assemble_memory(self.task) @@ -264,7 +266,7 @@ class MCPAgentExecutor(BaseExecutor): step = None for i in range(max_retry): try: - step = await MCPPlanner.create_next_step(self.task.runtime.question, history, self.tool_list) + step = await MCPPlanner.create_next_step(self.task.runtime.question, history, self.tool_list, language=self.task.language) if step.tool_id in self.tools.keys(): break except Exception as e: @@ -405,6 +407,7 @@ class MCPAgentExecutor(BaseExecutor): mcp_tool, self.task.state.step_description, self.task.state.current_input, + language=self.task.language, ) if is_param_error.is_param_error: # 如果是参数错误,生成参数补充 @@ -443,10 +446,12 @@ class MCPAgentExecutor(BaseExecutor): await self.get_next_step() async def summarize(self) -> None: + """总结""" async for chunk in MCPPlanner.generate_answer( self.task.runtime.question, (await MCPHost.assemble_memory(self.task)), - self.resoning_llm + self.resoning_llm, + self.task.language, ): await self.push_message( EventType.TEXT_ADD, @@ -463,13 +468,13 @@ class MCPAgentExecutor(BaseExecutor): # 初始化状态 try: self.task.state.flow_id = str(uuid.uuid4()) - self.task.state.flow_name = await MCPPlanner.get_flow_name(self.task.runtime.question, self.resoning_llm) + self.task.state.flow_name = await MCPPlanner.get_flow_name( + self.task.runtime.question, self.resoning_llm, self.task.language + ) await TaskManager.save_task(self.task.id, self.task) await self.get_next_step() except Exception as e: - import traceback - logger.error("[MCPAgentExecutor] 初始化失败: %s", traceback.format_exc()) - logger.error("[MCPAgentExecutor] 初始化失败: %s", str(e)) + logger.exception("[MCPAgentExecutor] 初始化失败") self.task.state.flow_status = FlowStatus.ERROR self.task.state.error_message = str(e) await self.push_message( @@ -508,9 +513,7 @@ class MCPAgentExecutor(BaseExecutor): ) await self.summarize() except Exception as e: - import traceback - logger.error("[MCPAgentExecutor] 执行过程中发生错误: %s", traceback.format_exc()) - logger.error("[MCPAgentExecutor] 执行过程中发生错误: %s", str(e)) + logger.exception("[MCPAgentExecutor] 执行过程中发生错误") self.task.state.flow_status = FlowStatus.ERROR self.task.state.error_message = str(e) self.task.state.step_status = StepStatus.ERROR diff --git a/apps/scheduler/executor/flow.py b/apps/scheduler/executor/flow.py index ebd4da8e..485cef34 100644 --- a/apps/scheduler/executor/flow.py +++ b/apps/scheduler/executor/flow.py @@ -11,7 +11,7 @@ from pydantic import Field from apps.scheduler.call.llm.prompt import LLM_ERROR_PROMPT from apps.scheduler.executor.base import BaseExecutor from apps.scheduler.executor.step import StepExecutor -from apps.schemas.enum_var import EventType, SpecialCallType, FlowStatus, StepStatus +from apps.schemas.enum_var import EventType, SpecialCallType, FlowStatus, StepStatus, LanguageType from apps.schemas.flow import Flow, Step from apps.schemas.request_data import RequestDataApp from apps.schemas.task import ExecutorState, StepQueueItem @@ -20,21 +20,37 @@ from apps.services.task import TaskManager logger = logging.getLogger(__name__) # 开始前的固定步骤 FIXED_STEPS_BEFORE_START = [ - Step( - name="理解上下文", - 
description="使用大模型,理解对话上下文", - node=SpecialCallType.SUMMARY.value, - type=SpecialCallType.SUMMARY.value, - ), + { + LanguageType.CHINESE: Step( + name="理解上下文", + description="使用大模型,理解对话上下文", + node=SpecialCallType.SUMMARY.value, + type=SpecialCallType.SUMMARY.value, + ), + LanguageType.ENGLISH: Step( + name="Understand context", + description="Use large model to understand the context of the dialogue", + node=SpecialCallType.SUMMARY.value, + type=SpecialCallType.SUMMARY.value, + ), + } ] # 结束后的固定步骤 FIXED_STEPS_AFTER_END = [ - Step( - name="记忆存储", - description="理解对话答案,并存储到记忆中", - node=SpecialCallType.FACTS.value, - type=SpecialCallType.FACTS.value, - ), + { + LanguageType.CHINESE: Step( + name="记忆存储", + description="理解对话答案,并存储到记忆中", + node=SpecialCallType.FACTS.value, + type=SpecialCallType.FACTS.value, + ), + LanguageType.ENGLISH: Step( + name="Memory storage", + description="Understand the answer of the dialogue and store it in the memory", + node=SpecialCallType.FACTS.value, + type=SpecialCallType.FACTS.value, + ), + } ] @@ -55,7 +71,11 @@ class FlowExecutor(BaseExecutor): """从数据库中加载FlowExecutor的状态""" logger.info("[FlowExecutor] 加载Executor状态") # 尝试恢复State - if self.task.state and self.task.state.flow_status != FlowStatus.INIT: + if ( + self.task.state + and self.task.state.flow_status != FlowStatus.INIT + and self.task.state.flow_status != FlowStatus.UNKNOWN + ): self.task.context = await TaskManager.get_context_by_task_id(self.task.id) else: # 创建ExecutorState @@ -67,7 +87,7 @@ class FlowExecutor(BaseExecutor): step_status=StepStatus.RUNNING, app_id=str(self.post_body_app.app_id), step_id="start", - step_name="开始", + step_name="开始" if self.task.language == LanguageType.CHINESE else "Start", ) self.validate_flow_state(self.task) # 是否到达Flow结束终点(变量) @@ -164,12 +184,14 @@ class FlowExecutor(BaseExecutor): # 头插开始前的系统步骤,并执行 for step in FIXED_STEPS_BEFORE_START: - self.step_queue.append(StepQueueItem( - step_id=str(uuid.uuid4()), - step=step, - enable_filling=False, - to_user=False, - )) + self.step_queue.append( + StepQueueItem( + step_id=str(uuid.uuid4()), + step=step.get(self.task.language, step[LanguageType.CHINESE]), + enable_filling=False, + to_user=False, + ) + ) await self._step_process() # 插入首个步骤 @@ -182,23 +204,29 @@ class FlowExecutor(BaseExecutor): if self.task.state.step_status == StepStatus.ERROR: # type: ignore[arg-type] logger.warning("[FlowExecutor] Executor出错,执行错误处理步骤") self.step_queue.clear() - self.step_queue.appendleft(StepQueueItem( - step_id=str(uuid.uuid4()), - step=Step( - name="错误处理", - description="错误处理", - node=SpecialCallType.LLM.value, - type=SpecialCallType.LLM.value, - params={ - "user_prompt": LLM_ERROR_PROMPT.replace( - "{{ error_info }}", - self.task.state.error_info["err_msg"], # type: ignore[arg-type] + self.step_queue.appendleft( + StepQueueItem( + step_id=str(uuid.uuid4()), + step=Step( + name=( + "错误处理" if self.task.language == LanguageType.CHINESE else "Error Handling" ), - }, - ), - enable_filling=False, - to_user=False, - )) + description=( + "错误处理" if self.task.language == LanguageType.CHINESE else "Error Handling" + ), + node=SpecialCallType.LLM.value, + type=SpecialCallType.LLM.value, + params={ + "user_prompt": LLM_ERROR_PROMPT[self.task.language].replace( + "{{ error_info }}", + self.task.state.error_info["err_msg"], # type: ignore[arg-type] + ), + }, + ), + enable_filling=False, + to_user=False, + ) + ) is_error = True # 错误处理后结束 self._reached_end = True @@ -222,10 +250,12 @@ class FlowExecutor(BaseExecutor): # 尾插运行结束后的系统步骤 for step in 
FIXED_STEPS_AFTER_END: - self.step_queue.append(StepQueueItem( - step_id=str(uuid.uuid4()), - step=step, - )) + self.step_queue.append( + StepQueueItem( + step_id=str(uuid.uuid4()), + step=step.get(self.task.language, step[LanguageType.CHINESE]), + ) + ) await self._step_process() # FlowStop需要返回总时间,需要倒推最初的开始时间(当前时间减去当前已用总时间) diff --git a/apps/scheduler/executor/step.py b/apps/scheduler/executor/step.py index 5a95e407..7c3808c2 100644 --- a/apps/scheduler/executor/step.py +++ b/apps/scheduler/executor/step.py @@ -215,7 +215,7 @@ class StepExecutor(BaseExecutor): await self.push_message(EventType.STEP_INPUT.value, self.obj.input) # 执行步骤 - iterator = self.obj.exec(self, self.obj.input) + iterator = self.obj.exec(self, self.obj.input, language=self.task.language) try: content = await self._process_chunk(iterator, to_user=self.obj.to_user) diff --git a/apps/scheduler/pool/loader/flow.py b/apps/scheduler/pool/loader/flow.py index 57344d40..75e894ec 100644 --- a/apps/scheduler/pool/loader/flow.py +++ b/apps/scheduler/pool/loader/flow.py @@ -11,7 +11,7 @@ import yaml from anyio import Path from apps.common.config import Config -from apps.schemas.enum_var import EdgeType +from apps.schemas.enum_var import NodeType,EdgeType from apps.schemas.flow import AppFlow, Flow from apps.schemas.pool import AppPool from apps.models.vector import FlowPoolVector @@ -77,25 +77,18 @@ class FlowLoader: err = f"[FlowLoader] 步骤名称不能以下划线开头:{key}" logger.error(err) raise ValueError(err) - if key == "start": - step["name"] = "开始" - step["description"] = "开始节点" - step["type"] = "start" - elif key == "end": - step["name"] = "结束" - step["description"] = "结束节点" - step["type"] = "end" - else: - try: - step["type"] = await NodeManager.get_node_call_id(step["node"]) - except ValueError as e: - logger.warning("[FlowLoader] 获取节点call_id失败:%s,错误信息:%s", step["node"], e) - step["type"] = "Empty" - step["name"] = ( - (await NodeManager.get_node_name(step["node"])) - if "name" not in step or step["name"] == "" - else step["name"] - ) + if step["type"]==NodeType.START.value or step["type"]==NodeType.END.value: + continue + try: + step["type"] = await NodeManager.get_node_call_id(step["node"]) + except ValueError as e: + logger.warning("[FlowLoader] 获取节点call_id失败:%s,错误信息:%s", step["node"], e) + step["type"] = "Empty" + step["name"] = ( + (await NodeManager.get_node_name(step["node"])) + if "name" not in step or step["name"] == "" + else step["name"] + ) return flow_yaml async def load(self, app_id: str, flow_id: str) -> Flow | None: diff --git a/apps/scheduler/scheduler/message.py b/apps/scheduler/scheduler/message.py index d43ba7fb..3c85a4e6 100644 --- a/apps/scheduler/scheduler/message.py +++ b/apps/scheduler/scheduler/message.py @@ -60,13 +60,20 @@ async def push_init_message( async def push_rag_message( - task: Task, queue: MessageQueue, user_sub: str, llm: LLM, history: list[dict[str, str]], - doc_ids: list[str], - rag_data: RAGQueryReq,) -> None: + task: Task, + queue: MessageQueue, + user_sub: str, + llm: LLM, + history: list[dict[str, str]], + doc_ids: list[str], + rag_data: RAGQueryReq, +) -> None: """推送RAG消息""" full_answer = "" try: - async for chunk in RAG.chat_with_llm_base_on_rag(user_sub, llm, history, doc_ids, rag_data): + async for chunk in RAG.chat_with_llm_base_on_rag( + user_sub, llm, history, doc_ids, rag_data, task.language + ): task, content_obj = await _push_rag_chunk(task, queue, chunk) if content_obj.event_type == EventType.TEXT_ADD.value: # 如果是文本消息,直接拼接到答案中 diff --git a/apps/scheduler/scheduler/scheduler.py 
b/apps/scheduler/scheduler/scheduler.py index b524d249..2bd51e74 100644 --- a/apps/scheduler/scheduler/scheduler.py +++ b/apps/scheduler/scheduler/scheduler.py @@ -240,7 +240,12 @@ class Scheduler: if background.conversation and self.task.state.flow_status == FlowStatus.INIT: try: question_obj = QuestionRewrite() - post_body.question = await question_obj.generate(history=background.conversation, question=post_body.question, llm=reasion_llm) + post_body.question = await question_obj.generate( + history=background.conversation, + question=post_body.question, + llm=reasion_llm, + language=post_body.language, + ) except Exception: logger.exception("[Scheduler] 问题重写失败") if app_metadata.app_type == AppType.FLOW.value: diff --git a/apps/schemas/enum_var.py b/apps/schemas/enum_var.py index 3ae7c425..f76e7502 100644 --- a/apps/schemas/enum_var.py +++ b/apps/schemas/enum_var.py @@ -209,3 +209,10 @@ class AgentState(str, Enum): RUNNING = "RUNNING" FINISHED = "FINISHED" ERROR = "ERROR" + +class LanguageType(str, Enum): + """语言类型""" + + CHINESE = "zh_cn" + ENGLISH = "en" + diff --git a/apps/schemas/message.py b/apps/schemas/message.py index 5b465ee5..17a569ca 100644 --- a/apps/schemas/message.py +++ b/apps/schemas/message.py @@ -1,16 +1,18 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """队列中的消息结构""" -from typing import Any from datetime import UTC, datetime +from typing import Any + from pydantic import BaseModel, Field from apps.schemas.enum_var import EventType, FlowStatus, StepStatus from apps.schemas.record import RecordMetadata -class param(BaseModel): +class FlowParams(BaseModel): """流执行过程中的参数补充""" + content: dict[str, Any] = Field(default={}, description="流执行过程中的参数补充内容") description: str = Field(default="", description="流执行过程中的参数补充描述") diff --git a/apps/schemas/request_data.py b/apps/schemas/request_data.py index 3fd5a67f..7a4d6bb6 100644 --- a/apps/schemas/request_data.py +++ b/apps/schemas/request_data.py @@ -7,10 +7,11 @@ from pydantic import BaseModel, Field from apps.common.config import Config from apps.schemas.appcenter import AppData -from apps.schemas.enum_var import CommentType +from apps.schemas.enum_var import CommentType, LanguageType from apps.schemas.flow_topology import FlowItem from apps.schemas.mcp import MCPType -from apps.schemas.message import param +from apps.schemas.message import FlowParams + class RequestDataApp(BaseModel): @@ -42,12 +43,12 @@ class RequestData(BaseModel): question: str | None = Field(default=None, max_length=2000, description="用户输入") conversation_id: str | None = Field(default=None, alias="conversationId", description="聊天ID") group_id: str | None = Field(default=None, alias="groupId", description="问答组ID") - language: str = Field(default="zh", description="语言") + language: LanguageType = Field(default=LanguageType.CHINESE, description="语言") files: list[str] = Field(default=[], description="文件列表") app: RequestDataApp | None = Field(default=None, description="应用") debug: bool = Field(default=False, description="是否调试") task_id: str | None = Field(default=None, alias="taskId", description="任务ID") - params: param | bool | None = Field(default=None, description="流执行过程中的参数补充", alias="params") + params: FlowParams | bool | None = Field(default=None, description="流执行过程中的参数补充", alias="params") class QuestionBlacklistRequest(BaseModel): diff --git a/apps/schemas/task.py b/apps/schemas/task.py index d3e7b036..2bd292b6 100644 --- a/apps/schemas/task.py +++ b/apps/schemas/task.py @@ -7,7 +7,7 @@ from typing import Any from pydantic 
import BaseModel, Field -from apps.schemas.enum_var import FlowStatus, StepStatus +from apps.schemas.enum_var import FlowStatus, StepStatus, LanguageType from apps.schemas.flow import Step from apps.schemas.mcp import MCPPlan @@ -99,8 +99,8 @@ class Task(BaseModel): state: ExecutorState = Field(description="Flow的状态", default=ExecutorState()) tokens: TaskTokens = Field(description="Token信息") runtime: TaskRuntime = Field(description="任务运行时数据") - language: str = Field(description="语言", default="zh") created_at: float = Field(default_factory=lambda: round(datetime.now(tz=UTC).timestamp(), 3)) + language: LanguageType = Field(description="语言", default=LanguageType.CHINESE) class StepQueueItem(BaseModel): diff --git a/apps/services/flow.py b/apps/services/flow.py index 4d682e5a..f694cd72 100644 --- a/apps/services/flow.py +++ b/apps/services/flow.py @@ -9,7 +9,7 @@ from apps.common.mongo import MongoDB from apps.scheduler.pool.loader.flow import FlowLoader from apps.scheduler.slot.slot import Slot from apps.schemas.collection import User -from apps.schemas.enum_var import EdgeType, PermissionType +from apps.schemas.enum_var import EdgeType, PermissionType, LanguageType from apps.schemas.flow import Edge, Flow, Step from apps.schemas.flow_topology import ( EdgeItem, @@ -19,6 +19,7 @@ from apps.schemas.flow_topology import ( NodeServiceItem, PositionItem, ) +from apps.scheduler.pool.pool import Pool from apps.services.node import NodeManager logger = logging.getLogger(__name__) @@ -68,7 +69,9 @@ class FlowManager: return (result > 0) @staticmethod - async def get_node_id_by_service_id(service_id: str) -> list[NodeMetaDataItem] | None: + async def get_node_id_by_service_id( + service_id: str, language: LanguageType = LanguageType.CHINESE + ) -> list[NodeMetaDataItem] | None: """ serviceId获取service的接口数据,并将接口转换为节点元数据 @@ -91,11 +94,21 @@ class FlowManager: except Exception: logger.exception("[FlowManager] generate_from_schema 失败") continue + + if service_id == "": + call_class: type[BaseModel] = await Pool().get_call(node_pool_record["_id"]) + call_class.language = language + node_name = call_class.info().name + node_description = call_class.info().description + else: + node_name = node_pool_record["name"] + node_description = node_pool_record["description"] + node_meta_data_item = NodeMetaDataItem( nodeId=node_pool_record["_id"], callId=node_pool_record["call_id"], - name=node_pool_record["name"], - description=node_pool_record["description"], + name=node_name, + description=node_description, editable=True, createdAt=node_pool_record["created_at"], parameters=parameters, # 添加 parametersTemplate 参数 @@ -108,7 +121,9 @@ class FlowManager: return nodes_meta_data_items @staticmethod - async def get_service_by_user_id(user_sub: str) -> list[NodeServiceItem] | None: + async def get_service_by_user_id( + user_sub: str, language: LanguageType = LanguageType.CHINESE + ) -> list[NodeServiceItem] | None: """ 通过user_id获取用户自己上传的、其他人公开的且收藏的、受保护且有权限访问并收藏的service @@ -148,7 +163,14 @@ class FlowManager: sort=[("created_at", ASCENDING)], ) service_records = await service_records_cursor.to_list(length=None) - service_items = [NodeServiceItem(serviceId="", name="系统", type="system", nodeMetaDatas=[])] + service_items = [ + NodeServiceItem( + serviceId="", + name="系统" if language == LanguageType.CHINESE else "System", + type="system", + nodeMetaDatas=[], + ) + ] service_items += [ NodeServiceItem( serviceId=record["_id"], @@ -160,7 +182,9 @@ class FlowManager: for record in service_records ] for service_item in 
service_items: - node_meta_datas = await FlowManager.get_node_id_by_service_id(service_item.service_id) + node_meta_datas = await FlowManager.get_node_id_by_service_id( + service_item.service_id, language + ) if node_meta_datas is None: node_meta_datas = [] service_item.node_meta_datas = node_meta_datas @@ -410,7 +434,6 @@ class FlowManager: flow_config.debug = await FlowManager.is_flow_config_equal(old_flow_config, flow_config) else: flow_config.debug = False - logger.error(f'{flow_config}') await flow_loader.save(app_id, flow_id, flow_config) except Exception: logger.exception("[FlowManager] 存储/更新流失败") diff --git a/apps/services/rag.py b/apps/services/rag.py index b50db3b7..34b65843 100644 --- a/apps/services/rag.py +++ b/apps/services/rag.py @@ -16,7 +16,7 @@ from apps.llm.reasoning import ReasoningLLM from apps.llm.token import TokenCalculator from apps.schemas.collection import LLM from apps.schemas.config import LLMConfig -from apps.schemas.enum_var import EventType +from apps.schemas.enum_var import EventType, LanguageType from apps.schemas.rag_data import RAGQueryReq from apps.services.session import SessionManager @@ -28,59 +28,106 @@ class RAG: system_prompt: str = "You are a helpful assistant." """系统提示词""" - user_prompt = """' - - 你是openEuler社区的智能助手。请结合给出的背景信息, 回答用户的提问,并且基于给出的背景信息在相关句子后进行脚注。 - 一个例子将在中给出。 - 上下文背景信息将在中给出。 - 用户的提问将在中给出。 - 注意: - 1.输出不要包含任何XML标签,不要编造任何信息。若你认为用户提问与背景信息无关,请忽略背景信息直接作答。 - 2.脚注的格式为[[1]],[[2]],[[3]]等,脚注的内容为提供的文档的id。 - 3.脚注只出现在回答的句子的末尾,例如句号、问号等标点符号后面。 - 4.不要对脚注本身进行解释或说明。 - 5.请不要使用中的文档的id作为脚注。 - - + user_prompt: dict[LanguageType, str] = { + LanguageType.CHINESE: r""" + + 你是openEuler社区的智能助手。请结合给出的背景信息, 回答用户的提问,并且基于给出的背景信息在相关句子后进行脚注。 + 一个例子将在中给出。 + 上下文背景信息将在中给出。 + 用户的提问将在中给出。 + 注意: + 1.输出不要包含任何XML标签,不要编造任何信息。若你认为用户提问与背景信息无关,请忽略背景信息直接作答。 + 2.脚注的格式为[[1]],[[2]],[[3]]等,脚注的内容为提供的文档的id。 + 3.脚注只出现在回答的句子的末尾,例如句号、问号等标点符号后面。 + 4.不要对脚注本身进行解释或说明。 + 5.请不要使用中的文档的id作为脚注。 + + + + + + openEuler社区是一个开源操作系统社区,致力于推动Linux操作系统的发展。 + + + openEuler社区的目标是为用户提供一个稳定、安全、高效的操作系统平台,并且支持多种硬件架构。 + + + + + openEuler社区的成员来自世界各地,包括开发者、用户和企业。 + + + openEuler社区的成员共同努力,推动开源操作系统的发展,并且为用户提供支持和帮助。 + + + + + openEuler社区的目标是什么? + + + openEuler社区是一个开源操作系统社区,致力于推动Linux操作系统的发展。[[1]] + openEuler社区的目标是为用户提供一个稳定、安全、高效的操作系统平台,并且支持多种硬件架构。[[1]] + + + - - - openEuler社区是一个开源操作系统社区,致力于推动Linux操作系统的发展。 - - - openEuler社区的目标是为用户提供一个稳定、安全、高效的操作系统平台,并且支持多种硬件架构。 - - - - - openEuler社区的成员来自世界各地,包括开发者、用户和企业。 - - - openEuler社区的成员共同努力,推动开源操作系统的发展,并且为用户提供支持和帮助。 - - + {bac_info} - openEuler社区的目标是什么? + {user_question} - - openEuler社区是一个开源操作系统社区,致力于推动Linux操作系统的发展。[[1]] - openEuler社区的目标是为用户提供一个稳定、安全、高效的操作系统平台,并且支持多种硬件架构。[[1]] - - - - - {bac_info} - - - {user_question} - - """ + """, + LanguageType.ENGLISH: r""" + + You are a helpful assistant of openEuler community. Please answer the user's question based on the given background information and add footnotes after the related sentences. + An example will be given in . + The background information will be given in . + The user's question will be given in . + Note: + 1. Do not include any XML tags in the output, and do not make up any information. If you think the user's question is unrelated to the background information, please ignore the background information and directly answer. + 2. Your response should not exceed 250 words. + + + + + + openEuler community is an open source operating system community, committed to promoting the development of the Linux operating system. 
+ + + openEuler community aims to provide users with a stable, secure, and efficient operating system platform, and support multiple hardware architectures. + + + + + Members of the openEuler community come from all over the world, including developers, users, and enterprises. + + + Members of the openEuler community work together to promote the development of open source operating systems, and provide support and assistance to users. + + + + + What is the goal of openEuler community? + + + openEuler community is an open source operating system community, committed to promoting the development of the Linux operating system. [[1]] + openEuler community aims to provide users with a stable, secure, and efficient operating system platform, and support multiple hardware architectures. [[1]] + + + + + {bac_info} + + + {user_question} + + """, + } @staticmethod - async def get_doc_info_from_rag(user_sub: str, max_tokens: int, - doc_ids: list[str], - data: RAGQueryReq) -> list[dict[str, Any]]: + async def get_doc_info_from_rag( + user_sub: str, max_tokens: int, doc_ids: list[str], data: RAGQueryReq + ) -> list[dict[str, Any]]: """获取RAG服务的文档信息""" session_id = await SessionManager.get_session_by_user_sub(user_sub) url = Config().get_config().rag.rag_service.rstrip("/") + "/chunk/search" @@ -138,15 +185,23 @@ class RAG: doc_cnt += 1 doc_id_map[doc_chunk["docId"]] = doc_cnt doc_index = doc_id_map[doc_chunk["docId"]] - leave_tokens -= token_calculator.calculate_token_length(messages=[ - {"role": "user", "content": f''''''}, - {"role": "user", "content": ""} + leave_tokens -= token_calculator.calculate_token_length( + messages=[ + { + "role": "user", + "content": f"""""", + }, + {"role": "user", "content": ""}, + ], + pure_text=True, + ) + tokens_of_chunk_element = token_calculator.calculate_token_length( + messages=[ + {"role": "user", "content": ""}, + {"role": "user", "content": ""}, ], - pure_text=True) - tokens_of_chunk_element = token_calculator.calculate_token_length(messages=[ - {"role": "user", "content": ""}, - {"role": "user", "content": ""}, - ], pure_text=True) + pure_text=True, + ) doc_cnt = 0 doc_id_map = {} for doc_chunk in doc_chunk_list: @@ -196,7 +251,12 @@ class RAG: @staticmethod async def chat_with_llm_base_on_rag( - user_sub: str, llm: LLM, history: list[dict[str, str]], doc_ids: list[str], data: RAGQueryReq + user_sub: str, + llm: LLM, + history: list[dict[str, str]], + doc_ids: list[str], + data: RAGQueryReq, + language: LanguageType = LanguageType.CHINESE, ) -> AsyncGenerator[str, None]: """获取RAG服务的结果""" reasion_llm = ReasoningLLM( @@ -210,7 +270,9 @@ class RAG: if history: try: question_obj = QuestionRewrite() - data.query = await question_obj.generate(history=history, question=data.query, llm=reasion_llm) + data.query = await question_obj.generate( + history=history, question=data.query, llm=reasion_llm, language=language + ) except Exception: logger.exception("[RAG] 问题重写失败") doc_chunk_list = await RAG.get_doc_info_from_rag( @@ -225,7 +287,7 @@ class RAG: }, { "role": "user", - "content": RAG.user_prompt.format( + "content": RAG.user_prompt[language].format( bac_info=bac_info, user_question=data.query, ), -- Gitee
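The one mechanism this patch repeats across every pattern class is a prompt dictionary keyed by LanguageType, selected at call time with a Chinese fallback. It can be exercised in isolation; the sketch below is illustrative only (DemoPattern and its prompts are hypothetical stand-ins — just the LanguageType values "zh_cn"/"en", the dedent normalization from CorePattern.__init__, and the kwargs.get("language", LanguageType.CHINESE) fallback idiom come from the patch itself):

from enum import Enum
from textwrap import dedent


class LanguageType(str, Enum):
    """Mirrors the enum added in apps/schemas/enum_var.py."""

    CHINESE = "zh_cn"
    ENGLISH = "en"


class DemoPattern:
    """Hypothetical stand-in for a CorePattern subclass."""

    user_prompt: dict[LanguageType, str] = {
        LanguageType.CHINESE: """
            请回答下面的问题:
            {question}
        """,
        LanguageType.ENGLISH: """
            Please answer the question below:
            {question}
        """,
    }

    def __init__(self) -> None:
        # Same per-language cleanup that CorePattern.__init__ now performs.
        self.user_prompt = {
            lang: dedent(prompt).strip("\n") for lang, prompt in self.user_prompt.items()
        }

    def render(self, **kwargs) -> str:
        # Chinese is the fallback everywhere in the patch:
        # language = kwargs.get("language", LanguageType.CHINESE)
        language = kwargs.get("language", LanguageType.CHINESE)
        return self.user_prompt[language].format(question=kwargs["question"])


if __name__ == "__main__":
    pattern = DemoPattern()
    print(pattern.render(question="What is openEuler?", language=LanguageType.ENGLISH))
    print(pattern.render(question="openEuler 是什么?"))  # no language given -> zh_cn prompt

A caller that still sends the legacy "zh" string would be normalized at the API boundary the same way apps/routers/chat.py does it: map {"zh", LanguageType.CHINESE} to LanguageType.CHINESE and anything else to LanguageType.ENGLISH.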