diff --git a/apps/llm/function.py b/apps/llm/function.py
index 1f995fe7ba187cead03aa6fc62a4cbce1ec05a65..6165dc455aae038aeb1ad9bcd0840b418c3dc8e8 100644
--- a/apps/llm/function.py
+++ b/apps/llm/function.py
@@ -68,7 +68,6 @@ class FunctionLLM:
api_key=self._config.api_key,
)
-
async def _call_openai(
self,
messages: list[dict[str, str]],
@@ -123,7 +122,7 @@ class FunctionLLM:
},
]
- response = await self._client.chat.completions.create(**self._params) # type: ignore[arg-type]
+ response = await self._client.chat.completions.create(**self._params) # type: ignore[arg-type]
try:
logger.info("[FunctionCall] 大模型输出:%s", response.choices[0].message.tool_calls[0].function.arguments)
return response.choices[0].message.tool_calls[0].function.arguments
@@ -132,7 +131,6 @@ class FunctionLLM:
logger.info("[FunctionCall] 大模型输出:%s", ans)
return await FunctionLLM.process_response(ans)
-
@staticmethod
async def process_response(response: str) -> str:
"""处理大模型的输出"""
@@ -169,7 +167,6 @@ class FunctionLLM:
return json_str
-
async def _call_ollama(
self,
messages: list[dict[str, str]],
@@ -196,10 +193,9 @@ class FunctionLLM:
"format": schema,
})
- response = await self._client.chat(**self._params) # type: ignore[arg-type]
+ response = await self._client.chat(**self._params) # type: ignore[arg-type]
return await self.process_response(response.message.content or "")
-
async def call(
self,
messages: list[dict[str, Any]],
@@ -254,7 +250,6 @@ class JsonGenerator:
)
self._err_info = ""
-
async def _assemble_message(self) -> str:
"""组装消息"""
# 检查类型
@@ -275,13 +270,12 @@ class JsonGenerator:
"""单次尝试"""
prompt = await self._assemble_message()
messages = [
- {"role": "system", "content": "You are a helpful assistant."},
- {"role": "user", "content": prompt},
+ {"role": "system", "content": prompt},
+ {"role": "user", "content": "please generate a JSON response based on the above information and schema."},
]
function = FunctionLLM()
return await function.call(messages, self._schema, max_tokens, temperature)
-
async def generate(self) -> dict[str, Any]:
"""生成JSON"""
Draft7Validator.check_schema(self._schema)
diff --git a/apps/main.py b/apps/main.py
index 547cb6810828582114b59d6e2d74c77b1d2450f2..99b7de0cd927f937abc95971cb878dd87e878abb 100644
--- a/apps/main.py
+++ b/apps/main.py
@@ -39,7 +39,7 @@ from apps.routers import (
parameter
)
from apps.scheduler.pool.pool import Pool
-
+logger = logging.getLogger(__name__)
# 定义FastAPI app
app = FastAPI(redoc_url=None)
# 定义FastAPI全局中间件
@@ -104,7 +104,7 @@ async def add_no_auth_user() -> None:
auto_execute=False
).model_dump(by_alias=True))
except Exception as e:
- logging.warning(f"添加无认证用户失败: {e}")
+ logger.error(f"[add_no_auth_user] 默认用户 {username} 已存在")
async def clear_user_activity() -> None:
diff --git a/apps/routers/chat.py b/apps/routers/chat.py
index 9a45c2d5af4fd8862373cb078d34bad2df0784d1..68a5b4af8096a3c82072131fcdce266da520a3f8 100644
--- a/apps/routers/chat.py
+++ b/apps/routers/chat.py
@@ -148,9 +148,9 @@ async def chat(
await UserBlacklistManager.change_blacklisted_users(user_sub, -10)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="question is blacklisted")
- # 限流检查
- if await Activity.is_active(user_sub):
- raise HTTPException(status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail="Too many requests")
+    # 限流检查(暂时停用)
+ # if await Activity.is_active(user_sub):
+ # raise HTTPException(status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail="Too many requests")
res = chat_generator(post_body, user_sub, session_id)
return StreamingResponse(
diff --git a/apps/scheduler/executor/agent.py b/apps/scheduler/executor/agent.py
index 632cf9e3819be5c4169d22817986b7750e64d11f..60394bd76cb11dccc25ab1370cdded4034ac1311 100644
--- a/apps/scheduler/executor/agent.py
+++ b/apps/scheduler/executor/agent.py
@@ -137,8 +137,10 @@ class MCPAgentExecutor(BaseExecutor):
if is_first:
# 获取第一个输入参数
tool_id = self.task.runtime.temporary_plans.plans[self.task.state.step_index].tool
+ step = self.task.runtime.temporary_plans.plans[self.task.state.step_index]
mcp_tool = self.tools[tool_id]
- self.task.state.current_input = await MCPHost._get_first_input_params(mcp_tool, self.task.runtime.question, self.task)
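+            # 使用当前步骤的 instruction(而非用户原始问题)生成首个工具入参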
+ self.task.state.current_input = await MCPHost._get_first_input_params(mcp_tool, step.instruction, self.task)
else:
# 获取后续输入参数
if isinstance(self.params, param):
@@ -271,10 +272,17 @@
self.resoning_llm
)
await self.update_tokens()
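+            # 将工具报错信息转写为面向用户的自然语言描述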
+            error_message = await MCPPlanner.change_err_message_to_description(
+ error_message=self.task.state.error_message,
+ tool=mcp_tool,
+ input_params=self.task.state.current_input,
+ reasoning_llm=self.resoning_llm
+ )
await self.push_message(
EventType.STEP_WAITING_FOR_PARAM,
data={
- "message": "当运行产生如下报错:\n" + self.task.state.error_message,
+ "message": error_message,
"params": params_with_null
}
)
@@ -297,7 +304,7 @@ class MCPAgentExecutor(BaseExecutor):
input_data={},
output_data={},
ex_data={
- "message": "当运行产生如下报错:\n" + self.task.state.error_message,
+ "message": error_message,
"params": params_with_null
}
)
diff --git a/apps/scheduler/mcp/prompt.py b/apps/scheduler/mcp/prompt.py
index b322fb0883e8ed935243389cb86066845a549631..d6ff72b4c2b8991fe07b95562641f3adfd1f8c92 100644
--- a/apps/scheduler/mcp/prompt.py
+++ b/apps/scheduler/mcp/prompt.py
@@ -233,8 +233,8 @@ FINAL_ANSWER = dedent(r"""
MEMORY_TEMPLATE = dedent(r"""
{% for ctx in context_list %}
- 第{{ loop.index }}步:{{ ctx.step_description }}
- - 调用工具 `{{ ctx.step_id }}`,并提供参数 `{{ ctx.input_data }}`
+ - 调用工具 `{{ ctx.step_name }}`,并提供参数 `{{ ctx.input_data|tojson }}`。
- 执行状态:{{ ctx.status }}
- - 得到数据:`{{ ctx.output_data }}`
+ - 得到数据:`{{ ctx.output_data|tojson }}`
{% endfor %}
""")
diff --git a/apps/scheduler/mcp_agent/base.py b/apps/scheduler/mcp_agent/base.py
new file mode 100644
index 0000000000000000000000000000000000000000..ac3829b418be54bfc5df6ae3d61db82fe0fc129e
--- /dev/null
+++ b/apps/scheduler/mcp_agent/base.py
@@ -0,0 +1,67 @@
+import json
+import logging
+from typing import Any
+
+from jsonschema import validate
+
+from apps.llm.function import JsonGenerator
+from apps.llm.reasoning import ReasoningLLM
+
+logger = logging.getLogger(__name__)
+
+
+class McpBase:
+    """MCP Agent 基类:提供推理调用与JSON结果解析的通用方法"""
+
+ @staticmethod
+ async def get_resoning_result(prompt: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> str:
+ """获取推理结果"""
+ # 调用推理大模型
+ message = [
+ {"role": "system", "content": "You are a helpful assistant."},
+ {"role": "user", "content": prompt},
+ ]
+ result = ""
+ async for chunk in resoning_llm.call(
+ message,
+ streaming=False,
+ temperature=0.07,
+ result_only=True,
+ ):
+ result += chunk
+
+ return result
+
+ @staticmethod
+    async def _parse_result(result: str, schema: dict[str, Any], left_str: str = '{', right_str: str = '}') -> dict[str, Any]:
+ """解析推理结果"""
+ left_index = result.find(left_str)
+ right_index = result.rfind(right_str)
+ flag = True
+        if left_index == -1 or right_index == -1:
+ flag = False
+
+ if left_index > right_index:
+ flag = False
+ if flag:
+ try:
+ tmp_js = json.loads(result[left_index:right_index + 1])
+ validate(instance=tmp_js, schema=schema)
+ except Exception as e:
+ logger.error("[McpBase] 解析结果失败: %s", e)
+ flag = False
+ if not flag:
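+            # 直接解析失败时,回退到JsonGenerator按schema重新生成JSON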
+ json_generator = JsonGenerator(
+ "请提取下面内容中的json\n\n",
+ [
+ {"role": "system", "content": "You are a helpful assistant."},
+ {"role": "user", "content": "请提取下面内容中的json\n\n"+result},
+ ],
+ schema,
+ )
+ json_result = await json_generator.generate()
+        else:
+            json_result = tmp_js
+ return json_result
diff --git a/apps/scheduler/mcp_agent/host.py b/apps/scheduler/mcp_agent/host.py
index 2701477c4f212d006867cdd5447eb0a26ac7310c..85d992c8d5249299daff12d864ac47b9676ba2a2 100644
--- a/apps/scheduler/mcp_agent/host.py
+++ b/apps/scheduler/mcp_agent/host.py
@@ -24,12 +24,21 @@ logger = logging.getLogger(__name__)
_env = SandboxedEnvironment(
loader=BaseLoader,
- autoescape=True,
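+    # 渲染结果是LLM提示词而非HTML,关闭自动转义以保留引号等原始字符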
+ autoescape=False,
trim_blocks=True,
lstrip_blocks=True,
)
+def tojson_filter(value) -> str:
+    """将值序列化为紧凑JSON字符串(不转义非ASCII字符)"""
+    return json.dumps(value, ensure_ascii=False, separators=(',', ':'))
+
+
+_env.filters['tojson'] = tojson_filter
+
+
class MCPHost:
"""MCP宿主服务"""
diff --git a/apps/scheduler/mcp_agent/plan.py b/apps/scheduler/mcp_agent/plan.py
index 977dfda109207f69933bf5d81fbcf7d552a19f54..35c440309a4694e1c75967c40b08b6e8f59710a5 100644
--- a/apps/scheduler/mcp_agent/plan.py
+++ b/apps/scheduler/mcp_agent/plan.py
@@ -6,6 +6,7 @@ from jinja2.sandbox import SandboxedEnvironment
from apps.llm.reasoning import ReasoningLLM
from apps.llm.function import JsonGenerator
+from apps.scheduler.mcp_agent.base import McpBase
from apps.scheduler.mcp_agent.prompt import (
EVALUATE_GOAL,
GENERATE_FLOW_NAME,
@@ -14,6 +15,7 @@ from apps.scheduler.mcp_agent.prompt import (
RECREATE_PLAN,
RISK_EVALUATE,
TOOL_EXECUTE_ERROR_TYPE_ANALYSIS,
+ CHANGE_ERROR_MESSAGE_TO_DESCRIPTION,
GET_MISSING_PARAMS,
FINAL_ANSWER
)
@@ -35,40 +37,8 @@ _env = SandboxedEnvironment(
)
-class MCPPlanner:
+class MCPPlanner(McpBase):
"""MCP 用户目标拆解与规划"""
- @staticmethod
- async def get_resoning_result(prompt: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> str:
- """获取推理结果"""
- # 调用推理大模型
- message = [
- {"role": "system", "content": "You are a helpful assistant."},
- {"role": "user", "content": prompt},
- ]
- result = ""
- async for chunk in resoning_llm.call(
- message,
- streaming=False,
- temperature=0.07,
- result_only=True,
- ):
- result += chunk
-
- return result
-
- @staticmethod
- async def _parse_result(result: str, schema: dict[str, Any]) -> str:
- """解析推理结果"""
- json_generator = JsonGenerator(
- result,
- [
- {"role": "system", "content": "You are a helpful assistant."},
- {"role": "user", "content": "请提取下面内容中的json\n\n"+result},
- ],
- schema,
- )
- json_result = await json_generator.generate()
- return json_result
@staticmethod
async def evaluate_goal(
@@ -266,6 +236,22 @@ class MCPPlanner:
# 返回工具执行错误类型
return error_type
+ @staticmethod
+ async def change_err_message_to_description(
+ error_message: str, tool: MCPTool, input_params: dict[str, Any],
+ reasoning_llm: ReasoningLLM = ReasoningLLM()) -> str:
+ """将错误信息转换为工具描述"""
+ template = _env.from_string(CHANGE_ERROR_MESSAGE_TO_DESCRIPTION)
+ prompt = template.render(
+ error_message=error_message,
+ tool_name=tool.name,
+ tool_description=tool.description,
+ input_schema=tool.input_schema,
+ input_params=input_params,
+ )
+ result = await MCPPlanner.get_resoning_result(prompt, reasoning_llm)
+ return result
+
@staticmethod
async def get_missing_param(
tool: MCPTool,
diff --git a/apps/scheduler/mcp_agent/prompt.py b/apps/scheduler/mcp_agent/prompt.py
index 139e8e3720be24fe56af4ff8341dd5ae9095f4fc..365179f6e5c763fcd440b8e4260400ee0e7229d3 100644
--- a/apps/scheduler/mcp_agent/prompt.py
+++ b/apps/scheduler/mcp_agent/prompt.py
@@ -70,6 +70,7 @@ TOOL_SELECT = dedent(r"""
2. 请在给定的MCP工具列表中选择,不要自己生成MCP工具。
3. 可以选择一些辅助工具,但必须确保这些工具与当前目标相关。
4. 注意,返回的工具ID必须是MCP工具的ID,而不是名称。
+ 5. 不要选择不存在的工具。
必须按照以下格式生成选择结果,不要输出任何其他内容:
```json
{
@@ -616,6 +617,71 @@ TOOL_EXECUTE_ERROR_TYPE_ANALYSIS = dedent(r"""
# 输出
"""
)
+# 将当前程序运行的报错转换为自然语言
+CHANGE_ERROR_MESSAGE_TO_DESCRIPTION = dedent(r"""
+ 你是一个智能助手,你的任务是将当前程序运行的报错转换为自然语言描述。
+ 请根据以下规则进行转换:
+ 1. 将报错信息转换为自然语言描述,描述应该简洁明了,能够让人理解报错的原因和影响。
+ 2. 描述应该包含报错的具体内容和可能的解决方案。
+ 3. 描述应该避免使用过于专业的术语,以便用户能够理解。
+ 4. 描述应该尽量简短,控制在50字以内。
+ 5. 只输出自然语言描述,不要输出其他内容。
+ # 样例
+ # 工具信息
+
+ port_scanner
+ 扫描主机端口
+
+ {
+ "type": "object",
+ "properties": {
+ "host": {
+ "type": "string",
+ "description": "主机地址"
+ },
+ "port": {
+ "type": "integer",
+ "description": "端口号"
+ },
+ "username": {
+ "type": "string",
+ "description": "用户名"
+ },
+ "password": {
+ "type": "string",
+ "description": "密码"
+ }
+ },
+ "required": ["host", "port", "username", "password"]
+ }
+
+
+ # 工具入参
+ {
+ "host": "192.0.0.1",
+ "port": 3306,
+ "username": "root",
+ "password": "password"
+ }
+ # 报错信息
+ 执行端口扫描命令时,出现了错误:`password is not correct`。
+ # 输出
+ 扫描端口时发生错误:密码不正确。请检查输入的密码是否正确,并重试。
+ # 现在开始转换报错信息:
+ # 工具信息
+
+ {{tool_name}}
+ {{tool_description}}
+
+ {{input_schema}}
+
+
+ # 工具入参
+ {{input_params}}
+ # 报错信息
+ {{error_message}}
+ # 输出
+ """)
# 获取缺失的参数的json结构体
GET_MISSING_PARAMS = dedent(r"""
你是一个工具参数获取器。
diff --git a/apps/scheduler/mcp_agent/select.py b/apps/scheduler/mcp_agent/select.py
index 0ae54dca92eb98cc32ab9ffad3ef260b225ed048..075e08f00cb2b5a2976d5372c623193b04c42412 100644
--- a/apps/scheduler/mcp_agent/select.py
+++ b/apps/scheduler/mcp_agent/select.py
@@ -15,6 +15,7 @@ from apps.llm.embedding import Embedding
from apps.llm.function import FunctionLLM
from apps.llm.reasoning import ReasoningLLM
from apps.llm.token import TokenCalculator
+from apps.scheduler.mcp_agent.base import McpBase
from apps.scheduler.mcp_agent.prompt import TOOL_SELECT
from apps.schemas.mcp import (
BaseModel,
@@ -37,40 +38,8 @@ FINAL_TOOL_ID = "FIANL"
SUMMARIZE_TOOL_ID = "SUMMARIZE"
-class MCPSelector:
+class MCPSelector(McpBase):
"""MCP选择器"""
- @staticmethod
- async def get_resoning_result(prompt: str, resoning_llm: ReasoningLLM = ReasoningLLM()) -> str:
- """获取推理结果"""
- # 调用推理大模型
- message = [
- {"role": "system", "content": "You are a helpful assistant."},
- {"role": "user", "content": prompt},
- ]
- result = ""
- async for chunk in resoning_llm.call(
- message,
- streaming=False,
- temperature=0.07,
- result_only=True,
- ):
- result += chunk
-
- return result
-
- @staticmethod
- async def _parse_result(result: str, schema: dict[str, Any]) -> str:
- """解析推理结果"""
- json_generator = JsonGenerator(
- result,
- [
- {"role": "system", "content": "You are a helpful assistant."},
- {"role": "user", "content": "请提取下面内容中的json\n\n"+result},
- ],
- schema,
- )
- json_result = await json_generator.generate()
- return json_result
@staticmethod
async def select_top_tool(
diff --git a/apps/services/activity.py b/apps/services/activity.py
index 88142b9ee6772319c0eaf32dfdd310e968e32adc..df322cc8792c477bed9d5712e968a6512e38ea84 100644
--- a/apps/services/activity.py
+++ b/apps/services/activity.py
@@ -22,14 +22,6 @@ class Activity:
:param user_sub: 用户实体ID
:return: 判断结果,正在提问则返回True
"""
- time = round(datetime.now(UTC).timestamp(), 3)
-
- # 检查窗口内总请求数
- count = await MongoDB().get_collection("activity").count_documents(
- {"timestamp": {"$gte": time - SLIDE_WINDOW_TIME, "$lte": time}},
- )
- if count >= SLIDE_WINDOW_QUESTION_COUNT:
- return True
# 检查用户是否正在提问
active = await MongoDB().get_collection("activity").find_one(
diff --git a/apps/services/task.py b/apps/services/task.py
index ad20f51f8ce38bf0d784e47bf10c6e237e9ec3a0..38022f4000527c2c8eb95a7d88090e6149820399 100644
--- a/apps/services/task.py
+++ b/apps/services/task.py
@@ -138,19 +138,14 @@ class TaskManager:
"""保存flow信息到flow_context"""
flow_context_collection = MongoDB().get_collection("flow_context")
try:
- for history in flow_context:
- # 查找是否存在
- current_context = await flow_context_collection.find_one({
- "task_id": task_id,
- "_id": history.id,
- })
- if current_context:
- await flow_context_collection.update_one(
- {"_id": current_context["_id"]},
- {"$set": history.model_dump(exclude_none=True, by_alias=True)},
- )
- else:
- await flow_context_collection.insert_one(history.model_dump(exclude_none=True, by_alias=True))
+ # 删除旧的flow_context
+ await flow_context_collection.delete_many({"task_id": task_id})
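+            # 重新批量写入;ordered=False 使单条写入失败不影响其余记录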
+            if flow_context:
+                await flow_context_collection.insert_many(
+                    [history.model_dump(exclude_none=True, by_alias=True) for history in flow_context],
+                    ordered=False,
+                )
except Exception:
logger.exception("[TaskManager] 保存flow执行记录失败")