diff --git a/apps/entities/scheduler.py b/apps/entities/scheduler.py
index 18a7c95aa7ec44f9a05d053ed673cc11d24fa2ed..00003d0c835a48940285ea4e017b53fb9429006b 100644
--- a/apps/entities/scheduler.py
+++ b/apps/entities/scheduler.py
@@ -15,7 +15,7 @@ class CallVars(BaseModel):
这一部分的参数由Executor填充,用户无法修改
"""
- background: str = Field(description="上下文信息")
+ summary: str = Field(description="上下文信息")
question: str = Field(description="改写后的用户输入")
history: list[FlowStepHistory] = Field(description="Executor中历史工具的结构化数据", default=[])
task_id: str = Field(description="任务ID")
diff --git a/apps/llm/patterns/__init__.py b/apps/llm/patterns/__init__.py
index 20ef8bbcf740d00edcaf7813307f46caa18893e7..e50b5526ae268fcafbff8eface14f77c1be47ce8 100644
--- a/apps/llm/patterns/__init__.py
+++ b/apps/llm/patterns/__init__.py
@@ -6,7 +6,6 @@ from apps.llm.patterns.core import CorePattern
from apps.llm.patterns.domain import Domain
from apps.llm.patterns.executor import (
ExecutorSummary,
- FinalThought,
ExecutorThought,
)
from apps.llm.patterns.json import Json
@@ -17,7 +16,6 @@ __all__ = [
"CorePattern",
"Domain",
"ExecutorSummary",
- "FinalThought",
"ExecutorThought",
"Json",
"Recommend",
diff --git a/apps/llm/patterns/executor.py b/apps/llm/patterns/executor.py
index dc975900e708a75d03f867d3967754d43d04280d..13b9bc86f7186df5913ee20b7a157207bb7f5eda 100644
--- a/apps/llm/patterns/executor.py
+++ b/apps/llm/patterns/executor.py
@@ -2,11 +2,9 @@
Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved.
"""
-from collections.abc import AsyncGenerator
-from textwrap import dedent
from typing import Any, Optional
-from apps.entities.scheduler import ExecutorBackground as ExecutorBackgroundEntity
+from apps.entities.scheduler import ExecutorBackground
from apps.llm.patterns.core import CorePattern
from apps.llm.reasoning import ReasoningLLM
@@ -31,7 +29,7 @@ class ExecutorThought(CorePattern):
{tool_name}
{tool_description}
-
+
@@ -85,19 +83,24 @@ class ExecutorSummary(CorePattern):
user_prompt: str = r"""
-
- 根据历史对话记录,结合给定的AI助手思考过程,生成一个完整的背景总结。这个总结将用于后续对话的上下文理解。
- 生成总结的要求如下:
- 1. 突出重要信息点,例如时间、地点、人物、事件等。
- 2. 下面给出的事实条目若与历史记录有关,则可以在生成总结时作为已知信息。
- 3. 确保信息准确性,不得编造信息。
- 4. 总结应少于1000个字。
+ 根据给定的AI助手思考过程和关键事实,生成一个三句话背景总结。这个总结将用于后续对话的上下文理解。
+
+ 生成总结的要求如下:
+ 1. 突出重要信息点,例如时间、地点、人物、事件等。
+ 2. “关键事实”中的内容可在生成总结时作为已知信息。
+ 3. 输出时请不要包含XML标签,确保信息准确性,不得编造信息。
+ 4. 总结应少于3句话,应少于300个字。
- 思考过程(在<thought>标签中):
- {thought}
+ AI助手思考过程将在<thought>标签中给出,关键事实将在<facts>标签中给出。
+
- 关键事实(在<facts>标签中):
- {facts}
+ <thought>
+ {thought}
+ </thought>
+
+ <facts>
+ {facts}
+ </facts>
现在,请开始生成背景总结:
"""
@@ -109,12 +112,7 @@ class ExecutorSummary(CorePattern):
async def generate(self, task_id: str, **kwargs) -> str: # noqa: ANN003
"""进行初始背景生成"""
- background: ExecutorBackgroundEntity = kwargs["background"]
-
- # 转化字符串
- messages = []
- for item in background.conversation:
- messages += [{"role": item["role"], "content": item["content"]}]
+ background: ExecutorBackground = kwargs["background"]
facts_str = "\n"
for item in background.facts:
@@ -138,67 +136,3 @@ class ExecutorSummary(CorePattern):
result += chunk
return result
-
-
-class FinalThought(CorePattern):
- """使用大模型生成Executor的最终结果"""
-
- user_prompt: str = r"""
-
- 你是AI智能助手,请回答用户的问题并满足以下要求:
-
- 1. 使用中文回答问题,不要使用其他语言。
- 2. 回答应当语气友好、通俗易懂,并包含尽可能完整的信息。
- 3. 回答时应结合思考过程。
- 4. 输出时请不要包含XML标签,不要编造任何信息。
-
- 用户的问题将在<question>标签中给出,你之前的思考过程将在<thought>标签中给出。
-
- <question>
- {question}
- </question>
-
- <thought>
- {thought}{output}
- </thought>
-
- 现在,请根据以上信息进行回答:
- """
- """用户提示词"""
-
-
- def __init__(self, system_prompt: Optional[str] = None, user_prompt: Optional[str] = None) -> None:
- """初始化ExecutorResult模式"""
- super().__init__(system_prompt, user_prompt)
-
-
- async def generate(self, task_id: str, **kwargs) -> AsyncGenerator[str, Any]: # noqa: ANN003
- """进行ExecutorResult生成"""
- question: str = kwargs["question"]
- thought: str = kwargs["thought"]
- final_output: dict[str, Any] = kwargs.get("final_output", {})
-
- # 如果final_output不为空,则将final_output转换为字符串
- if final_output:
- final_output_str = dedent(f"""
- 工具提供了{final_output['type']}类型数据:`{final_output['data']}`。\
- 这些数据已经使用恰当的办法向用户进行了展示,所以无需重复。\
- 若类型为“schema”,说明用户的问题缺少回答所需的必要信息。\
- 我需要根据schema的具体内容分析缺失哪些信息,并提示用户补充。
- """)
- else:
- final_output_str = ""
-
- user_input = self.user_prompt.format(
- question=question,
- thought=thought,
- output=final_output_str,
- )
- messages = [
- {"role": "system", "content": ""},
- {"role": "user", "content": user_input},
- ]
-
- async for chunk in ReasoningLLM().call(task_id, messages, streaming=True, temperature=0.7):
- yield chunk
diff --git a/apps/llm/reasoning.py b/apps/llm/reasoning.py
index fb74818ffff23b1cc71ed819e76b313504f8defe..23d4a03e0bcb417e01c77afda4809900abadeef6 100644
--- a/apps/llm/reasoning.py
+++ b/apps/llm/reasoning.py
@@ -146,7 +146,8 @@ class ReasoningLLM:
yield reason
# 推送text
- yield text
+ if text:
+ yield text
# 整理结果
reasoning_content += reason
diff --git a/apps/manager/flow.py b/apps/manager/flow.py
index 124a1c25ad30fc123e93a993835f8d0bf668d025..848b3c59cc4ddbcb339a7b734e734a73532a3e2d 100644
--- a/apps/manager/flow.py
+++ b/apps/manager/flow.py
@@ -417,6 +417,13 @@ class FlowManager:
@staticmethod
async def updata_flow_debug_by_app_and_flow_id(app_id: str, flow_id: str, debug: bool) -> bool:
+ """更新flow的debug状态
+
+ :param app_id: 应用的id
+ :param flow_id: 流的id
+ :param debug: 是否开启debug
+ :return: 是否更新成功
+ """
try:
flow_loader = FlowLoader()
flow = await flow_loader.load(app_id, flow_id)
diff --git a/apps/routers/auth.py b/apps/routers/auth.py
index ae856ba02d28621f1d4cfab1f1840b9d45b53b1a..33e3baf5e8506ec593426f4de75d6e1be3e38804 100644
--- a/apps/routers/auth.py
+++ b/apps/routers/auth.py
@@ -176,7 +176,7 @@ async def oidc_redirect(action: Annotated[str, Query()] = "login"): # noqa: ANN
).model_dump(exclude_none=True, by_alias=True))
-# TODO(zwt): OIDC主动触发logout
+# TODO: OIDC主动触发logout
# 002
@router.post("/logout", response_model=ResponseData)
async def oidc_logout(token: str): # noqa: ANN201
diff --git a/apps/routers/chat.py b/apps/routers/chat.py
index 3fae9bba53bb86ae5a470bac57b92166d529e54c..f03e1e0d672f05090763c64da70867d027f83898 100644
--- a/apps/routers/chat.py
+++ b/apps/routers/chat.py
@@ -22,9 +22,7 @@ from apps.dependency import (
)
from apps.entities.request_data import RequestData
from apps.entities.response_data import ResponseData
-from apps.manager.appcenter import AppCenterManager
from apps.manager.blacklist import QuestionBlacklistManager, UserBlacklistManager
-from apps.routers.mock import mock_data
from apps.scheduler.scheduler.context import save_data
from apps.service.activity import Activity
@@ -127,11 +125,7 @@ async def chat(
if await Activity.is_active(user_sub):
raise HTTPException(status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail="Too many requests")
- if post_body.app and post_body.app.app_id:
- # await AppCenterManager.update_recent_app(user_sub, post_body.app.app_id)
- res = mock_data(post_body, user_sub, session_id)
- else:
- res = chat_generator(post_body, user_sub, session_id)
+ res = chat_generator(post_body, user_sub, session_id)
return StreamingResponse(
content=res,
media_type="text/event-stream",
diff --git a/apps/scheduler/call/api.py b/apps/scheduler/call/api.py
index c267531ead6787c00c09fc8235e1997d84c59ef6..d6753a59c5d88008b0920f6a71ea494aba386308 100644
--- a/apps/scheduler/call/api.py
+++ b/apps/scheduler/call/api.py
@@ -39,7 +39,7 @@ class API(CoreCall, ret_type=APIOutput):
auth: dict[str, Any] = Field(description="API鉴权信息", default={})
- async def __call__(self, syscall_vars: CallVars, **_kwargs: Any) -> APIOutput:
+ async def exec(self, syscall_vars: CallVars, **_kwargs: Any) -> APIOutput:
"""调用API,然后返回LLM解析后的数据"""
self._session = aiohttp.ClientSession()
try:
diff --git a/apps/scheduler/call/cmd/cmd.py b/apps/scheduler/call/cmd/cmd.py
index 790095c6c27ef648d257e600af54132dcb1d5768..7a23a574ab8f53e0a6cd8995692960d7da7182b2 100644
--- a/apps/scheduler/call/cmd/cmd.py
+++ b/apps/scheduler/call/cmd/cmd.py
@@ -27,6 +27,6 @@ class Cmd(CoreCall):
name: str = "cmd"
description: str = "根据BTDL描述文件,生成命令。"
- async def __call__(self, _slot_data: dict[str, Any]) -> _CmdOutput:
+ async def exec(self, _slot_data: dict[str, Any]) -> _CmdOutput:
"""调用Cmd工具"""
pass
diff --git a/apps/scheduler/call/convert.py b/apps/scheduler/call/convert.py
index 80b7fdd96e450219453d8c493511a7ad9474cff5..1d7fdd8f892b5087e72986f2d4b662899114b277 100644
--- a/apps/scheduler/call/convert.py
+++ b/apps/scheduler/call/convert.py
@@ -38,7 +38,7 @@ class Convert(CoreCall):
description: str = "从上一步的工具的原始JSON返回结果中,提取特定字段的信息。"
- async def __call__(self, _slot_data: dict[str, Any]) -> _ConvertOutput:
+ async def exec(self, _slot_data: dict[str, Any]) -> _ConvertOutput:
"""调用Convert工具
:param _slot_data: 经用户确认后的参数(目前未使用)
diff --git a/apps/scheduler/call/core.py b/apps/scheduler/call/core.py
index 59551dff5d6b2df6b3ee67e9c9e9840bb8e3f75e..acc6fb0f258412b57ad707320c8ce2edf7181575 100644
--- a/apps/scheduler/call/core.py
+++ b/apps/scheduler/call/core.py
@@ -3,9 +3,10 @@
所有Call类必须继承此类,并实现所有方法。
Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved.
"""
+from collections.abc import AsyncGenerator
from typing import Any, ClassVar
-from pydantic import BaseModel, Field
+from pydantic import BaseModel, ConfigDict, Field
from apps.entities.scheduler import CallVars
@@ -16,6 +17,11 @@ class CoreCall(BaseModel):
name: ClassVar[str] = Field(description="Call的名称")
description: ClassVar[str] = Field(description="Call的描述")
+ model_config = ConfigDict(
+ arbitrary_types_allowed=True,
+ extra="allow",
+ )
+
ret_type: ClassVar[type[BaseModel]]
def __init_subclass__(cls, ret_type: type[BaseModel], **kwargs: Any) -> None:
@@ -23,12 +29,17 @@ class CoreCall(BaseModel):
super().__init_subclass__(**kwargs)
cls.ret_type = ret_type
- class Config:
- """Pydantic 配置类"""
- arbitrary_types_allowed = True
+ async def init(self, syscall_vars: CallVars, **_kwargs: Any) -> dict[str, Any]:
+ """初始化Call类,并返回Call的输入"""
+ raise NotImplementedError
- async def __call__(self, syscall_vars: CallVars, **kwargs: Any) -> type[BaseModel]:
- """Call类实例的调用方法"""
+ async def exec(self) -> dict[str, Any]:
+ """Call类实例的非流式输出方法"""
raise NotImplementedError
+
+
+ async def stream(self) -> AsyncGenerator[str, None]:
+ """Call类实例的流式输出方法"""
+ yield ""
diff --git a/apps/scheduler/call/empty.py b/apps/scheduler/call/empty.py
index 158cd0f9bdae46725ee19dd6aad693e81c689cad..87d1fdea1293c409b560da9402e7d754e8d3fad8 100644
--- a/apps/scheduler/call/empty.py
+++ b/apps/scheduler/call/empty.py
@@ -18,6 +18,11 @@ class Empty(CoreCall, ret_type=EmptyData):
description: ClassVar[str] = "空白节点,用于占位"
- async def __call__(self, _syscall_vars: CallVars, **_kwargs: Any) -> EmptyData:
- """空Call"""
- return EmptyData()
+ async def init(self, _syscall_vars: CallVars, **_kwargs: Any) -> dict[str, Any]:
+ """初始化"""
+ return {}
+
+
+ async def exec(self) -> dict[str, Any]:
+ """执行"""
+ return EmptyData().model_dump()
diff --git a/apps/scheduler/call/llm.py b/apps/scheduler/call/llm.py
index bc67c6f9cbcd1a16a040bbd8148fba35af194495..ad38a5294dc82db65a88088867b2742a0bd8e754 100644
--- a/apps/scheduler/call/llm.py
+++ b/apps/scheduler/call/llm.py
@@ -2,6 +2,8 @@
Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved.
"""
+import logging
+from collections.abc import AsyncGenerator
from datetime import datetime
from textwrap import dedent
from typing import Any, ClassVar
@@ -15,6 +17,7 @@ from apps.entities.scheduler import CallError, CallVars
from apps.llm.reasoning import ReasoningLLM
from apps.scheduler.call.core import CoreCall
+logger = logging.getLogger("ray")
LLM_DEFAULT_PROMPT = dedent(
r"""
@@ -52,13 +55,14 @@ class LLM(CoreCall, ret_type=LLMNodeOutput):
system_prompt: str = Field(description="大模型系统提示词", default="")
user_prompt: str = Field(description="大模型用户提示词", default=LLM_DEFAULT_PROMPT)
- async def __call__(self, syscall_vars: CallVars, **_kwargs: Any) -> LLMNodeOutput:
- """运行LLM Call"""
+
+ async def _prepare_message(self, syscall_vars: CallVars) -> list[dict[str, Any]]:
+ """准备消息"""
# 参数
time = datetime.now(tz=pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S")
formatter = {
"time": time,
- "context": syscall_vars.background,
+ "context": syscall_vars.summary,
"question": syscall_vars.question,
}
@@ -83,17 +87,39 @@ class LLM(CoreCall, ret_type=LLMNodeOutput):
except Exception as e:
raise CallError(message=f"用户提示词渲染失败:{e!s}", data={}) from e
- message = [
+ return [
{"role": "system", "content": system_input},
{"role": "user", "content": user_input},
]
+
+ async def init(self, syscall_vars: CallVars, **_kwargs: Any) -> dict[str, Any]:
+ """初始化LLM工具"""
+ self._message = await self._prepare_message(syscall_vars)
+ self._task_id = syscall_vars.task_id
+ return {
+ "task_id": self._task_id,
+ "message": self._message,
+ }
+
+
+ async def exec(self) -> dict[str, Any]:
+ """运行LLM Call"""
try:
result = ""
- async for chunk in ReasoningLLM().call(task_id=syscall_vars.task_id, messages=message):
+ async for chunk in ReasoningLLM().call(task_id=self._task_id, messages=self._message):
result += chunk
except Exception as e:
raise CallError(message=f"大模型调用失败:{e!s}", data={}) from e
result = result.strip().strip("\n")
- return LLMNodeOutput(message=result)
+ return LLMNodeOutput(message=result).model_dump(exclude_none=True, by_alias=True)
+
+
+ async def stream(self) -> AsyncGenerator[str, None]:
+ """流式输出"""
+ try:
+ async for chunk in ReasoningLLM().call(task_id=self._task_id, messages=self._message):
+ yield chunk
+ except Exception as e:
+ raise CallError(message=f"大模型流式输出失败:{e!s}", data={}) from e
diff --git a/apps/scheduler/call/rag.py b/apps/scheduler/call/rag.py
index 5ac6429284609e6f0289a483665481fbaddd80cc..fc717c8c48c63fc57648ae59db9d5239004fbe2f 100644
--- a/apps/scheduler/call/rag.py
+++ b/apps/scheduler/call/rag.py
@@ -33,23 +33,27 @@ class RAG(CoreCall, ret_type=RAGOutput):
retrieval_mode: Literal["chunk", "full_text"] = Field(description="检索模式", default="chunk")
- async def __call__(self, syscall_vars: CallVars, **_kwargs: Any) -> RAGOutput:
- """调用RAG工具"""
- params_dict = {
+ async def init(self, syscall_vars: CallVars, **_kwargs: Any) -> dict[str, Any]:
+ """初始化RAG工具"""
+ self._params_dict = {
"kb_sn": self.knowledge_base,
"top_k": self.top_k,
"retrieval_mode": self.retrieval_mode,
"content": syscall_vars.question,
}
- url = config["RAG_HOST"].rstrip("/") + "/chunk/get"
- headers = {
+ self._url = config["RAG_HOST"].rstrip("/") + "/chunk/get"
+ self._headers = {
"Content-Type": "application/json",
}
+ return self._params_dict
+
+
+ async def exec(self) -> dict[str, Any]:
+ """调用RAG工具"""
# 发送 GET 请求
- session = aiohttp.ClientSession()
- async with session.post(url, headers=headers, json=params_dict) as response:
+ async with aiohttp.ClientSession() as session, session.post(self._url, headers=self._headers, json=self._params_dict) as response:
# 检查响应状态码
if response.status == status.HTTP_200_OK:
result = await response.json()
@@ -62,7 +66,7 @@ class RAG(CoreCall, ret_type=RAGOutput):
return RAGOutput(
corpus=corpus,
- )
+ ).model_dump(exclude_none=True, by_alias=True)
text = await response.text()
logger.error("[RAG] 调用失败:%s", text)
@@ -70,7 +74,7 @@ class RAG(CoreCall, ret_type=RAGOutput):
raise CallError(
message=f"rag调用失败:{text}",
data={
- "question": syscall_vars.question,
+ "question": self._params_dict["content"],
"status": response.status,
"text": text,
},
diff --git a/apps/scheduler/call/sql.py b/apps/scheduler/call/sql.py
index 55b8d9d61ff268488437f82970567e22aeb08e6d..312c5faafed695c2eb015d7f1ab22ca2197f03b4 100644
--- a/apps/scheduler/call/sql.py
+++ b/apps/scheduler/call/sql.py
@@ -41,7 +41,7 @@ class SQL(CoreCall):
self._session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(300))
- async def __call__(self, _slot_data: dict[str, Any]) -> SQLOutput:
+ async def exec(self, _slot_data: dict[str, Any]) -> SQLOutput:
"""运行SQL工具"""
# 获取必要参数
syscall_vars: CallVars = getattr(self, "_syscall_vars")
diff --git a/apps/scheduler/executor/flow.py b/apps/scheduler/executor/flow.py
index 4bc57791c190ca8473ad2b1efde93af43f1c21bf..89ce109f15f48ce2e94965031fc9b5390242cac6 100644
--- a/apps/scheduler/executor/flow.py
+++ b/apps/scheduler/executor/flow.py
@@ -3,6 +3,7 @@
Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved.
"""
import asyncio
+import inspect
import logging
from typing import Any, Optional
@@ -15,16 +16,18 @@ from apps.entities.enum_var import StepStatus
from apps.entities.flow import Flow, FlowError, Step
from apps.entities.request_data import RequestDataApp
from apps.entities.scheduler import CallVars, ExecutorBackground
-from apps.entities.task import ExecutorState, TaskBlock
+from apps.entities.task import ExecutorState, FlowStepHistory, TaskBlock
from apps.llm.patterns.executor import ExecutorSummary
from apps.manager.node import NodeManager
from apps.manager.task import TaskManager
from apps.scheduler.call.core import CoreCall
+from apps.scheduler.call.llm import LLM, LLMNodeOutput
from apps.scheduler.executor.message import (
push_flow_start,
push_flow_stop,
push_step_input,
push_step_output,
+ push_text_output,
)
from apps.scheduler.slot.slot import Slot
@@ -54,10 +57,7 @@ class Executor(BaseModel):
# 尝试恢复State
if self.task.flow_state:
self.flow_state = self.task.flow_state
- # 如果flow_context为空,则从flow_history中恢复
- if not self.task.flow_context:
- self.task.flow_context = await TaskManager.get_flow_history_by_task_id(self.task.record.task_id)
- self.task.new_context = []
+ self.task.flow_context = await TaskManager.get_flow_history_by_task_id(self.task.record.task_id)
else:
# 创建ExecutorState
self.flow_state = ExecutorState(
@@ -72,6 +72,8 @@ class Executor(BaseModel):
)
# 是否结束运行
self._can_continue = True
+ # 向用户输出的内容
+ self._final_answer = ""
async def _check_cls(self, call_cls: Any) -> bool:
@@ -81,46 +83,36 @@ class Executor(BaseModel):
flag = False
if not hasattr(call_cls, "description") or not isinstance(call_cls.description, str):
flag = False
- if not callable(call_cls) or not asyncio.iscoroutinefunction(call_cls.__call__):
+ if not hasattr(call_cls, "exec") or not asyncio.iscoroutinefunction(call_cls.exec):
+ flag = False
+ if not hasattr(call_cls, "stream") or not inspect.isasyncgenfunction(call_cls.stream):
flag = False
return flag
- # TODO
+ # TODO: 默认错误处理步骤
async def _run_error(self, step: FlowError) -> dict[str, Any]:
"""运行错误处理步骤"""
return {}
- async def _get_call_cls(self, node_id: str) -> Optional[type[CoreCall]]:
+ async def _get_call_cls(self, node_id: str) -> type[CoreCall]:
"""获取并验证Call类"""
# 检查flow_state是否为空
if not self.flow_state:
- logger.error("[FlowExecutor] flow_state为空")
- return None
+ err = "[FlowExecutor] flow_state为空"
+ raise ValueError(err)
# 获取对应Node的call_id
- try:
- call_id = await NodeManager.get_node_call_id(node_id)
- except Exception:
- logger.exception("[FlowExecutor] 获取工具%s的call_id时发生错误", node_id)
- self.flow_state.status = StepStatus.ERROR
- return None
-
+ call_id = await NodeManager.get_node_call_id(node_id)
# 从Pool中获取对应的Call
pool = ray.get_actor("pool")
- try:
- call_cls: type[CoreCall] = await pool.get_call.remote(call_id)
- except Exception:
- logger.exception("[FlowExecutor] 载入工具%s时发生错误", node_id)
- self.flow_state.status = StepStatus.ERROR
- return None
+ call_cls: type[CoreCall] = await pool.get_call.remote(call_id)
# 检查Call合法性
if not await self._check_cls(call_cls):
- logger.error("[FlowExecutor] 工具 %s 不符合Call标准要求", node_id)
- self.flow_state.status = StepStatus.ERROR
- return None
+ err = f"[FlowExecutor] 工具 {node_id} 不符合Call标准要求"
+ raise ValueError(err)
return call_cls
@@ -151,7 +143,7 @@ class Executor(BaseModel):
self._can_continue = False
self.flow_state.status = StepStatus.RUNNING
# 推送空输入输出
- await push_step_input(self.task.record.task_id, self.queue, self.flow_state, self.flow)
+ await push_step_input(self.task.record.task_id, self.queue, self.flow_state, {})
self.flow_state.status = StepStatus.PARAM
await push_step_output(self.task.record.task_id, self.queue, self.flow_state, {})
return False, None
@@ -165,44 +157,60 @@ class Executor(BaseModel):
return None
- async def _execute_call(self, call_obj: Any, sys_vars: CallVars, node_id: str) -> dict[str, Any]:
+ async def _execute_call(self, call_obj: Any, *, is_final_answer: bool) -> dict[str, Any]:
"""执行Call并处理结果"""
- if not call_obj:
- logger.error("[FlowExecutor] 工具%s不存在", node_id)
- return {}
-
- try:
- result: BaseModel = await call_obj(sys_vars)
- except Exception:
- logger.exception("[FlowExecutor] 执行工具%s时发生错误", node_id)
- self.flow_state.status = StepStatus.ERROR
- return {}
-
- try:
- result_data = result.model_dump(exclude_none=True, by_alias=True)
- except Exception:
- logger.exception("[FlowExecutor] 无法处理工具%s返回值", node_id)
- self.flow_state.status = StepStatus.ERROR
- return {}
-
+ # call_obj一定合法;开始判断是否为最终结果
+ if is_final_answer and isinstance(call_obj, LLM):
+ # 最后一步 & 是大模型步骤,直接流式输出
+ async for chunk in call_obj.stream():
+ await push_text_output(self.task.record.task_id, self.queue, chunk)
+ self._final_answer += chunk
+ self.flow_state.status = StepStatus.SUCCESS
+ return {
+ "message": self._final_answer,
+ }
+
+ # 其他情况:先运行步骤,得到结果
+ result: dict[str, Any] = await call_obj.exec()
+ if is_final_answer:
+ if call_obj.name == "Convert":
+ # 如果是Convert,直接输出转换之后的结果
+ self._final_answer += result["message"]
+ await push_text_output(self.task.record.task_id, self.queue, self._final_answer)
+ self.flow_state.status = StepStatus.SUCCESS
+ return {
+ "message": self._final_answer,
+ }
+ # 其他工具,加一步大模型过程
+ # FIXME: 使用单独的Prompt
+ self.flow_state.status = StepStatus.SUCCESS
+ return {
+ "message": self._final_answer,
+ }
+
+ # 其他情况:返回结果
self.flow_state.status = StepStatus.SUCCESS
- return result_data
+ return result
async def _run_step(self, step_id: str, step_data: Step) -> None:
"""运行单个步骤"""
logger.info("[FlowExecutor] 运行步骤 %s", step_data.name)
- # 更新State
+ # 获取task
+ task_actor = ray.get_actor("task")
+ task: TaskBlock = await task_actor.get_task.remote(self.task.record.task_id)
+ # State写入ID和运行状态
self.flow_state.step_id = step_id
+ self.flow_state.step_name = step_id # FIXME: Step需要加名称和描述字段
self.flow_state.status = StepStatus.RUNNING
- # 特殊类型的Node,跳过执行
- node_id = step_data.node
+ # 查找下一个步骤;判断是否是end前最后一步
+ next_step = await self._get_next_step()
+ is_final_answer = next_step == "end"
+
# 获取并验证Call类
+ node_id = step_data.node
call_cls = await self._get_call_cls(node_id)
- if call_cls is None:
- logger.error("[FlowExecutor] Node %s 对应的工具不存在", node_id)
- return
# 准备系统变量
history = list(self.task.flow_context.values())[-STEP_HISTORY_SIZE:]
@@ -212,16 +220,12 @@ class Executor(BaseModel):
flow_id=self.post_body_app.flow_id,
session_id=self.task.session_id,
history=history,
- background=self.flow_state.ai_summary,
+ summary=self.flow_state.ai_summary,
)
# 初始化Call
- try:
- call_obj = call_cls.model_validate(step_data.params)
- except Exception:
- logger.exception("[FlowExecutor] 初始化工具 %s 时发生错误", call_cls.name)
- self.flow_state.status = StepStatus.ERROR
- return
+ call_obj = call_cls.model_validate(step_data.params)
+ input_data = await call_obj.init(sys_vars)
# TODO: 处理slots
# can_continue, slot_data = await self._process_slots(call_obj)
@@ -229,54 +233,52 @@ class Executor(BaseModel):
# return
# 推送步骤输入
- await push_step_input(self.task.record.task_id, self.queue, self.flow_state, self.flow_state.filled_data)
+ await push_step_input(self.task.record.task_id, self.queue, self.flow_state, input_data)
# 执行Call并获取结果
- result_data = await self._execute_call(call_obj, sys_vars, node_id)
+ result_data = await self._execute_call(call_obj, is_final_answer=is_final_answer)
+ task.flow_context[step_id] = FlowStepHistory(
+ task_id=self.task.record.task_id,
+ flow_id=self.post_body_app.flow_id,
+ step_id=step_id,
+ status=self.flow_state.status,
+ input_data=input_data,
+ output_data=result_data,
+ )
+ await task_actor.set_task.remote(self.task.record.task_id, task)
# 推送输出
await push_step_output(self.task.record.task_id, self.queue, self.flow_state, result_data)
- return
-
- async def _handle_last_step(self) -> None:
- """处理最后一步"""
- # 如果当前步骤为结束,则直接返回
- if self.flow_state.step_id == "end":
- return
+ # 更新下一步
+ self.flow_state.step_id = next_step
- async def _handle_next_step(self) -> None:
- """处理下一步"""
+ async def _get_next_step(self) -> str:
+ """在当前步骤执行前,尝试获取下一步"""
# 如果当前步骤为结束,则直接返回
- if self.flow_state.step_id == "end":
- return
+ if self.flow_state.step_id == "end" or not self.flow_state.step_id:
+ # 如果是最后一步,设置停止标志
+ self._can_continue = False
+ return ""
- next_nodes = []
+ if self.flow.steps[self.flow_state.step_id].node == "Choice":
+ # 如果是选择步骤,那么现在还不知道下一步是谁,直接返回
+ return ""
+
+ next_steps = []
# 遍历Edges,查找下一个节点
for edge in self.flow.edges:
if edge.edge_from == self.flow_state.step_id:
- next_nodes += [edge.edge_to]
+ next_steps += [edge.edge_to]
- # TODO
- # 处理分支(cloice工具)
- # if self._flow_data.steps[self._next_step].call_type == "choice" and result.extra is not None:
- # self._next_step = result.extra.get("next_step")
- # return
+ # 如果step没有任何出边,直接跳到end
+ if not next_steps:
+ return "end"
- # 处理下一步
- if not next_nodes:
- self.flow_state.step_id = "end"
- self.flow_state.step_name = "结束"
- else:
- self.flow_state.step_id = next_nodes[0]
- self.flow_state.step_name = next_nodes[0]
- # self.flow_state.step_name = self.flow.steps[next_nodes[0]].name
-
- logger.info("[FlowExecutor] 下一步 %s", self.flow_state.step_id)
- # 如果是最后一步,设置停止标志
- if self.flow_state.step_id == "end":
- self._can_continue = False
+ # FIXME: 目前只使用第一个出边
+ logger.info("[FlowExecutor] 下一步 %s", next_steps[0])
+ return next_steps[0]
async def run(self) -> None:
@@ -285,27 +287,25 @@ class Executor(BaseModel):
数据通过向Queue发送消息的方式传输
"""
logger.info("[FlowExecutor] 运行工作流")
- # 推送Flow开始
+ # 推送Flow开始消息
await push_flow_start(self.task.record.task_id, self.queue, self.flow_state, self.question)
+ # 如果允许继续运行Flow
while self._can_continue:
- # 当前步骤不存在
- if self.flow_state.step_id not in self.flow.steps:
+ # Flow定义中找不到step
+ if not self.flow_state.step_id or (self.flow_state.step_id not in self.flow.steps):
logger.error("[FlowExecutor] 当前步骤 %s 不存在", self.flow_state.step_id)
self.flow_state.status = StepStatus.ERROR
if self.flow_state.status == StepStatus.ERROR:
- # 当前步骤为错误处理步骤
+ # 执行错误处理步骤
logger.warning("[FlowExecutor] Executor出错,执行错误处理步骤")
step = self.flow.on_error
await self._run_error(step)
else:
- # 当前步骤为正常步骤
+ # 执行正常步骤
step = self.flow.steps[self.flow_state.step_id]
await self._run_step(self.flow_state.step_id, step)
- # 处理下一步
- await self._handle_next_step()
-
- # Flow停止运行,推送消息
- await push_flow_stop(self.task.record.task_id, self.queue, self.flow_state, self.flow)
+ # 推送Flow停止消息
+ await push_flow_stop(self.task.record.task_id, self.queue, self.flow_state, self.flow, self._final_answer)
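
Reviewer sketch (not part of the patch): the successor-resolution rules that `_get_next_step` applies before a step runs, reduced to a standalone function over a hypothetical edge list. A step with no out-edges falls through to "end", per the FIXME only the first out-edge is used, and "Choice" steps return "" because the branch target is unknown until the step executes:

    # 假设的流程边列表,仅用于演示查找规则
    edges = [("start", "rag"), ("rag", "llm")]

    def resolve_next(current: str) -> str:
        if not current or current == "end":
            return ""          # 流程已结束
        successors = [to for frm, to in edges if frm == current]
        if not successors:
            return "end"       # 无出边:直接跳到end
        return successors[0]   # 与补丁一致:暂时只取第一条出边

    assert resolve_next("start") == "rag"
    assert resolve_next("llm") == "end"
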
diff --git a/apps/scheduler/executor/message.py b/apps/scheduler/executor/message.py
index 609debace5bff894f921e7900767a6114e1d4e06..03a8a3fd1580bcd9a0f252d424f9e0576196f688 100644
--- a/apps/scheduler/executor/message.py
+++ b/apps/scheduler/executor/message.py
@@ -12,6 +12,7 @@ from apps.entities.flow import Flow
from apps.entities.message import (
FlowStartContent,
FlowStopContent,
+ TextAddContent,
)
from apps.entities.task import (
ExecutorState,
@@ -98,14 +99,28 @@ async def assemble_flow_stop_content(state: ExecutorState, flow: Flow) -> FlowSt
# data=chart_option,
# )
-async def push_flow_stop(task_id: str, queue: actor.ActorHandle, state: ExecutorState, flow: Flow) -> None:
+async def push_flow_stop(task_id: str, queue: actor.ActorHandle, state: ExecutorState, flow: Flow, final_answer: str) -> None:
"""推送Flow结束"""
task_actor = ray.get_actor("task")
task: TaskBlock = await task_actor.get_task.remote(task_id)
# 设置state
task.flow_state = state
+ # 保存最终输出
+ task.record.content.answer = final_answer
content = await assemble_flow_stop_content(state, flow)
# 推送Stop消息
await queue.push_output.remote(task, event_type=EventType.FLOW_STOP, data=content.model_dump(exclude_none=True, by_alias=True)) # type: ignore[attr-defined]
await task_actor.set_task.remote(task_id, task)
+
+
+async def push_text_output(task_id: str, queue: actor.ActorHandle, text: str) -> None:
+ """推送文本输出"""
+ task_actor = ray.get_actor("task")
+ task: TaskBlock = await task_actor.get_task.remote(task_id)
+
+ content = TextAddContent(
+ text=text,
+ )
+ # 推送消息
+ await queue.push_output.remote(task, event_type=EventType.TEXT_ADD, data=content.model_dump(exclude_none=True, by_alias=True)) # type: ignore[attr-defined]
diff --git a/apps/scheduler/pool/loader/flow.py b/apps/scheduler/pool/loader/flow.py
index 479c01311e9a0dee350020e1bda464cafa965b26..f69e791e30da4034a40eb23a8ab6f349bc0fa8b3 100644
--- a/apps/scheduler/pool/loader/flow.py
+++ b/apps/scheduler/pool/loader/flow.py
@@ -15,6 +15,7 @@ from apps.constants import APP_DIR, FLOW_DIR
from apps.entities.enum_var import EdgeType
from apps.entities.flow import Flow
from apps.manager.node import NodeManager
+from apps.scheduler.yaml import str_presenter
logger = logging.getLogger("ray")
@@ -124,6 +125,7 @@ class FlowLoader:
}
async with aiofiles.open(flow_path, mode="w", encoding="utf-8") as f:
+ yaml.add_representer(str, str_presenter)
await f.write(yaml.dump(
flow_dict,
allow_unicode=True,
diff --git a/apps/scheduler/pool/loader/metadata.py b/apps/scheduler/pool/loader/metadata.py
index 6f1665bae6e64d04e47cb0b275ecbbb96f7c8f1d..408d450193bdde12c5ad86c90ef70841cbf3b34b 100644
--- a/apps/scheduler/pool/loader/metadata.py
+++ b/apps/scheduler/pool/loader/metadata.py
@@ -15,6 +15,7 @@ from apps.entities.flow import (
AppMetadata,
ServiceMetadata,
)
+from apps.scheduler.yaml import str_presenter
class MetadataLoader:
@@ -98,6 +99,7 @@ class MetadataLoader:
data = metadata
# 使用UTF-8保存YAML,忽略部分乱码
+ yaml.add_representer(str, str_presenter)
yaml_dict = yaml.dump(
jsonable_encoder(data, exclude={"hashes"}),
allow_unicode=True,
diff --git a/apps/scheduler/pool/loader/openapi.py b/apps/scheduler/pool/loader/openapi.py
index c2d864fbc6ac325dd4d62f904b64626058239a87..6c56d664817b17810d40f147d1cb328f4633cc5e 100644
--- a/apps/scheduler/pool/loader/openapi.py
+++ b/apps/scheduler/pool/loader/openapi.py
@@ -19,6 +19,7 @@ from apps.scheduler.openapi import (
ReducedOpenAPISpec,
reduce_openapi_spec,
)
+from apps.scheduler.yaml import str_presenter
@ray.remote
@@ -164,6 +165,7 @@ class OpenAPILoader:
"""在文件系统上保存Service,并更新数据库"""
try:
+ yaml.add_representer(str, str_presenter, Dumper=yaml.SafeDumper)
yaml_data = yaml.safe_dump(yaml_dict)
await yaml_path.write_text(yaml_data)
except Exception as e:
diff --git a/apps/scheduler/pool/pool.py b/apps/scheduler/pool/pool.py
index 45d91c7bff8343e2faaa1e6dfed254f24a5b3f45..7de1f0d936711ce2008480a41c0a4d1404549a46 100644
--- a/apps/scheduler/pool/pool.py
+++ b/apps/scheduler/pool/pool.py
@@ -74,7 +74,7 @@ class Pool:
await app_loader.load(app, checker.hashes[hash_key])
- # TODO
+ # TODO: 使用统一的保存入口
async def save(self, *, is_deletion: bool = False) -> None:
"""保存【单个】资源"""
pass
diff --git a/apps/scheduler/yaml.py b/apps/scheduler/yaml.py
new file mode 100644
index 0000000000000000000000000000000000000000..e5230a43dbe9018e2284d0630e888324455eef87
--- /dev/null
+++ b/apps/scheduler/yaml.py
@@ -0,0 +1,7 @@
+"""YAML表示器"""
+
+def str_presenter(dumper, data): # noqa: ANN001, ANN201, D103
+ # 含换行的字符串以块样式(|)输出,其余保持默认表示
+ if "\n" in data:
+ return dumper.represent_scalar("tag:yaml.org,2002:str", data, style="|")
+ return dumper.represent_scalar("tag:yaml.org,2002:str", data)
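
Usage sketch for the new module: register the presenter, then multi-line strings dump as YAML block scalars instead of single lines with escaped newlines. Note that `yaml.add_representer` defaults to `yaml.Dumper` (used by `yaml.dump` in flow.py and metadata.py); the openapi.py call site dumps via `yaml.safe_dump`, so it registers on `yaml.SafeDumper` instead:

    import yaml

    from apps.scheduler.yaml import str_presenter

    yaml.add_representer(str, str_presenter)                          # 对yaml.dump生效
    yaml.add_representer(str, str_presenter, Dumper=yaml.SafeDumper)  # 对yaml.safe_dump生效

    print(yaml.dump({"prompt": "第一行\n第二行"}, allow_unicode=True))
    # prompt: |-
    #   第一行
    #   第二行
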
diff --git a/deploy/secret_helper/main.py b/deploy/secret_helper/main.py
index be90de10d7da50bad617be57a12124e339923909..5436c051abcf97c2a1061c3f039adefdfaf74695 100644
--- a/deploy/secret_helper/main.py
+++ b/deploy/secret_helper/main.py
@@ -5,7 +5,6 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved.
from pathlib import Path
import yaml
-
from file_copy import copy
from job import job