From cd0e9c2a4c3d7b063ed8cdd827f4e4b545b82443 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Thu, 23 Oct 2025 17:32:27 +0800 Subject: [PATCH 1/6] =?UTF-8?q?=E5=8E=BB=E6=8E=89=E5=BA=9F=E5=BC=83?= =?UTF-8?q?=E7=9A=84User=E8=A1=A8=E5=AD=97=E6=AE=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/models/user.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/apps/models/user.py b/apps/models/user.py index 11a24b7c0..bd720c4f8 100644 --- a/apps/models/user.py +++ b/apps/models/user.py @@ -35,10 +35,6 @@ class User(Base): String(100), default_factory=lambda: sha256(str(uuid.uuid4()).encode()).hexdigest()[:16], nullable=False, ) """用户个人令牌""" - functionLLM: Mapped[str | None] = mapped_column(String(255), default=None, nullable=True) # noqa: N815 - """用户选择的函数模型ID""" - embeddingLLM: Mapped[str | None] = mapped_column(String(255), default=None, nullable=True) # noqa: N815 - """用户选择的向量模型ID""" autoExecute: Mapped[bool | None] = mapped_column(Boolean, default=False, nullable=True) # noqa: N815 """Agent是否自动执行""" -- Gitee From f207e79832f935cf05025d01c843a4a7828b0bb6 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Thu, 23 Oct 2025 17:32:49 +0800 Subject: [PATCH 2/6] =?UTF-8?q?=E4=BF=AE=E6=AD=A3API=E8=B7=AF=E5=BE=84?= =?UTF-8?q?=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/routers/user.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/routers/user.py b/apps/routers/user.py index 67fa0b347..2554f9139 100644 --- a/apps/routers/user.py +++ b/apps/routers/user.py @@ -19,7 +19,7 @@ router = APIRouter( ) -@router.post("/user", response_model=ResponseData) +@router.post("", response_model=ResponseData) async def update_user_info(request: Request, data: UserUpdateRequest) -> JSONResponse: """POST /auth/user: 更新当前用户信息""" # 更新用户信息 -- Gitee From 8e5339867f9d6a1ecce1e68175aa508b71f7e555 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Thu, 23 Oct 2025 17:33:31 +0800 Subject: [PATCH 3/6] =?UTF-8?q?Chat=E6=8E=A5=E5=8F=A3=E4=B8=8D=E9=9C=80?= =?UTF-8?q?=E8=A6=81=E5=9C=A8=E8=AF=B7=E6=B1=82=E6=97=B6=E5=8F=91=E9=80=81?= =?UTF-8?q?task=5Fid?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/schemas/request_data.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/apps/schemas/request_data.py b/apps/schemas/request_data.py index 5c7ee1ff4..6d0657fb5 100644 --- a/apps/schemas/request_data.py +++ b/apps/schemas/request_data.py @@ -2,7 +2,6 @@ """FastAPI 请求体""" import uuid -from datetime import datetime from typing import Any from pydantic import BaseModel, Field @@ -30,7 +29,6 @@ class RequestData(BaseModel): ) language: LanguageType = Field(default=LanguageType.CHINESE, description="语言") app: RequestDataApp | None = Field(default=None, description="应用") - task_id: str | None = Field(default=None, alias="taskId", description="任务ID") llm_id: str = Field(alias="llmId", description="大模型ID") kb_ids: list[uuid.UUID] = Field(default=[], description="知识库ID列表") @@ -80,5 +78,3 @@ class UserUpdateRequest(BaseModel): user_name: str | None = Field(default=None, description="用户名", alias="userName") auto_execute: bool = Field(default=False, description="是否自动执行", alias="autoExecute") - agreement_confirmed: bool | None = Field(default=None, description="协议确认状态", alias="agreementConfirmed") - last_login: datetime | None = Field(default=None, description="最后一次登录时间", alias="lastLogin") -- Gitee From cb0b5347ea72abf269cc557fe601905142e946ac Mon Sep 17 00:00:00 2001 From: z30057876 Date: Thu, 23 Oct 2025 17:35:25 +0800 Subject: [PATCH 4/6] =?UTF-8?q?=E7=94=B1=E4=BA=8E=E4=BD=BF=E7=94=A8?= =?UTF-8?q?=E5=8D=95=E4=BE=8B=EF=BC=8C=E4=B8=8D=E9=9C=80=E8=A6=81=E5=86=8D?= =?UTF-8?q?=E6=A3=80=E6=9F=A5function=E5=92=8Cembedding=E6=98=AF=E5=90=A6?= =?UTF-8?q?=E4=B8=BANone?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/scheduler/mcp/host.py | 6 ------ apps/scheduler/mcp/plan.py | 5 ----- apps/scheduler/mcp/select.py | 19 ++++--------------- apps/scheduler/mcp_agent/host.py | 5 ----- 4 files changed, 4 insertions(+), 31 deletions(-) diff --git a/apps/scheduler/mcp/host.py b/apps/scheduler/mcp/host.py index 9793cbfbf..4caf52e69 100644 --- a/apps/scheduler/mcp/host.py +++ b/apps/scheduler/mcp/host.py @@ -102,12 +102,6 @@ class MCPHost: async def _fill_params(self, tool: MCPTools, query: str) -> dict[str, Any]: """填充工具参数""" - if not self._llm.function: - err = "[MCPHost] 未设置FunctionCall模型" - logger.error(err) - raise RuntimeError(err) - - # 更清晰的输入·指令,这样可以调用generate llm_query = rf""" 请使用参数生成工具,生成满足以下目标的工具参数: diff --git a/apps/scheduler/mcp/plan.py b/apps/scheduler/mcp/plan.py index 7dbb9facc..70e6b9963 100644 --- a/apps/scheduler/mcp/plan.py +++ b/apps/scheduler/mcp/plan.py @@ -65,11 +65,6 @@ class MCPPlanner: async def _parse_plan_result(self, result: str, max_steps: int) -> MCPPlan: """将推理结果解析为结构化数据""" - if not self._llm.function: - err = "[MCPPlanner] 未设置Function模型" - _logger.error(err) - raise RuntimeError(err) - # 构造标准 OpenAI Function 格式 schema = MCPPlan.model_json_schema() schema["properties"]["plans"]["maxItems"] = max_steps diff --git a/apps/scheduler/mcp/select.py b/apps/scheduler/mcp/select.py index dc32a0f5d..ee3bffe31 100644 --- a/apps/scheduler/mcp/select.py +++ b/apps/scheduler/mcp/select.py @@ -6,7 +6,7 @@ import logging from sqlalchemy import select from apps.common.postgres import postgres -from apps.llm import LLMConfig, json_generator +from apps.llm import LLMConfig, embedding, json_generator from apps.models import LanguageType, MCPTools from apps.schemas.mcp import MCPSelectResult from apps.services.mcp_service import MCPServiceManager @@ -39,12 +39,6 @@ class MCPSelector: async def _call_function_mcp(self, reasoning_result: str, mcp_ids: list[str]) -> MCPSelectResult: """调用结构化输出小模型提取JSON""" - if not self._llm.function: - err = "[MCPSelector] 未设置Function模型" - logger.error(err) - raise RuntimeError(err) - - logger.info("[MCPSelector] 调用结构化输出小模型") schema = MCPSelectResult.model_json_schema() # schema中加入选项 schema["properties"]["mcp_id"]["enum"] = mcp_ids @@ -79,16 +73,11 @@ class MCPSelector: async def select_top_tool(self, query: str, mcp_list: list[str], top_n: int = 10) -> list[MCPTools]: """选择最合适的工具""" - if not self._llm.embedding: - err = "[MCPSelector] 未设置Embedding模型" - logger.error(err) - raise RuntimeError(err) - - query_embedding = await self._llm.embedding.get_embedding([query]) + query_embedding = await embedding.get_embedding([query]) async with postgres.session() as session: tool_vecs = await session.scalars( - select(self._llm.embedding.MCPToolVector).where(self._llm.embedding.MCPToolVector.mcpId.in_(mcp_list)) - .order_by(self._llm.embedding.MCPToolVector.embedding.cosine_distance(query_embedding)).limit(top_n), + select(embedding.MCPToolVector).where(embedding.MCPToolVector.mcpId.in_(mcp_list)) + .order_by(embedding.MCPToolVector.embedding.cosine_distance(query_embedding)).limit(top_n), ) # 拿到工具 diff --git a/apps/scheduler/mcp_agent/host.py b/apps/scheduler/mcp_agent/host.py index 1b346e583..a4b30e5f6 100644 --- a/apps/scheduler/mcp_agent/host.py +++ b/apps/scheduler/mcp_agent/host.py @@ -75,11 +75,6 @@ class MCPHost(MCPBase): params: dict[str, Any] | None = None, params_description: str = "", ) -> dict[str, Any]: - if not self._llm.function: - err = "[MCPHost] 未找到函数调用模型" - _logger.error(err) - raise RuntimeError(err) - llm_query = _LLM_QUERY_FIX[language] prompt = _env.from_string(REPAIR_PARAMS[language]).render( tool_name=mcp_tool.toolName, -- Gitee From e1bbabac7503217e1eae09a045a3e38ee955b308 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Thu, 23 Oct 2025 17:36:18 +0800 Subject: [PATCH 5/6] =?UTF-8?q?=E8=A1=A5=E5=85=85Loader=E4=B8=AD=E9=81=97?= =?UTF-8?q?=E6=BC=8F=E7=9A=84=E5=87=BD=E6=95=B0=EF=BC=9BLoader=E4=BD=BF?= =?UTF-8?q?=E7=94=A8embedding=E5=8D=95=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/scheduler/pool/loader/call.py | 34 ++++++++---- apps/scheduler/pool/loader/flow.py | 79 +++++++++++++++++++++------ apps/scheduler/pool/loader/mcp.py | 48 ++++++++-------- apps/scheduler/pool/loader/service.py | 39 ++++++------- apps/scheduler/pool/pool.py | 10 ++-- apps/services/settings.py | 2 +- 6 files changed, 133 insertions(+), 79 deletions(-) diff --git a/apps/scheduler/pool/loader/call.py b/apps/scheduler/pool/loader/call.py index f6bc3bdfb..8220960a6 100644 --- a/apps/scheduler/pool/loader/call.py +++ b/apps/scheduler/pool/loader/call.py @@ -4,16 +4,24 @@ import importlib import logging -from sqlalchemy import delete +from sqlalchemy import delete, inspect from apps.common.postgres import postgres -from apps.llm import Embedding +from apps.llm import embedding from apps.models import NodeInfo from apps.schemas.scheduler import CallInfo _logger = logging.getLogger(__name__) +async def _table_exists(table_name: str) -> bool: + """检查表是否存在""" + async with postgres.engine.connect() as conn: + return await conn.run_sync( + lambda sync_conn: inspect(sync_conn).has_table(table_name), + ) + + class CallLoader: """Call 加载器""" @@ -58,29 +66,35 @@ class CallLoader: await session.commit() - async def _add_vector_to_db( - self, call_metadata: dict[str, CallInfo], embedding_model: Embedding, - ) -> None: + async def _add_vector_to_db(self, call_metadata: dict[str, CallInfo]) -> None: """将向量化数据存入数据库""" + # 检查表是否存在 + if not await _table_exists(embedding.NodePoolVector.__tablename__): + _logger.warning( + "表 %s 不存在,跳过向量数据插入", + embedding.NodePoolVector.__tablename__, + ) + return + async with postgres.session() as session: # 删除旧数据 await session.execute( - delete(embedding_model.NodePoolVector).where(embedding_model.NodePoolVector.serviceId == None), # noqa: E711 + delete(embedding.NodePoolVector).where(embedding.NodePoolVector.serviceId == None), # noqa: E711 ) - call_vecs = await embedding_model.get_embedding([call.description for call in call_metadata.values()]) + call_vecs = await embedding.get_embedding([call.description for call in call_metadata.values()]) for call_id, vec in zip(call_metadata.keys(), call_vecs, strict=True): - session.add(embedding_model.NodePoolVector( + session.add(embedding.NodePoolVector( id=call_id, embedding=vec, )) await session.commit() - async def set_vector(self, embedding_model: Embedding) -> None: + async def set_vector(self) -> None: """将向量化数据存入数据库""" call_metadata = await self._load_system_call() - await self._add_vector_to_db(call_metadata, embedding_model) + await self._add_vector_to_db(call_metadata) async def load(self) -> None: diff --git a/apps/scheduler/pool/loader/flow.py b/apps/scheduler/pool/loader/flow.py index f410d8ea6..c8fa6b739 100644 --- a/apps/scheduler/pool/loader/flow.py +++ b/apps/scheduler/pool/loader/flow.py @@ -13,7 +13,7 @@ from sqlalchemy import and_, delete, func, select from apps.common.config import config from apps.common.postgres import postgres -from apps.llm import Embedding +from apps.llm import embedding from apps.models import App, AppHashes from apps.models import Flow as FlowInfo from apps.scheduler.util import yaml_enum_presenter, yaml_str_presenter @@ -28,6 +28,14 @@ BASE_PATH = Path(config.deploy.data_dir) / "semantics" / "app" class FlowLoader: """工作流加载器""" + @staticmethod + async def _load_all_flows() -> list[FlowInfo]: + """从数据库加载所有工作流""" + async with postgres.session() as session: + # 查询所有工作流 + flows_query = select(FlowInfo) + return list((await session.scalars(flows_query)).all()) + async def _load_yaml_file(self, flow_path: Path) -> dict[str, Any]: """从YAML文件加载工作流配置""" try: @@ -145,6 +153,8 @@ class FlowLoader: debug=flow_config.checkStatus.debug, ), ) + # 重新向量化该App的所有工作流 + await self._update_vector(app_id) return Flow.model_validate(flow_yaml) @@ -177,9 +187,11 @@ class FlowLoader: debug=flow.checkStatus.debug, ), ) + # 重新向量化该App的所有工作流 + await self._update_vector(app_id) - async def delete(self, app_id: uuid.UUID, flow_id: str, embedding_model: Embedding | None = None) -> None: + async def delete(self, app_id: uuid.UUID, flow_id: str) -> None: """删除指定工作流文件""" flow_path = BASE_PATH / str(app_id) / "flow" / f"{flow_id}.yaml" # 确保目标为文件且存在 @@ -194,10 +206,9 @@ class FlowLoader: FlowInfo.id == flow_id, ), )) - if embedding_model: - await session.execute( - delete(embedding_model.FlowPoolVector).where(embedding_model.FlowPoolVector.id == flow_id), - ) + await session.execute( + delete(embedding.FlowPoolVector).where(embedding.FlowPoolVector.id == flow_id), + ) await session.commit() return logger.warning("[FlowLoader] 工作流文件不存在或不是文件:%s", flow_path) @@ -239,18 +250,54 @@ class FlowLoader: session.add(flow_hash) await session.commit() - async def _update_vector(self, app_id: uuid.UUID, metadata: FlowInfo, embedding_model: Embedding) -> None: - """将向量化数据存入数据库""" + async def _update_vector(self, app_id: uuid.UUID) -> None: + """重新向量化指定App的所有工作流""" + # 从数据库加载该App的所有工作流 + async with postgres.session() as session: + flows_query = select(FlowInfo).where(FlowInfo.appId == app_id) + flows = list((await session.scalars(flows_query)).all()) + + if not flows: + logger.warning("[FlowLoader] App %s 没有工作流,跳过向量化", app_id) + return + + # 获取所有工作流的描述并生成向量 + flow_descriptions = [flow.description for flow in flows] + flow_vecs = await embedding.get_embedding(flow_descriptions) + async with postgres.session() as session: + # 删除该App的所有旧向量数据 await session.execute( - delete(embedding_model.FlowPoolVector).where(embedding_model.FlowPoolVector.id == metadata.id), + delete(embedding.FlowPoolVector).where(embedding.FlowPoolVector.appId == app_id), ) - # 获取向量数据 - service_embedding = await embedding_model.get_embedding([metadata.description]) - session.add(embedding_model.FlowPoolVector( - id=metadata.id, - appId=app_id, - embedding=service_embedding[0], - )) + # 插入新的向量数据 + for flow, vec in zip(flows, flow_vecs, strict=True): + session.add(embedding.FlowPoolVector( + id=flow.id, + appId=app_id, + embedding=vec, + )) await session.commit() + + @staticmethod + async def set_vector() -> None: + """将所有工作流的向量化数据存入数据库""" + flows = await FlowLoader._load_all_flows() + + # 为每个工作流更新向量数据 + for flow in flows: + flow_vecs = await embedding.get_embedding([flow.description]) + + async with postgres.session() as session: + # 删除旧数据 + await session.execute( + delete(embedding.FlowPoolVector).where(embedding.FlowPoolVector.id == flow.id), + ) + # 插入新数据 + session.add(embedding.FlowPoolVector( + id=flow.id, + appId=flow.appId, + embedding=flow_vecs[0], + )) + await session.commit() diff --git a/apps/scheduler/pool/loader/mcp.py b/apps/scheduler/pool/loader/mcp.py index 6935e361e..e3e219c8d 100644 --- a/apps/scheduler/pool/loader/mcp.py +++ b/apps/scheduler/pool/loader/mcp.py @@ -13,7 +13,7 @@ from sqlalchemy import and_, delete, select, update from apps.common.postgres import postgres from apps.common.process_handler import ProcessHandler from apps.constants import MCP_PATH -from apps.llm import Embedding +from apps.llm.embedding import embedding from apps.models.mcp import MCPActivated, MCPInfo, MCPInstallStatus, MCPTools, MCPType from apps.scheduler.pool.mcp.client import MCPClient from apps.scheduler.pool.mcp.install import install_npx, install_uvx @@ -330,30 +330,30 @@ class MCPLoader: @staticmethod - async def _insert_template_tool_vector(mcp_id: str, config: MCPServerConfig, embedding_model: Embedding) -> None: + async def _insert_template_tool_vector(mcp_id: str, config: MCPServerConfig) -> None: """插入MCP相关的向量数据""" # 获取工具列表 tool_list = await MCPLoader._get_template_tool(mcp_id, config) tool_desc_list = [tool.description for tool in tool_list] - mcp_embedding = await embedding_model.get_embedding([config.description]) - tool_embedding = await embedding_model.get_embedding(tool_desc_list) + mcp_embedding = await embedding.get_embedding([config.description]) + tool_embedding = await embedding.get_embedding(tool_desc_list) async with postgres.session() as session: # 删除旧数据 - await session.execute(delete(embedding_model.MCPVector).where(embedding_model.MCPVector.id == mcp_id)) + await session.execute(delete(embedding.MCPVector).where(embedding.MCPVector.id == mcp_id)) await session.execute( - delete(embedding_model.MCPToolVector).where(embedding_model.MCPToolVector.mcpId == mcp_id), + delete(embedding.MCPToolVector).where(embedding.MCPToolVector.mcpId == mcp_id), ) # 插入新数据 - session.add(embedding_model.MCPVector( + session.add(embedding.MCPVector( id=mcp_id, embedding=mcp_embedding[0], )) - for tool, embedding in zip(tool_list, tool_embedding, strict=True): - session.add(embedding_model.MCPToolVector( + for tool, emb in zip(tool_list, tool_embedding, strict=True): + session.add(embedding.MCPToolVector( id=tool.id, mcpId=mcp_id, - embedding=embedding, + embedding=emb, )) await session.commit() @@ -535,7 +535,7 @@ class MCPLoader: @staticmethod - async def remove_deleted_mcp(deleted_mcp_list: list[str], embedding_model: Embedding | None = None) -> None: + async def remove_deleted_mcp(deleted_mcp_list: list[str]) -> None: """ 删除无效的MCP在数据库中的记录 @@ -558,17 +558,16 @@ class MCPLoader: logger.info("[MCPLoader] 清除数据库中无效的MCP") # 删除MCP的向量化数据 - if embedding_model: - async with postgres.session() as session: - for mcp_id in deleted_mcp_list: - await session.execute( - delete(embedding_model.MCPVector).where(embedding_model.MCPVector.id == mcp_id), - ) - await session.execute( - delete(embedding_model.MCPToolVector).where(embedding_model.MCPToolVector.mcpId == mcp_id), - ) - await session.commit() - logger.info("[MCPLoader] 清除数据库中无效的MCP向量化数据") + async with postgres.session() as session: + for mcp_id in deleted_mcp_list: + await session.execute( + delete(embedding.MCPVector).where(embedding.MCPVector.id == mcp_id), + ) + await session.execute( + delete(embedding.MCPToolVector).where(embedding.MCPToolVector.mcpId == mcp_id), + ) + await session.commit() + logger.info("[MCPLoader] 清除数据库中无效的MCP向量化数据") @staticmethod @@ -653,11 +652,10 @@ class MCPLoader: @staticmethod - async def set_vector(embedding_model: Embedding) -> None: + async def set_vector() -> None: """ 将MCP工具描述进行向量化并存入数据库 - :param Embedding embedding_model: 嵌入模型 :return: 无 """ try: @@ -667,7 +665,7 @@ class MCPLoader: for mcp_id, config in mcp_configs.items(): try: # 进行向量化 - await MCPLoader._insert_template_tool_vector(mcp_id, config, embedding_model) + await MCPLoader._insert_template_tool_vector(mcp_id, config) logger.info("[MCPLoader] MCP工具向量化完成: %s", mcp_id) except Exception as e: # noqa: BLE001 logger.warning("[MCPLoader] MCP工具向量化失败: %s, 错误: %s", mcp_id, str(e)) diff --git a/apps/scheduler/pool/loader/service.py b/apps/scheduler/pool/loader/service.py index 671d25f14..0caa01d68 100644 --- a/apps/scheduler/pool/loader/service.py +++ b/apps/scheduler/pool/loader/service.py @@ -10,7 +10,7 @@ from sqlalchemy import delete, select from apps.common.config import config from apps.common.postgres import postgres -from apps.llm import Embedding +from apps.llm import embedding from apps.models import ( NodeInfo, PermissionType, @@ -96,9 +96,7 @@ class ServiceLoader: @staticmethod - async def delete( - service_id: uuid.UUID, embedding_model: Embedding | None = None, *, is_reload: bool = False, - ) -> None: + async def delete(service_id: uuid.UUID, *, is_reload: bool = False) -> None: """删除Service,并更新数据库""" async with postgres.session() as session: await session.execute(delete(Service).where(Service.id == service_id)) @@ -106,15 +104,14 @@ class ServiceLoader: await session.execute(delete(ServiceACL).where(ServiceACL.serviceId == service_id)) await session.execute(delete(ServiceHashes).where(ServiceHashes.serviceId == service_id)) - if embedding_model: - await session.execute( - delete(embedding_model.ServicePoolVector).where(embedding_model.ServicePoolVector.id == service_id), - ) - await session.execute( - delete( - embedding_model.NodePoolVector, - ).where(embedding_model.NodePoolVector.serviceId == service_id), - ) + await session.execute( + delete(embedding.ServicePoolVector).where(embedding.ServicePoolVector.id == service_id), + ) + await session.execute( + delete( + embedding.NodePoolVector, + ).where(embedding.NodePoolVector.serviceId == service_id), + ) await session.commit() if not is_reload: @@ -163,7 +160,7 @@ class ServiceLoader: await session.commit() @staticmethod - async def set_vector(embedding_model: Embedding) -> None: + async def set_vector() -> None: """将所有服务和节点的向量化数据存入数据库""" service_nodes = await ServiceLoader._load_all_services() @@ -173,7 +170,6 @@ class ServiceLoader: nodes, service.id, service.description, - embedding_model, ) @staticmethod @@ -181,30 +177,29 @@ class ServiceLoader: nodes: list[NodeInfo], service_id: uuid.UUID, service_description: str, - embedding_model: Embedding, ) -> None: """更新向量数据""" - service_vecs = await embedding_model.get_embedding([service_description]) + service_vecs = await embedding.get_embedding([service_description]) node_descriptions = [] for node in nodes: node_descriptions += [node.description] - node_vecs = await embedding_model.get_embedding(node_descriptions) + node_vecs = await embedding.get_embedding(node_descriptions) async with postgres.session() as session: # 删除旧数据 await session.execute( - delete(embedding_model.ServicePoolVector).where(embedding_model.ServicePoolVector.id == service_id), + delete(embedding.ServicePoolVector).where(embedding.ServicePoolVector.id == service_id), ) await session.execute( - delete(embedding_model.NodePoolVector).where(embedding_model.NodePoolVector.serviceId == service_id), + delete(embedding.NodePoolVector).where(embedding.NodePoolVector.serviceId == service_id), ) # 插入新数据 - session.add(embedding_model.ServicePoolVector( + session.add(embedding.ServicePoolVector( id=service_id, embedding=service_vecs[0], )) for vec in node_vecs: - node_data = embedding_model.NodePoolVector( + node_data = embedding.NodePoolVector( id=node.id, serviceId=service_id, embedding=vec, diff --git a/apps/scheduler/pool/pool.py b/apps/scheduler/pool/pool.py index 35acfcf6d..3a7101973 100644 --- a/apps/scheduler/pool/pool.py +++ b/apps/scheduler/pool/pool.py @@ -11,7 +11,6 @@ from sqlalchemy import select from apps.common.config import config from apps.common.postgres import postgres -from apps.llm import Embedding from apps.models import Flow as FlowInfo from apps.schemas.enum_var import MetadataType from apps.schemas.flow import Flow @@ -135,12 +134,13 @@ class Pool: await self.mcp_loader.init() - async def set_vector(self, embedding_model: Embedding) -> None: + async def set_vector(self) -> None: """向数据库中写入向量化数据""" # 对所有的Loader进行向量化 - await self.call_loader.set_vector(embedding_model) - await self.service_loader.set_vector(embedding_model) - await self.mcp_loader.set_vector(embedding_model) + await self.call_loader.set_vector() + await self.service_loader.set_vector() + await self.mcp_loader.set_vector() + await self.flow_loader.set_vector() async def get_flow_metadata(self, app_id: uuid.UUID) -> list[FlowInfo]: diff --git a/apps/services/settings.py b/apps/services/settings.py index 594890a5c..87ddad229 100644 --- a/apps/services/settings.py +++ b/apps/services/settings.py @@ -58,7 +58,7 @@ class SettingsManager: await embedding.init(embedding_llm_config) # 触发向量化 - await pool.set_vector(embedding) + await pool.set_vector() logger.info("[SettingsManager] Embedding模型已更新,向量化过程已完成") else: -- Gitee From aa92bc343cce096ec5e3a047dc6838ba93c88b50 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Thu, 23 Oct 2025 17:36:32 +0800 Subject: [PATCH 6/6] =?UTF-8?q?=E5=8E=BB=E6=8E=89Init=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/scheduler/scheduler/scheduler.py | 38 --------------------------- 1 file changed, 38 deletions(-) diff --git a/apps/scheduler/scheduler/scheduler.py b/apps/scheduler/scheduler/scheduler.py index c58744e51..3a9f4a176 100644 --- a/apps/scheduler/scheduler/scheduler.py +++ b/apps/scheduler/scheduler/scheduler.py @@ -97,39 +97,6 @@ class Scheduler: # LLM self.llm = await self._get_scheduler_llm(post_body.llm_id) - - async def _push_init_message( - self, context_num: int, *, is_flow: bool = False, - ) -> None: - """推送初始化消息""" - # 组装feature - if is_flow: - feature = InitContentFeature( - maxTokens=self.llm.reasoning.config.maxToken or 0, - contextNum=context_num, - enableFeedback=False, - enableRegenerate=False, - ) - else: - feature = InitContentFeature( - maxTokens=self.llm.reasoning.config.maxToken or 0, - contextNum=context_num, - enableFeedback=True, - enableRegenerate=True, - ) - - # 保存必要信息到Task - created_at = round(datetime.now(UTC).timestamp(), 3) - self.task.runtime.time = created_at - - # 推送初始化消息 - await self.queue.push_output( - self.task, - self.llm, - event_type=EventType.INIT.value, - data=InitContent(feature=feature, createdAt=created_at).model_dump(exclude_none=True, by_alias=True), - ) - async def _push_done_message(self) -> None: """推送任务完成消息""" _logger.info("[Scheduler] 发送结束消息") @@ -316,11 +283,6 @@ class Scheduler: async def get_top_flow(self) -> str: """获取Top1 Flow""" - if not self.llm.function: - err = "[Scheduler] 未设置Function模型" - _logger.error(err) - raise RuntimeError(err) - # 获取所选应用的所有Flow if not self.post_body.app or not self.post_body.app.app_id: err = "[Scheduler] 未选择应用" -- Gitee