diff --git a/apps/entities/task.py b/apps/entities/task.py index c68e5216336e3310c6396d062b793d5940707a1e..a86202f6ba521d9a0bcb3c63124462e51cceef6c 100644 --- a/apps/entities/task.py +++ b/apps/entities/task.py @@ -25,6 +25,7 @@ class FlowStepHistory(BaseModel): task_id: str = Field(description="任务ID") flow_id: str = Field(description="FlowID") step_id: str = Field(description="当前步骤名称") + step_name: str = Field(description="当前步骤名称") status: StepStatus = Field(description="当前步骤状态") input_data: dict[str, Any] = Field(description="当前Step执行的输入", default={}) output_data: dict[str, Any] = Field(description="当前Step执行后的结果", default={}) diff --git a/apps/manager/task.py b/apps/manager/task.py index 6d6e4bb4594ee61bf3346f9eedbf46503e591c74..c5cab6eea751aa91c4fa6348d501c8d1d552897e 100644 --- a/apps/manager/task.py +++ b/apps/manager/task.py @@ -100,7 +100,7 @@ class TaskManager: record_group = await record_group_collection.aggregate([ {"$match": {"_id": record_group_id}}, {"$unwind": "$records"}, - {"$match": {"records.record_id": record_id}}, + {"$match": {"records.id": record_id}}, ]) records = await record_group.to_list(length=1) if not records: diff --git a/apps/routers/record.py b/apps/routers/record.py index fe8c767b9fdee7a83ae5ed97e8a6ff75499bc075..adc43a0294fd9f25ea1248bf4dad7a3fc8bb5f18 100644 --- a/apps/routers/record.py +++ b/apps/routers/record.py @@ -24,6 +24,7 @@ from apps.entities.response_data import ( RecordListRsp, ResponseData, ) +from apps.entities.task import FlowStepHistory from apps.manager.conversation import ConversationManager from apps.manager.document import DocumentManager from apps.manager.record import RecordManager @@ -87,20 +88,22 @@ async def get_record(conversation_id: str, user_sub: Annotated[str, Depends(get_ # 获得Record关联的flow数据 flow_list = await TaskManager.get_context_by_record_id(record_group.id, record.id) if flow_list: + first_flow = FlowStepHistory.model_validate(flow_list[0]) tmp_record.flow = RecordFlow( - id=flow_list[0].id, + id=first_flow.id, recordId=record.id, - flowId=flow_list[0].flow_id, + flowId=first_flow.flow_id, stepNum=len(flow_list), steps=[], ) for flow in flow_list: + flow_step = FlowStepHistory.model_validate(flow) tmp_record.flow.steps.append( RecordFlowStep( - stepId=flow.step_id, - stepStatus=flow.status, - input=flow.input_data, - output=flow.output_data, + stepId=flow_step.step_name, #TODO: 此处前端应该用name + stepStatus=flow_step.status, + input=flow_step.input_data, + output=flow_step.output_data, ), ) diff --git a/apps/scheduler/executor/step.py b/apps/scheduler/executor/step.py index 6b584ddfbc0d4461aa6d90ef73c259ec5706ce58..8ebde73de9bcbf797160c7e4f10dd9d2026312de 100644 --- a/apps/scheduler/executor/step.py +++ b/apps/scheduler/executor/step.py @@ -39,15 +39,6 @@ class StepExecutor(BaseExecutor): super().__init__(**kwargs) self.validate_flow_state(self.task) - self._history = FlowStepHistory( - task_id=self.task.id, - flow_id=self.task.state.flow_id, # type: ignore[arg-type] - step_id=self.task.state.step_id, # type: ignore[arg-type] - status=self.task.state.status, # type: ignore[arg-type] - input_data={}, - output_data={}, - ) - async def init(self) -> None: """初始化步骤""" @@ -191,12 +182,8 @@ class StepExecutor(BaseExecutor): # 进行自动参数填充 await self._run_slot_filling() - # 更新history - self._history.input_data = self.obj.input - self._history.output_data = {} # 更新状态 self.task.state.status = StepStatus.RUNNING # type: ignore[arg-type] - self._history.status = self.task.state.status # type: ignore[arg-type] await TaskManager.save_task(self.task.id, self.task) # 推送输入 await self.push_message(EventType.STEP_INPUT.value, self.obj.input) @@ -215,17 +202,25 @@ class StepExecutor(BaseExecutor): # 更新执行状态 self.task.state.status = StepStatus.SUCCESS # type: ignore[arg-type] - self._history.status = self.task.state.status # type: ignore[arg-type] # 更新history if isinstance(content, str): - self._history.output_data = TextAddContent(text=content).model_dump(exclude_none=True, by_alias=True) + output_data = TextAddContent(text=content).model_dump(exclude_none=True, by_alias=True) else: - self._history.output_data = content + output_data = content # 更新context - self.task.context.append(self._history.model_dump(exclude_none=True, by_alias=True)) + history = FlowStepHistory( + task_id=self.task.id, + flow_id=self.task.state.flow_id, # type: ignore[arg-type] + step_id=self.step.step_id, + step_name=self.step.step.name, + status=self.task.state.status, # type: ignore[arg-type] + input_data=self.obj.input, + output_data=output_data, + ) + self.task.context.append(history.model_dump(exclude_none=True, by_alias=True)) await TaskManager.save_task(self.task.id, self.task) # 推送输出 - await self.push_message(EventType.STEP_OUTPUT.value, self._history.output_data) + await self.push_message(EventType.STEP_OUTPUT.value, output_data) diff --git a/apps/scheduler/scheduler/context.py b/apps/scheduler/scheduler/context.py index 4d52672de11c44125649e38218cfc95126acf387..0fd22b476e3cc2ca1d2d71ff03a1975ebad936df 100644 --- a/apps/scheduler/scheduler/context.py +++ b/apps/scheduler/scheduler/context.py @@ -151,7 +151,7 @@ async def save_data(task_id: str, user_sub: str, post_body: RequestData, used_do await AppCenterManager.update_recent_app(user_sub, post_body.app.app_id) # 若状态为成功,删除Task - if task.state and task.state.status == StepStatus.SUCCESS: + if not task.state or task.state.status == StepStatus.SUCCESS: await TaskManager.delete_task_by_task_id(task_id) else: # 更新Task