From afcfbc4ba55716bb7e15513c229fb7086b2bd499 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8F=B2=E9=B8=BF=E5=AE=87?= Date: Sun, 23 Feb 2025 19:09:03 +0800 Subject: [PATCH 1/2] =?UTF-8?q?feat:=20AppLoader=20=E4=BF=9D=E5=AD=98?= =?UTF-8?q?=E8=87=B3=E6=95=B0=E6=8D=AE=E5=BA=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 史鸿宇 --- apps/manager/appcenter.py | 42 ++-------------- apps/scheduler/pool/loader/app.py | 80 ++++++++++++++++++++++--------- 2 files changed, 61 insertions(+), 61 deletions(-) diff --git a/apps/manager/appcenter.py b/apps/manager/appcenter.py index 55f56dfc..98544fc2 100644 --- a/apps/manager/appcenter.py +++ b/apps/manager/appcenter.py @@ -8,8 +8,6 @@ from datetime import datetime, timezone from enum import Enum from typing import Any, Optional -from fastapi.encoders import jsonable_encoder - from apps.constants import LOGGER from apps.entities.appcenter import AppCenterCardItem, AppData from apps.entities.collection import User @@ -204,20 +202,6 @@ class AppCenterManager: :return: 应用ID """ app_id = str(uuid.uuid4()) - app = AppPool( - _id=app_id, - name=data.name, - description=data.description, - author=user_sub, - icon=data.icon, - links=data.links, - first_questions=data.first_questions, - history_len=data.history_len, - permission=Permission( - type=data.permission.type, - users=data.permission.users or [], - ), - ) metadata = AppMetadata( type=MetadataType.APP, id=app_id, @@ -235,10 +219,7 @@ class AppCenterManager: ), ) try: - app_hashes = await AppLoader.save(MetadataType.APP, metadata.model_dump(), app_id) - app.hashes = app_hashes - app_collection = MongoDB.get_collection("app") - await app_collection.insert_one(jsonable_encoder(app)) + await AppLoader.save(MetadataType.APP, metadata, app_id) return app_id except Exception as e: LOGGER.error(f"[AppCenterManager] Create app failed: {e}") @@ -273,25 +254,10 @@ class AppCenterManager: app_data = AppPool.model_validate(await app_collection.find_one({"_id": app_id})) if not app_data: return False - app_hashes = await AppLoader.save(MetadataType.APP, metadata.model_dump(), app_id) + await AppLoader.save(MetadataType.APP, metadata, app_id) # 如果工作流ID列表不一致,则需要取消发布状态 - published_false_needed = {flow.id for flow in app_data.flows} != set(data.workflows) - update_data = { - "name": data.name, - "description": data.description, - "icon": data.icon, - "links": data.links, - "first_questions": data.first_questions, - "history_len": data.history_len, - "permission": Permission( - type=data.permission.type, - users=data.permission.users or [], - ), - "hashes": app_hashes, - } - if published_false_needed: - update_data["published"] = False - await app_collection.update_one({"_id": app_id}, {"$set": jsonable_encoder(update_data)}) + if {flow.id for flow in app_data.flows} != set(data.workflows): + await app_collection.update_one({"_id": app_id}, {"$set": {"published": False}}) return True except Exception as e: LOGGER.error(f"[AppCenterManager] Update app failed: {e}") diff --git a/apps/scheduler/pool/loader/app.py b/apps/scheduler/pool/loader/app.py index 89d7a4e5..470ad344 100644 --- a/apps/scheduler/pool/loader/app.py +++ b/apps/scheduler/pool/loader/app.py @@ -2,16 +2,21 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ -from hashlib import sha256 -from typing import Any + +import pathlib from anyio import Path +from fastapi.encoders import jsonable_encoder +from sqlalchemy.dialects.postgresql import insert from apps.common.config import config from apps.constants import APP_DIR -from apps.entities.flow import AppMetadata, MetadataType +from apps.entities.flow import AppMetadata, MetadataType, Permission from apps.entities.pool import AppPool +from apps.entities.vector import AppPoolVector from apps.models.mongo import MongoDB +from apps.models.postgres import PostgreSQL +from apps.scheduler.pool.check import FileChecker from apps.scheduler.pool.loader.metadata import MetadataLoader @@ -31,8 +36,28 @@ class AppLoader: if not isinstance(metadata, AppMetadata): err = f"元数据类型错误: {metadata_path}" raise TypeError(err) - metadata_hash = sha256(await metadata_path.read_bytes()).hexdigest() - hashes = { str(metadata_path.relative_to(cls._dir_path)): metadata_hash } + await cls._update_db(metadata) + + @classmethod + async def save( + cls, + metadata_type: MetadataType, + metadata: AppMetadata, + app_id: str, + ) -> None: + """保存应用 + + :param metadata_type: 元数据类型 + :param metadata: 元数据 + :param app_id: 应用 ID + """ + await MetadataLoader().save_one(metadata_type, metadata, app_id) + await cls._update_db(metadata) + + @classmethod + async def _update_db(cls, metadata: AppMetadata) -> None: + """更新数据库""" + hashes = FileChecker().check_one(pathlib.Path(str(cls._dir_path)) / APP_DIR / metadata.id) app_collection = MongoDB.get_collection("app") app_pool = AppPool( _id=metadata.id, @@ -40,29 +65,38 @@ class AppLoader: name=metadata.name, description=metadata.description, author=metadata.author, + links=metadata.links, + first_questions=metadata.first_questions, history_len=metadata.history_len, + permission=metadata.permission if metadata.permission else Permission(), hashes=hashes, ) if not await app_collection.find_one({"_id": metadata.id}): - await app_collection.insert_one(app_pool.model_dump(exclude_none=True, by_alias=True)) + await app_collection.insert_one(jsonable_encoder(app_pool)) else: await app_collection.update_one( {"_id": metadata.id}, - {"$set": app_pool.model_dump(exclude_none=True, by_alias=True)}, + {"$set": { + "icon": metadata.icon, + "name": metadata.name, + "description": metadata.description, + "author": metadata.author, + "links": metadata.links, + "first_questions": metadata.first_questions, + "history_len": metadata.history_len, + "permission": metadata.permission if metadata.permission else Permission(), + "hashes": hashes, + }}, ) - - - @classmethod - async def save(cls, metadata_type: MetadataType, metadata: dict[str, Any], app_id: str) -> dict[str, str]: - """保存应用 - - :param metadata_type: 元数据类型 - :param metadata: 元数据 - :param app_id: 应用 ID - - :return: 文件路径和哈希值 - """ - await MetadataLoader().save_one(metadata_type, metadata, app_id) - metadata_path = cls._dir_path / APP_DIR / app_id / "metadata.yaml" - metadata_hash = sha256(await metadata_path.read_bytes()).hexdigest() - return { str(metadata_path.relative_to(cls._dir_path)): metadata_hash } + # 向量化所有数据并保存 + session = await PostgreSQL.get_session() + service_embedding = await PostgreSQL.get_embedding([metadata.description]) + insert_stmt = insert(AppPoolVector).values( + id=metadata.id, + embedding=service_embedding[0], + ).on_conflict_do_update( + index_elements=["id"], + set_={"embedding": service_embedding[0]}, + ) + await session.execute(insert_stmt) + await session.commit() -- Gitee From 2bd73ebd1c8439872a06362c57f70275176ed6ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8F=B2=E9=B8=BF=E5=AE=87?= Date: Sun, 23 Feb 2025 19:11:12 +0800 Subject: [PATCH 2/2] =?UTF-8?q?fix:=20clean=20code=20=E6=A0=BC=E5=BC=8F?= =?UTF-8?q?=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 史鸿宇 --- apps/entities/pool.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/apps/entities/pool.py b/apps/entities/pool.py index 0ca9b66d..45b3c3d6 100644 --- a/apps/entities/pool.py +++ b/apps/entities/pool.py @@ -2,6 +2,7 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ + from datetime import datetime, timezone from typing import Any, Optional @@ -77,9 +78,18 @@ class NodePool(BaseData): service_id: Optional[str] = Field(description="Node所属的Service ID", default=None) call_id: str = Field(description="所使用的Call的ID") annotation: Optional[str] = Field(description="Node的注释", default=None) - known_params: Optional[dict[str, Any]] = Field(description="已知的用于Call部分的参数,独立于输入和输出之外", default=None) - override_input: Optional[dict[str, Any]] = Field(description="Node的输入Schema;用于描述Call的参数中特定的字段", default=None) - override_output: Optional[dict[str, Any]] = Field(description="Node的输出Schema;用于描述Call的输出中特定的字段", default=None) + known_params: Optional[dict[str, Any]] = Field( + description="已知的用于Call部分的参数,独立于输入和输出之外", + default=None, + ) + override_input: Optional[dict[str, Any]] = Field( + description="Node的输入Schema;用于描述Call的参数中特定的字段", + default=None, + ) + override_output: Optional[dict[str, Any]] = Field( + description="Node的输出Schema;用于描述Call的输出中特定的字段", + default=None, + ) class AppFlow(BaseData): @@ -87,10 +97,10 @@ class AppFlow(BaseData): enabled: bool = Field(description="是否启用", default=True) path: str = Field(description="Flow的路径") - focus_point: PositionItem = Field( - description="Flow的视觉焦点", default=PositionItem(x=0, y=0)) + focus_point: PositionItem = Field(description="Flow的视觉焦点", default=PositionItem(x=0, y=0)) debug: bool = Field(description="调试是否成功", default=False) + class AppPool(BaseData): """应用信息 -- Gitee