diff --git a/apps/llm/function.py b/apps/llm/function.py
index 1f995fe7ba187cead03aa6fc62a4cbce1ec05a65..6165dc455aae038aeb1ad9bcd0840b418c3dc8e8 100644
--- a/apps/llm/function.py
+++ b/apps/llm/function.py
@@ -68,7 +68,6 @@ class FunctionLLM:
                 api_key=self._config.api_key,
             )
 
-
     async def _call_openai(
         self,
         messages: list[dict[str, str]],
@@ -123,7 +122,7 @@ class FunctionLLM:
             },
         ]
 
-        response = await self._client.chat.completions.create(**self._params) # type: ignore[arg-type]
+        response = await self._client.chat.completions.create(**self._params)  # type: ignore[arg-type]
         try:
             logger.info("[FunctionCall] 大模型输出:%s", response.choices[0].message.tool_calls[0].function.arguments)
             return response.choices[0].message.tool_calls[0].function.arguments
@@ -132,7 +131,6 @@ class FunctionLLM:
             logger.info("[FunctionCall] 大模型输出:%s", ans)
             return await FunctionLLM.process_response(ans)
 
-
     @staticmethod
     async def process_response(response: str) -> str:
         """处理大模型的输出"""
@@ -169,7 +167,6 @@ class FunctionLLM:
 
         return json_str
 
-
     async def _call_ollama(
         self,
         messages: list[dict[str, str]],
@@ -196,10 +193,9 @@ class FunctionLLM:
             "format": schema,
         })
 
-        response = await self._client.chat(**self._params) # type: ignore[arg-type]
+        response = await self._client.chat(**self._params)  # type: ignore[arg-type]
         return await self.process_response(response.message.content or "")
 
-
     async def call(
         self,
         messages: list[dict[str, Any]],
@@ -254,7 +250,6 @@ class JsonGenerator:
         )
         self._err_info = ""
 
-
     async def _assemble_message(self) -> str:
         """组装消息"""
         # 检查类型
@@ -275,13 +270,12 @@ class JsonGenerator:
         """单次尝试"""
         prompt = await self._assemble_message()
         messages = [
-            {"role": "system", "content": "You are a helpful assistant."},
-            {"role": "user", "content": prompt},
+            {"role": "system", "content": prompt},
+            {"role": "user", "content": "please generate a JSON response based on the above information and schema."},
         ]
         function = FunctionLLM()
         return await function.call(messages, self._schema, max_tokens, temperature)
 
-
     async def generate(self) -> dict[str, Any]:
         """生成JSON"""
         Draft7Validator.check_schema(self._schema)
diff --git a/apps/main.py b/apps/main.py
index 547cb6810828582114b59d6e2d74c77b1d2450f2..99b7de0cd927f937abc95971cb878dd87e878abb 100644
--- a/apps/main.py
+++ b/apps/main.py
@@ -39,7 +39,7 @@ from apps.routers import (
     parameter
 )
 from apps.scheduler.pool.pool import Pool
-
+logger = logging.getLogger(__name__)
 # 定义FastAPI app
 app = FastAPI(redoc_url=None)
 # 定义FastAPI全局中间件
@@ -104,7 +104,7 @@ async def add_no_auth_user() -> None:
                 auto_execute=False
             ).model_dump(by_alias=True))
     except Exception as e:
-        logging.warning(f"添加无认证用户失败: {e}")
+        logger.error(f"[add_no_auth_user] 默认用户 {username} 已存在")
 
 
 async def clear_user_activity() -> None:
diff --git a/apps/routers/chat.py b/apps/routers/chat.py
index 9a45c2d5af4fd8862373cb078d34bad2df0784d1..68a5b4af8096a3c82072131fcdce266da520a3f8 100644
--- a/apps/routers/chat.py
+++ b/apps/routers/chat.py
@@ -148,9 +148,9 @@ async def chat(
         await UserBlacklistManager.change_blacklisted_users(user_sub, -10)
         raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="question is blacklisted")
 
-    # 限流检查
-    if await Activity.is_active(user_sub):
-        raise HTTPException(status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail="Too many requests")
+    # # 限流检查
+    # if await Activity.is_active(user_sub):
+    #     raise HTTPException(status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail="Too many requests")
 
     res = chat_generator(post_body, user_sub, session_id)
     return StreamingResponse(
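Review note on the `JsonGenerator._single_trial` hunk above: the role layout is inverted, so the assembled schema-bearing prompt now rides in the system turn while the user turn carries a fixed instruction. A minimal sketch of the resulting request shape, assuming the `FunctionLLM`/`JsonGenerator` wiring from this file (the prompt value is invented):

```python
# Sketch only: mirrors the new message layout in JsonGenerator._single_trial.
# `prompt` stands in for the output of self._assemble_message(); the value is hypothetical.
prompt = "根据以下 schema 和对话内容生成 JSON:{...}"

messages = [
    # 完整的任务描述(含 schema)现在位于 system 角色
    {"role": "system", "content": prompt},
    # user 角色只携带固定的生成指令
    {"role": "user", "content": "please generate a JSON response based on the above information and schema."},
]
# 随后照旧调用 FunctionLLM().call(messages, schema, max_tokens, temperature)
```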
diff --git a/apps/scheduler/executor/agent.py b/apps/scheduler/executor/agent.py
index 632cf9e3819be5c4169d22817986b7750e64d11f..60394bd76cb11dccc25ab1370cdded4034ac1311 100644
--- a/apps/scheduler/executor/agent.py
+++ b/apps/scheduler/executor/agent.py
@@ -137,8 +137,9 @@ class MCPAgentExecutor(BaseExecutor):
         if is_first:
             # 获取第一个输入参数
             tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool
+            step = self.task.runtime.temporary_plans.plans[self.task.state.step_index]
             mcp_tool = self.tools[tool_id]
-            self.task.state.current_input = await MCPHost._get_first_input_params(mcp_tool, self.task.runtime.question, self.task)
+            self.task.state.current_input = await MCPHost._get_first_input_params(mcp_tool, step.instruction, self.task)
         else:
             # 获取后续输入参数
             if isinstance(self.params, param):
@@ -271,10 +272,16 @@ class MCPAgentExecutor(BaseExecutor):
                 self.resoning_llm
             )
             await self.update_tokens()
+            error_message = await MCPPlanner.change_err_message_to_description(
+                error_message=self.task.state.error_message,
+                tool=mcp_tool,
+                input_params=self.task.state.current_input,
+                reasoning_llm=self.resoning_llm
+            )
             await self.push_message(
                 EventType.STEP_WAITING_FOR_PARAM,
                 data={
-                    "message": "当运行产生如下报错:\n" + self.task.state.error_message,
+                    "message": error_message,
                     "params": params_with_null
                 }
             )
@@ -297,7 +304,7 @@ class MCPAgentExecutor(BaseExecutor):
                 input_data={},
                 output_data={},
                 ex_data={
-                    "message": "当运行产生如下报错:\n" + self.task.state.error_message,
+                    "message": error_message,
                     "params": params_with_null
                 }
             )
diff --git a/apps/scheduler/mcp/prompt.py b/apps/scheduler/mcp/prompt.py
index b322fb0883e8ed935243389cb86066845a549631..d6ff72b4c2b8991fe07b95562641f3adfd1f8c92 100644
--- a/apps/scheduler/mcp/prompt.py
+++ b/apps/scheduler/mcp/prompt.py
@@ -233,8 +233,8 @@ FINAL_ANSWER = dedent(r"""
 MEMORY_TEMPLATE = dedent(r"""
     {% for ctx in context_list %}
     - 第{{ loop.index }}步:{{ ctx.step_description }}
-      - 调用工具 `{{ ctx.step_id }}`,并提供参数 `{{ ctx.input_data }}`
+      - 调用工具 `{{ ctx.step_name }}`,并提供参数 `{{ ctx.input_data|tojson }}`。
       - 执行状态:{{ ctx.status }}
-      - 得到数据:`{{ ctx.output_data }}`
+      - 得到数据:`{{ ctx.output_data|tojson }}`
    {% endfor %}
 """)
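The `MEMORY_TEMPLATE` hunk swaps `ctx.step_id` for `ctx.step_name` and pipes both data fields through `tojson`, so dict values render as JSON rather than Python reprs. A standalone check of that rendering difference, using Jinja2's built-in `tojson` filter (the sample context values are invented):

```python
from jinja2.sandbox import SandboxedEnvironment

env = SandboxedEnvironment(trim_blocks=True, lstrip_blocks=True)
template = env.from_string("- 调用工具 `{{ ctx.step_name }}`,并提供参数 `{{ ctx.input_data|tojson }}`。")

ctx = {"step_name": "port_scanner", "input_data": {"host": "192.0.0.1", "port": 3306}}
print(template.render(ctx=ctx))
# 参数部分输出 {"host": "192.0.0.1", "port": 3306}
# 若不加 tojson,则渲染成 Python repr:{'host': '192.0.0.1', 'port': 3306}
```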
diff --git a/apps/scheduler/mcp_agent/base.py b/apps/scheduler/mcp_agent/base.py
new file mode 100644
index 0000000000000000000000000000000000000000..ac3829b418be54bfc5df6ae3d61db82fe0fc129e
--- /dev/null
+++ b/apps/scheduler/mcp_agent/base.py
@@ -0,0 +1,61 @@
+from typing import Any
+import json
+from jsonschema import validate
+import logging
+from apps.llm.function import JsonGenerator
+from apps.llm.reasoning import ReasoningLLM
+
+logger = logging.getLogger(__name__)
+
+
+class McpBase:
+    @staticmethod
+    async def get_resoning_result(prompt: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> str:
+        """获取推理结果"""
+        # 调用推理大模型
+        message = [
+            {"role": "system", "content": "You are a helpful assistant."},
+            {"role": "user", "content": prompt},
+        ]
+        result = ""
+        async for chunk in resoning_llm.call(
+            message,
+            streaming=False,
+            temperature=0.07,
+            result_only=True,
+        ):
+            result += chunk
+
+        return result
+
+    @staticmethod
+    async def _parse_result(result: str, schema: dict[str, Any], left_str: str = '{', right_str: str = '}') -> dict[str, Any]:
+        """解析推理结果"""
+        left_index = result.find(left_str)
+        right_index = result.rfind(right_str)
+        flag = True
+        if left_index == -1 or right_index == -1:
+            flag = False
+
+        if left_index > right_index:
+            flag = False
+        if flag:
+            try:
+                tmp_js = json.loads(result[left_index:right_index + 1])
+                validate(instance=tmp_js, schema=schema)
+            except Exception as e:
+                logger.error("[McpBase] 解析结果失败: %s", e)
+                flag = False
+        if not flag:
+            json_generator = JsonGenerator(
+                "请提取下面内容中的json\n\n",
+                [
+                    {"role": "system", "content": "You are a helpful assistant."},
+                    {"role": "user", "content": "请提取下面内容中的json\n\n" + result},
+                ],
+                schema,
+            )
+            json_result = await json_generator.generate()
+        else:
+            json_result = json.loads(result[left_index:right_index + 1])
+        return json_result
diff --git a/apps/scheduler/mcp_agent/host.py b/apps/scheduler/mcp_agent/host.py
index 2701477c4f212d006867cdd5447eb0a26ac7310c..85d992c8d5249299daff12d864ac47b9676ba2a2 100644
--- a/apps/scheduler/mcp_agent/host.py
+++ b/apps/scheduler/mcp_agent/host.py
@@ -24,12 +24,19 @@ logger = logging.getLogger(__name__)
 
 _env = SandboxedEnvironment(
     loader=BaseLoader,
-    autoescape=True,
+    autoescape=False,
     trim_blocks=True,
     lstrip_blocks=True,
 )
 
 
+def tojson_filter(value):
+    return json.dumps(value, ensure_ascii=False, separators=(',', ':'))
+
+
+_env.filters['tojson'] = tojson_filter
+
+
 class MCPHost:
     """MCP宿主服务"""
 
diff --git a/apps/scheduler/mcp_agent/plan.py b/apps/scheduler/mcp_agent/plan.py
index 977dfda109207f69933bf5d81fbcf7d552a19f54..35c440309a4694e1c75967c40b08b6e8f59710a5 100644
--- a/apps/scheduler/mcp_agent/plan.py
+++ b/apps/scheduler/mcp_agent/plan.py
@@ -6,6 +6,7 @@ from jinja2.sandbox import SandboxedEnvironment
 
 from apps.llm.reasoning import ReasoningLLM
 from apps.llm.function import JsonGenerator
+from apps.scheduler.mcp_agent.base import McpBase
 from apps.scheduler.mcp_agent.prompt import (
     EVALUATE_GOAL,
     GENERATE_FLOW_NAME,
@@ -14,6 +15,7 @@ from apps.scheduler.mcp_agent.prompt import (
     RECREATE_PLAN,
     RISK_EVALUATE,
     TOOL_EXECUTE_ERROR_TYPE_ANALYSIS,
+    CHANGE_ERROR_MESSAGE_TO_DESCRIPTION,
     GET_MISSING_PARAMS,
     FINAL_ANSWER
 )
@@ -35,40 +37,8 @@ _env = SandboxedEnvironment(
 )
 
 
-class MCPPlanner:
+class MCPPlanner(McpBase):
     """MCP 用户目标拆解与规划"""
-    @staticmethod
-    async def get_resoning_result(prompt: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> str:
-        """获取推理结果"""
-        # 调用推理大模型
-        message = [
-            {"role": "system", "content": "You are a helpful assistant."},
-            {"role": "user", "content": prompt},
-        ]
-        result = ""
-        async for chunk in resoning_llm.call(
-            message,
-            streaming=False,
-            temperature=0.07,
-            result_only=True,
-        ):
-            result += chunk
-
-        return result
-
-    @staticmethod
-    async def _parse_result(result: str, schema: dict[str, Any]) -> str:
-        """解析推理结果"""
-        json_generator = JsonGenerator(
-            result,
-            [
-                {"role": "system", "content": "You are a helpful assistant."},
-                {"role": "user", "content": "请提取下面内容中的json\n\n"+result},
-            ],
-            schema,
-        )
-        json_result = await json_generator.generate()
-        return json_result
 
     @staticmethod
     async def evaluate_goal(
@@ -266,6 +236,22 @@ class MCPPlanner(McpBase):
         # 返回工具执行错误类型
         return error_type
 
+    @staticmethod
+    async def change_err_message_to_description(
+            error_message: str, tool: MCPTool, input_params: dict[str, Any],
+            reasoning_llm: ReasoningLLM = ReasoningLLM()) -> str:
+        """将错误信息转换为工具描述"""
+        template = _env.from_string(CHANGE_ERROR_MESSAGE_TO_DESCRIPTION)
+        prompt = template.render(
+            error_message=error_message,
+            tool_name=tool.name,
+            tool_description=tool.description,
+            input_schema=tool.input_schema,
+            input_params=input_params,
+        )
+        result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm)
+        return result
+
     @staticmethod
     async def get_missing_param(
         tool: MCPTool,
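`McpBase._parse_result` above concentrates this PR's JSON-recovery strategy: slice from the first `{` to the last `}`, parse, validate against the schema, and only fall back to the `JsonGenerator` LLM call when that fails. A self-contained sketch of the fast path, runnable without the LLM stack (the function name is illustrative):

```python
import json
from typing import Any

from jsonschema import ValidationError, validate


def try_parse_json_span(result: str, schema: dict[str, Any]) -> dict[str, Any] | None:
    """尝试截取首个 '{' 到最后一个 '}' 之间的内容并按 schema 校验;失败时返回 None。"""
    left, right = result.find("{"), result.rfind("}")
    if left == -1 or right == -1 or left > right:
        return None
    try:
        candidate = json.loads(result[left:right + 1])
        validate(instance=candidate, schema=schema)
    except (json.JSONDecodeError, ValidationError):
        return None  # 此时再交给 JsonGenerator 兜底
    return candidate


schema = {"type": "object", "required": ["name"]}
print(try_parse_json_span('模型输出:{"name": "port_scanner"},以上。', schema))  # {'name': 'port_scanner'}
```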
diff --git a/apps/scheduler/mcp_agent/prompt.py b/apps/scheduler/mcp_agent/prompt.py
index 139e8e3720be24fe56af4ff8341dd5ae9095f4fc..365179f6e5c763fcd440b8e4260400ee0e7229d3 100644
--- a/apps/scheduler/mcp_agent/prompt.py
+++ b/apps/scheduler/mcp_agent/prompt.py
@@ -70,6 +70,7 @@ TOOL_SELECT = dedent(r"""
     2. 请在给定的MCP工具列表中选择,不要自己生成MCP工具。
     3. 可以选择一些辅助工具,但必须确保这些工具与当前目标相关。
     4. 注意,返回的工具ID必须是MCP工具的ID,而不是名称。
+    5. 不要选择不存在的工具。
     必须按照以下格式生成选择结果,不要输出任何其他内容:
     ```json
     {
@@ -616,6 +617,71 @@ TOOL_EXECUTE_ERROR_TYPE_ANALYSIS = dedent(r"""
     # 输出
     """
 )
+# 将当前程序运行的报错转换为自然语言
+CHANGE_ERROR_MESSAGE_TO_DESCRIPTION = dedent(r"""
+    你是一个智能助手,你的任务是将当前程序运行的报错转换为自然语言描述。
+    请根据以下规则进行转换:
+    1. 将报错信息转换为自然语言描述,描述应该简洁明了,能够让人理解报错的原因和影响。
+    2. 描述应该包含报错的具体内容和可能的解决方案。
+    3. 描述应该避免使用过于专业的术语,以便用户能够理解。
+    4. 描述应该尽量简短,控制在50字以内。
+    5. 只输出自然语言描述,不要输出其他内容。
+
+    # 样例
+    # 工具信息
+
+    port_scanner
+    扫描主机端口
+
+    {
+        "type": "object",
+        "properties": {
+            "host": {
+                "type": "string",
+                "description": "主机地址"
+            },
+            "port": {
+                "type": "integer",
+                "description": "端口号"
+            },
+            "username": {
+                "type": "string",
+                "description": "用户名"
+            },
+            "password": {
+                "type": "string",
+                "description": "密码"
+            }
+        },
+        "required": ["host", "port", "username", "password"]
+    }
+
+    # 工具入参
+    {
+        "host": "192.0.0.1",
+        "port": 3306,
+        "username": "root",
+        "password": "password"
+    }
+
+    # 报错信息
+    执行端口扫描命令时,出现了错误:`password is not correct`。
+
+    # 输出
+    扫描端口时发生错误:密码不正确。请检查输入的密码是否正确,并重试。
+
+    # 现在开始转换报错信息:
+    # 工具信息
+
+    {{tool_name}}
+    {{tool_description}}
+
+    {{input_schema}}
+
+    # 工具入参
+    {{input_params}}
+
+    # 报错信息
+    {{error_message}}
+
+    # 输出
+    """)
 # 获取缺失的参数的json结构体
 GET_MISSING_PARAMS = dedent(r"""
     你是一个工具参数获取器。
diff --git a/apps/scheduler/mcp_agent/select.py b/apps/scheduler/mcp_agent/select.py
index 0ae54dca92eb98cc32ab9ffad3ef260b225ed048..075e08f00cb2b5a2976d5372c623193b04c42412 100644
--- a/apps/scheduler/mcp_agent/select.py
+++ b/apps/scheduler/mcp_agent/select.py
@@ -15,6 +15,7 @@ from apps.llm.embedding import Embedding
 from apps.llm.function import FunctionLLM
 from apps.llm.reasoning import ReasoningLLM
 from apps.llm.token import TokenCalculator
+from apps.scheduler.mcp_agent.base import McpBase
 from apps.scheduler.mcp_agent.prompt import TOOL_SELECT
 from apps.schemas.mcp import (
     BaseModel,
@@ -37,40 +38,8 @@ FINAL_TOOL_ID = "FIANL"
 SUMMARIZE_TOOL_ID = "SUMMARIZE"
 
 
-class MCPSelector:
+class MCPSelector(McpBase):
     """MCP选择器"""
-    @staticmethod
-    async def get_resoning_result(prompt: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> str:
-        """获取推理结果"""
-        # 调用推理大模型
-        message = [
-            {"role": "system", "content": "You are a helpful assistant."},
-            {"role": "user", "content": prompt},
-        ]
-        result = ""
-        async for chunk in resoning_llm.call(
-            message,
-            streaming=False,
-            temperature=0.07,
-            result_only=True,
-        ):
-            result += chunk
-
-        return result
-
-    @staticmethod
-    async def _parse_result(result: str, schema: dict[str, Any]) -> str:
-        """解析推理结果"""
-        json_generator = JsonGenerator(
-            result,
-            [
-                {"role": "system", "content": "You are a helpful assistant."},
-                {"role": "user", "content": "请提取下面内容中的json\n\n"+result},
-            ],
-            schema,
-        )
-        json_result = await json_generator.generate()
-        return json_result
 
     @staticmethod
     async def select_top_tool(
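For context, `change_err_message_to_description` in plan.py consumes the new `CHANGE_ERROR_MESSAGE_TO_DESCRIPTION` template roughly as sketched below; the environment mirrors this PR's `_env` settings, and every rendered value is a made-up example:

```python
from jinja2.sandbox import SandboxedEnvironment

from apps.scheduler.mcp_agent.prompt import CHANGE_ERROR_MESSAGE_TO_DESCRIPTION

# 与 plan.py 的 _env 配置对应的最小环境
env = SandboxedEnvironment(trim_blocks=True, lstrip_blocks=True)
prompt = env.from_string(CHANGE_ERROR_MESSAGE_TO_DESCRIPTION).render(
    tool_name="port_scanner",                 # 示例值
    tool_description="扫描主机端口",           # 示例值
    input_schema={"type": "object"},          # 示例值
    input_params={"host": "192.0.0.1"},       # 示例值
    error_message="password is not correct",  # 示例值
)
# 渲染后的 prompt 交给 get_resoning_result() 推理,模型只返回一句
# 面向用户的自然语言报错描述(受模板第 4/5 条规则约束,50 字以内)
```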
diff --git a/apps/services/activity.py b/apps/services/activity.py
index 88142b9ee6772319c0eaf32dfdd310e968e32adc..df322cc8792c477bed9d5712e968a6512e38ea84 100644
--- a/apps/services/activity.py
+++ b/apps/services/activity.py
@@ -22,14 +22,6 @@ class Activity:
         :param user_sub: 用户实体ID
         :return: 判断结果,正在提问则返回True
         """
-        time = round(datetime.now(UTC).timestamp(), 3)
-
-        # 检查窗口内总请求数
-        count = await MongoDB().get_collection("activity").count_documents(
-            {"timestamp": {"$gte": time - SLIDE_WINDOW_TIME, "$lte": time}},
-        )
-        if count >= SLIDE_WINDOW_QUESTION_COUNT:
-            return True
 
         # 检查用户是否正在提问
         active = await MongoDB().get_collection("activity").find_one(
diff --git a/apps/services/task.py b/apps/services/task.py
index ad20f51f8ce38bf0d784e47bf10c6e237e9ec3a0..38022f4000527c2c8eb95a7d88090e6149820399 100644
--- a/apps/services/task.py
+++ b/apps/services/task.py
@@ -138,19 +138,12 @@ class TaskManager:
         """保存flow信息到flow_context"""
         flow_context_collection = MongoDB().get_collection("flow_context")
         try:
-            for history in flow_context:
-                # 查找是否存在
-                current_context = await flow_context_collection.find_one({
-                    "task_id": task_id,
-                    "_id": history.id,
-                })
-                if current_context:
-                    await flow_context_collection.update_one(
-                        {"_id": current_context["_id"]},
-                        {"$set": history.model_dump(exclude_none=True, by_alias=True)},
-                    )
-                else:
-                    await flow_context_collection.insert_one(history.model_dump(exclude_none=True, by_alias=True))
+            # 删除旧的flow_context
+            await flow_context_collection.delete_many({"task_id": task_id})
+            await flow_context_collection.insert_many(
+                [history.model_dump(exclude_none=True, by_alias=True) for history in flow_context],
+                ordered=False,
+            )
         except Exception:
             logger.exception("[TaskManager] 保存flow执行记录失败")
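The task.py rewrite trades the per-document find/update/insert loop for a wholesale delete-and-bulk-insert: one round trip per batch instead of two per document. A minimal sketch of the pattern against a bare motor collection (the client URI and names are placeholders); note the two steps are not atomic, so a concurrent reader between them can observe an empty context:

```python
import asyncio

from motor.motor_asyncio import AsyncIOMotorClient


async def replace_flow_context(task_id: str, docs: list[dict]) -> None:
    """整批替换 task_id 对应的 flow_context 记录。"""
    collection = AsyncIOMotorClient("mongodb://localhost:27017")["demo"]["flow_context"]
    # 先删旧记录,再批量写入;ordered=False 让个别文档失败时其余文档继续写入
    await collection.delete_many({"task_id": task_id})
    if docs:  # insert_many 对空列表会抛 InvalidOperation,空批次直接跳过
        await collection.insert_many(docs, ordered=False)


asyncio.run(replace_flow_context("task-1", [{"task_id": "task-1", "step": 1}]))
```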