From 3fa7df8f8be3628d5e8f5bd3d31653bfa0371768 Mon Sep 17 00:00:00 2001 From: jayj97 Date: Thu, 14 Aug 2025 17:27:41 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=85=81=E8=AE=B8=E5=B7=A5?= =?UTF-8?q?=E4=BD=9C=E6=B5=81node=E7=9A=84=E4=B8=8B=E4=B8=80=E6=AD=A5?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E5=A4=9A=E4=B8=AA=E8=8A=82=E7=82=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/scheduler/executor/flow.py | 25 +++++++++++++++++++++---- apps/services/flow_validate.py | 9 +++++---- 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/apps/scheduler/executor/flow.py b/apps/scheduler/executor/flow.py index b6ec9d75..a3cd2b01 100644 --- a/apps/scheduler/executor/flow.py +++ b/apps/scheduler/executor/flow.py @@ -73,6 +73,11 @@ class FlowExecutor(BaseExecutor): self._reached_end: bool = False self.step_queue: deque[StepQueueItem] = deque() + # A -> B1 -> C + # -> B2 -> + # make sure node C run only once + self._executed_steps: set[str] = set() + async def _invoke_runner(self) -> None: """单一Step执行""" # 创建步骤Runner @@ -100,9 +105,17 @@ class FlowExecutor(BaseExecutor): except IndexError: break + # Check if it has been executed + if self.current_step.step_id in self._executed_steps: + logger.info("[FlowExecutor] 步骤 %s 已经执行过,跳过执行", self.current_step.step_id) + continue + # 执行Step await self._invoke_runner() + # mark as executed + self._executed_steps.add(self.current_step.step_id) + async def _find_next_id(self, step_id: str) -> list[str]: """查找下一个节点""" next_ids = [] @@ -127,19 +140,19 @@ class FlowExecutor(BaseExecutor): flow_id=self.task.state.flow_id, conversation_id=self.task.ids.conversation_id ) - + if branch_id: # 构建带分支ID的edge_from edge_from = f"{self.task.state.step_id}.{branch_id}" logger.info("[FlowExecutor] 从变量池获取分支ID:%s,查找边:%s", branch_id, edge_from) - + # 在edges中查找对应的下一个节点 next_steps = [] for edge in self.flow.edges: if edge.edge_from == edge_from: next_steps.append(edge.edge_to) logger.info("[FlowExecutor] 找到下一个节点:%s", edge.edge_to) - + if not next_steps: logger.warning("[FlowExecutor] 没有找到分支 %s 对应的边", edge_from) else: @@ -232,7 +245,11 @@ class FlowExecutor(BaseExecutor): # 没有下一个节点,结束 self._reached_end = True for step in next_step: - self.step_queue.append(step) + # only append to queue if it's not executed + if step.step_id not in self._executed_steps: + self.step_queue.append(step) + else: + logger.info("[FlowExecutor] 步骤 %s 已经执行过,不再添加到队列中", step.step_id) # 尾插运行结束后的系统步骤 for step in FIXED_STEPS_AFTER_END: diff --git a/apps/services/flow_validate.py b/apps/services/flow_validate.py index b343fb7b..a8c173b5 100644 --- a/apps/services/flow_validate.py +++ b/apps/services/flow_validate.py @@ -158,10 +158,11 @@ class FlowService: if e.source_node not in branches: branches[e.source_node] = set() - if e.branch_id in branches[e.source_node]: - err = f"[FlowService] 边{e.edge_id}的分支{e.branch_id}重复" - logger.error(err) - raise FlowEdgeValidationError(err) + # FEATURE: allow one node's next_step have multiple nodes, next step node's run like stack FILO + # if e.branch_id in branches[e.source_node]: + # err = f"[FlowService] 边{e.edge_id}的分支{e.branch_id}重复" + # logger.error(err) + # raise FlowEdgeValidationError(err) branches[e.source_node].add(e.branch_id) in_deg[e.target_node] = in_deg.get(e.target_node, 0) + 1 -- Gitee