From 23cff34c9adc30e1786b09cf4dcd636641970ba8 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Sat, 19 Apr 2025 16:20:01 +0800 Subject: [PATCH 1/5] =?UTF-8?q?=E6=9B=B4=E6=96=B0Recommend=E6=8F=90?= =?UTF-8?q?=E7=A4=BA=E8=AF=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/llm/patterns/recommend.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/apps/llm/patterns/recommend.py b/apps/llm/patterns/recommend.py index 1bd43c83..2d236d68 100644 --- a/apps/llm/patterns/recommend.py +++ b/apps/llm/patterns/recommend.py @@ -21,9 +21,10 @@ class Recommend(CorePattern): user_prompt: str = r""" - 根据提供的对话和附加信息(用户倾向、历史问题列表等),生成三个预测问题。 + 根据提供的对话和附加信息(用户倾向、历史问题列表、工具信息等),生成三个预测问题。 历史提问列表展示的是用户发生在历史对话之前的提问,仅为背景参考作用。 - 对话将在标签中给出,用户倾向将在标签中给出,历史问题列表将在标签中给出。 + 对话将在标签中给出,用户倾向将在标签中给出,\ + 历史问题列表将在标签中给出,工具信息将在标签中给出。 生成预测问题时的要求: 1. 以用户口吻生成预测问题,数量必须为3个,必须为疑问句或祈使句,必须少于30字。 @@ -50,6 +51,12 @@ class Recommend(CorePattern): 简单介绍一下杭州 杭州有哪些著名景点? + + + 景点查询 + 查询景点信息 + + ["杭州", "旅游"] 现在,进行问题生成: @@ -73,6 +80,11 @@ class Recommend(CorePattern): {history_questions} + + {tool_name} + {tool_description} + + {user_preference} 现在,进行问题生成: @@ -118,6 +130,8 @@ class Recommend(CorePattern): conversation=convert_context_to_prompt(kwargs["conversation"]), history_questions=history_questions, user_preference=user_preference, + tool_name=kwargs["tool_name"], + tool_description=kwargs["tool_description"], )}, ] -- Gitee From c212cb15478bd4362182fed02b22d618b9073d85 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Sat, 19 Apr 2025 16:20:20 +0800 Subject: [PATCH 2/5] =?UTF-8?q?=E8=BE=93=E5=87=BA=E5=B1=95=E7=A4=BA?= =?UTF-8?q?=E8=87=AA=E5=AE=9A=E4=B9=89schema?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/manager/node.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/apps/manager/node.py b/apps/manager/node.py index a9d3362f..41a3e4f7 100644 --- a/apps/manager/node.py +++ b/apps/manager/node.py @@ -115,5 +115,7 @@ class NodeManager: # 返回参数Schema return ( NodeManager.merge_params_schema(call_class.model_json_schema(), node_data.known_params or {}), - call_class.output_type.model_json_schema(), # type: ignore[attr-defined] + call_class.output_model.model_json_schema( # type: ignore[attr-defined] + override=node_data.override_output if node_data.override_output else {}, + ), ) -- Gitee From 0f6356d291ba9535c6a939cb7717cd3077520c1c Mon Sep 17 00:00:00 2001 From: z30057876 Date: Sat, 19 Apr 2025 16:21:05 +0800 Subject: [PATCH 3/5] =?UTF-8?q?=E6=9B=B4=E6=96=B0Call=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/scheduler/call/core.py | 52 +++++++++++++++--------------- apps/scheduler/call/empty.py | 6 ++-- apps/scheduler/pool/loader/call.py | 4 +-- 3 files changed, 31 insertions(+), 31 deletions(-) diff --git a/apps/scheduler/call/core.py b/apps/scheduler/call/core.py index 7a90b08d..1ab76c80 100644 --- a/apps/scheduler/call/core.py +++ b/apps/scheduler/call/core.py @@ -13,6 +13,7 @@ from pydantic import BaseModel, ConfigDict, Field from pydantic.json_schema import SkipJsonSchema from apps.entities.enum_var import CallOutputType +from apps.entities.pool import NodePool from apps.entities.scheduler import ( CallIds, CallInfo, @@ -43,6 +44,7 @@ class CoreCall(BaseModel): name: SkipJsonSchema[str] = Field(description="Step的名称", exclude=True) description: SkipJsonSchema[str] = Field(description="Step的描述", exclude=True) + node: SkipJsonSchema[NodePool | None] = Field(description="节点信息", exclude=True) input_model: ClassVar[SkipJsonSchema[type[DataBase]]] = Field( description="Call的输入Pydantic类型;不包含override的模板", exclude=True, @@ -70,28 +72,12 @@ class CoreCall(BaseModel): @classmethod - def cls_info(cls) -> CallInfo: + def info(cls) -> CallInfo: """返回Call的名称和描述""" - err = "[CoreCall] 必须手动实现cls_info方法" + err = "[CoreCall] 必须手动实现info方法" raise NotImplementedError(err) - @property - def input_type(self) -> type[DataBase]: - """返回输入类型""" - class InputType(self.input_model): - pass - return InputType - - - @property - def output_type(self) -> type[DataBase]: - """返回输出类型""" - class OutputType(self._output_type_template): - pass - return OutputType - - @staticmethod def _assemble_call_vars(executor: "StepExecutor") -> CallVars: """组装CallVars""" @@ -106,6 +92,7 @@ class CoreCall(BaseModel): flow_id=executor.task.state.flow_id, session_id=executor.task.ids.session_id, user_sub=executor.task.ids.user_sub, + app_id=executor.task.state.app_id, ), question=executor.question, history=executor.task.context, @@ -114,21 +101,28 @@ class CoreCall(BaseModel): @classmethod - async def init(cls, executor: "StepExecutor", **kwargs: Any) -> tuple[Self, dict[str, Any]]: + async def instance(cls, executor: "StepExecutor", node: NodePool | None, **kwargs: Any) -> Self: """实例化Call类""" - sys_vars = cls._assemble_call_vars(executor) - - call_obj = cls( + obj = cls( name=executor.step.step.name, description=executor.step.step.description, + node=node, **kwargs, ) - input_data = await call_obj._init(sys_vars) - return call_obj, input_data + + await obj._set_input(executor) + return obj - async def _init(self, call_vars: CallVars) -> dict[str, Any]: - """实例化Call类,并返回Call的输入""" + async def _set_input(self, executor: "StepExecutor") -> None: + """获取Call的输入""" + self._sys_vars = self._assemble_call_vars(executor) + input_data = await self._init(self._sys_vars) + self.input = input_data.model_dump(by_alias=True, exclude_none=True) + + + async def _init(self, call_vars: CallVars) -> DataBase: + """初始化Call类,并返回Call的输入""" err = "[CoreCall] 初始化方法必须手动实现" raise NotImplementedError(err) @@ -137,7 +131,13 @@ class CoreCall(BaseModel): """Call类实例的流式输出方法""" yield CallOutputChunk(type=CallOutputType.TEXT, content="") + + async def _after_exec(self, input_data: dict[str, Any]) -> None: + """Call类实例的执行后方法""" + + async def exec(self, executor: "StepExecutor", input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]: """Call类实例的执行方法""" async for chunk in self._exec(input_data): yield chunk + await self._after_exec(input_data) diff --git a/apps/scheduler/call/empty.py b/apps/scheduler/call/empty.py index 84dc9ae7..f33832c2 100644 --- a/apps/scheduler/call/empty.py +++ b/apps/scheduler/call/empty.py @@ -16,14 +16,14 @@ class Empty(CoreCall, input_model=DataBase, output_model=DataBase): """空Call""" @classmethod - def cls_info(cls) -> CallInfo: + def info(cls) -> CallInfo: """返回Call的名称和描述""" return CallInfo(name="空白", description="空白节点,用于占位") - async def _init(self, call_vars: CallVars) -> dict[str, Any]: + async def _init(self, call_vars: CallVars) -> DataBase: """初始化""" - return {} + return DataBase() async def _exec(self, input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]: diff --git a/apps/scheduler/pool/loader/call.py b/apps/scheduler/pool/loader/call.py index 6be3ea00..bd465f17 100644 --- a/apps/scheduler/pool/loader/call.py +++ b/apps/scheduler/pool/loader/call.py @@ -37,7 +37,7 @@ class CallLoader: # 检查合法性 for call_id in system_call.__all__: call_cls = getattr(system_call, call_id) - call_info = call_cls.cls_info() + call_info = call_cls.info() call_metadata.append( CallPool( @@ -82,7 +82,7 @@ class CallLoader: for call_id in call_package.__all__: try: call_cls = getattr(call_package, call_id) - call_info = call_cls.cls_info() + call_info = call_cls.info() except AttributeError as e: err = f"[CallLoader] 载入工具call.{call_dir_name}.{call_id}失败:{e};跳过载入。" logger.info(err) -- Gitee From b9049341bb41dedfd01ce844d2a5599297a23fa5 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Sat, 19 Apr 2025 16:21:43 +0800 Subject: [PATCH 4/5] =?UTF-8?q?=E6=9B=B4=E6=96=B0Call=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/scheduler/call/api/api.py | 23 +--- apps/scheduler/call/choice/choice.py | 8 +- apps/scheduler/call/choice/schema.py | 11 ++ apps/scheduler/call/convert/convert.py | 12 +- apps/scheduler/call/facts/facts.py | 18 +-- apps/scheduler/call/graph/graph.py | 6 +- apps/scheduler/call/llm/llm.py | 10 +- apps/scheduler/call/rag/rag.py | 9 +- apps/scheduler/call/search/schema.py | 19 +++ apps/scheduler/call/search/search.py | 16 +-- apps/scheduler/call/slot/slot.py | 22 ++-- apps/scheduler/call/sql/schema.py | 1 + apps/scheduler/call/sql/sql.py | 20 ++-- apps/scheduler/call/suggest/schema.py | 16 +-- apps/scheduler/call/suggest/suggest.py | 155 +++++++++++++++++++++---- apps/scheduler/call/summary/summary.py | 21 ++-- 16 files changed, 246 insertions(+), 121 deletions(-) diff --git a/apps/scheduler/call/api/api.py b/apps/scheduler/call/api/api.py index 38d732e9..0e42b973 100644 --- a/apps/scheduler/call/api/api.py +++ b/apps/scheduler/call/api/api.py @@ -65,32 +65,17 @@ class API(CoreCall, input_model=APIInput, output_model=APIOutput): @classmethod - def cls_info(cls) -> CallInfo: + def info(cls) -> CallInfo: """返回Call的名称和描述""" return CallInfo(name="API调用", description="向某一个API接口发送HTTP请求,获取数据。") - @classmethod - async def init(cls, executor: "StepExecutor", **kwargs: Any) -> tuple[Self, dict[str, Any]]: - """初始化工具""" - cls_obj = cls( - name=executor.step.step.name, - description=executor.step.step.description, - **kwargs, - ) - - call_vars = cls._assemble_call_vars(executor) - input_data = await cls_obj._init(call_vars) - - return cls_obj, input_data - - - async def _init(self, call_vars: CallVars) -> dict[str, Any]: + async def _init(self, call_vars: CallVars) -> APIInput: """初始化API调用工具""" # 获取对应API的Service Metadata try: service_metadata = await ServiceCenterManager.get_service_data( - call_vars.ids.user_sub, call_vars.ids.service_id, + call_vars.ids.user_sub, self.node.service_id or "", ) service_metadata = ServiceMetadata.model_validate(service_metadata) except Exception as e: @@ -109,7 +94,7 @@ class API(CoreCall, input_model=APIInput, output_model=APIOutput): method=self.method, query=self.query, body=self.body, - ).model_dump(exclude_none=True, by_alias=True) + ) async def _exec(self, input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]: diff --git a/apps/scheduler/call/choice/choice.py b/apps/scheduler/call/choice/choice.py index c493ea80..f6a66e85 100644 --- a/apps/scheduler/call/choice/choice.py +++ b/apps/scheduler/call/choice/choice.py @@ -5,7 +5,8 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """ from enum import Enum -from pydantic import BaseModel +from apps.scheduler.call.choice.schema import ChoiceInput, ChoiceOutput +from apps.scheduler.call.core import CoreCall class Operator(str, Enum): @@ -14,8 +15,7 @@ class Operator(str, Enum): pass -class ChoiceInput(BaseModel): - """Choice工具的输入格式""" +class Choice(CoreCall, input_model=ChoiceInput, output_model=ChoiceOutput): + """Choice工具""" pass - diff --git a/apps/scheduler/call/choice/schema.py b/apps/scheduler/call/choice/schema.py index e69de29b..ae209868 100644 --- a/apps/scheduler/call/choice/schema.py +++ b/apps/scheduler/call/choice/schema.py @@ -0,0 +1,11 @@ +"""Choice Call的输入和输出""" + +from apps.scheduler.call.core import DataBase + + +class ChoiceInput(DataBase): + """Choice Call的输入""" + + +class ChoiceOutput(DataBase): + """Choice Call的输出""" diff --git a/apps/scheduler/call/convert/convert.py b/apps/scheduler/call/convert/convert.py index f15fd2f8..9952e041 100644 --- a/apps/scheduler/call/convert/convert.py +++ b/apps/scheduler/call/convert/convert.py @@ -30,23 +30,23 @@ class Convert(CoreCall, input_model=ConvertInput, output_model=ConvertOutput): @classmethod - def cls_info(cls) -> CallInfo: + def info(cls) -> CallInfo: """返回Call的名称和描述""" return CallInfo(name="模板转换", description="使用jinja2语法和jsonnet语法,将自然语言信息和原始数据进行格式化。") - async def _init(self, syscall_vars: CallVars) -> dict[str, Any]: + async def _init(self, call_vars: CallVars) -> ConvertInput: """初始化工具""" - await super()._init(syscall_vars) + await super()._init(call_vars) - self._history = syscall_vars.history - self._question = syscall_vars.question + self._history = call_vars.history + self._question = call_vars.question self._env = SandboxedEnvironment( loader=BaseLoader(), autoescape=False, trim_blocks=True, lstrip_blocks=True, ) - return ConvertInput().model_dump(exclude_none=True, by_alias=True) + return ConvertInput() async def _exec(self) -> AsyncGenerator[CallOutputChunk, None]: diff --git a/apps/scheduler/call/facts/facts.py b/apps/scheduler/call/facts/facts.py index 0c099729..0e0e15d7 100644 --- a/apps/scheduler/call/facts/facts.py +++ b/apps/scheduler/call/facts/facts.py @@ -6,6 +6,7 @@ from typing import TYPE_CHECKING, Any, Self from pydantic import Field from apps.entities.enum_var import CallOutputType +from apps.entities.pool import NodePool from apps.entities.scheduler import CallInfo, CallOutputChunk, CallVars from apps.llm.patterns.domain import Domain from apps.llm.patterns.facts import Facts @@ -24,28 +25,27 @@ class FactsCall(CoreCall, input_model=FactsInput, output_model=FactsOutput): @classmethod - def cls_info(cls) -> CallInfo: + def info(cls) -> CallInfo: """返回Call的名称和描述""" return CallInfo(name="提取事实", description="从对话上下文和文档片段中提取事实。") @classmethod - async def init(cls, executor: "StepExecutor", **kwargs: Any) -> tuple[Self, dict[str, Any]]: + async def instance(cls, executor: "StepExecutor", node: NodePool | None, **kwargs: Any) -> Self: """初始化工具""" - cls_obj = cls( + obj = cls( answer=executor.task.runtime.answer, name=executor.step.step.name, description=executor.step.step.description, + node=node, **kwargs, ) - call_vars = cls._assemble_call_vars(executor) - input_data = await cls_obj._init(call_vars) + await obj._set_input(executor) + return obj - return cls_obj, input_data - - async def _init(self, call_vars: CallVars) -> dict[str, Any]: + async def _init(self, call_vars: CallVars) -> FactsInput: """初始化工具""" # 组装必要变量 message = [ @@ -57,7 +57,7 @@ class FactsCall(CoreCall, input_model=FactsInput, output_model=FactsOutput): task_id=call_vars.ids.task_id, user_sub=call_vars.ids.user_sub, message=message, - ).model_dump(exclude_none=True, by_alias=True) + ) async def _exec(self, input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]: diff --git a/apps/scheduler/call/graph/graph.py b/apps/scheduler/call/graph/graph.py index da0783b4..020326b6 100644 --- a/apps/scheduler/call/graph/graph.py +++ b/apps/scheduler/call/graph/graph.py @@ -30,12 +30,12 @@ class Graph(CoreCall, input_model=RenderInput, output_model=RenderOutput): @classmethod - def cls_info(cls) -> CallInfo: + def info(cls) -> CallInfo: """返回Call的名称和描述""" return CallInfo(name="图表", description="将SQL查询出的数据转换为图表") - async def _init(self, call_vars: CallVars) -> dict[str, Any]: + async def _init(self, call_vars: CallVars) -> RenderInput: """初始化Render Call,校验参数,读取option模板""" await super()._init(call_vars) @@ -52,7 +52,7 @@ class Graph(CoreCall, input_model=RenderInput, output_model=RenderOutput): question=call_vars.question, task_id=call_vars.ids.task_id, data=self.data, - ).model_dump(exclude_none=True, by_alias=True) + ) async def _exec(self, input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]: diff --git a/apps/scheduler/call/llm/llm.py b/apps/scheduler/call/llm/llm.py index a8dfd790..6bb3ad1c 100644 --- a/apps/scheduler/call/llm/llm.py +++ b/apps/scheduler/call/llm/llm.py @@ -37,11 +37,13 @@ class LLM(CoreCall, input_model=LLMInput, output_model=LLMOutput): system_prompt: str = Field(description="大模型系统提示词", default="") user_prompt: str = Field(description="大模型用户提示词", default=LLM_DEFAULT_PROMPT) + @classmethod - def cls_info(cls) -> CallInfo: + def info(cls) -> CallInfo: """返回Call的名称和描述""" return CallInfo(name="大模型", description="以指定的提示词和上下文信息调用大模型,并获得输出。") + async def _prepare_message(self, call_vars: CallVars) -> list[dict[str, Any]]: """准备消息""" # 创建共享的 Environment 实例 @@ -87,12 +89,14 @@ class LLM(CoreCall, input_model=LLMInput, output_model=LLMOutput): {"role": "user", "content": user_input}, ] - async def _init(self, call_vars: CallVars) -> dict[str, Any]: + + async def _init(self, call_vars: CallVars) -> LLMInput: """初始化LLM工具""" return LLMInput( task_id=call_vars.ids.task_id, message=await self._prepare_message(call_vars), - ).model_dump(exclude_none=True, by_alias=True) + ) + async def _exec(self, input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]: """运行LLM Call""" diff --git a/apps/scheduler/call/rag/rag.py b/apps/scheduler/call/rag/rag.py index b73b21f8..270e6172 100644 --- a/apps/scheduler/call/rag/rag.py +++ b/apps/scheduler/call/rag/rag.py @@ -34,12 +34,14 @@ class RAG(CoreCall, input_model=RAGInput, output_model=RAGOutput): top_k: int = Field(description="返回的答案数量(经过整合以及上下文关联)", default=5) retrieval_mode: Literal["chunk", "full_text"] = Field(description="检索模式", default="chunk") + @classmethod - def cls_info(cls) -> CallInfo: + def info(cls) -> CallInfo: """返回Call的名称和描述""" return CallInfo(name="知识库", description="查询知识库,从文档中获取必要信息") - async def _init(self, call_vars: CallVars) -> dict[str, Any]: + + async def _init(self, call_vars: CallVars) -> RAGInput: """初始化RAG工具""" self._task_id = call_vars.ids.task_id return RAGInput( @@ -47,7 +49,8 @@ class RAG(CoreCall, input_model=RAGInput, output_model=RAGOutput): kb_sn=self.knowledge_base, top_k=self.top_k, retrieval_mode=RetrievalMode(self.retrieval_mode), - ).model_dump(by_alias=True, exclude_none=True) + ) + async def _exec(self, input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]: """调用RAG工具""" diff --git a/apps/scheduler/call/search/schema.py b/apps/scheduler/call/search/schema.py index e69de29b..6813b3ae 100644 --- a/apps/scheduler/call/search/schema.py +++ b/apps/scheduler/call/search/schema.py @@ -0,0 +1,19 @@ +"""Search Call的输入和输出""" + +from typing import Any + +from pydantic import Field + +from apps.scheduler.call.core import DataBase + + +class SearchInput(DataBase): + """搜索工具输入""" + + query: str = Field(description="搜索关键词") + + +class SearchRet(DataBase): + """搜索工具返回值""" + + data: list[dict[str, Any]] = Field(description="搜索结果") diff --git a/apps/scheduler/call/search/search.py b/apps/scheduler/call/search/search.py index e12d3858..006938fa 100644 --- a/apps/scheduler/call/search/search.py +++ b/apps/scheduler/call/search/search.py @@ -1,28 +1,22 @@ """搜索工具""" -from typing import Any, ClassVar -from pydantic import BaseModel, Field +from typing import Any, ClassVar from apps.entities.scheduler import CallVars from apps.scheduler.call.core import CoreCall +from apps.scheduler.call.search.schema import SearchInput, SearchRet -class SearchRet(BaseModel): - """搜索工具返回值""" - - data: list[dict[str, Any]] = Field(description="搜索结果") - - -class Search(CoreCall, ret_type=SearchRet): +class Search(CoreCall, input_model=SearchInput, output_model=SearchRet): """搜索工具""" name: ClassVar[str] = "搜索" description: ClassVar[str] = "获取搜索引擎的结果" - async def _init(self, syscall_vars: CallVars, **kwargs: Any) -> dict[str, Any]: + async def _init(self, call_vars: CallVars, **kwargs: Any) -> SearchInput: """初始化工具""" self._query: str = kwargs["query"] - return {} + return SearchInput(query=self._query) async def _exec(self) -> dict[str, Any]: diff --git a/apps/scheduler/call/slot/slot.py b/apps/scheduler/call/slot/slot.py index 07640490..1c4455a4 100644 --- a/apps/scheduler/call/slot/slot.py +++ b/apps/scheduler/call/slot/slot.py @@ -7,6 +7,7 @@ import jinja2 from pydantic import Field from apps.entities.enum_var import CallOutputType +from apps.entities.pool import NodePool from apps.entities.scheduler import CallInfo, CallOutputChunk, CallVars from apps.llm.patterns.json_gen import Json from apps.manager.task import TaskManager @@ -29,7 +30,7 @@ class Slot(CoreCall, input_model=SlotInput, output_model=SlotOutput): @classmethod - def cls_info(cls) -> CallInfo: + def info(cls) -> CallInfo: """返回Call的名称和描述""" return CallInfo(name="参数自动填充", description="根据步骤历史,自动填充参数") @@ -53,20 +54,21 @@ class Slot(CoreCall, input_model=SlotInput, output_model=SlotOutput): ) @classmethod - async def init(cls, executor: "StepExecutor", **kwargs: Any) -> tuple[Self, dict[str, Any]]: + async def instance(cls, executor: "StepExecutor", node: NodePool | None, **kwargs: Any) -> Self: """实例化Call类""" - call_obj = cls( + obj = cls( + name=executor.step.step.name, + description=executor.step.step.description, facts=executor.background.facts, summary=executor.task.runtime.summary, + node=node, **kwargs, ) + await obj._set_input(executor) + return obj - sys_vars = cls._assemble_call_vars(executor) - input_data = await call_obj._init(sys_vars) - return call_obj, input_data - - async def _init(self, call_vars: CallVars) -> dict[str, Any]: + async def _init(self, call_vars: CallVars) -> SlotInput: """初始化""" self._task_id = call_vars.ids.task_id self._flow_history = await TaskManager.get_flow_history_by_task_id(self._task_id, self.step_num) @@ -74,14 +76,14 @@ class Slot(CoreCall, input_model=SlotInput, output_model=SlotOutput): if not self.current_schema: return SlotInput( remaining_schema={}, - ).model_dump(by_alias=True, exclude_none=True) + ) self._processor = SlotProcessor(self.current_schema) remaining_schema = self._processor.check_json(self.data) return SlotInput( remaining_schema=remaining_schema, - ).model_dump(by_alias=True, exclude_none=True) + ) async def _exec(self, input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]: diff --git a/apps/scheduler/call/sql/schema.py b/apps/scheduler/call/sql/schema.py index b1dc669d..66c5945e 100644 --- a/apps/scheduler/call/sql/schema.py +++ b/apps/scheduler/call/sql/schema.py @@ -17,3 +17,4 @@ class SQLOutput(DataBase): """SQL工具的输出""" dataset: list[dict[str, Any]] = Field(description="SQL工具的执行结果") + sql: str = Field(description="SQL语句") diff --git a/apps/scheduler/call/sql/sql.py b/apps/scheduler/call/sql/sql.py index e3ef25c7..2d4a973a 100644 --- a/apps/scheduler/call/sql/sql.py +++ b/apps/scheduler/call/sql/sql.py @@ -37,16 +37,16 @@ class SQL(CoreCall, input_model=SQLInput, output_model=SQLOutput): @classmethod - def cls_info(cls) -> CallInfo: + def info(cls) -> CallInfo: """返回Call的名称和描述""" return CallInfo(name="SQL查询", description="使用大模型生成SQL语句,用于查询数据库中的结构化数据") - async def _init(self, call_vars: CallVars) -> dict[str, Any]: + async def _init(self, call_vars: CallVars) -> SQLInput: """初始化SQL工具。""" return SQLInput( question=call_vars.question, - ).model_dump(by_alias=True, exclude_none=True) + ) async def _generate_sql(self, data: SQLInput) -> list[dict[str, Any]]: @@ -87,7 +87,10 @@ class SQL(CoreCall, input_model=SQLInput, output_model=SQLOutput): return sql_list - async def _execute_sql(self, sql_list: list[dict[str, Any]]) -> list[dict[str, Any]] | None: + async def _execute_sql( + self, + sql_list: list[dict[str, Any]], + ) -> tuple[list[dict[str, Any]] | None, str | None]: """执行SQL语句并返回结果""" headers = {"Content-Type": "application/json"} @@ -105,14 +108,14 @@ class SQL(CoreCall, input_model=SQLInput, output_model=SQLOutput): if response.status == status.HTTP_200_OK: result = await response.json() if result["code"] == status.HTTP_200_OK: - return result["result"] + return result["result"], sql_dict["sql"] else: text = await response.text() logger.error("[SQL] 调用失败:%s", text) except Exception: logger.exception("[SQL] 调用失败") - return None + return None, None async def _exec(self, input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]: @@ -128,8 +131,8 @@ class SQL(CoreCall, input_model=SQLInput, output_model=SQLOutput): ) # 执行SQL语句 - sql_exec_results = await self._execute_sql(sql_list) - if sql_exec_results is None: + sql_exec_results, sql_exec = await self._execute_sql(sql_list) + if sql_exec_results is None or sql_exec is None: raise CallError( message="SQL查询错误:SQL语句执行失败!", data={}, @@ -138,6 +141,7 @@ class SQL(CoreCall, input_model=SQLInput, output_model=SQLOutput): # 返回结果 data = SQLOutput( dataset=sql_exec_results, + sql=sql_exec, ).model_dump(exclude_none=True, by_alias=True) yield CallOutputChunk( diff --git a/apps/scheduler/call/suggest/schema.py b/apps/scheduler/call/suggest/schema.py index bae027d0..c263192a 100644 --- a/apps/scheduler/call/suggest/schema.py +++ b/apps/scheduler/call/suggest/schema.py @@ -12,24 +12,18 @@ class SingleFlowSuggestionConfig(BaseModel): question: str | None = Field(default=None, description="固定的推荐问题") -class SuggestionOutputItem(BaseModel): - """问题推荐结果的单个条目""" - - question: str - app_id: str - flow_id: str - flow_description: str - - class SuggestionInput(DataBase): """问题推荐输入""" question: str - task_id: str user_sub: str + history_questions: list[str] class SuggestionOutput(DataBase): """问题推荐结果""" - output: list[SuggestionOutputItem] + question: str + app_id: str = Field(alias="appId") + flow_id: str = Field(alias="flowId") + flow_description: str = Field(alias="flowDescription") diff --git a/apps/scheduler/call/suggest/suggest.py b/apps/scheduler/call/suggest/suggest.py index 28ea2b43..443e5d2c 100644 --- a/apps/scheduler/call/suggest/suggest.py +++ b/apps/scheduler/call/suggest/suggest.py @@ -4,14 +4,25 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """ +import random from collections.abc import AsyncGenerator from typing import TYPE_CHECKING, Any, Self from pydantic import Field - -from apps.entities.scheduler import CallInfo, CallOutputChunk, CallVars +from pydantic.json_schema import SkipJsonSchema + +from apps.common.security import Security +from apps.entities.enum_var import CallOutputType +from apps.entities.pool import NodePool +from apps.entities.record import RecordContent +from apps.entities.scheduler import ( + CallError, + CallInfo, + CallOutputChunk, + CallVars, +) from apps.llm.patterns.recommend import Recommend -from apps.manager.task import TaskManager +from apps.manager.record import RecordManager from apps.manager.user_domain import UserDomainManager from apps.scheduler.call.core import CoreCall from apps.scheduler.call.suggest.schema import ( @@ -27,20 +38,22 @@ if TYPE_CHECKING: class Suggestion(CoreCall, input_model=SuggestionInput, output_model=SuggestionOutput): """问题推荐""" - configs: list[SingleFlowSuggestionConfig] = Field(description="问题推荐配置", min_length=1) + configs: list[SingleFlowSuggestionConfig] = Field(description="问题推荐配置") num: int = Field(default=3, ge=1, le=6, description="推荐问题的总数量(必须大于等于configs中涉及的Flow的数量)") - context: list[dict[str, str]] = Field(description="Executor的上下文") + context: SkipJsonSchema[list[dict[str, str]]] = Field(description="Executor的上下文", exclude=True) + conversation_id: SkipJsonSchema[str] = Field(description="对话ID", exclude=True) @classmethod - def cls_info(cls) -> CallInfo: + def info(cls) -> CallInfo: """返回Call的名称和描述""" return CallInfo(name="问题推荐", description="在答案下方显示推荐的下一个问题") - async def init(self, executor: "StepExecutor", **kwargs: Any) -> tuple[Self, dict[str, Any]]: + @classmethod + async def instance(cls, executor: "StepExecutor", node: NodePool | None, **kwargs: Any) -> Self: """初始化""" - self.context = [ + context = [ { "role": "user", "content": executor.task.runtime.question, @@ -50,33 +63,127 @@ class Suggestion(CoreCall, input_model=SuggestionInput, output_model=SuggestionO "content": executor.task.runtime.answer, }, ] - - return await super().init(executor, **kwargs) + obj = cls( + name=executor.step.step.name, + description=executor.step.step.description, + node=node, + context=context, + conversation_id=executor.task.ids.conversation_id, + **kwargs, + ) + await obj._set_input(executor) + return obj - async def _init(self, call_vars: CallVars) -> dict[str, Any]: + async def _init(self, call_vars: CallVars) -> SuggestionInput: """初始化""" + from apps.manager.appcenter import AppCenterManager + + self._task_id = call_vars.ids.task_id + self._history_questions = await self._get_history_questions( + call_vars.ids.user_sub, + self.conversation_id, + ) + self._app_id = call_vars.ids.app_id + self._flow_id = call_vars.ids.flow_id + app_metadata = await AppCenterManager.fetch_app_data_by_id(self._app_id) + self._avaliable_flows = {} + for flow in app_metadata.flows: + self._avaliable_flows[flow.id] = flow.description + return SuggestionInput( question=call_vars.question, - task_id=call_vars.ids.task_id, user_sub=call_vars.ids.user_sub, - ).model_dump(by_alias=True, exclude_none=True) + history_questions=self._history_questions, + ) + + + async def _get_history_questions(self, user_sub: str, conversation_id: str) -> list[str]: + """获取当前对话的历史问题""" + records = await RecordManager.query_record_by_conversation_id( + user_sub, + conversation_id, + 15, + ) + + history_questions = [] + for record in records: + record_data = RecordContent.model_validate_json(Security.decrypt(record.content, record.key)) + history_questions.append(record_data.question) + return history_questions async def _exec(self, input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]: """运行问题推荐""" data = SuggestionInput(**input_data) - # 获取当前任务 - task = await TaskManager.get_task(task_id=data.task_id) + + # 配置不正确 + if self.num < len(self.configs): + raise CallError( + message="推荐问题的数量必须大于等于配置的数量", + data={}, + ) # 获取当前用户的画像 user_domain = await UserDomainManager.get_user_domain_by_user_sub_and_topk(data.user_sub, 5) - - recommended_questions = await Recommend().generate( - data.task_id, - conversation=self.context, - user_preference=user_domain, - history_questions=task.runtime.history, - ) - - yield CallOutputChunk(content="") + # 已推送问题数量 + pushed_questions = 0 + + # 先处理configs + for config in self.configs: + if config.flow_id not in self._avaliable_flows: + raise CallError( + message="配置的Flow ID不存在", + data={}, + ) + + if config.question: + question = config.question + else: + questions = await Recommend().generate( + self._task_id, + conversation=self.context, + user_preference=user_domain, + history_questions=self._history_questions, + tool_name=config.flow_id, + tool_description=self._avaliable_flows[config.flow_id], + ) + question = questions[random.randint(0, len(questions) - 1)] # noqa: S311 + + yield CallOutputChunk( + type=CallOutputType.DATA, + content=SuggestionOutput( + question=question, + appId=self._app_id, + flowId=config.flow_id, + flowDescription=self._avaliable_flows[config.flow_id], + ).model_dump(by_alias=True, exclude_none=True), + ) + pushed_questions += 1 + + + while pushed_questions < self.num: + recommended_questions = await Recommend().generate( + self._task_id, + conversation=self.context, + user_preference=user_domain, + history_questions=self._history_questions, + tool_name=self._flow_id, + tool_description=self._avaliable_flows[self._flow_id], + ) + + # 只会关联当前flow + for question in recommended_questions: + if pushed_questions >= self.num: + break + + yield CallOutputChunk( + type=CallOutputType.DATA, + content=SuggestionOutput( + question=question, + appId=self._app_id, + flowId=self._flow_id, + flowDescription=self._avaliable_flows[self._flow_id], + ).model_dump(by_alias=True, exclude_none=True), + ) + pushed_questions += 1 diff --git a/apps/scheduler/call/summary/summary.py b/apps/scheduler/call/summary/summary.py index a4514f7c..b7293366 100644 --- a/apps/scheduler/call/summary/summary.py +++ b/apps/scheduler/call/summary/summary.py @@ -10,6 +10,7 @@ from typing import TYPE_CHECKING, Any, Self from pydantic import Field from apps.entities.enum_var import CallOutputType +from apps.entities.pool import NodePool from apps.entities.scheduler import ( CallInfo, CallOutputChunk, @@ -31,29 +32,29 @@ class Summary(CoreCall, input_model=SummaryInput, output_model=SummaryOutput): context: ExecutorBackground = Field(description="对话上下文") @classmethod - def cls_info(cls) -> CallInfo: + def info(cls) -> CallInfo: """返回Call的名称和描述""" return CallInfo(name="理解上下文", description="使用大模型,理解对话上下文") @classmethod - async def init(cls, executor: "StepExecutor", **kwargs: Any) -> tuple[Self, dict[str, Any]]: - """初始化工具""" - cls_obj = cls( + async def instance(cls, executor: "StepExecutor", node: NodePool | None, **kwargs: Any) -> Self: + """实例化工具""" + obj = cls( context=executor.background, name=executor.step.step.name, description=executor.step.step.description, + node=node, **kwargs, ) + await obj._set_input(executor) + return obj - sys_vars = cls._assemble_call_vars(executor) - input_data = await cls_obj._init(sys_vars) - return cls_obj, input_data - async def _init(self, call_vars: CallVars) -> dict[str, Any]: - """初始化工具""" + async def _init(self, call_vars: CallVars) -> SummaryInput: + """初始化工具,返回输入""" return SummaryInput( task_id=call_vars.ids.task_id, - ).model_dump(by_alias=True, exclude_none=True) + ) async def _exec(self, input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]: -- Gitee From 5bb62571258bbb249ca7618f59233d5e3c090803 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Sat, 19 Apr 2025 16:22:15 +0800 Subject: [PATCH 5/5] =?UTF-8?q?=E6=9B=B4=E6=96=B0Executor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/entities/collection.py | 2 +- apps/entities/enum_var.py | 2 -- apps/entities/message.py | 9 --------- apps/entities/scheduler.py | 1 + apps/entities/task.py | 4 ++-- apps/scheduler/executor/node.py | 2 +- apps/scheduler/executor/step.py | 20 +++++++++++--------- 7 files changed, 16 insertions(+), 24 deletions(-) diff --git a/apps/entities/collection.py b/apps/entities/collection.py index eea85e02..af0b5a88 100644 --- a/apps/entities/collection.py +++ b/apps/entities/collection.py @@ -23,7 +23,7 @@ class Blacklist(BaseModel): question: str answer: str is_audited: bool = False - reason_type: list[str] = [] + reason_type: str = "" reason: str | None = None updated_at: float = Field(default_factory=lambda: round(datetime.now(tz=UTC).timestamp(), 3)) diff --git a/apps/entities/enum_var.py b/apps/entities/enum_var.py index 284c89c9..3d19ab98 100644 --- a/apps/entities/enum_var.py +++ b/apps/entities/enum_var.py @@ -41,7 +41,6 @@ class EventType(str, Enum): TEXT_ADD = "text.add" GRAPH = "graph" DOCUMENT_ADD = "document.add" - SUGGEST = "suggest" FLOW_START = "flow.start" STEP_INPUT = "step.input" STEP_OUTPUT = "step.output" @@ -144,7 +143,6 @@ class SpecialCallType(str, Enum): SUMMARY = "Summary" FACTS = "Facts" SLOT = "Slot" - OUTPUT = "Output" LLM = "LLM" START = "start" END = "end" diff --git a/apps/entities/message.py b/apps/entities/message.py index bd30059c..557d311b 100644 --- a/apps/entities/message.py +++ b/apps/entities/message.py @@ -67,15 +67,6 @@ class DocumentAddContent(BaseModel): document_size: float = Field(ge=0, description="文档大小,单位是KB,保留两位小数", alias="documentSize") -class SuggestContent(BaseModel): - """suggest消息的content""" - - app_id: str = Field(description="插件ID", alias="appId") - flow_id: str = Field(description="Flow ID", alias="flowId") - flow_description: str = Field(description="Flow描述", alias="flowDescription") - question: str = Field(description="用户问题") - - class FlowStartContent(BaseModel): """flow.start消息的content""" diff --git a/apps/entities/scheduler.py b/apps/entities/scheduler.py index aedc2d8d..215efe5f 100644 --- a/apps/entities/scheduler.py +++ b/apps/entities/scheduler.py @@ -25,6 +25,7 @@ class CallIds(BaseModel): task_id: str = Field(description="任务ID") flow_id: str = Field(description="Flow ID") session_id: str = Field(description="当前用户的Session ID") + app_id: str = Field(description="当前应用的ID") user_sub: str = Field(description="当前用户的用户ID") diff --git a/apps/entities/task.py b/apps/entities/task.py index 04292c41..24a4f3d5 100644 --- a/apps/entities/task.py +++ b/apps/entities/task.py @@ -97,5 +97,5 @@ class StepQueueItem(BaseModel): step_id: str = Field(description="步骤ID") step: Step = Field(description="步骤") - enable_filling: bool = Field(description="是否启用填充", default=True) - to_user: bool = Field(description="是否输出给用户", default=False) + enable_filling: bool | None = Field(description="是否启用填充", default=None) + to_user: bool | None = Field(description="是否输出给用户", default=None) diff --git a/apps/scheduler/executor/node.py b/apps/scheduler/executor/node.py index 4b2b8ce6..7584dce4 100644 --- a/apps/scheduler/executor/node.py +++ b/apps/scheduler/executor/node.py @@ -23,7 +23,7 @@ class StepNode: flag = False if not hasattr(call_cls, "_exec") or not inspect.isasyncgenfunction(call_cls._exec): # noqa: SLF001 flag = False - if not hasattr(call_cls, "cls_info") or not callable(call_cls.cls_info): + if not hasattr(call_cls, "info") or not callable(call_cls.info): flag = False return flag diff --git a/apps/scheduler/executor/step.py b/apps/scheduler/executor/step.py index 1614e224..999df2ec 100644 --- a/apps/scheduler/executor/step.py +++ b/apps/scheduler/executor/step.py @@ -87,7 +87,7 @@ class StepExecutor(BaseExecutor): params.update(self.step.step.params) try: - self.obj, self.input = await call_cls.init(self, **params) + self.obj = await call_cls.instance(self, self.node, **params) except Exception: logger.exception("[StepExecutor] 初始化Call失败") raise @@ -121,20 +121,22 @@ class StepExecutor(BaseExecutor): await TaskManager.save_task(self.task.id, self.task) # 准备参数 params = { - "data": self.input, - "current_schema": self.obj.input_type.model_json_schema(), + "data": self.obj.input, + "current_schema": self.obj.input_model.model_json_schema( + override=self.node.override_input if self.node and self.node.override_input else {}, + ), } # 初始化填参 - slot_obj, slot_input = await Slot.init(self, **params) + slot_obj = await Slot.instance(self, self.node, **params) # 推送填参消息 - await self.push_message(EventType.STEP_INPUT.value, slot_input) + await self.push_message(EventType.STEP_INPUT.value, slot_obj.input) # 运行填参 - iterator = slot_obj.exec(self, slot_input) + iterator = slot_obj.exec(self, slot_obj.input) async for chunk in iterator: result: SlotOutput = SlotOutput.model_validate(chunk.content) - self.input.update(result.slot_data) + self.obj.input.update(result.slot_data) # 恢复State self.task.state.step_id = current_step_id # type: ignore[arg-type] @@ -181,10 +183,10 @@ class StepExecutor(BaseExecutor): logger.info("[StepExecutor] 运行步骤 %s", self.step.step.name) # 推送输入 - await self.push_message(EventType.STEP_INPUT.value, self.input) + await self.push_message(EventType.STEP_INPUT.value, self.obj.input) # 执行步骤 - iterator = self.obj.exec(self, self.input) + iterator = self.obj.exec(self, self.obj.input) try: content = await self._process_chunk(iterator, to_user=self.obj.to_user) -- Gitee