diff --git a/apps/scheduler/call/suggest/suggest.py b/apps/scheduler/call/suggest/suggest.py
index 9b1b5624a3572985641ad172c8e99e3c5f6e7f61..7921c7785c06a07748040468ceb55dfb86176a15 100644
--- a/apps/scheduler/call/suggest/suggest.py
+++ b/apps/scheduler/call/suggest/suggest.py
@@ -11,18 +11,15 @@ from jinja2.sandbox import SandboxedEnvironment
from pydantic import Field
from pydantic.json_schema import SkipJsonSchema
-from apps.common.security import Security
from apps.models import LanguageType, NodeInfo
from apps.scheduler.call.core import CoreCall
from apps.schemas.enum_var import CallOutputType
-from apps.schemas.record import RecordContent
from apps.schemas.scheduler import (
CallError,
CallInfo,
CallOutputChunk,
CallVars,
)
-from apps.services.record import RecordManager
from apps.services.user_tag import UserTagManager
from .prompt import SUGGEST_PROMPT
@@ -77,13 +74,8 @@ class Suggestion(CoreCall, input_model=SuggestionInput, output_model=SuggestionO
async def _init(self, call_vars: CallVars) -> SuggestionInput:
"""初始化"""
- if self.conversation_id is None:
- self._history_questions = []
- else:
- self._history_questions = await self._get_history_questions(
- call_vars.ids.user_id,
- self.conversation_id,
- )
+ # 从 ExecutorBackground 中获取历史问题
+ self._history_questions = call_vars.background.history_questions
self._app_id = call_vars.ids.app_id
self._flow_id = call_vars.ids.executor_id
self._env = SandboxedEnvironment(
@@ -111,22 +103,6 @@ class Suggestion(CoreCall, input_model=SuggestionInput, output_model=SuggestionO
history_questions=self._history_questions,
)
-
- async def _get_history_questions(self, user_id: str, conversation_id: uuid.UUID) -> list[str]:
- """获取当前对话的历史问题"""
- records = await RecordManager.query_record_by_conversation_id(
- user_id,
- conversation_id,
- 15,
- )
-
- history_questions = []
- for record in records:
- record_data = RecordContent.model_validate_json(Security.decrypt(record.content, record.key))
- history_questions.append(record_data.question)
- return history_questions
-
-
async def _exec(self, input_data: dict[str, Any]) -> AsyncGenerator[CallOutputChunk, None]:
"""运行问题推荐"""
data = SuggestionInput(**input_data)
diff --git a/apps/scheduler/executor/agent.py b/apps/scheduler/executor/agent.py
index e4aba23f654e1576b00f56cbaed0d0539de4751d..8e1d6012a77008ea9cf9bf8dd8a8952014540659 100644
--- a/apps/scheduler/executor/agent.py
+++ b/apps/scheduler/executor/agent.py
@@ -33,10 +33,6 @@ class MCPAgentExecutor(BaseExecutor):
agent_id: uuid.UUID = Field(default=uuid.uuid4(), description="App ID作为Agent ID")
agent_description: str = Field(default="", description="Agent描述")
- tools: dict[str, MCPTools] = Field(
- description="MCP工具列表,key为tool_id",
- default={},
- )
params: FlowParams | bool | None = Field(
default=None,
description="流执行过程中的参数补充",
@@ -51,6 +47,7 @@ class MCPAgentExecutor(BaseExecutor):
self._mcp_list = []
self._current_input = {}
self._current_tool = None
+ self._tool_list = {}
# 初始化MCP Host相关对象
self._planner = MCPPlanner(self.task, self.llm)
self._host = MCPHost(self.task, self.llm)
@@ -88,9 +85,9 @@ class MCPAgentExecutor(BaseExecutor):
self._mcp_list.append(mcp_service)
for tool in await MCPServiceManager.get_mcp_tools(mcp_id):
- self.tools[tool.id] = tool
+ self._tool_list[tool.id] = tool
- self.tools[AGENT_FINAL_STEP_NAME] = MCPTools(
+ self._tool_list[AGENT_FINAL_STEP_NAME] = MCPTools(
mcpId="", toolName=AGENT_FINAL_STEP_NAME, description="结束流程的工具",
inputSchema={}, outputSchema={},
)
@@ -109,7 +106,7 @@ class MCPAgentExecutor(BaseExecutor):
if is_first:
# 获取第一个输入参数
- self._current_tool = self.tools[state.stepName]
+ self._current_tool = self._tool_list[state.stepName]
self._current_input = await self._host.get_first_input_params(
self._current_tool, self.task.runtime.userInput, self.task,
)
@@ -121,7 +118,7 @@ class MCPAgentExecutor(BaseExecutor):
else:
params = {}
params_description = ""
- self._current_tool = self.tools[state.stepName]
+ self._current_tool = self._tool_list[state.stepName]
state.currentInput = await self._host.fill_params(
self._current_tool,
self.task.runtime.userInput,
@@ -215,7 +212,7 @@ class MCPAgentExecutor(BaseExecutor):
self.task.state = state
await self._push_message(
- EventType.STEP_END,
+ EventType.STEP_OUTPUT,
data=error_output,
)
@@ -385,12 +382,12 @@ class MCPAgentExecutor(BaseExecutor):
step = None
for _ in range(max_retry):
try:
- step = await self._planner.create_next_step(history, self.tool_list)
- if step.tool_id in self.tools:
+ step = await self._planner.create_next_step(history, list(self._tool_list.values()))
+ if step.tool_id in self._tool_list:
break
except Exception:
_logger.exception("[MCPAgentExecutor] 获取下一步失败,重试中...")
- if step is None or step.tool_id not in self.tools:
+ if step is None or step.tool_id not in self._tool_list:
step = Step(
tool_id=AGENT_FINAL_STEP_NAME,
description=AGENT_FINAL_STEP_NAME,
@@ -407,11 +404,12 @@ class MCPAgentExecutor(BaseExecutor):
"""步骤执行失败后的错误处理"""
self._validate_task_state()
state = cast("ExecutorCheckpoint", self.task.state)
+ error_output = {"message": state.errorMessage}
state.stepStatus = StepStatus.ERROR
state.executorStatus = ExecutorStatus.ERROR
self.task.state = state
- await self._push_message(EventType.STEP_END, data={})
+ await self._push_message(EventType.STEP_OUTPUT, data=error_output)
await self._push_message(EventType.EXECUTOR_STOP, data={})
await self._add_error_to_context(state.stepStatus)
@@ -440,7 +438,7 @@ class MCPAgentExecutor(BaseExecutor):
state.stepStatus = StepStatus.CANCELLED
self.task.state = state
- await self._push_message(EventType.STEP_END, data={})
+ await self._push_message(EventType.STEP_OUTPUT, data={})
await self._push_message(EventType.EXECUTOR_STOP, data={})
self._update_last_context_status(StepStatus.CANCELLED)
return
@@ -458,7 +456,7 @@ class MCPAgentExecutor(BaseExecutor):
elif self._user.autoExecute:
await self._handle_step_error_and_continue()
else:
- mcp_tool = self.tools[state.stepName]
+ mcp_tool = self._tool_list[state.stepName]
error_msg = self._get_error_message_str(state.errorMessage)
is_param_error = await self._planner.is_param_error(
await self._host.assemble_memory(self.task.runtime, self.task.context),
@@ -477,14 +475,46 @@ class MCPAgentExecutor(BaseExecutor):
async def summarize(self) -> None:
"""总结"""
+ thinking_started = False  # True while reasoning_content chunks are being streamed
async for chunk in self._planner.generate_answer(
await self._host.assemble_memory(self.task.runtime, self.task.context),
):
+ if chunk.reasoning_content:
+ if not thinking_started:
+ await self._push_message(
+ EventType.TEXT_ADD,
+ data="",  # NOTE(review): empty payload sent before the first reasoning chunk; if an opening delimiter was intended it is missing — confirm
+ )
+ self.task.runtime.fullAnswer += ""  # no-op as written: appending "" leaves fullAnswer unchanged
+ thinking_started = True
+
+ await self._push_message(
+ EventType.TEXT_ADD,
+ data=chunk.reasoning_content,
+ )
+ self.task.runtime.fullAnswer += chunk.reasoning_content  # reasoning text is stored in fullAnswer alongside the answer text
+
+ if chunk.content:
+ if thinking_started:
+ await self._push_message(
+ EventType.TEXT_ADD,
+ data="",  # NOTE(review): empty payload sent when reasoning switches to content; closing delimiter may be missing — confirm
+ )
+ self.task.runtime.fullAnswer += ""  # no-op as written
+ thinking_started = False
+
+ await self._push_message(
+ EventType.TEXT_ADD,
+ data=chunk.content,
+ )
+ self.task.runtime.fullAnswer += chunk.content
+
+ if thinking_started:  # checked after the loop: the stream ended while still in the reasoning phase
await self._push_message(
EventType.TEXT_ADD,
- data=chunk,
+ data="",  # NOTE(review): same empty end-of-reasoning payload as above — confirm intended value
)
- self.task.runtime.fullAnswer += chunk
+ self.task.runtime.fullAnswer += ""  # no-op as written
async def run(self) -> None:
"""执行MCP Agent的主逻辑"""
@@ -493,14 +523,10 @@ class MCPAgentExecutor(BaseExecutor):
# 初始化MCP服务
await self.load_mcp()
- data = {}
if state.executorStatus == ExecutorStatus.INIT:
# 初始化状态
state.executorId = str(uuid.uuid4())
state.executorName = (await self._planner.get_flow_name()).flow_name
- flow_risk = await self._planner.get_flow_excute_risk(self.tool_list)
- if self._user.autoExecute:
- data = flow_risk.model_dump(exclude_none=True, by_alias=True)
await self.get_next_step()
self.task.state = state
@@ -528,8 +554,9 @@ class MCPAgentExecutor(BaseExecutor):
}
state.stepStatus = StepStatus.ERROR
self.task.state = state
+ error_output = {"message": state.errorMessage}
- await self._push_message(EventType.STEP_END, data={})
+ await self._push_message(EventType.STEP_OUTPUT, data=error_output)
await self._push_message(EventType.EXECUTOR_STOP, data={})
await self._add_error_to_context(state.stepStatus)
finally:
diff --git a/apps/scheduler/executor/base.py b/apps/scheduler/executor/base.py
index dae1249775eff1ce03d50cba70e1c3932173ec0a..eb5c594d911dbc60602121fb019d985b0cd4c356 100644
--- a/apps/scheduler/executor/base.py
+++ b/apps/scheduler/executor/base.py
@@ -46,26 +46,35 @@ class BaseExecutor(BaseModel, ABC):
self.background = ExecutorBackground(
conversation=[],
facts=[],
+ history_questions=[],
num=n,
)
return
- # 获取最后n+5条Record
+ # 获取最后n+10条Record
records = await RecordManager.query_record_by_conversation_id(
- self.task.metadata.userId, self.task.metadata.conversationId, n + 5,
+ self.task.metadata.userId, self.task.metadata.conversationId, n + 10,
)
- # 组装问答
+ # 组装问答、事实和历史问题
context = []
facts = []
- for record in records:
+ history_questions = []
+ for i, record in enumerate(records):
record_data = RecordContent.model_validate_json(Security.decrypt(record.content, record.key))
- context.append({
- "question": record_data.question,
- "answer": record_data.answer,
- })
- facts.extend(record_data.facts)
+ # context 取最后 n 组
+ if i >= len(records) - n:
+ context.append({
+ "question": record_data.question,
+ "answer": record_data.answer,
+ })
+ # facts 取最后 n+5 组
+ if i >= len(records) - (n + 5):
+ facts.extend(record_data.facts)
+ # history_questions 取全部(n+10组)
+ history_questions.append(record_data.question)
self.background = ExecutorBackground(
conversation=context,
facts=facts,
+ history_questions=history_questions,
num=n,
)
diff --git a/apps/scheduler/executor/qa.py b/apps/scheduler/executor/qa.py
index 3a8f315cf0cbc60ef4aaa73833498a5ad768f63c..bb6228188e8fee780ff471b4a3a15e8c596a6d1e 100644
--- a/apps/scheduler/executor/qa.py
+++ b/apps/scheduler/executor/qa.py
@@ -9,7 +9,7 @@ from apps.models import ExecutorCheckpoint, ExecutorStatus, StepStatus
from apps.models.task import LanguageType
from apps.scheduler.call.rag.schema import DocItem, RAGOutput
from apps.schemas.document import DocumentInfo
-from apps.schemas.enum_var import EventType, SpecialCallType
+from apps.schemas.enum_var import SpecialCallType
from apps.schemas.flow import Step
from apps.schemas.task import StepQueueItem
@@ -83,7 +83,6 @@ class QAExecutor(BaseExecutor):
async def init(self) -> None:
"""初始化QAExecutor"""
await self._load_history()
- # 初始化新State
self.task.state = ExecutorCheckpoint(
taskId=self.task.metadata.id,
executorId=str(self.task.metadata.conversationId),
@@ -105,13 +104,11 @@ class QAExecutor(BaseExecutor):
doc_info_list = []
doc_cnt = 0
doc_id_map = {}
- # 预留tokens
_ = round(max_tokens * 0.8)
for doc_chunk in doc_chunk_list:
if doc_chunk.docId not in doc_id_map:
doc_cnt += 1
- # 创建DocumentInfo对象
created_at_value = (
doc_chunk.docCreatedAt.timestamp()
if isinstance(doc_chunk.docCreatedAt, datetime)
@@ -132,8 +129,10 @@ class QAExecutor(BaseExecutor):
return doc_info_list
- async def run(self) -> None:
- """运行QA"""
+ async def _execute_rag_step(self) -> bool:
+ """执行RAG检索步骤"""
+ _logger.info("[QAExecutor] 开始执行RAG检索步骤")
+
rag_exec = StepExecutor(
msg_queue=self.msg_queue,
task=self.task,
@@ -150,19 +149,125 @@ class QAExecutor(BaseExecutor):
await rag_exec.init()
await rag_exec.run()
+ if self.task.state and self.task.state.stepStatus == StepStatus.ERROR:
+ _logger.error("[QAExecutor] RAG检索步骤失败")
+ return False
+
+ rag_output_data = None
+ for history in self.task.context:
+ if history.stepId == rag_exec.step.step_id:
+ rag_output_data = history.outputData
+ break
- # 解析并推送文档信息
- if first_chunk and isinstance(first_chunk.content, dict):
- rag_output = RAGOutput.model_validate(first_chunk.content)
+ if rag_output_data and isinstance(rag_output_data, dict):
+ rag_output = RAGOutput.model_validate(rag_output_data)
doc_chunk_list: list[DocItem] = [
DocItem.model_validate(item) if not isinstance(item, DocItem) else item
for item in rag_output.corpus
]
doc_info_list = await self._assemble_doc_info(doc_chunk_list, 8192)
for doc_info in doc_info_list:
- await self._push_rag_doc(doc_info)
+ await self._push_message(
+ "document.add",
+ doc_info.model_dump(by_alias=True, exclude_none=True),
+ )
+ _logger.info("[QAExecutor] RAG检索步骤成功,已推送%d个文档", len(doc_info_list))
+ else:
+ _logger.warning("[QAExecutor] RAG检索步骤未返回有效数据")
+
+ return True
+
+ async def _execute_llm_step(self) -> bool:
+ """Run the LLM question-answering step; return False if the task state ends in StepStatus.ERROR, else True."""
+ _logger.info("[QAExecutor] 开始执行LLM问答步骤")
+
+ llm_exec = StepExecutor(
+ msg_queue=self.msg_queue,
+ task=self.task,
+ step=StepQueueItem(
+ step_id=uuid.uuid4(),  # fresh random id for this step instance
+ step=_RAG_STEP_LIST[1][self.task.runtime.language],  # entry 1 is used as the LLM step, selected by task language
+ enable_filling=False,
+ to_user=True,
+ ),
+ background=self.background,
+ question=self.question,
+ llm=self.llm,
+ )
+ await llm_exec.init()
+ await llm_exec.run()
+
+ if self.task.state and self.task.state.stepStatus == StepStatus.ERROR:  # failure is detected via the shared task state, not a return value
+ _logger.error("[QAExecutor] LLM问答步骤失败")
+ return False
+
+ _logger.info("[QAExecutor] LLM问答步骤完成")
+ return True
+
+ async def _execute_remaining_steps(self) -> bool:
+ """Run the question-suggestion step, then the memory (facts) storage step; return False as soon as either leaves StepStatus.ERROR."""
+ _logger.info("[QAExecutor] 开始执行问题推荐步骤")
+ suggestion_exec = StepExecutor(
+ msg_queue=self.msg_queue,
+ task=self.task,
+ step=StepQueueItem(
+ step_id=uuid.uuid4(),
+ step=_RAG_STEP_LIST[2][self.task.runtime.language],  # entry 2 is used as the suggestion step
+ enable_filling=False,
+ to_user=True,
+ ),
+ background=self.background,
+ question=self.question,
+ llm=self.llm,
+ )
+ await suggestion_exec.init()
+ await suggestion_exec.run()
+
+ if self.task.state and self.task.state.stepStatus == StepStatus.ERROR:  # a failed suggestion step also skips the storage step below
+ _logger.error("[QAExecutor] 问题推荐步骤失败")
+ return False
+
+ _logger.info("[QAExecutor] 开始执行记忆存储步骤")
+ facts_exec = StepExecutor(
+ msg_queue=self.msg_queue,
+ task=self.task,
+ step=StepQueueItem(
+ step_id=uuid.uuid4(),
+ step=_RAG_STEP_LIST[3][self.task.runtime.language],  # entry 3 is used as the memory storage step
+ enable_filling=False,
+ to_user=False,  # unlike the other steps in this class, this one is created with to_user=False
+ ),
+ background=self.background,
+ question=self.question,
+ llm=self.llm,
+ )
+ await facts_exec.init()
+ await facts_exec.run()
+
+ if self.task.state and self.task.state.stepStatus == StepStatus.ERROR:
+ _logger.error("[QAExecutor] 记忆存储步骤失败")
+ return False
+
+ return True
+
+ async def run(self) -> None:
+ """Run the QA flow: RAG retrieval, LLM answer, then suggestion and memory storage; stop at the first failing stage."""
+ _logger.info("[QAExecutor] 开始运行QA流程")
+
+ rag_success = await self._execute_rag_step()
+ if not rag_success:
+ _logger.error("[QAExecutor] RAG检索步骤失败,终止执行")
+ return  # NOTE(review): every early return here skips the fullTime assignment below — confirm that is intended
+
+ llm_success = await self._execute_llm_step()
+ if not llm_success:
+ _logger.error("[QAExecutor] LLM问答步骤失败,终止执行")
+ return
+
+ remaining_success = await self._execute_remaining_steps()
+ if not remaining_success:
+ _logger.error("[QAExecutor] 剩余步骤失败,终止执行")
+ return
- # 保存答案
- full_answer = ""
- self.task.runtime.fullAnswer = full_answer
self.task.runtime.fullTime = round(datetime.now(UTC).timestamp(), 2) - self.task.runtime.time
+ _logger.info("[QAExecutor] QA流程完成")
diff --git a/apps/scheduler/mcp_agent/plan.py b/apps/scheduler/mcp_agent/plan.py
index 8140ec04b55f8ca5ce0271e38577f17520a2be8b..572e7cf2701bb758fae0bee997cb8feb5595fd49 100644
--- a/apps/scheduler/mcp_agent/plan.py
+++ b/apps/scheduler/mcp_agent/plan.py
@@ -14,7 +14,6 @@ from apps.scheduler.mcp_agent.prompt import (
CHANGE_ERROR_MESSAGE_TO_DESCRIPTION,
FINAL_ANSWER,
GEN_STEP,
- GENERATE_FLOW_EXCUTE_RISK,
GENERATE_FLOW_NAME,
GET_MISSING_PARAMS,
IS_PARAM_ERROR,
@@ -24,7 +23,6 @@ from apps.scheduler.slot.slot import Slot
from apps.schemas.llm import LLMChunk
from apps.schemas.mcp import (
FlowName,
- FlowRisk,
IsParamError,
Step,
ToolRisk,
@@ -85,25 +83,6 @@ class MCPPlanner(MCPBase):
# 使用Step模型解析结果
return Step.model_validate(step)
- async def get_flow_excute_risk(self, tools: list[MCPTools]) -> FlowRisk:
- """获取当前流程的风险评估结果"""
- template = _env.from_string(GENERATE_FLOW_EXCUTE_RISK[self._language])
- prompt = template.render(
- goal=self._goal,
- tools=tools,
- )
-
- # 组装OpenAI标准Function格式
- function = {
- "name": "evaluate_flow_execution_risk",
- "description": "Evaluate the potential risks and safety concerns of executing the entire workflow",
- "parameters": FlowRisk.model_json_schema(),
- }
-
- result = await self.get_json_result(prompt, function)
- # 使用FlowRisk模型解析结果
- return FlowRisk.model_validate(result)
-
async def get_tool_risk(
self,
tool: MCPTools,
@@ -206,7 +185,7 @@ class MCPPlanner(MCPBase):
return await self.get_json_result(prompt, function)
async def generate_answer(self, memory: str) -> AsyncGenerator[LLMChunk, None]:
- """生成最终回答"""
+ """生成最终回答,返回LLMChunk"""
template = _env.from_string(FINAL_ANSWER[self._language])
prompt = template.render(
memory=memory,
diff --git a/apps/scheduler/mcp_agent/prompt.py b/apps/scheduler/mcp_agent/prompt.py
index eea822f827a27030d7e0edaafb4179d1cc55e671..07d924ecf7cbbded08a6dd0905ffa7bcf8681d4a 100644
--- a/apps/scheduler/mcp_agent/prompt.py
+++ b/apps/scheduler/mcp_agent/prompt.py
@@ -55,73 +55,6 @@ user's goal.
),
}
-GENERATE_FLOW_EXCUTE_RISK: dict[LanguageType, str] = {
- LanguageType.CHINESE: dedent(
- r"""
- 你是一个智能助手,你的任务是根据用户的目标和当前的工具集合,评估当前流程的风险。
-
- # 样例
- ## 目标
- 我需要扫描当前mysql数据库,分析性能瓶颈, 并调优
- ## 工具集合
- 你可以访问并使用一些工具,这些工具将在 XML标签中给出。
-
- - mysql_analyzer 分析MySQL数据库性能
- - performance_tuner 调优数据库性能
-
- ## 输出
- {
- "risk": "high",
- "reason": "当前目标实现带来的风险较高,因为需要通过performance_tuner工具对MySQL数据库进行调优,而该\
-工具可能会对数据库的性能和稳定性产生较大的影响,因此风险评估为高。"
- }
-
- # 现在开始评估当前流程的风险:
- ## 目标
- {{goal}}
- ## 工具集合
-
- {% for tool in tools %}
- - {{tool.id}} {{tool.name}};{{tool.description}}
- {% endfor %}
-
- ## 输出
- """,
- ),
- LanguageType.ENGLISH: dedent(
- r"""
- You are an intelligent assistant, your task is to evaluate the risk of the current process based on the \
-user's goal and the current tool set.
- # Example
- # Goal
- I need to scan the current MySQL database, analyze performance bottlenecks, and optimize it.
- # Tool Set
- You can access and use some tools, which will be given in the XML tag.
-
- - mysql_analyzer Analyze MySQL database performance
- - performance_tuner Tune database performance
-
- # Output
- {
- "risk": "high",
- "reason": "The risk brought by the realization of the current goal is relatively high, because it \
-is necessary to tune the MySQL database through the performance_tuner tool, which may have a greater impact on \
-the performance and stability of the database. Therefore, the risk assessment is high."
- }
- # Now start evaluating the risk of the current process:
- # Goal
- {{goal}}
- # Tool Set
-
- {% for tool in tools %}
- - {{tool.id}} {{tool.name}}; {{tool.description}}
- {% endfor %}
-
- # Output
- """,
- ),
-}
-
GEN_STEP: dict[LanguageType, str] = {
LanguageType.CHINESE: dedent(
r"""
diff --git a/apps/scheduler/scheduler/conversation.py b/apps/scheduler/scheduler/conversation.py
deleted file mode 100644
index 8885704f7d5fc79e389ede96e687143adca292b9..0000000000000000000000000000000000000000
--- a/apps/scheduler/scheduler/conversation.py
+++ /dev/null
@@ -1,35 +0,0 @@
-# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved.
-"""对话管理相关的Mixin类"""
-
-import logging
-import uuid
-
-from apps.models import Conversation
-from apps.services.appcenter import AppCenterManager
-from apps.services.conversation import ConversationManager
-
-_logger = logging.getLogger(__name__)
-
-
-class ConversationMixin:
- """处理对话管理相关的逻辑"""
-
- async def create_new_conversation(
- self, title: str, user_id: str, app_id: uuid.UUID | None = None,
- *,
- debug: bool = False,
- ) -> Conversation:
- """判断并创建新对话"""
- if app_id and not await AppCenterManager.validate_user_app_access(user_id, app_id):
- err = "Invalid app_id."
- raise RuntimeError(err)
- new_conv = await ConversationManager.add_conversation_by_user(
- title=title,
- user_id=user_id,
- app_id=app_id,
- debug=debug,
- )
- if not new_conv:
- err = "Create new conversation failed."
- raise RuntimeError(err)
- return new_conv
diff --git a/apps/scheduler/scheduler/data.py b/apps/scheduler/scheduler/data.py
index 008f5a23d3f846580f5404df7f3c71fc8ecdc9a6..1acfedf87a3711bf2279d3e5c8bc49c0a8990f39 100644
--- a/apps/scheduler/scheduler/data.py
+++ b/apps/scheduler/scheduler/data.py
@@ -23,38 +23,44 @@ class DataMixin:
task: TaskData
post_body: RequestData
- async def _save_data(self) -> None:
- """保存当前Executor、Task、Record等的数据"""
- task = self.task
- user_id = self.task.metadata.userId
- post_body = self.post_body
-
+ def _extract_used_documents(self) -> list[dict]:
+ """Collect task.runtime.documents into a list, converting entries that expose model_dump() into dicts."""
used_docs = []
- record_group = None # TODO: 需要从适当的地方获取record_group
+ task = self.task  # local alias; the lines below read task.runtime
if hasattr(task.runtime, "documents") and task.runtime.documents:
for docs in task.runtime.documents:
doc_dict = docs if isinstance(docs, dict) else (docs.model_dump() if hasattr(docs, "model_dump") else docs)
used_docs.append(doc_dict)
- record_content = RecordContent(
+ return used_docs  # NOTE(review): entries that are neither dicts nor have model_dump() are appended unchanged, so list[dict] is not guaranteed
+
+ def _build_record_content(self) -> RecordContent:
+ """Build a RecordContent from task.runtime, falling back to empty values for attributes the runtime lacks."""
+ task = self.task
+
+ return RecordContent(  # each field below is guarded with hasattr() on task.runtime
question=task.runtime.question if hasattr(task.runtime, "question") else "",
answer=task.runtime.answer if hasattr(task.runtime, "answer") else "",
facts=task.runtime.facts if hasattr(task.runtime, "facts") else [],
data={},
)
+ def _encrypt_record_content(self, record_content: RecordContent) -> tuple[str, str] | None:
+ """Encrypt the JSON dump of record_content; return the (encrypt_data, encrypt_config) pair, or None if encryption raises."""
try:
encrypt_data, encrypt_config = Security.encrypt(record_content.model_dump_json(by_alias=True))
+ return encrypt_data, encrypt_config  # the pair is later passed to _build_record as content and key
except Exception:
_logger.exception("[Scheduler] 问答对加密错误")
- return
+ return None  # failure is logged above; the caller is expected to check for None
- if task.state:
- await TaskManager.save_flow_context(task.context)
+ def _build_record(self, encrypt_data: str, encrypt_config: str, current_time: float) -> Record:
+ """构建记录对象"""
+ task = self.task
+ user_id = task.metadata.userId
- current_time = round(datetime.now(UTC).timestamp(), 2)
- record = Record(
+ return Record(
id=task.metadata.id,
conversationId=task.metadata.conversationId,
taskId=task.metadata.id,
@@ -62,9 +68,9 @@ class DataMixin:
content=encrypt_data,
key=encrypt_config,
metadata=RecordMetadata(
- timeCost=0, # TODO: 需要从task中获取时间成本
- inputTokens=0, # TODO: 需要从task中获取token信息
- outputTokens=0, # TODO: 需要从task中获取token信息
+ timeCost=0,
+ inputTokens=0,
+ outputTokens=0,
feature={},
),
createdAt=current_time,
@@ -76,18 +82,37 @@ class DataMixin:
) if task.state else None,
)
+ async def _handle_document_management(self, record_group: str | None, used_docs: list[dict]) -> None:
+ """Update document status and save the answer's documents; both calls are skipped when record_group is falsy."""
+ user_id = self.task.metadata.userId
+ post_body = self.post_body
+
if record_group and post_body.conversation_id:
await DocumentManager.change_doc_status(user_id, post_body.conversation_id, record_group)
+ if record_group and used_docs:  # NOTE(review): in the removed code this ran after RecordManager.insert_record_data; here it runs before the insert — confirm the order does not matter
+ await DocumentManager.save_answer_doc(user_id, record_group, used_docs)
+
+ async def _save_record_data(self, record: Record) -> None:
+ """Insert the record for the current user; does nothing when post_body.conversation_id is falsy."""
+ user_id = self.task.metadata.userId
+ post_body = self.post_body  # the conversation id is read from the request body
+
if post_body.conversation_id:
await RecordManager.insert_record_data(user_id, post_body.conversation_id, record)
- if record_group and used_docs:
- await DocumentManager.save_answer_doc(user_id, record_group, used_docs)
+ async def _update_app_center(self) -> None:
+ """Record the request's app as recently used for this user; does nothing when the request carries no app_id."""
+ user_id = self.task.metadata.userId
+ post_body = self.post_body  # the app id is read from the request body
if post_body.app and post_body.app.app_id:
await AppCenterManager.update_recent_app(user_id, post_body.app.app_id)
+ async def _handle_task_state(self) -> None:
+ """处理任务状态管理"""
+ task = self.task
+
if not task.state or task.state.flow_status in [StepStatus.SUCCESS, StepStatus.ERROR, StepStatus.CANCELLED]:
await TaskManager.delete_task_by_task_id(task.metadata.id)
else:
@@ -95,3 +120,30 @@ class DataMixin:
await TaskManager.save_task_runtime(task.runtime)
if task.state:
await TaskManager.save_executor_checkpoint(task.state)
+
+ async def _save_data(self) -> None:
+ """Persist the current Executor, Task and Record data by running the helper steps below in order."""
+ task = self.task
+ record_group = None  # NOTE(review): always None, so the record_group-guarded calls in _handle_document_management never run; the removed code had a TODO to source this value
+
+ used_docs = self._extract_used_documents()
+
+ record_content = self._build_record_content()
+ encrypted_result = self._encrypt_record_content(record_content)
+ if encrypted_result is None:
+ return  # encryption failed (already logged by the helper): nothing below is saved
+ encrypt_data, encrypt_config = encrypted_result
+
+ if task.state:
+ await TaskManager.save_flow_context(task.context)
+
+ current_time = round(datetime.now(UTC).timestamp(), 2)  # UTC epoch seconds rounded to 2 decimals
+ record = self._build_record(encrypt_data, encrypt_config, current_time)
+
+ await self._handle_document_management(record_group, used_docs)  # runs before the record insert below
+
+ await self._save_record_data(record)
+
+ await self._update_app_center()
+
+ await self._handle_task_state()
diff --git a/apps/scheduler/scheduler/executor.py b/apps/scheduler/scheduler/executor.py
index 4eebcd37dfb168f06a92108459d8892e8b560bbe..2becbedc17082e7f66052ef876248064c570f9ac 100644
--- a/apps/scheduler/scheduler/executor.py
+++ b/apps/scheduler/scheduler/executor.py
@@ -14,9 +14,7 @@ from apps.scheduler.executor.qa import QAExecutor
from apps.scheduler.pool.pool import pool
from apps.schemas.request_data import RequestData
from apps.schemas.task import TaskData
-from apps.services.activity import Activity
from apps.services.appcenter import AppCenterManager
-from apps.services.conversation import ConversationManager
_logger = logging.getLogger(__name__)
@@ -33,36 +31,6 @@ class ExecutorMixin:
"""获取Top1 Flow (由FlowMixin实现)"""
... # noqa: PIE790
- async def _determine_app_id(self) -> uuid.UUID | None:
- """确定最终使用的 app_id"""
- conversation = None
-
- if self.task.metadata.conversationId:
- conversation = await ConversationManager.get_conversation_by_conversation_id(
- self.task.metadata.userId,
- self.task.metadata.conversationId,
- )
-
- if conversation and conversation.appId:
- final_app_id = conversation.appId
- _logger.info("[Scheduler] 使用Conversation中的appId: %s", final_app_id)
-
- if self.post_body.app and self.post_body.app.app_id and self.post_body.app.app_id != conversation.appId:
- _logger.warning(
- "[Scheduler] post_body中的app_id(%s)与Conversation中的appId(%s)不符,忽略post_body中的app信息",
- self.post_body.app.app_id,
- conversation.appId,
- )
- self.post_body.app.app_id = conversation.appId
- elif self.post_body.app and self.post_body.app.app_id:
- final_app_id = self.post_body.app.app_id
- _logger.info("[Scheduler] Conversation中无appId,使用post_body中的app_id: %s", final_app_id)
- else:
- final_app_id = None
- _logger.info("[Scheduler] Conversation和post_body中均无appId,fallback到智能问答")
-
- return final_app_id
-
async def _create_executor_task(self, final_app_id: uuid.UUID | None) -> asyncio.Task | None:
"""根据 app_id 创建对应的执行器任务"""
if final_app_id is None:
@@ -87,13 +55,7 @@ class ExecutorMixin:
return asyncio.create_task(self._run_agent())
async def _handle_task_cancellation(self, main_task: asyncio.Task) -> None:
- """
- 处理任务取消的逻辑
-
- Args:
- main_task: 需要取消的主任务
-
- """
+ """处理任务取消的逻辑"""
_logger.warning("[Scheduler] 用户取消执行,正在终止...")
main_task.cancel()
try:
@@ -115,26 +77,6 @@ class ExecutorMixin:
else:
_logger.warning("[Scheduler] task.state为None,无法更新ExecutorStatus")
- async def _monitor_activity(self, kill_event: asyncio.Event, user_id: str) -> None:
- """监控用户活动状态,不活跃时终止工作流"""
- try:
- check_interval = 0.5
-
- while not kill_event.is_set():
- is_active = await Activity.is_active(user_id)
-
- if not is_active:
- _logger.warning("[Scheduler] 用户 %s 不活跃,终止工作流", user_id)
- kill_event.set()
- break
-
- await asyncio.sleep(check_interval)
- except asyncio.CancelledError:
- _logger.info("[Scheduler] 活动监控任务已取消")
- except Exception:
- _logger.exception("[Scheduler] 活动监控过程中发生错误")
- kill_event.set()
-
async def _run_qa(self) -> None:
"""运行QA执行器"""
qa_executor = QAExecutor(
diff --git a/apps/scheduler/scheduler/init.py b/apps/scheduler/scheduler/init.py
index 96a12d3f8796ed61c8e79c36c4f4104d82d96670..05aba7cf1fb69d80be44bded928ae7f7dd0464b7 100644
--- a/apps/scheduler/scheduler/init.py
+++ b/apps/scheduler/scheduler/init.py
@@ -9,9 +9,10 @@ from jinja2.sandbox import SandboxedEnvironment
from apps.common.queue import MessageQueue
from apps.llm import LLM
-from apps.models import Task, TaskRuntime, User
+from apps.models import Conversation, Task, TaskRuntime, User
from apps.schemas.request_data import RequestData
from apps.schemas.task import TaskData
+from apps.services.appcenter import AppCenterManager
from apps.services.conversation import ConversationManager
from apps.services.llm import LLMManager
from apps.services.task import TaskManager
@@ -20,7 +21,7 @@ from apps.services.user import UserManager
_logger = logging.getLogger(__name__)
-class InitializationMixin:
+class InitMixin:
"""处理Scheduler初始化相关的逻辑"""
task: TaskData
@@ -65,7 +66,6 @@ class InitializationMixin:
_logger.info("[Scheduler] 尝试从Conversation ID %s 恢复任务", conversation_id)
- restored = False
try:
conversation = await ConversationManager.get_conversation_by_conversation_id(
user_id,
@@ -84,7 +84,7 @@ class InitializationMixin:
self.task.runtime.taskId = task_id
if self.task.state:
self.task.state.taskId = task_id
- restored = True
+ return
else:
_logger.warning(
"[Scheduler] Conversation %s 不存在或无权访问,创建新任务",
@@ -93,11 +93,10 @@ class InitializationMixin:
except Exception:
_logger.exception("[Scheduler] 从Conversation恢复任务失败,创建新任务")
- if not restored:
- _logger.info("[Scheduler] 无法恢复任务,创建新任务")
- self._create_new_task(task_id, user_id, conversation_id)
+ _logger.info("[Scheduler] 无法恢复任务,创建新任务")
+ self._create_new_task(task_id, user_id, conversation_id)
- async def _init_user(self, user_id: str) -> None:
+ async def _get_user(self, user_id: str) -> None:
"""初始化用户"""
user = await UserManager.get_user(user_id)
if not user:
@@ -115,3 +114,23 @@ class InitializationMixin:
lstrip_blocks=True,
extensions=["jinja2.ext.loopcontrols"],
)
+
+ async def _create_new_conversation(
+ self, title: str, user_id: str, app_id: uuid.UUID | None = None,
+ *,
+ debug: bool = False,
+ ) -> Conversation:
+ """Create and return a new conversation; raise RuntimeError on failed app access validation or failed creation."""
+ if app_id and not await AppCenterManager.validate_user_app_access(user_id, app_id):  # validation only runs when an app_id is given
+ err = "Invalid app_id."
+ raise RuntimeError(err)
+ new_conv = await ConversationManager.add_conversation_by_user(
+ title=title,
+ user_id=user_id,
+ app_id=app_id,
+ debug=debug,
+ )
+ if not new_conv:  # a falsy result from add_conversation_by_user is treated as failure
+ err = "Create new conversation failed."
+ raise RuntimeError(err)
+ return new_conv
diff --git a/apps/scheduler/scheduler/message.py b/apps/scheduler/scheduler/message.py
index a4c9230a534ac152fd3360fb9c0c14b88081c34b..ba103c9102d22f5a203d91b934179938c46d9545 100644
--- a/apps/scheduler/scheduler/message.py
+++ b/apps/scheduler/scheduler/message.py
@@ -12,7 +12,7 @@ from apps.schemas.task import TaskData
_logger = logging.getLogger(__name__)
-class MessagingMixin:
+class MessageMixin:
"""处理消息推送相关的逻辑"""
queue: MessageQueue
diff --git a/apps/scheduler/scheduler/scheduler.py b/apps/scheduler/scheduler/scheduler.py
index 436a17e2547ba69702daaa46c6a38dad1f32492a..28b513dfaf718593cafc2d630be6b2c699f22009 100644
--- a/apps/scheduler/scheduler/scheduler.py
+++ b/apps/scheduler/scheduler/scheduler.py
@@ -8,23 +8,23 @@ import uuid
from apps.common.queue import MessageQueue
from apps.schemas.request_data import RequestData
-from .conversation import ConversationMixin
from .data import DataMixin
from .executor import ExecutorMixin
from .flow import FlowMixin
-from .init import InitializationMixin
-from .message import MessagingMixin
+from .init import InitMixin
+from .message import MessageMixin
+from .util import UtilMixin
_logger = logging.getLogger(__name__)
class Scheduler(
- InitializationMixin,
+ InitMixin,
ExecutorMixin,
FlowMixin,
- ConversationMixin,
DataMixin,
- MessagingMixin,
+ MessageMixin,
+ UtilMixin,
):
"""
"调度器",是最顶层的、控制Executor执行顺序和状态的逻辑。
@@ -32,12 +32,12 @@ class Scheduler(
Scheduler包含一个"SchedulerContext",作用为多个Executor的"聊天会话"
所有属性都继承自各个Mixin类,主要包括:
- - task: TaskData (来自InitializationMixin)
- - llm: LLMConfig (来自InitializationMixin)
- - queue: MessageQueue (来自InitializationMixin)
- - post_body: RequestData (来自InitializationMixin)
- - user: User (来自InitializationMixin)
- - _env: SandboxedEnvironment (来自InitializationMixin)
+ - task: TaskData (来自InitMixin)
+ - llm: LLMConfig (来自InitMixin)
+ - queue: MessageQueue (来自InitMixin)
+ - post_body: RequestData (来自InitMixin)
+ - user: User (来自InitMixin)
+ - _env: SandboxedEnvironment (来自InitMixin)
"""
async def init(
@@ -51,7 +51,7 @@ class Scheduler(
self.queue = queue
self.post_body = post_body
- await self._init_user(user_id)
+ await self._get_user(user_id)
await self._init_task(task_id, user_id)
self._init_jinja2_env()
self.llm = await self._get_scheduler_llm(post_body.llm_id)
diff --git a/apps/scheduler/scheduler/util.py b/apps/scheduler/scheduler/util.py
new file mode 100644
index 0000000000000000000000000000000000000000..0f710155da3bf6053eef3e7128da20bcde04386f
--- /dev/null
+++ b/apps/scheduler/scheduler/util.py
@@ -0,0 +1,70 @@
+# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved.
+"""工具类Mixin"""
+
+import asyncio
+import logging
+import uuid
+
+from apps.schemas.request_data import RequestData
+from apps.schemas.task import TaskData
+from apps.services.activity import Activity
+from apps.services.conversation import ConversationManager
+
+_logger = logging.getLogger(__name__)
+
+
+class UtilMixin:
+ """Mixin providing general-purpose helper methods."""
+
+ task: TaskData  # annotation only; this class never assigns it
+ post_body: RequestData  # annotation only; this class never assigns it
+
+ async def _determine_app_id(self) -> uuid.UUID | None:
+ """Pick the app_id to use: the conversation's appId first, then post_body's app_id, otherwise None."""
+ conversation = None
+
+ if self.task.metadata.conversationId:  # look up the conversation only when the task carries an id
+ conversation = await ConversationManager.get_conversation_by_conversation_id(
+ self.task.metadata.userId,
+ self.task.metadata.conversationId,
+ )
+
+ if conversation and conversation.appId:
+ final_app_id = conversation.appId  # the conversation's appId takes precedence
+ _logger.info("[Scheduler] 使用Conversation中的appId: %s", final_app_id)
+
+ if self.post_body.app and self.post_body.app.app_id and self.post_body.app.app_id != conversation.appId:
+ _logger.warning(
+ "[Scheduler] post_body中的app_id(%s)与Conversation中的appId(%s)不符,忽略post_body中的app信息",
+ self.post_body.app.app_id,
+ conversation.appId,
+ )
+ self.post_body.app.app_id = conversation.appId  # overwrite the request value with the conversation's
+ elif self.post_body.app and self.post_body.app.app_id:
+ final_app_id = self.post_body.app.app_id  # no appId on the conversation; use the request's
+ _logger.info("[Scheduler] Conversation中无appId,使用post_body中的app_id: %s", final_app_id)
+ else:
+ final_app_id = None  # neither source supplied an app_id
+ _logger.info("[Scheduler] Conversation和post_body中均无appId,fallback到智能问答")
+
+ return final_app_id
+
+ async def _monitor_activity(self, kill_event: asyncio.Event, user_id: str) -> None:
+ """Poll Activity.is_active(user_id) until kill_event is set; set kill_event when the user is inactive."""
+ try:
+ check_interval = 0.5  # passed to asyncio.sleep, i.e. seconds between polls
+
+ while not kill_event.is_set():
+ is_active = await Activity.is_active(user_id)
+
+ if not is_active:
+ _logger.warning("[Scheduler] 用户 %s 不活跃,终止工作流", user_id)
+ kill_event.set()
+ break
+
+ await asyncio.sleep(check_interval)
+ except asyncio.CancelledError:  # NOTE(review): cancellation is logged, not re-raised; confirm callers expect this
+ _logger.info("[Scheduler] 活动监控任务已取消")
+ except Exception:
+ _logger.exception("[Scheduler] 活动监控过程中发生错误")
+ kill_event.set()  # an unexpected error also sets kill_event
diff --git a/apps/schemas/mcp.py b/apps/schemas/mcp.py
index e64391f4b662e69e1d9848801e3d2ffd756e3041..474fd646bf95ab529755014962e5227f8cb3a73a 100644
--- a/apps/schemas/mcp.py
+++ b/apps/schemas/mcp.py
@@ -84,14 +84,6 @@ class FlowName(BaseModel):
flow_name: str = Field(description="MCP 流程名称", default="")
-
-class FlowRisk(BaseModel):
- """MCP 流程风险评估结果"""
-
- risk: Risk = Field(description="风险类型", default=Risk.LOW)
- reason: str = Field(description="风险原因", default="")
-
-
class ToolRisk(BaseModel):
"""MCP工具风险评估结果"""
diff --git a/apps/schemas/scheduler.py b/apps/schemas/scheduler.py
index fa7cd8dd08637c52074fc900e29a6f74a68c4f65..3f44772331885fb79c85997397d07ecadc64b583 100644
--- a/apps/schemas/scheduler.py
+++ b/apps/schemas/scheduler.py
@@ -35,6 +35,7 @@ class ExecutorBackground(BaseModel):
num: int = Field(description="对话记录最大数量", default=0)
conversation: list[dict[str, str]] = Field(description="对话记录", default=[])
facts: list[str] = Field(description="当前Executor的背景信息", default=[])
+ history_questions: list[str] = Field(description="历史问题列表", default=[])
class CallVars(BaseModel):
diff --git a/design/executor/agent.md b/design/executor/agent.md
index 927754d7fa5a708484db8d2187a0d7d0eee66b4f..c58ca2f92d835e991128345c0a34b66751405f82 100644
--- a/design/executor/agent.md
+++ b/design/executor/agent.md
@@ -68,7 +68,7 @@ classDiagram
class MCPPlanner {
+get_flow_name() FlowName
+create_next_step(history, tools) Step
- +get_flow_excute_risk(tools) FlowRisk
+ +get_flow_excute_risk(tools: dict[str, MCPTools]) FlowRisk
+get_tool_risk(tool, input_param) ToolRisk
+is_param_error(...) IsParamError
+change_err_message_to_description(...) str