diff --git a/apps/entities/collection.py b/apps/entities/collection.py index eea85e026e9e412a9ef29ccd9d01e722f251cb75..af0b5a88e128b2c40c6e86eb814a9889bde654aa 100644 --- a/apps/entities/collection.py +++ b/apps/entities/collection.py @@ -23,7 +23,7 @@ class Blacklist(BaseModel): question: str answer: str is_audited: bool = False - reason_type: list[str] = [] + reason_type: str = "" reason: str | None = None updated_at: float = Field(default_factory=lambda: round(datetime.now(tz=UTC).timestamp(), 3)) diff --git a/apps/entities/enum_var.py b/apps/entities/enum_var.py index 284c89c959ec16442b17ca78204cfd01a0540380..3d19ab98698776f0ddba27a976ae9814197a98ec 100644 --- a/apps/entities/enum_var.py +++ b/apps/entities/enum_var.py @@ -41,7 +41,6 @@ class EventType(str, Enum): TEXT_ADD = "text.add" GRAPH = "graph" DOCUMENT_ADD = "document.add" - SUGGEST = "suggest" FLOW_START = "flow.start" STEP_INPUT = "step.input" STEP_OUTPUT = "step.output" @@ -144,7 +143,6 @@ class SpecialCallType(str, Enum): SUMMARY = "Summary" FACTS = "Facts" SLOT = "Slot" - OUTPUT = "Output" LLM = "LLM" START = "start" END = "end" diff --git a/apps/entities/message.py b/apps/entities/message.py index bd30059cc39729cfe7d0d019dbac2eb7e9ee63b4..557d311b0ccb677dace093b320c1c0a012b29adc 100644 --- a/apps/entities/message.py +++ b/apps/entities/message.py @@ -67,15 +67,6 @@ class DocumentAddContent(BaseModel): document_size: float = Field(ge=0, description="文档大小,单位是KB,保留两位小数", alias="documentSize") -class SuggestContent(BaseModel): - """suggest消息的content""" - - app_id: str = Field(description="插件ID", alias="appId") - flow_id: str = Field(description="Flow ID", alias="flowId") - flow_description: str = Field(description="Flow描述", alias="flowDescription") - question: str = Field(description="用户问题") - - class FlowStartContent(BaseModel): """flow.start消息的content""" diff --git a/apps/entities/scheduler.py b/apps/entities/scheduler.py index aedc2d8d940919e07df703db9aa923ec94b41aa3..215efe5f1ed89071a478d35b95ba210cec570906 100644 --- a/apps/entities/scheduler.py +++ b/apps/entities/scheduler.py @@ -25,6 +25,7 @@ class CallIds(BaseModel): task_id: str = Field(description="任务ID") flow_id: str = Field(description="Flow ID") session_id: str = Field(description="当前用户的Session ID") + app_id: str = Field(description="当前应用的ID") user_sub: str = Field(description="当前用户的用户ID") diff --git a/apps/entities/task.py b/apps/entities/task.py index 04292c4105f637c01244002fe9a07885b33159b6..24a4f3d56e4136c7e0f688baff9d8b602e1cdc04 100644 --- a/apps/entities/task.py +++ b/apps/entities/task.py @@ -97,5 +97,5 @@ class StepQueueItem(BaseModel): step_id: str = Field(description="步骤ID") step: Step = Field(description="步骤") - enable_filling: bool = Field(description="是否启用填充", default=True) - to_user: bool = Field(description="是否输出给用户", default=False) + enable_filling: bool | None = Field(description="是否启用填充", default=None) + to_user: bool | None = Field(description="是否输出给用户", default=None) diff --git a/apps/llm/patterns/recommend.py b/apps/llm/patterns/recommend.py index 1bd43c83e29f8fba1049bb129e8a4c9b5444c754..2d236d68e640d728dc99ad5bc442e699572561c4 100644 --- a/apps/llm/patterns/recommend.py +++ b/apps/llm/patterns/recommend.py @@ -21,9 +21,10 @@ class Recommend(CorePattern): user_prompt: str = r""" - 根据提供的对话和附加信息(用户倾向、历史问题列表等),生成三个预测问题。 + 根据提供的对话和附加信息(用户倾向、历史问题列表、工具信息等),生成三个预测问题。 历史提问列表展示的是用户发生在历史对话之前的提问,仅为背景参考作用。 - 对话将在标签中给出,用户倾向将在标签中给出,历史问题列表将在标签中给出。 + 对话将在标签中给出,用户倾向将在标签中给出,\ + 历史问题列表将在标签中给出,工具信息将在标签中给出。 生成预测问题时的要求: 1. 以用户口吻生成预测问题,数量必须为3个,必须为疑问句或祈使句,必须少于30字。 @@ -50,6 +51,12 @@ class Recommend(CorePattern): 简单介绍一下杭州 杭州有哪些著名景点? + + + 景点查询 + 查询景点信息 + + ["杭州", "旅游"] 现在,进行问题生成: @@ -73,6 +80,11 @@ class Recommend(CorePattern): {history_questions} + + {tool_name} + {tool_description} + + {user_preference} 现在,进行问题生成: @@ -118,6 +130,8 @@ class Recommend(CorePattern): conversation=convert_context_to_prompt(kwargs["conversation"]), history_questions=history_questions, user_preference=user_preference, + tool_name=kwargs["tool_name"], + tool_description=kwargs["tool_description"], )}, ] diff --git a/apps/manager/node.py b/apps/manager/node.py index a9d3362fd1bfe22f064af70d76bcb117722614b2..41a3e4f71c31d731a9735c98580fc26f8136443c 100644 --- a/apps/manager/node.py +++ b/apps/manager/node.py @@ -115,5 +115,7 @@ class NodeManager: # 返回参数Schema return ( NodeManager.merge_params_schema(call_class.model_json_schema(), node_data.known_params or {}), - call_class.output_type.model_json_schema(), # type: ignore[attr-defined] + call_class.output_model.model_json_schema( # type: ignore[attr-defined] + override=node_data.override_output if node_data.override_output else {}, + ), ) diff --git a/apps/scheduler/call/api/api.py b/apps/scheduler/call/api/api.py index 38d732e9139d92039985d37bc966b20ddce5a219..0e42b973cc0ee65cd35730c9e8626dba979d3b61 100644 --- a/apps/scheduler/call/api/api.py +++ b/apps/scheduler/call/api/api.py @@ -65,32 +65,17 @@ class API(CoreCall, input_model=APIInput, output_model=APIOutput): @classmethod - def cls_info(cls) -> CallInfo: + def info(cls) -> CallInfo: """返回Call的名称和描述""" return CallInfo(name="API调用", description="向某一个API接口发送HTTP请求,获取数据。") - @classmethod - async def init(cls, executor: "StepExecutor", **kwargs: Any) -> tuple[Self, dict[str, Any]]: - """初始化工具""" - cls_obj = cls( - name=executor.step.step.name, - description=executor.step.step.description, - **kwargs, - ) - - call_vars = cls._assemble_call_vars(executor) - input_data = await cls_obj._init(call_vars) - - return cls_obj, input_data - - - async def _init(self, call_vars: CallVars) -> dict[str, Any]: + async def _init(self, call_vars: CallVars) -> APIInput: """初始化API调用工具""" # 获取对应API的Service Metadata try: service_metadata = await ServiceCenterManager.get_service_data( - call_vars.ids.user_sub, call_vars.ids.service_id, + call_vars.ids.user_sub, self.node.service_id or "", ) service_metadata = ServiceMetadata.model_validate(service_metadata) except Exception as e: @@ -109,7 +94,7 @@ class API(CoreCall, input_model=APIInput, output_model=APIOutput): method=self.method, query=self.query, body=self.body, - ).model_dump(exclude_none=True, by_alias=True) + ) async def _exec(self, input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]: diff --git a/apps/scheduler/call/choice/choice.py b/apps/scheduler/call/choice/choice.py index c493ea80eae245d7200b82919c3abd4d5d0abade..f6a66e8531f3d61ab3e0b070871998aa03c46141 100644 --- a/apps/scheduler/call/choice/choice.py +++ b/apps/scheduler/call/choice/choice.py @@ -5,7 +5,8 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """ from enum import Enum -from pydantic import BaseModel +from apps.scheduler.call.choice.schema import ChoiceInput, ChoiceOutput +from apps.scheduler.call.core import CoreCall class Operator(str, Enum): @@ -14,8 +15,7 @@ class Operator(str, Enum): pass -class ChoiceInput(BaseModel): - """Choice工具的输入格式""" +class Choice(CoreCall, input_model=ChoiceInput, output_model=ChoiceOutput): + """Choice工具""" pass - diff --git a/apps/scheduler/call/choice/schema.py b/apps/scheduler/call/choice/schema.py index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..ae20986895ff8dd1a7170737db53d8888ae2549a 100644 --- a/apps/scheduler/call/choice/schema.py +++ b/apps/scheduler/call/choice/schema.py @@ -0,0 +1,11 @@ +"""Choice Call的输入和输出""" + +from apps.scheduler.call.core import DataBase + + +class ChoiceInput(DataBase): + """Choice Call的输入""" + + +class ChoiceOutput(DataBase): + """Choice Call的输出""" diff --git a/apps/scheduler/call/convert/convert.py b/apps/scheduler/call/convert/convert.py index f15fd2f882d4566ba2b15a3a9f85eba4172c5ad3..9952e041aff8c2a3fec38e7d41056757ff868969 100644 --- a/apps/scheduler/call/convert/convert.py +++ b/apps/scheduler/call/convert/convert.py @@ -30,23 +30,23 @@ class Convert(CoreCall, input_model=ConvertInput, output_model=ConvertOutput): @classmethod - def cls_info(cls) -> CallInfo: + def info(cls) -> CallInfo: """返回Call的名称和描述""" return CallInfo(name="模板转换", description="使用jinja2语法和jsonnet语法,将自然语言信息和原始数据进行格式化。") - async def _init(self, syscall_vars: CallVars) -> dict[str, Any]: + async def _init(self, call_vars: CallVars) -> ConvertInput: """初始化工具""" - await super()._init(syscall_vars) + await super()._init(call_vars) - self._history = syscall_vars.history - self._question = syscall_vars.question + self._history = call_vars.history + self._question = call_vars.question self._env = SandboxedEnvironment( loader=BaseLoader(), autoescape=False, trim_blocks=True, lstrip_blocks=True, ) - return ConvertInput().model_dump(exclude_none=True, by_alias=True) + return ConvertInput() async def _exec(self) -> AsyncGenerator[CallOutputChunk, None]: diff --git a/apps/scheduler/call/core.py b/apps/scheduler/call/core.py index 7a90b08d7dd7cea3600c30a4f2a415c57229f1f6..1ab76c808fe88ad8bf749c9a8f4d06eb05bca8c6 100644 --- a/apps/scheduler/call/core.py +++ b/apps/scheduler/call/core.py @@ -13,6 +13,7 @@ from pydantic import BaseModel, ConfigDict, Field from pydantic.json_schema import SkipJsonSchema from apps.entities.enum_var import CallOutputType +from apps.entities.pool import NodePool from apps.entities.scheduler import ( CallIds, CallInfo, @@ -43,6 +44,7 @@ class CoreCall(BaseModel): name: SkipJsonSchema[str] = Field(description="Step的名称", exclude=True) description: SkipJsonSchema[str] = Field(description="Step的描述", exclude=True) + node: SkipJsonSchema[NodePool | None] = Field(description="节点信息", exclude=True) input_model: ClassVar[SkipJsonSchema[type[DataBase]]] = Field( description="Call的输入Pydantic类型;不包含override的模板", exclude=True, @@ -70,28 +72,12 @@ class CoreCall(BaseModel): @classmethod - def cls_info(cls) -> CallInfo: + def info(cls) -> CallInfo: """返回Call的名称和描述""" - err = "[CoreCall] 必须手动实现cls_info方法" + err = "[CoreCall] 必须手动实现info方法" raise NotImplementedError(err) - @property - def input_type(self) -> type[DataBase]: - """返回输入类型""" - class InputType(self.input_model): - pass - return InputType - - - @property - def output_type(self) -> type[DataBase]: - """返回输出类型""" - class OutputType(self._output_type_template): - pass - return OutputType - - @staticmethod def _assemble_call_vars(executor: "StepExecutor") -> CallVars: """组装CallVars""" @@ -106,6 +92,7 @@ class CoreCall(BaseModel): flow_id=executor.task.state.flow_id, session_id=executor.task.ids.session_id, user_sub=executor.task.ids.user_sub, + app_id=executor.task.state.app_id, ), question=executor.question, history=executor.task.context, @@ -114,21 +101,28 @@ class CoreCall(BaseModel): @classmethod - async def init(cls, executor: "StepExecutor", **kwargs: Any) -> tuple[Self, dict[str, Any]]: + async def instance(cls, executor: "StepExecutor", node: NodePool | None, **kwargs: Any) -> Self: """实例化Call类""" - sys_vars = cls._assemble_call_vars(executor) - - call_obj = cls( + obj = cls( name=executor.step.step.name, description=executor.step.step.description, + node=node, **kwargs, ) - input_data = await call_obj._init(sys_vars) - return call_obj, input_data + + await obj._set_input(executor) + return obj - async def _init(self, call_vars: CallVars) -> dict[str, Any]: - """实例化Call类,并返回Call的输入""" + async def _set_input(self, executor: "StepExecutor") -> None: + """获取Call的输入""" + self._sys_vars = self._assemble_call_vars(executor) + input_data = await self._init(self._sys_vars) + self.input = input_data.model_dump(by_alias=True, exclude_none=True) + + + async def _init(self, call_vars: CallVars) -> DataBase: + """初始化Call类,并返回Call的输入""" err = "[CoreCall] 初始化方法必须手动实现" raise NotImplementedError(err) @@ -137,7 +131,13 @@ class CoreCall(BaseModel): """Call类实例的流式输出方法""" yield CallOutputChunk(type=CallOutputType.TEXT, content="") + + async def _after_exec(self, input_data: dict[str, Any]) -> None: + """Call类实例的执行后方法""" + + async def exec(self, executor: "StepExecutor", input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]: """Call类实例的执行方法""" async for chunk in self._exec(input_data): yield chunk + await self._after_exec(input_data) diff --git a/apps/scheduler/call/empty.py b/apps/scheduler/call/empty.py index 84dc9ae791ab7a9d5c8ab3ec1a4fe494ee5b0829..f33832c2fbc4e280c2f79965e5f80abe2bd93b05 100644 --- a/apps/scheduler/call/empty.py +++ b/apps/scheduler/call/empty.py @@ -16,14 +16,14 @@ class Empty(CoreCall, input_model=DataBase, output_model=DataBase): """空Call""" @classmethod - def cls_info(cls) -> CallInfo: + def info(cls) -> CallInfo: """返回Call的名称和描述""" return CallInfo(name="空白", description="空白节点,用于占位") - async def _init(self, call_vars: CallVars) -> dict[str, Any]: + async def _init(self, call_vars: CallVars) -> DataBase: """初始化""" - return {} + return DataBase() async def _exec(self, input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]: diff --git a/apps/scheduler/call/facts/facts.py b/apps/scheduler/call/facts/facts.py index 0c099729a2749c33373285e4494fccf682da72f3..0e0e15d73308c6fc776c81a648c6907c1054949e 100644 --- a/apps/scheduler/call/facts/facts.py +++ b/apps/scheduler/call/facts/facts.py @@ -6,6 +6,7 @@ from typing import TYPE_CHECKING, Any, Self from pydantic import Field from apps.entities.enum_var import CallOutputType +from apps.entities.pool import NodePool from apps.entities.scheduler import CallInfo, CallOutputChunk, CallVars from apps.llm.patterns.domain import Domain from apps.llm.patterns.facts import Facts @@ -24,28 +25,27 @@ class FactsCall(CoreCall, input_model=FactsInput, output_model=FactsOutput): @classmethod - def cls_info(cls) -> CallInfo: + def info(cls) -> CallInfo: """返回Call的名称和描述""" return CallInfo(name="提取事实", description="从对话上下文和文档片段中提取事实。") @classmethod - async def init(cls, executor: "StepExecutor", **kwargs: Any) -> tuple[Self, dict[str, Any]]: + async def instance(cls, executor: "StepExecutor", node: NodePool | None, **kwargs: Any) -> Self: """初始化工具""" - cls_obj = cls( + obj = cls( answer=executor.task.runtime.answer, name=executor.step.step.name, description=executor.step.step.description, + node=node, **kwargs, ) - call_vars = cls._assemble_call_vars(executor) - input_data = await cls_obj._init(call_vars) + await obj._set_input(executor) + return obj - return cls_obj, input_data - - async def _init(self, call_vars: CallVars) -> dict[str, Any]: + async def _init(self, call_vars: CallVars) -> FactsInput: """初始化工具""" # 组装必要变量 message = [ @@ -57,7 +57,7 @@ class FactsCall(CoreCall, input_model=FactsInput, output_model=FactsOutput): task_id=call_vars.ids.task_id, user_sub=call_vars.ids.user_sub, message=message, - ).model_dump(exclude_none=True, by_alias=True) + ) async def _exec(self, input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]: diff --git a/apps/scheduler/call/graph/graph.py b/apps/scheduler/call/graph/graph.py index da0783b4b476616ed06c58792eb91ca1bbf1fb06..020326b62dfd8f023d4b406abb7903704ae9335d 100644 --- a/apps/scheduler/call/graph/graph.py +++ b/apps/scheduler/call/graph/graph.py @@ -30,12 +30,12 @@ class Graph(CoreCall, input_model=RenderInput, output_model=RenderOutput): @classmethod - def cls_info(cls) -> CallInfo: + def info(cls) -> CallInfo: """返回Call的名称和描述""" return CallInfo(name="图表", description="将SQL查询出的数据转换为图表") - async def _init(self, call_vars: CallVars) -> dict[str, Any]: + async def _init(self, call_vars: CallVars) -> RenderInput: """初始化Render Call,校验参数,读取option模板""" await super()._init(call_vars) @@ -52,7 +52,7 @@ class Graph(CoreCall, input_model=RenderInput, output_model=RenderOutput): question=call_vars.question, task_id=call_vars.ids.task_id, data=self.data, - ).model_dump(exclude_none=True, by_alias=True) + ) async def _exec(self, input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]: diff --git a/apps/scheduler/call/llm/llm.py b/apps/scheduler/call/llm/llm.py index a8dfd7909b4cd1877f63ef834d0eb396e6d1d333..6bb3ad1cf5fa9e9fd8657a29b7c12777331020a5 100644 --- a/apps/scheduler/call/llm/llm.py +++ b/apps/scheduler/call/llm/llm.py @@ -37,11 +37,13 @@ class LLM(CoreCall, input_model=LLMInput, output_model=LLMOutput): system_prompt: str = Field(description="大模型系统提示词", default="") user_prompt: str = Field(description="大模型用户提示词", default=LLM_DEFAULT_PROMPT) + @classmethod - def cls_info(cls) -> CallInfo: + def info(cls) -> CallInfo: """返回Call的名称和描述""" return CallInfo(name="大模型", description="以指定的提示词和上下文信息调用大模型,并获得输出。") + async def _prepare_message(self, call_vars: CallVars) -> list[dict[str, Any]]: """准备消息""" # 创建共享的 Environment 实例 @@ -87,12 +89,14 @@ class LLM(CoreCall, input_model=LLMInput, output_model=LLMOutput): {"role": "user", "content": user_input}, ] - async def _init(self, call_vars: CallVars) -> dict[str, Any]: + + async def _init(self, call_vars: CallVars) -> LLMInput: """初始化LLM工具""" return LLMInput( task_id=call_vars.ids.task_id, message=await self._prepare_message(call_vars), - ).model_dump(exclude_none=True, by_alias=True) + ) + async def _exec(self, input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]: """运行LLM Call""" diff --git a/apps/scheduler/call/rag/rag.py b/apps/scheduler/call/rag/rag.py index b73b21f8be25e2251a76d3c41c3c1179983ce9f1..270e61724d2d8719b88953d51bbdc2462c39cf4f 100644 --- a/apps/scheduler/call/rag/rag.py +++ b/apps/scheduler/call/rag/rag.py @@ -34,12 +34,14 @@ class RAG(CoreCall, input_model=RAGInput, output_model=RAGOutput): top_k: int = Field(description="返回的答案数量(经过整合以及上下文关联)", default=5) retrieval_mode: Literal["chunk", "full_text"] = Field(description="检索模式", default="chunk") + @classmethod - def cls_info(cls) -> CallInfo: + def info(cls) -> CallInfo: """返回Call的名称和描述""" return CallInfo(name="知识库", description="查询知识库,从文档中获取必要信息") - async def _init(self, call_vars: CallVars) -> dict[str, Any]: + + async def _init(self, call_vars: CallVars) -> RAGInput: """初始化RAG工具""" self._task_id = call_vars.ids.task_id return RAGInput( @@ -47,7 +49,8 @@ class RAG(CoreCall, input_model=RAGInput, output_model=RAGOutput): kb_sn=self.knowledge_base, top_k=self.top_k, retrieval_mode=RetrievalMode(self.retrieval_mode), - ).model_dump(by_alias=True, exclude_none=True) + ) + async def _exec(self, input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]: """调用RAG工具""" diff --git a/apps/scheduler/call/search/schema.py b/apps/scheduler/call/search/schema.py index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..6813b3ae0b99b1178f10ba6eaf0a16663cb6d513 100644 --- a/apps/scheduler/call/search/schema.py +++ b/apps/scheduler/call/search/schema.py @@ -0,0 +1,19 @@ +"""Search Call的输入和输出""" + +from typing import Any + +from pydantic import Field + +from apps.scheduler.call.core import DataBase + + +class SearchInput(DataBase): + """搜索工具输入""" + + query: str = Field(description="搜索关键词") + + +class SearchRet(DataBase): + """搜索工具返回值""" + + data: list[dict[str, Any]] = Field(description="搜索结果") diff --git a/apps/scheduler/call/search/search.py b/apps/scheduler/call/search/search.py index e12d385809e90ecf31284cc1b943404fc8764993..006938fa7cea23c306fe2dc54b8f89bf56b56f17 100644 --- a/apps/scheduler/call/search/search.py +++ b/apps/scheduler/call/search/search.py @@ -1,28 +1,22 @@ """搜索工具""" -from typing import Any, ClassVar -from pydantic import BaseModel, Field +from typing import Any, ClassVar from apps.entities.scheduler import CallVars from apps.scheduler.call.core import CoreCall +from apps.scheduler.call.search.schema import SearchInput, SearchRet -class SearchRet(BaseModel): - """搜索工具返回值""" - - data: list[dict[str, Any]] = Field(description="搜索结果") - - -class Search(CoreCall, ret_type=SearchRet): +class Search(CoreCall, input_model=SearchInput, output_model=SearchRet): """搜索工具""" name: ClassVar[str] = "搜索" description: ClassVar[str] = "获取搜索引擎的结果" - async def _init(self, syscall_vars: CallVars, **kwargs: Any) -> dict[str, Any]: + async def _init(self, call_vars: CallVars, **kwargs: Any) -> SearchInput: """初始化工具""" self._query: str = kwargs["query"] - return {} + return SearchInput(query=self._query) async def _exec(self) -> dict[str, Any]: diff --git a/apps/scheduler/call/slot/slot.py b/apps/scheduler/call/slot/slot.py index 07640490ab7e633c7ed544e358d2120281d995fa..1c4455a4ab96a35d6737b2602aff327ddd35b33e 100644 --- a/apps/scheduler/call/slot/slot.py +++ b/apps/scheduler/call/slot/slot.py @@ -7,6 +7,7 @@ import jinja2 from pydantic import Field from apps.entities.enum_var import CallOutputType +from apps.entities.pool import NodePool from apps.entities.scheduler import CallInfo, CallOutputChunk, CallVars from apps.llm.patterns.json_gen import Json from apps.manager.task import TaskManager @@ -29,7 +30,7 @@ class Slot(CoreCall, input_model=SlotInput, output_model=SlotOutput): @classmethod - def cls_info(cls) -> CallInfo: + def info(cls) -> CallInfo: """返回Call的名称和描述""" return CallInfo(name="参数自动填充", description="根据步骤历史,自动填充参数") @@ -53,20 +54,21 @@ class Slot(CoreCall, input_model=SlotInput, output_model=SlotOutput): ) @classmethod - async def init(cls, executor: "StepExecutor", **kwargs: Any) -> tuple[Self, dict[str, Any]]: + async def instance(cls, executor: "StepExecutor", node: NodePool | None, **kwargs: Any) -> Self: """实例化Call类""" - call_obj = cls( + obj = cls( + name=executor.step.step.name, + description=executor.step.step.description, facts=executor.background.facts, summary=executor.task.runtime.summary, + node=node, **kwargs, ) + await obj._set_input(executor) + return obj - sys_vars = cls._assemble_call_vars(executor) - input_data = await call_obj._init(sys_vars) - return call_obj, input_data - - async def _init(self, call_vars: CallVars) -> dict[str, Any]: + async def _init(self, call_vars: CallVars) -> SlotInput: """初始化""" self._task_id = call_vars.ids.task_id self._flow_history = await TaskManager.get_flow_history_by_task_id(self._task_id, self.step_num) @@ -74,14 +76,14 @@ class Slot(CoreCall, input_model=SlotInput, output_model=SlotOutput): if not self.current_schema: return SlotInput( remaining_schema={}, - ).model_dump(by_alias=True, exclude_none=True) + ) self._processor = SlotProcessor(self.current_schema) remaining_schema = self._processor.check_json(self.data) return SlotInput( remaining_schema=remaining_schema, - ).model_dump(by_alias=True, exclude_none=True) + ) async def _exec(self, input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]: diff --git a/apps/scheduler/call/sql/schema.py b/apps/scheduler/call/sql/schema.py index b1dc669db95684eb676247f70961569b22788c72..66c5945e2338a2527954a2796d3a19f0dbc64c2e 100644 --- a/apps/scheduler/call/sql/schema.py +++ b/apps/scheduler/call/sql/schema.py @@ -17,3 +17,4 @@ class SQLOutput(DataBase): """SQL工具的输出""" dataset: list[dict[str, Any]] = Field(description="SQL工具的执行结果") + sql: str = Field(description="SQL语句") diff --git a/apps/scheduler/call/sql/sql.py b/apps/scheduler/call/sql/sql.py index e3ef25c71d7d0f361a14a7b7bb741ac6f3b5b87e..2d4a973a87d1353c7917333321fc737ed9c7339a 100644 --- a/apps/scheduler/call/sql/sql.py +++ b/apps/scheduler/call/sql/sql.py @@ -37,16 +37,16 @@ class SQL(CoreCall, input_model=SQLInput, output_model=SQLOutput): @classmethod - def cls_info(cls) -> CallInfo: + def info(cls) -> CallInfo: """返回Call的名称和描述""" return CallInfo(name="SQL查询", description="使用大模型生成SQL语句,用于查询数据库中的结构化数据") - async def _init(self, call_vars: CallVars) -> dict[str, Any]: + async def _init(self, call_vars: CallVars) -> SQLInput: """初始化SQL工具。""" return SQLInput( question=call_vars.question, - ).model_dump(by_alias=True, exclude_none=True) + ) async def _generate_sql(self, data: SQLInput) -> list[dict[str, Any]]: @@ -87,7 +87,10 @@ class SQL(CoreCall, input_model=SQLInput, output_model=SQLOutput): return sql_list - async def _execute_sql(self, sql_list: list[dict[str, Any]]) -> list[dict[str, Any]] | None: + async def _execute_sql( + self, + sql_list: list[dict[str, Any]], + ) -> tuple[list[dict[str, Any]] | None, str | None]: """执行SQL语句并返回结果""" headers = {"Content-Type": "application/json"} @@ -105,14 +108,14 @@ class SQL(CoreCall, input_model=SQLInput, output_model=SQLOutput): if response.status == status.HTTP_200_OK: result = await response.json() if result["code"] == status.HTTP_200_OK: - return result["result"] + return result["result"], sql_dict["sql"] else: text = await response.text() logger.error("[SQL] 调用失败:%s", text) except Exception: logger.exception("[SQL] 调用失败") - return None + return None, None async def _exec(self, input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]: @@ -128,8 +131,8 @@ class SQL(CoreCall, input_model=SQLInput, output_model=SQLOutput): ) # 执行SQL语句 - sql_exec_results = await self._execute_sql(sql_list) - if sql_exec_results is None: + sql_exec_results, sql_exec = await self._execute_sql(sql_list) + if sql_exec_results is None or sql_exec is None: raise CallError( message="SQL查询错误:SQL语句执行失败!", data={}, @@ -138,6 +141,7 @@ class SQL(CoreCall, input_model=SQLInput, output_model=SQLOutput): # 返回结果 data = SQLOutput( dataset=sql_exec_results, + sql=sql_exec, ).model_dump(exclude_none=True, by_alias=True) yield CallOutputChunk( diff --git a/apps/scheduler/call/suggest/schema.py b/apps/scheduler/call/suggest/schema.py index bae027d066815e86c5ad91c8999e66539ad9e8be..c263192a0c4d2a3017f7c11a9eee7d854a74016d 100644 --- a/apps/scheduler/call/suggest/schema.py +++ b/apps/scheduler/call/suggest/schema.py @@ -12,24 +12,18 @@ class SingleFlowSuggestionConfig(BaseModel): question: str | None = Field(default=None, description="固定的推荐问题") -class SuggestionOutputItem(BaseModel): - """问题推荐结果的单个条目""" - - question: str - app_id: str - flow_id: str - flow_description: str - - class SuggestionInput(DataBase): """问题推荐输入""" question: str - task_id: str user_sub: str + history_questions: list[str] class SuggestionOutput(DataBase): """问题推荐结果""" - output: list[SuggestionOutputItem] + question: str + app_id: str = Field(alias="appId") + flow_id: str = Field(alias="flowId") + flow_description: str = Field(alias="flowDescription") diff --git a/apps/scheduler/call/suggest/suggest.py b/apps/scheduler/call/suggest/suggest.py index 28ea2b438205c57950fc77a1076c9cda20ee592f..443e5d2cad15b2cc0a881e28f974dfe2830e7a6f 100644 --- a/apps/scheduler/call/suggest/suggest.py +++ b/apps/scheduler/call/suggest/suggest.py @@ -4,14 +4,25 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """ +import random from collections.abc import AsyncGenerator from typing import TYPE_CHECKING, Any, Self from pydantic import Field - -from apps.entities.scheduler import CallInfo, CallOutputChunk, CallVars +from pydantic.json_schema import SkipJsonSchema + +from apps.common.security import Security +from apps.entities.enum_var import CallOutputType +from apps.entities.pool import NodePool +from apps.entities.record import RecordContent +from apps.entities.scheduler import ( + CallError, + CallInfo, + CallOutputChunk, + CallVars, +) from apps.llm.patterns.recommend import Recommend -from apps.manager.task import TaskManager +from apps.manager.record import RecordManager from apps.manager.user_domain import UserDomainManager from apps.scheduler.call.core import CoreCall from apps.scheduler.call.suggest.schema import ( @@ -27,20 +38,22 @@ if TYPE_CHECKING: class Suggestion(CoreCall, input_model=SuggestionInput, output_model=SuggestionOutput): """问题推荐""" - configs: list[SingleFlowSuggestionConfig] = Field(description="问题推荐配置", min_length=1) + configs: list[SingleFlowSuggestionConfig] = Field(description="问题推荐配置") num: int = Field(default=3, ge=1, le=6, description="推荐问题的总数量(必须大于等于configs中涉及的Flow的数量)") - context: list[dict[str, str]] = Field(description="Executor的上下文") + context: SkipJsonSchema[list[dict[str, str]]] = Field(description="Executor的上下文", exclude=True) + conversation_id: SkipJsonSchema[str] = Field(description="对话ID", exclude=True) @classmethod - def cls_info(cls) -> CallInfo: + def info(cls) -> CallInfo: """返回Call的名称和描述""" return CallInfo(name="问题推荐", description="在答案下方显示推荐的下一个问题") - async def init(self, executor: "StepExecutor", **kwargs: Any) -> tuple[Self, dict[str, Any]]: + @classmethod + async def instance(cls, executor: "StepExecutor", node: NodePool | None, **kwargs: Any) -> Self: """初始化""" - self.context = [ + context = [ { "role": "user", "content": executor.task.runtime.question, @@ -50,33 +63,127 @@ class Suggestion(CoreCall, input_model=SuggestionInput, output_model=SuggestionO "content": executor.task.runtime.answer, }, ] - - return await super().init(executor, **kwargs) + obj = cls( + name=executor.step.step.name, + description=executor.step.step.description, + node=node, + context=context, + conversation_id=executor.task.ids.conversation_id, + **kwargs, + ) + await obj._set_input(executor) + return obj - async def _init(self, call_vars: CallVars) -> dict[str, Any]: + async def _init(self, call_vars: CallVars) -> SuggestionInput: """初始化""" + from apps.manager.appcenter import AppCenterManager + + self._task_id = call_vars.ids.task_id + self._history_questions = await self._get_history_questions( + call_vars.ids.user_sub, + self.conversation_id, + ) + self._app_id = call_vars.ids.app_id + self._flow_id = call_vars.ids.flow_id + app_metadata = await AppCenterManager.fetch_app_data_by_id(self._app_id) + self._avaliable_flows = {} + for flow in app_metadata.flows: + self._avaliable_flows[flow.id] = flow.description + return SuggestionInput( question=call_vars.question, - task_id=call_vars.ids.task_id, user_sub=call_vars.ids.user_sub, - ).model_dump(by_alias=True, exclude_none=True) + history_questions=self._history_questions, + ) + + + async def _get_history_questions(self, user_sub: str, conversation_id: str) -> list[str]: + """获取当前对话的历史问题""" + records = await RecordManager.query_record_by_conversation_id( + user_sub, + conversation_id, + 15, + ) + + history_questions = [] + for record in records: + record_data = RecordContent.model_validate_json(Security.decrypt(record.content, record.key)) + history_questions.append(record_data.question) + return history_questions async def _exec(self, input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]: """运行问题推荐""" data = SuggestionInput(**input_data) - # 获取当前任务 - task = await TaskManager.get_task(task_id=data.task_id) + + # 配置不正确 + if self.num < len(self.configs): + raise CallError( + message="推荐问题的数量必须大于等于配置的数量", + data={}, + ) # 获取当前用户的画像 user_domain = await UserDomainManager.get_user_domain_by_user_sub_and_topk(data.user_sub, 5) - - recommended_questions = await Recommend().generate( - data.task_id, - conversation=self.context, - user_preference=user_domain, - history_questions=task.runtime.history, - ) - - yield CallOutputChunk(content="") + # 已推送问题数量 + pushed_questions = 0 + + # 先处理configs + for config in self.configs: + if config.flow_id not in self._avaliable_flows: + raise CallError( + message="配置的Flow ID不存在", + data={}, + ) + + if config.question: + question = config.question + else: + questions = await Recommend().generate( + self._task_id, + conversation=self.context, + user_preference=user_domain, + history_questions=self._history_questions, + tool_name=config.flow_id, + tool_description=self._avaliable_flows[config.flow_id], + ) + question = questions[random.randint(0, len(questions) - 1)] # noqa: S311 + + yield CallOutputChunk( + type=CallOutputType.DATA, + content=SuggestionOutput( + question=question, + appId=self._app_id, + flowId=config.flow_id, + flowDescription=self._avaliable_flows[config.flow_id], + ).model_dump(by_alias=True, exclude_none=True), + ) + pushed_questions += 1 + + + while pushed_questions < self.num: + recommended_questions = await Recommend().generate( + self._task_id, + conversation=self.context, + user_preference=user_domain, + history_questions=self._history_questions, + tool_name=self._flow_id, + tool_description=self._avaliable_flows[self._flow_id], + ) + + # 只会关联当前flow + for question in recommended_questions: + if pushed_questions >= self.num: + break + + yield CallOutputChunk( + type=CallOutputType.DATA, + content=SuggestionOutput( + question=question, + appId=self._app_id, + flowId=self._flow_id, + flowDescription=self._avaliable_flows[self._flow_id], + ).model_dump(by_alias=True, exclude_none=True), + ) + pushed_questions += 1 diff --git a/apps/scheduler/call/summary/summary.py b/apps/scheduler/call/summary/summary.py index a4514f7c12b002f9dcf14d5ca2c7da0378fce102..b72933661527460ed628111f1a377ca9d0937d49 100644 --- a/apps/scheduler/call/summary/summary.py +++ b/apps/scheduler/call/summary/summary.py @@ -10,6 +10,7 @@ from typing import TYPE_CHECKING, Any, Self from pydantic import Field from apps.entities.enum_var import CallOutputType +from apps.entities.pool import NodePool from apps.entities.scheduler import ( CallInfo, CallOutputChunk, @@ -31,29 +32,29 @@ class Summary(CoreCall, input_model=SummaryInput, output_model=SummaryOutput): context: ExecutorBackground = Field(description="对话上下文") @classmethod - def cls_info(cls) -> CallInfo: + def info(cls) -> CallInfo: """返回Call的名称和描述""" return CallInfo(name="理解上下文", description="使用大模型,理解对话上下文") @classmethod - async def init(cls, executor: "StepExecutor", **kwargs: Any) -> tuple[Self, dict[str, Any]]: - """初始化工具""" - cls_obj = cls( + async def instance(cls, executor: "StepExecutor", node: NodePool | None, **kwargs: Any) -> Self: + """实例化工具""" + obj = cls( context=executor.background, name=executor.step.step.name, description=executor.step.step.description, + node=node, **kwargs, ) + await obj._set_input(executor) + return obj - sys_vars = cls._assemble_call_vars(executor) - input_data = await cls_obj._init(sys_vars) - return cls_obj, input_data - async def _init(self, call_vars: CallVars) -> dict[str, Any]: - """初始化工具""" + async def _init(self, call_vars: CallVars) -> SummaryInput: + """初始化工具,返回输入""" return SummaryInput( task_id=call_vars.ids.task_id, - ).model_dump(by_alias=True, exclude_none=True) + ) async def _exec(self, input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]: diff --git a/apps/scheduler/executor/node.py b/apps/scheduler/executor/node.py index 4b2b8ce6d2a6d60a7e6db0418525977d0884e172..7584dce4f59c24f47ec4ef9e4abca1dd1840e38b 100644 --- a/apps/scheduler/executor/node.py +++ b/apps/scheduler/executor/node.py @@ -23,7 +23,7 @@ class StepNode: flag = False if not hasattr(call_cls, "_exec") or not inspect.isasyncgenfunction(call_cls._exec): # noqa: SLF001 flag = False - if not hasattr(call_cls, "cls_info") or not callable(call_cls.cls_info): + if not hasattr(call_cls, "info") or not callable(call_cls.info): flag = False return flag diff --git a/apps/scheduler/executor/step.py b/apps/scheduler/executor/step.py index 1614e224befb622f96c898df69e54cfefbb964b3..999df2ecaf339df760a49de84748a14ec5eb25ee 100644 --- a/apps/scheduler/executor/step.py +++ b/apps/scheduler/executor/step.py @@ -87,7 +87,7 @@ class StepExecutor(BaseExecutor): params.update(self.step.step.params) try: - self.obj, self.input = await call_cls.init(self, **params) + self.obj = await call_cls.instance(self, self.node, **params) except Exception: logger.exception("[StepExecutor] 初始化Call失败") raise @@ -121,20 +121,22 @@ class StepExecutor(BaseExecutor): await TaskManager.save_task(self.task.id, self.task) # 准备参数 params = { - "data": self.input, - "current_schema": self.obj.input_type.model_json_schema(), + "data": self.obj.input, + "current_schema": self.obj.input_model.model_json_schema( + override=self.node.override_input if self.node and self.node.override_input else {}, + ), } # 初始化填参 - slot_obj, slot_input = await Slot.init(self, **params) + slot_obj = await Slot.instance(self, self.node, **params) # 推送填参消息 - await self.push_message(EventType.STEP_INPUT.value, slot_input) + await self.push_message(EventType.STEP_INPUT.value, slot_obj.input) # 运行填参 - iterator = slot_obj.exec(self, slot_input) + iterator = slot_obj.exec(self, slot_obj.input) async for chunk in iterator: result: SlotOutput = SlotOutput.model_validate(chunk.content) - self.input.update(result.slot_data) + self.obj.input.update(result.slot_data) # 恢复State self.task.state.step_id = current_step_id # type: ignore[arg-type] @@ -181,10 +183,10 @@ class StepExecutor(BaseExecutor): logger.info("[StepExecutor] 运行步骤 %s", self.step.step.name) # 推送输入 - await self.push_message(EventType.STEP_INPUT.value, self.input) + await self.push_message(EventType.STEP_INPUT.value, self.obj.input) # 执行步骤 - iterator = self.obj.exec(self, self.input) + iterator = self.obj.exec(self, self.obj.input) try: content = await self._process_chunk(iterator, to_user=self.obj.to_user) diff --git a/apps/scheduler/pool/loader/call.py b/apps/scheduler/pool/loader/call.py index 6be3ea003692bef25446a00134a6dcff46e42ba9..bd465f17e4f0a100444151432b2fe8411ecc9d3b 100644 --- a/apps/scheduler/pool/loader/call.py +++ b/apps/scheduler/pool/loader/call.py @@ -37,7 +37,7 @@ class CallLoader: # 检查合法性 for call_id in system_call.__all__: call_cls = getattr(system_call, call_id) - call_info = call_cls.cls_info() + call_info = call_cls.info() call_metadata.append( CallPool( @@ -82,7 +82,7 @@ class CallLoader: for call_id in call_package.__all__: try: call_cls = getattr(call_package, call_id) - call_info = call_cls.cls_info() + call_info = call_cls.info() except AttributeError as e: err = f"[CallLoader] 载入工具call.{call_dir_name}.{call_id}失败:{e};跳过载入。" logger.info(err)