From bf5da6dae33a8ab8036b2c66056c79617719936f Mon Sep 17 00:00:00 2001 From: zxstty Date: Sun, 10 Aug 2025 17:53:38 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E5=AE=8C=E5=96=84Agent=E8=B0=83=E7=94=A8?= =?UTF-8?q?=EF=BC=8C=E9=99=8D=E4=BD=8E=E8=AE=A1=E5=88=92=E7=94=9F=E6=88=90?= =?UTF-8?q?=E6=97=B6=E5=BB=B6=EF=BC=8840s=3D>20s=EF=BC=89,=E9=99=8D?= =?UTF-8?q?=E4=BD=8E=E6=AF=8F=E6=AC=A1json=E7=94=9F=E6=88=90=E7=9A=84?= =?UTF-8?q?=E5=BC=80=E9=94=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/llm/function.py | 14 ++----- apps/routers/chat.py | 6 +-- apps/scheduler/executor/agent.py | 13 ++++-- apps/scheduler/mcp/prompt.py | 4 +- apps/scheduler/mcp_agent/base.py | 61 +++++++++++++++++++++++++++ apps/scheduler/mcp_agent/host.py | 9 +++- apps/scheduler/mcp_agent/plan.py | 52 +++++++++-------------- apps/scheduler/mcp_agent/prompt.py | 66 ++++++++++++++++++++++++++++++ apps/scheduler/mcp_agent/select.py | 35 +--------------- apps/services/activity.py | 8 ---- apps/services/task.py | 19 +++------ 11 files changed, 181 insertions(+), 106 deletions(-) create mode 100644 apps/scheduler/mcp_agent/base.py diff --git a/apps/llm/function.py b/apps/llm/function.py index 1f995fe7..6165dc45 100644 --- a/apps/llm/function.py +++ b/apps/llm/function.py @@ -68,7 +68,6 @@ class FunctionLLM: api_key=self._config.api_key, ) - async def _call_openai( self, messages: list[dict[str, str]], @@ -123,7 +122,7 @@ class FunctionLLM: }, ] - response = await self._client.chat.completions.create(**self._params) # type: ignore[arg-type] + response = await self._client.chat.completions.create(**self._params) # type: ignore[arg-type] try: logger.info("[FunctionCall] 大模型输出:%s", response.choices[0].message.tool_calls[0].function.arguments) return response.choices[0].message.tool_calls[0].function.arguments @@ -132,7 +131,6 @@ class FunctionLLM: logger.info("[FunctionCall] 大模型输出:%s", ans) return await FunctionLLM.process_response(ans) - @staticmethod async 
def process_response(response: str) -> str: """处理大模型的输出""" @@ -169,7 +167,6 @@ class FunctionLLM: return json_str - async def _call_ollama( self, messages: list[dict[str, str]], @@ -196,10 +193,9 @@ class FunctionLLM: "format": schema, }) - response = await self._client.chat(**self._params) # type: ignore[arg-type] + response = await self._client.chat(**self._params) # type: ignore[arg-type] return await self.process_response(response.message.content or "") - async def call( self, messages: list[dict[str, Any]], @@ -254,7 +250,6 @@ class JsonGenerator: ) self._err_info = "" - async def _assemble_message(self) -> str: """组装消息""" # 检查类型 @@ -275,13 +270,12 @@ class JsonGenerator: """单次尝试""" prompt = await self._assemble_message() messages = [ - {"role": "system", "content": "You are a helpful assistant."}, - {"role": "user", "content": prompt}, + {"role": "system", "content": prompt}, + {"role": "user", "content": "please generate a JSON response based on the above information and schema."}, ] function = FunctionLLM() return await function.call(messages, self._schema, max_tokens, temperature) - async def generate(self) -> dict[str, Any]: """生成JSON""" Draft7Validator.check_schema(self._schema) diff --git a/apps/routers/chat.py b/apps/routers/chat.py index 9a45c2d5..68a5b4af 100644 --- a/apps/routers/chat.py +++ b/apps/routers/chat.py @@ -148,9 +148,9 @@ async def chat( await UserBlacklistManager.change_blacklisted_users(user_sub, -10) raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="question is blacklisted") - # 限流检查 - if await Activity.is_active(user_sub): - raise HTTPException(status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail="Too many requests") + # # 限流检查 + # if await Activity.is_active(user_sub): + # raise HTTPException(status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail="Too many requests") res = chat_generator(post_body, user_sub, session_id) return StreamingResponse( diff --git a/apps/scheduler/executor/agent.py 
b/apps/scheduler/executor/agent.py index 632cf9e3..60394bd7 100644 --- a/apps/scheduler/executor/agent.py +++ b/apps/scheduler/executor/agent.py @@ -137,8 +137,9 @@ class MCPAgentExecutor(BaseExecutor): if is_first: # 获取第一个输入参数 tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool + step = self.task.runtime.temporary_plans.plans[self.task.state.step_index] mcp_tool = self.tools[tool_id] - self.task.state.current_input = await MCPHost._get_first_input_params(mcp_tool, self.task.runtime.question, self.task) + self.task.state.current_input = await MCPHost._get_first_input_params(mcp_tool, step.instruction, self.task) else: # 获取后续输入参数 if isinstance(self.params, param): @@ -271,10 +272,16 @@ class MCPAgentExecutor(BaseExecutor): self.resoning_llm ) await self.update_tokens() + error_message = await MCPPlanner.change_err_message_to_description( + error_message=self.task.state.error_message, + tool=mcp_tool, + input_params=self.task.state.current_input, + reasoning_llm=self.resoning_llm + ) await self.push_message( EventType.STEP_WAITING_FOR_PARAM, data={ - "message": "当运行产生如下报错:\n" + self.task.state.error_message, + "message": error_message, "params": params_with_null } ) @@ -297,7 +304,7 @@ class MCPAgentExecutor(BaseExecutor): input_data={}, output_data={}, ex_data={ - "message": "当运行产生如下报错:\n" + self.task.state.error_message, + "message": error_message, "params": params_with_null } ) diff --git a/apps/scheduler/mcp/prompt.py b/apps/scheduler/mcp/prompt.py index b322fb08..d6ff72b4 100644 --- a/apps/scheduler/mcp/prompt.py +++ b/apps/scheduler/mcp/prompt.py @@ -233,8 +233,8 @@ FINAL_ANSWER = dedent(r""" MEMORY_TEMPLATE = dedent(r""" {% for ctx in context_list %} - 第{{ loop.index }}步:{{ ctx.step_description }} - - 调用工具 `{{ ctx.step_name }}`,并提供参数 `{{ ctx.input_data|tojson }}`。 - 执行状态:{{ ctx.status }} - - 得到数据:`{{ ctx.output_data|tojson }}` {% endfor %} """) 
diff --git a/apps/scheduler/mcp_agent/base.py b/apps/scheduler/mcp_agent/base.py new file mode 100644 index 00000000..ac3829b4 --- /dev/null +++ b/apps/scheduler/mcp_agent/base.py @@ -0,0 +1,61 @@ +from typing import Any +import json +from jsonschema import validate +import logging +from apps.llm.function import JsonGenerator +from apps.llm.reasoning import ReasoningLLM + +logger = logging.getLogger(__name__) + + +class McpBase: + @staticmethod + async def get_resoning_result(prompt: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> str: + """获取推理结果""" + # 调用推理大模型 + message = [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": prompt}, + ] + result = "" + async for chunk in resoning_llm.call( + message, + streaming=False, + temperature=0.07, + result_only=True, + ): + result += chunk + + return result + + @staticmethod + async def _parse_result(result: str, schema: dict[str, Any], left_str: str = '{', right_str: str = '}') -> dict[str, Any]: + """解析推理结果""" + left_index = result.find(left_str) + right_index = result.rfind(right_str) + flag = True + if left_index == -1 or right_index == -1: + flag = False + + if left_index > right_index: + flag = False + if flag: + try: + tmp_js = json.loads(result[left_index:right_index + 1]) + validate(instance=tmp_js, schema=schema) + except Exception as e: + logger.error("[McpBase] 解析结果失败: %s", e) + flag = False + if not flag: + json_generator = JsonGenerator( + "请提取下面内容中的json\n\n", + [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "请提取下面内容中的json\n\n"+result}, + ], + schema, + ) + json_result = await json_generator.generate() + else: + json_result = json.loads(result[left_index:right_index + 1]) + return json_result diff --git a/apps/scheduler/mcp_agent/host.py b/apps/scheduler/mcp_agent/host.py index 2701477c..85d992c8 100644 --- a/apps/scheduler/mcp_agent/host.py +++ b/apps/scheduler/mcp_agent/host.py @@ -24,12 +24,19 @@ logger = 
logging.getLogger(__name__) _env = SandboxedEnvironment( loader=BaseLoader, - autoescape=True, + autoescape=False, trim_blocks=True, lstrip_blocks=True, ) +def tojson_filter(value): + return json.dumps(value, ensure_ascii=False, separators=(',', ':')) + + +_env.filters['tojson'] = tojson_filter + + class MCPHost: """MCP宿主服务""" diff --git a/apps/scheduler/mcp_agent/plan.py b/apps/scheduler/mcp_agent/plan.py index 977dfda1..35c44030 100644 --- a/apps/scheduler/mcp_agent/plan.py +++ b/apps/scheduler/mcp_agent/plan.py @@ -6,6 +6,7 @@ from jinja2.sandbox import SandboxedEnvironment from apps.llm.reasoning import ReasoningLLM from apps.llm.function import JsonGenerator +from apps.scheduler.mcp_agent.base import McpBase from apps.scheduler.mcp_agent.prompt import ( EVALUATE_GOAL, GENERATE_FLOW_NAME, @@ -14,6 +15,7 @@ from apps.scheduler.mcp_agent.prompt import ( RECREATE_PLAN, RISK_EVALUATE, TOOL_EXECUTE_ERROR_TYPE_ANALYSIS, + CHANGE_ERROR_MESSAGE_TO_DESCRIPTION, GET_MISSING_PARAMS, FINAL_ANSWER ) @@ -35,40 +37,8 @@ _env = SandboxedEnvironment( ) -class MCPPlanner: +class MCPPlanner(McpBase): """MCP 用户目标拆解与规划""" - @staticmethod - async def get_resoning_result(prompt: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> str: - """获取推理结果""" - # 调用推理大模型 - message = [ - {"role": "system", "content": "You are a helpful assistant."}, - {"role": "user", "content": prompt}, - ] - result = "" - async for chunk in resoning_llm.call( - message, - streaming=False, - temperature=0.07, - result_only=True, - ): - result += chunk - - return result - - @staticmethod - async def _parse_result(result: str, schema: dict[str, Any]) -> str: - """解析推理结果""" - json_generator = JsonGenerator( - result, - [ - {"role": "system", "content": "You are a helpful assistant."}, - {"role": "user", "content": "请提取下面内容中的json\n\n"+result}, - ], - schema, - ) - json_result = await json_generator.generate() - return json_result @staticmethod async def evaluate_goal( @@ -266,6 +236,22 @@ class MCPPlanner: # 
返回工具执行错误类型 return error_type + @staticmethod + async def change_err_message_to_description( + error_message: str, tool: MCPTool, input_params: dict[str, Any], + reasoning_llm: ReasoningLLM = ReasoningLLM()) -> str: + """将错误信息转换为工具描述""" + template = _env.from_string(CHANGE_ERROR_MESSAGE_TO_DESCRIPTION) + prompt = template.render( + error_message=error_message, + tool_name=tool.name, + tool_description=tool.description, + input_schema=tool.input_schema, + input_params=input_params, + ) + result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm) + return result + @staticmethod async def get_missing_param( tool: MCPTool, diff --git a/apps/scheduler/mcp_agent/prompt.py b/apps/scheduler/mcp_agent/prompt.py index 139e8e37..365179f6 100644 --- a/apps/scheduler/mcp_agent/prompt.py +++ b/apps/scheduler/mcp_agent/prompt.py @@ -70,6 +70,7 @@ TOOL_SELECT = dedent(r""" 2. 请在给定的MCP工具列表中选择,不要自己生成MCP工具。 3. 可以选择一些辅助工具,但必须确保这些工具与当前目标相关。 4. 注意,返回的工具ID必须是MCP工具的ID,而不是名称。 + 5. 不要选择不存在的工具。 必须按照以下格式生成选择结果,不要输出任何其他内容: ```json { @@ -616,6 +617,71 @@ TOOL_EXECUTE_ERROR_TYPE_ANALYSIS = dedent(r""" # 输出 """ ) +# 将当前程序运行的报错转换为自然语言 +CHANGE_ERROR_MESSAGE_TO_DESCRIPTION = dedent(r""" + 你是一个智能助手,你的任务是将当前程序运行的报错转换为自然语言描述。 + 请根据以下规则进行转换: + 1. 将报错信息转换为自然语言描述,描述应该简洁明了,能够让人理解报错的原因和影响。 + 2. 描述应该包含报错的具体内容和可能的解决方案。 + 3. 描述应该避免使用过于专业的术语,以便用户能够理解。 + 4. 描述应该尽量简短,控制在50字以内。 + 5. 
只输出自然语言描述,不要输出其他内容。 + # 样例 + # 工具信息 + + port_scanner + 扫描主机端口 + + { + "type": "object", + "properties": { + "host": { + "type": "string", + "description": "主机地址" + }, + "port": { + "type": "integer", + "description": "端口号" + }, + "username": { + "type": "string", + "description": "用户名" + }, + "password": { + "type": "string", + "description": "密码" + } + }, + "required": ["host", "port", "username", "password"] + } + + + # 工具入参 + { + "host": "192.0.0.1", + "port": 3306, + "username": "root", + "password": "password" + } + # 报错信息 + 执行端口扫描命令时,出现了错误:`password is not correct`。 + # 输出 + 扫描端口时发生错误:密码不正确。请检查输入的密码是否正确,并重试。 + # 现在开始转换报错信息: + # 工具信息 + + {{tool_name}} + {{tool_description}} + + {{input_schema}} + + + # 工具入参 + {{input_params}} + # 报错信息 + {{error_message}} + # 输出 + """) # 获取缺失的参数的json结构体 GET_MISSING_PARAMS = dedent(r""" 你是一个工具参数获取器。 diff --git a/apps/scheduler/mcp_agent/select.py b/apps/scheduler/mcp_agent/select.py index 0ae54dca..075e08f0 100644 --- a/apps/scheduler/mcp_agent/select.py +++ b/apps/scheduler/mcp_agent/select.py @@ -15,6 +15,7 @@ from apps.llm.embedding import Embedding from apps.llm.function import FunctionLLM from apps.llm.reasoning import ReasoningLLM from apps.llm.token import TokenCalculator +from apps.scheduler.mcp_agent.base import McpBase from apps.scheduler.mcp_agent.prompt import TOOL_SELECT from apps.schemas.mcp import ( BaseModel, @@ -37,40 +38,8 @@ FINAL_TOOL_ID = "FIANL" SUMMARIZE_TOOL_ID = "SUMMARIZE" -class MCPSelector: +class MCPSelector(McpBase): """MCP选择器""" - @staticmethod - async def get_resoning_result(prompt: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> str: - """获取推理结果""" - # 调用推理大模型 - message = [ - {"role": "system", "content": "You are a helpful assistant."}, - {"role": "user", "content": prompt}, - ] - result = "" - async for chunk in resoning_llm.call( - message, - streaming=False, - temperature=0.07, - result_only=True, - ): - result += chunk - - return result - - @staticmethod - async def 
_parse_result(result: str, schema: dict[str, Any]) -> str: - """解析推理结果""" - json_generator = JsonGenerator( - result, - [ - {"role": "system", "content": "You are a helpful assistant."}, - {"role": "user", "content": "请提取下面内容中的json\n\n"+result}, - ], - schema, - ) - json_result = await json_generator.generate() - return json_result @staticmethod async def select_top_tool( diff --git a/apps/services/activity.py b/apps/services/activity.py index 88142b9e..df322cc8 100644 --- a/apps/services/activity.py +++ b/apps/services/activity.py @@ -22,14 +22,6 @@ class Activity: :param user_sub: 用户实体ID :return: 判断结果,正在提问则返回True """ - time = round(datetime.now(UTC).timestamp(), 3) - - # 检查窗口内总请求数 - count = await MongoDB().get_collection("activity").count_documents( - {"timestamp": {"$gte": time - SLIDE_WINDOW_TIME, "$lte": time}}, - ) - if count >= SLIDE_WINDOW_QUESTION_COUNT: - return True # 检查用户是否正在提问 active = await MongoDB().get_collection("activity").find_one( diff --git a/apps/services/task.py b/apps/services/task.py index ad20f51f..38022f40 100644 --- a/apps/services/task.py +++ b/apps/services/task.py @@ -138,19 +138,12 @@ class TaskManager: """保存flow信息到flow_context""" flow_context_collection = MongoDB().get_collection("flow_context") try: - for history in flow_context: - # 查找是否存在 - current_context = await flow_context_collection.find_one({ - "task_id": task_id, - "_id": history.id, - }) - if current_context: - await flow_context_collection.update_one( - {"_id": current_context["_id"]}, - {"$set": history.model_dump(exclude_none=True, by_alias=True)}, - ) - else: - await flow_context_collection.insert_one(history.model_dump(exclude_none=True, by_alias=True)) + # 删除旧的flow_context + await flow_context_collection.delete_many({"task_id": task_id}) + await flow_context_collection.insert_many( + [history.model_dump(exclude_none=True, by_alias=True) for history in flow_context], + ordered=False, + ) except Exception: logger.exception("[TaskManager] 保存flow执行记录失败") -- Gitee From 
c6c0c5f6b342eeb028ae632e6eefeff7fb559e0e Mon Sep 17 00:00:00 2001 From: zxstty Date: Sun, 10 Aug 2025 18:09:14 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E5=AE=8C=E5=96=84=E9=BB=98=E8=AE=A4?= =?UTF-8?q?=E7=94=A8=E6=88=B7=E6=B7=BB=E5=8A=A0=E7=9A=84=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/main.py b/apps/main.py index 547cb681..99b7de0c 100644 --- a/apps/main.py +++ b/apps/main.py @@ -39,7 +39,7 @@ from apps.routers import ( parameter ) from apps.scheduler.pool.pool import Pool - +logger = logging.getLogger(__name__) # 定义FastAPI app app = FastAPI(redoc_url=None) # 定义FastAPI全局中间件 @@ -104,7 +104,7 @@ async def add_no_auth_user() -> None: auto_execute=False ).model_dump(by_alias=True)) except Exception as e: - logging.warning(f"添加无认证用户失败: {e}") + logger.warning("[add_no_auth_user] 添加默认用户失败(可能已存在): %s", e) async def clear_user_activity() -> None: -- Gitee