diff --git a/apps/models/task.py b/apps/models/task.py index 397d604f462528d54fbf8572f3092afb6c79ce69..948d87e4a717f8f2c0481c6e3954c3ec7b2e7c1c 100644 --- a/apps/models/task.py +++ b/apps/models/task.py @@ -143,7 +143,7 @@ class ExecutorCheckpoint(Base): """步骤名称""" stepStatus: Mapped[StepStatus] = mapped_column(Enum(StepStatus), nullable=False) # noqa: N815 """步骤状态""" - stepType: Mapped[StepType] = mapped_column(Enum(StepType), nullable=False) # noqa: N815 + stepType: Mapped[str] = mapped_column(String(255), nullable=False) # noqa: N815 """步骤类型""" id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default_factory=uuid.uuid4) """检查点ID""" @@ -172,7 +172,7 @@ class ExecutorHistory(Base): """步骤ID""" stepName: Mapped[str] = mapped_column(String(255), nullable=False) # noqa: N815 """步骤名称""" - stepType: Mapped[StepType] = mapped_column(Enum(StepType), nullable=False) # noqa: N815 + stepType: Mapped[str] = mapped_column(String(255), nullable=False) # noqa: N815 """步骤类型""" stepStatus: Mapped[StepStatus] = mapped_column(Enum(StepStatus), nullable=False) # noqa: N815 """步骤状态""" diff --git a/apps/routers/appcenter.py b/apps/routers/appcenter.py index e744dcd7b3f3fde2074590b89c7b07c86d25adca..455055670020e916c6e707b10f46eb726061d151 100644 --- a/apps/routers/appcenter.py +++ b/apps/routers/appcenter.py @@ -192,7 +192,7 @@ async def get_recently_used_applications( async def get_application(appId: Annotated[uuid.UUID, Path()]) -> JSONResponse: # noqa: N803 """获取应用详情""" try: - app_data = await AppCenterManager.fetch_app_data_by_id(appId) + app_data = await AppCenterManager.fetch_app_metadata_by_id(appId) except ValueError: logger.exception("[AppCenter] 获取应用详情请求无效") return JSONResponse( diff --git a/apps/scheduler/executor/agent.py b/apps/scheduler/executor/agent.py index cb37c69e68044ec6c4b4d01e2f8c01f9bdbac767..4694ec9686b40f9af43aee96382f24b58b70f557 100644 --- a/apps/scheduler/executor/agent.py +++ b/apps/scheduler/executor/agent.py @@ -3,6 +3,7 @@ import logging import uuid +from typing import TYPE_CHECKING, cast import anyio from mcp.types import TextContent @@ -14,11 +15,15 @@ from apps.scheduler.executor.base import BaseExecutor from apps.scheduler.mcp_agent.host import MCPHost from apps.scheduler.mcp_agent.plan import MCPPlanner from apps.scheduler.pool.mcp.pool import MCPPool +from apps.schemas.agent import AgentAppMetadata from apps.schemas.enum_var import EventType from apps.schemas.mcp import Step from apps.schemas.message import FlowParams from apps.services import AppCenterManager, MCPServiceManager, UserManager +if TYPE_CHECKING: + from apps.models.task import ExecutorCheckpoint + _logger = logging.getLogger(__name__) class MCPAgentExecutor(BaseExecutor): @@ -44,6 +49,7 @@ class MCPAgentExecutor(BaseExecutor): self._mcp_pool = MCPPool() self._mcp_list = [] self._current_input = {} + self._current_tool = None # 初始化MCP Host相关对象 self._planner = MCPPlanner(self.task, self.llm) self._host = MCPHost(self.task, self.llm) @@ -60,8 +66,13 @@ class MCPAgentExecutor(BaseExecutor): """加载MCP服务器列表""" _logger.info("[MCPAgentExecutor] 加载MCP服务器列表") # 获取MCP服务器列表 - app = await AppCenterManager.fetch_app_data_by_id(self.agent_id) - mcp_ids = app.mcp_service + app_metadata = await AppCenterManager.fetch_app_metadata_by_id(self.agent_id) + if not isinstance(app_metadata, AgentAppMetadata): + err = "[MCPAgentExecutor] 应用类型不是Agent" + _logger.error(err) + raise TypeError(err) + + mcp_ids = app_metadata.mcp_service for mcp_id in mcp_ids: if not await MCPServiceManager.is_user_actived(self.task.metadata.userSub, mcp_id): _logger.warning( @@ -83,18 +94,23 @@ class MCPAgentExecutor(BaseExecutor): inputSchema={}, outputSchema={}, ) - async def get_tool_input_param(self, *, is_first: bool) -> None: - """获取工具输入参数""" + def _validate_task_state(self) -> None: + """验证任务状态是否存在""" if not self.task.state: err = "[MCPAgentExecutor] 任务状态不存在" _logger.error(err) raise RuntimeError(err) + async def get_tool_input_param(self, *, is_first: bool) -> None: + """获取工具输入参数""" + self._validate_task_state() + state = cast("ExecutorCheckpoint", self.task.state) + if is_first: # 获取第一个输入参数 - mcp_tool = self.tools[self.task.state.stepName] + self._current_tool = self.tools[state.stepName] self._current_input = await self._host.get_first_input_params( - mcp_tool, self.task.runtime.userInput, self.task, + self._current_tool, self.task.runtime.userInput, self.task, ) else: # 获取后续输入参数 @@ -104,103 +120,162 @@ class MCPAgentExecutor(BaseExecutor): else: params = {} params_description = "" - mcp_tool = self.tools[self.task.state.stepName] - self.task.state.currentInput = await self._host.fill_params( - mcp_tool, + self._current_tool = self.tools[state.stepName] + state.currentInput = await self._host.fill_params( + self._current_tool, self.task.runtime.userInput, - self.task.state.currentInput, - self.task.state.errorMessage, + state.currentInput, + state.errorMessage, params, params_description, self.task.runtime.language, ) + self.task.state = state - async def confirm_before_step(self) -> None: - """确认前步骤""" - if not self.task.state: - err = "[MCPAgentExecutor] 任务状态不存在" + def _validate_current_tool(self) -> None: + """验证当前工具是否存在""" + if self._current_tool is None: + err = "[MCPAgentExecutor] 当前工具不存在" _logger.error(err) raise RuntimeError(err) - # 发送确认消息 - mcp_tool = self.tools[self.task.state.stepName] - confirm_message = await self._planner.get_tool_risk( - mcp_tool, self._current_input, "", self.llm, self.task.runtime.language, + def _create_executor_history( + self, + step_status: StepStatus, + input_data: dict | None = None, + output_data: dict | None = None, + extra_data: dict | None = None, + ) -> ExecutorHistory: + """创建ExecutorHistory对象""" + self._validate_task_state() + state = cast("ExecutorCheckpoint", self.task.state) + current_tool = cast("MCPTools", self._current_tool) + return ExecutorHistory( + taskId=self.task.metadata.id, + stepId=state.stepId, + stepName=current_tool.toolName if self._current_tool else state.stepName, + stepType=str(current_tool.id) if self._current_tool else "", + stepStatus=step_status, + executorId=state.executorId, + executorName=state.executorName, + executorStatus=state.executorStatus, + inputData=input_data or {}, + outputData=output_data or {}, + extraData=extra_data or {}, ) + + def _remove_last_context_if_same_step(self) -> None: + """如果最后一个context是当前步骤,则删除它""" + self._validate_task_state() + state = cast("ExecutorCheckpoint", self.task.state) + if len(self.task.context) and self.task.context[-1].stepId == state.stepId: + del self.task.context[-1] + + async def _handle_step_error_and_continue(self) -> None: + """处理步骤错误并继续下一步""" + self._validate_task_state() + state = cast("ExecutorCheckpoint", self.task.state) + await self._push_message( + EventType.STEP_ERROR, + data={ + "message": state.errorMessage, + }, + ) + if len(self.task.context) and self.task.context[-1].stepId == state.stepId: + self.task.context[-1].stepStatus = StepStatus.ERROR + self.task.context[-1].outputData = { + "message": state.errorMessage, + } + else: + self.task.context.append( + self._create_executor_history( + step_status=StepStatus.ERROR, + input_data=self._current_input, + output_data={ + "message": state.errorMessage, + }, + ), + ) + await self.get_next_step() + + async def confirm_before_step(self) -> None: + """确认前步骤""" + self._validate_task_state() + self._validate_current_tool() + state = cast("ExecutorCheckpoint", self.task.state) + current_tool = cast("MCPTools", self._current_tool) + + confirm_message = await self._planner.get_tool_risk(current_tool, self._current_input, "") await self._push_message( EventType.STEP_WAITING_FOR_START, confirm_message.model_dump(exclude_none=True, by_alias=True), ) await self._push_message(EventType.FLOW_STOP, {}) - self.task.state.executorStatus = ExecutorStatus.WAITING - self.task.state.stepStatus = StepStatus.WAITING + state.executorStatus = ExecutorStatus.WAITING + state.stepStatus = StepStatus.WAITING self.task.context.append( - ExecutorHistory( - taskId=self.task.metadata.id, - stepId=self.task.state.stepId, - stepName=self.task.state.stepName, - stepDescription=self.task.state.stepDescription, - stepStatus=self.task.state.stepStatus, - executorId=self.task.state.executorId, - executorName=self.task.state.executorName, - executorStatus=self.task.state.executorStatus, - inputData={}, - outputData={}, - extraData=confirm_message.model_dump(exclude_none=True, by_alias=True), + self._create_executor_history( + step_status=state.stepStatus, + extra_data=confirm_message.model_dump(exclude_none=True, by_alias=True), ), ) + self.task.state = state async def run_step(self) -> None: """执行步骤""" - if not self.task.state: - err = "[MCPAgentExecutor] 任务状态不存在" - _logger.error(err) - raise RuntimeError(err) - - self.task.state.executorStatus = ExecutorStatus.RUNNING - self.task.state.stepStatus = StepStatus.RUNNING - mcp_tool = self.tools[self.task.state.stepName] - mcp_client = await self._mcp_pool.get(mcp_tool.mcpId, self.task.metadata.userSub) + self._validate_task_state() + state = cast("ExecutorCheckpoint", self.task.state) + state.executorStatus = ExecutorStatus.RUNNING + state.stepStatus = StepStatus.RUNNING + self.task.state = state + self._validate_current_tool() + current_tool = cast("MCPTools", self._current_tool) + + mcp_client = await self._mcp_pool.get(current_tool.mcpId, self.task.metadata.userSub) if not mcp_client: - _logger.exception("[MCPAgentExecutor] MCP客户端不存在: %s", mcp_tool.mcpId) - self.task.state.stepStatus = StepStatus.ERROR - self.task.state.errorMessage = { - "err_msg": f"MCP客户端不存在: {mcp_tool.mcpId}", + _logger.exception("[MCPAgentExecutor] MCP客户端不存在: %s", current_tool.mcpId) + state.stepStatus = StepStatus.ERROR + state.errorMessage = { + "err_msg": f"MCP客户端不存在: {current_tool.mcpId}", "data": self._current_input, } + self.task.state = state return try: - output_data = await mcp_client.call_tool(mcp_tool.toolName, self._current_input) + output_data = await mcp_client.call_tool(current_tool.toolName, self._current_input) except anyio.ClosedResourceError as e: - _logger.exception("[MCPAgentExecutor] MCP客户端连接已关闭: %s", mcp_tool.mcpId) + _logger.exception("[MCPAgentExecutor] MCP客户端连接已关闭: %s", current_tool.mcpId) # 停止当前用户MCP进程 - await self._mcp_pool.stop(mcp_tool.mcpId, self.task.metadata.userSub) - self.task.state.stepStatus = StepStatus.ERROR - self.task.state.errorMessage = { + await self._mcp_pool.stop(current_tool.mcpId, self.task.metadata.userSub) + state.stepStatus = StepStatus.ERROR + state.errorMessage = { "err_msg": str(e), "data": self._current_input, } + self.task.state = state return except Exception as e: - _logger.exception("[MCPAgentExecutor] 执行步骤 %s 时发生错误", self.task.state.stepName) - self.task.state.stepStatus = StepStatus.ERROR - self.task.state.errorMessage = { + _logger.exception("[MCPAgentExecutor] 执行步骤 %s 时发生错误", state.stepName) + state.stepStatus = StepStatus.ERROR + state.errorMessage = { "err_msg": str(e), "data": self._current_input, } + self.task.state = state return - _logger.error("当前工具名称: %s, 输出参数: %s", self.task.state.stepName, output_data) + _logger.error("当前工具名称: %s, 输出参数: %s", state.stepName, output_data) if output_data.isError: err = "" for output in output_data.content: if isinstance(output, TextContent): err += output.text - self.task.state.stepStatus = StepStatus.ERROR - self.task.state.errorMessage = { + state.stepStatus = StepStatus.ERROR + state.errorMessage = { "err_msg": err, "data": {}, } + self.task.state = state return message = "" @@ -214,70 +289,58 @@ class MCPAgentExecutor(BaseExecutor): await self._push_message(EventType.STEP_INPUT, self._current_input) await self._push_message(EventType.STEP_OUTPUT, output_data) self.task.context.append( - ExecutorHistory( - taskId=self.task.metadata.id, - stepId=self.task.state.stepId, - stepName=self.task.state.stepName, - stepDescription=self.task.state.stepDescription, - stepStatus=StepStatus.SUCCESS, - executorId=self.task.state.executorId, - executorName=self.task.state.executorName, - executorStatus=self.task.state.executorStatus, - inputData=self._current_input, - outputData=output_data, + self._create_executor_history( + step_status=StepStatus.SUCCESS, + input_data=self._current_input, + output_data=output_data, ), ) - self.task.state.stepStatus = StepStatus.SUCCESS + state.stepStatus = StepStatus.SUCCESS + self.task.state = state async def generate_params_with_null(self) -> None: """生成参数补充""" - if not self.task.state: - err = "[MCPAgentExecutor] 任务状态不存在" - _logger.error(err) - raise RuntimeError(err) + self._validate_task_state() + self._validate_current_tool() + state = cast("ExecutorCheckpoint", self.task.state) + current_tool = cast("MCPTools", self._current_tool) - mcp_tool = self.tools[self.task.state.stepName] params_with_null = await self._planner.get_missing_param( - mcp_tool, + current_tool, self._current_input, - self.task.state.errorMessage, + state.errorMessage, + ) + error_msg_str = ( + str(state.errorMessage.get("err_msg", "")) + if isinstance(state.errorMessage, dict) + else str(state.errorMessage) ) error_message = await self._planner.change_err_message_to_description( - error_message=self.task.state.errorMessage, - tool=mcp_tool, + error_message=error_msg_str, + tool=current_tool, input_params=self._current_input, ) await self._push_message( EventType.STEP_WAITING_FOR_PARAM, data={"message": error_message, "params": params_with_null}, ) await self._push_message(EventType.FLOW_STOP, data={}) - self.task.state.executorStatus = ExecutorStatus.WAITING - self.task.state.stepStatus = StepStatus.PARAM + state.executorStatus = ExecutorStatus.WAITING + state.stepStatus = StepStatus.PARAM self.task.context.append( - ExecutorHistory( - taskId=self.task.metadata.id, - stepId=self.task.state.stepId, - stepName=self.task.state.stepName, - stepDescription=self.task.state.stepDescription, - stepStatus=self.task.state.stepStatus, - executorId=self.task.state.executorId, - executorName=self.task.state.executorName, - executorStatus=self.task.state.executorStatus, - inputData={}, - outputData={}, - extraData={ + self._create_executor_history( + step_status=state.stepStatus, + extra_data={ "message": error_message, "params": params_with_null, }, ), ) + self.task.state = state async def get_next_step(self) -> None: """获取下一步""" - if not self.task.state: - err = "[MCPAgentExecutor] 任务状态不存在" - _logger.error(err) - raise RuntimeError(err) + self._validate_task_state() + state = cast("ExecutorCheckpoint", self.task.state) if self._step_cnt < AGENT_MAX_STEPS: self._step_cnt += 1 @@ -296,53 +359,36 @@ class MCPAgentExecutor(BaseExecutor): tool_id=AGENT_FINAL_STEP_NAME, description=AGENT_FINAL_STEP_NAME, ) - step_description = step.description - self.task.state.stepId = uuid.uuid4() - self.task.state.stepName = step.tool_id - self.task.state.stepDescription = step_description - self.task.state.stepStatus = StepStatus.INIT + state.stepId = uuid.uuid4() + state.stepName = step.tool_id + state.stepStatus = StepStatus.INIT else: # 没有下一步了,结束流程 - self.task.state.toolId = AGENT_FINAL_STEP_NAME + state.stepName = AGENT_FINAL_STEP_NAME + self.task.state = state async def error_handle_after_step(self) -> None: """步骤执行失败后的错误处理""" - if not self.task.state: - err = "[MCPAgentExecutor] 任务状态不存在" - _logger.error(err) - raise RuntimeError(err) - - self.task.state.stepStatus = StepStatus.ERROR - self.task.state.executorStatus = ExecutorStatus.ERROR + self._validate_task_state() + state = cast("ExecutorCheckpoint", self.task.state) + state.stepStatus = StepStatus.ERROR + state.executorStatus = ExecutorStatus.ERROR await self._push_message( EventType.FLOW_FAILED, data={}, ) - if len(self.task.context) and self.task.context[-1].stepId == self.task.state.stepId: - del self.task.context[-1] + self._remove_last_context_if_same_step() self.task.context.append( - ExecutorHistory( - taskId=self.task.metadata.id, - stepId=self.task.state.stepId, - stepName=self.task.state.stepName, - stepDescription=self.task.state.stepDescription, - stepStatus=self.task.state.stepStatus, - executorId=self.task.state.executorId, - executorName=self.task.state.executorName, - executorStatus=self.task.state.executorStatus, - inputData={}, - outputData={}, - ), + self._create_executor_history(step_status=state.stepStatus), ) + self.task.state = state async def work(self) -> None: # noqa: C901, PLR0912, PLR0915 """执行当前步骤""" - if not self.task.state: - err = "[MCPAgentExecutor] 任务状态不存在" - _logger.error(err) - raise RuntimeError(err) + self._validate_task_state() + state = cast("ExecutorCheckpoint", self.task.state) - if self.task.state.stepStatus == StepStatus.INIT: + if state.stepStatus == StepStatus.INIT: await self._push_message( EventType.STEP_INIT, data={}, @@ -352,19 +398,18 @@ class MCPAgentExecutor(BaseExecutor): # 等待用户确认 await self.confirm_before_step() return - self.task.state.stepStatus = StepStatus.RUNNING - elif self.task.state.stepStatus in [StepStatus.PARAM, StepStatus.WAITING, StepStatus.RUNNING]: - if self.task.state.stepStatus == StepStatus.PARAM: - if len(self.task.context) and self.task.context[-1].stepId == self.task.state.stepId: - del self.task.context[-1] + state.stepStatus = StepStatus.RUNNING + self.task.state = state + elif state.stepStatus in [StepStatus.PARAM, StepStatus.WAITING, StepStatus.RUNNING]: + if state.stepStatus == StepStatus.PARAM: + self._remove_last_context_if_same_step() await self.get_tool_input_param(is_first=False) - elif self.task.state.stepStatus == StepStatus.WAITING: + elif state.stepStatus == StepStatus.WAITING: if self.params: - if len(self.task.context) and self.task.context[-1].stepId == self.task.state.stepId: - del self.task.context[-1] + self._remove_last_context_if_same_step() else: - self.task.state.executorStatus = ExecutorStatus.CANCELLED - self.task.state.stepStatus = StepStatus.CANCELLED + state.executorStatus = ExecutorStatus.CANCELLED + state.stepStatus = StepStatus.CANCELLED await self._push_message( EventType.STEP_CANCEL, data={}, @@ -373,104 +418,56 @@ class MCPAgentExecutor(BaseExecutor): EventType.FLOW_CANCEL, data={}, ) - if len(self.task.context) and self.task.context[-1].stepId == self.task.state.stepId: + if len(self.task.context) and self.task.context[-1].stepId == state.stepId: self.task.context[-1].stepStatus = StepStatus.CANCELLED + self.task.state = state return max_retry = 5 for i in range(max_retry): if i != 0: await self.get_tool_input_param(is_first=True) await self.run_step() - if self.task.state.stepStatus == StepStatus.SUCCESS: + if state.stepStatus == StepStatus.SUCCESS: break - elif self.task.state.stepStatus == StepStatus.ERROR: + elif state.stepStatus == StepStatus.ERROR: # 错误处理 if self._retry_times >= AGENT_MAX_RETRY_TIMES: await self.error_handle_after_step() else: user_info = await UserManager.get_user(self.task.metadata.userSub) - if user_info.auto_execute: - await self._push_message( - EventType.STEP_ERROR, - data={ - "message": self.task.state.errorMessage, - }, - ) - if len(self.task.context) and self.task.context[-1].stepId == self.task.state.stepId: - self.task.context[-1].stepStatus = StepStatus.ERROR - self.task.context[-1].outputData = { - "message": self.task.state.errorMessage, - } - else: - self.task.context.append( - ExecutorHistory( - taskId=self.task.metadata.id, - stepId=self.task.state.stepId, - stepName=self.task.state.stepName, - stepDescription=self.task.state.stepDescription, - stepStatus=StepStatus.ERROR, - executorId=self.task.state.executorId, - executorName=self.task.state.executorName, - executorStatus=self.task.state.executorStatus, - inputData=self._current_input, - outputData={ - "message": self.task.state.errorMessage, - }, - ), - ) - await self.get_next_step() + if not user_info: + err = "[MCPAgentExecutor] 用户不存在" + _logger.error(err) + raise RuntimeError(err) + + if user_info.autoExecute: + await self._handle_step_error_and_continue() else: - mcp_tool = self.tools[self.task.state.stepName] + mcp_tool = self.tools[state.stepName] + error_msg = ( + str(state.errorMessage.get("err_msg", "")) + if isinstance(state.errorMessage, dict) + else str(state.errorMessage) + ) is_param_error = await self._planner.is_param_error( await self._host.assemble_memory(self.task.runtime, self.task.context), - self.task.state.errorMessage, + error_msg, mcp_tool, - self.task.state.stepDescription, + "", self._current_input, ) if is_param_error.is_param_error: # 如果是参数错误,生成参数补充 await self.generate_params_with_null() else: - await self._push_message( - EventType.STEP_ERROR, - data={ - "message": self.task.state.errorMessage, - }, - ) - if len(self.task.context) and self.task.context[-1].stepId == self.task.state.stepId: - self.task.context[-1].stepStatus = StepStatus.ERROR - self.task.context[-1].outputData = { - "message": self.task.state.errorMessage, - } - else: - self.task.context.append( - ExecutorHistory( - taskId=self.task.metadata.id, - stepId=self.task.state.stepId, - stepName=self.task.state.stepName, - stepDescription=self.task.state.stepDescription, - stepStatus=StepStatus.ERROR, - executorId=self.task.state.executorId, - executorName=self.task.state.executorName, - executorStatus=self.task.state.executorStatus, - inputData=self._current_input, - outputData={ - "message": self.task.state.errorMessage, - }, - ), - ) - await self.get_next_step() - elif self.task.state.stepStatus == StepStatus.SUCCESS: + await self._handle_step_error_and_continue() + elif state.stepStatus == StepStatus.SUCCESS: await self.get_next_step() async def summarize(self) -> None: """总结""" async for chunk in self._planner.generate_answer( - self.task.runtime.userInput, - (await self._host.assemble_memory(self.task.runtime, self.task.context)), - self.llm, - self.task.runtime.language, + await self._host.assemble_memory(self.task.runtime, self.task.context), ): await self._push_message( EventType.TEXT_ADD, @@ -478,72 +475,66 @@ class MCPAgentExecutor(BaseExecutor): ) self.task.runtime.fullAnswer += chunk - async def run(self) -> None: # noqa: C901 + async def run(self) -> None: """执行MCP Agent的主逻辑""" if not self.task.state: err = "[MCPAgentExecutor] 任务状态不存在" _logger.error(err) raise RuntimeError(err) + state = cast("ExecutorCheckpoint", self.task.state) # 初始化MCP服务 await self.load_mcp() data = {} - if self.task.state.executorStatus == ExecutorStatus.INIT: + if state.executorStatus == ExecutorStatus.INIT: # 初始化状态 - self.task.state.executorId = str(uuid.uuid4()) - self.task.state.executorName = (await self._planner.get_flow_name()).flow_name + state.executorId = str(uuid.uuid4()) + state.executorName = (await self._planner.get_flow_name()).flow_name flow_risk = await self._planner.get_flow_excute_risk(self.tool_list) if self._user.autoExecute: data = flow_risk.model_dump(exclude_none=True, by_alias=True) await self.get_next_step() + self.task.state = state - self.task.state.executorStatus = ExecutorStatus.RUNNING + state.executorStatus = ExecutorStatus.RUNNING + self.task.state = state await self._push_message(EventType.FLOW_START, data=data) - if self.task.state.stepName == AGENT_FINAL_STEP_NAME: + if state.stepName == AGENT_FINAL_STEP_NAME: # 如果已经是最后一步,直接结束 - self.task.state.executorStatus = ExecutorStatus.SUCCESS + state.executorStatus = ExecutorStatus.SUCCESS + self.task.state = state await self._push_message(EventType.FLOW_SUCCESS, data={}) await self.summarize() return try: - while self.task.state.executorStatus == ExecutorStatus.RUNNING: + while state.executorStatus == ExecutorStatus.RUNNING: await self.work() - if self.task.state.stepName == AGENT_FINAL_STEP_NAME: + if state.stepName == AGENT_FINAL_STEP_NAME: # 如果已经是最后一步,直接结束 - self.task.state.executorStatus = ExecutorStatus.SUCCESS - self.task.state.stepStatus = StepStatus.SUCCESS + state.executorStatus = ExecutorStatus.SUCCESS + state.stepStatus = StepStatus.SUCCESS + self.task.state = state await self._push_message(EventType.FLOW_SUCCESS, data={}) await self.summarize() except Exception as e: _logger.exception("[MCPAgentExecutor] 执行过程中发生错误") - self.task.state.executorName = ExecutorStatus.ERROR - self.task.state.errorMessage = { + state.executorStatus = ExecutorStatus.ERROR + state.errorMessage = { "err_msg": str(e), "data": {}, } - self.task.state.stepStatus = StepStatus.ERROR + state.stepStatus = StepStatus.ERROR await self._push_message(EventType.STEP_ERROR, data={}) await self._push_message(EventType.FLOW_FAILED, data={}) - if len(self.task.context) and self.task.context[-1].stepId == self.task.state.stepId: - del self.task.context[-1] + self._remove_last_context_if_same_step() self.task.context.append( - ExecutorHistory( - taskId=self.task.metadata.id, - stepId=self.task.state.stepId, - stepName=self.task.state.stepName, - stepDescription=self.task.state.stepDescription, - stepStatus=self.task.state.stepStatus, - executorId=self.task.state.executorId, - executorName=self.task.state.executorName, - executorStatus=self.task.state.executorStatus, - inputData={}, - outputData={}, - ), + self._create_executor_history(step_status=state.stepStatus), ) + self.task.state = state finally: for mcp_service in self._mcp_list: try: diff --git a/apps/scheduler/executor/flow.py b/apps/scheduler/executor/flow.py index 955dc65dded27e848433e42148339df0cf06d918..4d820503a63d949088335389874185b2fb9d92a7 100644 --- a/apps/scheduler/executor/flow.py +++ b/apps/scheduler/executor/flow.py @@ -90,7 +90,8 @@ class FlowExecutor(BaseExecutor): stepStatus=StepStatus.RUNNING, stepId=self.flow.basicConfig.startStep, stepName=self.flow.steps[self.flow.basicConfig.startStep].name, - stepType=StepType(self.flow.steps[self.flow.basicConfig.startStep].type), + # 先转换为StepType,再转换为str,确定Flow的类型在其中 + stepType=str(StepType(self.flow.steps[self.flow.basicConfig.startStep].type)), ) # 是否到达Flow结束终点(变量) self._reached_end: bool = False diff --git a/apps/scheduler/executor/qa.py b/apps/scheduler/executor/qa.py index 5159571068be0b0f8394b810366ab3577837f343..3a8f315cf0cbc60ef4aaa73833498a5ad768f63c 100644 --- a/apps/scheduler/executor/qa.py +++ b/apps/scheduler/executor/qa.py @@ -92,6 +92,7 @@ class QAExecutor(BaseExecutor): stepStatus=StepStatus.RUNNING, stepId=uuid.uuid4(), stepName="QAExecutor", + stepType="", appId=None, ) diff --git a/apps/scheduler/executor/step.py b/apps/scheduler/executor/step.py index a6470955bd3dde9c3d0307e0fd24e5984d53d8ce..57a783b88d74301b483a64244f1c7aa526df46bc 100644 --- a/apps/scheduler/executor/step.py +++ b/apps/scheduler/executor/step.py @@ -91,7 +91,7 @@ class StepExecutor(BaseExecutor): # State写入ID和运行状态 self.task.state.stepId = self.step.step_id - self.task.state.stepType = StepType(self.step.step.type) + self.task.state.stepType = str(StepType(self.step.step.type)) self.task.state.stepName = self.step.step.name # 获取并验证Call类 @@ -265,7 +265,7 @@ class StepExecutor(BaseExecutor): executorStatus=self.task.state.executorStatus, stepId=self.step.step_id, stepName=self.step.step.name, - stepType=StepType(self.step.step.type), + stepType=self.task.state.stepType, stepStatus=self.task.state.stepStatus, inputData=self.obj.input, outputData=output_data, diff --git a/apps/scheduler/mcp_agent/base.py b/apps/scheduler/mcp_agent/base.py index 75b208daf2c24273524d4c6c6cf3c6de3023a522..04326edf341a4536fe7f015473f6ac64f4f141c9 100644 --- a/apps/scheduler/mcp_agent/base.py +++ b/apps/scheduler/mcp_agent/base.py @@ -27,7 +27,7 @@ class MCPBase: self._goal = task.runtime.userInput self._language = task.runtime.language - async def get_resoning_result(self, prompt: str) -> str: + async def get_reasoning_result(self, prompt: str) -> str: """获取推理结果""" # 调用推理大模型 message = [ diff --git a/apps/scheduler/mcp_agent/host.py b/apps/scheduler/mcp_agent/host.py index 10cccd2ef39ac52196652685db03689e4e00eb93..5179cc66ce020b57dc8afac978a7fe30557f685f 100644 --- a/apps/scheduler/mcp_agent/host.py +++ b/apps/scheduler/mcp_agent/host.py @@ -59,7 +59,7 @@ class MCPHost(MCPBase): background_info=await self.assemble_memory(runtime, context), ) _logger.info("[MCPHost] 填充工具参数: %s", prompt) - result = await self.get_resoning_result(prompt) + result = await self.get_reasoning_result(prompt) # 使用JsonGenerator解析结果 return await self._parse_result( result, diff --git a/apps/scheduler/mcp_agent/plan.py b/apps/scheduler/mcp_agent/plan.py index f36bbaba65a68862011305df63917ae1aa869436..82b098484bad35a1d230c56e1abeaf023bf4e1d9 100644 --- a/apps/scheduler/mcp_agent/plan.py +++ b/apps/scheduler/mcp_agent/plan.py @@ -21,6 +21,7 @@ from apps.scheduler.mcp_agent.prompt import ( RISK_EVALUATE, ) from apps.scheduler.slot.slot import Slot +from apps.schemas.llm import LLMChunk from apps.schemas.mcp import ( FlowName, FlowRisk, @@ -45,7 +46,7 @@ class MCPPlanner(MCPBase): """获取当前流程的名称""" template = _env.from_string(GENERATE_FLOW_NAME[self._language]) prompt = template.render(goal=self._goal) - result = await self.get_resoning_result(prompt) + result = await self.get_reasoning_result(prompt) result = await self._parse_result(result, FlowName.model_json_schema()) return FlowName.model_validate(result) @@ -54,7 +55,7 @@ class MCPPlanner(MCPBase): # 获取推理结果 template = _env.from_string(GEN_STEP[self._language]) prompt = template.render(goal=self._goal, history=history, tools=tools) - result = await self.get_resoning_result(prompt) + result = await self.get_reasoning_result(prompt) # 解析为结构化数据 schema = Step.model_json_schema() @@ -74,7 +75,7 @@ class MCPPlanner(MCPBase): goal=self._goal, tools=tools, ) - result = await self.get_resoning_result(prompt) + result = await self.get_reasoning_result(prompt) result = await self._parse_result(result, FlowRisk.model_json_schema()) # 使用FlowRisk模型解析结果 return FlowRisk.model_validate(result) @@ -94,7 +95,7 @@ class MCPPlanner(MCPBase): input_param=input_param, additional_info=additional_info, ) - result = await self.get_resoning_result(prompt) + result = await self.get_reasoning_result(prompt) schema = ToolRisk.model_json_schema() risk = await self._parse_result(result, schema) @@ -121,7 +122,7 @@ class MCPPlanner(MCPBase): input_params=input_params, error_message=error_message, ) - result = await self.get_resoning_result(prompt) + result = await self.get_reasoning_result(prompt) # 解析为结构化数据 schema = IsParamError.model_json_schema() is_param_error = await self._parse_result(result, schema) @@ -140,7 +141,7 @@ class MCPPlanner(MCPBase): input_schema=tool.inputSchema, input_params=input_params, ) - return await self.get_resoning_result(prompt) + return await self.get_reasoning_result(prompt) async def get_missing_param( self, tool: MCPTools, input_param: dict[str, Any], error_message: dict[str, Any], @@ -156,7 +157,7 @@ class MCPPlanner(MCPBase): schema=schema_with_null, error_message=error_message, ) - result = await self.get_resoning_result(prompt) + result = await self.get_reasoning_result(prompt) # 解析为结构化数据 return await self._parse_result(result, schema_with_null) diff --git a/apps/scheduler/scheduler/scheduler.py b/apps/scheduler/scheduler/scheduler.py index 55b312b95e4be0ba96871c88a105f2ac38ed9074..7fec571154f088447d0b8a38948e2516ba07bea7 100644 --- a/apps/scheduler/scheduler/scheduler.py +++ b/apps/scheduler/scheduler/scheduler.py @@ -293,7 +293,7 @@ class Scheduler: self.task, self.queue, self.task.ids.user_sub, llm, history, doc_ids, rag_data)) else: # 查找对应的App元数据 - app_data = await AppCenterManager.fetch_app_data_by_id(self.post_body.app.app_id) + app_data = await AppCenterManager.fetch_app_metadata_by_id(self.post_body.app.app_id) if not app_data: _logger.error("[Scheduler] App %s 不存在", self.post_body.app.app_id) await self.queue.close() diff --git a/apps/services/appcenter.py b/apps/services/appcenter.py index aa82d140fcaa71089e2a83a3794ebf3b135e3a4a..2279097a1c245c7e9316587658ffc765e2ca66ac 100644 --- a/apps/services/appcenter.py +++ b/apps/services/appcenter.py @@ -203,21 +203,29 @@ class AppCenterManager: @staticmethod - async def fetch_app_data_by_id(app_id: uuid.UUID) -> App: + async def fetch_app_metadata_by_id(app_id: uuid.UUID) -> AppMetadata | AgentAppMetadata: """ - 根据应用ID获取应用元数据(使用PostgreSQL) + 根据应用ID获取应用元数据(先检查数据库,再使用Loader) :param app_id: 应用唯一标识 - :return: 应用数据 + :return: 应用元数据 """ + # 先在数据库中检查应用是否存在 async with postgres.session() as session: - app_obj = (await session.scalars( - select(App).where(App.id == app_id), + app_exists = (await session.scalars( + select(App.id).where(App.id == app_id), )).one_or_none() - if not app_obj: + if not app_exists: msg = f"[AppCenterManager] 应用不存在: {app_id}" raise ValueError(msg) - return app_obj + + # 应用存在,使用Loader获取Metadata + try: + return await Pool().app_loader.read_metadata(app_id) + except ValueError as e: + # 如果Loader抛出ValueError,说明metadata文件不存在 + msg = f"[AppCenterManager] 应用元数据文件不存在: {app_id}" + raise ValueError(msg) from e @staticmethod diff --git a/design/executor/agent.md b/design/executor/agent.md new file mode 100644 index 0000000000000000000000000000000000000000..065b8be18fb39fc4f46b56acf4a359ba2f73b75e --- /dev/null +++ b/design/executor/agent.md @@ -0,0 +1,1086 @@ +# MCP Agent Executor 模块设计文档 + +## 1. 概述 + +MCP Agent Executor 是基于 Model Context Protocol (MCP) 的智能代理执行器,负责将用户的自然语言目标转化为可执行的工具调用序列,并自动完成多步骤任务流程。 + +### 1.1 核心特性 + +- **自动规划**:基于用户目标自动生成执行计划 +- **动态工具调用**:支持多个 MCP 服务器的工具调用 +- **智能错误处理**:自动识别参数错误并请求用户补充 +- **风险评估**:对流程和步骤进行风险评估 +- **断点续传**:支持任务暂停和恢复 +- **用户交互**:支持手动确认和参数补充 + +### 1.2 技术栈 + +- **语言**: Python 3.x +- **异步框架**: anyio +- **MCP 客户端**: mcp (Anthropic) +- **LLM 集成**: 支持多种大模型 +- **数据验证**: Pydantic + +## 2. 架构设计 + +### 2.1 类结构 + +```mermaid +classDiagram + class BaseExecutor { + <> + +TaskData task + +MessageQueue msg_queue + +LLMConfig llm + +str question + +init() None + +run() None + -_load_history(n: int) None + -_push_message(event_type, data) None + } + + class MCPAgentExecutor { + +UUID agent_id + +str agent_description + +dict[str, MCPTools] tools + +FlowParams params + -int _step_cnt + -int _retry_times + -MCPPool _mcp_pool + -list _mcp_list + -dict _current_input + -MCPTools _current_tool + -MCPPlanner _planner + -MCPHost _host + -User _user + +init() None + +load_mcp() None + +get_tool_input_param(is_first: bool) None + +confirm_before_step() None + +run_step() None + +generate_params_with_null() None + +get_next_step() None + +error_handle_after_step() None + +work() None + +summarize() None + +run() None + } + + class MCPPlanner { + +get_flow_name() FlowName + +create_next_step(history, tools) Step + +get_flow_excute_risk(tools) FlowRisk + +get_tool_risk(tool, input_param) ToolRisk + +is_param_error(...) IsParamError + +change_err_message_to_description(...) str + +get_missing_param(...) dict + +generate_answer(memory) AsyncGenerator + } + + class MCPHost { + +assemble_memory(runtime, context) str + +get_first_input_params(...) dict + +fill_params(...) dict + } + + class MCPPool { + +get(mcp_id, user_sub) MCPClient + +stop(mcp_id, user_sub) None + } + + BaseExecutor <|-- MCPAgentExecutor + MCPAgentExecutor --> MCPPlanner : uses + MCPAgentExecutor --> MCPHost : uses + MCPAgentExecutor --> MCPPool : manages +``` + +### 2.2 核心组件关系 + +```mermaid +graph TB + A[MCPAgentExecutor] --> B[MCPPlanner] + A --> C[MCPHost] + A --> D[MCPPool] + A --> E[TaskData] + A --> F[MessageQueue] + + B --> G[LLM Reasoning] + C --> G + + D --> H[MCP Client 1] + D --> I[MCP Client 2] + D --> J[MCP Client N] + + E --> K[ExecutorCheckpoint] + E --> L[ExecutorHistory] + E --> M[TaskRuntime] + + F --> N[Event Bus] + + style A fill:#f9f,stroke:#333,stroke-width:4px + style B fill:#bbf,stroke:#333,stroke-width:2px + style C fill:#bbf,stroke:#333,stroke-width:2px + style D fill:#bfb,stroke:#333,stroke-width:2px +``` + +## 3. 执行流程 + +### 3.1 主流程图 + +```mermaid +flowchart TD + Start([开始]) --> Init[初始化 Executor] + Init --> LoadMCP[加载 MCP 服务] + LoadMCP --> CheckStatus{检查执行器状态} + + CheckStatus -->|INIT| InitFlow[初始化流程] + CheckStatus -->|其他| Resume[恢复执行] + + InitFlow --> GenFlowName[生成流程名称] + GenFlowName --> RiskEval[流程风险评估] + RiskEval --> GetFirstStep[获取第一步] + + Resume --> WorkLoop{步骤状态判断} + GetFirstStep --> WorkLoop + + WorkLoop -->|INIT| GetToolParam[获取工具参数] + WorkLoop -->|WAITING| UserConfirm{用户确认} + WorkLoop -->|PARAM| FillParam[填充参数] + WorkLoop -->|ERROR| HandleError[错误处理] + WorkLoop -->|SUCCESS| NextStep[获取下一步] + WorkLoop -->|FINAL_STEP| Summarize[生成总结] + + GetToolParam --> AutoExec{是否自动执行} + AutoExec -->|否| ConfirmStep[等待用户确认] + AutoExec -->|是| RunStep[执行步骤] + + ConfirmStep --> Stop1[暂停流程] + Stop1 --> End([结束]) + + UserConfirm -->|确认| RunStep + UserConfirm -->|取消| Cancel[取消流程] + Cancel --> End + + FillParam --> RunStep + + RunStep --> StepResult{执行结果} + StepResult -->|成功| NextStep + StepResult -->|失败| ErrorType{错误类型判断} + + ErrorType -->|参数错误| GenMissingParam[生成缺失参数] + ErrorType -->|其他错误| RetryCheck{重试次数检查} + + GenMissingParam --> Stop2[等待参数补充] + Stop2 --> End + + RetryCheck -->||>=MAX_RETRY| Fail[流程失败] + Fail --> End + + HandleError --> WorkLoop + NextStep --> StepCheck{步骤数检查} + + StepCheck -->||>=MAX_STEPS| FinalStep[设置为最终步骤] + FinalStep --> Summarize + + Summarize --> Success[流程成功] + Success --> End +``` + +### 3.2 步骤执行状态机 + +```mermaid +stateDiagram-v2 + [*] --> INIT: 创建步骤 + + INIT --> RUNNING: 获取参数成功
且自动执行 + INIT --> WAITING: 获取参数成功
且需要确认 + + WAITING --> RUNNING: 用户确认 + WAITING --> CANCELLED: 用户取消 + + RUNNING --> SUCCESS: 执行成功 + RUNNING --> ERROR: 执行失败 + + ERROR --> PARAM: 参数错误
且需要用户补充 + ERROR --> INIT: 参数错误
且自动重试 + ERROR --> ERROR: 其他错误
继续下一步 + + PARAM --> RUNNING: 用户补充参数 + + SUCCESS --> [*]: 步骤完成 + CANCELLED --> [*]: 取消完成 + ERROR --> [*]: 达到最大重试次数 +``` + +### 3.3 执行器状态机 + +```mermaid +stateDiagram-v2 + [*] --> INIT: 创建任务 + + INIT --> RUNNING: 开始执行 + + RUNNING --> WAITING: 等待用户输入 + RUNNING --> SUCCESS: 所有步骤完成 + RUNNING --> ERROR: 执行错误 + RUNNING --> CANCELLED: 用户取消 + + WAITING --> RUNNING: 接收用户输入 + WAITING --> CANCELLED: 用户取消 + + SUCCESS --> [*]: 流程完成 + ERROR --> [*]: 流程失败 + CANCELLED --> [*]: 取消完成 +``` + +## 4. 时序图 + +### 4.1 正常执行流程 + +```mermaid +sequenceDiagram + actor User + participant Scheduler + participant Executor as MCPAgentExecutor + participant Planner as MCPPlanner + participant Host as MCPHost + participant Pool as MCPPool + participant MCP as MCP Server + participant Queue as MessageQueue + + User->>Scheduler: 提交任务 + Scheduler->>Executor: 创建执行器 + + activate Executor + Executor->>Executor: init() + Executor->>Pool: 初始化连接池 + Executor->>Planner: 创建规划器 + Executor->>Host: 创建宿主 + + Executor->>Executor: load_mcp() + Executor->>Pool: 获取 MCP 服务列表 + + Executor->>Planner: get_flow_name() + Planner-->>Executor: 流程名称 + + Executor->>Planner: get_flow_excute_risk() + Planner-->>Executor: 风险评估 + + Executor->>Queue: FLOW_START + Queue-->>User: 流程开始事件 + + loop 每个步骤 + Executor->>Planner: create_next_step() + Planner-->>Executor: Step + + Executor->>Queue: STEP_INIT + Queue-->>User: 步骤初始化 + + Executor->>Host: get_first_input_params() + Host-->>Executor: 工具参数 + + alt 需要用户确认 + Executor->>Planner: get_tool_risk() + Planner-->>Executor: 工具风险 + Executor->>Queue: STEP_WAITING_FOR_START + Queue-->>User: 等待确认 + User->>Scheduler: 确认执行 + Scheduler->>Executor: 继续执行 + end + + Executor->>Pool: get(mcp_id) + Pool-->>Executor: MCP Client + + Executor->>MCP: call_tool(tool_name, params) + MCP-->>Executor: 执行结果 + + Executor->>Queue: STEP_INPUT + Executor->>Queue: STEP_OUTPUT + Queue-->>User: 步骤完成 + + alt 执行失败 + Executor->>Planner: is_param_error() + Planner-->>Executor: 错误类型 + + alt 参数错误 + Executor->>Planner: get_missing_param() + Planner-->>Executor: 缺失参数 + Executor->>Queue: STEP_WAITING_FOR_PARAM + Queue-->>User: 等待参数 + User->>Scheduler: 补充参数 + Scheduler->>Executor: 继续执行 + else 其他错误 + Executor->>Planner: create_next_step() + end + end + end + + Executor->>Planner: generate_answer() + Planner-->>Executor: 流式回答 + Executor->>Queue: TEXT_ADD (多次) + Queue-->>User: 流式输出 + + Executor->>Queue: FLOW_SUCCESS + Queue-->>User: 流程成功 + + Executor->>Pool: stop(所有 MCP) + deactivate Executor +``` + +### 4.2 错误处理流程 + +```mermaid +sequenceDiagram + participant Executor as MCPAgentExecutor + participant Planner as MCPPlanner + participant Host as MCPHost + participant MCP as MCP Server + participant Queue as MessageQueue + participant User + + Executor->>MCP: call_tool(params) + MCP-->>Executor: 错误响应 + + Executor->>Executor: 记录错误状态 + + Executor->>Planner: is_param_error(error_msg) + Planner->>Planner: LLM 分析错误 + Planner-->>Executor: IsParamError + + alt 是参数错误 + Executor->>Planner: change_err_message_to_description() + Planner-->>Executor: 用户友好描述 + + Executor->>Planner: get_missing_param() + Planner-->>Executor: 缺失参数列表 + + Executor->>Queue: STEP_WAITING_FOR_PARAM + Queue-->>User: 显示缺失参数表单 + + User->>Executor: 补充参数 (params) + + Executor->>Host: fill_params(params) + Host-->>Executor: 完整参数 + + Executor->>MCP: call_tool(完整参数) + MCP-->>Executor: 成功响应 + + else 非参数错误 + alt 自动执行模式 + Executor->>Planner: create_next_step() + Planner-->>Executor: 下一步 + Executor->>Queue: STEP_ERROR + Queue-->>User: 显示错误,继续执行 + else 手动执行模式 + Executor->>Executor: 检查重试次数 + + alt 未达到最大重试 + Executor->>Planner: create_next_step() + Planner-->>Executor: 下一步 + else 达到最大重试 + Executor->>Queue: FLOW_FAILED + Queue-->>User: 流程失败 + end + end + end +``` + +## 5. 核心方法详解 + +### 5.1 初始化方法 + +#### init() - 初始化执行器 + +**功能说明:** 在执行器创建后首次被调用,负责初始化所有必要的组件和状态。 + +**执行步骤:** + +1. 初始化步骤计数器(_step_cnt)和重试计数器(_retry_times)为0 +2. 创建 MCP 连接池实例(MCPPool)用于管理多个 MCP 客户端 +3. 初始化空的 MCP 服务列表和当前输入参数字典 +4. 创建规划器(MCPPlanner)实例,传入任务和 LLM 配置 +5. 创建宿主(MCPHost)实例,传入任务和 LLM 配置 +6. 从用户管理器中获取并验证用户信息 +7. 加载历史会话记录 + +**使用场景:** 在调度器创建执行器实例后自动调用 + +--- + +#### load_mcp() - 加载 MCP 服务 + +**功能说明:** 根据 Agent 配置加载所需的 MCP 服务器和工具列表。 + +**执行步骤:** + +1. 通过应用中心管理器获取 Agent 的元数据配置 +2. 验证应用类型是否为 Agent 类型 +3. 遍历元数据中配置的 MCP 服务 ID 列表 +4. 对每个 MCP 服务检查用户是否已激活 +5. 获取已激活的 MCP 服务详情并添加到服务列表 +6. 加载每个 MCP 服务提供的工具到工具字典 +7. 添加特殊的 FINAL 工具用于标记流程结束 + +**使用场景:** 在 run() 方法开始执行前调用 + +--- + +### 5.2 步骤执行方法 + +#### work() - 执行当前步骤的主逻辑 + +**功能说明:** 根据当前步骤状态执行相应的处理逻辑,是步骤执行的核心调度方法。 + +**状态处理逻辑:** + +| 步骤状态 | 处理逻辑 | 后续动作 | +|---------|---------|---------| +| INIT | 获取工具参数,判断用户是否开启自动执行 | 自动执行则直接运行;否则等待确认 | +| WAITING | 检查是否有用户确认参数 | 有参数则继续;无参数则取消流程 | +| PARAM | 填充用户补充的参数 | 使用新参数重新执行 | +| RUNNING | 执行工具调用,最多重试5次 | 成功则继续;失败则记录错误 | +| ERROR | 判断重试次数和错误类型 | 参数错误请求补充;其他错误继续或失败 | +| SUCCESS | 获取下一步骤 | 继续执行或结束流程 | + +**特殊处理:** + +- 用户取消时发送 STEP_CANCEL 和 FLOW_CANCEL 事件 +- 参数状态时会删除上一次的重复上下文记录 +- 运行状态时会尝试最多5次重新获取参数并执行 + +--- + +#### run_step() - 执行单个工具调用 + +**功能说明:** 执行当前步骤对应的 MCP 工具调用,处理执行结果和异常。 + +**执行流程:** + +1. 设置执行器和步骤状态为 RUNNING +2. 验证当前工具对象存在性 +3. 从 MCP 连接池获取对应的客户端 +4. 调用 MCP 工具并传入当前输入参数 +5. 解析工具返回结果,提取文本内容 +6. 推送 STEP_INPUT 和 STEP_OUTPUT 事件 +7. 将执行历史记录添加到任务上下文 +8. 更新步骤状态为 SUCCESS + +**异常处理:** + +- **ClosedResourceError(连接关闭)**: 停止当前用户的 MCP 进程,设置错误状态 +- **通用异常**: 记录异常堆栈,将错误信息和输入数据保存到状态中 +- **工具返回错误**: 提取错误文本内容,设置步骤状态为 ERROR + +**输出格式:** 工具输出封装为包含 message 字段的字典 + +--- + +#### get_next_step() - 规划下一步骤 + +**功能说明:** 基于当前执行历史和可用工具,使用 LLM 规划下一步要执行的步骤。 + +**执行逻辑:** + +1. 检查步骤计数器是否小于最大步骤数限制(25步) +2. 递增步骤计数器 +3. 组装历史记忆(包含所有已执行步骤的输入输出) +4. 调用规划器的 create_next_step 方法,最多重试3次 +5. 验证规划返回的工具 ID 是否在可用工具列表中 +6. 如果规划失败或工具不存在,设置为 FINAL 步骤 +7. 生成新的步骤 ID 和步骤名称 +8. 更新任务状态 + +**容错机制:** + +- 规划失败时自动重试最多3次 +- 所有重试都失败或步骤数超限时自动设置为结束步骤 + +--- + +### 5.3 参数处理方法 + +#### get_tool_input_param() - 获取工具输入参数 + +**功能说明:** 根据当前工具的输入模式(首次或补充)生成或填充工具参数。 + +**参数说明:** + +- **is_first**: 布尔值,标识是否为首次获取参数 + +**执行逻辑:** + +**首次获取(is_first=True):** + +1. 从当前状态获取工具对象 +2. 调用宿主的 get_first_input_params 方法 +3. 基于用户输入、工具定义和任务上下文生成参数 +4. 将生成的参数保存到当前输入变量 + +**参数补充(is_first=False):** + +1. 从 params 字段提取用户补充的内容和描述 +2. 调用宿主的 fill_params 方法 +3. 使用用户输入、当前参数、错误信息和补充内容修复参数 +4. 更新状态中的当前输入参数 + +**应用场景:** + +- 步骤初始化时首次生成参数 +- 参数错误后用户补充时填充参数 +- 步骤重试时重新生成参数 + +--- + +#### generate_params_with_null() - 生成参数补充请求 + +**功能说明:** 当检测到参数错误时,生成需要用户补充的参数列表并暂停执行。 + +**执行流程:** + +1. 调用规划器获取缺失参数列表(使用 get_missing_param) +2. 提取错误信息中的错误消息文本 +3. 调用规划器将技术错误消息转换为用户友好描述 +4. 推送 STEP_WAITING_FOR_PARAM 事件,包含描述和参数列表 +5. 推送 FLOW_STOP 事件暂停流程 +6. 设置执行器状态为 WAITING,步骤状态为 PARAM +7. 添加包含参数请求信息的历史记录 + +**输出内容:** + +- **message**: 用户友好的错误描述和参数说明 +- **params**: 缺失参数的结构化定义(支持 null 值) + +--- + +### 5.4 错误处理方法 + +#### error_handle_after_step() - 处理不可恢复的步骤错误 + +**功能说明:** 当步骤错误无法通过重试或参数补充解决时,终止整个流程。 + +**执行步骤:** + +1. 设置步骤状态为 ERROR +2. 设置执行器状态为 ERROR +3. 推送 FLOW_FAILED 事件通知流程失败 +4. 删除可能重复的最后一条上下文记录 +5. 创建并添加错误状态的历史记录 +6. 更新任务状态到检查点 + +**触发条件:** + +- 错误重试次数达到最大限制(3次) +- 无法继续执行的致命错误 + +--- + +#### _handle_step_error_and_continue() - 处理错误并继续执行 + +**功能说明:** 记录当前步骤的错误信息,并尝试规划下一步继续执行。 + +**执行流程:** + +1. 推送 STEP_ERROR 事件,包含错误消息 +2. 检查上下文最后一条记录是否为当前步骤 +3. 如果是则更新状态为 ERROR 和输出数据 +4. 如果不是则创建新的错误历史记录 +5. 调用 get_next_step 规划下一步骤 + +**适用场景:** + +- 自动执行模式下的非致命错误 +- 用户选择继续执行的错误 + +--- + +### 5.5 用户交互方法 + +#### confirm_before_step() - 等待用户确认执行 + +**功能说明:** 在执行高风险或需要确认的步骤前,暂停流程等待用户确认。 + +**执行流程:** + +1. 验证当前任务状态和工具对象存在 +2. 调用规划器评估当前工具的执行风险 +3. 生成包含风险等级和原因的确认消息 +4. 推送 STEP_WAITING_FOR_START 事件,附带风险信息 +5. 推送 FLOW_STOP 事件暂停流程 +6. 设置执行器状态为 WAITING,步骤状态为 WAITING +7. 添加包含风险评估的历史记录 + +**风险评估内容:** + +- **risk**: 风险等级(LOW/MEDIUM/HIGH) +- **reason**: 风险原因的自然语言描述 + +--- + +### 5.6 总结与输出方法 + +#### summarize() - 生成最终回答 + +**功能说明:** 在所有步骤执行完成后,基于执行历史生成自然语言总结。 + +**执行流程:** + +1. 组装完整的执行记忆(包含所有步骤的输入输出) +2. 调用规划器的 generate_answer 方法 +3. 以异步流式方式接收 LLM 生成的回答片段 +4. 对每个片段推送 TEXT_ADD 事件 +5. 将片段追加到任务运行时的完整回答字段 +6. 循环直到所有内容生成完毕 + +**输出特点:** + +- 流式输出,用户可实时看到生成进度 +- 基于完整执行历史生成,包含上下文信息 +- 自然语言格式,易于理解 + +--- + +### 5.7 辅助验证方法 + +#### _validate_task_state() - 验证任务状态 + +**功能说明:** 检查任务的状态对象是否存在,不存在则抛出异常。 + +**使用场景:** 在所有需要访问任务状态的方法开始时调用 + +--- + +#### _validate_current_tool() - 验证当前工具 + +**功能说明:** 检查当前工具对象是否已设置,未设置则抛出异常。 + +**使用场景:** 在所有需要使用当前工具的方法中调用 + +--- + +#### _remove_last_context_if_same_step() - 清理重复上下文 + +**功能说明:** 如果任务上下文的最后一条记录与当前步骤 ID 相同,则删除该记录。 + +**使用场景:** + +- 参数补充前清理等待记录 +- 错误处理前清理失败记录 +- 避免重复记录导致历史混乱 + +--- + +#### _create_executor_history() - 创建历史记录 + +**功能说明:** 根据当前状态创建一条标准化的执行历史记录对象。 + +**输入参数:** + +- **step_status**: 步骤状态枚举值 +- **input_data**: 步骤输入数据字典(可选) +- **output_data**: 步骤输出数据字典(可选) +- **extra_data**: 额外数据字典(可选,如风险评估) + +**返回对象包含:** + +- 任务标识信息(taskId、stepId、executorId等) +- 步骤详细信息(名称、类型、状态) +- 数据内容(输入、输出、额外数据) + +**使用场景:** 所有需要记录步骤执行历史的场景 + +## 6. 数据模型 + +### 6.1 核心数据模型关系图 + +```mermaid +erDiagram + Task ||--o| ExecutorCheckpoint : "has" + Task ||--o{ ExecutorHistory : "contains" + Task ||--|| TaskRuntime : "has" + ExecutorCheckpoint ||--|| ExecutorStatus : "has status" + ExecutorCheckpoint ||--|| StepStatus : "has step status" + ExecutorHistory ||--|| StepStatus : "has status" + ExecutorHistory ||--|| ExecutorStatus : "has executor status" + MCPAgentExecutor ||--o{ MCPTools : "uses" + MCPAgentExecutor ||--|| MCPPlanner : "has" + MCPAgentExecutor ||--|| MCPHost : "has" + MCPAgentExecutor ||--|| MCPPool : "manages" + MCPTools ||--|| Step : "defines" + + Task { + UUID id PK "任务ID" + string userSub FK "用户ID" + UUID conversationId FK "对话ID" + UUID checkpointId FK "检查点ID" + datetime updatedAt "更新时间" + } + + TaskRuntime { + UUID taskId PK,FK "任务ID" + float time "时间" + float fullTime "完整时间" + string sessionId FK "会话ID" + string userInput "用户输入" + string fullAnswer "完整输出" + list fact "记忆" + string reasoning "推理" + dict filledSlot "槽位" + list document "文档" + LanguageType language "语言" + } + + ExecutorCheckpoint { + UUID id PK "检查点ID" + UUID taskId FK "任务ID" + UUID appId "应用ID" + string executorId "执行器ID" + string executorName "执行器名称" + ExecutorStatus executorStatus "执行器状态" + UUID stepId "当前步骤ID" + string stepName "当前步骤名称" + StepStatus stepStatus "当前步骤状态" + string stepType "当前步骤类型" + string executorDescription "执行器描述" + dict data "步骤额外数据" + dict errorMessage "错误信息" + } + + ExecutorHistory { + UUID id PK "历史记录ID" + UUID taskId FK "任务ID" + string executorId "执行器ID" + string executorName "执行器名称" + ExecutorStatus executorStatus "执行器状态" + UUID stepId "步骤ID" + string stepName "步骤名称" + string stepType "步骤类型" + StepStatus stepStatus "步骤状态" + dict inputData "步骤输入数据" + dict outputData "步骤输出数据" + dict extraData "步骤额外数据" + datetime updatedAt "更新时间" + } + + MCPTools { + string id PK "工具ID" + string mcpId "MCP服务ID" + string toolName "工具名称" + string description "工具描述" + dict inputSchema "输入参数Schema" + dict outputSchema "输出参数Schema" + } + + Step { + string tool_id "工具ID" + string description "步骤描述" + } + + ExecutorStatus { + string value "UNKNOWN|INIT|WAITING|RUNNING|SUCCESS|ERROR|CANCELLED" + } + + StepStatus { + string value "UNKNOWN|INIT|WAITING|RUNNING|SUCCESS|ERROR|PARAM|CANCELLED" + } +``` + +### 6.2 执行器状态枚举 + +```mermaid +classDiagram + class ExecutorStatus { + <> + UNKNOWN 未知状态 + INIT 初始化 + WAITING 等待用户输入 + RUNNING 运行中 + SUCCESS 成功完成 + ERROR 执行错误 + CANCELLED 用户取消 + } + + class StepStatus { + <> + UNKNOWN 未知状态 + INIT 初始化 + WAITING 等待用户确认 + RUNNING 运行中 + SUCCESS 成功完成 + ERROR 执行错误 + PARAM 等待参数补充 + CANCELLED 用户取消 + } + + class LanguageType { + <> + CHINESE 中文(zh) + ENGLISH 英文(en) + } +``` + +### 6.3 数据模型详细说明 + +#### ExecutorCheckpoint - 执行器检查点 + +**用途**: 保存任务执行的当前状态,支持断点续传和状态恢复。 + +**关键字段**: + +- **执行器级别**: + - `executorId`: 唯一标识一个执行流程(如工作流ID) + - `executorName`: 流程的可读名称 + - `executorStatus`: 整个执行器的当前状态 + - `executorDescription`: 流程的详细描述 + +- **步骤级别**: + - `stepId`: 当前正在执行步骤的唯一标识 + - `stepName`: 步骤名称(通常是工具ID) + - `stepStatus`: 当前步骤的状态 + - `stepType`: 步骤类型标识 + +- **辅助数据**: + - `data`: 步骤相关的额外数据 + - `errorMessage`: 错误信息(包含 err_msg 和 data) + +#### ExecutorHistory - 执行器历史 + +**用途**: 记录每个步骤的完整执行历史,用于上下文记忆和回溯。 + +**关键字段**: + +- **输入输出**: + - `inputData`: 步骤执行时的输入参数 + - `outputData`: 步骤执行后的输出结果 + - `extraData`: 额外信息(如风险评估、参数请求等) + +- **状态追踪**: + - `stepStatus`: 该历史记录对应的步骤状态 + - `executorStatus`: 记录时的执行器状态 + - `updatedAt`: 记录更新时间 + +**用途场景**: + +- LLM 上下文组装 +- 用户查看执行历史 +- 错误诊断和重试 + +#### MCPTools - MCP 工具定义 + +**用途**: 描述 MCP 服务提供的工具及其参数规范。 + +**关键字段**: + +- `id`: 工具的唯一标识符 +- `mcpId`: 所属 MCP 服务的 ID +- `toolName`: 工具在 MCP 服务中的名称 +- `description`: 工具功能的自然语言描述 +- `inputSchema`: JSON Schema 格式的输入参数定义 +- `outputSchema`: JSON Schema 格式的输出结果定义 + +**特殊工具**: + +- `FINAL`: 虚拟工具,用于标记流程结束 + +#### Step - 步骤定义 + +**用途**: LLM 规划生成的单个执行步骤。 + +**关键字段**: + +- `tool_id`: 要执行的工具 ID(对应 MCPTools.id) +- `description`: 该步骤的自然语言描述(说明为什么执行这个工具) + +### 6.4 数据流转示意图 + +```mermaid +graph LR + A[用户输入] --> B[TaskRuntime] + B --> C[MCPAgentExecutor] + C --> D[MCPPlanner] + D --> E[Step] + E --> F[MCPTools] + F --> G[ExecutorCheckpoint] + G --> H[工具执行] + H --> I[ExecutorHistory] + I --> J[TaskRuntime.fullAnswer] + I --> K[上下文记忆] + K --> D + + style B fill:#e1f5ff + style G fill:#fff4e1 + style I fill:#e7f5e1 + style J fill:#ffe1f5 +``` + +**说明**: + +1. **用户输入** → **TaskRuntime**: 保存用户的原始问题 +2. **TaskRuntime** → **MCPAgentExecutor**: 启动执行器 +3. **MCPAgentExecutor** → **MCPPlanner**: 请求规划下一步 +4. **MCPPlanner** → **Step**: 生成步骤定义 +5. **Step** → **MCPTools**: 查找对应工具 +6. **MCPTools** → **ExecutorCheckpoint**: 保存当前状态 +7. **ExecutorCheckpoint** → **工具执行**: 执行 MCP 工具 +8. **工具执行** → **ExecutorHistory**: 记录执行历史 +9. **ExecutorHistory** → **TaskRuntime**: 更新完整回答 +10. **ExecutorHistory** → **上下文记忆**: 提供历史上下文 +11. **上下文记忆** → **MCPPlanner**: 用于下一步规划(循环) + +## 7. 事件系统 + +### 7.1 事件类型 + +| 事件类型 | 触发时机 | 数据结构 | +|---------|---------|---------| +| FLOW_START | 流程开始 | FlowRisk (可选) | +| FLOW_STOP | 流程暂停 | {} | +| FLOW_SUCCESS | 流程成功 | {} | +| FLOW_FAILED | 流程失败 | {} | +| FLOW_CANCEL | 流程取消 | {} | +| STEP_INIT | 步骤初始化 | {} | +| STEP_INPUT | 步骤输入 | 工具参数 | +| STEP_OUTPUT | 步骤输出 | 工具结果 | +| STEP_ERROR | 步骤错误 | 错误信息 | +| STEP_CANCEL | 步骤取消 | {} | +| STEP_WAITING_FOR_START | 等待用户确认 | ToolRisk | +| STEP_WAITING_FOR_PARAM | 等待参数补充 | 缺失参数列表 | +| TEXT_ADD | 流式文本输出 | TextAddContent | + +### 7.2 事件流转图 + +```mermaid +graph LR + A[FLOW_START] --> B[STEP_INIT] + B --> C{需要确认?} + C -->|是| D[STEP_WAITING_FOR_START] + C -->|否| E[STEP_INPUT] + D --> F[FLOW_STOP] + F --> G[用户确认] + G --> E + E --> H[STEP_OUTPUT] + H --> I{执行成功?} + I -->|是| J[下一步或结束] + I -->|否| K[STEP_ERROR] + K --> L{错误类型} + L -->|参数错误| M[STEP_WAITING_FOR_PARAM] + L -->|其他| N[继续或失败] + M --> F + J --> O{是否最后一步?} + O -->|是| P[TEXT_ADD流式] + O -->|否| B + P --> Q[FLOW_SUCCESS] + N --> R[FLOW_FAILED] +``` + +## 8. 配置参数 + +### 8.1 常量配置 + +| 常量 | 值 | 说明 | +|-----|-----|-----| +| AGENT_MAX_STEPS | 25 | 最大执行步骤数 | +| AGENT_MAX_RETRY_TIMES | 3 | 最大重试次数 | +| AGENT_FINAL_STEP_NAME | "FIANL" | 最终步骤名称 | +| JSON_GEN_MAX_TRIAL | 3 | JSON生成最大尝试次数 | + +### 8.2 用户配置 + +| 配置项 | 类型 | 说明 | +|-------|------|-----| +| autoExecute | bool | 是否自动执行(不等待确认) | + +## 9. 错误处理策略 + +### 9.1 错误分类 + +```mermaid +graph TD + A[执行错误] --> B{错误类型判断} + B --> C[参数错误] + B --> D[连接错误] + B --> E[权限错误] + B --> F[超时错误] + B --> G[其他错误] + + C --> C1[LLM判断是否参数错误] + C1 -->|是| C2[生成缺失参数] + C1 -->|否| C3[继续下一步] + + D --> D1[停止MCP客户端] + D1 --> D2[重新连接] + + E --> E1[记录错误] + E1 --> E2[继续下一步] + + F --> F1[记录错误] + F1 --> F2[继续下一步] + + G --> G1[检查重试次数] + G1 -->|未超限| G2[继续下一步] + G1 -->|超限| G3[流程失败] +``` + +### 9.2 错误处理流程 + +1. **参数错误** + - 使用 LLM 判断是否为参数错误 + - 生成缺失参数列表 + - 等待用户补充 + +2. **连接错误** + - 捕获 `ClosedResourceError` + - 停止当前 MCP 客户端 + - 记录错误并继续下一步 + +3. **重试机制** + - 单步骤内重试:最多5次 + - 全局重试:最多3次 + - 超过限制后流程失败 + +## 10. 性能优化 + +### 10.1 MCP 连接池 + +- 使用 `MCPPool` 管理 MCP 客户端 +- 按 `(mcp_id, user_sub)` 复用连接 +- 任务结束后统一关闭连接 + +### 10.2 上下文管理 + +- 历史记录限制(默认3轮) +- 步骤上下文去重(避免重复记录) +- 流式输出(减少内存占用) + +### 10.3 并发控制 + +- 全局并发任务上限:30 +- 单用户滑动窗口限流:15秒5次请求 + +## 11. 安全考虑 + +### 11.1 风险评估 + +```mermaid +graph LR + A[用户请求] --> B[流程风险评估] + B --> C{风险等级} + C -->|LOW| D[自动执行或提示] + C -->|MEDIUM| E[必须确认] + C -->|HIGH| F[必须确认+警告] + + D --> G[工具风险评估] + E --> G + F --> G + + G --> H{工具风险} + H -->|LOW| I[执行] + H -->|MEDIUM| J[确认后执行] + H -->|HIGH| K[确认+警告后执行] +``` + +## 12. 参考资料 + +- [Model Context Protocol](https://modelcontextprotocol.io/) +- [Anthropic MCP SDK](https://github.com/anthropics/mcp) diff --git a/design/services/flow.md b/design/services/flow.md new file mode 100644 index 0000000000000000000000000000000000000000..8f435fd46b7f7b9a22d2ab8bab2c5822a2f44346 --- /dev/null +++ b/design/services/flow.md @@ -0,0 +1,766 @@ +# FlowManager 模块设计文档 + +## 1. 概述 + +FlowManager 模块是 Euler Copilot Framework 中负责工作流(Flow)管理的核心模块。该模块提供了完整的工作流生命周期管理功能,包括创建、读取、更新、删除工作流,以及节点元数据管理、服务管理等功能。 + +### 1.1 核心组件 + +- **FlowManager**: 工作流管理的主要服务类,提供工作流和节点的 CRUD 操作 +- **FlowServiceManager**: 工作流拓扑验证和处理服务 +- **FlowLoader**: 工作流配置文件的加载和保存 +- **Flow API Router**: FastAPI 路由层,提供 RESTful API 接口 + +## 2. 数据模型 + +### 2.1 核心数据结构 + +```mermaid +classDiagram + class Flow { + +str name + +str description + +FlowCheckStatus checkStatus + +FlowBasicConfig basicConfig + +FlowError onError + +dict~UUID,Step~ steps + +list~Edge~ edges + } + + class Step { + +str node + +str type + +str name + +str description + +PositionItem pos + +dict params + } + + class Edge { + +UUID id + +str edge_from + +str edge_to + +EdgeType edge_type + } + + class FlowBasicConfig { + +UUID startStep + +UUID endStep + +PositionItem focusPoint + } + + class FlowCheckStatus { + +bool debug + +bool connectivity + } + + class FlowItem { + +str flow_id + +str name + +str description + +bool enable + +list~NodeItem~ nodes + +list~EdgeItem~ edges + +FlowBasicConfig basic_config + +FlowCheckStatus check_status + } + + Flow --> Step + Flow --> Edge + Flow --> FlowBasicConfig + Flow --> FlowCheckStatus + FlowItem --> NodeItem + FlowItem --> EdgeItem +``` + +### 2.2 数据库模型 + +```mermaid +erDiagram + App ||--o{ Flow : contains + Flow { + string id PK + uuid appId FK + string name + text description + string path + bool debug + bool enabled + datetime updatedAt + } + + App { + uuid id PK + string name + string author + } + + NodeInfo { + string id PK + uuid serviceId FK + string callId + string name + text description + datetime updatedAt + } + + Service { + uuid id PK + string name + string author + datetime updatedAt + } + + UserFavorite { + string userSub + uuid itemId + string favouriteType + } + + AppHashes { + uuid appId FK + string filePath + string hash + } + + Service ||--o{ NodeInfo : contains + App ||--o{ AppHashes : tracks +``` + +## 3. 核心功能 + +### 3.1 工作流管理流程 + +```mermaid +sequenceDiagram + participant Client + participant API as Flow Router + participant FM as FlowManager + participant FSM as FlowServiceManager + participant FL as FlowLoader + participant DB as PostgreSQL + participant FS as FileSystem + + Note over Client,FS: 获取工作流 + Client->>API: GET /api/flow?appId=xxx&flowId=xxx + API->>FM: get_flow_by_app_and_flow_id() + FM->>DB: 查询 Flow 记录 + FM->>FL: load(app_id, flow_id) + FL->>FS: 读取 YAML 文件 + FL->>FL: 验证基本字段 + FL->>FL: 处理边(edges) + FL->>FL: 处理步骤(steps) + FL->>DB: 更新数据库记录 + FL-->>FM: 返回 Flow 配置 + FM->>FM: 转换为 FlowItem + FM-->>API: 返回 FlowItem + API-->>Client: 返回响应 + + Note over Client,FS: 更新工作流 + Client->>API: PUT /api/flow?appId=xxx&flowId=xxx + API->>FSM: remove_excess_structure_from_flow() + FSM-->>API: 清理后的 FlowItem + API->>FSM: validate_flow_illegal() + FSM->>FSM: 验证节点ID唯一性 + FSM->>FSM: 验证边的合法性 + FSM->>FSM: 验证起始/终止节点 + FSM-->>API: 验证通过 + API->>FSM: validate_flow_connectivity() + FSM->>FSM: BFS 检查连通性 + FSM-->>API: 返回连通性状态 + API->>FM: put_flow_by_app_and_flow_id() + FM->>FM: 转换 FlowItem 为 Flow + FM->>FL: 检查旧配置 + FM->>FM: is_flow_config_equal() + FM->>FL: save(app_id, flow_id, flow) + FL->>FS: 写入 YAML 文件 + FL->>DB: 更新 Flow 记录 + FL->>DB: 更新 AppHashes + FL-->>FM: 保存成功 + FM-->>API: 更新完成 + API-->>Client: 返回更新结果 +``` + +### 3.2 节点和服务管理流程 + +```mermaid +sequenceDiagram + participant Client + participant API as Flow Router + participant FM as FlowManager + participant DB as PostgreSQL + participant NM as NodeManager + + Note over Client,NM: 获取用户服务列表 + Client->>API: GET /api/flow/service + API->>FM: get_service_by_user_id(user_sub) + FM->>DB: 查询用户收藏的服务 + FM->>DB: 查询用户上传的服务 + FM->>DB: 去重并查询服务详情 + loop 每个服务 + FM->>FM: get_node_id_by_service_id() + FM->>DB: 查询服务下的节点 + FM->>FM: 格式化节点元数据 + end + FM-->>API: 返回服务及节点列表 + API-->>Client: 返回响应 + + Note over Client,NM: 获取节点详细信息 + Client->>FM: get_node_by_node_id(node_id) + FM->>DB: 查询节点记录 + FM->>NM: get_node_params(node_id) + NM-->>FM: 返回参数 schema + FM->>FM: 创建空槽位(empty slot) + FM->>FM: 提取类型描述 + FM-->>Client: 返回节点元数据 +``` + +### 3.3 工作流验证流程 + +```mermaid +flowchart TD + Start([开始验证]) --> ValidateNodes[验证节点ID唯一性] + ValidateNodes --> CheckStart{检查起始节点} + CheckStart -->|存在| CheckEnd{检查终止节点} + CheckStart -->|不存在| Error1[抛出异常: 起始节点不存在] + CheckEnd -->|存在| ValidateEdges[验证边的合法性] + CheckEnd -->|不存在| Error2[抛出异常: 终止节点不存在] + + ValidateEdges --> CheckEdgeId{边ID唯一?} + CheckEdgeId -->|否| Error3[抛出异常: 边ID重复] + CheckEdgeId -->|是| CheckSelfLoop{自环?} + CheckSelfLoop -->|是| Error4[抛出异常: 起止节点相同] + CheckSelfLoop -->|否| CheckBranch{分支合法?} + CheckBranch -->|否| Error5[抛出异常: 分支非法] + CheckBranch -->|是| CalcDegree[计算入度/出度] + + CalcDegree --> CheckStartDeg{起始节点入度=0?} + CheckStartDeg -->|否| Error6[抛出异常: 起始节点有入边] + CheckStartDeg -->|是| CheckEndDeg{终止节点出度=0?} + CheckEndDeg -->|否| Error7[抛出异常: 终止节点有出边] + CheckEndDeg -->|是| ValidateConn[验证连通性] + + ValidateConn --> BFS[BFS遍历图] + BFS --> CheckReachable{所有节点可达?} + CheckReachable -->|否| ConnFalse[连通性=False] + CheckReachable -->|是| CheckEndReachable{终止节点可达?} + CheckEndReachable -->|否| ConnFalse + CheckEndReachable -->|是| CheckOutEdge{非终止节点都有出边?} + CheckOutEdge -->|否| ConnFalse + CheckOutEdge -->|是| ConnTrue[连通性=True] + + ConnTrue --> Success([验证成功]) + ConnFalse --> Success + + Error1 --> End([结束]) + Error2 --> End + Error3 --> End + Error4 --> End + Error5 --> End + Error6 --> End + Error7 --> End + Success --> End +``` + +### 3.4 工作流删除流程 + +```mermaid +sequenceDiagram + participant Client + participant API as Flow Router + participant FM as FlowManager + participant FL as FlowLoader + participant DB as PostgreSQL + participant FS as FileSystem + participant AL as AppLoader + + Client->>API: DELETE /api/flow?appId=xxx&flowId=xxx + API->>API: 验证用户权限 + API->>FM: delete_flow_by_app_and_flow_id() + + FM->>FL: delete(app_id, flow_id) + FL->>FS: 检查文件是否存在 + FL->>FS: 删除 YAML 文件 + FL->>DB: 删除 Flow 记录 + FL->>DB: 删除向量数据(如果有) + FL->>DB: commit + FL-->>FM: 删除成功 + + FM->>DB: 删除 AppHashes 记录 + FM->>AL: read_metadata(app_id) + AL-->>FM: 返回 AppMetadata + FM->>FM: 从 hashes 中移除 flow + FM->>FM: 从 flows 列表中移除 + FM->>AL: save(metadata, app_id) + AL->>FS: 更新 metadata.yaml + AL->>DB: 更新数据库 + AL-->>FM: 保存成功 + + FM-->>API: 删除完成 + API-->>Client: 返回删除结果 +``` + +## 4. API 接口 + +### 4.1 接口列表 + +| 方法 | 路径 | 功能 | 认证 | +|------|------|------|------| +| GET | `/api/flow/service` | 获取用户可访问的服务及节点列表 | 需要 | +| GET | `/api/flow` | 获取指定工作流的拓扑结构 | 需要 | +| PUT | `/api/flow` | 更新工作流拓扑结构 | 需要 | +| DELETE | `/api/flow` | 删除工作流 | 需要 | + +### 4.2 接口详情 + +#### 4.2.1 获取服务列表 + +**HTTP 请求:** + +```http +GET /api/flow/service +Authorization: Bearer +``` + +**返回示例:** + +```json +{ + "code": 200, + "message": "Node所在Service获取成功", + "result": { + "services": [ + { + "serviceId": "00000000-0000-0000-0000-000000000000", + "name": "系统", + "data": [ + { + "nodeId": "start", + "callId": "Start", + "name": "开始", + "updatedAt": 1234567890.123 + } + ], + "createdAt": null + } + ] + } +} +``` + +#### 4.2.2 获取工作流 + +**HTTP 请求:** + +```http +GET /api/flow?appId={uuid}&flowId={string} +Authorization: Bearer +``` + +**返回示例:** + +```json +{ + "code": 200, + "message": "应用的Workflow获取成功", + "result": { + "flow": { + "flowId": "main", + "name": "主工作流", + "description": "这是一个示例工作流", + "enable": true, + "nodes": [...], + "edges": [...], + "basicConfig": { + "startStep": "uuid", + "endStep": "uuid", + "focusPoint": {"x": 0, "y": 0} + }, + "checkStatus": { + "debug": false, + "connectivity": true + } + } + } +} +``` + +#### 4.2.3 更新工作流 + +**HTTP 请求:** + +```http +PUT /api/flow?appId={uuid}&flowId={string} +Authorization: Bearer +Content-Type: application/json + +{ + "flow": { + "flowId": "main", + "name": "主工作流", + "nodes": [...], + "edges": [...], + "basicConfig": {...} + } +} +``` + +**返回示例:** + +```json +{ + "code": 200, + "message": "应用下流更新成功", + "result": { + "flow": {...} + } +} +``` + +#### 4.2.4 删除工作流 + +**HTTP 请求:** + +```http +DELETE /api/flow?appId={uuid}&flowId={string} +Authorization: Bearer +``` + +**返回示例:** + +```json +{ + "code": 200, + "message": "应用下流程删除成功", + "result": { + "flowId": "main" + } +} +``` + +## 5. 核心类详解 + +### 5.1 FlowManager + +位置:`apps/services/flow.py` + +**主要方法:** + +| 方法 | 功能 | 参数 | 返回值 | +|------|------|------|--------| +| `get_flows_by_app_id` | 获取应用的所有工作流 | `app_id: UUID` | `list[FlowInfo]` | +| `get_node_id_by_service_id` | 获取服务下的节点列表 | `service_id: UUID` | `list[NodeMetaDataBase]` | +| `get_service_by_user_id` | 获取用户的服务列表 | `user_sub: str` | `list[NodeServiceItem]` | +| `get_node_by_node_id` | 获取节点详细信息 | `node_id: str` | `NodeMetaDataItem` | +| `get_flow_by_app_and_flow_id` | 获取工作流详情 | `app_id: UUID, flow_id: str` | `FlowItem` | +| `put_flow_by_app_and_flow_id` | 保存/更新工作流 | `app_id: UUID, flow_id: str, flow_item: FlowItem` | `None` | +| `delete_flow_by_app_and_flow_id` | 删除工作流 | `app_id: UUID, flow_id: str` | `None` | +| `update_flow_debug_by_app_and_flow_id` | 更新调试状态 | `app_id: UUID, flow_id: str, debug: bool` | `bool` | +| `is_flow_config_equal` | 比较两个工作流配置是否相等 | `flow_config_1: Flow, flow_config_2: Flow` | `bool` | + +### 5.2 FlowServiceManager + +位置:`apps/services/flow_service.py` + +**主要方法:** + +| 方法 | 功能 | 异常 | +|------|------|------| +| `remove_excess_structure_from_flow` | 移除工作流中的多余结构(无效边) | `FlowBranchValidationError` | +| `validate_flow_illegal` | 验证工作流是否合法 | `FlowNodeValidationError`, `FlowEdgeValidationError` | +| `validate_flow_connectivity` | 验证工作流连通性(BFS) | - | +| `_validate_node_ids` | 验证节点ID唯一性 | `FlowNodeValidationError` | +| `_validate_edges` | 验证边的合法性 | `FlowEdgeValidationError` | +| `_validate_node_degrees` | 验证起始/终止节点的度数 | `FlowNodeValidationError` | + +### 5.3 FlowLoader + +位置:`apps/scheduler/pool/loader/flow.py` + +**主要方法:** + +| 方法 | 功能 | 异常 | +|------|------|------| +| `load` | 从文件系统加载工作流 | `FileNotFoundError`, `RuntimeError` | +| `save` | 保存工作流到文件系统 | - | +| `delete` | 删除工作流文件和数据 | - | +| `_load_yaml_file` | 加载YAML文件 | - | +| `_validate_basic_fields` | 验证基本字段 | - | +| `_process_edges` | 处理边的转换 | - | +| `_process_steps` | 处理步骤的转换 | `ValueError` | +| `_update_db` | 更新数据库记录 | - | +| `_update_vector` | 更新向量数据 | - | + +## 6. 状态管理 + +### 6.1 工作流状态 + +```mermaid +stateDiagram-v2 + [*] --> Created: 创建工作流 + Created --> Editing: 编辑中 + Editing --> Validating: 保存 + Validating --> Invalid: 验证失败 + Validating --> Valid: 验证成功 + Invalid --> Editing: 继续编辑 + Valid --> Testing: 开始调试 + Testing --> Debugged: 调试成功 + Testing --> Valid: 调试失败 + Debugged --> Published: 发布 + Published --> [*]: 删除 + Editing --> [*]: 删除 + Valid --> [*]: 删除 + + note right of Valid + connectivity = true + debug = false + end note + + note right of Debugged + connectivity = true + debug = true + end note +``` + +### 6.2 工作流检查状态 + +- **debug**:是否经过调试(当工作流内容修改后会重置为 false) +- **connectivity**:图的连通性检查 + - 起始节点到终止节点是否联通 + - 除终止节点外所有节点是否都有出边 + - 所有节点是否都可从起始节点到达 + +## 7. 数据转换 + +### 7.1 FlowItem 与 Flow 转换 + +```mermaid +flowchart LR + subgraph Frontend + FlowItem[FlowItem
前端数据结构] + end + + subgraph Backend + Flow[Flow
配置数据结构] + end + + subgraph Storage + YAML[YAML文件] + DB[(数据库)] + end + + FlowItem -->|put_flow| Flow + Flow -->|get_flow| FlowItem + Flow -->|save| YAML + YAML -->|load| Flow + Flow -->|_update_db| DB + DB -->|query| Flow +``` + +**转换关键点:** + +1. **NodeItem → Step** + - `step_id` 作为字典 key + - `node_id` 映射到 `node` + - `call_id` 映射到 `type` + - `parameters["input_parameters"]` 映射到 `params` + +2. **EdgeItem → Edge** + - `source_branch` + `branch_id` 组合成 `edge_from`(用 "." 连接) + - `target_branch` 映射到 `edge_to` + - `type` 转换为 `EdgeType` 枚举 + +3. **边格式处理** + - 存储:`edge_from = "step_id.branch_id"`(有分支时) + - 存储:`edge_from = "step_id"`(无分支时) + - 解析:按 "." 分割,长度为 2 表示有分支 + +## 8. 异常处理 + +### 8.1 异常类型 + +| 异常类 | 触发条件 | 处理方式 | +|--------|----------|----------| +| `FlowNodeValidationError` | 节点ID重复、起始/终止节点不存在、度数错误 | 返回400错误 | +| `FlowEdgeValidationError` | 边ID重复、自环、分支非法 | 返回400错误 | +| `FlowBranchValidationError` | 分支字段缺失/为空、分支重复、非法字符 | 返回400错误 | +| `ValueError` | 应用不存在、工作流不存在、配置错误 | 返回404/500错误 | +| `FileNotFoundError` | 工作流文件不存在 | 返回404错误 | +| `RuntimeError` | YAML文件格式错误 | 返回500错误 | + +### 8.2 错误处理流程 + +```mermaid +flowchart TD + Request[API请求] --> Auth{权限验证} + Auth -->|失败| Return403[返回403 Forbidden] + Auth -->|成功| Validate{数据验证} + Validate -->|失败| Return400[返回400 Bad Request] + Validate -->|成功| Process[处理业务逻辑] + Process -->|异常| CatchError{捕获异常} + CatchError -->|FlowValidationError| Return400 + CatchError -->|FileNotFoundError| Return404[返回404 Not Found] + CatchError -->|ValueError| Return404 + CatchError -->|其他异常| Return500[返回500 Internal Server Error] + Process -->|成功| Return200[返回200 OK] +``` + +## 9. 性能优化 + +### 9.1 并发处理 + +- 使用 `async/await` 异步处理 +- 数据库查询使用连接池 +- 文件I/O使用 `aiofiles` 异步操作 + +### 9.2 数据库优化 + +- 创建复合索引: `idx_app_id_id`, `idx_app_id_name` +- 批量查询减少数据库往返 +- 使用 `select().where().order_by()` 优化查询 + +### 9.3 缓存策略 + +- FlowLoader 通过 Pool 单例复用 +- 节点参数 schema 可缓存 +- 向量化数据独立存储和更新 + +## 10. 安全性 + +### 10.1 访问控制 + +```mermaid +flowchart TD + Request[用户请求] --> CheckSession{会话验证} + CheckSession -->|失败| Reject1[拒绝访问] + CheckSession -->|成功| CheckToken{Personal Token验证} + CheckToken -->|失败| Reject2[拒绝访问] + CheckToken -->|成功| CheckPerm{权限检查} + CheckPerm -->|读取| ValidateRead{验证读权限} + CheckPerm -->|写入/删除| ValidateWrite{验证写权限} + ValidateRead -->|有权限| AllowRead[允许读取] + ValidateRead -->|无权限| Reject3[拒绝访问] + ValidateWrite -->|是所有者| AllowWrite[允许写入] + ValidateWrite -->|非所有者| Reject4[拒绝访问] +``` + +### 10.2 数据验证 + +- 输入参数使用 Pydantic 模型验证 +- 分支ID禁止包含"."等非法字符 +- 节点/边ID唯一性检查 +- YAML文件格式验证 + +### 10.3 文件安全 + +- 文件路径使用 `anyio.Path` 安全处理 +- 限制文件访问在 `data_dir` 范围内 +- 文件hash值校验(AppHashes表) + +## 11. 扩展性 + +### 11.1 支持的节点类型 + +- **START**: 起始节点 +- **END**: 终止节点 +- **CHOICE**: 分支节点(多出边) +- **Empty**: 空节点(工具被删除时的占位符) +- **自定义节点**: 通过Service和NodeInfo扩展 + +### 11.2 支持的边类型 + +- **NORMAL**: 普通边 +- **其他自定义类型**: 通过 `EdgeType` 枚举扩展 + +### 11.3 插件机制 + +- 通过 `Pool().get_call()` 动态加载节点实现 +- Service机制支持用户自定义服务 +- 节点参数schema动态生成 + +## 12. 配置示例 + +### 12.1 YAML配置文件示例 + +```yaml +name: 示例工作流 +description: 这是一个示例工作流配置 +checkStatus: + debug: false + connectivity: true +basicConfig: + startStep: a1b2c3d4-e5f6-7890-abcd-ef1234567890 + endStep: f1e2d3c4-b5a6-7890-dcba-fe0987654321 + focusPoint: + x: 0.0 + y: 0.0 +onError: + use_llm: true +steps: + a1b2c3d4-e5f6-7890-abcd-ef1234567890: + type: Start + node: start + name: 开始 + description: 工作流起始节点 + pos: + x: 100.0 + y: 100.0 + params: {} + b2c3d4e5-f6a7-8901-bcde-f12345678901: + type: ApiCall + node: api_node_123 + name: API调用 + description: 调用外部API + pos: + x: 300.0 + y: 100.0 + params: + url: https://api.example.com + method: GET + f1e2d3c4-b5a6-7890-dcba-fe0987654321: + type: End + node: end + name: 结束 + description: 工作流结束节点 + pos: + x: 500.0 + y: 100.0 + params: {} +edges: + - id: edge001 + edge_from: a1b2c3d4-e5f6-7890-abcd-ef1234567890 + edge_to: b2c3d4e5-f6a7-8901-bcde-f12345678901 + edge_type: NORMAL + - id: edge002 + edge_from: b2c3d4e5-f6a7-8901-bcde-f12345678901 + edge_to: f1e2d3c4-b5a6-7890-dcba-fe0987654321 + edge_type: NORMAL +``` + +## 13. 文件存储结构 + +```text +data_dir/ +└── semantics/ + └── app/ + └── {app_id}/ + ├── metadata.yaml # 应用元数据 + └── flow/ + ├── main.yaml # 主工作流 + ├── {flow_id}.yaml # 其他工作流 + └── ... +``` + +## 14. 总结 + +FlowManager 模块提供了完整的工作流管理功能,具有以下特点: + +✅ **完整的CRUD操作**: 支持工作流的创建、读取、更新、删除 +✅ **严格的数据验证**: 多层验证确保工作流配置正确性 +✅ **连通性检查**: BFS算法验证图的连通性 +✅ **权限控制**: 完善的用户权限和访问控制 +✅ **异步处理**: 高性能的异步I/O操作 +✅ **扩展性强**: 支持自定义节点和边类型 +✅ **数据一致性**: 数据库与文件系统双重存储保证一致性 + +该模块是 Euler Copilot Framework 工作流引擎的核心组件,为上层应用提供了稳定可靠的工作流管理服务。