diff --git a/apps/routers/record.py b/apps/routers/record.py index 29ff68e09014c0320520f949312e91883a924e8c..f73e6c9834147178e3141ed72689e4b42a927c6f 100644 --- a/apps/routers/record.py +++ b/apps/routers/record.py @@ -93,7 +93,6 @@ async def get_record(conversation_id: str, user_sub: Annotated[str, Depends(get_ steps=[], ) for flow_step in flow_step_list: - flow_step = FlowStepHistory.model_validate(flow_step) tmp_record.flow.steps.append( RecordFlowStep( stepId=flow_step.step_name, # TODO: 此处前端应该用name diff --git a/apps/scheduler/executor/flow.py b/apps/scheduler/executor/flow.py index 157c915943ba18524cbfb8647924d0457354bb51..d8400fdf32af2e6d073e74f66893c97a0e4c9a56 100644 --- a/apps/scheduler/executor/flow.py +++ b/apps/scheduler/executor/flow.py @@ -119,7 +119,7 @@ class FlowExecutor(BaseExecutor): return [] if self.current_step.step.type == SpecialCallType.CHOICE.value: # 如果是choice节点,获取分支ID - branch_id = self.task.context[-1]["output_data"]["branch_id"] + branch_id = self.task.context[-1].output_data["branch_id"] if branch_id: next_steps = await self._find_next_id(self.task.state.step_id + "." + branch_id) logger.info("[FlowExecutor] 分支ID:%s", branch_id) diff --git a/apps/scheduler/scheduler/context.py b/apps/scheduler/scheduler/context.py index 109a5456288a140adc35838f0072a7bd4276eb36..72f64a71006e09a053499268dac9d0245d692c2f 100644 --- a/apps/scheduler/scheduler/context.py +++ b/apps/scheduler/scheduler/context.py @@ -193,7 +193,7 @@ async def save_data(task: Task, user_sub: str, post_body: RequestData) -> None: flow_id=task.state.flow_id, flow_name=task.state.flow_name, flow_status=task.state.flow_status, - history_ids=[context["_id"] for context in task.context], + history_ids=[context.id for context in task.context], ) ) diff --git a/apps/schemas/task.py b/apps/schemas/task.py index e1901416cdd9c0ede4620b4f9556124baed225d3..9e9f1531707edaad5c306098662a9d4a6fbce64b 100644 --- a/apps/schemas/task.py +++ b/apps/schemas/task.py @@ -93,7 +93,7 @@ class Task(BaseModel): id: str = Field(default_factory=lambda: str(uuid.uuid4()), alias="_id") ids: TaskIds = Field(description="任务涉及的各种ID") - context: list[dict[str, Any]] = Field(description="Flow的步骤执行信息", default=[]) + context: list[FlowStepHistory] = Field(description="Flow的步骤执行信息", default=[]) state: ExecutorState = Field(description="Flow的状态", default=ExecutorState()) tokens: TaskTokens = Field(description="Token信息") runtime: TaskRuntime = Field(description="任务运行时数据") diff --git a/apps/services/task.py b/apps/services/task.py index dceee324e525b229f8ae41a07571e9bd211acdef..93604a933d78c84b36367d4873ead02b08193f15 100644 --- a/apps/services/task.py +++ b/apps/services/task.py @@ -13,6 +13,7 @@ from apps.schemas.task import ( TaskIds, TaskRuntime, TaskTokens, + FlowStepHistory ) from apps.services.record import RecordManager @@ -67,7 +68,7 @@ class TaskManager: return Task.model_validate(task) @staticmethod - async def get_context_by_record_id(record_group_id: str, record_id: str) -> list[dict[str, Any]]: + async def get_context_by_record_id(record_group_id: str, record_id: str) -> list[FlowStepHistory]: """根据record_group_id获取flow信息""" record_group_collection = MongoDB().get_collection("record_group") flow_context_collection = MongoDB().get_collection("flow_context") @@ -85,7 +86,7 @@ class TaskManager: for flow_context_id in records[0]["records"]["flow"]["history_ids"]: flow_context = await flow_context_collection.find_one({"_id": flow_context_id}) if flow_context: - flow_context_list.append(flow_context) + flow_context_list.append(FlowStepHistory.model_validate(flow_context)) except Exception: logger.exception("[TaskManager] 获取record_id的flow信息失败") return []