diff --git a/apps/scheduler/call/suggest/suggest.py b/apps/scheduler/call/suggest/suggest.py index 9b1b5624a3572985641ad172c8e99e3c5f6e7f61..7921c7785c06a07748040468ceb55dfb86176a15 100644 --- a/apps/scheduler/call/suggest/suggest.py +++ b/apps/scheduler/call/suggest/suggest.py @@ -11,18 +11,15 @@ from jinja2.sandbox import SandboxedEnvironment from pydantic import Field from pydantic.json_schema import SkipJsonSchema -from apps.common.security import Security from apps.models import LanguageType, NodeInfo from apps.scheduler.call.core import CoreCall from apps.schemas.enum_var import CallOutputType -from apps.schemas.record import RecordContent from apps.schemas.scheduler import ( CallError, CallInfo, CallOutputChunk, CallVars, ) -from apps.services.record import RecordManager from apps.services.user_tag import UserTagManager from .prompt import SUGGEST_PROMPT @@ -77,13 +74,8 @@ class Suggestion(CoreCall, input_model=SuggestionInput, output_model=SuggestionO async def _init(self, call_vars: CallVars) -> SuggestionInput: """初始化""" - if self.conversation_id is None: - self._history_questions = [] - else: - self._history_questions = await self._get_history_questions( - call_vars.ids.user_id, - self.conversation_id, - ) + # 从 ExecutorBackground 中获取历史问题 + self._history_questions = call_vars.background.history_questions self._app_id = call_vars.ids.app_id self._flow_id = call_vars.ids.executor_id self._env = SandboxedEnvironment( @@ -111,22 +103,6 @@ class Suggestion(CoreCall, input_model=SuggestionInput, output_model=SuggestionO history_questions=self._history_questions, ) - - async def _get_history_questions(self, user_id: str, conversation_id: uuid.UUID) -> list[str]: - """获取当前对话的历史问题""" - records = await RecordManager.query_record_by_conversation_id( - user_id, - conversation_id, - 15, - ) - - history_questions = [] - for record in records: - record_data = RecordContent.model_validate_json(Security.decrypt(record.content, record.key)) - history_questions.append(record_data.question) - return history_questions - - async def _exec(self, input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]: """运行问题推荐""" data = SuggestionInput(**input_data) diff --git a/apps/scheduler/executor/agent.py b/apps/scheduler/executor/agent.py index e4aba23f654e1576b00f56cbaed0d0539de4751d..8e1d6012a77008ea9cf9bf8dd8a8952014540659 100644 --- a/apps/scheduler/executor/agent.py +++ b/apps/scheduler/executor/agent.py @@ -33,10 +33,6 @@ class MCPAgentExecutor(BaseExecutor): agent_id: uuid.UUID = Field(default=uuid.uuid4(), description="App ID作为Agent ID") agent_description: str = Field(default="", description="Agent描述") - tools: dict[str, MCPTools] = Field( - description="MCP工具列表,key为tool_id", - default={}, - ) params: FlowParams | bool | None = Field( default=None, description="流执行过程中的参数补充", @@ -51,6 +47,7 @@ class MCPAgentExecutor(BaseExecutor): self._mcp_list = [] self._current_input = {} self._current_tool = None + self._tool_list = {} # 初始化MCP Host相关对象 self._planner = MCPPlanner(self.task, self.llm) self._host = MCPHost(self.task, self.llm) @@ -88,9 +85,9 @@ class MCPAgentExecutor(BaseExecutor): self._mcp_list.append(mcp_service) for tool in await MCPServiceManager.get_mcp_tools(mcp_id): - self.tools[tool.id] = tool + self._tool_list[tool.id] = tool - self.tools[AGENT_FINAL_STEP_NAME] = MCPTools( + self._tool_list[AGENT_FINAL_STEP_NAME] = MCPTools( mcpId="", toolName=AGENT_FINAL_STEP_NAME, description="结束流程的工具", inputSchema={}, outputSchema={}, ) @@ -109,7 +106,7 @@ class MCPAgentExecutor(BaseExecutor): if is_first: # 获取第一个输入参数 - self._current_tool = self.tools[state.stepName] + self._current_tool = self._tool_list[state.stepName] self._current_input = await self._host.get_first_input_params( self._current_tool, self.task.runtime.userInput, self.task, ) @@ -121,7 +118,7 @@ class MCPAgentExecutor(BaseExecutor): else: params = {} params_description = "" - self._current_tool = self.tools[state.stepName] + self._current_tool = self._tool_list[state.stepName] state.currentInput = await self._host.fill_params( self._current_tool, self.task.runtime.userInput, @@ -215,7 +212,7 @@ class MCPAgentExecutor(BaseExecutor): self.task.state = state await self._push_message( - EventType.STEP_END, + EventType.STEP_OUTPUT, data=error_output, ) @@ -385,12 +382,12 @@ class MCPAgentExecutor(BaseExecutor): step = None for _ in range(max_retry): try: - step = await self._planner.create_next_step(history, self.tool_list) - if step.tool_id in self.tools: + step = await self._planner.create_next_step(history, list(self._tool_list.values())) + if step.tool_id in self._tool_list: break except Exception: _logger.exception("[MCPAgentExecutor] 获取下一步失败,重试中...") - if step is None or step.tool_id not in self.tools: + if step is None or step.tool_id not in self._tool_list: step = Step( tool_id=AGENT_FINAL_STEP_NAME, description=AGENT_FINAL_STEP_NAME, @@ -407,11 +404,12 @@ class MCPAgentExecutor(BaseExecutor): """步骤执行失败后的错误处理""" self._validate_task_state() state = cast("ExecutorCheckpoint", self.task.state) + error_output = {"message": state.errorMessage} state.stepStatus = StepStatus.ERROR state.executorStatus = ExecutorStatus.ERROR self.task.state = state - await self._push_message(EventType.STEP_END, data={}) + await self._push_message(EventType.STEP_OUTPUT, data=error_output) await self._push_message(EventType.EXECUTOR_STOP, data={}) await self._add_error_to_context(state.stepStatus) @@ -440,7 +438,7 @@ class MCPAgentExecutor(BaseExecutor): state.stepStatus = StepStatus.CANCELLED self.task.state = state - await self._push_message(EventType.STEP_END, data={}) + await self._push_message(EventType.STEP_OUTPUT, data={}) await self._push_message(EventType.EXECUTOR_STOP, data={}) self._update_last_context_status(StepStatus.CANCELLED) return @@ -458,7 +456,7 @@ class MCPAgentExecutor(BaseExecutor): elif self._user.autoExecute: await self._handle_step_error_and_continue() else: - mcp_tool = self.tools[state.stepName] + mcp_tool = self._tool_list[state.stepName] error_msg = self._get_error_message_str(state.errorMessage) is_param_error = await self._planner.is_param_error( await self._host.assemble_memory(self.task.runtime, self.task.context), @@ -477,14 +475,46 @@ class MCPAgentExecutor(BaseExecutor): async def summarize(self) -> None: """总结""" + thinking_started = False async for chunk in self._planner.generate_answer( await self._host.assemble_memory(self.task.runtime, self.task.context), ): + if chunk.reasoning_content: + if not thinking_started: + await self._push_message( + EventType.TEXT_ADD, + data="", + ) + self.task.runtime.fullAnswer += "" + thinking_started = True + + await self._push_message( + EventType.TEXT_ADD, + data=chunk.reasoning_content, + ) + self.task.runtime.fullAnswer += chunk.reasoning_content + + if chunk.content: + if thinking_started: + await self._push_message( + EventType.TEXT_ADD, + data="", + ) + self.task.runtime.fullAnswer += "" + thinking_started = False + + await self._push_message( + EventType.TEXT_ADD, + data=chunk.content, + ) + self.task.runtime.fullAnswer += chunk.content + + if thinking_started: await self._push_message( EventType.TEXT_ADD, - data=chunk, + data="", ) - self.task.runtime.fullAnswer += chunk + self.task.runtime.fullAnswer += "" async def run(self) -> None: """执行MCP Agent的主逻辑""" @@ -493,14 +523,10 @@ class MCPAgentExecutor(BaseExecutor): # 初始化MCP服务 await self.load_mcp() - data = {} if state.executorStatus == ExecutorStatus.INIT: # 初始化状态 state.executorId = str(uuid.uuid4()) state.executorName = (await self._planner.get_flow_name()).flow_name - flow_risk = await self._planner.get_flow_excute_risk(self.tool_list) - if self._user.autoExecute: - data = flow_risk.model_dump(exclude_none=True, by_alias=True) await self.get_next_step() self.task.state = state @@ -528,8 +554,9 @@ class MCPAgentExecutor(BaseExecutor): } state.stepStatus = StepStatus.ERROR self.task.state = state + error_output = {"message": state.errorMessage} - await self._push_message(EventType.STEP_END, data={}) + await self._push_message(EventType.STEP_OUTPUT, data=error_output) await self._push_message(EventType.EXECUTOR_STOP, data={}) await self._add_error_to_context(state.stepStatus) finally: diff --git a/apps/scheduler/executor/base.py b/apps/scheduler/executor/base.py index dae1249775eff1ce03d50cba70e1c3932173ec0a..eb5c594d911dbc60602121fb019d985b0cd4c356 100644 --- a/apps/scheduler/executor/base.py +++ b/apps/scheduler/executor/base.py @@ -46,26 +46,35 @@ class BaseExecutor(BaseModel, ABC): self.background = ExecutorBackground( conversation=[], facts=[], + history_questions=[], num=n, ) return - # 获取最后n+5条Record + # 获取最后n+10条Record records = await RecordManager.query_record_by_conversation_id( - self.task.metadata.userId, self.task.metadata.conversationId, n + 5, + self.task.metadata.userId, self.task.metadata.conversationId, n + 10, ) - # 组装问答 + # 组装问答、事实和历史问题 context = [] facts = [] - for record in records: + history_questions = [] + for i, record in enumerate(records): record_data = RecordContent.model_validate_json(Security.decrypt(record.content, record.key)) - context.append({ - "question": record_data.question, - "answer": record_data.answer, - }) - facts.extend(record_data.facts) + # context 取最后 n 组 + if i >= len(records) - n: + context.append({ + "question": record_data.question, + "answer": record_data.answer, + }) + # facts 取最后 n+5 组 + if i >= len(records) - (n + 5): + facts.extend(record_data.facts) + # history_questions 取全部(n+10组) + history_questions.append(record_data.question) self.background = ExecutorBackground( conversation=context, facts=facts, + history_questions=history_questions, num=n, ) diff --git a/apps/scheduler/executor/qa.py b/apps/scheduler/executor/qa.py index 3a8f315cf0cbc60ef4aaa73833498a5ad768f63c..bb6228188e8fee780ff471b4a3a15e8c596a6d1e 100644 --- a/apps/scheduler/executor/qa.py +++ b/apps/scheduler/executor/qa.py @@ -9,7 +9,7 @@ from apps.models import ExecutorCheckpoint, ExecutorStatus, StepStatus from apps.models.task import LanguageType from apps.scheduler.call.rag.schema import DocItem, RAGOutput from apps.schemas.document import DocumentInfo -from apps.schemas.enum_var import EventType, SpecialCallType +from apps.schemas.enum_var import SpecialCallType from apps.schemas.flow import Step from apps.schemas.task import StepQueueItem @@ -83,7 +83,6 @@ class QAExecutor(BaseExecutor): async def init(self) -> None: """初始化QAExecutor""" await self._load_history() - # 初始化新State self.task.state = ExecutorCheckpoint( taskId=self.task.metadata.id, executorId=str(self.task.metadata.conversationId), @@ -105,13 +104,11 @@ class QAExecutor(BaseExecutor): doc_info_list = [] doc_cnt = 0 doc_id_map = {} - # 预留tokens _ = round(max_tokens * 0.8) for doc_chunk in doc_chunk_list: if doc_chunk.docId not in doc_id_map: doc_cnt += 1 - # 创建DocumentInfo对象 created_at_value = ( doc_chunk.docCreatedAt.timestamp() if isinstance(doc_chunk.docCreatedAt, datetime) @@ -132,8 +129,10 @@ class QAExecutor(BaseExecutor): return doc_info_list - async def run(self) -> None: - """运行QA""" + async def _execute_rag_step(self) -> bool: + """执行RAG检索步骤""" + _logger.info("[QAExecutor] 开始执行RAG检索步骤") + rag_exec = StepExecutor( msg_queue=self.msg_queue, task=self.task, @@ -150,19 +149,125 @@ class QAExecutor(BaseExecutor): await rag_exec.init() await rag_exec.run() + if self.task.state and self.task.state.stepStatus == StepStatus.ERROR: + _logger.error("[QAExecutor] RAG检索步骤失败") + return False + + rag_output_data = None + for history in self.task.context: + if history.stepId == rag_exec.step.step_id: + rag_output_data = history.outputData + break - # 解析并推送文档信息 - if first_chunk and isinstance(first_chunk.content, dict): - rag_output = RAGOutput.model_validate(first_chunk.content) + if rag_output_data and isinstance(rag_output_data, dict): + rag_output = RAGOutput.model_validate(rag_output_data) doc_chunk_list: list[DocItem] = [ DocItem.model_validate(item) if not isinstance(item, DocItem) else item for item in rag_output.corpus ] doc_info_list = await self._assemble_doc_info(doc_chunk_list, 8192) for doc_info in doc_info_list: - await self._push_rag_doc(doc_info) + await self._push_message( + "document.add", + doc_info.model_dump(by_alias=True, exclude_none=True), + ) + _logger.info("[QAExecutor] RAG检索步骤成功,已推送%d个文档", len(doc_info_list)) + else: + _logger.warning("[QAExecutor] RAG检索步骤未返回有效数据") + + return True + + async def _execute_llm_step(self) -> bool: + """执行LLM问答步骤""" + _logger.info("[QAExecutor] 开始执行LLM问答步骤") + + llm_exec = StepExecutor( + msg_queue=self.msg_queue, + task=self.task, + step=StepQueueItem( + step_id=uuid.uuid4(), + step=_RAG_STEP_LIST[1][self.task.runtime.language], + enable_filling=False, + to_user=True, + ), + background=self.background, + question=self.question, + llm=self.llm, + ) + await llm_exec.init() + await llm_exec.run() + + if self.task.state and self.task.state.stepStatus == StepStatus.ERROR: + _logger.error("[QAExecutor] LLM问答步骤失败") + return False + + _logger.info("[QAExecutor] LLM问答步骤完成") + return True + + async def _execute_remaining_steps(self) -> bool: + """执行剩余步骤:问题推荐和记忆存储""" + _logger.info("[QAExecutor] 开始执行问题推荐步骤") + suggestion_exec = StepExecutor( + msg_queue=self.msg_queue, + task=self.task, + step=StepQueueItem( + step_id=uuid.uuid4(), + step=_RAG_STEP_LIST[2][self.task.runtime.language], + enable_filling=False, + to_user=True, + ), + background=self.background, + question=self.question, + llm=self.llm, + ) + await suggestion_exec.init() + await suggestion_exec.run() + + if self.task.state and self.task.state.stepStatus == StepStatus.ERROR: + _logger.error("[QAExecutor] 问题推荐步骤失败") + return False + + _logger.info("[QAExecutor] 开始执行记忆存储步骤") + facts_exec = StepExecutor( + msg_queue=self.msg_queue, + task=self.task, + step=StepQueueItem( + step_id=uuid.uuid4(), + step=_RAG_STEP_LIST[3][self.task.runtime.language], + enable_filling=False, + to_user=False, + ), + background=self.background, + question=self.question, + llm=self.llm, + ) + await facts_exec.init() + await facts_exec.run() + + if self.task.state and self.task.state.stepStatus == StepStatus.ERROR: + _logger.error("[QAExecutor] 记忆存储步骤失败") + return False + + return True + + async def run(self) -> None: + """运行QA""" + _logger.info("[QAExecutor] 开始运行QA流程") + + rag_success = await self._execute_rag_step() + if not rag_success: + _logger.error("[QAExecutor] RAG检索步骤失败,终止执行") + return + + llm_success = await self._execute_llm_step() + if not llm_success: + _logger.error("[QAExecutor] LLM问答步骤失败,终止执行") + return + + remaining_success = await self._execute_remaining_steps() + if not remaining_success: + _logger.error("[QAExecutor] 剩余步骤失败,终止执行") + return - # 保存答案 - full_answer = "" - self.task.runtime.fullAnswer = full_answer self.task.runtime.fullTime = round(datetime.now(UTC).timestamp(), 2) - self.task.runtime.time + _logger.info("[QAExecutor] QA流程完成") diff --git a/apps/scheduler/mcp_agent/plan.py b/apps/scheduler/mcp_agent/plan.py index 8140ec04b55f8ca5ce0271e38577f17520a2be8b..572e7cf2701bb758fae0bee997cb8feb5595fd49 100644 --- a/apps/scheduler/mcp_agent/plan.py +++ b/apps/scheduler/mcp_agent/plan.py @@ -14,7 +14,6 @@ from apps.scheduler.mcp_agent.prompt import ( CHANGE_ERROR_MESSAGE_TO_DESCRIPTION, FINAL_ANSWER, GEN_STEP, - GENERATE_FLOW_EXCUTE_RISK, GENERATE_FLOW_NAME, GET_MISSING_PARAMS, IS_PARAM_ERROR, @@ -24,7 +23,6 @@ from apps.scheduler.slot.slot import Slot from apps.schemas.llm import LLMChunk from apps.schemas.mcp import ( FlowName, - FlowRisk, IsParamError, Step, ToolRisk, @@ -85,25 +83,6 @@ class MCPPlanner(MCPBase): # 使用Step模型解析结果 return Step.model_validate(step) - async def get_flow_excute_risk(self, tools: list[MCPTools]) -> FlowRisk: - """获取当前流程的风险评估结果""" - template = _env.from_string(GENERATE_FLOW_EXCUTE_RISK[self._language]) - prompt = template.render( - goal=self._goal, - tools=tools, - ) - - # 组装OpenAI标准Function格式 - function = { - "name": "evaluate_flow_execution_risk", - "description": "Evaluate the potential risks and safety concerns of executing the entire workflow", - "parameters": FlowRisk.model_json_schema(), - } - - result = await self.get_json_result(prompt, function) - # 使用FlowRisk模型解析结果 - return FlowRisk.model_validate(result) - async def get_tool_risk( self, tool: MCPTools, @@ -206,7 +185,7 @@ class MCPPlanner(MCPBase): return await self.get_json_result(prompt, function) async def generate_answer(self, memory: str) -> AsyncGenerator[LLMChunk, None]: - """生成最终回答""" + """生成最终回答,返回LLMChunk""" template = _env.from_string(FINAL_ANSWER[self._language]) prompt = template.render( memory=memory, diff --git a/apps/scheduler/mcp_agent/prompt.py b/apps/scheduler/mcp_agent/prompt.py index eea822f827a27030d7e0edaafb4179d1cc55e671..07d924ecf7cbbded08a6dd0905ffa7bcf8681d4a 100644 --- a/apps/scheduler/mcp_agent/prompt.py +++ b/apps/scheduler/mcp_agent/prompt.py @@ -55,73 +55,6 @@ user's goal. ), } -GENERATE_FLOW_EXCUTE_RISK: dict[LanguageType, str] = { - LanguageType.CHINESE: dedent( - r""" - 你是一个智能助手,你的任务是根据用户的目标和当前的工具集合,评估当前流程的风险。 - - # 样例 - ## 目标 - 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优 - ## 工具集合 - 你可以访问并使用一些工具,这些工具将在 XML标签中给出。 - - - mysql_analyzer 分析MySQL数据库性能 - - performance_tuner 调优数据库性能 - - ## 输出 - { - "risk": "high", - "reason": "当前目标实现带来的风险较高,因为需要通过performance_tuner工具对MySQL数据库进行调优,而该\ -工具可能会对数据库的性能和稳定性产生较大的影响,因此风险评估为高。" - } - - # 现在开始评估当前流程的风险: - ## 目标 - {{goal}} - ## 工具集合 - - {% for tool in tools %} - - {{tool.id}} {{tool.name}};{{tool.description}} - {% endfor %} - - ## 输出 - """, - ), - LanguageType.ENGLISH: dedent( - r""" - You are an intelligent assistant, your task is to evaluate the risk of the current process based on the \ -user's goal and the current tool set. - # Example - # Goal - I need to scan the current MySQL database, analyze performance bottlenecks, and optimize it. - # Tool Set - You can access and use some tools, which will be given in the XML tag. - - - mysql_analyzer Analyze MySQL database performance - - performance_tuner Tune database performance - - # Output - { - "risk": "high", - "reason": "The risk brought by the realization of the current goal is relatively high, because it \ -is necessary to tune the MySQL database through the performance_tuner tool, which may have a greater impact on \ -the performance and stability of the database. Therefore, the risk assessment is high." - } - # Now start evaluating the risk of the current process: - # Goal - {{goal}} - # Tool Set - - {% for tool in tools %} - - {{tool.id}} {{tool.name}}; {{tool.description}} - {% endfor %} - - # Output - """, - ), -} - GEN_STEP: dict[LanguageType, str] = { LanguageType.CHINESE: dedent( r""" diff --git a/apps/scheduler/scheduler/conversation.py b/apps/scheduler/scheduler/conversation.py deleted file mode 100644 index 8885704f7d5fc79e389ede96e687143adca292b9..0000000000000000000000000000000000000000 --- a/apps/scheduler/scheduler/conversation.py +++ /dev/null @@ -1,35 +0,0 @@ -# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. -"""对话管理相关的Mixin类""" - -import logging -import uuid - -from apps.models import Conversation -from apps.services.appcenter import AppCenterManager -from apps.services.conversation import ConversationManager - -_logger = logging.getLogger(__name__) - - -class ConversationMixin: - """处理对话管理相关的逻辑""" - - async def create_new_conversation( - self, title: str, user_id: str, app_id: uuid.UUID | None = None, - *, - debug: bool = False, - ) -> Conversation: - """判断并创建新对话""" - if app_id and not await AppCenterManager.validate_user_app_access(user_id, app_id): - err = "Invalid app_id." - raise RuntimeError(err) - new_conv = await ConversationManager.add_conversation_by_user( - title=title, - user_id=user_id, - app_id=app_id, - debug=debug, - ) - if not new_conv: - err = "Create new conversation failed." - raise RuntimeError(err) - return new_conv diff --git a/apps/scheduler/scheduler/data.py b/apps/scheduler/scheduler/data.py index 008f5a23d3f846580f5404df7f3c71fc8ecdc9a6..1acfedf87a3711bf2279d3e5c8bc49c0a8990f39 100644 --- a/apps/scheduler/scheduler/data.py +++ b/apps/scheduler/scheduler/data.py @@ -23,38 +23,44 @@ class DataMixin: task: TaskData post_body: RequestData - async def _save_data(self) -> None: - """保存当前Executor、Task、Record等的数据""" - task = self.task - user_id = self.task.metadata.userId - post_body = self.post_body - + def _extract_used_documents(self) -> list[dict]: + """提取任务运行时使用的文档列表""" used_docs = [] - record_group = None # TODO: 需要从适当的地方获取record_group + task = self.task if hasattr(task.runtime, "documents") and task.runtime.documents: for docs in task.runtime.documents: doc_dict = docs if isinstance(docs, dict) else (docs.model_dump() if hasattr(docs, "model_dump") else docs) used_docs.append(doc_dict) - record_content = RecordContent( + return used_docs + + def _build_record_content(self) -> RecordContent: + """构建记录内容对象""" + task = self.task + + return RecordContent( question=task.runtime.question if hasattr(task.runtime, "question") else "", answer=task.runtime.answer if hasattr(task.runtime, "answer") else "", facts=task.runtime.facts if hasattr(task.runtime, "facts") else [], data={}, ) + def _encrypt_record_content(self, record_content: RecordContent) -> tuple[str, str] | None: + """加密记录内容""" try: encrypt_data, encrypt_config = Security.encrypt(record_content.model_dump_json(by_alias=True)) + return encrypt_data, encrypt_config except Exception: _logger.exception("[Scheduler] 问答对加密错误") - return + return None - if task.state: - await TaskManager.save_flow_context(task.context) + def _build_record(self, encrypt_data: str, encrypt_config: str, current_time: float) -> Record: + """构建记录对象""" + task = self.task + user_id = task.metadata.userId - current_time = round(datetime.now(UTC).timestamp(), 2) - record = Record( + return Record( id=task.metadata.id, conversationId=task.metadata.conversationId, taskId=task.metadata.id, @@ -62,9 +68,9 @@ class DataMixin: content=encrypt_data, key=encrypt_config, metadata=RecordMetadata( - timeCost=0, # TODO: 需要从task中获取时间成本 - inputTokens=0, # TODO: 需要从task中获取token信息 - outputTokens=0, # TODO: 需要从task中获取token信息 + timeCost=0, + inputTokens=0, + outputTokens=0, feature={}, ), createdAt=current_time, @@ -76,18 +82,37 @@ class DataMixin: ) if task.state else None, ) + async def _handle_document_management(self, record_group: str | None, used_docs: list[dict]) -> None: + """处理文档管理相关操作""" + user_id = self.task.metadata.userId + post_body = self.post_body + if record_group and post_body.conversation_id: await DocumentManager.change_doc_status(user_id, post_body.conversation_id, record_group) + if record_group and used_docs: + await DocumentManager.save_answer_doc(user_id, record_group, used_docs) + + async def _save_record_data(self, record: Record) -> None: + """保存记录数据""" + user_id = self.task.metadata.userId + post_body = self.post_body + if post_body.conversation_id: await RecordManager.insert_record_data(user_id, post_body.conversation_id, record) - if record_group and used_docs: - await DocumentManager.save_answer_doc(user_id, record_group, used_docs) + async def _update_app_center(self) -> None: + """更新应用中心最近使用的应用""" + user_id = self.task.metadata.userId + post_body = self.post_body if post_body.app and post_body.app.app_id: await AppCenterManager.update_recent_app(user_id, post_body.app.app_id) + async def _handle_task_state(self) -> None: + """处理任务状态管理""" + task = self.task + if not task.state or task.state.flow_status in [StepStatus.SUCCESS, StepStatus.ERROR, StepStatus.CANCELLED]: await TaskManager.delete_task_by_task_id(task.metadata.id) else: @@ -95,3 +120,30 @@ class DataMixin: await TaskManager.save_task_runtime(task.runtime) if task.state: await TaskManager.save_executor_checkpoint(task.state) + + async def _save_data(self) -> None: + """保存当前Executor、Task、Record等的数据""" + task = self.task + record_group = None + + used_docs = self._extract_used_documents() + + record_content = self._build_record_content() + encrypted_result = self._encrypt_record_content(record_content) + if encrypted_result is None: + return + encrypt_data, encrypt_config = encrypted_result + + if task.state: + await TaskManager.save_flow_context(task.context) + + current_time = round(datetime.now(UTC).timestamp(), 2) + record = self._build_record(encrypt_data, encrypt_config, current_time) + + await self._handle_document_management(record_group, used_docs) + + await self._save_record_data(record) + + await self._update_app_center() + + await self._handle_task_state() diff --git a/apps/scheduler/scheduler/executor.py b/apps/scheduler/scheduler/executor.py index 4eebcd37dfb168f06a92108459d8892e8b560bbe..2becbedc17082e7f66052ef876248064c570f9ac 100644 --- a/apps/scheduler/scheduler/executor.py +++ b/apps/scheduler/scheduler/executor.py @@ -14,9 +14,7 @@ from apps.scheduler.executor.qa import QAExecutor from apps.scheduler.pool.pool import pool from apps.schemas.request_data import RequestData from apps.schemas.task import TaskData -from apps.services.activity import Activity from apps.services.appcenter import AppCenterManager -from apps.services.conversation import ConversationManager _logger = logging.getLogger(__name__) @@ -33,36 +31,6 @@ class ExecutorMixin: """获取Top1 Flow (由FlowMixin实现)""" ... # noqa: PIE790 - async def _determine_app_id(self) -> uuid.UUID | None: - """确定最终使用的 app_id""" - conversation = None - - if self.task.metadata.conversationId: - conversation = await ConversationManager.get_conversation_by_conversation_id( - self.task.metadata.userId, - self.task.metadata.conversationId, - ) - - if conversation and conversation.appId: - final_app_id = conversation.appId - _logger.info("[Scheduler] 使用Conversation中的appId: %s", final_app_id) - - if self.post_body.app and self.post_body.app.app_id and self.post_body.app.app_id != conversation.appId: - _logger.warning( - "[Scheduler] post_body中的app_id(%s)与Conversation中的appId(%s)不符,忽略post_body中的app信息", - self.post_body.app.app_id, - conversation.appId, - ) - self.post_body.app.app_id = conversation.appId - elif self.post_body.app and self.post_body.app.app_id: - final_app_id = self.post_body.app.app_id - _logger.info("[Scheduler] Conversation中无appId,使用post_body中的app_id: %s", final_app_id) - else: - final_app_id = None - _logger.info("[Scheduler] Conversation和post_body中均无appId,fallback到智能问答") - - return final_app_id - async def _create_executor_task(self, final_app_id: uuid.UUID | None) -> asyncio.Task | None: """根据 app_id 创建对应的执行器任务""" if final_app_id is None: @@ -87,13 +55,7 @@ class ExecutorMixin: return asyncio.create_task(self._run_agent()) async def _handle_task_cancellation(self, main_task: asyncio.Task) -> None: - """ - 处理任务取消的逻辑 - - Args: - main_task: 需要取消的主任务 - - """ + """处理任务取消的逻辑""" _logger.warning("[Scheduler] 用户取消执行,正在终止...") main_task.cancel() try: @@ -115,26 +77,6 @@ class ExecutorMixin: else: _logger.warning("[Scheduler] task.state为None,无法更新ExecutorStatus") - async def _monitor_activity(self, kill_event: asyncio.Event, user_id: str) -> None: - """监控用户活动状态,不活跃时终止工作流""" - try: - check_interval = 0.5 - - while not kill_event.is_set(): - is_active = await Activity.is_active(user_id) - - if not is_active: - _logger.warning("[Scheduler] 用户 %s 不活跃,终止工作流", user_id) - kill_event.set() - break - - await asyncio.sleep(check_interval) - except asyncio.CancelledError: - _logger.info("[Scheduler] 活动监控任务已取消") - except Exception: - _logger.exception("[Scheduler] 活动监控过程中发生错误") - kill_event.set() - async def _run_qa(self) -> None: """运行QA执行器""" qa_executor = QAExecutor( diff --git a/apps/scheduler/scheduler/init.py b/apps/scheduler/scheduler/init.py index 96a12d3f8796ed61c8e79c36c4f4104d82d96670..05aba7cf1fb69d80be44bded928ae7f7dd0464b7 100644 --- a/apps/scheduler/scheduler/init.py +++ b/apps/scheduler/scheduler/init.py @@ -9,9 +9,10 @@ from jinja2.sandbox import SandboxedEnvironment from apps.common.queue import MessageQueue from apps.llm import LLM -from apps.models import Task, TaskRuntime, User +from apps.models import Conversation, Task, TaskRuntime, User from apps.schemas.request_data import RequestData from apps.schemas.task import TaskData +from apps.services.appcenter import AppCenterManager from apps.services.conversation import ConversationManager from apps.services.llm import LLMManager from apps.services.task import TaskManager @@ -20,7 +21,7 @@ from apps.services.user import UserManager _logger = logging.getLogger(__name__) -class InitializationMixin: +class InitMixin: """处理Scheduler初始化相关的逻辑""" task: TaskData @@ -65,7 +66,6 @@ class InitializationMixin: _logger.info("[Scheduler] 尝试从Conversation ID %s 恢复任务", conversation_id) - restored = False try: conversation = await ConversationManager.get_conversation_by_conversation_id( user_id, @@ -84,7 +84,7 @@ class InitializationMixin: self.task.runtime.taskId = task_id if self.task.state: self.task.state.taskId = task_id - restored = True + return else: _logger.warning( "[Scheduler] Conversation %s 不存在或无权访问,创建新任务", @@ -93,11 +93,10 @@ class InitializationMixin: except Exception: _logger.exception("[Scheduler] 从Conversation恢复任务失败,创建新任务") - if not restored: - _logger.info("[Scheduler] 无法恢复任务,创建新任务") - self._create_new_task(task_id, user_id, conversation_id) + _logger.info("[Scheduler] 无法恢复任务,创建新任务") + self._create_new_task(task_id, user_id, conversation_id) - async def _init_user(self, user_id: str) -> None: + async def _get_user(self, user_id: str) -> None: """初始化用户""" user = await UserManager.get_user(user_id) if not user: @@ -115,3 +114,23 @@ class InitializationMixin: lstrip_blocks=True, extensions=["jinja2.ext.loopcontrols"], ) + + async def _create_new_conversation( + self, title: str, user_id: str, app_id: uuid.UUID | None = None, + *, + debug: bool = False, + ) -> Conversation: + """判断并创建新对话""" + if app_id and not await AppCenterManager.validate_user_app_access(user_id, app_id): + err = "Invalid app_id." + raise RuntimeError(err) + new_conv = await ConversationManager.add_conversation_by_user( + title=title, + user_id=user_id, + app_id=app_id, + debug=debug, + ) + if not new_conv: + err = "Create new conversation failed." + raise RuntimeError(err) + return new_conv diff --git a/apps/scheduler/scheduler/message.py b/apps/scheduler/scheduler/message.py index a4c9230a534ac152fd3360fb9c0c14b88081c34b..ba103c9102d22f5a203d91b934179938c46d9545 100644 --- a/apps/scheduler/scheduler/message.py +++ b/apps/scheduler/scheduler/message.py @@ -12,7 +12,7 @@ from apps.schemas.task import TaskData _logger = logging.getLogger(__name__) -class MessagingMixin: +class MessageMixin: """处理消息推送相关的逻辑""" queue: MessageQueue diff --git a/apps/scheduler/scheduler/scheduler.py b/apps/scheduler/scheduler/scheduler.py index 436a17e2547ba69702daaa46c6a38dad1f32492a..28b513dfaf718593cafc2d630be6b2c699f22009 100644 --- a/apps/scheduler/scheduler/scheduler.py +++ b/apps/scheduler/scheduler/scheduler.py @@ -8,23 +8,23 @@ import uuid from apps.common.queue import MessageQueue from apps.schemas.request_data import RequestData -from .conversation import ConversationMixin from .data import DataMixin from .executor import ExecutorMixin from .flow import FlowMixin -from .init import InitializationMixin -from .message import MessagingMixin +from .init import InitMixin +from .message import MessageMixin +from .util import UtilMixin _logger = logging.getLogger(__name__) class Scheduler( - InitializationMixin, + InitMixin, ExecutorMixin, FlowMixin, - ConversationMixin, DataMixin, - MessagingMixin, + MessageMixin, + UtilMixin, ): """ "调度器",是最顶层的、控制Executor执行顺序和状态的逻辑。 @@ -32,12 +32,12 @@ class Scheduler( Scheduler包含一个"SchedulerContext",作用为多个Executor的"聊天会话" 所有属性都继承自各个Mixin类,主要包括: - - task: TaskData (来自InitializationMixin) - - llm: LLMConfig (来自InitializationMixin) - - queue: MessageQueue (来自InitializationMixin) - - post_body: RequestData (来自InitializationMixin) - - user: User (来自InitializationMixin) - - _env: SandboxedEnvironment (来自InitializationMixin) + - task: TaskData (来自InitMixin) + - llm: LLMConfig (来自InitMixin) + - queue: MessageQueue (来自InitMixin) + - post_body: RequestData (来自InitMixin) + - user: User (来自InitMixin) + - _env: SandboxedEnvironment (来自InitMixin) """ async def init( @@ -51,7 +51,7 @@ class Scheduler( self.queue = queue self.post_body = post_body - await self._init_user(user_id) + await self._get_user(user_id) await self._init_task(task_id, user_id) self._init_jinja2_env() self.llm = await self._get_scheduler_llm(post_body.llm_id) diff --git a/apps/scheduler/scheduler/util.py b/apps/scheduler/scheduler/util.py new file mode 100644 index 0000000000000000000000000000000000000000..0f710155da3bf6053eef3e7128da20bcde04386f --- /dev/null +++ b/apps/scheduler/scheduler/util.py @@ -0,0 +1,70 @@ +# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. +"""工具类Mixin""" + +import asyncio +import logging +import uuid + +from apps.schemas.request_data import RequestData +from apps.schemas.task import TaskData +from apps.services.activity import Activity +from apps.services.conversation import ConversationManager + +_logger = logging.getLogger(__name__) + + +class UtilMixin: + """通用工具方法的Mixin类""" + + task: TaskData + post_body: RequestData + + async def _determine_app_id(self) -> uuid.UUID | None: + """确定最终使用的 app_id""" + conversation = None + + if self.task.metadata.conversationId: + conversation = await ConversationManager.get_conversation_by_conversation_id( + self.task.metadata.userId, + self.task.metadata.conversationId, + ) + + if conversation and conversation.appId: + final_app_id = conversation.appId + _logger.info("[Scheduler] 使用Conversation中的appId: %s", final_app_id) + + if self.post_body.app and self.post_body.app.app_id and self.post_body.app.app_id != conversation.appId: + _logger.warning( + "[Scheduler] post_body中的app_id(%s)与Conversation中的appId(%s)不符,忽略post_body中的app信息", + self.post_body.app.app_id, + conversation.appId, + ) + self.post_body.app.app_id = conversation.appId + elif self.post_body.app and self.post_body.app.app_id: + final_app_id = self.post_body.app.app_id + _logger.info("[Scheduler] Conversation中无appId,使用post_body中的app_id: %s", final_app_id) + else: + final_app_id = None + _logger.info("[Scheduler] Conversation和post_body中均无appId,fallback到智能问答") + + return final_app_id + + async def _monitor_activity(self, kill_event: asyncio.Event, user_id: str) -> None: + """监控用户活动状态,不活跃时终止工作流""" + try: + check_interval = 0.5 + + while not kill_event.is_set(): + is_active = await Activity.is_active(user_id) + + if not is_active: + _logger.warning("[Scheduler] 用户 %s 不活跃,终止工作流", user_id) + kill_event.set() + break + + await asyncio.sleep(check_interval) + except asyncio.CancelledError: + _logger.info("[Scheduler] 活动监控任务已取消") + except Exception: + _logger.exception("[Scheduler] 活动监控过程中发生错误") + kill_event.set() diff --git a/apps/schemas/mcp.py b/apps/schemas/mcp.py index e64391f4b662e69e1d9848801e3d2ffd756e3041..474fd646bf95ab529755014962e5227f8cb3a73a 100644 --- a/apps/schemas/mcp.py +++ b/apps/schemas/mcp.py @@ -84,14 +84,6 @@ class FlowName(BaseModel): flow_name: str = Field(description="MCP 流程名称", default="") - -class FlowRisk(BaseModel): - """MCP 流程风险评估结果""" - - risk: Risk = Field(description="风险类型", default=Risk.LOW) - reason: str = Field(description="风险原因", default="") - - class ToolRisk(BaseModel): """MCP工具风险评估结果""" diff --git a/apps/schemas/scheduler.py b/apps/schemas/scheduler.py index fa7cd8dd08637c52074fc900e29a6f74a68c4f65..3f44772331885fb79c85997397d07ecadc64b583 100644 --- a/apps/schemas/scheduler.py +++ b/apps/schemas/scheduler.py @@ -35,6 +35,7 @@ class ExecutorBackground(BaseModel): num: int = Field(description="对话记录最大数量", default=0) conversation: list[dict[str, str]] = Field(description="对话记录", default=[]) facts: list[str] = Field(description="当前Executor的背景信息", default=[]) + history_questions: list[str] = Field(description="历史问题列表", default=[]) class CallVars(BaseModel): diff --git a/design/executor/agent.md b/design/executor/agent.md index 927754d7fa5a708484db8d2187a0d7d0eee66b4f..c58ca2f92d835e991128345c0a34b66751405f82 100644 --- a/design/executor/agent.md +++ b/design/executor/agent.md @@ -68,7 +68,7 @@ classDiagram class MCPPlanner { +get_flow_name() FlowName +create_next_step(history, tools) Step - +get_flow_excute_risk(tools) FlowRisk + +get_flow_excute_risk(tools: dict[str, MCPTools]) FlowRisk +get_tool_risk(tool, input_param) ToolRisk +is_param_error(...) IsParamError +change_err_message_to_description(...) str