diff --git a/apps/routers/appcenter.py b/apps/routers/appcenter.py index 0ec4db9155a06fe16cbeddff223ccd9a764d8b3d..df540eaf2a2945358f81199d42f2a5f9c7bf5ee9 100644 --- a/apps/routers/appcenter.py +++ b/apps/routers/appcenter.py @@ -9,7 +9,7 @@ from fastapi.responses import JSONResponse from apps.dependency.user import get_user, verify_user from apps.exceptions import InstancePermissionError -from apps.schemas.appcenter import AppFlowInfo, AppPermissionData +from apps.schemas.appcenter import AppFlowInfo, AppMcpServiceInfo, AppPermissionData from apps.schemas.enum_var import AppFilterType, AppType from apps.schemas.request_data import CreateAppRequest, ModFavAppRequest from apps.schemas.response_data import ( @@ -25,7 +25,7 @@ from apps.schemas.response_data import ( ResponseData, ) from apps.services.appcenter import AppCenterManager - +from apps.services.mcp_service import MCPServiceManager logger = logging.getLogger(__name__) router = APIRouter( prefix="/api/app", @@ -214,6 +214,15 @@ async def get_application( ) for flow in app_data.flows ] + mcp_service = [] + if app_data.mcp_service: + for service in app_data.mcp_service: + mcp_collection = await MCPServiceManager.get_mcp_service(service) + mcp_service.append(AppMcpServiceInfo( + id=mcp_collection.id, + name=mcp_collection.name, + description=mcp_collection.description, + )) return JSONResponse( status_code=status.HTTP_200_OK, content=GetAppPropertyRsp( @@ -234,7 +243,7 @@ async def get_application( authorizedUsers=app_data.permission.users, ), workflows=workflows, - mcpService=app_data.mcp_service, + mcpService=mcp_service, ), ).model_dump(exclude_none=True, by_alias=True), ) diff --git a/apps/routers/chat.py b/apps/routers/chat.py index 06bc2dd785008f0c78c805e5118684c17339ca46..5e03547587cd86e5671cb09014449c882b15587b 100644 --- a/apps/routers/chat.py +++ b/apps/routers/chat.py @@ -34,14 +34,14 @@ router = APIRouter( ) -async def init_task(post_body: RequestData, user_sub: str) -> Task: +async def init_task(post_body: RequestData, user_sub: str, session_id: str) -> Task: """初始化Task""" # 生成group_id if not post_body.group_id: post_body.group_id = str(uuid.uuid4()) # 更改信息并刷新数据库 - if post_body.new_task: + if post_body.task_id is None: conversation = await ConversationManager.get_conversation_by_conversation_id( user_sub=user_sub, conversation_id=post_body.conversation_id, @@ -51,7 +51,9 @@ async def init_task(post_body: RequestData, user_sub: str) -> Task: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=err) task_ids = await TaskManager.delete_tasks_by_conversation_id(post_body.conversation_id) await RecordManager.update_record_flow_status_to_cancelled_by_task_ids(task_ids) - task = await TaskManager.init_new_task(user_sub=user_sub, conversation_id=post_body.conversation_id, post_body=post_body) + task = await TaskManager.init_new_task(user_sub=user_sub, session_id=session_id, post_body=post_body) + task.runtime.question = post_body.question + task.ids.group_id = post_body.group_id else: if not post_body.task_id: err = "[Chat] task_id 不可为空!" @@ -60,7 +62,7 @@ async def init_task(post_body: RequestData, user_sub: str) -> Task: return task -async def chat_generator(post_body: RequestData, user_sub: str) -> AsyncGenerator[str, None]: +async def chat_generator(post_body: RequestData, user_sub: str, session_id: str) -> AsyncGenerator[str, None]: """进行实际问答,并从MQ中获取消息""" try: await Activity.set_active(user_sub) @@ -72,7 +74,7 @@ async def chat_generator(post_body: RequestData, user_sub: str) -> AsyncGenerato await Activity.remove_active(user_sub) return - task = await init_task(post_body, user_sub) + task = await init_task(post_body, user_sub, session_id) # 创建queue;由Scheduler进行关闭 queue = MessageQueue() @@ -80,6 +82,7 @@ async def chat_generator(post_body: RequestData, user_sub: str) -> AsyncGenerato # 在单独Task中运行Scheduler,拉齐queue.get的时机 scheduler = Scheduler(task, queue, post_body) + logger.info(f"[Chat] 用户是否活跃: {await Activity.is_active(user_sub)}") scheduler_task = asyncio.create_task(scheduler.run()) # 处理每一条消息 @@ -130,6 +133,7 @@ async def chat_generator(post_body: RequestData, user_sub: str) -> AsyncGenerato async def chat( post_body: RequestData, user_sub: Annotated[str, Depends(get_user)], + session_id: Annotated[str, Depends(get_session)], ) -> StreamingResponse: """LLM流式对话接口""" # 问题黑名单检测 @@ -142,7 +146,7 @@ async def chat( if await Activity.is_active(user_sub): raise HTTPException(status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail="Too many requests") - res = chat_generator(post_body, user_sub) + res = chat_generator(post_body, user_sub, session_id) return StreamingResponse( content=res, media_type="text/event-stream", diff --git a/apps/routers/mcp_service.py b/apps/routers/mcp_service.py index de484e78b2677d18a6e339e457e81113002492bf..82fa72de1355fb1b61b258350403f55aef69a809 100644 --- a/apps/routers/mcp_service.py +++ b/apps/routers/mcp_service.py @@ -53,6 +53,7 @@ async def get_mcpservice_list( ] = SearchType.ALL, keyword: Annotated[str | None, Query(..., alias="keyword", description="搜索关键字")] = None, page: Annotated[int, Query(..., alias="page", ge=1, description="页码")] = 1, + is_active: Annotated[bool | None, Query(..., alias="isActive", description="是否激活")] = None, ) -> JSONResponse: """获取服务列表""" try: @@ -61,6 +62,7 @@ async def get_mcpservice_list( user_sub, keyword, page, + is_active ) except Exception as e: err = f"[MCPServiceCenter] 获取MCP服务列表失败: {e}" diff --git a/apps/scheduler/executor/agent.py b/apps/scheduler/executor/agent.py index 4db38587cecfd96fb909fa69f55371c1f39b3cb5..fc799fa16d15af80c3f1aa7e4c375b1add0dc1f7 100644 --- a/apps/scheduler/executor/agent.py +++ b/apps/scheduler/executor/agent.py @@ -28,7 +28,6 @@ from apps.schemas.message import param from apps.services.task import TaskManager from apps.services.appcenter import AppCenterManager from apps.services.mcp_service import MCPServiceManager -from apps.services.task import TaskManager from apps.services.user import UserManager logger = logging.getLogger(__name__) @@ -55,6 +54,12 @@ class MCPAgentExecutor(BaseExecutor): description="推理大模型", ) + async def update_tokens(self) -> None: + """更新令牌数""" + self.task.tokens.input_tokens = self.resoning_llm.input_tokens + self.task.tokens.output_tokens = self.resoning_llm.output_tokens + await TaskManager.save_task(self.task.id, self.task) + async def load_state(self) -> None: """从数据库中加载FlowExecutor的状态""" logger.info("[FlowExecutor] 加载Executor状态") @@ -108,8 +113,8 @@ class MCPAgentExecutor(BaseExecutor): max_steps=self.max_steps-start_index-1, reasoning_llm=self.resoning_llm ) - self.msg_queue.push_output( - self.task, + self.update_tokens() + self.push_message( EventType.STEP_CANCEL, data={} ) @@ -123,7 +128,7 @@ class MCPAgentExecutor(BaseExecutor): for i in range(start_index, len(self.task.runtime.temporary_plans.plans)): self.task.runtime.temporary_plans.plans[i].step_id = str(uuid.uuid4()) - async def get_tool_input_param(self, is_first: bool) -> dict[str, Any]: + async def get_tool_input_param(self, is_first: bool) -> None: if is_first: # 获取第一个输入参数 self.task.state.current_input = await MCPHost._get_first_input_params(self.tools[self.task.state.step_id], self.task.runtime.question, self.task) @@ -156,9 +161,10 @@ class MCPAgentExecutor(BaseExecutor): logger.info("[MCPAgentExecutor] 等待用户确认步骤 %d", self.task.state.step_index) # 发送确认消息 confirm_message = await MCPPlanner.get_tool_risk(self.tools[self.task.state.step_id], self.task.state.current_input, "", self.resoning_llm) - self.msg_queue.push_output(self.task, EventType.STEP_WAITING_FOR_START, confirm_message.model_dump( + self.update_tokens() + self.push_message(EventType.STEP_WAITING_FOR_START, confirm_message.model_dump( exclude_none=True, by_alias=True)) - self.msg_queue.push_output(self.task, EventType.FLOW_STOP, {}) + self.push_message(EventType.FLOW_STOP, {}) self.task.state.flow_status = FlowStatus.WAITING self.task.state.step_status = StepStatus.WAITING self.task.context.append( @@ -188,16 +194,15 @@ class MCPAgentExecutor(BaseExecutor): logger.error("[MCPAgentExecutor] MCP客户端未找到: %s", mcp_tool.mcp_id) self.task.state.flow_status = FlowStatus.ERROR error = "[MCPAgentExecutor] MCP客户端未找到: {}".format(mcp_tool.mcp_id) - raise Exception(error) + self.task.state.error_message = error try: output_params = await mcp_client.call_tool(mcp_tool.name, self.task.state.current_input) - self.msg_queue.push_output( - self.task, + self.update_tokens() + self.push_message( EventType.STEP_INPUT, self.task.state.current_input ) - self.msg_queue.push_output( - self.task, + self.push_message( EventType.STEP_OUTPUT, output_params ) @@ -216,7 +221,7 @@ class MCPAgentExecutor(BaseExecutor): ) self.task.state.step_status = StepStatus.SUCCESS except Exception as e: - logging.warning("[MCPAgentExecutor] 执行步骤 %s 失败: %s", mcp_tool.name, str(e)) + logger.warning("[MCPAgentExecutor] 执行步骤 %s 失败: %s", mcp_tool.name, str(e)) import traceback self.task.state.error_message = traceback.format_exc() self.task.state.step_status = StepStatus.ERROR @@ -230,14 +235,18 @@ class MCPAgentExecutor(BaseExecutor): self.task.state.error_message, self.resoning_llm ) - self.msg_queue.push_output( - self.task, + self.update_tokens() + self.push_message( EventType.STEP_WAITING_FOR_PARAM, data={ "message": "当运行产生如下报错:\n" + self.task.state.error_message, "params": params_with_null } ) + self.push_message( + EventType.FLOW_STOP, + data={} + ) self.task.state.flow_status = FlowStatus.WAITING self.task.state.step_status = StepStatus.PARAM self.task.context.append( @@ -265,8 +274,7 @@ class MCPAgentExecutor(BaseExecutor): # 最后一步 self.task.state.flow_status = FlowStatus.SUCCESS self.task.state.step_status = StepStatus.SUCCESS - self.msg_queue.push_output( - self.task, + self.push_message( EventType.FLOW_SUCCESS, data={} ) @@ -276,8 +284,7 @@ class MCPAgentExecutor(BaseExecutor): self.task.state.step_description = self.task.runtime.temporary_plans.plans[self.task.state.step_index].content self.task.state.step_status = StepStatus.INIT self.task.state.current_input = {} - self.msg_queue.push_output( - self.task, + self.push_message( EventType.STEP_INIT, data={} ) @@ -285,8 +292,7 @@ class MCPAgentExecutor(BaseExecutor): # 没有下一步了,结束流程 self.task.state.flow_status = FlowStatus.SUCCESS self.task.state.step_status = StepStatus.SUCCESS - self.msg_queue.push_output( - self.task, + self.push_message( EventType.FLOW_SUCCESS, data={} ) @@ -296,8 +302,7 @@ class MCPAgentExecutor(BaseExecutor): """步骤执行失败后的错误处理""" self.task.state.step_status = StepStatus.ERROR self.task.state.flow_status = FlowStatus.ERROR - self.msg_queue.push_output( - self.task, + self.push_message( EventType.FLOW_FAILED, data={} ) @@ -320,13 +325,13 @@ class MCPAgentExecutor(BaseExecutor): async def work(self) -> None: """执行当前步骤""" if self.task.state.step_status == StepStatus.INIT: - self.get_tool_input_param(is_first=True) + await self.get_tool_input_param(is_first=True) user_info = await UserManager.get_userinfo_by_user_sub(self.task.ids.user_sub) if not user_info.auto_execute: # 等待用户确认 await self.confirm_before_step() return - self.step.state.step_status = StepStatus.RUNNING + self.task.state.step_status = StepStatus.RUNNING elif self.task.state.step_status in [StepStatus.PARAM, StepStatus.WAITING, StepStatus.RUNNING]: if self.task.context[-1].step_status == StepStatus.PARAM: if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id: @@ -338,24 +343,22 @@ class MCPAgentExecutor(BaseExecutor): else: self.task.state.flow_status = FlowStatus.CANCELLED self.task.state.step_status = StepStatus.CANCELLED - self.msg_queue.push_output( - self.task, + self.push_message( EventType.STEP_CANCEL, data={} ) - self.msg_queue.push_output( - self.task, + self.push_message( EventType.FLOW_CANCEL, data={} ) if len(self.task.context) and self.task.context[-1].step_id == self.task.state.step_id: self.task.context[-1].step_status = StepStatus.CANCELLED if self.task.state.step_status == StepStatus.PARAM: - self.get_tool_input_param(is_first=False) - max_retry = 5 + await self.get_tool_input_param(is_first=False) + max_retry = 5 for i in range(max_retry): if i != 0: - self.get_tool_input_param(is_first=False) + await self.get_tool_input_param(is_first=False) await self.run_step() if self.task.state.step_status == StepStatus.SUCCESS: break @@ -389,8 +392,7 @@ class MCPAgentExecutor(BaseExecutor): (await MCPHost.assemble_memory(self.task)), self.resoning_llm ): - self.msg_queue.push_output( - self.task, + self.push_message( EventType.TEXT_ADD, data=chunk ) @@ -405,32 +407,29 @@ class MCPAgentExecutor(BaseExecutor): # 初始化状态 self.task.state.flow_id = str(uuid.uuid4()) self.task.state.flow_name = await MCPPlanner.get_flow_name(self.task.runtime.question, self.resoning_llm) - self.task.runtime.temporary_plans = await self.plan(is_replan=False) + await self.plan(is_replan=False) self.reset_step_to_index(0) TaskManager.save_task(self.task.id, self.task) self.task.state.flow_status = FlowStatus.RUNNING - self.msg_queue.push_output( - self.task, + self.push_message( EventType.FLOW_START, data={} ) try: while self.task.state.step_index < len(self.task.runtime.temporary_plans) and \ self.task.state.flow_status == FlowStatus.RUNNING: - self.work() + await self.work() TaskManager.save_task(self.task.id, self.task) except Exception as e: logger.error("[MCPAgentExecutor] 执行过程中发生错误: %s", str(e)) self.task.state.flow_status = FlowStatus.ERROR self.task.state.error_message = str(e) self.task.state.step_status = StepStatus.ERROR - self.msg_queue.push_output( - self.task, + self.push_message( EventType.STEP_ERROR, data={} ) - self.msg_queue.push_output( - self.task, + self.push_message( EventType.FLOW_FAILED, data={} ) diff --git a/apps/scheduler/executor/base.py b/apps/scheduler/executor/base.py index cf2f4e6838f5d8d5b6578db2750543ddfd22ee75..8dcb99c7125e7d1ccc7debde39e4d9963724985b 100644 --- a/apps/scheduler/executor/base.py +++ b/apps/scheduler/executor/base.py @@ -49,10 +49,8 @@ class BaseExecutor(BaseModel, ABC): question=self.question, params=self.task.runtime.filled, ).model_dump(exclude_none=True, by_alias=True) - elif event_type == EventType.FLOW_STOP.value: - data = {} elif event_type == EventType.TEXT_ADD.value and isinstance(data, str): - data=TextAddContent(text=data).model_dump(exclude_none=True, by_alias=True) + data = TextAddContent(text=data).model_dump(exclude_none=True, by_alias=True) if data is None: data = {} @@ -62,7 +60,7 @@ class BaseExecutor(BaseModel, ABC): await self.msg_queue.push_output( self.task, event_type=event_type, - data=data, # type: ignore[arg-type] + data=data, # type: ignore[arg-type] ) @abstractmethod diff --git a/apps/scheduler/scheduler/context.py b/apps/scheduler/scheduler/context.py index 3b26f42f2995db9911a61d85e140f4ec2da2d5e1..dc35d4bd05d564bc96f50aaca537ded94caefae7 100644 --- a/apps/scheduler/scheduler/context.py +++ b/apps/scheduler/scheduler/context.py @@ -158,7 +158,6 @@ async def save_data(task: Task, user_sub: str, post_body: RequestData) -> None: facts=task.runtime.facts, data={}, ) - try: # 加密Record数据 encrypt_data, encrypt_config = Security.encrypt(record_content.model_dump_json(by_alias=True)) diff --git a/apps/scheduler/scheduler/scheduler.py b/apps/scheduler/scheduler/scheduler.py index b81448475c690f2772442f808ff629b139b6039f..13eb7ee0b0d81a7a003a0b8fdb913e70f860739d 100644 --- a/apps/scheduler/scheduler/scheduler.py +++ b/apps/scheduler/scheduler/scheduler.py @@ -56,7 +56,6 @@ class Scheduler: while not kill_event.is_set(): # 检查用户活动状态 is_active = await Activity.is_active(user_sub) - if not is_active: logger.warning("[Scheduler] 用户 %s 不活跃,终止工作流", user_sub) kill_event.set() @@ -78,8 +77,7 @@ class Scheduler: ) if not llm_id: logger.error("[Scheduler] 获取大模型ID失败") - await self.queue.close() - return + return None if llm_id == "empty": llm = LLM( _id="empty", @@ -89,16 +87,16 @@ class Scheduler: model_name=Config().get_config().llm.model, max_tokens=Config().get_config().llm.max_tokens, ) + return llm else: llm = await LLMManager.get_llm_by_id(self.task.ids.user_sub, llm_id) if not llm: logger.error("[Scheduler] 获取大模型失败") - await self.queue.close() - return + return None + return llm except Exception: logger.exception("[Scheduler] 获取大模型失败") - await self.queue.close() - return + return None async def get_kb_ids_use_in_chat_with_rag(self) -> list[str]: """获取知识库ID列表""" @@ -106,10 +104,6 @@ class Scheduler: kb_ids = await KnowledgeBaseManager.get_kb_ids_by_conversation_id( self.task.ids.user_sub, self.task.ids.conversation_id, ) - if not kb_ids: - logger.error("[Scheduler] 获取知识库ID失败") - await self.queue.close() - return [] return kb_ids except Exception: logger.exception("[Scheduler] 获取知识库ID失败") @@ -135,10 +129,6 @@ class Scheduler: if not self.post_body.app or self.post_body.app.app_id == "": llm = await self.get_llm_use_in_chat_with_rag() kb_ids = await self.get_kb_ids_use_in_chat_with_rag() - if not llm: - logger.error("[Scheduler] 获取大模型失败") - await self.queue.close() - return self.task = await push_init_message(self.task, self.queue, 3, is_flow=False) rag_data = RAGQueryReq( kbIds=kb_ids, diff --git a/apps/schemas/appcenter.py b/apps/schemas/appcenter.py index a89f39df18083d90b988cc764c0e88f90500d1f3..a65fffb2db9d74e33145c8faec6995f578ed5e94 100644 --- a/apps/schemas/appcenter.py +++ b/apps/schemas/appcenter.py @@ -50,6 +50,14 @@ class AppFlowInfo(BaseModel): debug: bool = Field(..., description="是否经过调试") +class AppMcpServiceInfo(BaseModel): + """应用关联的MCP服务信息""" + + id: str = Field(..., description="MCP服务ID") + name: str = Field(..., description="MCP服务名称") + description: str = Field(..., description="MCP服务简介") + + class AppData(BaseModel): """应用信息数据结构""" @@ -64,4 +72,4 @@ class AppData(BaseModel): permission: AppPermissionData = Field( default_factory=lambda: AppPermissionData(authorizedUsers=None), description="权限配置") workflows: list[AppFlowInfo] = Field(default=[], description="工作流信息列表") - mcp_service: list[str] = Field(default=[], alias="mcpService", description="MCP服务id列表") + mcp_service: list[AppMcpServiceInfo] = Field(default=[], alias="mcpService", description="MCP服务id列表") diff --git a/apps/schemas/record.py b/apps/schemas/record.py index 6a394375b02fc340119af397739f16c932bc0cbd..dbc06b106994d5889386d6bff92a0f88b40e380f 100644 --- a/apps/schemas/record.py +++ b/apps/schemas/record.py @@ -94,7 +94,7 @@ class RecordData(BaseModel): id: str group_id: str = Field(alias="groupId") conversation_id: str = Field(alias="conversationId") - task_id: str = Field(alias="taskId") + task_id: str | None = Field(default=None, alias="taskId") document: list[RecordDocument] = [] flow: RecordFlow | None = None content: RecordContent @@ -130,8 +130,8 @@ class Record(RecordData): user_sub: str key: dict[str, Any] = {} - task_id: str - content: str + task_id: str | None = Field(default=None, description="任务ID") + content: str = Field(default="", description="Record内容,已加密") comment: RecordComment = Field(default=RecordComment()) flow: FlowHistory = Field( default=FlowHistory(), description="Flow执行历史信息") diff --git a/apps/schemas/request_data.py b/apps/schemas/request_data.py index 8719c2e985990b4f7e0f122353fb6097a359ff8a..8d053e1cc527e80874e924908570c12809ebc16f 100644 --- a/apps/schemas/request_data.py +++ b/apps/schemas/request_data.py @@ -48,7 +48,6 @@ class RequestData(BaseModel): app: RequestDataApp | None = Field(default=None, description="应用") debug: bool = Field(default=False, description="是否调试") task_id: str | None = Field(default=None, alias="taskId", description="任务ID") - new_task: bool = Field(default=True, description="是否新建任务") class QuestionBlacklistRequest(BaseModel): diff --git a/apps/services/activity.py b/apps/services/activity.py index 299a49a640664b29a3e4724c5842f12bd289e3bf..88142b9ee6772319c0eaf32dfdd310e968e32adc 100644 --- a/apps/services/activity.py +++ b/apps/services/activity.py @@ -3,11 +3,13 @@ import uuid from datetime import UTC, datetime - +import logging from apps.common.mongo import MongoDB from apps.constants import SLIDE_WINDOW_QUESTION_COUNT, SLIDE_WINDOW_TIME from apps.exceptions import ActivityError +logger = logging.getLogger(__name__) + class Activity: """用户活动控制,限制单用户同一时间只能提问一个问题""" diff --git a/apps/services/mcp_service.py b/apps/services/mcp_service.py index 2c84a2114f2ef6d0a1f48c66aa60f67eea676977..ba510350fc4103187e0c13aa170a4868631506a3 100644 --- a/apps/services/mcp_service.py +++ b/apps/services/mcp_service.py @@ -78,6 +78,7 @@ class MCPServiceManager: user_sub: str, keyword: str | None, page: int, + is_active: bool | None = None, ) -> list[MCPServiceCardItem]: """ 获取所有MCP服务列表 @@ -89,6 +90,11 @@ class MCPServiceManager: :return: MCP服务列表 """ filters = MCPServiceManager._build_filters(search_type, keyword) + if is_active is not None: + if is_active: + filters["activated"] = {"$in": [user_sub]} + else: + filters["activated"] = {"$nin": [user_sub]} mcpservice_pools = await MCPServiceManager._search_mcpservice(filters, page) return [ MCPServiceCardItem( diff --git a/apps/services/record.py b/apps/services/record.py index 6b61f91ecd841a9c585ada2913807f7f3c7dfe18..cf8373b042ab409662dcf1d2e08ebc8ac3666d1b 100644 --- a/apps/services/record.py +++ b/apps/services/record.py @@ -142,7 +142,7 @@ class RecordManager: record_group_collection = MongoDB().get_collection("record_group") try: await record_group_collection.update_many( - {"records.flow.flow_id": {"$in": task_ids}, "records.flow.flow_status": {"$nin": [FlowStatus.ERROR.value, FlowStatus.SUCCESS.value]}}, + {"records.task_id": {"$in": task_ids}, "records.flow.flow_status": {"$nin": [FlowStatus.ERROR.value, FlowStatus.SUCCESS.value]}}, {"$set": {"records.$[elem].flow.flow_status": FlowStatus.CANCELLED}}, array_filters=[{"elem.flow.flow_id": {"$in": task_ids}}], ) diff --git a/apps/services/task.py b/apps/services/task.py index 2f75a8c3a034cb94e051efd1ee00496f9ab0351d..eec4e197afeb8b82c1ad90effe7be8ec961d7039 100644 --- a/apps/services/task.py +++ b/apps/services/task.py @@ -115,7 +115,6 @@ class TaskManager: @staticmethod async def init_new_task( - cls, user_sub: str, session_id: str | None = None, post_body: RequestData | None = None, @@ -180,6 +179,7 @@ class TaskManager: task_ids.append(task["_id"]) if task_ids: await task_collection.delete_many({"conversation_id": conversation_id}) + return task_ids except Exception: logger.exception("[TaskManager] 删除ConversationID的Task信息失败") return []