diff --git a/apps/scheduler/pool/loader/app.py b/apps/scheduler/pool/loader/app.py index de675016b98d995aae0307314de3478875d6578b..467022ae11e334e18352ba46af1587466d0b7bd0 100644 --- a/apps/scheduler/pool/loader/app.py +++ b/apps/scheduler/pool/loader/app.py @@ -26,7 +26,7 @@ from apps.scheduler.pool.loader.metadata import MetadataLoader class AppLoader: """应用加载器""" - async def load(self, app_id: str) -> None: + async def load(self, app_id: str, hashes: dict[str, str]) -> None: """从文件系统中加载应用 :param app_id: 应用 ID @@ -34,8 +34,9 @@ class AppLoader: metadata_path = Path(config["SEMANTICS_DIR"]) / APP_DIR / app_id / "metadata.yaml" metadata = await MetadataLoader().load_one(metadata_path) if not isinstance(metadata, AppMetadata): - err = f"元数据类型错误: {metadata_path}" + err = f"[AppLoader] 元数据类型错误: {metadata_path}" raise TypeError(err) + metadata.hashes = hashes await self._update_db(metadata) async def save(self, metadata: AppMetadata, app_id: str) -> None: @@ -56,7 +57,7 @@ class AppLoader: try: await app_collection.delete_one({"_id": app_id}) except Exception as e: - err = f"删除App失败:{e}" + err = f"[AppLoader] 删除App失败:{e}" LOGGER.error(err) session = await PostgreSQL.get_session() @@ -64,53 +65,74 @@ class AppLoader: await session.execute(delete(AppPoolVector).where(AppPoolVector.id == app_id)) await session.commit() except Exception as e: - err = f"删除数据库失败:{e}" + err = f"[AppLoader] 删除数据库失败:{e}" LOGGER.error(err) await session.aclose() async def _update_db(self, metadata: AppMetadata) -> None: """更新数据库""" - hashes = FileChecker().check_one(pathlib.Path(config["SEMANTICS_DIR"]) / APP_DIR / metadata.id) - app_collection = MongoDB.get_collection("app") - app_pool = AppPool( - _id=metadata.id, - icon=metadata.icon, - name=metadata.name, - description=metadata.description, - author=metadata.author, - links=metadata.links, - first_questions=metadata.first_questions, - history_len=metadata.history_len, - permission=metadata.permission if metadata.permission else Permission(), - hashes=hashes, - ) - if not await app_collection.find_one({"_id": metadata.id}): - await app_collection.insert_one(jsonable_encoder(app_pool)) - else: - await app_collection.update_one( - {"_id": metadata.id}, - {"$set": { - "icon": metadata.icon, - "name": metadata.name, - "description": metadata.description, - "author": metadata.author, - "links": metadata.links, - "first_questions": metadata.first_questions, - "history_len": metadata.history_len, - "permission": metadata.permission if metadata.permission else Permission(), - "hashes": hashes, - }}, - ) + if not metadata.hashes: + LOGGER.warning(f"[AppLoader] 应用 {metadata.id} 的哈希值为空") + # 重新计算哈希值 + metadata.hashes = FileChecker().check_one(pathlib.Path(config["SEMANTICS_DIR"]) / APP_DIR / metadata.id) + # 更新应用数据 + try: + app_collection = MongoDB.get_collection("app") + if not await app_collection.find_one({"_id": metadata.id}): + # 创建应用时需写入完整数据结构,自动初始化创建时间、flow列表、收藏列表和权限 + await app_collection.insert_one( + jsonable_encoder( + AppPool( + _id=metadata.id, + icon=metadata.icon, + name=metadata.name, + description=metadata.description, + author=metadata.author, + links=metadata.links, + first_questions=metadata.first_questions, + history_len=metadata.history_len, + permission=metadata.permission if metadata.permission else Permission(), + hashes=metadata.hashes, + ), + ), + ) + else: + # 更新应用数据:部分映射 AppMetadata 到 AppPool,其他字段不更新 + await app_collection.update_one( + {"_id": metadata.id}, + { + "$set": { + "icon": metadata.icon, + "name": metadata.name, + "description": metadata.description, + "author": metadata.author, + "links": metadata.links, + "first_questions": metadata.first_questions, + "history_len": metadata.history_len, + "permission": metadata.permission if metadata.permission else Permission(), + "hashes": metadata.hashes, + }, + }, + ) + except Exception as e: + err = f"[AppLoader] 更新 MongoDB 失败:{e}" + LOGGER.error(err) + raise RuntimeError(err) from e + # 向量化所有数据并保存 session = await PostgreSQL.get_session() service_embedding = await PostgreSQL.get_embedding([metadata.description]) - insert_stmt = insert(AppPoolVector).values( - id=metadata.id, - embedding=service_embedding[0], - ).on_conflict_do_update( - index_elements=["id"], - set_={"embedding": service_embedding[0]}, + insert_stmt = ( + insert(AppPoolVector) + .values( + id=metadata.id, + embedding=service_embedding[0], + ) + .on_conflict_do_update( + index_elements=["id"], + set_={"embedding": service_embedding[0]}, + ) ) await session.execute(insert_stmt) await session.commit() diff --git a/apps/scheduler/pool/loader/metadata.py b/apps/scheduler/pool/loader/metadata.py index edf0f02cfefef736070acc3ce71df3c46460e6dd..fa7485767ab65617255e5a08f24fd63491f97efd 100644 --- a/apps/scheduler/pool/loader/metadata.py +++ b/apps/scheduler/pool/loader/metadata.py @@ -7,6 +7,7 @@ from typing import Any, Optional, Union import yaml from anyio import Path +from fastapi.encoders import jsonable_encoder from apps.common.config import config from apps.constants import APP_DIR, LOGGER, SERVICE_DIR @@ -94,5 +95,5 @@ class MetadataLoader: else: data = metadata - yaml_data = yaml.safe_dump(data.model_dump(by_alias=True, exclude_none=True)) + yaml_data = yaml.safe_dump(jsonable_encoder(data)) await resource_path.write_text(yaml_data) diff --git a/apps/scheduler/pool/loader/service.py b/apps/scheduler/pool/loader/service.py index b30766093cb994b751479d1939152cdfcd01588a..3d63d2336401a2cd45e0e4b952adb7d6d33d0dac 100644 --- a/apps/scheduler/pool/loader/service.py +++ b/apps/scheduler/pool/loader/service.py @@ -7,13 +7,14 @@ import pathlib import ray from anyio import Path +from fastapi.encoders import jsonable_encoder from sqlalchemy import delete from sqlalchemy.dialects.postgresql import insert from apps.common.config import config from apps.constants import LOGGER -from apps.entities.flow import ServiceMetadata -from apps.entities.pool import NodePool +from apps.entities.flow import Permission, ServiceMetadata +from apps.entities.pool import NodePool, ServicePool from apps.entities.vector import NodePoolVector, ServicePoolVector from apps.models.mongo import MongoDB from apps.models.postgres import PostgreSQL @@ -32,7 +33,7 @@ class ServiceLoader: # 载入元数据 metadata = await MetadataLoader().load_one(service_path / "metadata.yaml") if not isinstance(metadata, ServiceMetadata): - err = f"元数据类型错误: {service_path / 'metadata.yaml'}" + err = f"[ServiceLoader] 元数据类型错误: {service_path / 'metadata.yaml'}" LOGGER.error(err) raise TypeError(err) metadata.hashes = hashes @@ -70,7 +71,7 @@ class ServiceLoader: await service_collection.delete_one({"_id": service_id}) await node_collection.delete_many({"service_id": service_id}) except Exception as e: - err = f"删除Service失败:{e}" + err = f"[ServiceLoader] 删除Service失败:{e}" LOGGER.error(err) session = await PostgreSQL.get_session() @@ -79,39 +80,56 @@ class ServiceLoader: await session.execute(delete(NodePoolVector).where(NodePoolVector.id == service_id)) await session.commit() except Exception as e: - err = f"删除数据库失败:{e}" + err = f"[ServiceLoader] 删除数据库失败:{e}" LOGGER.error(err) await session.aclose() async def _update_db(self, nodes: list[NodePool], metadata: ServiceMetadata) -> None: """更新数据库""" + if not metadata.hashes: + LOGGER.warning(f"[ServiceLoader] 服务 {metadata.id} 的哈希值为空") + # 重新计算哈希值 + metadata.hashes = FileChecker().check_one(pathlib.Path(config["SEMANTICS_DIR"]) / "service" / metadata.id) # 更新MongoDB service_collection = MongoDB.get_collection("service") node_collection = MongoDB.get_collection("node") try: - # 部分映射 ServiceMetadata 到 ServicePool, 忽略其他字段 - await service_collection.update_one( - {"_id": metadata.id}, - { - "$set": { - "name": metadata.name, - "description": metadata.description, - "author": metadata.author, - "permission": metadata.permission, - "hashes": metadata.hashes, + # 先删除旧的节点 + await node_collection.delete_many({"service_id": metadata.id}) + # 插入或更新 Service + if not await service_collection.find_one({"_id": metadata.id}): + # 创建服务时需写入完整数据结构,自动初始化创建时间、收藏列表和权限 + await service_collection.insert_one( + jsonable_encoder( + ServicePool( + _id=metadata.id, + name=metadata.name, + description=metadata.description, + author=metadata.author, + permission=metadata.permission if metadata.permission else Permission(), + hashes=metadata.hashes, + ), + ), + ) + else: + # 更新服务数据:部分映射 ServiceMetadata 到 ServicePool,其他字段不更新 + await service_collection.update_one( + {"_id": metadata.id}, + { + "$set": { + "name": metadata.name, + "description": metadata.description, + "author": metadata.author, + "permission": metadata.permission if metadata.permission else Permission(), + "hashes": metadata.hashes, + }, }, - }, - upsert=True, - ) - for node in nodes: - await node_collection.update_one( - {"_id": node.id}, - {"$set": node.model_dump(exclude_none=True, by_alias=True)}, - upsert=True, ) + for node in nodes: + await node_collection.insert_one(jsonable_encoder(node)) except Exception as e: - err = f"更新MongoDB失败:{e}" + err = f"[ServiceLoader] 更新 MongoDB 失败:{e}" LOGGER.error(err) raise RuntimeError(err) from e