From cf4d309de30a924705a02194aae4568ad4459c21 Mon Sep 17 00:00:00 2001 From: zxstty Date: Thu, 31 Jul 2025 15:15:08 +0800 Subject: [PATCH] =?UTF-8?q?=E5=9F=BA=E4=BA=8E=E5=BD=93=E5=89=8Dnode?= =?UTF-8?q?=E7=9A=84type=E5=88=A4=E6=96=AD=E6=80=8E=E4=B9=88=E8=B5=B0?= =?UTF-8?q?=E4=B8=8B=E4=B8=80=E6=AD=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/scheduler/executor/flow.py | 22 +++++++++++++--------- apps/schemas/enum_var.py | 2 +- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/apps/scheduler/executor/flow.py b/apps/scheduler/executor/flow.py index 382ef929..157c9159 100644 --- a/apps/scheduler/executor/flow.py +++ b/apps/scheduler/executor/flow.py @@ -46,6 +46,10 @@ class FlowExecutor(BaseExecutor): flow_id: str = Field(description="Flow ID") question: str = Field(description="用户输入") post_body_app: RequestDataApp = Field(description="请求体中的app信息") + current_step: StepQueueItem | None = Field( + description="当前执行的步骤", + default=None + ) async def load_state(self) -> None: """从数据库中加载FlowExecutor的状态""" @@ -70,13 +74,13 @@ class FlowExecutor(BaseExecutor): self._reached_end: bool = False self.step_queue: deque[StepQueueItem] = deque() - async def _invoke_runner(self, queue_item: StepQueueItem) -> None: + async def _invoke_runner(self) -> None: """单一Step执行""" # 创建步骤Runner step_runner = StepExecutor( msg_queue=self.msg_queue, task=self.task, - step=queue_item, + step=self.current_step, background=self.background, question=self.question, ) @@ -84,8 +88,8 @@ class FlowExecutor(BaseExecutor): # 初始化步骤 await step_runner.init() # 运行Step - await step_runner.run() + await step_runner.run() # 更新Task(已存过库) self.task = step_runner.task @@ -93,12 +97,12 @@ class FlowExecutor(BaseExecutor): """执行当前queue里面的所有步骤(在用户看来是单一Step)""" while True: try: - queue_item = self.step_queue.pop() + self.current_step = self.step_queue.pop() except IndexError: break # 执行Step - await self._invoke_runner(queue_item) + await self._invoke_runner() async def _find_next_id(self, step_id: str) -> list[str]: """查找下一个节点""" @@ -113,17 +117,17 @@ class FlowExecutor(BaseExecutor): # 如果当前步骤为结束,则直接返回 if self.task.state.step_id == "end" or not self.task.state.step_id: # type: ignore[arg-type] return [] - if self.task.state.step_name == "Choice": + if self.current_step.step.type == SpecialCallType.CHOICE.value: # 如果是choice节点,获取分支ID branch_id = self.task.context[-1]["output_data"]["branch_id"] if branch_id: - self.task.state.step_id = self.task.state.step_id + "." + branch_id + next_steps = await self._find_next_id(self.task.state.step_id + "." + branch_id) logger.info("[FlowExecutor] 分支ID:%s", branch_id) else: logger.warning("[FlowExecutor] 没有找到分支ID,返回空列表") return [] - - next_steps = await self._find_next_id(self.task.state.step_id) # type: ignore[arg-type] + else: + next_steps = await self._find_next_id(self.task.state.step_id) # type: ignore[arg-type] # 如果step没有任何出边,直接跳到end if not next_steps: return [ diff --git a/apps/schemas/enum_var.py b/apps/schemas/enum_var.py index 49a6c250..5ff3381c 100644 --- a/apps/schemas/enum_var.py +++ b/apps/schemas/enum_var.py @@ -162,7 +162,7 @@ class SpecialCallType(str, Enum): LLM = "LLM" START = "start" END = "end" - CHOICE = "choice" + CHOICE = "Choice" class CommentType(str, Enum): -- Gitee