diff --git a/apps/scheduler/call/mcp/mcp.py b/apps/scheduler/call/mcp/mcp.py index 7c59448a91d68167d2c05e3a1cd448e5f54fc512..157ee75f63329d5b2d57253c487467fff7688e63 100644 --- a/apps/scheduler/call/mcp/mcp.py +++ b/apps/scheduler/call/mcp/mcp.py @@ -53,13 +53,14 @@ class MCP(CoreCall, input_model=MCPInput, output_model=MCPOutput): """MCP工具""" controlled_output: bool = Field(default=True) - + # 输出参数配置 output_parameters: dict[str, Any] = Field(description="输出参数配置", default={ "message": {"type": "string", "description": "MCP Server的自然语言输出"}, }) - - mcp_list: list[str] = Field(description="MCP Server ID列表", max_length=5, min_length=1) + + mcp_list: list[str] = Field( + description="MCP Server ID列表", max_length=5, min_length=1) max_steps: int = Field(description="最大步骤数", default=20) text_output: bool = Field(description="是否将结果以文本形式返回", default=True) to_user: bool = Field(description="是否将结果返回给用户", default=True) @@ -72,7 +73,7 @@ class MCP(CoreCall, input_model=MCPInput, output_model=MCPOutput): }, LanguageType.ENGLISH: { "name": "MCP", - "type": CallType.DEFAULT, + "type": CallType.DEFAULT, "description": "Call the MCP Server to execute tools", }, } @@ -80,7 +81,8 @@ class MCP(CoreCall, input_model=MCPInput, output_model=MCPOutput): async def _init(self, call_vars: CallVars) -> MCPInput: """初始化MCP""" # 获取MCP交互类 - self._host = MCPHost(call_vars.ids.user_sub, call_vars.ids.task_id, call_vars.ids.flow_id, self.description) + self._host = MCPHost(call_vars.ids.user_sub, call_vars.ids.task_id, + call_vars.ids.flow_id, self.description) self._tool_list = await self._host.get_tool_list(self.mcp_list) self._call_vars = call_vars @@ -104,14 +106,14 @@ class MCP(CoreCall, input_model=MCPInput, output_model=MCPOutput): # 执行计划 plan_list = deepcopy(self._plan.plans) while len(plan_list) > 0: - async for chunk in self._execute_plan_item(plan_list.pop(0), language): + async for chunk in self._execute_plan_item(plan_list.pop(0)): yield chunk # 生成总结 async for chunk in self._generate_answer(language): yield chunk - async def _generate_plan(self, language) -> AsyncGenerator[CallOutputChunk, None]: + async def _generate_plan(self, language: LanguageType = LanguageType.CHINESE) -> AsyncGenerator[CallOutputChunk, None]: """生成执行计划""" # 开始提示 yield self._create_output(MCP_GENERATE["START"][language], MCPMessageType.PLAN_BEGIN) @@ -140,9 +142,10 @@ class MCP(CoreCall, input_model=MCPInput, output_model=MCPOutput): # 判断是否为Final if plan_item.tool == "Final": return - + self._host.language = language # 获取工具 - tool = next((tool for tool in self._tool_list if tool.id == plan_item.tool), None) + tool = next( + (tool for tool in self._tool_list if tool.id == plan_item.tool), None) if tool is None: err = f"[MCP] 工具 {plan_item.tool} 不存在" logger.error(err) @@ -156,7 +159,7 @@ class MCP(CoreCall, input_model=MCPInput, output_model=MCPOutput): # 调用工具 try: - result = await self._host.call_tool(tool, plan_item, language) + result = await self._host.call_tool(tool, plan_item) except Exception as e: err = f"[MCP] 工具 {tool.name} 调用失败: {e!s}" logger.exception(err) @@ -172,7 +175,7 @@ class MCP(CoreCall, input_model=MCPInput, output_model=MCPOutput): }, ) - async def _generate_answer(self, language) -> AsyncGenerator[CallOutputChunk, None]: + async def _generate_answer(self, language: LanguageType = LanguageType.CHINESE) -> AsyncGenerator[CallOutputChunk, None]: """生成总结""" # 提示开始总结 yield self._create_output( @@ -192,11 +195,12 @@ class MCP(CoreCall, input_model=MCPInput, output_model=MCPOutput): message=answer, ).model_dump(), ) - + # 额外输出一个纯DATA chunk用于保存到变量池 yield CallOutputChunk( type=CallOutputType.DATA, - content=MCPOutput(message=answer).model_dump(by_alias=True, exclude_none=True) + content=MCPOutput(message=answer).model_dump( + by_alias=True, exclude_none=True) ) def _create_output( diff --git a/apps/scheduler/mcp/select.py b/apps/scheduler/mcp/select.py index e8b0e88c09ac1d1ac63f995b779da8483a2fce22..11ed82ca9c1078954c4e8c84eb363d58ff89ec41 100644 --- a/apps/scheduler/mcp/select.py +++ b/apps/scheduler/mcp/select.py @@ -80,7 +80,7 @@ class MCPSelector: return llm_mcp_list async def _get_mcp_by_llm( - self, query: str, mcp_list: list[dict[str, str]], mcp_ids: list[str], language + self, query: str, mcp_list: list[dict[str, str]], mcp_ids: list[str], language: LanguageType = LanguageType.CHINESE ) -> MCPSelectResult: """通过LLM选择最合适的MCP Server""" # 初始化jinja2环境