From 7eabcbee0e88829b693bfd28a510663c25d13103 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Thu, 20 Feb 2025 10:54:54 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E5=A2=9E=E5=8A=A0Ray=20Dashboard=E7=AB=AF?= =?UTF-8?q?=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../euler_copilot/templates/framework/framework.yaml | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/deploy/chart/euler_copilot/templates/framework/framework.yaml b/deploy/chart/euler_copilot/templates/framework/framework.yaml index 5c04487f8..53d10fb1b 100644 --- a/deploy/chart/euler_copilot/templates/framework/framework.yaml +++ b/deploy/chart/euler_copilot/templates/framework/framework.yaml @@ -10,9 +10,13 @@ spec: selector: app: framework ports: - - port: 8002 + - name: framework + port: 8002 targetPort: 8002 nodePort: {{ default nil .Values.euler_copilot.framework.service.nodePort }} + - name: framework-ray + port: 8265 + targetPort: 8265 --- apiVersion: apps/v1 @@ -43,6 +47,8 @@ spec: ports: - containerPort: 8002 protocol: TCP + - containerPort: 8265 + protocol: TCP livenessProbe: httpGet: path: /health_check @@ -54,8 +60,8 @@ spec: env: - name: TZ value: "Asia/Shanghai" - - name: PROD - value: "enable" + # - name: PROD + # value: "enable" volumeMounts: - mountPath: /euler-copilot-frame/config name: framework-shared -- Gitee From f97884b9455ba15f543437f33683b2a2e1e13889 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Thu, 20 Feb 2025 11:16:02 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E5=90=88=E5=B9=B6Deepseek=E9=80=82?= =?UTF-8?q?=E9=85=8D=E5=92=8C=E4=BC=98=E5=8C=96=EF=BC=882=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/common/config.py | 17 +- apps/common/queue.py | 3 +- apps/entities/flow.py | 10 +- apps/entities/flow_topology.py | 17 +- apps/entities/request_data.py | 6 +- apps/entities/{plugin.py => scheduler.py} | 0 apps/llm/patterns/core.py | 4 +- apps/llm/patterns/domain.py | 58 ++---- apps/llm/patterns/executor.py | 126 ++++++------- apps/llm/patterns/facts.py | 53 ++---- apps/llm/patterns/recommend.py | 175 ++++++++---------- apps/llm/patterns/select.py | 57 +++--- apps/llm/reasoning.py | 104 +++++++++-- apps/manager/session.py | 7 +- apps/scheduler/call/__init__.py | 6 +- apps/scheduler/call/api.py | 66 +++---- apps/scheduler/call/choice.py | 65 +------ .../call/{reformat.py => convert.py} | 2 +- apps/scheduler/call/core.py | 2 +- apps/scheduler/call/llm.py | 2 +- apps/scheduler/call/next_flow.py | 14 -- apps/scheduler/call/rag.py | 17 +- apps/scheduler/call/render/render.py | 2 +- apps/scheduler/call/sql.py | 2 +- apps/scheduler/call/suggest.py | 58 ++++++ apps/scheduler/executor/flow.py | 2 +- apps/scheduler/openapi.py | 2 +- apps/scheduler/pool/pool.py | 2 + apps/scheduler/scheduler/scheduler.py | 29 +-- apps/scheduler/slot/slot.py | 15 +- apps/service/suggestion.py | 169 ----------------- 31 files changed, 458 insertions(+), 634 deletions(-) rename apps/entities/{plugin.py => scheduler.py} (100%) rename apps/scheduler/call/{reformat.py => convert.py} (98%) delete mode 100644 apps/scheduler/call/next_flow.py create mode 100644 apps/scheduler/call/suggest.py delete mode 100644 apps/service/suggestion.py diff --git a/apps/common/config.py b/apps/common/config.py index 9d2332e57..66dd553d6 100644 --- a/apps/common/config.py +++ b/apps/common/config.py @@ -17,8 +17,6 @@ class ConfigModel(BaseModel): # DEPLOY DEPLOY_MODE: str = Field(description="oidc 部署方式", default="online") 
COOKIE_MODE: str = Field(description="COOKIE SET 方式", default="domain") - # WEB - WEB_FRONT_URL: str = Field(description="web前端地址") # Redis REDIS_HOST: str = Field(description="Redis主机名") REDIS_PORT: int = Field(description="Redis端口号", default=6379) @@ -38,14 +36,15 @@ class ConfigModel(BaseModel): SESSION_TTL: int = Field(description="用户需要刷新Token的间隔(min)", default=30) # Logging LOG: str = Field(description="日志记录模式") - # Vectorize - VECTORIZE_HOST: str = Field(description="Vectorize服务域名") + # Embedding + EMBEDDING_URL: str = Field(description="Embedding模型地址") + EMBEDDING_KEY: str = Field(description="Embedding模型API Key") + EMBEDDING_MODEL: str = Field(description="Embedding模型名称") # RAG RAG_HOST: str = Field(description="RAG服务域名") # FastAPI DOMAIN: str = Field(description="当前实例的域名") JWT_KEY: str = Field(description="JWT key", default=secrets.token_hex(16)) - PICKLE_KEY: str = Field(description="Pickle Key", default=secrets.token_hex(16)) # 风控 DETECT_TYPE: Optional[str] = Field(description="敏感词检测系统类型", default=None) WORDS_CHECK: Optional[str] = Field(description="AutoGPT敏感词检测系统API URL", default=None) @@ -72,17 +71,19 @@ class ConfigModel(BaseModel): HALF_KEY1: str = Field(description="Half key 1") HALF_KEY2: str = Field(description="Half key 2") HALF_KEY3: str = Field(description="Half key 3") - # OpenAI大模型 + # OpenAI API LLM_KEY: Optional[str] = Field(description="OpenAI API 密钥", default=None) LLM_URL: Optional[str] = Field(description="OpenAI API URL地址", default=None) LLM_MODEL: Optional[str] = Field(description="OpenAI API 模型名", default=None) - # 参数猜解 + LLM_MAX_TOKENS: int = Field(description="OpenAI API 最大Token数", default=8192) + LLM_TEMPERATURE: float = Field(description="OpenAI API 温度", default=0.7) + # 参数提取 SCHEDULER_BACKEND: Optional[str] = Field(description="参数猜解后端", default=None) SCHEDULER_MODEL: Optional[str] = Field(description="参数猜解模型名", default=None) SCHEDULER_URL: Optional[str] = Field(description="参数猜解 URL地址", default=None) SCHEDULER_API_KEY: Optional[str] = Field(description="参数猜解 API密钥", default=None) SCHEDULER_MAX_TOKENS: int = Field(description="参数猜解最大Token数", default=8192) - SCHEDULER_TEMPERATURE: float = Field(description="参数猜解温度", default=0.07) + SCHEDULER_TEMPERATURE: float = Field(description="参数猜解温度", default=0.7) # 插件位置 SERVICE_DIR: Optional[str] = Field(description="插件路径", default=None) # SQL接口路径 diff --git a/apps/common/queue.py b/apps/common/queue.py index eab5b7aab..50b02ba3a 100644 --- a/apps/common/queue.py +++ b/apps/common/queue.py @@ -75,8 +75,7 @@ class MessageQueue: history = tcb.flow_context[tcb.flow_state.step_id] flow = MessageFlow( - # TODO:appId 和 flowId 暂时使用flow_id - appId=history.flow_id, + appId=history.app_id, flowId=history.flow_id, stepId=history.step_id, stepStatus=history.status, diff --git a/apps/entities/flow.py b/apps/entities/flow.py index c916640b8..bafe3664f 100644 --- a/apps/entities/flow.py +++ b/apps/entities/flow.py @@ -4,7 +4,7 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. 
""" from typing import Any, Optional -from pydantic import BaseModel, Field, HttpUrl +from pydantic import BaseModel, Field from apps.entities.enum_var import ( EdgeType, @@ -41,13 +41,6 @@ class Step(BaseModel): params: dict[str, Any] = Field(description="用户手动指定的Node参数", default={}) -class NextFlow(BaseModel): - """Flow中“下一步”的数据格式""" - - flow_id: str - question: Optional[str] = None - - class FlowError(BaseModel): """Flow的错误处理节点""" @@ -63,7 +56,6 @@ class Flow(BaseModel): on_error: FlowError = FlowError(use_llm=True) steps: list[Step] = Field(description="节点列表", default=[]) edges: list[Edge] = Field(description="边列表", default=[]) - next_flow: Optional[list[NextFlow]] = None debug: bool = Field(description="是否经过调试", default=False) class MetadataBase(BaseModel): diff --git a/apps/entities/flow_topology.py b/apps/entities/flow_topology.py index a84bd66d1..619f6950a 100644 --- a/apps/entities/flow_topology.py +++ b/apps/entities/flow_topology.py @@ -3,14 +3,15 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ from typing import Any, Optional -from typing import Optional from pydantic import BaseModel, Field from apps.entities.enum_var import EdgeType, NodeType + class NodeMetaDataItem(BaseModel): """节点元数据类""" + node_meta_data_id: str = Field(alias="nodeMetaDataId") type: str name: str @@ -21,6 +22,7 @@ class NodeMetaDataItem(BaseModel): class NodeServiceItem(BaseModel): """GET /api/flow/service 中单个service信息以及service下的节点元数据的信息""" + service_id: str = Field(..., alias="serviceId", description="服务ID") name: str = Field(..., description="服务名称") type: str = Field(..., description="服务类型") @@ -28,24 +30,27 @@ class NodeServiceItem(BaseModel): created_at: str = Field(..., alias="createdAt", description="创建时间") class PositionItem(BaseModel): """请求/响应中的前端相对位置变量类""" + x: float = Field(default=0.0) y: float = Field(default=0.0) class DependencyItem(BaseModel): """请求/响应中的节点依赖变量类""" + node_id: str = Field(alias="nodeId") type: str class NodeItem(BaseModel): """请求/响应中的节点变量类""" + node_id: str = Field(alias="nodeId",default="") service_id: str = Field(alias="serviceId",default="") node_meta_data_id: str = Field(alias="nodeMetaDataId",default="") name: str=Field(default="") type: str = Field(default=NodeType.NORMAL.value) - description: str = Field(default='') + description: str = Field(default="") enable: bool = Field(default=True) parameters: dict[str, Any] = Field(default={}) depedency: Optional[DependencyItem] = None @@ -54,6 +59,7 @@ class NodeItem(BaseModel): class EdgeItem(BaseModel): """请求/响应中的边变量类""" + edge_id: str = Field(alias="edgeId") source_node: str = Field(alias="sourceNode") target_node: str = Field(alias="targetNode") @@ -63,9 +69,10 @@ class EdgeItem(BaseModel): class FlowItem(BaseModel): """请求/响应中的流变量类""" - flow_id: Optional[str] = Field(alias="flowId", default='flow id') - name: str = Field(default='flow name') - description: str = Field(default='flow description') + + flow_id: Optional[str] = Field(alias="flowId", default="flow id") + name: str = Field(default="flow name") + description: str = Field(default="flow description") enable: bool = Field(default=True) editable: bool = Field(default=True) nodes: list[NodeItem] = Field(default=[]) diff --git a/apps/entities/request_data.py b/apps/entities/request_data.py index d069bec51..8321a5b9e 100644 --- a/apps/entities/request_data.py +++ b/apps/entities/request_data.py @@ -31,11 +31,11 @@ class RequestData(BaseModel): """POST /api/chat 请求的总的数据结构""" question: str = Field(max_length=2000, description="用户输入") - 
conversation_id: str = Field(default=None, alias="conversationId", description="会话ID") - group_id: Optional[str] = Field(default=None, alias="groupId", description="群组ID") + conversation_id: str = Field(default=None, alias="conversationId", description="聊天ID") + group_id: Optional[str] = Field(default=None, alias="groupId", description="问答组ID") language: str = Field(default="zh", description="语言") files: list[str] = Field(default=[], description="文件列表") - app: RequestDataApp = Field(default=None, description="应用") + app: Optional[RequestDataApp] = Field(default=None, description="应用") features: RequestDataFeatures = Field(description="消息功能设置") debug: bool = Field(default=False, description="是否调试") diff --git a/apps/entities/plugin.py b/apps/entities/scheduler.py similarity index 100% rename from apps/entities/plugin.py rename to apps/entities/scheduler.py diff --git a/apps/llm/patterns/core.py b/apps/llm/patterns/core.py index a19ba8df0..fc2455936 100644 --- a/apps/llm/patterns/core.py +++ b/apps/llm/patterns/core.py @@ -29,8 +29,8 @@ class CorePattern(ABC): if user_prompt is not None: self.user_prompt = user_prompt - if not self.system_prompt or not self.user_prompt: - err = "必须设置系统提示词和用户提示词!" + if not self.user_prompt: + err = "必须设置用户提示词!" raise ValueError(err) self.system_prompt = dedent(self.system_prompt).strip("\n") diff --git a/apps/llm/patterns/domain.py b/apps/llm/patterns/domain.py index e2623bd8a..3036bda8a 100644 --- a/apps/llm/patterns/domain.py +++ b/apps/llm/patterns/domain.py @@ -12,47 +12,22 @@ from apps.llm.reasoning import ReasoningLLM class Domain(CorePattern): """从问答中提取领域信息""" - system_prompt: str = r""" - Your task is: Extract feature tags and categories from given conversations. - Tags and categories will be used in a recommendation system to offer search keywords to users. - - Conversations will be given between "" and "" tags. - - EXAMPLE 1 - - CONVERSATION: - - User: What is the weather in Beijing? - Assistant: It is sunny in Beijing. - - - OUTPUT: - Beijing, weather - END OF EXAMPLE 1 - - - EXAMPLE 2 - - CONVERSATION: - - User: Check CVEs on host 1 from 2024-01-01 to 2024-01-07. - Assistant: There are 3 CVEs on host 1 from 2024-01-01 to 2024-01-07, including CVE-2024-0001, CVE-2024-0002, and CVE-2024-0003. - - - OUTPUT: - CVE, host 1, Cybersecurity + user_prompt: str = r""" + 根据对话上文,提取推荐系统所需的关键词标签,要求: + 1. 实体名词、技术术语、时间范围、地点、产品等关键信息均可作为关键词标签 + 2. 至少一个关键词与对话的话题有关 + 3. 标签需精简,不得重复,不得超过10个字 - END OF EXAMPLE 2 - """ - """系统提示词""" + ==示例== + 样例对话: + 用户:北京天气如何? 
+ 助手:北京今天晴。 - user_prompt: str = r""" - CONVERSATION: - - {conversation} - + 样例输出: + ["北京", "天气"] + ==结束示例== - OUTPUT: + 输出结果: """ """用户提示词""" @@ -75,10 +50,9 @@ class Domain(CorePattern): async def generate(self, task_id: str, **kwargs) -> list[str]: # noqa: ANN003 """从问答中提取领域信息""" - messages = [ - {"role": "system", "content": self.system_prompt}, - {"role": "user", "content": self.user_prompt.format(conversation=kwargs["conversation"])}, - ] + messages = [{"role": "system", "content": ""}] + messages += kwargs["conversation"] + messages += [{"role": "user", "content": self.user_prompt}] result = "" async for chunk in ReasoningLLM().call(task_id, messages, streaming=False): diff --git a/apps/llm/patterns/executor.py b/apps/llm/patterns/executor.py index 5f39200c8..2bfed3cea 100644 --- a/apps/llm/patterns/executor.py +++ b/apps/llm/patterns/executor.py @@ -6,7 +6,7 @@ from collections.abc import AsyncGenerator from textwrap import dedent from typing import Any, Optional -from apps.entities.plugin import ExecutorBackground as ExecutorBackgroundEntity +from apps.entities.scheduler import ExecutorBackground as ExecutorBackgroundEntity from apps.llm.patterns.core import CorePattern from apps.llm.reasoning import ReasoningLLM @@ -14,24 +14,22 @@ from apps.llm.reasoning import ReasoningLLM class ExecutorThought(CorePattern): """通过大模型生成Executor的思考内容""" - system_prompt: str = r""" - You are an intelligent assistant equipped with tools to access necessary information. - Your task is to: succinctly summarize the tool usage process, provide your insights, and propose the next logical action. - """ + system_prompt: str = "" """系统提示词""" user_prompt: str = r""" - You previously utilized a tool named "{tool_name}" which performs the function of "{tool_description}". \ - The tool's generated output is: `{tool_output}` (with "message" as the natural language content and "output" as structured data). + 你是一个可以使用工具的智能助手。请简明扼要地总结工具的使用过程,提供你的见解,并给出下一步的行动。 + + 你之前使用了一个名为"{tool_name}"的工具,该工具的功能是"{tool_description}"。\ + 工具生成的输出是:`{tool_output}`(其中"message"是自然语言内容,"output"是结构化数据)。 - Your earlier thoughts were: + 你之前的思考是: {last_thought} - The current question you seek to resolve is: + 你当前需要解决的问题是: {user_question} - Consider the above information thoroughly; articulate your thoughts methodically, step by step. - Begin. + 请综合以上信息,再次一步一步地进行思考,并给出见解和行动: """ """用户提示词""" @@ -50,7 +48,7 @@ class ExecutorThought(CorePattern): raise ValueError(err) from e messages = [ - {"role": "system", "content": self.system_prompt}, + {"role": "system", "content": ""}, {"role": "user", "content": self.user_prompt.format( last_thought=last_thought, user_question=user_question, @@ -61,7 +59,7 @@ class ExecutorThought(CorePattern): ] result = "" - async for chunk in ReasoningLLM().call(task_id, messages, streaming=False, temperature=1.0): + async for chunk in ReasoningLLM().call(task_id, messages, streaming=False, temperature=0.7): result += chunk return result @@ -70,40 +68,24 @@ class ExecutorThought(CorePattern): class ExecutorBackground(CorePattern): """使用大模型进行生成Executor初始背景""" - system_prompt: str = r""" - 你是一位专门负责总结和分析对话的AI助手。你的任务是: - 1. 理解用户与AI之间的对话内容 - 2. 分析提供的关键事实列表 - 3. 结合之前的思考生成一个简洁但全面的背景总结 - 4. 确保总结包含对话中的重要信息点和关键概念 - 请用清晰、专业的语言输出总结,同时注意呈现预先考虑过的思考内容。 - """ + system_prompt: str = r"" """系统提示词""" user_prompt: str = r""" - 请分析以下内容: - - 1. 之前的思考: - + 根据对话上文,结合给定的AI助手思考过程,生成一个完整的背景总结。这个总结将用于后续对话的上下文理解。 + 生成总结的要求如下: + 1. 突出重要信息点,例如时间、地点、人物、事件等。 + 2. 下面给出的事实条目若与历史记录有关,则可以在生成总结时作为已知信息。 + 3. 确保信息准确性,不得编造信息。 + 4. 
总结应少于1000个字。
+
+        思考过程(在<thought>标签中):
         {thought}
-        
-
-        2. 对话记录(包含用户和AI的对话,在标签中):
-        
-        {conversation}
-        
 
-        3. 关键事实(在标签中):
-        
+        关键事实(在<facts>标签中):
         {facts}
-        
-
-        请基于以上信息,生成一个完整的背景总结。这个总结将用于后续对话的上下文理解。
-        要求:
-        - 突出重要信息点
-        - 保持逻辑连贯性
-        - 确保信息准确性
-        请开始总结。
+
+        现在,请开始生成背景总结:
     """
     """用户提示词"""
 
         background: ExecutorBackgroundEntity = kwargs["background"]
 
         # 转化字符串
-        message_str = ""
+        messages = []
         for item in background.conversation:
-            message_str += f"[{item['role']}] {item['content']}\n"
+            messages += [{"role": item["role"], "content": item["content"]}]
+
+        facts_str = "<facts>\n"
         for item in background.facts:
             facts_str += f"- {item}\n"
+        facts_str += "</facts>"
+
         if not background.thought:
-            background.thought = "这是新的对话,我还没有思考过。"
+            background.thought = "<thought>\n这是新的对话,我还没有思考过。\n</thought>"
+        else:
+            background.thought = f"<thought>\n{background.thought}\n</thought>"
 
-        user_input = self.user_prompt.format(
-            conversation=message_str,
-            facts=facts_str,
-            thought=background.thought,
-        )
-        messages = [
-            {"role": "system", "content": self.system_prompt},
-            {"role": "user", "content": user_input},
+        messages += [
+            {"role": "user", "content": self.user_prompt.format(
+                facts=facts_str,
+                thought=background.thought,
+            )},
         ]
 
         result = ""
 
 class ExecutorResult(CorePattern):
     """使用大模型生成Executor的最终结果"""
 
-    system_prompt: str = r"""
-    你是一个专业的智能助手,旨在根据背景信息等,回答用户的问题。
-
-    要求:
-    - 使用中文回答问题,不要使用其他语言。
-    - 提供的回答应当语气友好、通俗易懂,并包含尽可能完整的信息。
-    """
+    system_prompt: str = ""
     """系统提示词"""
 
     user_prompt: str = r"""
+        你是AI智能助手,请回答用户的问题并满足以下要求:
+        1. 使用中文回答问题,不要使用其他语言。
+        2. 回答应当语气友好、通俗易懂,并包含尽可能完整的信息。
+        3. 回答时应结合思考过程。
+
+        用户的问题是:
     {question}
 
-    以下是一些供参考的背景信息:
-    {thought}
-    {final_output}
+        思考过程(在<thought>标签中):
+        <thought>
+        {thought}{output}
+        </thought>
 
-    现在,请根据以上信息,针对用户的问题提供准确而简洁的回答。
+        现在,请根据以上信息进行回答:
     """
     """用户提示词"""
 
         # 如果final_output不为空,则将final_output转换为字符串
         if final_output:
             final_output_str = dedent(f"""
-                你提供了{final_output['type']}类型数据:`{final_output['data']}`。\
-                这些数据已经使用恰当的办法向用户进行了展示,所以无需重复展示。\
-                当类型为“schema”时,证明用户的问题缺少回答所需的必要信息。\
-                我需要根据Schema的具体内容分析缺失哪些信息,并提示用户补充。
+                工具提供了{final_output['type']}类型数据:`{final_output['data']}`。\
+                这些数据已经使用恰当的办法向用户进行了展示,所以无需重复。\
+                若类型为“schema”,说明用户的问题缺少回答所需的必要信息。\
+                我需要根据schema的具体内容分析缺失哪些信息,并提示用户补充。
             """)
         else:
             final_output_str = ""
 
         user_input = self.user_prompt.format(
             question=question,
             thought=thought,
-            final_output=final_output_str,
+            output=final_output_str,
         )
         messages = [
-            {"role": "system", "content": self.system_prompt},
+            {"role": "system", "content": ""},
             {"role": "user", "content": user_input},
         ]
 
-        async for chunk in ReasoningLLM().call(task_id, messages, streaming=True, temperature=1.0):
+        async for chunk in ReasoningLLM().call(task_id, messages, streaming=True, temperature=0.7):
             yield chunk
diff --git a/apps/llm/patterns/facts.py b/apps/llm/patterns/facts.py
index 8b1d0cf4e..665ef1746 100644
--- a/apps/llm/patterns/facts.py
+++ b/apps/llm/patterns/facts.py
@@ -1,5 +1,4 @@
 """事实提取"""
-import json
 from typing import Any, ClassVar, Optional
 
 from apps.llm.patterns.core import CorePattern
@@ -10,10 +9,12 @@ from apps.llm.reasoning import ReasoningLLM
 class Facts(CorePattern):
     """事实提取"""
 
-    system_prompt: str = r"""
-    你是一个信息提取助手,擅长从用户提供的个人信息中准确提取出偏好、关系、实体等有用信息,并将其进行归纳和整理。
-    你的任务是:从给出的对话中提取关键信息,并将它们组织成独一无二的、易于理解的事实。对话将以JSON格式给出,其中“question”为用户的输入,“answer”为回答。
-    以下是您需要关注的信息类型以及有关如何处理输入数据的详细说明。
+    system_prompt: str = ""
+    """系统提示词(暂不使用)"""
+
+    
user_prompt: str = r""" + 从对话上文中提取关键信息,并将它们组织成独一无二的、易于理解的事实,包含用户偏好、关系、实体等有用信息。 + 以下是需要关注的信息类型以及有关如何处理输入数据的详细说明。 **你需要关注的信息类型** 1. 实体:对话中涉及到的实体。例如:姓名、地点、组织、事件等。 @@ -32,43 +33,19 @@ class Facts(CorePattern): } ``` - **样例** - EXAMPLE 1 - { - "question": "杭州西湖有哪些景点?", - "answer": "杭州西湖是中国浙江省杭州市的一个著名景点,以其美丽的自然风光和丰富的文化遗产而闻名。西湖周围有许多著名的景点,包括著名的苏堤、白堤、断桥、三潭印月等。西湖以其清澈的湖水和周围的山脉而著名,是中国最著名的湖泊之一。" - } + **输出格式示例** + 样例对话: + 用户:杭州西湖有哪些景点? + 助手:杭州西湖是中国浙江省杭州市的一个著名景点,以其美丽的自然风光和丰富的文化遗产而闻名。西湖周围有许多著名的景点,包括著名的苏堤、白堤、断桥、三潭印月等。西湖以其清澈的湖水和周围的山脉而著名,是中国最著名的湖泊之一。 - 事实信息: + 样例输出: ```json { "facts": ["杭州西湖有苏堤、白堤、断桥、三潭印月等景点"] } ``` - END OF EXAMPLE 1 - - EXAMPLE 2 - { - "question": "开放原子基金会是什么?", - "answer": "开放原子基金会(OpenAtom Foundation)是一个非营利性组织,旨在推动开源生态的发展。它由阿里巴巴、华为、腾讯等多家知名科技公司共同发起,致力于构建一个开放、协作、共享的开源社区。" - } - - 事实信息: - ```json - { - "facts": ["开放原子基金会是一个非营利性组织,旨在推动开源生态的发展", "开放原子基金会由阿里巴巴、华为、腾讯等多家知名科技公司共同发起"] - } - ``` - - END OF EXAMPLE 2 - """ - """系统提示词""" - - user_prompt: str = r""" - {message_json_str} - - 事实信息: + 现在,请开始输出结果: """ """用户提示词""" @@ -97,8 +74,10 @@ class Facts(CorePattern): async def generate(self, task_id: str, **kwargs) -> list[str]: # noqa: ANN003 """事实提取""" messages = [ - {"role": "system", "content": self.system_prompt}, - {"role": "user", "content": self.user_prompt.format(message_json_str=json.dumps(kwargs["message"], ensure_ascii=False))}, + {"role": "system", "content": ""}, + {"role": "user", "content": kwargs["message"]["question"]}, + {"role": "assistant", "content": kwargs["message"]["answer"]}, + {"role": "user", "content": self.user_prompt}, ] result = "" async for chunk in ReasoningLLM().call(task_id, messages, streaming=False): diff --git a/apps/llm/patterns/recommend.py b/apps/llm/patterns/recommend.py index b6b1e9452..9397d0d38 100644 --- a/apps/llm/patterns/recommend.py +++ b/apps/llm/patterns/recommend.py @@ -2,115 +2,99 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ -from typing import Optional +from typing import Any, ClassVar, Optional from apps.llm.patterns.core import CorePattern +from apps.llm.patterns.json import Json from apps.llm.reasoning import ReasoningLLM class Recommend(CorePattern): """使用大模型进行推荐问题生成""" - system_prompt: str = r""" - 你是智能助手,负责分析问答历史并预测用户问题。 - - **任务说明:** - - 根据背景信息、工具描述和用户倾向预测问题。 - - **信息说明:** - - [Empty]标识空信息,如“背景信息: [Empty]”表示当前无背景信息。 - - 背景信息含最近1条完整问答信息及最多4条历史提问信息。 - - **要求:** - 1. 用用户口吻生成问题。 - 2. 优先使用工具描述进行预测,特别是与背景或倾向无关时。 - 3. 工具描述为空时,依据背景和倾向预测。 - 4. 生成的应为疑问句或祈使句,时间限制为30字。 - 5. 避免输出非必要信息。 - 6. 新生成的问题不得与“已展示问题”或“用户历史提问”重复或相似。 - - **示例:** - - EXAMPLE 1 - ## 工具描述 - 调用API,查询天气数据 - - ## 背景信息 - ### 用户历史提问 - Question 1: 简单介绍杭州 - Question 2: 杭州有哪些著名景点 - - ### 最近1轮问答 - Question: 帮我查询今天的杭州天气数据 - Answer: 杭州今天晴,气温20度,空气质量优。 - - ## 用户倾向 - ['旅游', '美食'] - - ## 已展示问题 - 杭州有什么好吃的? - - ## 预测问题 - 杭州西湖景区的门票价格是多少? - END OF EXAMPLE 1 - - EXAMPLE 2 - ## 工具描述 - [Empty] - - ## 背景信息 - ### 用户历史提问 - [Empty] - - ### 最近1轮问答 - Question: 帮我查询上周的销售数据 - Answer: 上周的销售数据如下: - 星期一:1000 - 星期二:1200 - 星期三:1100 - 星期四:1300 - 星期五:1400 - - ## 用户倾向 - ['销售', '数据分析'] - - ## 已展示问题 - [Empty] - - ## 预测问题 - 帮我分析上周的销售数据趋势 - END OF EXAMPLE 2 - - Let's begin. - """ + system_prompt: str = "" """系统提示词""" user_prompt: str = r""" - ## 工具描述 - {action_description} + ## 目标: + 根据上面的历史对话,结合给出的工具描述和用户倾向,生成三个预测问题。 + + ## 要求: + 信息说明: + - [Empty]的含义是“空信息”,如“工具描述: [Empty]”表示当前未使用工具。请忽略信息为空的项,正常进行问题预测。 + - 历史提问信息是用户发生在历史对话之前的提问,仅为背景参考作用。 + + 生成时需要遵循的要求: + 1. 从用户角度生成预测问题,数量必须为3个,必须为疑问句或祈使句,必须少于30字。 + 2. 预测问题应优先贴合工具描述,除非工具描述为空。 + 3. 预测问题必须精简,不得在问题中掺杂非必要信息,不得输出除问题以外的文字。 + 4. 
请以如下格式输出: + + ```json + {{ + "predicted_questions": [ + "预测问题1", + "预测问题2", + "预测问题3" + ] + }} + ``` + + ## 样例: + 工具描述:调用API,查询天气数据 + + 用户历史提问: + - 简单介绍杭州 + - 杭州有哪些著名景点 + + 用户倾向: + ['旅游', '美食'] - ## 背景信息 - ### 用户历史提问 + 生成的预测问题: + ```json + {{ + "predicted_questions": [ + "杭州西湖景区的门票价格是多少?", + "杭州有哪些著名景点?", + "杭州的天气怎么样?" + ] + }} + ``` + + ## 现在,进行问题生成: + 工具描述:{action_description} + + 用户历史提问: {history_questions} - ### 最近1轮问答 - {recent_question} - - ## 用户倾向 + 用户倾向: {user_preference} - ## 已展示问题 - {shown_questions} - - ## 预测问题 + 生成的预测问题: + ```json """ """用户提示词""" + slot_schema: ClassVar[dict[str, Any]] = { + "type": "object", + "properties": { + "predicted_questions": { + "type": "array", + "description": "推荐的问题列表", + "items": { + "type": "string", + }, + }, + }, + "required": ["predicted_questions"], + } + """最终输出的JSON Schema""" + def __init__(self, system_prompt: Optional[str] = None, user_prompt: Optional[str] = None) -> None: """初始化推荐问题生成Prompt""" super().__init__(system_prompt, user_prompt) - async def generate(self, task_id: str, **kwargs) -> str: # noqa: ANN003 + async def generate(self, task_id: str, **kwargs) -> list[str]: # noqa: ANN003 """生成推荐问题""" if "action_description" not in kwargs or not kwargs["action_description"]: action_description = "[Empty]" @@ -127,26 +111,25 @@ class Recommend(CorePattern): else: history_questions = kwargs["history_questions"] - if "shown_questions" not in kwargs or not kwargs["shown_questions"]: - shown_questions = "[Empty]" - else: - shown_questions = kwargs["shown_questions"] - user_input = self.user_prompt.format( action_description=action_description, history_questions=history_questions, - recent_question=kwargs["recent_question"], - shown_questions=shown_questions, user_preference=user_preference, ) - messages = [ + messages = kwargs["recent_question"] + [ {"role": "system", "content": self.system_prompt}, {"role": "user", "content": user_input}, ] result = "" - async for chunk in ReasoningLLM().call(task_id, messages, streaming=False, temperature=1.0): + async for chunk in ReasoningLLM().call(task_id, messages, streaming=False, temperature=0.7, result_only=True): result += chunk + messages += [{"role": "assistant", "content": result}] + + question_dict = await Json().generate(task_id, conversation=messages, spec=self.slot_schema) + + if not question_dict or "predicted_questions" not in question_dict or not question_dict["predicted_questions"]: + return [] - return result + return question_dict["predicted_questions"] diff --git a/apps/llm/patterns/select.py b/apps/llm/patterns/select.py index 97aba3d93..58baa7c59 100644 --- a/apps/llm/patterns/select.py +++ b/apps/llm/patterns/select.py @@ -15,47 +15,42 @@ from apps.llm.reasoning import ReasoningLLM class Select(CorePattern): """通过投票选择最佳答案""" - system_prompt: str = r""" - Your task is: select the best option from the list of available options. The option should be able to answer \ - the question and be inferred from the context and the output. 
+ system_prompt: str = r"" + """系统提示词""" - EXAMPLE - Question: 使用天气API,查询明天杭州的天气信息 + user_prompt: str = r""" + 根据历史对话(包括工具调用结果)和用户问题,从给出的选项列表中,选出最符合要求的那一项。 + 在输出之前,请先思考,并使用“”标签给出思考过程。 - Context: 人类首先询问了杭州有什么美食,之后询问了杭州有什么知名旅游景点。 + ==样例== + 用户问题: 使用天气API,查询明天杭州的天气信息 - Output: `{}` + 选项列表: + - [API] 请求特定API,获得返回的JSON数据 + - [SQL] 查询数据库,获得数据库表中的数据 - The available options are: - - API: 请求特定API,获得返回的JSON数据 - - SQL: 查询数据库,获得数据库表中的数据 + + API 工具可以通过 API 来获取外部数据,而天气信息可能就存储在外部数据中,由于用户说明中明确提到了天气 API 的使用,因此应该优先使用 API 工具。\ + SQL 工具用于从数据库中获取信息,考虑到天气数据的可变性和动态性,不太可能存储在数据库中,因此 SQL 工具的优先级相对较低,\ + 最佳选择似乎是“API:请求特定 API,获取返回的 JSON 数据”。 + - Let's think step by step. API tools can retrieve external data through the use of APIs, and weather \ - information may be stored in external data. As the user instructions explicitly mentioned the use of the weather API, \ - the API tool should be prioritized. SQL tools are used to retrieve information from databases. Given the variable \ - and dynamic nature of weather data, it is unlikely to be stored in a database. Therefore, the priority of \ - SQL tools is relatively low. The best option seems to be "API: request a specific API, get the \ - returned JSON data". - END OF EXAMPLE + 最符合要求的选项是: + API + ==结束样例== - Let's begin. - """ - """系统提示词""" + 用户问题: {question} - user_prompt: str = r""" - Question: {question} - - Context: {background} - - Output: `{data}` - - The available options are: + 选项列表: {choice_list} - Let's think step by step. + 思考: + + 让我们一步一步思考。 """ """用户提示词""" + slot_schema: ClassVar[dict[str, Any]] = { "type": "object", "properties": { @@ -78,8 +73,8 @@ class Select(CorePattern): choices_prompt = "" choice_str_list = [] for choice in choices: - choices_prompt += "- {}: {}\n".format(choice["branch"], choice["description"]) - choice_str_list.append(choice["branch"]) + choices_prompt += "- {}: {}\n".format(choice["name"], choice["description"]) + choice_str_list.append(choice["name"]) return choices_prompt, choice_str_list async def _generate_single_attempt(self, task_id: str, user_input: str, choice_list: list[str]) -> str: diff --git a/apps/llm/reasoning.py b/apps/llm/reasoning.py index 983eddb28..a9844f465 100644 --- a/apps/llm/reasoning.py +++ b/apps/llm/reasoning.py @@ -1,24 +1,32 @@ -"""推理/生成大模型调用 +"""问答大模型调用 Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. 
""" from collections.abc import AsyncGenerator +from typing import Optional import tiktoken from openai import AsyncOpenAI from apps.common.config import config from apps.common.singleton import Singleton +from apps.constants import LOGGER, REASONING_BEGIN_TOKEN, REASONING_END_TOKEN from apps.manager.task import TaskManager class ReasoningLLM(metaclass=Singleton): - """调用用于推理/生成的大模型""" + """调用用于问答的大模型""" _encoder = tiktoken.get_encoding("cl100k_base") def __init__(self) -> None: """判断配置文件里用了哪种大模型;初始化大模型客户端""" + if not config["LLM_KEY"]: + self._client = AsyncOpenAI( + base_url=config["LLM_URL"], + ) + return + self._client = AsyncOpenAI( api_key=config["LLM_KEY"], base_url=config["LLM_URL"], @@ -47,16 +55,23 @@ class ReasoningLLM(metaclass=Singleton): return messages - async def call(self, task_id: str, messages: list[dict[str, str]], - max_tokens: int = 8192, temperature: float = 0.07, *, streaming: bool = True) -> AsyncGenerator[str, None]: + async def call(self, task_id: str, messages: list[dict[str, str]], # noqa: C901, PLR0912 + max_tokens: Optional[int] = None, temperature: Optional[float] = None, + *, streaming: bool = True, result_only: bool = True) -> AsyncGenerator[str, None]: """调用大模型,分为流式和非流式两种""" input_tokens = self._calculate_token_length(messages) try: + msg_list = self._validate_messages(messages) except ValueError as e: err = f"消息格式错误:{e}" raise ValueError(err) from e + if max_tokens is None: + max_tokens = config["LLM_MAX_TOKENS"] + if temperature is None: + temperature = config["LLM_TEMPERATURE"] + stream = await self._client.chat.completions.create( model=config["LLM_MODEL"], messages=msg_list, # type: ignore[] @@ -65,17 +80,82 @@ class ReasoningLLM(metaclass=Singleton): stream=True, ) # type: ignore[] - if streaming: - result = "" - async for chunk in stream: + reasoning_content = "" + result = "" + + is_first_chunk = True + is_reasoning = True + reasoning_type = "" + + async for chunk in stream: + # 当前Chunk内的信息 + reason = "" + text = "" + + if is_first_chunk: + if hasattr(chunk.choices[0].delta, "reasoning_content"): + reason = "" + chunk.choices[0].delta.reasoning_content or "" + reasoning_type = "args" + is_reasoning = True + else: + for token in REASONING_BEGIN_TOKEN: + if token == (chunk.choices[0].delta.content or ""): + reason = "" + reasoning_type = "tokens" + is_reasoning = True + break + + # 当前已经不是第一个Chunk了 + is_first_chunk = False + + # 当前是正常问答 + if not is_reasoning: text = chunk.choices[0].delta.content or "" + + # 当前处于推理状态 + if not is_first_chunk and is_reasoning: + # 如果推理内容用特殊参数传递 + if reasoning_type == "args": + # 还在推理 + if hasattr(chunk.choices[0].delta, "reasoning_content"): + reason = chunk.choices[0].delta.reasoning_content or "" + # 推理结束 + else: + is_reasoning = False + reason = "" + + # 如果推理内容用特殊token传递 + elif reasoning_type == "tokens": + # 结束推理 + for token in REASONING_END_TOKEN: + if token == (chunk.choices[0].delta.content or ""): + is_reasoning = False + reason = "" + text = "" + break + # 还在推理 + if is_reasoning: + reason = chunk.choices[0].delta.content or "" + + # 推送消息 + if streaming: + # 如果需要推送推理内容 + if reason and not result_only: + yield reason + + # 推送text yield text - result += text - else: - result = "" - async for chunk in stream: - result += chunk.choices[0].delta.content or "" + + # 整理结果 + reasoning_content += reason + result += text + + if not streaming: + if not result_only: + yield reasoning_content yield result + LOGGER.info(f"推理LLM:{reasoning_content}\n\n{result}") + output_tokens = self._calculate_token_length([{"role": 
"assistant", "content": result}], pure_text=True) await TaskManager.update_token_summary(task_id, input_tokens, output_tokens) diff --git a/apps/manager/session.py b/apps/manager/session.py index cee4c08e8..ece5553d0 100644 --- a/apps/manager/session.py +++ b/apps/manager/session.py @@ -140,7 +140,8 @@ class SessionManager: csrf_value = f"{session_id}{rand}" csrf_b64 = base64.b64encode(bytes.fromhex(csrf_value)) - hmac_processor = hmac.new(key=bytes.fromhex(config["JWT_KEY"]), msg=csrf_b64, digestmod=hashlib.sha256) + jwt_key = base64.b64decode(config["JWT_KEY"]) + hmac_processor = hmac.new(key=jwt_key, msg=csrf_b64, digestmod=hashlib.sha256) signature = base64.b64encode(hmac_processor.digest()) csrf_b64 = csrf_b64.decode("utf-8") @@ -175,8 +176,8 @@ class SessionManager: except Exception as e: LOGGER.error(f"Get csrf token from session error: {e}") - hmac_obj = hmac.new(key=bytes.fromhex(config["JWT_KEY"]), - msg=token_msg[0].encode("utf-8"), digestmod=hashlib.sha256) + jwt_key = base64.b64decode(config["JWT_KEY"]) + hmac_obj = hmac.new(key=jwt_key, msg=token_msg[0].encode("utf-8"), digestmod=hashlib.sha256) signature = hmac_obj.digest() current_signature = base64.b64decode(token_msg[1]) diff --git a/apps/scheduler/call/__init__.py b/apps/scheduler/call/__init__.py index 467770842..ca09d4722 100644 --- a/apps/scheduler/call/__init__.py +++ b/apps/scheduler/call/__init__.py @@ -3,17 +3,17 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ from apps.scheduler.call.api import API -from apps.scheduler.call.choice import Choice +from apps.scheduler.call.convert import Reformat from apps.scheduler.call.llm import LLM -from apps.scheduler.call.reformat import Reformat from apps.scheduler.call.render.render import Render from apps.scheduler.call.sql import SQL +from apps.scheduler.call.suggest import Suggestion __all__ = [ "API", "LLM", "SQL", - "Choice", "Reformat", "Render", + "Suggestion", ] diff --git a/apps/scheduler/call/api.py b/apps/scheduler/call/api.py index 8b0a9aa9c..6558f30c0 100644 --- a/apps/scheduler/call/api.py +++ b/apps/scheduler/call/api.py @@ -10,7 +10,7 @@ from fastapi import status from pydantic import BaseModel, Field from apps.constants import LOGGER -from apps.entities.plugin import CallError, SysCallVars +from apps.entities.scheduler import CallError, SysCallVars from apps.manager.token import TokenManager from apps.scheduler.call.core import CoreCall from apps.scheduler.slot.slot import Slot @@ -21,7 +21,7 @@ class APIParams(BaseModel): full_url: str = Field(description="API接口的完整URL") method: Literal[ - "GET", "POST", + "get", "post", "put", "delete", "patch", ] = Field(description="API接口的HTTP Method") timeout: int = Field(description="工具超时时间", default=300) input_data: dict[str, Any] = Field(description="固定数据", default={}) @@ -46,15 +46,7 @@ class API(metaclass=CoreCall, param_cls=APIParams, output_cls=_APIOutput): def init(self, syscall_vars: SysCallVars, **kwargs) -> None: # noqa: ANN003 """初始化API调用工具""" - # 从spec中找出该接口对应的spec - for item in full_spec.endpoints: - name, _, _ = item - if name == self.params.endpoint: - self._spec = item - if not hasattr(self, "_spec"): - err = "[API] Endpoint not found." - raise ValueError(err) - + if len(self.) 
if kwargs["method"] == "POST": if "requestBody" in self._spec[2]: self.slot_schema, self._data_type = self._check_data_type(self._spec[2]["requestBody"]["content"]) @@ -79,23 +71,6 @@ class API(metaclass=CoreCall, param_cls=APIParams, output_cls=_APIOutput): raise RuntimeError from e - @staticmethod - def _process_response_schema(response_data: str, response_schema: dict[str, Any]) -> str: - """对API返回值进行逐个字段处理""" - # 工具执行报错,此时为错误信息,不予处理 - try: - response_dict = json.loads(response_data) - except Exception: - return response_data - - # openapi里没有HTTP 200对应的Schema,不予处理 - if not response_schema: - return response_data - - slot = Slot(response_schema) - return json.dumps(slot.process_json(response_dict), ensure_ascii=False) - - @staticmethod def parameters_to_spec(raw_schema: list[dict[str, Any]]) -> dict[str, Any]: """将OpenAPI中GET接口List形式的请求体Spec转换为JSON Schema""" @@ -116,7 +91,7 @@ class API(metaclass=CoreCall, param_cls=APIParams, output_cls=_APIOutput): async def _make_api_call(self, data: Optional[dict], files: aiohttp.FormData): # noqa: ANN202, C901 # 获取必要参数 - params: _APIParams = getattr(self, "_params") + params: APIParams = getattr(self, "_params") syscall_vars: SysCallVars = getattr(self, "_syscall_vars") """调用API""" @@ -148,18 +123,19 @@ class API(metaclass=CoreCall, param_cls=APIParams, output_cls=_APIOutput): ) req_header.update({"access-token": token}) - if params.method == "GET": + if params.method in ["get", "delete"]: req_params.update(data) - return self._session.get(params.full_url, params=req_params, headers=req_header, cookies=req_cookie, + return self._session.request(params.method, params.full_url, params=req_params, headers=req_header, cookies=req_cookie, timeout=params.timeout) - if params.method == "POST": + + if params.method in ["post", "put", "patch"]: if self._data_type == "form": form_data = files for key, val in data.items(): form_data.add_field(key, val) - return self._session.post(params.full_url, data=form_data, headers=req_header, cookies=req_cookie, + return self._session.request(params.method, params.full_url, data=form_data, headers=req_header, cookies=req_cookie, timeout=params.timeout) - return self._session.post(params.full_url, json=data, headers=req_header, cookies=req_cookie, + return self._session.request(params.method, params.full_url, json=data, headers=req_header, cookies=req_cookie, timeout=params.timeout) err = "Method not implemented." 
@@ -180,7 +156,7 @@
 
     async def _call_api(self, slot_data: Optional[dict[str, Any]] = None) -> _APIOutput:
         # 获取必要参数
-        params: _APIParams = getattr(self, "_params")
+        params: APIParams = getattr(self, "_params")
         LOGGER.info(f"调用接口{params.full_url},请求数据为{slot_data}")
 
         session_context = await self._make_api_call(slot_data, aiohttp.FormData())
@@ -202,18 +178,31 @@
             response_schema = {}
         LOGGER.info(f"调用接口{params.full_url}, 结果为 {response_data}")
 
+        # 组装message
+        message = f"""You called the HTTP API "{params.full_url}", which is used to "{self._spec[2]['summary']}"."""
+
         # 如果没有返回结果
         if response_data is None:
             return _APIOutput(
                 http_code=response_status,
                 output={},
-                message=f"调用接口{params.full_url},作用为但返回值为空。",
+                message=message + " But the API returned an empty response.",
             )
 
-        response_data = self._process_response_schema(response_data, response_schema)
+        # 如果返回值不是JSON,则无法进行后续处理,直接报错
+        try:
+            response_dict = json.loads(response_data)
+        except Exception as e:
+            err = f"返回值不是JSON:{e!s}"
+            LOGGER.error(err)
+            raise CallError(message=err, data={}) from e
+
+        # openapi里存在HTTP 200对应的Schema时,进行处理
+        if response_schema:
+            slot = Slot(response_schema)
+            response_data = json.dumps(slot.process_json(response_dict), ensure_ascii=False)
+
         return _APIOutput(
             http_code=response_status,
             output=json.loads(response_data),
-            message=f"""调用API从外部数据源获取了数据。API和数据源的描述为:{usage}""",
+            message=message + """ The API returned some data, and is shown in the "output" field below.""",
         )
-
diff --git a/apps/scheduler/call/choice.py b/apps/scheduler/call/choice.py
index 2d696e753..62b6d84e6 100644
--- a/apps/scheduler/call/choice.py
+++ b/apps/scheduler/call/choice.py
@@ -1,67 +1,20 @@
-"""工具:使用大模型做出选择
+"""工具:使用大模型或使用程序做出判断
 
 Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. 
""" -from typing import Any +from enum import Enum -from pydantic import BaseModel, Field +from pydantic import BaseModel -from apps.entities.plugin import CallError, SysCallVars -from apps.llm.patterns.select import Select -from apps.scheduler.call.core import CoreCall +class Operator(str, Enum): + """Choice工具支持的运算符""" -class _ChoiceBranch(BaseModel): - """Choice工具的选项""" + pass - branch: str = Field(description="选项的名称") - description: str = Field(description="选项的描述") +class ChoiceInput(BaseModel): + """Choice工具的输入格式""" -class _ChoiceParams(BaseModel): - """Choice工具所需的额外参数""" + pass - propose: str = Field(description="针对哪一个问题进行答案选择?") - choices: list[_ChoiceBranch] = Field(description="Choice工具所有可能的选项") - - -class _ChoiceOutput(BaseModel): - """Choice工具的输出""" - - message: str = Field(description="Choice工具的输出") - next_step: str = Field(description="Choice工具的输出") - - -class Choice(metaclass=CoreCall): - """Choice工具。用于大模型在多个选项中选择一个,并跳转到对应的Step。""" - - name: str = "choice" - description: str = "选择工具,用于根据给定的上下文和问题,判断正确/错误,或从选项列表中选择最符合用户要求的一项。" - - - async def __call__(self, _slot_data: dict[str, Any]) -> _ChoiceOutput: - """调用Choice工具。""" - # 获取必要参数 - params: _ChoiceParams = getattr(self, "_params") - syscall_vars: SysCallVars = getattr(self, "_syscall_vars") - - previous_data = {} - if len(syscall_vars.history) > 0: - previous_data = syscall_vars.history[-1].output_data - - try: - choice_list = [item.model_dump() for item in params.choices] - result = await Select().generate( - question=params.propose, - background=syscall_vars.background, - data=previous_data, - choices=choice_list, - task_id=syscall_vars.task_id, - ) - except Exception as e: - raise CallError(message=f"选择工具调用失败:{e!s}", data={}) from e - - return _ChoiceOutput( - next_step=result, - message=f"针对“{params.propose}”,作出的选择为:{result}。", - ) diff --git a/apps/scheduler/call/reformat.py b/apps/scheduler/call/convert.py similarity index 98% rename from apps/scheduler/call/reformat.py rename to apps/scheduler/call/convert.py index 9e25e5f66..d26f0aa5b 100644 --- a/apps/scheduler/call/reformat.py +++ b/apps/scheduler/call/convert.py @@ -13,7 +13,7 @@ from jinja2 import BaseLoader, select_autoescape from jinja2.sandbox import SandboxedEnvironment from pydantic import BaseModel, Field -from apps.entities.plugin import SysCallVars +from apps.entities.scheduler import SysCallVars from apps.scheduler.call.core import CoreCall diff --git a/apps/scheduler/call/core.py b/apps/scheduler/call/core.py index 271da63b7..61029e6f7 100644 --- a/apps/scheduler/call/core.py +++ b/apps/scheduler/call/core.py @@ -7,7 +7,7 @@ from typing import Any from pydantic import BaseModel -from apps.entities.plugin import SysCallVars +from apps.entities.scheduler import SysCallVars class CoreCall(type): diff --git a/apps/scheduler/call/llm.py b/apps/scheduler/call/llm.py index a99deea83..fda3ef815 100644 --- a/apps/scheduler/call/llm.py +++ b/apps/scheduler/call/llm.py @@ -11,7 +11,7 @@ from jinja2 import BaseLoader, select_autoescape from jinja2.sandbox import SandboxedEnvironment from pydantic import BaseModel, Field -from apps.entities.plugin import CallError, SysCallVars +from apps.entities.scheduler import CallError, SysCallVars from apps.llm.reasoning import ReasoningLLM from apps.scheduler.call.core import CoreCall diff --git a/apps/scheduler/call/next_flow.py b/apps/scheduler/call/next_flow.py deleted file mode 100644 index 67b49b276..000000000 --- a/apps/scheduler/call/next_flow.py +++ /dev/null @@ -1,14 +0,0 @@ -"""用于下一步工作流推荐的工具""" -from typing import Any 
- -from apps.scheduler.call.core import CoreCall - - -class NextFlowCall(metaclass=CoreCall): - """用于下一步工作流推荐的工具""" - - name = "next_flow" - description = "用于下一步工作流推荐的工具" - - async def __call__(self, _slot_data: dict[str, Any]): - """调用NextFlow工具""" diff --git a/apps/scheduler/call/rag.py b/apps/scheduler/call/rag.py index 08f7c974d..8d74235ed 100644 --- a/apps/scheduler/call/rag.py +++ b/apps/scheduler/call/rag.py @@ -2,13 +2,26 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ - from typing import Any +from pydantic import BaseModel, Field + from apps.scheduler.call.core import CoreCall -class RAG(metaclass=CoreCall): +class _RAGParams(BaseModel): + """RAG工具的参数""" + + question: str = Field(description="用户的问题") + + +class _RAGOutput(BaseModel): + """RAG工具的输出""" + + message: str = Field(description="RAG工具的输出") + + +class RAG(metaclass=CoreCall, param_cls=_RAGParams, output_cls=_RAGOutput): """RAG工具:查询知识库""" name: str = "rag" diff --git a/apps/scheduler/call/render/render.py b/apps/scheduler/call/render/render.py index f6eb2bf06..08079e11e 100644 --- a/apps/scheduler/call/render/render.py +++ b/apps/scheduler/call/render/render.py @@ -8,7 +8,7 @@ from typing import Any from pydantic import BaseModel, Field -from apps.entities.plugin import CallError, SysCallVars +from apps.entities.scheduler import CallError, SysCallVars from apps.scheduler.call.core import CoreCall from apps.scheduler.call.render.style import RenderStyle diff --git a/apps/scheduler/call/sql.py b/apps/scheduler/call/sql.py index 05701a2b0..0aca289c6 100644 --- a/apps/scheduler/call/sql.py +++ b/apps/scheduler/call/sql.py @@ -13,7 +13,7 @@ from sqlalchemy import text from apps.common.config import config from apps.constants import LOGGER -from apps.entities.plugin import CallError, SysCallVars +from apps.entities.scheduler import CallError, SysCallVars from apps.models.postgres import PostgreSQL from apps.scheduler.call.core import CoreCall diff --git a/apps/scheduler/call/suggest.py b/apps/scheduler/call/suggest.py new file mode 100644 index 000000000..ebfa3fde7 --- /dev/null +++ b/apps/scheduler/call/suggest.py @@ -0,0 +1,58 @@ +"""用于问题推荐的工具 + +Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. 
+""" +from typing import Any, Optional + +from pydantic import BaseModel, Field + +from apps.entities.scheduler import CallError, SysCallVars +from apps.manager import TaskManager, UserDomainManager +from apps.scheduler.call.core import CoreCall + + +class _SingleFlowSuggestionConfig(BaseModel): + """涉及单个Flow的问题推荐配置""" + + flow_id: str + question: Optional[str] = Field(default=None, description="固定的推荐问题") + + +class _SuggestInput(BaseModel): + """用于问题推荐的工具""" + + configs: list[_SingleFlowSuggestionConfig] = Field(description="问题推荐配置", min_length=1) + num: int = Field(default=3, ge=1, le=6, description="推荐问题的总数量(必须大于等于configs中涉及的Flow的数量)") + + +class _SuggestionOutputItem(BaseModel): + """问题推荐结果的单个条目""" + + question: str + app_id: str + flow_id: str + flow_description: str + + +class _SuggestionOutput(BaseModel): + """问题推荐结果""" + + output: list[_SuggestionOutputItem] + + +class Suggestion(metaclass=CoreCall, param_cls=_SuggestInput, output_cls=_SuggestionOutput): + """问题推荐""" + + name: str = "suggest" + description: str = "问题推荐" + + + async def __call__(self, _slot_data: dict[str, Any]) -> _SuggestionOutput: + """运行问题推荐""" + sys_vars: SysCallVars = getattr(self, "_syscall_vars") + params: _SuggestInput = getattr(self, "_params") + + # 获取当前任务 + task = await TaskManager.get_task(sys_vars.task_id) + + diff --git a/apps/scheduler/executor/flow.py b/apps/scheduler/executor/flow.py index c663ffde3..b4fd064ac 100644 --- a/apps/scheduler/executor/flow.py +++ b/apps/scheduler/executor/flow.py @@ -8,7 +8,7 @@ from typing import Optional from apps.constants import LOGGER, MAX_SCHEDULER_HISTORY_SIZE from apps.entities.enum_var import StepStatus from apps.entities.flow import Step -from apps.entities.plugin import ( +from apps.entities.scheduler import ( SysCallVars, SysExecVars, ) diff --git a/apps/scheduler/openapi.py b/apps/scheduler/openapi.py index 88739a2fc..b5e665799 100644 --- a/apps/scheduler/openapi.py +++ b/apps/scheduler/openapi.py @@ -150,7 +150,7 @@ def reduce_openapi_spec(spec: dict) -> ReducedOpenAPISpec: endpoints = [ ReducedOpenAPIEndpoint( uri=route, - method=operation_name.upper(), + method=operation_name, name=docs.get("summary"), description=docs.get("description"), schema=reduce_endpoint_docs(dereference_refs(docs, full_schema=spec)), diff --git a/apps/scheduler/pool/pool.py b/apps/scheduler/pool/pool.py index 155e189c1..a772d732d 100644 --- a/apps/scheduler/pool/pool.py +++ b/apps/scheduler/pool/pool.py @@ -2,6 +2,8 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. 
""" +from apps.scheduler.pool.check import FileChecker + from apps.entities.flow_topology import DependencyItem, FlowItem, NodeItem, PositionItem diff --git a/apps/scheduler/scheduler/scheduler.py b/apps/scheduler/scheduler/scheduler.py index 254e4b07c..4a4a9999f 100644 --- a/apps/scheduler/scheduler/scheduler.py +++ b/apps/scheduler/scheduler/scheduler.py @@ -15,10 +15,10 @@ from apps.entities.collection import ( Record, ) from apps.entities.enum_var import EventType, StepStatus -from apps.entities.plugin import ExecutorBackground, SysExecVars from apps.entities.rag_data import RAGQueryReq from apps.entities.record import RecordDocument from apps.entities.request_data import RequestData +from apps.entities.scheduler import ExecutorBackground, SysExecVars from apps.entities.task import RequestDataApp from apps.manager import ( DocumentManager, @@ -26,15 +26,16 @@ from apps.manager import ( TaskManager, UserManager, ) + # from apps.scheduler.executor import Executor from apps.scheduler.scheduler.context import generate_facts, get_context + # from apps.scheduler.scheduler.flow import choose_flow from apps.scheduler.scheduler.message import ( push_document_message, push_init_message, push_rag_message, ) -from apps.service.suggestion import plan_next_flow class Scheduler: @@ -70,11 +71,8 @@ class Scheduler: async def run(self, user_sub: str, session_id: str, post_body: RequestData) -> None: """运行调度器""" - # 捕获所有异常:出现问题就输出日志,并停止queue try: # 根据用户的请求,返回插件ID列表,选择Flow - # self._plugin_id, user_selected_flow = await choose_flow(self._task_id, post_body.question, post_body.apps) - user_selected_flow = None # 获取当前问答可供关联的文档 docs, doc_ids = await self._get_docs(user_sub, post_body) # 获取上下文;最多20轮 @@ -90,7 +88,7 @@ class Scheduler: question=post_body.question, language=post_body.language, document_ids=doc_ids, - kb_sn=None if user_info is None or not user_info.kb_id else user_info.kb_id, + kb_sn=None if not user_info.kb_id else user_info.kb_id, history=context, top_k=5, ) @@ -98,7 +96,7 @@ class Scheduler: # 状态位:是否需要生成推荐问题? 
need_recommend = True
             # 如果是智能问答,直接执行
-            if not user_selected_flow:
+            if not post_body.app or post_body.app.app_id == "":
                 # await push_init_message(self._task_id, self._queue, post_body, is_flow=False)
                 await asyncio.sleep(0.1)
 
                 for doc in docs:
@@ -116,20 +114,15 @@ class Scheduler:
                     conversation=context,
                     facts=facts,
                 )
-                need_recommend = await self.run_executor(session_id, post_body, background, user_selected_flow)
+                need_recommend = await self.run_executor(session_id, post_body, background, post_body.app)
 
             # 生成推荐问题和事实提取
             # 如果需要生成推荐问题,则生成
-            if need_recommend:
-                routine_results = await asyncio.gather(
-                    # generate_facts(self._task_id, post_body.question),
-                    plan_next_flow(user_sub, self._task_id, self._queue, post_body.app),
-                )
-            # else:
-            #     routine_results = await asyncio.gather(generate_facts(self._task_id, post_body.question))
+            # routine_results = await asyncio.gather(generate_facts(self._task_id, post_body.question))
 
             # 保存事实信息
-            self._facts = routine_results[0]
+            # self._facts = routine_results[0]
+            self._facts = []
 
             # 发送结束消息
             await self._queue.push_output(event_type=EventType.DONE, data={})
@@ -202,9 +195,7 @@ class Scheduler:
                 user_sub=user_sub,
                 data=encrypt_data,
                 key=encrypt_config,
-                # facts=self._facts,
-                #TODO:暂时不使用facts
-                facts=[],
+                facts=self._facts,
                 metadata=task.record.metadata,
                 created_at=task.record.created_at,
                 flow=task.new_context,
diff --git a/apps/scheduler/slot/slot.py b/apps/scheduler/slot/slot.py
index bedc3a498..8e9048539 100644
--- a/apps/scheduler/slot/slot.py
+++ b/apps/scheduler/slot/slot.py
@@ -14,7 +14,6 @@ from jsonschema.protocols import Validator
 from jsonschema.validators import extend
 
 from apps.constants import LOGGER
-from apps.entities.plugin import CallResult
 from apps.llm.patterns.json import Json
 from apps.scheduler.slot.parser import (
     SlotConstParser,
@@ -115,8 +114,16 @@ class Slot:
 
         if "type" in spec_data:
             if spec_data["type"] == "array" and isinstance(json_value, list):
+                # 若Schema不标准,则不进行处理
+                if "items" not in spec_data:
+                    return json_value
+                # Schema标准
                 return [Slot._process_json_value(item, spec_data["items"]) for item in json_value]
             if spec_data["type"] == "object" and isinstance(json_value, dict):
+                # 若Schema不标准,则不进行处理
+                if "properties" not in spec_data:
+                    return json_value
+                # Schema标准
                 processed_dict = {}
                 for key, val in json_value.items():
                     if key not in spec_data["properties"]:
@@ -252,7 +259,7 @@ class Slot:
 
 
     @staticmethod
-    async def _llm_generate(task_id: str, question: str, thought: str, previous_output: Optional[CallResult], remaining_schema: dict[str, Any]) -> dict[str, Any]:
+    async def llm_param_gen(task_id: str, question: str, thought: str, previous_output: Optional[dict[str, Any]], remaining_schema: dict[str, Any]) -> dict[str, Any]:
         """使用LLM生成JSON参数"""
         # 组装工具消息
         conversation = [
@@ -262,7 +269,6 @@ class Slot:
 
         if previous_output is not None:
             tool_str = f"""I used a tool to get extra information from other sources. \
-                The output of the tool is "{previous_output.message}", with data `{json.dumps(previous_output.output, ensure_ascii=False)}`.
-                The schema of the output is `{json.dumps(previous_output.output_schema, ensure_ascii=False)}`, which contains description of the output.
+                The output data of the tool is `{previous_output}`.
""" @@ -282,7 +289,7 @@ class Slot: remaining_slot = self.check_json(result_json) # 如果还有未填充的部分,则尝试使用LLM生成 if remaining_slot: - generated_slot = await Slot._llm_generate( + generated_slot = await Slot.llm_param_gen( llm_params["task_id"], llm_params["question"], llm_params["thought"], diff --git a/apps/service/suggestion.py b/apps/service/suggestion.py deleted file mode 100644 index feb11975c..000000000 --- a/apps/service/suggestion.py +++ /dev/null @@ -1,169 +0,0 @@ -"""进行推荐问题生成 - -Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. -""" -import json -from textwrap import dedent - -from apps.common.queue import MessageQueue -from apps.common.security import Security -from apps.constants import LOGGER -from apps.entities.collection import RecordContent -from apps.entities.enum_var import EventType -from apps.entities.message import SuggestContent -from apps.entities.task import RequestDataApp -from apps.llm.patterns.recommend import Recommend -from apps.manager import ( - RecordManager, - TaskManager, - UserDomainManager, -) -# from apps.scheduler.pool.pool import Pool - -# 推荐问题条数 -MAX_RECOMMEND = 3 -# 用户领域条数 -USER_TOP_DOMAINS_NUM = 5 -# 历史问题条数 -HISTORY_QUESTIONS_NUM = 4 - - -async def plan_next_flow(user_sub: str, task_id: str, queue: MessageQueue, app: RequestDataApp) -> None: # noqa: C901, PLR0912 - """生成用户“下一步”Flow的推荐。 - - - 若Flow的配置文件中已定义`next_flow[]`字段,则直接使用该字段给定的值 - - 否则,使用LLM进行选择。将根据用户的插件选择情况限定范围 - - 选择“下一步”Flow后,根据当前Flow的执行结果和“下一步”Flow的描述,生成改写的或预测的问题。 - - :param summary: 上下文总结,包含当前Flow的执行结果。 - :param current_flow_name: 当前执行的Flow的Name,用于避免重复选择同一个Flow - :param user_selected_plugins: 用户选择的插件列表,用于限定推荐范围 - :return: 列表,包含“下一步”Flow的Name和预测问题 - """ - task = await TaskManager.get_task(task_id) - # 获取当前用户的领域 - user_domain = await UserDomainManager.get_user_domain_by_user_sub_and_topk(user_sub, USER_TOP_DOMAINS_NUM) - current_record = dedent(f""" - Question: {task.record.content.question} - Answer: {task.record.content.answer} - """) - generated_questions = "" - - records = await RecordManager.query_record_by_conversation_id(user_sub, task.record.conversation_id, HISTORY_QUESTIONS_NUM) - last_n_questions = "" - for i, record in enumerate(records): - data = RecordContent.model_validate(json.loads(Security.decrypt(record.data, record.key))) - last_n_questions += f"Question {i+1}: {data.question}\n" - - if task.flow_state is None: - # 当前没有使用Flow,进行普通推荐 - for _ in range(MAX_RECOMMEND): - question = await Recommend().generate( - task_id=task_id, - history_questions=last_n_questions, - recent_question=current_record, - user_preference=user_domain, - shown_questions=generated_questions, - ) - generated_questions += f"{question}\n" - content = SuggestContent( - question=question, - appId="", - flowId="", - flowDescription="", - ) - await queue.push_output(event_type=EventType.SUGGEST, data=content.model_dump(exclude_none=True, by_alias=True)) - return - - # 当前使用了Flow - flow_id = task.flow_state.name - app_id = task.flow_state.app_id - # TODO: 推荐flow待完善 - # _, flow_data = Pool().get_flow(flow_id, app_id) - # if flow_data is None: - # err = "Flow数据不存在" - # raise ValueError(err) - - # if flow_data.next_flow is None: - # # 根据用户选择的插件,选一次top_k flow - # app_ids = [] - # for plugin in user_selected_plugins: - # if plugin.app_id and plugin.app_id not in app_ids: - # app_ids.append(plugin.app_id) - # result = Pool().get_k_flows(task.record.content.question, app_ids) - # for i, flow in enumerate(result): - # if i >= MAX_RECOMMEND: - # break - # # 改写问题 - # rewrite_question = await 
Recommend().generate( - # task_id=task_id, - # action_description=flow.description, - # history_questions=last_n_questions, - # recent_question=current_record, - # user_preference=str(user_domain), - # shown_questions=generated_questions, - # ) - # generated_questions += f"{rewrite_question}\n" - - # content = SuggestContent( - # app_id=app_id, - # flow_id=flow_id, - # flow_description=str(flow.description), - # question=rewrite_question, - # ) - # await queue.push_output(event_type=EventType.SUGGEST, data=content.model_dump(exclude_none=True, by_alias=True)) - # return - - # 当前有next_flow - # for i, next_flow in enumerate(flow_data.next_flow): - # # 取前MAX_RECOMMEND个Flow,保持顺序 - # if i >= MAX_RECOMMEND: - # break - - # if next_flow.plugin is not None: - # next_flow_app_id = next_flow.plugin - # else: - # next_flow_app_id = app_id - - # flow_metadata, _ = next_flow.id, next_flow_app_id, - # flow_metadata, _ = Pool().get_flow( - # next_flow.id, - # next_flow_app_id, - # ) - - # # flow不合法 - # if flow_metadata is None: - # LOGGER.error(f"Flow {next_flow.id} in {next_flow_app_id} not found") - # continue - - # # 如果设置了question,直接使用这个question - # if next_flow.question is not None: - # content = SuggestContent( - # appId=next_flow_app_id, - # flowId=next_flow.id, - # flowDescription=str(flow_metadata.description), - # question=next_flow.question, - # ) - # await queue.push_output(event_type=EventType.SUGGEST, data=content.model_dump(exclude_none=True, by_alias=True)) - # continue - - # # 没有设置question,则需要生成问题 - # rewrite_question = await Recommend().generate( - # task_id=task_id, - # action_description=flow_metadata.description, - # history_questions=last_n_questions, - # recent_question=current_record, - # user_preference=str(user_domain), - # shown_questions=generated_questions, - # ) - # generated_questions += f"{rewrite_question}\n" - # content = SuggestContent( - # appId=next_flow_app_id, - # flowId=next_flow.id, - # flowDescription=str(flow_metadata.description), - # question=rewrite_question, - # ) - # await queue.push_output(event_type=EventType.SUGGEST, data=content.model_dump(exclude_none=True, by_alias=True)) - # continue - # return -- Gitee