From 7d783fb1bcbf332c5645e9f440011b75301e0e09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8F=B2=E9=B8=BF=E5=AE=87?= Date: Mon, 24 Feb 2025 17:50:19 +0800 Subject: [PATCH 1/3] =?UTF-8?q?feat:=20=E5=AE=8C=E5=96=84=E4=BF=9D?= =?UTF-8?q?=E5=AD=98=20Service=20=E9=80=BB=E8=BE=91=EF=BC=8C=E5=BA=94?= =?UTF-8?q?=E7=94=A8=E4=B8=AD=E5=BF=83=E8=BF=81=E7=A7=BB=E8=87=B3=20AppLoa?= =?UTF-8?q?der?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 史鸿宇 --- apps/manager/appcenter.py | 36 +++++++----- apps/manager/service.py | 19 ++++--- apps/scheduler/pool/loader/app.py | 57 ++++++++++++------- apps/scheduler/pool/loader/service.py | 81 +++++++++++++-------------- 4 files changed, 110 insertions(+), 83 deletions(-) diff --git a/apps/manager/appcenter.py b/apps/manager/appcenter.py index 98544fc2..a683e0bd 100644 --- a/apps/manager/appcenter.py +++ b/apps/manager/appcenter.py @@ -8,6 +8,8 @@ from datetime import datetime, timezone from enum import Enum from typing import Any, Optional +import ray + from apps.constants import LOGGER from apps.entities.appcenter import AppCenterCardItem, AppData from apps.entities.collection import User @@ -219,7 +221,9 @@ class AppCenterManager: ), ) try: - await AppLoader.save(MetadataType.APP, metadata, app_id) + app_loader = AppLoader.remote() + await app_loader.save.remote(metadata, app_id) # type: ignore[attr-type] + ray.kill(app_loader) return app_id except Exception as e: LOGGER.error(f"[AppCenterManager] Create app failed: {e}") @@ -254,7 +258,9 @@ class AppCenterManager: app_data = AppPool.model_validate(await app_collection.find_one({"_id": app_id})) if not app_data: return False - await AppLoader.save(MetadataType.APP, metadata, app_id) + app_loader = AppLoader.remote() + await app_loader.save.remote(metadata, app_id) # type: ignore[attr-type] + ray.kill(app_loader) # 如果工作流ID列表不一致,则需要取消发布状态 if {flow.id for flow in app_data.flows} != set(data.workflows): await app_collection.update_one({"_id": app_id}, {"$set": {"published": False}}) @@ -327,17 +333,21 @@ class AppCenterManager: :return: 删除是否成功 """ try: - async with MongoDB.get_session() as session, await session.start_transaction(): - app_collection = MongoDB.get_collection("app") - await app_collection.delete_one({"_id": app_id}, session=session) - user_collection = MongoDB.get_collection("user") - await user_collection.update_one( - {"_id": user_sub}, - {"$unset": {f"app_usage.{app_id}": ""}}, - session=session, - ) - await session.commit_transaction() - return True + app_collection = MongoDB.get_collection("app") + app_data = AppPool.model_validate(await app_collection.find_one({"_id": app_id})) + if not app_data: + return False + if app_data.author != user_sub: + return False + app_loader = AppLoader.remote() + await app_loader.delete.remote(app_id) # type: ignore[attr-type] + ray.kill(app_loader) + user_collection = MongoDB.get_collection("user") + await user_collection.update_one( + {"_id": user_sub}, + {"$unset": {f"app_usage.{app_id}": ""}}, + ) + return True except Exception as e: LOGGER.error(f"[AppCenterManager] Delete app failed: {e}") return False diff --git a/apps/manager/service.py b/apps/manager/service.py index 142f1c24..5329ee64 100644 --- a/apps/manager/service.py +++ b/apps/manager/service.py @@ -11,7 +11,7 @@ from anyio import Path from jsonschema import ValidationError, validate from apps.common.config import config -from apps.constants import LOGGER +from apps.constants import LOGGER, SERVICE_DIR from apps.entities.enum_var import SearchType from apps.entities.flow import ServiceApiConfig, ServiceMetadata from apps.entities.pool import NodePool, ServicePool @@ -116,7 +116,9 @@ class ServiceCenterManager: author=user_sub, api=ServiceApiConfig(server=data["servers"][0]["url"]), ) - await ServiceLoader().save(service_id, service_metadata, data) + service_loader = ServiceLoader.remote() + await service_loader.save.remote(service_id, service_metadata, data) # type: ignore[attr-type] + ray.kill(service_loader) # 返回服务ID return service_id @@ -148,7 +150,9 @@ class ServiceCenterManager: author=user_sub, api=ServiceApiConfig(server=data["servers"][0]["url"]), ) - await ServiceLoader().save(service_id, service_metadata, data) + service_loader = ServiceLoader.remote() + await service_loader.save.remote(service_id, service_metadata, data) # type: ignore[attr-type] + ray.kill(service_loader) # 返回服务ID return service_id @@ -195,7 +199,7 @@ class ServiceCenterManager: if service_pool_store.author != user_sub: msg = "Permission denied" raise ValueError(msg) - service_path = Path(config["SEMANTICS_DIR"]) / "service" / service_id / "openapi" / "api.yaml" + service_path = Path(config["SEMANTICS_DIR"]) / SERVICE_DIR / service_id / "openapi" / "api.yaml" async with await Path(service_path).open() as f: service_data = yaml.safe_load(await f.read()) return service_pool_store.name, service_data @@ -217,10 +221,9 @@ class ServiceCenterManager: msg = "Permission denied" raise ValueError(msg) # 删除服务 - await service_collection.delete_one({"_id": service_id}) - # 删除 Node 信息 - node_collection = MongoDB.get_collection("node") - await node_collection.delete_many({"service_id": service_id}) + service_loader = ServiceLoader.remote() + await service_loader.delete.remote(service_id) # type: ignore[attr-type] + ray.kill(service_loader) return True @staticmethod diff --git a/apps/scheduler/pool/loader/app.py b/apps/scheduler/pool/loader/app.py index 470ad344..de675016 100644 --- a/apps/scheduler/pool/loader/app.py +++ b/apps/scheduler/pool/loader/app.py @@ -5,12 +5,14 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. import pathlib +import ray from anyio import Path from fastapi.encoders import jsonable_encoder +from sqlalchemy import delete from sqlalchemy.dialects.postgresql import insert from apps.common.config import config -from apps.constants import APP_DIR +from apps.constants import APP_DIR, LOGGER from apps.entities.flow import AppMetadata, MetadataType, Permission from apps.entities.pool import AppPool from apps.entities.vector import AppPoolVector @@ -20,44 +22,56 @@ from apps.scheduler.pool.check import FileChecker from apps.scheduler.pool.loader.metadata import MetadataLoader +@ray.remote class AppLoader: """应用加载器""" - _dir_path = Path(config["SEMANTICS_DIR"]) - - @classmethod - async def load(cls, app_id: str) -> None: + async def load(self, app_id: str) -> None: """从文件系统中加载应用 :param app_id: 应用 ID """ - metadata_path = cls._dir_path / APP_DIR / app_id / "metadata.yaml" + metadata_path = Path(config["SEMANTICS_DIR"]) / APP_DIR / app_id / "metadata.yaml" metadata = await MetadataLoader().load_one(metadata_path) if not isinstance(metadata, AppMetadata): err = f"元数据类型错误: {metadata_path}" raise TypeError(err) - await cls._update_db(metadata) + await self._update_db(metadata) - @classmethod - async def save( - cls, - metadata_type: MetadataType, - metadata: AppMetadata, - app_id: str, - ) -> None: + async def save(self, metadata: AppMetadata, app_id: str) -> None: """保存应用 - :param metadata_type: 元数据类型 - :param metadata: 元数据 + :param metadata: 应用元数据 + :param app_id: 应用 ID + """ + await MetadataLoader().save_one(MetadataType.APP, metadata, app_id) + await self._update_db(metadata) + + async def delete(self, app_id: str) -> None: + """删除App,并更新数据库 + :param app_id: 应用 ID """ - await MetadataLoader().save_one(metadata_type, metadata, app_id) - await cls._update_db(metadata) + app_collection = MongoDB.get_collection("app") + try: + await app_collection.delete_one({"_id": app_id}) + except Exception as e: + err = f"删除App失败:{e}" + LOGGER.error(err) + + session = await PostgreSQL.get_session() + try: + await session.execute(delete(AppPoolVector).where(AppPoolVector.id == app_id)) + await session.commit() + except Exception as e: + err = f"删除数据库失败:{e}" + LOGGER.error(err) + + await session.aclose() - @classmethod - async def _update_db(cls, metadata: AppMetadata) -> None: + async def _update_db(self, metadata: AppMetadata) -> None: """更新数据库""" - hashes = FileChecker().check_one(pathlib.Path(str(cls._dir_path)) / APP_DIR / metadata.id) + hashes = FileChecker().check_one(pathlib.Path(config["SEMANTICS_DIR"]) / APP_DIR / metadata.id) app_collection = MongoDB.get_collection("app") app_pool = AppPool( _id=metadata.id, @@ -100,3 +114,4 @@ class AppLoader: ) await session.execute(insert_stmt) await session.commit() + await session.aclose() diff --git a/apps/scheduler/pool/loader/service.py b/apps/scheduler/pool/loader/service.py index 0a2ecd2a..d855fd39 100644 --- a/apps/scheduler/pool/loader/service.py +++ b/apps/scheduler/pool/loader/service.py @@ -4,7 +4,6 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ import asyncio -import uuid import ray from anyio import Path @@ -18,8 +17,7 @@ from apps.entities.pool import NodePool from apps.entities.vector import NodePoolVector, ServicePoolVector from apps.models.mongo import MongoDB from apps.models.postgres import PostgreSQL -from apps.scheduler.openapi import reduce_openapi_spec -from apps.scheduler.pool.loader.metadata import MetadataLoader +from apps.scheduler.pool.loader.metadata import MetadataLoader, MetadataType from apps.scheduler.pool.loader.openapi import OpenAPILoader @@ -49,6 +47,45 @@ class ServiceLoader: nodes = [NodePool(**node.model_dump(exclude_none=True, by_alias=True)) for node in nodes] await self._update_db(nodes, metadata) + async def save(self, service_id: str, metadata: ServiceMetadata, data: dict) -> None: + """在文件系统上保存Service,并更新数据库""" + service_path = Path(config["SEMANTICS_DIR"]) / "service" / service_id + # 保存元数据 + await MetadataLoader().save_one(MetadataType.SERVICE, metadata, service_id) + # 保存 OpenAPI 文档 + openapi_loader = OpenAPILoader.remote() + await openapi_loader.save_one.remote(data) # type: ignore[arg-type] + # 读取 Node 信息 + nodes = [ + openapi_loader.load_one.remote(service_id, yaml_path, metadata) # type: ignore[arg-type] + async for yaml_path in (service_path / "openapi").rglob("*.yaml") + ] + nodes = (await asyncio.gather(*nodes))[0] + ray.kill(openapi_loader) + # 更新数据库 + await self._update_db(nodes, metadata) + + async def delete(self, service_id: str) -> None: + """删除Service,并更新数据库""" + service_collection = MongoDB.get_collection("service") + node_collection = MongoDB.get_collection("node") + try: + await service_collection.delete_one({"_id": service_id}) + await node_collection.delete_many({"service_id": service_id}) + except Exception as e: + err = f"删除Service失败:{e}" + LOGGER.error(err) + + session = await PostgreSQL.get_session() + try: + await session.execute(delete(ServicePoolVector).where(ServicePoolVector.id == service_id)) + await session.execute(delete(NodePoolVector).where(NodePoolVector.id == service_id)) + await session.commit() + except Exception as e: + err = f"删除数据库失败:{e}" + LOGGER.error(err) + + await session.aclose() async def _update_db(self, nodes: list[NodePool], metadata: ServiceMetadata) -> None: """更新数据库""" @@ -118,41 +155,3 @@ class ServiceLoader: await session.commit() await session.aclose() - - async def save(self, service_id: str, metadata: ServiceMetadata, data: dict) -> None: - """在文件系统上保存Service,并更新数据库""" - # 读取 Node 信息 - openapi_spec_data = reduce_openapi_spec(data) - nodes: list[NodePool] = [] - for endpoint in openapi_spec_data.endpoints: - node_data = NodePool( - _id=str(uuid.uuid4()), - service_id=service_id, - name=endpoint.name, - description=endpoint.description, - call_id="api", - ) - nodes.append(node_data) - await self._update_db(nodes, metadata) - - async def delete(self, service_id: str) -> None: - """删除Service,并更新数据库""" - service_collection = MongoDB.get_collection("service") - node_collection = MongoDB.get_collection("node") - try: - await service_collection.delete_one({"_id": service_id}) - await node_collection.delete_many({"service_id": service_id}) - except Exception as e: - err = f"删除Service失败:{e}" - LOGGER.error(err) - - session = await PostgreSQL.get_session() - try: - await session.execute(delete(ServicePoolVector).where(ServicePoolVector.id == service_id)) - await session.execute(delete(NodePoolVector).where(NodePoolVector.id == service_id)) - await session.commit() - except Exception as e: - err = f"删除数据库失败:{e}" - LOGGER.error(err) - - await session.aclose() -- Gitee From b426dc24a0ed66d9c830d66435de82a83a6e54a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8F=B2=E9=B8=BF=E5=AE=87?= Date: Mon, 24 Feb 2025 18:19:52 +0800 Subject: [PATCH 2/3] =?UTF-8?q?feat:=20=E5=88=A9=E7=94=A8=20Pydantic=20?= =?UTF-8?q?=E6=A0=A1=E9=AA=8C=20OpenAPI=20YAML=20=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 史鸿宇 --- apps/entities/file_type.py | 85 ++++++++++++++++++++++++++++++++++++++ apps/manager/service.py | 32 ++------------ 2 files changed, 88 insertions(+), 29 deletions(-) create mode 100644 apps/entities/file_type.py diff --git a/apps/entities/file_type.py b/apps/entities/file_type.py new file mode 100644 index 00000000..058d67ba --- /dev/null +++ b/apps/entities/file_type.py @@ -0,0 +1,85 @@ +"""YAML 文件格式数据结构 + +Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. +""" + +from typing import Any, Optional + +from pydantic import BaseModel, Field + + +class OpenAPIInfo(BaseModel): + """OpenAPI文件信息""" + + title: str = Field(..., description="API的标题") + version: str = Field(..., description="API的版本") + description: str = Field(..., description="API的描述") + + +class OpenAPIServer(BaseModel): + """OpenAPI服务器信息""" + + url: str = Field(..., description="API的服务器地址") + + +class OpenAPITag(BaseModel): + """OpenAPI标签定义""" + + name: str = Field(..., description="标签名称") + description: Optional[str] = Field(None, description="标签描述") + + +class OpenAPIOperation(BaseModel): + """OpenAPI操作定义,例如 GET、POST 等操作""" + + summary: Optional[str] = Field(None, description="操作摘要") + description: Optional[str] = Field(None, description="操作描述") + operation_id: Optional[str] = Field(None, description="操作唯一标识", alias="operationId") + parameters: Optional[list[Any]] = Field(default_factory=list, description="参数列表") + responses: dict[str, Any] = Field(..., description="响应定义") + tags: Optional[list[str]] = Field(default_factory=list, description="标签列表") + + +class OpenAPIPath(BaseModel): + """OpenAPI路径下不同 HTTP 方法的操作定义""" + + get: Optional[OpenAPIOperation] = Field(None, description="GET操作") + put: Optional[OpenAPIOperation] = Field(None, description="PUT操作") + post: Optional[OpenAPIOperation] = Field(None, description="POST操作") + delete: Optional[OpenAPIOperation] = Field(None, description="DELETE操作") + patch: Optional[OpenAPIOperation] = Field(None, description="PATCH操作") + options: Optional[OpenAPIOperation] = Field(None, description="OPTIONS操作") + head: Optional[OpenAPIOperation] = Field(None, description="HEAD操作") + + +class OpenAPISecurityScheme(BaseModel): + """OpenAPI安全方案定义""" + + type: str = Field(..., description="安全方案类型,例如 apiKey、http、oauth2 等") + description: Optional[str] = Field(None, description="安全方案描述") + name: Optional[str] = Field(None, description="安全方案名称") + in_: Optional[str] = Field(None, alias="in", description="安全方案传递位置,如 header、query 等") + + +class OpenAPIComponents(BaseModel): + """OpenAPI组件定义""" + + schemas: Optional[dict[str, Any]] = Field(default_factory=dict, description="数据模型定义") + parameters: Optional[dict[str, Any]] = Field(default_factory=dict, description="参数定义") + security_schemes: Optional[dict[str, OpenAPISecurityScheme]] = Field( + alias="securitySchemes", + default_factory=dict, + description="安全方案定义", + ) + + +class OpenAPI(BaseModel): + """完整的 OpenAPI 文件格式数据结构""" + + openapi: str = Field(..., description="OpenAPI版本") + info: OpenAPIInfo = Field(..., description="API的基本信息") + servers: list[OpenAPIServer] = Field(..., description="API的服务器地址") + paths: dict[str, OpenAPIPath] = Field(..., description="API的路径定义") + components: Optional[OpenAPIComponents] = Field(None, description="API的组件定义") + security: Optional[list[dict[str, list[str]]]] = Field(None, description="API的安全定义") + tags: Optional[list[OpenAPITag]] = Field(None, description="API的标签定义") diff --git a/apps/manager/service.py b/apps/manager/service.py index 5329ee64..ac264ee2 100644 --- a/apps/manager/service.py +++ b/apps/manager/service.py @@ -8,11 +8,12 @@ from typing import Any, Optional import ray import yaml from anyio import Path -from jsonschema import ValidationError, validate +from jsonschema import ValidationError from apps.common.config import config from apps.constants import LOGGER, SERVICE_DIR from apps.entities.enum_var import SearchType +from apps.entities.file_type import OpenAPI from apps.entities.flow import ServiceApiConfig, ServiceMetadata from apps.entities.pool import NodePool, ServicePool from apps.entities.response_data import ServiceApiData, ServiceCardItem @@ -288,35 +289,8 @@ class ServiceCenterManager: msg = "Service data is empty" raise ValueError(msg) # 校验 OpenAPI 规范的 JSON Schema - openapi_schema = { - "type": "object", - "properties": { - "openapi": {"type": "string"}, - "info": { - "type": "object", - "properties": { - "title": {"type": "string"}, - "version": {"type": "string"}, - "description": {"type": "string"}, - }, - "required": ["title", "version", "description"], - }, - "servers": { - "type": "array", - "items": { - "type": "object", - "properties": { - "url": {"type": "string"}, - }, - "required": ["url"], - }, - }, - "paths": {"type": "object"}, - }, - "required": ["openapi", "info", "servers", "paths"], - } try: - validate(instance=data, schema=openapi_schema) + OpenAPI.model_validate(data) except ValidationError as e: msg = f"Data does not conform to OpenAPI standard: {e.message}" raise ValueError(msg) from e -- Gitee From 6aad4c23abd3216b47d1819500fcc07eb1add42f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8F=B2=E9=B8=BF=E5=AE=87?= Date: Mon, 24 Feb 2025 18:37:54 +0800 Subject: [PATCH 3/3] =?UTF-8?q?refactor:=20ServiceLoader=20=E5=A4=8D?= =?UTF-8?q?=E7=94=A8=20self.load?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 史鸿宇 --- apps/scheduler/pool/loader/service.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/apps/scheduler/pool/loader/service.py b/apps/scheduler/pool/loader/service.py index d855fd39..576ff141 100644 --- a/apps/scheduler/pool/loader/service.py +++ b/apps/scheduler/pool/loader/service.py @@ -4,6 +4,7 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ import asyncio +import pathlib import ray from anyio import Path @@ -17,6 +18,7 @@ from apps.entities.pool import NodePool from apps.entities.vector import NodePoolVector, ServicePoolVector from apps.models.mongo import MongoDB from apps.models.postgres import PostgreSQL +from apps.scheduler.pool.check import FileChecker from apps.scheduler.pool.loader.metadata import MetadataLoader, MetadataType from apps.scheduler.pool.loader.openapi import OpenAPILoader @@ -49,21 +51,16 @@ class ServiceLoader: async def save(self, service_id: str, metadata: ServiceMetadata, data: dict) -> None: """在文件系统上保存Service,并更新数据库""" - service_path = Path(config["SEMANTICS_DIR"]) / "service" / service_id + service_path = pathlib.Path(config["SEMANTICS_DIR"]) / "service" / service_id # 保存元数据 await MetadataLoader().save_one(MetadataType.SERVICE, metadata, service_id) # 保存 OpenAPI 文档 openapi_loader = OpenAPILoader.remote() await openapi_loader.save_one.remote(data) # type: ignore[arg-type] - # 读取 Node 信息 - nodes = [ - openapi_loader.load_one.remote(service_id, yaml_path, metadata) # type: ignore[arg-type] - async for yaml_path in (service_path / "openapi").rglob("*.yaml") - ] - nodes = (await asyncio.gather(*nodes))[0] ray.kill(openapi_loader) - # 更新数据库 - await self._update_db(nodes, metadata) + # 重新载入 + hashes = FileChecker().check_one(service_path) + await self.load(service_id, hashes) async def delete(self, service_id: str) -> None: """删除Service,并更新数据库""" -- Gitee