From f07e529d1af8dc1c8db1faca9fb23dcfd40f4c4b Mon Sep 17 00:00:00 2001 From: z30057876 Date: Tue, 4 Mar 2025 09:25:40 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0Executor=E5=92=8CCall?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/entities/scheduler.py | 2 +- apps/llm/patterns/__init__.py | 2 - apps/llm/patterns/executor.py | 104 +++---------- apps/llm/reasoning.py | 3 +- apps/manager/flow.py | 7 + apps/routers/auth.py | 2 +- apps/routers/chat.py | 8 +- apps/scheduler/call/api.py | 2 +- apps/scheduler/call/cmd/cmd.py | 2 +- apps/scheduler/call/convert.py | 2 +- apps/scheduler/call/core.py | 23 ++- apps/scheduler/call/empty.py | 11 +- apps/scheduler/call/llm.py | 38 ++++- apps/scheduler/call/rag.py | 22 +-- apps/scheduler/call/sql.py | 2 +- apps/scheduler/executor/flow.py | 206 ++++++++++++------------- apps/scheduler/executor/message.py | 17 +- apps/scheduler/pool/loader/flow.py | 2 + apps/scheduler/pool/loader/metadata.py | 2 + apps/scheduler/pool/loader/openapi.py | 2 + apps/scheduler/pool/pool.py | 2 +- apps/scheduler/yaml.py | 6 + deploy/secret_helper/main.py | 1 - 23 files changed, 237 insertions(+), 231 deletions(-) create mode 100644 apps/scheduler/yaml.py diff --git a/apps/entities/scheduler.py b/apps/entities/scheduler.py index 18a7c95aa..00003d0c8 100644 --- a/apps/entities/scheduler.py +++ b/apps/entities/scheduler.py @@ -15,7 +15,7 @@ class CallVars(BaseModel): 这一部分的参数由Executor填充,用户无法修改 """ - background: str = Field(description="上下文信息") + summary: str = Field(description="上下文信息") question: str = Field(description="改写后的用户输入") history: list[FlowStepHistory] = Field(description="Executor中历史工具的结构化数据", default=[]) task_id: str = Field(description="任务ID") diff --git a/apps/llm/patterns/__init__.py b/apps/llm/patterns/__init__.py index 20ef8bbcf..e50b5526a 100644 --- a/apps/llm/patterns/__init__.py +++ b/apps/llm/patterns/__init__.py @@ -6,7 +6,6 @@ from apps.llm.patterns.core import CorePattern from apps.llm.patterns.domain import Domain from apps.llm.patterns.executor import ( ExecutorSummary, - FinalThought, ExecutorThought, ) from apps.llm.patterns.json import Json @@ -17,7 +16,6 @@ __all__ = [ "CorePattern", "Domain", "ExecutorSummary", - "FinalThought", "ExecutorThought", "Json", "Recommend", diff --git a/apps/llm/patterns/executor.py b/apps/llm/patterns/executor.py index dc975900e..13b9bc86f 100644 --- a/apps/llm/patterns/executor.py +++ b/apps/llm/patterns/executor.py @@ -2,11 +2,9 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ -from collections.abc import AsyncGenerator -from textwrap import dedent from typing import Any, Optional -from apps.entities.scheduler import ExecutorBackground as ExecutorBackgroundEntity +from apps.entities.scheduler import ExecutorBackground from apps.llm.patterns.core import CorePattern from apps.llm.reasoning import ReasoningLLM @@ -31,7 +29,7 @@ class ExecutorThought(CorePattern): {tool_name} {tool_description} - {tool_output} + {tool_output} @@ -85,19 +83,24 @@ class ExecutorSummary(CorePattern): user_prompt: str = r""" - - 根据,结合给定的AI助手思考过程,生成一个完整的背景总结。这个总结将用于后续对话的上下文理解。 - 生成总结的要求如下: - 1. 突出重要信息点,例如时间、地点、人物、事件等。 - 2. 下面给出的事实条目若与历史记录有关,则可以在生成总结时作为已知信息。 - 3. 确保信息准确性,不得编造信息。 - 4. 总结应少于1000个字。 + 根据给定的AI助手思考过程和关键事实,生成一个三句话背景总结。这个总结将用于后续对话的上下文理解。 + + 生成总结的要求如下: + 1. 突出重要信息点,例如时间、地点、人物、事件等。 + 2. “关键事实”中的内容可在生成总结时作为已知信息。 + 3. 输出时请不要包含XML标签,确保信息准确性,不得编造信息。 + 4. 总结应少于3句话,应少于300个字。 - 思考过程(在标签中): - {thought} + AI助手思考过程将在标签中给出,关键事实将在标签中给出。 + - 关键事实(在标签中): - {facts} + + {thought} + + + + {facts} + 现在,请开始生成背景总结: """ @@ -109,12 +112,7 @@ class ExecutorSummary(CorePattern): async def generate(self, task_id: str, **kwargs) -> str: # noqa: ANN003 """进行初始背景生成""" - background: ExecutorBackgroundEntity = kwargs["background"] - - # 转化字符串 - messages = [] - for item in background.conversation: - messages += [{"role": item["role"], "content": item["content"]}] + background: ExecutorBackground = kwargs["background"] facts_str = "\n" for item in background.facts: @@ -138,67 +136,3 @@ class ExecutorSummary(CorePattern): result += chunk return result - - -class FinalThought(CorePattern): - """使用大模型生成Executor的最终结果""" - - user_prompt: str = r""" - - 你是AI智能助手,请回答用户的问题并满足以下要求: - - 1. 使用中文回答问题,不要使用其他语言。 - 2. 回答应当语气友好、通俗易懂,并包含尽可能完整的信息。 - 3. 回答时应结合思考过程。 - 4. 输出时请不要包含XML标签,不要编造任何信息。 - - 用户的问题将在标签中给出,你之前的思考过程将在标签中给出。 - - - - {question} - - - - {thought}{output} - - - 现在,请根据以上信息进行回答: - """ - """用户提示词""" - - - def __init__(self, system_prompt: Optional[str] = None, user_prompt: Optional[str] = None) -> None: - """初始化ExecutorResult模式""" - super().__init__(system_prompt, user_prompt) - - - async def generate(self, task_id: str, **kwargs) -> AsyncGenerator[str, Any]: # noqa: ANN003 - """进行ExecutorResult生成""" - question: str = kwargs["question"] - thought: str = kwargs["thought"] - final_output: dict[str, Any] = kwargs.get("final_output", {}) - - # 如果final_output不为空,则将final_output转换为字符串 - if final_output: - final_output_str = dedent(f""" - 工具提供了{final_output['type']}类型数据:`{final_output['data']}`。\ - 这些数据已经使用恰当的办法向用户进行了展示,所以无需重复。\ - 若类型为“schema”,说明用户的问题缺少回答所需的必要信息。\ - 我需要根据schema的具体内容分析缺失哪些信息,并提示用户补充。 - """) - else: - final_output_str = "" - - user_input = self.user_prompt.format( - question=question, - thought=thought, - output=final_output_str, - ) - messages = [ - {"role": "system", "content": ""}, - {"role": "user", "content": user_input}, - ] - - async for chunk in ReasoningLLM().call(task_id, messages, streaming=True, temperature=0.7): - yield chunk diff --git a/apps/llm/reasoning.py b/apps/llm/reasoning.py index fb74818ff..23d4a03e0 100644 --- a/apps/llm/reasoning.py +++ b/apps/llm/reasoning.py @@ -146,7 +146,8 @@ class ReasoningLLM: yield reason # 推送text - yield text + if text: + yield text # 整理结果 reasoning_content += reason diff --git a/apps/manager/flow.py b/apps/manager/flow.py index 124a1c25a..848b3c59c 100644 --- a/apps/manager/flow.py +++ b/apps/manager/flow.py @@ -417,6 +417,13 @@ class FlowManager: @staticmethod async def updata_flow_debug_by_app_and_flow_id(app_id: str, flow_id: str, debug: bool) -> bool: + """更新flow的debug状态 + + :param app_id: 应用的id + :param flow_id: 流的id + :param debug: 是否开启debug + :return: 是否更新成功 + """ try: flow_loader = FlowLoader() flow = await flow_loader.load(app_id, flow_id) diff --git a/apps/routers/auth.py b/apps/routers/auth.py index ae856ba02..33e3baf5e 100644 --- a/apps/routers/auth.py +++ b/apps/routers/auth.py @@ -176,7 +176,7 @@ async def oidc_redirect(action: Annotated[str, Query()] = "login"): # noqa: ANN ).model_dump(exclude_none=True, by_alias=True)) -# TODO(zwt): OIDC主动触发logout +# TODO: OIDC主动触发logout # 002 @router.post("/logout", response_model=ResponseData) async def oidc_logout(token: str): # noqa: ANN201 diff --git a/apps/routers/chat.py b/apps/routers/chat.py index 3fae9bba5..f03e1e0d6 100644 --- a/apps/routers/chat.py +++ b/apps/routers/chat.py @@ -22,9 +22,7 @@ from apps.dependency import ( ) from apps.entities.request_data import RequestData from apps.entities.response_data import ResponseData -from apps.manager.appcenter import AppCenterManager from apps.manager.blacklist import QuestionBlacklistManager, UserBlacklistManager -from apps.routers.mock import mock_data from apps.scheduler.scheduler.context import save_data from apps.service.activity import Activity @@ -127,11 +125,7 @@ async def chat( if await Activity.is_active(user_sub): raise HTTPException(status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail="Too many requests") - if post_body.app and post_body.app.app_id: - # await AppCenterManager.update_recent_app(user_sub, post_body.app.app_id) - res = mock_data(post_body, user_sub, session_id) - else: - res = chat_generator(post_body, user_sub, session_id) + res = chat_generator(post_body, user_sub, session_id) return StreamingResponse( content=res, media_type="text/event-stream", diff --git a/apps/scheduler/call/api.py b/apps/scheduler/call/api.py index c267531ea..d6753a59c 100644 --- a/apps/scheduler/call/api.py +++ b/apps/scheduler/call/api.py @@ -39,7 +39,7 @@ class API(CoreCall, ret_type=APIOutput): auth: dict[str, Any] = Field(description="API鉴权信息", default={}) - async def __call__(self, syscall_vars: CallVars, **_kwargs: Any) -> APIOutput: + async def exec(self, syscall_vars: CallVars, **_kwargs: Any) -> APIOutput: """调用API,然后返回LLM解析后的数据""" self._session = aiohttp.ClientSession() try: diff --git a/apps/scheduler/call/cmd/cmd.py b/apps/scheduler/call/cmd/cmd.py index 790095c6c..7a23a574a 100644 --- a/apps/scheduler/call/cmd/cmd.py +++ b/apps/scheduler/call/cmd/cmd.py @@ -27,6 +27,6 @@ class Cmd(CoreCall): name: str = "cmd" description: str = "根据BTDL描述文件,生成命令。" - async def __call__(self, _slot_data: dict[str, Any]) -> _CmdOutput: + async def exec(self, _slot_data: dict[str, Any]) -> _CmdOutput: """调用Cmd工具""" pass diff --git a/apps/scheduler/call/convert.py b/apps/scheduler/call/convert.py index 80b7fdd96..1d7fdd8f8 100644 --- a/apps/scheduler/call/convert.py +++ b/apps/scheduler/call/convert.py @@ -38,7 +38,7 @@ class Convert(CoreCall): description: str = "从上一步的工具的原始JSON返回结果中,提取特定字段的信息。" - async def __call__(self, _slot_data: dict[str, Any]) -> _ConvertOutput: + async def exec(self, _slot_data: dict[str, Any]) -> _ConvertOutput: """调用Convert工具 :param _slot_data: 经用户确认后的参数(目前未使用) diff --git a/apps/scheduler/call/core.py b/apps/scheduler/call/core.py index 59551dff5..acc6fb0f2 100644 --- a/apps/scheduler/call/core.py +++ b/apps/scheduler/call/core.py @@ -3,9 +3,10 @@ 所有Call类必须继承此类,并实现所有方法。 Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ +from collections.abc import AsyncGenerator from typing import Any, ClassVar -from pydantic import BaseModel, Field +from pydantic import BaseModel, ConfigDict, Field from apps.entities.scheduler import CallVars @@ -16,6 +17,11 @@ class CoreCall(BaseModel): name: ClassVar[str] = Field(description="Call的名称") description: ClassVar[str] = Field(description="Call的描述") + model_config = ConfigDict( + arbitrary_types_allowed=True, + extra="allow", + ) + ret_type: ClassVar[type[BaseModel]] def __init_subclass__(cls, ret_type: type[BaseModel], **kwargs: Any) -> None: @@ -23,12 +29,17 @@ class CoreCall(BaseModel): super().__init_subclass__(**kwargs) cls.ret_type = ret_type - class Config: - """Pydantic 配置类""" - arbitrary_types_allowed = True + async def init(self, syscall_vars: CallVars, **_kwargs: Any) -> dict[str, Any]: + """初始化Call类,并返回Call的输入""" + raise NotImplementedError - async def __call__(self, syscall_vars: CallVars, **kwargs: Any) -> type[BaseModel]: - """Call类实例的调用方法""" + async def exec(self) -> dict[str, Any]: + """Call类实例的非流式输出方法""" raise NotImplementedError + + + async def stream(self) -> AsyncGenerator[str, None]: + """Call类实例的流式输出方法""" + yield "" diff --git a/apps/scheduler/call/empty.py b/apps/scheduler/call/empty.py index 158cd0f9b..87d1fdea1 100644 --- a/apps/scheduler/call/empty.py +++ b/apps/scheduler/call/empty.py @@ -18,6 +18,11 @@ class Empty(CoreCall, ret_type=EmptyData): description: ClassVar[str] = "空白节点,用于占位" - async def __call__(self, _syscall_vars: CallVars, **_kwargs: Any) -> EmptyData: - """空Call""" - return EmptyData() + async def init(self, _syscall_vars: CallVars, **_kwargs: Any) -> dict[str, Any]: + """初始化""" + return {} + + + async def exec(self) -> dict[str, Any]: + """执行""" + return EmptyData().model_dump() diff --git a/apps/scheduler/call/llm.py b/apps/scheduler/call/llm.py index bc67c6f9c..ad38a5294 100644 --- a/apps/scheduler/call/llm.py +++ b/apps/scheduler/call/llm.py @@ -2,6 +2,8 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ +import logging +from collections.abc import AsyncGenerator from datetime import datetime from textwrap import dedent from typing import Any, ClassVar @@ -15,6 +17,7 @@ from apps.entities.scheduler import CallError, CallVars from apps.llm.reasoning import ReasoningLLM from apps.scheduler.call.core import CoreCall +logger = logging.getLogger("ray") LLM_DEFAULT_PROMPT = dedent( r""" @@ -52,13 +55,14 @@ class LLM(CoreCall, ret_type=LLMNodeOutput): system_prompt: str = Field(description="大模型系统提示词", default="") user_prompt: str = Field(description="大模型用户提示词", default=LLM_DEFAULT_PROMPT) - async def __call__(self, syscall_vars: CallVars, **_kwargs: Any) -> LLMNodeOutput: - """运行LLM Call""" + + async def _prepare_message(self, syscall_vars: CallVars) -> list[dict[str, Any]]: + """准备消息""" # 参数 time = datetime.now(tz=pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") formatter = { "time": time, - "context": syscall_vars.background, + "context": syscall_vars.summary, "question": syscall_vars.question, } @@ -83,17 +87,39 @@ class LLM(CoreCall, ret_type=LLMNodeOutput): except Exception as e: raise CallError(message=f"用户提示词渲染失败:{e!s}", data={}) from e - message = [ + return [ {"role": "system", "content": system_input}, {"role": "user", "content": user_input}, ] + + async def init(self, syscall_vars: CallVars, **_kwargs: Any) -> dict[str, Any]: + """初始化LLM工具""" + self._message = await self._prepare_message(syscall_vars) + self._task_id = syscall_vars.task_id + return { + "task_id": self._task_id, + "message": self._message, + } + + + async def exec(self) -> dict[str, Any]: + """运行LLM Call""" try: result = "" - async for chunk in ReasoningLLM().call(task_id=syscall_vars.task_id, messages=message): + async for chunk in ReasoningLLM().call(task_id=self._task_id, messages=self._message): result += chunk except Exception as e: raise CallError(message=f"大模型调用失败:{e!s}", data={}) from e result = result.strip().strip("\n") - return LLMNodeOutput(message=result) + return LLMNodeOutput(message=result).model_dump(exclude_none=True, by_alias=True) + + + async def stream(self) -> AsyncGenerator[str, None]: + """流式输出""" + try: + async for chunk in ReasoningLLM().call(task_id=self._task_id, messages=self._message): + yield chunk + except Exception as e: + raise CallError(message=f"大模型流式输出失败:{e!s}", data={}) from e diff --git a/apps/scheduler/call/rag.py b/apps/scheduler/call/rag.py index 5ac642928..fc717c8c4 100644 --- a/apps/scheduler/call/rag.py +++ b/apps/scheduler/call/rag.py @@ -33,23 +33,27 @@ class RAG(CoreCall, ret_type=RAGOutput): retrieval_mode: Literal["chunk", "full_text"] = Field(description="检索模式", default="chunk") - async def __call__(self, syscall_vars: CallVars, **_kwargs: Any) -> RAGOutput: - """调用RAG工具""" - params_dict = { + async def init(self, syscall_vars: CallVars, **_kwargs: Any) -> dict[str, Any]: + """初始化RAG工具""" + self._params_dict = { "kb_sn": self.knowledge_base, "top_k": self.top_k, "retrieval_mode": self.retrieval_mode, "content": syscall_vars.question, } - url = config["RAG_HOST"].rstrip("/") + "/chunk/get" - headers = { + self._url = config["RAG_HOST"].rstrip("/") + "/chunk/get" + self._headers = { "Content-Type": "application/json", } + return self._params_dict + + + async def exec(self) -> dict[str, Any]: + """调用RAG工具""" # 发送 GET 请求 - session = aiohttp.ClientSession() - async with session.post(url, headers=headers, json=params_dict) as response: + async with aiohttp.ClientSession() as session, session.post(self._url, headers=self._headers, json=self._params_dict) as response: # 检查响应状态码 if response.status == status.HTTP_200_OK: result = await response.json() @@ -62,7 +66,7 @@ class RAG(CoreCall, ret_type=RAGOutput): return RAGOutput( corpus=corpus, - ) + ).model_dump(exclude_none=True, by_alias=True) text = await response.text() logger.error("[RAG] 调用失败:%s", text) @@ -70,7 +74,7 @@ class RAG(CoreCall, ret_type=RAGOutput): raise CallError( message=f"rag调用失败:{text}", data={ - "question": syscall_vars.question, + "question": self._params_dict["content"], "status": response.status, "text": text, }, diff --git a/apps/scheduler/call/sql.py b/apps/scheduler/call/sql.py index 55b8d9d61..312c5faaf 100644 --- a/apps/scheduler/call/sql.py +++ b/apps/scheduler/call/sql.py @@ -41,7 +41,7 @@ class SQL(CoreCall): self._session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(300)) - async def __call__(self, _slot_data: dict[str, Any]) -> SQLOutput: + async def exec(self, _slot_data: dict[str, Any]) -> SQLOutput: """运行SQL工具""" # 获取必要参数 syscall_vars: CallVars = getattr(self, "_syscall_vars") diff --git a/apps/scheduler/executor/flow.py b/apps/scheduler/executor/flow.py index 4bc57791c..89ce109f1 100644 --- a/apps/scheduler/executor/flow.py +++ b/apps/scheduler/executor/flow.py @@ -3,6 +3,7 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ import asyncio +import inspect import logging from typing import Any, Optional @@ -15,16 +16,18 @@ from apps.entities.enum_var import StepStatus from apps.entities.flow import Flow, FlowError, Step from apps.entities.request_data import RequestDataApp from apps.entities.scheduler import CallVars, ExecutorBackground -from apps.entities.task import ExecutorState, TaskBlock +from apps.entities.task import ExecutorState, FlowStepHistory, TaskBlock from apps.llm.patterns.executor import ExecutorSummary from apps.manager.node import NodeManager from apps.manager.task import TaskManager from apps.scheduler.call.core import CoreCall +from apps.scheduler.call.llm import LLM, LLMNodeOutput from apps.scheduler.executor.message import ( push_flow_start, push_flow_stop, push_step_input, push_step_output, + push_text_output, ) from apps.scheduler.slot.slot import Slot @@ -54,10 +57,7 @@ class Executor(BaseModel): # 尝试恢复State if self.task.flow_state: self.flow_state = self.task.flow_state - # 如果flow_context为空,则从flow_history中恢复 - if not self.task.flow_context: - self.task.flow_context = await TaskManager.get_flow_history_by_task_id(self.task.record.task_id) - self.task.new_context = [] + self.task.flow_context = await TaskManager.get_flow_history_by_task_id(self.task.record.task_id) else: # 创建ExecutorState self.flow_state = ExecutorState( @@ -72,6 +72,8 @@ class Executor(BaseModel): ) # 是否结束运行 self._can_continue = True + # 向用户输出的内容 + self._final_answer = "" async def _check_cls(self, call_cls: Any) -> bool: @@ -81,46 +83,36 @@ class Executor(BaseModel): flag = False if not hasattr(call_cls, "description") or not isinstance(call_cls.description, str): flag = False - if not callable(call_cls) or not asyncio.iscoroutinefunction(call_cls.__call__): + if not hasattr(call_cls, "exec") or not asyncio.iscoroutinefunction(call_cls.exec): + flag = False + if not hasattr(call_cls, "stream") or not inspect.isasyncgenfunction(call_cls.stream): flag = False return flag - # TODO + # TODO: 默认错误处理步骤 async def _run_error(self, step: FlowError) -> dict[str, Any]: """运行错误处理步骤""" return {} - async def _get_call_cls(self, node_id: str) -> Optional[type[CoreCall]]: + async def _get_call_cls(self, node_id: str) -> type[CoreCall]: """获取并验证Call类""" # 检查flow_state是否为空 if not self.flow_state: - logger.error("[FlowExecutor] flow_state为空") - return None + err = "[FlowExecutor] flow_state为空" + raise ValueError(err) # 获取对应Node的call_id - try: - call_id = await NodeManager.get_node_call_id(node_id) - except Exception: - logger.exception("[FlowExecutor] 获取工具%s的call_id时发生错误", node_id) - self.flow_state.status = StepStatus.ERROR - return None - + call_id = await NodeManager.get_node_call_id(node_id) # 从Pool中获取对应的Call pool = ray.get_actor("pool") - try: - call_cls: type[CoreCall] = await pool.get_call.remote(call_id) - except Exception: - logger.exception("[FlowExecutor] 载入工具%s时发生错误", node_id) - self.flow_state.status = StepStatus.ERROR - return None + call_cls: type[CoreCall] = await pool.get_call.remote(call_id) # 检查Call合法性 if not await self._check_cls(call_cls): - logger.error("[FlowExecutor] 工具 %s 不符合Call标准要求", node_id) - self.flow_state.status = StepStatus.ERROR - return None + err = f"[FlowExecutor] 工具 {node_id} 不符合Call标准要求" + raise ValueError(err) return call_cls @@ -151,7 +143,7 @@ class Executor(BaseModel): self._can_continue = False self.flow_state.status = StepStatus.RUNNING # 推送空输入输出 - await push_step_input(self.task.record.task_id, self.queue, self.flow_state, self.flow) + await push_step_input(self.task.record.task_id, self.queue, self.flow_state, {}) self.flow_state.status = StepStatus.PARAM await push_step_output(self.task.record.task_id, self.queue, self.flow_state, {}) return False, None @@ -165,44 +157,60 @@ class Executor(BaseModel): return None - async def _execute_call(self, call_obj: Any, sys_vars: CallVars, node_id: str) -> dict[str, Any]: + async def _execute_call(self, call_obj: Any, *, is_final_answer: bool) -> dict[str, Any]: """执行Call并处理结果""" - if not call_obj: - logger.error("[FlowExecutor] 工具%s不存在", node_id) - return {} - - try: - result: BaseModel = await call_obj(sys_vars) - except Exception: - logger.exception("[FlowExecutor] 执行工具%s时发生错误", node_id) - self.flow_state.status = StepStatus.ERROR - return {} - - try: - result_data = result.model_dump(exclude_none=True, by_alias=True) - except Exception: - logger.exception("[FlowExecutor] 无法处理工具%s返回值", node_id) - self.flow_state.status = StepStatus.ERROR - return {} - + # call_obj一定合法;开始判断是否为最终结果 + if is_final_answer and isinstance(call_obj, LLM): + # 最后一步 & 是大模型步骤,直接流式输出 + async for chunk in call_obj.stream(): + await push_text_output(self.task.record.task_id, self.queue, chunk) + self._final_answer += chunk + self.flow_state.status = StepStatus.SUCCESS + return { + "message": self._final_answer, + } + + # 其他情况:先运行步骤,得到结果 + result: dict[str, Any] = await call_obj.exec() + if is_final_answer: + if call_obj.name == "Convert": + # 如果是Convert,直接输出转换之后的结果 + self._final_answer += result["message"] + await push_text_output(self.task.record.task_id, self.queue, self._final_answer) + self.flow_state.status = StepStatus.SUCCESS + return { + "message": self._final_answer, + } + # 其他工具,加一步大模型过程 + # FIXME: 使用单独的Prompt + self.flow_state.status = StepStatus.SUCCESS + return { + "message": self._final_answer, + } + + # 其他情况:返回结果 self.flow_state.status = StepStatus.SUCCESS - return result_data + return result async def _run_step(self, step_id: str, step_data: Step) -> None: """运行单个步骤""" logger.info("[FlowExecutor] 运行步骤 %s", step_data.name) - # 更新State + # 获取task + task_actor = ray.get_actor("task") + task: TaskBlock = await task_actor.get_task.remote(self.task.record.task_id) + # State写入ID和运行状态 self.flow_state.step_id = step_id + self.flow_state.step_name = step_id # FIXME: Step需要加名称和描述字段 self.flow_state.status = StepStatus.RUNNING - # 特殊类型的Node,跳过执行 - node_id = step_data.node + # 查找下一个步骤;判断是否是end前最后一步 + next_step = await self._get_next_step() + is_final_answer = next_step == "end" + # 获取并验证Call类 + node_id = step_data.node call_cls = await self._get_call_cls(node_id) - if call_cls is None: - logger.error("[FlowExecutor] Node %s 对应的工具不存在", node_id) - return # 准备系统变量 history = list(self.task.flow_context.values())[-STEP_HISTORY_SIZE:] @@ -212,16 +220,12 @@ class Executor(BaseModel): flow_id=self.post_body_app.flow_id, session_id=self.task.session_id, history=history, - background=self.flow_state.ai_summary, + summary=self.flow_state.ai_summary, ) # 初始化Call - try: - call_obj = call_cls.model_validate(step_data.params) - except Exception: - logger.exception("[FlowExecutor] 初始化工具 %s 时发生错误", call_cls.name) - self.flow_state.status = StepStatus.ERROR - return + call_obj = call_cls.model_validate(step_data.params) + input_data = await call_obj.init(sys_vars) # TODO: 处理slots # can_continue, slot_data = await self._process_slots(call_obj) @@ -229,54 +233,52 @@ class Executor(BaseModel): # return # 推送步骤输入 - await push_step_input(self.task.record.task_id, self.queue, self.flow_state, self.flow_state.filled_data) + await push_step_input(self.task.record.task_id, self.queue, self.flow_state, input_data) # 执行Call并获取结果 - result_data = await self._execute_call(call_obj, sys_vars, node_id) + result_data = await self._execute_call(call_obj, is_final_answer=is_final_answer) + task.flow_context[step_id] = FlowStepHistory( + task_id=self.task.record.task_id, + flow_id=self.post_body_app.flow_id, + step_id=step_id, + status=self.flow_state.status, + input_data=input_data, + output_data=result_data, + ) + await task_actor.set_task.remote(self.task.record.task_id, task) # 推送输出 await push_step_output(self.task.record.task_id, self.queue, self.flow_state, result_data) - return - - async def _handle_last_step(self) -> None: - """处理最后一步""" - # 如果当前步骤为结束,则直接返回 - if self.flow_state.step_id == "end": - return + # 更新下一步 + self.flow_state.step_id = next_step - async def _handle_next_step(self) -> None: - """处理下一步""" + async def _get_next_step(self) -> str: + """在当前步骤执行前,尝试获取下一步""" # 如果当前步骤为结束,则直接返回 - if self.flow_state.step_id == "end": - return + if self.flow_state.step_id == "end" or not self.flow_state.step_id: + # 如果是最后一步,设置停止标志 + self._can_continue = False + return "" - next_nodes = [] + if self.flow.steps[self.flow_state.step_id].node == "Choice": + # 如果是选择步骤,那么现在还不知道下一步是谁,直接返回 + return "" + + next_steps = [] # 遍历Edges,查找下一个节点 for edge in self.flow.edges: if edge.edge_from == self.flow_state.step_id: - next_nodes += [edge.edge_to] + next_steps += [edge.edge_to] - # TODO - # 处理分支(cloice工具) - # if self._flow_data.steps[self._next_step].call_type == "choice" and result.extra is not None: - # self._next_step = result.extra.get("next_step") - # return + # 如果step没有任何出边,直接跳到end + if not next_steps: + return "end" - # 处理下一步 - if not next_nodes: - self.flow_state.step_id = "end" - self.flow_state.step_name = "结束" - else: - self.flow_state.step_id = next_nodes[0] - self.flow_state.step_name = next_nodes[0] - # self.flow_state.step_name = self.flow.steps[next_nodes[0]].name - - logger.info("[FlowExecutor] 下一步 %s", self.flow_state.step_id) - # 如果是最后一步,设置停止标志 - if self.flow_state.step_id == "end": - self._can_continue = False + # FIXME: 目前只使用第一个出边 + logger.info("[FlowExecutor] 下一步 %s", next_steps[0]) + return next_steps[0] async def run(self) -> None: @@ -285,27 +287,25 @@ class Executor(BaseModel): 数据通过向Queue发送消息的方式传输 """ logger.info("[FlowExecutor] 运行工作流") - # 推送Flow开始 + # 推送Flow开始消息 await push_flow_start(self.task.record.task_id, self.queue, self.flow_state, self.question) + # 如果允许继续运行Flow while self._can_continue: - # 当前步骤不存在 - if self.flow_state.step_id not in self.flow.steps: + # Flow定义中找不到step + if not self.flow_state.step_id or (self.flow_state.step_id not in self.flow.steps): logger.error("[FlowExecutor] 当前步骤 %s 不存在", self.flow_state.step_id) self.flow_state.status = StepStatus.ERROR if self.flow_state.status == StepStatus.ERROR: - # 当前步骤为错误处理步骤 + # 执行错误处理步骤 logger.warning("[FlowExecutor] Executor出错,执行错误处理步骤") step = self.flow.on_error await self._run_error(step) else: - # 当前步骤为正常步骤 + # 执行正常步骤 step = self.flow.steps[self.flow_state.step_id] await self._run_step(self.flow_state.step_id, step) - # 处理下一步 - await self._handle_next_step() - - # Flow停止运行,推送消息 - await push_flow_stop(self.task.record.task_id, self.queue, self.flow_state, self.flow) + # 推送Flow停止消息 + await push_flow_stop(self.task.record.task_id, self.queue, self.flow_state, self.flow, self._final_answer) diff --git a/apps/scheduler/executor/message.py b/apps/scheduler/executor/message.py index 609debace..03a8a3fd1 100644 --- a/apps/scheduler/executor/message.py +++ b/apps/scheduler/executor/message.py @@ -12,6 +12,7 @@ from apps.entities.flow import Flow from apps.entities.message import ( FlowStartContent, FlowStopContent, + TextAddContent, ) from apps.entities.task import ( ExecutorState, @@ -98,14 +99,28 @@ async def assemble_flow_stop_content(state: ExecutorState, flow: Flow) -> FlowSt # data=chart_option, # ) -async def push_flow_stop(task_id: str, queue: actor.ActorHandle, state: ExecutorState, flow: Flow) -> None: +async def push_flow_stop(task_id: str, queue: actor.ActorHandle, state: ExecutorState, flow: Flow, final_answer: str) -> None: """推送Flow结束""" task_actor = ray.get_actor("task") task: TaskBlock = await task_actor.get_task.remote(task_id) # 设置state task.flow_state = state + # 保存最终输出 + task.record.content.answer = final_answer content = await assemble_flow_stop_content(state, flow) # 推送Stop消息 await queue.push_output.remote(task, event_type=EventType.FLOW_STOP, data=content.model_dump(exclude_none=True, by_alias=True)) # type: ignore[attr-defined] await task_actor.set_task.remote(task_id, task) + + +async def push_text_output(task_id: str, queue: actor.ActorHandle, text: str) -> None: + """推送文本输出""" + task_actor = ray.get_actor("task") + task: TaskBlock = await task_actor.get_task.remote(task_id) + + content = TextAddContent( + text=text, + ) + # 推送消息 + await queue.push_output.remote(task, event_type=EventType.TEXT_ADD, data=content.model_dump(exclude_none=True, by_alias=True)) # type: ignore[attr-defined] diff --git a/apps/scheduler/pool/loader/flow.py b/apps/scheduler/pool/loader/flow.py index 479c01311..f69e791e3 100644 --- a/apps/scheduler/pool/loader/flow.py +++ b/apps/scheduler/pool/loader/flow.py @@ -15,6 +15,7 @@ from apps.constants import APP_DIR, FLOW_DIR from apps.entities.enum_var import EdgeType from apps.entities.flow import Flow from apps.manager.node import NodeManager +from apps.scheduler.yaml import str_presenter logger = logging.getLogger("ray") @@ -124,6 +125,7 @@ class FlowLoader: } async with aiofiles.open(flow_path, mode="w", encoding="utf-8") as f: + yaml.add_representer(str, str_presenter) await f.write(yaml.dump( flow_dict, allow_unicode=True, diff --git a/apps/scheduler/pool/loader/metadata.py b/apps/scheduler/pool/loader/metadata.py index 6f1665bae..408d45019 100644 --- a/apps/scheduler/pool/loader/metadata.py +++ b/apps/scheduler/pool/loader/metadata.py @@ -15,6 +15,7 @@ from apps.entities.flow import ( AppMetadata, ServiceMetadata, ) +from apps.scheduler.yaml import str_presenter class MetadataLoader: @@ -98,6 +99,7 @@ class MetadataLoader: data = metadata # 使用UTF-8保存YAML,忽略部分乱码 + yaml.add_representer(str, str_presenter) yaml_dict = yaml.dump( jsonable_encoder(data, exclude={"hashes"}), allow_unicode=True, diff --git a/apps/scheduler/pool/loader/openapi.py b/apps/scheduler/pool/loader/openapi.py index c2d864fbc..6c56d6648 100644 --- a/apps/scheduler/pool/loader/openapi.py +++ b/apps/scheduler/pool/loader/openapi.py @@ -19,6 +19,7 @@ from apps.scheduler.openapi import ( ReducedOpenAPISpec, reduce_openapi_spec, ) +from apps.scheduler.yaml import str_presenter @ray.remote @@ -164,6 +165,7 @@ class OpenAPILoader: """在文件系统上保存Service,并更新数据库""" try: + yaml.add_representer(str, str_presenter) yaml_data = yaml.safe_dump(yaml_dict) await yaml_path.write_text(yaml_data) except Exception as e: diff --git a/apps/scheduler/pool/pool.py b/apps/scheduler/pool/pool.py index 45d91c7bf..7de1f0d93 100644 --- a/apps/scheduler/pool/pool.py +++ b/apps/scheduler/pool/pool.py @@ -74,7 +74,7 @@ class Pool: await app_loader.load(app, checker.hashes[hash_key]) - # TODO + # TODO: 使用统一的保存入口 async def save(self, *, is_deletion: bool = False) -> None: """保存【单个】资源""" pass diff --git a/apps/scheduler/yaml.py b/apps/scheduler/yaml.py new file mode 100644 index 000000000..e5230a43d --- /dev/null +++ b/apps/scheduler/yaml.py @@ -0,0 +1,6 @@ +"""YAML表示器""" + +def str_presenter(dumper, data): # noqa: ANN001, ANN201, D103 + if "\n" in data: + return dumper.represent_scalar("tag:yaml.org,2002:str", data, style="|") + return dumper.represent_scalar("tag:yaml.org,2002:str", data) diff --git a/deploy/secret_helper/main.py b/deploy/secret_helper/main.py index be90de10d..5436c051a 100644 --- a/deploy/secret_helper/main.py +++ b/deploy/secret_helper/main.py @@ -5,7 +5,6 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. from pathlib import Path import yaml - from file_copy import copy from job import job -- Gitee