From 849f0f50c3afb3cdc2f2f9fae79c612680d5b851 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Thu, 7 Aug 2025 20:43:25 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E6=95=B0=E6=8D=AE=E5=BA=93?= =?UTF-8?q?=E8=A1=A8=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/models/task.py | 85 ++++++++++++++++++++-- apps/schemas/task.py | 86 +--------------------- apps/services/appcenter.py | 10 +-- apps/services/conversation.py | 2 +- apps/services/flow_validate.py | 10 ++- apps/services/mcp_service.py | 7 +- apps/services/parameter.py | 23 +++--- apps/services/record.py | 10 +-- apps/services/task.py | 129 +++++++++++---------------------- 9 files changed, 156 insertions(+), 206 deletions(-) diff --git a/apps/models/task.py b/apps/models/task.py index 2b48f335..a1b71f65 100644 --- a/apps/models/task.py +++ b/apps/models/task.py @@ -1,12 +1,16 @@ """任务 数据库表""" import uuid +from datetime import UTC, datetime +from typing import Any -from sqlalchemy import ForeignKey, String +from sqlalchemy import DateTime, Enum, Float, ForeignKey, Integer, String, Text from sqlalchemy.dialects.postgresql import JSONB, UUID from sqlalchemy.orm import Mapped, mapped_column -from apps.models.base import Base +from apps.schemas.enum_var import FlowStatus, StepStatus + +from .base import Base class Task(Base): @@ -18,10 +22,47 @@ class Task(Base): conversationId: Mapped[uuid.UUID] = mapped_column( # noqa: N815 UUID(as_uuid=True), ForeignKey("framework_conversation.id"), nullable=False, ) - checkpointId: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("framework_executor_checkpoint.id")) # noqa: N815 """对话ID""" + checkpointId: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("framework_executor_checkpoint.id")) # noqa: N815 + """检查点ID""" id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default_factory=uuid.uuid4) """任务ID""" + updatedAt: Mapped[datetime] = mapped_column( # noqa: N815 + DateTime(timezone=True), nullable=False, default_factory=lambda: datetime.now(UTC), + onupdate=lambda: datetime.now(UTC), + ) + + +class TaskRuntime(Base): + """任务运行时数据""" + + __tablename__ = "framework_task_runtime" + + taskId: Mapped[uuid.UUID] = mapped_column( # noqa: N815 + UUID(as_uuid=True), ForeignKey("framework_task.id"), nullable=False, + primary_key=True, + ) + """任务ID""" + inputToken: Mapped[int] = mapped_column(Integer, default=0, nullable=False) # noqa: N815 + """输入Token""" + outputToken: Mapped[int] = mapped_column(Integer, default=0, nullable=False) # noqa: N815 + """输出Token""" + time: Mapped[float] = mapped_column(Float, default=0.0, nullable=False) + """时间""" + fullTime: Mapped[float] = mapped_column(Float, default=0.0, nullable=False) # noqa: N815 + """完整时间成本""" + userInput: Mapped[str] = mapped_column(Text, nullable=False, default="") # noqa: N815 + """用户输入""" + fullAnswer: Mapped[str] = mapped_column(Text, nullable=False, default="") # noqa: N815 + """完整输出""" + fact: Mapped[list[str]] = mapped_column(JSONB, nullable=False, default=[]) + """记忆""" + reasoning: Mapped[str] = mapped_column(Text, nullable=False, default="") + """中间推理""" + filledSlot: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default={}) # noqa: N815 + """计划""" + document: Mapped[list[dict[str, Any]]] = mapped_column(JSONB, nullable=False, default={}) + """关联文档""" class ExecutorCheckpoint(Base): @@ -29,17 +70,32 @@ class ExecutorCheckpoint(Base): __tablename__ = "framework_executor_checkpoint" + # 执行器级数据 taskId: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("framework_task.id"), nullable=False) # noqa: N815 """任务ID""" + appId: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), nullable=False) # noqa: N815 + """应用ID""" executorId: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), nullable=False) # noqa: N815 """执行器ID(例如工作流ID)""" executorName: Mapped[str] = mapped_column(String(255), nullable=False) # noqa: N815 """执行器名称(例如工作流名称)""" + executorStatus: Mapped[FlowStatus] = mapped_column(Enum(FlowStatus), nullable=False) # noqa: N815 + """执行器状态""" + # 步骤级数据 stepId: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), nullable=False) # noqa: N815 """步骤ID""" + stepName: Mapped[str] = mapped_column(String(255), nullable=False) # noqa: N815 + """步骤名称""" + stepStatus: Mapped[StepStatus] = mapped_column(Enum(StepStatus), nullable=False) # noqa: N815 + """步骤状态""" id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default_factory=uuid.uuid4) """检查点ID""" - + executorDescription: Mapped[str] = mapped_column(Text, nullable=False, default="") # noqa: N815 + """执行器描述""" + stepDescription: Mapped[str] = mapped_column(Text, nullable=False, default="") # noqa: N815 + """步骤描述""" + data: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default={}) + """步骤额外数据""" class ExecutorHistory(Base): @@ -53,7 +109,26 @@ class ExecutorHistory(Base): """执行器ID(例如工作流ID)""" executorName: Mapped[str] = mapped_column(String(255)) # noqa: N815 """执行器名称(例如工作流名称)""" - stepId: Mapped[str] = mapped_column(String(36)) # noqa: N815 + executorStatus: Mapped[FlowStatus] = mapped_column(Enum(FlowStatus), nullable=False) # noqa: N815 + """执行器状态""" + stepId: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), nullable=False) # noqa: N815 """步骤ID""" + stepName: Mapped[str] = mapped_column(String(255), nullable=False) # noqa: N815 + """步骤名称""" + stepStatus: Mapped[StepStatus] = mapped_column(Enum(StepStatus), nullable=False) # noqa: N815 + """步骤状态""" id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default_factory=uuid.uuid4) """执行器历史ID""" + stepDescription: Mapped[str] = mapped_column(Text, nullable=False, default="") # noqa: N815 + """步骤描述""" + inputData: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default={}) # noqa: N815 + """步骤输入数据""" + outputData: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default={}) # noqa: N815 + """步骤输出数据""" + extraData: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default={}) # noqa: N815 + """步骤额外数据""" + updatedAt: Mapped[datetime] = mapped_column( # noqa: N815 + DateTime(timezone=True), nullable=False, default_factory=lambda: datetime.now(UTC), + onupdate=lambda: datetime.now(UTC), + ) + """更新时间""" diff --git a/apps/schemas/task.py b/apps/schemas/task.py index 2e0a65d2..a23f7cf9 100644 --- a/apps/schemas/task.py +++ b/apps/schemas/task.py @@ -1,106 +1,28 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """Task相关数据结构定义""" -import uuid -from datetime import UTC, datetime from typing import Any from pydantic import BaseModel, Field -from apps.schemas.enum_var import FlowStatus, StepStatus - from .flow import Step from .mcp import MCPPlan -class FlowStepHistory(BaseModel): - """ - 任务执行历史;每个Executor每个步骤执行后都会创建 - - Collection: flow_history - """ - - id: str = Field(default_factory=lambda: str(uuid.uuid4()), alias="_id") - task_id: str = Field(description="任务ID") - flow_id: str = Field(description="FlowID") - flow_name: str = Field(description="Flow名称") - flow_status: FlowStatus = Field(description="Flow状态") - step_id: str = Field(description="当前步骤名称") - step_name: str = Field(description="当前步骤名称") - step_description: str = Field(description="当前步骤描述", default="") - step_status: StepStatus = Field(description="当前步骤状态") - input_data: dict[str, Any] = Field(description="当前Step执行的输入", default={}) - output_data: dict[str, Any] = Field(description="当前Step执行后的结果", default={}) - ex_data: dict[str, Any] | None = Field(description="额外数据", default=None) - created_at: float = Field(default_factory=lambda: round(datetime.now(tz=UTC).timestamp(), 3)) - +class CheckpointExtra(BaseModel): + """Executor额外数据""" -class ExecutorState(BaseModel): - """FlowExecutor状态""" - - # 执行器级数据 - flow_id: str = Field(description="Flow ID", default="") - flow_name: str = Field(description="Flow名称", default="") - description: str = Field(description="Flow描述", default="") - flow_status: FlowStatus = Field(description="Flow状态", default=FlowStatus.INIT) - # 任务级数据 - step_id: str = Field(description="当前步骤ID", default="") - step_index: int = Field(description="当前步骤索引", default=0) - step_name: str = Field(description="当前步骤名称", default="") - step_status: StepStatus = Field(description="当前步骤状态", default=StepStatus.UNKNOWN) - step_description: str = Field(description="当前步骤描述", default="") - app_id: str = Field(description="应用ID", default="") current_input: dict[str, Any] = Field(description="当前输入数据", default={}) error_message: str = Field(description="错误信息", default="") retry_times: int = Field(description="当前步骤重试次数", default=0) -class TaskIds(BaseModel): - """任务涉及的各种ID""" - - session_id: str = Field(description="会话ID") - conversation_id: uuid.UUID = Field(description="对话ID") - record_id: str = Field(description="记录ID", default_factory=lambda: str(uuid.uuid4())) - user_sub: str = Field(description="用户ID") - - -class TaskTokens(BaseModel): - """任务Token""" +class TaskExtra(BaseModel): + """任务额外数据""" - input_tokens: int = Field(description="输入Token", default=0) - output_tokens: int = Field(description="输出Token", default=0) - time: float = Field(description="时间点", default=0.0) - full_time: float = Field(description="完整时间成本", default=0.0) - - -class TaskRuntime(BaseModel): - """任务运行时数据""" - - question: str = Field(description="用户问题", default="") - answer: str = Field(description="模型回答", default="") - facts: list[str] = Field(description="记忆", default=[]) - summary: str = Field(description="摘要", default="") - filled: dict[str, Any] = Field(description="填充的槽位", default={}) - documents: list[dict[str, Any]] = Field(description="文档列表", default=[]) temporary_plans: MCPPlan | None = Field(description="临时计划列表", default=None) -class Task(BaseModel): - """ - 任务信息 - - Collection: task - """ - - id: str = Field(default_factory=lambda: str(uuid.uuid4()), alias="_id") - ids: TaskIds = Field(description="任务涉及的各种ID") - context: list[FlowStepHistory] = Field(description="Flow的步骤执行信息", default=[]) - state: ExecutorState = Field(description="Flow的状态", default=ExecutorState()) - tokens: TaskTokens = Field(description="Token信息") - runtime: TaskRuntime = Field(description="任务运行时数据") - created_at: float = Field(default_factory=lambda: round(datetime.now(tz=UTC).timestamp(), 3)) - - class StepQueueItem(BaseModel): """步骤栈中的元素""" diff --git a/apps/services/appcenter.py b/apps/services/appcenter.py index 17bc8275..7f4a6f86 100644 --- a/apps/services/appcenter.py +++ b/apps/services/appcenter.py @@ -494,7 +494,7 @@ class AppCenterManager: # mcp_service 逻辑 if data is not None and hasattr(data, "mcp_service") and data.mcp_service: # 创建应用场景,验证传入的 mcp_service 状态,确保只使用已经激活的 (create_app) - metadata.mcp_service = [svc for svc in data.mcp_service if MCPServiceManager.is_active(user_sub, svc)] + metadata.mcp_service = [svc.id for svc in data.mcp_service if MCPServiceManager.is_active(user_sub, svc.id)] elif data is not None and hasattr(data, "mcp_service"): # 更新应用场景,使用 data 中的 mcp_service (update_app) metadata.mcp_service = data.mcp_service if data.mcp_service is not None else [] @@ -554,11 +554,8 @@ class AppCenterManager: "permission": Permission( type=data.permission.type, users=data.permission.users or [], - ) if data else app_data.permission, + ) if data else (app_data.permission if app_data else None), } - # 设置权限 - if app_data: - common_params["permission"] = app_data.permission # 根据应用类型创建不同的元数据 if app_type == AppType.AGENT: @@ -597,6 +594,5 @@ class AppCenterManager: ) # 保存应用 - app_loader = AppLoader() - await app_loader.save(metadata, app_id) + await AppLoader.save(metadata, app_id) return metadata diff --git a/apps/services/conversation.py b/apps/services/conversation.py index 47fa28d2..17c0dad2 100644 --- a/apps/services/conversation.py +++ b/apps/services/conversation.py @@ -153,7 +153,7 @@ class ConversationManager: await session.delete(conv) await session.commit() - await TaskManager.delete_tasks_and_flow_context_by_conversation_id(conversation_id) + await TaskManager.delete_task_history_checkpoint_by_conversation_id(conversation_id) @staticmethod diff --git a/apps/services/flow_validate.py b/apps/services/flow_validate.py index 619a121f..ed1eb695 100644 --- a/apps/services/flow_validate.py +++ b/apps/services/flow_validate.py @@ -57,11 +57,13 @@ class FlowService: if node.call_id == NodeType.CHOICE.value: input_parameters = node.parameters["input_parameters"] if "choices" not in input_parameters: - logger.error(f"[FlowService] 节点{node.name}的分支字段缺失") - raise FlowBranchValidationError(f"[FlowService] 节点{node.name}的分支字段缺失") + err = f"[FlowService] 节点{node.name}的分支字段缺失" + logger.error(err) + raise FlowBranchValidationError(err) if not input_parameters["choices"]: - logger.error(f"[FlowService] 节点{node.name}的分支字段为空") - raise FlowBranchValidationError(f"[FlowService] 节点{node.name}的分支字段为空") + err = f"[FlowService] 节点{node.name}的分支字段为空" + logger.error(err) + raise FlowBranchValidationError(err) for choice in input_parameters["choices"]: if "branch_id" not in choice: err = f"[FlowService] 节点{node.name}的分支choice缺少branch_id字段" diff --git a/apps/services/mcp_service.py b/apps/services/mcp_service.py index dc6619ab..b77ab656 100644 --- a/apps/services/mcp_service.py +++ b/apps/services/mcp_service.py @@ -107,7 +107,7 @@ class MCPServiceManager: :param page: int: 页码 :return: MCP服务列表 """ - mcpservice_pools = await MCPServiceManager._search_mcpservice(search_type, keyword, page, is_active) + mcpservice_pools = await MCPServiceManager._search_mcpservice(search_type, keyword, page, is_active=is_active) return [ MCPServiceCardItem( mcpserviceId=item.id, @@ -318,7 +318,7 @@ class MCPServiceManager: async def active_mcpservice( user_sub: str, mcp_id: str, - mcp_env: dict[str, Any] = {}, + mcp_env: dict[str, Any] | None = None, ) -> None: """ 激活MCP服务 @@ -327,6 +327,9 @@ class MCPServiceManager: :param mcp_id: str: MCP服务ID :return: 无 """ + if mcp_env is None: + mcp_env = {} + async with postgres.session() as session: mcp_info = (await session.scalars(select(MCPInfo).where(MCPInfo.id == mcp_id))).one_or_none() if not mcp_info: diff --git a/apps/services/parameter.py b/apps/services/parameter.py index 322224dc..7d22b0c1 100644 --- a/apps/services/parameter.py +++ b/apps/services/parameter.py @@ -3,15 +3,12 @@ import logging -from pymongo import ASCENDING - from apps.scheduler.call.choice.condition_handler import ConditionHandler from apps.scheduler.call.choice.schema import BoolOperate, DictOperate, ListOperate, NumberOperate, StringOperate, Type from apps.scheduler.slot.slot import Slot from apps.schemas.flow_topology import FlowItem from apps.schemas.response_data import ( OperateAndBindType, - ParamsNode, StepParams, ) from apps.services.node import NodeManager @@ -39,9 +36,12 @@ class ParameterManager: operate = DictOperate if operate: for item in operate: - result.append(OperateAndBindType( - operate=item, - bind_type=(await ConditionHandler.get_value_type_from_operate(item)))) + result += [ + OperateAndBindType( + operate=item, + bind_type=(await ConditionHandler.get_value_type_from_operate(item)), + ), + ] return result @staticmethod @@ -69,14 +69,19 @@ class ParameterManager: pre_step_params = [] for i in range(1, len(q)): step_id = q[i] - node_id = step_id_to_node_id.get(step_id) - params_schema, output_schema = await NodeManager.get_node_params(node_id) + node_id: str | None = step_id_to_node_id.get(step_id) + node_name: str | None = step_id_to_node_name.get(step_id) + if node_id is None or node_name is None: + err = f"[ParameterManager] 节点 {step_id} 不存在" + logger.error(err) + continue + _, output_schema = await NodeManager.get_node_params(node_id) slot = Slot(output_schema) params_node = slot.get_params_node_from_schema() pre_step_params.append( StepParams( stepId=step_id, - name=step_id_to_node_name.get(step_id), + name=node_name, paramsNode=params_node, ), ) diff --git a/apps/services/record.py b/apps/services/record.py index d62603b5..9f81d926 100644 --- a/apps/services/record.py +++ b/apps/services/record.py @@ -82,9 +82,7 @@ class RecordManager: """查询ConversationID的最后n条问答对""" sort_order = -1 if order == "desc" else 1 - mongo = MongoDB() - record_group_collection = mongo.get_collection("record_group") - try: + async with postgres.session() as session: # 得到conversation的全部record_group id record_groups = await record_group_collection.aggregate( [ @@ -112,19 +110,13 @@ class RecordManager: continue records.append(Record.model_validate(record[0]["records"])) - except Exception: - logger.exception("[RecordManager] 查询加密问答对失败") - return [] - else: return records @staticmethod async def update_record_flow_status_to_cancelled_by_task_ids(task_ids: list[str]) -> None: """更新Record关联的Flow状态""" - record_group_collection = MongoDB().get_collection("record_group") try: - await record_group_collection.update_many( {"records.task_id": {"$in": task_ids}, "records.flow.flow_status": {"$nin": [FlowStatus.ERROR.value, FlowStatus.SUCCESS.value]}}, {"$set": {"records.$[elem].flow.flow_status": FlowStatus.CANCELLED}}, array_filters=[{"elem.flow.flow_id": {"$in": task_ids}}], diff --git a/apps/services/task.py b/apps/services/task.py index 34e6bf75..61131fdd 100644 --- a/apps/services/task.py +++ b/apps/services/task.py @@ -4,15 +4,11 @@ import logging import uuid +from sqlalchemy import delete, select + from apps.common.postgres import postgres +from apps.models.task import ExecutorCheckpoint, ExecutorHistory, Task from apps.schemas.request_data import RequestData -from apps.schemas.task import ( - FlowStepHistory, - Task, - TaskIds, - TaskRuntime, - TaskTokens, -) from .record import RecordManager @@ -47,20 +43,7 @@ class TaskManager: @staticmethod - async def get_task_by_group_id(group_id: str, conversation_id: str) -> Task | None: - """获取组ID的最后一条问答组关联的任务""" - task_collection = MongoDB().get_collection("task") - record_group_collection = MongoDB().get_collection("record_group") - record_group = await record_group_collection.find_one({"conversation_id": conversation_id, "_id": group_id}) - if not record_group: - return None - record_group_obj = RecordGroup.model_validate(record_group) - task = await task_collection.find_one({"_id": record_group_obj.task_id}) - return Task.model_validate(task) - - - @staticmethod - async def get_task_by_task_id(task_id: str) -> Task | None: + async def get_task_by_task_id(task_id: uuid.UUID) -> Task | None: """根据task_id获取任务""" task_collection = MongoDB().get_collection("task") task = await task_collection.find_one({"_id": task_id}) @@ -69,37 +52,11 @@ class TaskManager: return Task.model_validate(task) - @staticmethod - async def get_context_by_record_id(record_group_id: str, record_id: str) -> list[FlowStepHistory]: - """根据record_group_id获取flow信息""" - record_group_collection = MongoDB().get_collection("record_group") - flow_context_collection = MongoDB().get_collection("flow_context") - try: - record_group = await record_group_collection.aggregate([ - {"$match": {"_id": record_group_id}}, - {"$unwind": "$records"}, - {"$match": {"records.id": record_id}}, - ]) - records = await record_group.to_list(length=1) - if not records: - return [] - - flow_context_list = [] - for flow_context_id in records[0]["records"]["flow"]["history_ids"]: - flow_context = await flow_context_collection.find_one({"_id": flow_context_id}) - if flow_context: - flow_context_list.append(FlowStepHistory.model_validate(flow_context)) - except Exception: - logger.exception("[TaskManager] 获取record_id的flow信息失败") - return [] - else: - return flow_context_list - - @staticmethod async def get_context_by_task_id(task_id: str, length: int = 0) -> list[FlowStepHistory]: """根据task_id获取flow信息""" - flow_context_collection = MongoDB().get_collection("flow_context") + async with postgres.session() as session: + executor_history_collection = session.query(ExecutorHistory) flow_context = [] try: @@ -130,10 +87,8 @@ class TaskManager: user_sub=user_sub if user_sub else "", session_id=session_id if session_id else "", conversation_id=post_body.conversation_id, - group_id=post_body.group_id if post_body.group_id else "", ), question=post_body.question if post_body else "", - group_id=post_body.group_id if post_body else "", tokens=TaskTokens(), runtime=TaskRuntime(), ) @@ -163,50 +118,50 @@ class TaskManager: @staticmethod async def delete_task_by_task_id(task_id: str) -> None: """通过task_id删除Task信息""" - mongo = MongoDB() - task_collection = mongo.get_collection("task") - - task = await task_collection.find_one({"_id": task_id}, {"_id": 1}) - if task: - await task_collection.delete_one({"_id": task_id}) + async with postgres.session() as session: + task = (await session.scalars( + select(Task).where(Task.id == task_id), + )).one_or_none() + if task: + await session.delete(task) @staticmethod - async def delete_tasks_by_conversation_id(conversation_id: str) -> list[str]: + async def delete_tasks_by_conversation_id(conversation_id: uuid.UUID) -> list[str]: """通过ConversationID删除Task信息""" - mongo = MongoDB() - task_collection = mongo.get_collection("task") - task_ids = [] - try: - async for task in task_collection.find( - {"conversation_id": conversation_id}, - {"_id": 1}, - ): - task_ids.append(task["_id"]) - if task_ids: - await task_collection.delete_many({"conversation_id": conversation_id}) + async with postgres.session() as session: + task_ids = [] + tasks = (await session.scalars( + select(Task).where(Task.conversationId == conversation_id), + )).all() + for task in tasks: + task_ids.append(str(task.id)) + await session.delete(task) + await session.commit() return task_ids - except Exception: - logger.exception("[TaskManager] 删除ConversationID的Task信息失败") - return [] + @staticmethod - async def delete_tasks_and_flow_context_by_conversation_id(conversation_id: str) -> None: + async def delete_task_history_checkpoint_by_conversation_id(conversation_id: uuid.UUID) -> None: """通过ConversationID删除Task信息""" - mongo = MongoDB() - task_collection = mongo.get_collection("task") - flow_context_collection = mongo.get_collection("flow_context") - - async with mongo.get_session() as session, await session.start_transaction(): - task_ids = [ - task["_id"] async for task in task_collection.find( - {"conversation_id": conversation_id}, - {"_id": 1}, - session=session, - ) - ] - await task_collection.delete_many({"conversation_id": conversation_id}, session=session) - await flow_context_collection.delete_many({"task_id": {"$in": task_ids}}, session=session) + # 删除Task + task_ids = [] + async with postgres.session() as session: + task = list((await session.scalars( + select(Task).where(Task.conversationId == conversation_id), + )).all()) + for item in task: + task_ids.append(item.id) + await session.delete(item) + + # 删除Task对应的State + await session.execute( + delete(ExecutorCheckpoint).where(ExecutorCheckpoint.taskId.in_(task_ids)), + ) + await session.execute( + delete(ExecutorHistory).where(ExecutorHistory.taskId.in_(task_ids)), + ) + await session.commit() @classmethod -- Gitee