From 79febe5d282c01caaa84961efe804be7ce0af4bf Mon Sep 17 00:00:00 2001 From: zxstty Date: Thu, 31 Jul 2025 15:55:40 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dchoice=E5=88=86=E6=94=AF?= =?UTF-8?q?=E5=BC=95=E8=B5=B7=E7=9A=84bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/scheduler/executor/flow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/scheduler/executor/flow.py b/apps/scheduler/executor/flow.py index 44425bc6..e30c70e1 100644 --- a/apps/scheduler/executor/flow.py +++ b/apps/scheduler/executor/flow.py @@ -117,7 +117,7 @@ class FlowExecutor(BaseExecutor): # 如果是choice节点,获取分支ID branch_id = self.task.context[-1]["output_data"]["branch_id"] if branch_id: - next_steps = self.flow.steps.get(self.task.state.step_id+"."+branch_id) + next_steps = self._find_next_id(self.task.state.step_id+"."+branch_id) logger.info("[FlowExecutor] 分支ID:%s", branch_id) else: logger.warning("[FlowExecutor] 没有找到分支ID,返回空列表") -- Gitee From 29b9b6e54c05ec4c9ef5c15dd371f1aaa7a1e3bf Mon Sep 17 00:00:00 2001 From: zxstty Date: Thu, 31 Jul 2025 16:13:40 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dtask=E7=9A=84context?= =?UTF-8?q?=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/scheduler/executor/flow.py | 2 +- apps/scheduler/scheduler/context.py | 2 +- apps/schemas/task.py | 2 +- apps/services/task.py | 7 ++++--- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/apps/scheduler/executor/flow.py b/apps/scheduler/executor/flow.py index e30c70e1..e0ddace5 100644 --- a/apps/scheduler/executor/flow.py +++ b/apps/scheduler/executor/flow.py @@ -115,7 +115,7 @@ class FlowExecutor(BaseExecutor): return [] if self.current_step.step.type == SpecialCallType.CHOICE.value: # 如果是choice节点,获取分支ID - branch_id = self.task.context[-1]["output_data"]["branch_id"] + branch_id = self.task.context[-1].output_data.get("branch_id", "") if branch_id: next_steps = self._find_next_id(self.task.state.step_id+"."+branch_id) logger.info("[FlowExecutor] 分支ID:%s", branch_id) diff --git a/apps/scheduler/scheduler/context.py b/apps/scheduler/scheduler/context.py index 4c2c4cf0..1e936ab1 100644 --- a/apps/scheduler/scheduler/context.py +++ b/apps/scheduler/scheduler/context.py @@ -188,7 +188,7 @@ async def save_data(task: Task, user_sub: str, post_body: RequestData) -> None: feature={}, ), createdAt=current_time, - flow=[i["_id"] for i in task.context], + flow=[context.id for context in task.context], ) # 检查是否存在group_id diff --git a/apps/schemas/task.py b/apps/schemas/task.py index 37fdebbf..3a371260 100644 --- a/apps/schemas/task.py +++ b/apps/schemas/task.py @@ -89,7 +89,7 @@ class Task(BaseModel): id: str = Field(default_factory=lambda: str(uuid.uuid4()), alias="_id") ids: TaskIds = Field(description="任务涉及的各种ID") - context: list[dict[str, Any]] = Field(description="Flow的步骤执行信息", default=[]) + context: list[FlowStepHistory] = Field(description="Flow的步骤执行信息", default=[]) state: ExecutorState | None = Field(description="Flow的状态", default=None) tokens: TaskTokens = Field(description="Token信息") runtime: TaskRuntime = Field(description="任务运行时数据") diff --git a/apps/services/task.py b/apps/services/task.py index 2456d96b..81924c58 100644 --- a/apps/services/task.py +++ b/apps/services/task.py @@ -13,6 +13,7 @@ from apps.schemas.task import ( TaskIds, TaskRuntime, TaskTokens, + FlowStepHistory ) from apps.services.record import RecordManager @@ -67,7 +68,7 @@ class TaskManager: return Task.model_validate(task) @staticmethod - async def get_context_by_record_id(record_group_id: str, record_id: str) -> list[dict[str, Any]]: + async def get_context_by_record_id(record_group_id: str, record_id: str) -> list[FlowStepHistory]: """根据record_group_id获取flow信息""" record_group_collection = MongoDB().get_collection("record_group") flow_context_collection = MongoDB().get_collection("flow_context") @@ -85,7 +86,7 @@ class TaskManager: for flow_context_id in records[0]["records"]["flow"]: flow_context = await flow_context_collection.find_one({"_id": flow_context_id}) if flow_context: - flow_context_list.append(flow_context) + flow_context_list.append(FlowStepHistory.model_validate(flow_context)) except Exception: logger.exception("[TaskManager] 获取record_id的flow信息失败") return [] @@ -93,7 +94,7 @@ class TaskManager: return flow_context_list @staticmethod - async def get_context_by_task_id(task_id: str, length: int = 0) -> list[dict[str, Any]]: + async def get_context_by_task_id(task_id: str, length: int = 0) -> list[F]: """根据task_id获取flow信息""" flow_context_collection = MongoDB().get_collection("flow_context") -- Gitee From c24093b6b6f78b9397f7e1817c4547a9e1371e18 Mon Sep 17 00:00:00 2001 From: zxstty Date: Thu, 31 Jul 2025 16:17:00 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dfind=5Fnext=5Fid=E6=B2=A1?= =?UTF-8?q?=E6=9C=89await=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/scheduler/executor/flow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/scheduler/executor/flow.py b/apps/scheduler/executor/flow.py index e0ddace5..d87932bd 100644 --- a/apps/scheduler/executor/flow.py +++ b/apps/scheduler/executor/flow.py @@ -117,7 +117,7 @@ class FlowExecutor(BaseExecutor): # 如果是choice节点,获取分支ID branch_id = self.task.context[-1].output_data.get("branch_id", "") if branch_id: - next_steps = self._find_next_id(self.task.state.step_id+"."+branch_id) + next_steps = await self._find_next_id(self.task.state.step_id+"."+branch_id) logger.info("[FlowExecutor] 分支ID:%s", branch_id) else: logger.warning("[FlowExecutor] 没有找到分支ID,返回空列表") -- Gitee