diff --git a/apps/scheduler/executor/flow.py b/apps/scheduler/executor/flow.py index a86ec4ac063c3cda5c6444ae17d7af5a6f6e7f46..44425bc6c9716385a92f4e4ea0cce6a730b067f1 100644 --- a/apps/scheduler/executor/flow.py +++ b/apps/scheduler/executor/flow.py @@ -46,6 +46,7 @@ class FlowExecutor(BaseExecutor): flow_id: str = Field(description="Flow ID") question: str = Field(description="用户输入") post_body_app: RequestDataApp = Field(description="请求体中的app信息") + current_step: StepQueueItem | None = Field(default=None, description="当前执行的步骤") async def load_state(self) -> None: """从数据库中加载FlowExecutor的状态""" @@ -69,13 +70,13 @@ class FlowExecutor(BaseExecutor): self._reached_end: bool = False self.step_queue: deque[StepQueueItem] = deque() - async def _invoke_runner(self, queue_item: StepQueueItem) -> None: + async def _invoke_runner(self) -> None: """单一Step执行""" # 创建步骤Runner step_runner = StepExecutor( msg_queue=self.msg_queue, task=self.task, - step=queue_item, + step=self.current_step, background=self.background, question=self.question, ) @@ -92,12 +93,12 @@ class FlowExecutor(BaseExecutor): """执行当前queue里面的所有步骤(在用户看来是单一Step)""" while True: try: - queue_item = self.step_queue.pop() + self.current_step = self.step_queue.pop() except IndexError: break # 执行Step - await self._invoke_runner(queue_item) + await self._invoke_runner() async def _find_next_id(self, step_id: str) -> list[str]: """查找下一个节点""" @@ -112,17 +113,17 @@ class FlowExecutor(BaseExecutor): # 如果当前步骤为结束,则直接返回 if self.task.state.step_id == "end" or not self.task.state.step_id: # type: ignore[arg-type] return [] - if self.task.state.step_name == "Choice": + if self.current_step.step.type == SpecialCallType.CHOICE.value: # 如果是choice节点,获取分支ID branch_id = self.task.context[-1]["output_data"]["branch_id"] if branch_id: - self.task.state.step_id = self.task.state.step_id + "." + branch_id + next_steps = self.flow.steps.get(self.task.state.step_id+"."+branch_id) logger.info("[FlowExecutor] 分支ID:%s", branch_id) else: logger.warning("[FlowExecutor] 没有找到分支ID,返回空列表") return [] - - next_steps = await self._find_next_id(self.task.state.step_id) # type: ignore[arg-type] + else: + next_steps = await self._find_next_id(self.task.state.step_id) # type: ignore[arg-type] # 如果step没有任何出边,直接跳到end if not next_steps: return [ diff --git a/apps/schemas/enum_var.py b/apps/schemas/enum_var.py index d59ef79b4dd2b353bf871998deb1fbf9bc465a59..a8268ff184d529287f886888ed5c2e768724b8c6 100644 --- a/apps/schemas/enum_var.py +++ b/apps/schemas/enum_var.py @@ -150,7 +150,7 @@ class SpecialCallType(str, Enum): DIRECT_REPLY = "DirectReply" START = "start" END = "end" - CHOICE = "choice" + CHOICE = "Choice" class CommentType(str, Enum):