From b507e2d9d9391a0952145277bf8d7a02f19f0254 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Thu, 31 Jul 2025 14:44:34 +0800 Subject: [PATCH] =?UTF-8?q?=E8=BF=81=E7=A7=BB=E6=95=B0=E6=8D=AE=E5=BA=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/models/session.py | 2 +- apps/routers/appcenter.py | 15 ++-- apps/routers/flow.py | 9 ++- apps/routers/service.py | 11 +-- apps/scheduler/executor/step.py | 2 +- apps/scheduler/pool/loader/app.py | 58 +++++++++----- apps/scheduler/pool/loader/flow.py | 106 ++++++++++++------------- apps/scheduler/pool/loader/metadata.py | 13 +-- apps/scheduler/pool/loader/service.py | 5 +- apps/scheduler/pool/pool.py | 63 ++++----------- apps/schemas/flow.py | 2 +- apps/services/activity.py | 51 ++++++------ apps/services/appcenter.py | 82 ++++++++++--------- apps/services/rag.py | 3 +- 14 files changed, 205 insertions(+), 217 deletions(-) diff --git a/apps/models/session.py b/apps/models/session.py index d17585fa..5d8f0f52 100644 --- a/apps/models/session.py +++ b/apps/models/session.py @@ -48,7 +48,7 @@ class SessionActivity(Base): """会话活动""" __tablename__ = "framework_session_activity" - id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) + id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True, init=False) """主键ID""" userSub: Mapped[str] = mapped_column(String(50), ForeignKey("framework_user.userSub"), nullable=False) # noqa: N815 """用户名""" diff --git a/apps/routers/appcenter.py b/apps/routers/appcenter.py index aecbe092..a38aaa61 100644 --- a/apps/routers/appcenter.py +++ b/apps/routers/appcenter.py @@ -2,6 +2,7 @@ """FastAPI 应用中心相关路由""" import logging +import uuid from typing import Annotated from fastapi import APIRouter, Body, Depends, Path, Query, Request, status @@ -11,17 +12,17 @@ from apps.dependency.user import verify_personal_token, verify_session from apps.exceptions import InstancePermissionError from apps.schemas.appcenter import AppFlowInfo, AppPermissionData from apps.schemas.enum_var import AppFilterType, AppType -from apps.schemas.request_data import CreateAppRequest, ChangeFavouriteAppRequest +from apps.schemas.request_data import ChangeFavouriteAppRequest, CreateAppRequest from apps.schemas.response_data import ( BaseAppOperationMsg, BaseAppOperationRsp, + ChangeFavouriteAppMsg, + ChangeFavouriteAppRsp, GetAppListMsg, GetAppListRsp, GetAppPropertyMsg, GetAppPropertyRsp, GetRecentAppListRsp, - ChangeFavouriteAppMsg, - ChangeFavouriteAppRsp, ResponseData, ) from apps.services.appcenter import AppCenterManager @@ -183,7 +184,7 @@ async def get_recently_used_applications( @router.get("/{appId}", response_model=GetAppPropertyRsp | ResponseData) -async def get_application(appId: Annotated[str, Path()]) -> JSONResponse: # noqa: N803 +async def get_application(appId: Annotated[uuid.UUID, Path()]) -> JSONResponse: # noqa: N803 """获取应用详情""" try: app_data = await AppCenterManager.fetch_app_data_by_id(appId) @@ -248,7 +249,7 @@ async def get_application(appId: Annotated[str, Path()]) -> JSONResponse: # noq ) async def delete_application( request: Request, - app_id: Annotated[str, Path(..., alias="appId", description="应用ID")], + app_id: Annotated[uuid.UUID, Path(..., alias="appId", description="应用ID")], ) -> JSONResponse: """删除应用""" user_sub: str = request.state.user_sub @@ -297,7 +298,7 @@ async def delete_application( @router.post("/{appId}", response_model=BaseAppOperationRsp) async def publish_application( request: Request, - app_id: Annotated[str, Path(..., alias="appId", description="应用ID")], + app_id: Annotated[uuid.UUID, Path(..., alias="appId", description="应用ID")], ) -> JSONResponse: """发布应用""" user_sub: str = request.state.user_sub @@ -346,7 +347,7 @@ async def publish_application( @router.put("/{appId}", response_model=ChangeFavouriteAppRsp | ResponseData) async def modify_favorite_application( raw_request: Request, - app_id: Annotated[str, Path(..., alias="appId", description="应用ID")], + app_id: Annotated[uuid.UUID, Path(..., alias="appId", description="应用ID")], request: Annotated[ChangeFavouriteAppRequest, Body(...)], ) -> JSONResponse: """更改应用收藏状态""" diff --git a/apps/routers/flow.py b/apps/routers/flow.py index 3502087f..880bf1eb 100644 --- a/apps/routers/flow.py +++ b/apps/routers/flow.py @@ -1,6 +1,7 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """FastAPI Flow拓扑结构展示API""" +import uuid from typing import Annotated from fastapi import APIRouter, Body, Depends, Query, Request, status @@ -60,7 +61,7 @@ async def get_services(request: Request) -> NodeServiceListRsp: status.HTTP_404_NOT_FOUND: {"model": ResponseData}, }, ) -async def get_flow(request: Request, appId: str, flowId: str) -> JSONResponse: # noqa: N803 +async def get_flow(request: Request, appId: uuid.UUID, flowId: uuid.UUID) -> JSONResponse: # noqa: N803 """获取流拓扑结构""" if not await AppCenterManager.validate_user_app_access(request.state.user_sub, appId): return JSONResponse( @@ -100,8 +101,8 @@ async def get_flow(request: Request, appId: str, flowId: str) -> JSONResponse: ) async def put_flow( request: Request, - appId: Annotated[str, Query()], # noqa: N803 - flowId: Annotated[str, Query()], # noqa: N803 + appId: Annotated[uuid.UUID, Query()], # noqa: N803 + flowId: Annotated[uuid.UUID, Query()], # noqa: N803 put_body: Annotated[PutFlowReq, Body()], ) -> JSONResponse: """修改流拓扑结构""" @@ -152,7 +153,7 @@ async def put_flow( status.HTTP_404_NOT_FOUND: {"model": ResponseData}, }, ) -async def delete_flow(request: Request, appId: str, flowId: str) -> JSONResponse: # noqa: N803 +async def delete_flow(request: Request, appId: uuid.UUID, flowId: uuid.UUID) -> JSONResponse: # noqa: N803 """删除流拓扑结构""" if not await AppCenterManager.validate_app_belong_to_user(request.state.user_sub, appId): return JSONResponse( diff --git a/apps/routers/service.py b/apps/routers/service.py index 77144eaf..902919e6 100644 --- a/apps/routers/service.py +++ b/apps/routers/service.py @@ -2,6 +2,7 @@ """FastAPI 语义接口中心相关路由""" import logging +import uuid from typing import Annotated from fastapi import APIRouter, Depends, Path, Request, status @@ -13,13 +14,13 @@ from apps.schemas.enum_var import SearchType from apps.schemas.request_data import ChangeFavouriteServiceRequest, UpdateServiceRequest from apps.schemas.response_data import ( BaseServiceOperationMsg, + ChangeFavouriteServiceMsg, + ChangeFavouriteServiceRsp, DeleteServiceRsp, GetServiceDetailMsg, GetServiceDetailRsp, GetServiceListMsg, GetServiceListRsp, - ChangeFavouriteServiceMsg, - ChangeFavouriteServiceRsp, ResponseData, UpdateServiceMsg, UpdateServiceRsp, @@ -181,7 +182,7 @@ async def update_service(request: Request, data: UpdateServiceRequest) -> JSONRe @router.get("/{serviceId}", response_model=GetServiceDetailRsp) async def get_service_detail( - request: Request, serviceId: Annotated[str, Path()], # noqa: N803 + request: Request, serviceId: Annotated[uuid.UUID, Path()], # noqa: N803 *, edit: bool = False, ) -> JSONResponse: """获取服务详情""" @@ -246,7 +247,7 @@ async def get_service_detail( @router.delete("/{serviceId}", response_model=DeleteServiceRsp) -async def delete_service(request: Request, serviceId: Annotated[str, Path()]) -> JSONResponse: # noqa: N803 +async def delete_service(request: Request, serviceId: Annotated[uuid.UUID, Path()]) -> JSONResponse: # noqa: N803 """删除服务""" try: await ServiceCenterManager.delete_service(request.state.user_sub, serviceId) @@ -286,7 +287,7 @@ async def delete_service(request: Request, serviceId: Annotated[str, Path()]) -> @router.put("/{serviceId}", response_model=ChangeFavouriteServiceRsp) async def modify_favorite_service( request: Request, - serviceId: Annotated[str, Path()], # noqa: N803 + serviceId: Annotated[uuid.UUID, Path()], # noqa: N803 data: ChangeFavouriteServiceRequest, ) -> JSONResponse: """修改服务收藏状态""" diff --git a/apps/scheduler/executor/step.py b/apps/scheduler/executor/step.py index 8ea823a8..00613fde 100644 --- a/apps/scheduler/executor/step.py +++ b/apps/scheduler/executor/step.py @@ -60,7 +60,7 @@ class StepExecutor(BaseExecutor): return flag @staticmethod - async def get_call_cls(call_id: uuid.UUID) -> type[CoreCall]: + async def get_call_cls(call_id: str) -> type[CoreCall]: """获取并验证Call类""" # 特判,用于处理隐藏节点 if call_id == SpecialCallType.EMPTY.value: diff --git a/apps/scheduler/pool/loader/app.py b/apps/scheduler/pool/loader/app.py index 8d5fab55..d9a4b5d3 100644 --- a/apps/scheduler/pool/loader/app.py +++ b/apps/scheduler/pool/loader/app.py @@ -7,13 +7,17 @@ import uuid from anyio import Path from fastapi.encoders import jsonable_encoder +from sqlalchemy import delete, select from apps.common.config import config -from apps.models.app import App +from apps.common.postgres import postgres +from apps.models.app import App, AppACL, AppHashes +from apps.models.flow import Flow +from apps.models.user import UserAppUsage, UserFavorite from apps.scheduler.pool.check import FileChecker from apps.schemas.agent import AgentAppMetadata from apps.schemas.enum_var import AppType -from apps.schemas.flow import AppFlow, AppMetadata, MetadataType, Permission +from apps.schemas.flow import AppFlow, AppMetadata, MetadataType, Permission, PermissionType from .flow import FlowLoader from .metadata import MetadataLoader @@ -53,7 +57,7 @@ class AppLoader: async for flow_file in flow_path.rglob("*.yaml"): if flow_file.stem not in flow_ids: logger.warning("[AppLoader] 工作流 %s 不在元数据中", flow_file) - flow = await flow_loader.load(app_id, flow_file.stem) + flow = await flow_loader.load(app_id, uuid.UUID(flow_file.stem)) if not flow: err = f"[AppLoader] 工作流 {flow_file} 加载失败" raise ValueError(err) @@ -61,7 +65,7 @@ class AppLoader: metadata.published = False new_flows.append( AppFlow( - id=flow_file.stem, + id=uuid.UUID(flow_file.stem), name=flow.name, description=flow.description, path=flow_file.as_posix(), @@ -85,6 +89,7 @@ class AppLoader: raise RuntimeError(err) from e await self._update_db(metadata) + async def save(self, metadata: AppMetadata | AgentAppMetadata, app_id: uuid.UUID) -> None: """ 保存应用 @@ -111,29 +116,21 @@ class AppLoader: :param app_id: 应用 ID """ - mongo = MongoDB() - try: - app_collection = mongo.get_collection("app") - await app_collection.delete_one({"_id": app_id}) # 删除应用数据 - user_collection = mongo.get_collection("user") - # 删除用户使用记录 - await user_collection.update_many( - {f"app_usage.{app_id}": {"$exists": True}}, - {"$unset": {f"app_usage.{app_id}": ""}}, - ) - # 删除用户收藏 - await user_collection.update_many( - {"fav_apps": {"$in": [app_id]}}, - {"$pull": {"fav_apps": app_id}}, - ) - except Exception: - logger.exception("[AppLoader] MongoDB删除App失败") + async with postgres.session() as session: + await session.execute(delete(App).where(App.id == app_id)) + await session.execute(delete(AppACL).where(AppACL.appId == app_id)) + await session.execute(delete(AppHashes).where(AppHashes.appId == app_id)) + await session.execute(delete(Flow).where(Flow.appId == app_id)) + await session.execute(delete(UserAppUsage).where(UserAppUsage.appId == app_id)) + await session.execute(delete(UserFavorite).where(UserFavorite.itemId == app_id)) + await session.commit() if not is_reload: app_path = BASE_PATH / str(app_id) if await app_path.exists(): shutil.rmtree(str(app_path), ignore_errors=True) + @staticmethod async def _update_db(metadata: AppMetadata | AgentAppMetadata) -> None: """更新数据库""" @@ -142,6 +139,25 @@ class AppLoader: logger.error(err) raise ValueError(err) # 更新应用数据 + async with postgres.session() as session: + # 保存App表 + await session.merge(App( + id=metadata.id, + name=metadata.name, + description=metadata.description, + author=metadata.author, + type=metadata.app_type, + isPublished=metadata.published, + permission=metadata.permission.type if metadata.permission else PermissionType.PRIVATE, + )) + # 保存AppACL表 + await session.merge(AppACL( + appId=metadata.id, + userSub=metadata.author, + permission=metadata.permission, + )) + # 保存AppHashes表 + await session.commit() mongo = MongoDB() try: app_collection = mongo.get_collection("app") diff --git a/apps/scheduler/pool/loader/flow.py b/apps/scheduler/pool/loader/flow.py index 595de04c..0dbd0815 100644 --- a/apps/scheduler/pool/loader/flow.py +++ b/apps/scheduler/pool/loader/flow.py @@ -9,16 +9,17 @@ from typing import Any import aiofiles import yaml from anyio import Path -from sqlalchemy import delete, insert +from sqlalchemy import and_, delete, func, select from apps.common.config import config from apps.common.postgres import postgres from apps.llm.embedding import Embedding -from apps.models.app import App +from apps.models.app import App, AppHashes +from apps.models.flow import Flow as FlowInfo from apps.models.vectors import FlowPoolVector from apps.scheduler.util import yaml_enum_presenter, yaml_str_presenter from apps.schemas.enum_var import EdgeType -from apps.schemas.flow import AppFlow, Flow +from apps.schemas.flow import Flow from apps.services.node import NodeManager logger = logging.getLogger(__name__) @@ -129,7 +130,8 @@ class FlowLoader: flow_config = Flow.model_validate(flow_yaml) await self._update_db( app_id, - AppFlow( + FlowInfo( + appId=app_id, id=flow_id, name=flow_config.name, description=flow_config.description, @@ -143,6 +145,7 @@ class FlowLoader: logger.exception("[FlowLoader] 应用 %s:工作流 %s 格式不合法", app_id, flow_id) return None + async def save(self, app_id: uuid.UUID, flow_id: uuid.UUID, flow: Flow) -> None: """保存工作流""" flow_path = BASE_PATH / str(app_id) / "flow" / f"{flow_id}.yaml" @@ -162,7 +165,8 @@ class FlowLoader: ) await self._update_db( app_id, - AppFlow( + FlowInfo( + appId=app_id, id=flow_id, name=flow.name, description=flow.description, @@ -172,6 +176,7 @@ class FlowLoader: ), ) + async def delete(self, app_id: uuid.UUID, flow_id: uuid.UUID) -> bool: """删除指定工作流文件""" flow_path = BASE_PATH / str(app_id) / "flow" / f"{flow_id}.yaml" @@ -191,63 +196,50 @@ class FlowLoader: return True - async def _update_db(self, app_id: uuid.UUID, metadata: AppFlow) -> None: + async def _update_db(self, app_id: uuid.UUID, metadata: FlowInfo) -> None: """更新数据库""" - try: - app_collection = MongoDB().get_collection("app") - # 获取当前的flows - app_data = await app_collection.find_one({"_id": app_id}) - if not app_data: + # 检查App是否存在 + async with postgres.session() as session: + app_num = (await session.scalars( + select(func.count(App.id)).where(App.id == app_id), + )).one() + if app_num == 0: err = f"[FlowLoader] App {app_id} 不存在" logger.error(err) return - app_obj = AppPool.model_validate(app_data) - flows = app_obj.flows - - for flow in flows: - if flow.id == metadata.id: - flows.remove(flow) - break - flows.append(metadata) - - # 执行更新操作 - await app_collection.update_one( - filter={ - "_id": app_id, - }, - update={ - "$set": { - "flows": [flow.model_dump(by_alias=True, exclude_none=True) for flow in flows], - }, - }, - upsert=True, - ) - flow_path = BASE_PATH / app_id / "flow" / f"{metadata.id}.yaml" - async with aiofiles.open(flow_path, "rb") as f: - new_hash = sha256(await f.read()).hexdigest() - key = f"hashes.flow/{metadata.id}.yaml" - await app_collection.aggregate( - [ - {"$match": {"_id": app_id}}, - {"$replaceWith": {"$setField": {"field": key, "input": "$$ROOT", "value": new_hash}}}, - ], - ) - except Exception: - logger.exception("[FlowLoader] 更新 MongoDB 失败") + # 删除旧的Flow数据 + await session.execute(delete(FlowInfo).where(FlowInfo.appId == app_id)) + await session.execute(delete(AppHashes).where( + and_( + AppHashes.appId == app_id, + AppHashes.filePath == f"flow/{metadata.id}.yaml", + ), + )) + await session.execute(delete(FlowPoolVector).where(FlowPoolVector.id == metadata.id)) - # 删除重复的ID - async with postgres.session() as session: - await session.execute(delete(FlowPoolVector).where(FlowPoolVector.flow_id == metadata.id)) + # 创建新的Flow数据 + session.add(metadata) + + flow_path = BASE_PATH / str(app_id) / "flow" / f"{metadata.id}.yaml" + async with aiofiles.open(flow_path, "rb") as f: + new_hash = sha256(await f.read()).hexdigest() - # 进行向量化 - service_embedding = await Embedding.get_embedding([metadata.description]) - vector_data = [ - FlowPoolVector( - flow_id=metadata.id, + flow_hash = AppHashes( appId=app_id, - embedding=service_embedding[0], - ), - ] - async with postgres.session() as session: - await session.execute(insert(FlowPoolVector).values(vector_data)) + hash=new_hash, + filePath=f"flow/{metadata.id}.yaml", + ) + session.add(flow_hash) + + # 进行向量化 + service_embedding = await Embedding.get_embedding([metadata.description]) + vector_data = [ + FlowPoolVector( + id=metadata.id, + appId=app_id, + embedding=service_embedding[0], + ), + ] + session.add_all(vector_data) + await session.commit() diff --git a/apps/scheduler/pool/loader/metadata.py b/apps/scheduler/pool/loader/metadata.py index 002fa45a..6bf044dd 100644 --- a/apps/scheduler/pool/loader/metadata.py +++ b/apps/scheduler/pool/loader/metadata.py @@ -2,6 +2,7 @@ """元数据加载器""" import logging +import uuid from typing import Any import yaml @@ -46,7 +47,7 @@ class MetadataLoader: app_type = metadata_dict.get("app_type", AppType.FLOW) if app_type == AppType.FLOW: try: - app_id = file_path.parent.name + app_id = uuid.UUID(file_path.parent.name) metadata = AppMetadata(id=app_id, **metadata_dict) except Exception as e: err = "[MetadataLoader] App metadata.yaml格式错误" @@ -54,7 +55,7 @@ class MetadataLoader: raise RuntimeError(err) from e else: try: - app_id = file_path.parent.name + app_id = uuid.UUID(file_path.parent.name) metadata = AgentAppMetadata(id=app_id, **metadata_dict) except Exception as e: err = "[MetadataLoader] Agent app metadata.yaml格式错误" @@ -62,7 +63,7 @@ class MetadataLoader: raise RuntimeError(err) from e elif metadata_type == MetadataType.SERVICE.value: try: - service_id = file_path.parent.name + service_id = uuid.UUID(file_path.parent.name) metadata = ServiceMetadata(id=service_id, **metadata_dict) except Exception as e: err = "[MetadataLoader] Service metadata.yaml格式错误" @@ -79,7 +80,7 @@ class MetadataLoader: self, metadata_type: MetadataType, metadata: dict[str, Any] | AppMetadata | ServiceMetadata | AgentAppMetadata, - resource_id: str, + resource_id: uuid.UUID, ) -> None: """保存单个元数据""" class_dict = { @@ -89,9 +90,9 @@ class MetadataLoader: # 检查资源路径 if metadata_type == MetadataType.APP.value: - resource_path = BASE_PATH / "app" / resource_id / "metadata.yaml" + resource_path = BASE_PATH / "app" / str(resource_id) / "metadata.yaml" elif metadata_type == MetadataType.SERVICE.value: - resource_path = BASE_PATH / "service" / resource_id / "metadata.yaml" + resource_path = BASE_PATH / "service" / str(resource_id) / "metadata.yaml" else: err = f"[MetadataLoader] metadata_type类型错误: {metadata_type}" logger.error(err) diff --git a/apps/scheduler/pool/loader/service.py b/apps/scheduler/pool/loader/service.py index ea0df332..4297f6e3 100644 --- a/apps/scheduler/pool/loader/service.py +++ b/apps/scheduler/pool/loader/service.py @@ -12,7 +12,7 @@ from apps.common.config import config from apps.common.postgres import postgres from apps.llm.embedding import Embedding from apps.models.node import NodeInfo -from apps.models.service import Service +from apps.models.service import Service, ServiceACL, ServiceHashes from apps.models.vectors import NodePoolVector, ServicePoolVector from apps.scheduler.pool.check import FileChecker from apps.schemas.flow import PermissionType, ServiceMetadata @@ -77,7 +77,8 @@ class ServiceLoader: async with postgres.session() as session: await session.execute(delete(Service).where(Service.id == service_id)) await session.execute(delete(NodeInfo).where(NodeInfo.serviceId == service_id)) - + await session.execute(delete(ServiceACL).where(ServiceACL.serviceId == service_id)) + await session.execute(delete(ServiceHashes).where(ServiceHashes.serviceId == service_id)) await session.execute(delete(ServicePoolVector).where(ServicePoolVector.id == service_id)) await session.execute(delete(NodePoolVector).where(NodePoolVector.serviceId == service_id)) await session.commit() diff --git a/apps/scheduler/pool/pool.py b/apps/scheduler/pool/pool.py index 23b45d4b..36837a2d 100644 --- a/apps/scheduler/pool/pool.py +++ b/apps/scheduler/pool/pool.py @@ -1,16 +1,17 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """资源池,包含语义接口、应用等的载入和保存""" -import importlib import logging +import sys import uuid from typing import Any from anyio import Path +from sqlalchemy import select from apps.common.config import config from apps.common.postgres import postgres -from apps.models.app import App +from apps.models.flow import Flow as FlowInfo from apps.schemas.enum_var import MetadataType from apps.schemas.flow import Flow @@ -100,7 +101,7 @@ class Pool: # 批量加载 for service in changed_service: - hash_key = Path("service/" + service).as_posix() + hash_key = Path("service/" + str(service)).as_posix() if hash_key in checker.hashes: await service_loader.load(service, checker.hashes[hash_key]) @@ -117,7 +118,7 @@ class Pool: # 批量加载App for app in changed_app: - hash_key = Path("app/" + app).as_posix() + hash_key = Path("app/" + str(app)).as_posix() if hash_key in checker.hashes: await app_loader.load(app, checker.hashes[hash_key]) @@ -126,25 +127,15 @@ class Pool: await MCPLoader.init() - async def get_flow_metadata(self, app_id: str) -> list[AppFlow]: + async def get_flow_metadata(self, app_id: uuid.UUID) -> list[FlowInfo]: """从数据库中获取特定App的全部Flow的元数据""" - mongo = MongoDB() - app_collection = mongo.get_collection("app") - flow_metadata_list = [] - try: - flow_list = await app_collection.find_one({"_id": app_id}, {"flows": 1}) - if not flow_list: - return [] - for flow in flow_list["flows"]: - flow_metadata_list += [AppFlow.model_validate(flow)] - except Exception: - logger.exception("[Pool] 获取App %s 的Flow列表失败", app_id) - return [] - else: - return flow_metadata_list - - - async def get_flow(self, app_id: str, flow_id: str) -> Flow | None: + async with postgres.session() as session: + return list((await session.scalars( + select(FlowInfo).where(FlowInfo.appId == app_id), + )).all()) + + + async def get_flow(self, app_id: uuid.UUID, flow_id: uuid.UUID) -> Flow | None: """从文件系统中获取单个Flow的全部数据""" logger.info("[Pool] 获取工作流 %s", flow_id) flow_loader = FlowLoader() @@ -153,29 +144,5 @@ class Pool: async def get_call(self, call_id: str) -> Any: """[Exception] 拿到Call的信息""" - # 从MongoDB里拿到数据 - mongo = MongoDB() - call_collection = mongo.get_collection("call") - call_db_data = await call_collection.find_one({"_id": call_id}) - if not call_db_data: - err = f"[Pool] Call{call_id}不存在" - logger.error(err) - raise ValueError(err) - - call_metadata = CallPool.model_validate(call_db_data) - call_path_split = call_metadata.path.split("::") - if not call_path_split: - err = f"[Pool] Call路径{call_metadata.path}不合法" - logger.error(err) - raise ValueError(err) - - # Python类型的Call - if call_path_split[0] == "python": - try: - call_module = importlib.import_module(call_path_split[1]) - return getattr(call_module, call_path_split[2]) - except Exception as e: - err = f"[Pool] 获取Call{call_metadata.path}类失败" - logger.exception(err) - raise RuntimeError(err) from e - return None + call_module = sys.modules.get("apps.scheduler.call") + return getattr(call_module, call_id) diff --git a/apps/schemas/flow.py b/apps/schemas/flow.py index 840cff01..5cacb2ab 100644 --- a/apps/schemas/flow.py +++ b/apps/schemas/flow.py @@ -121,7 +121,7 @@ class ServiceMetadata(MetadataBase): class AppFlow(BaseModel): """Flow的元数据;会被存储在App下面""" - id: str + id: uuid.UUID name: str description: str enabled: bool = Field(description="是否启用", default=True) diff --git a/apps/services/activity.py b/apps/services/activity.py index e378fcef..55bfe2c7 100644 --- a/apps/services/activity.py +++ b/apps/services/activity.py @@ -1,10 +1,9 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """用户限流""" -import uuid from datetime import UTC, datetime, timedelta -from sqlalchemy import delete +from sqlalchemy import delete, func, select from apps.common.postgres import postgres from apps.constants import SLIDE_WINDOW_QUESTION_COUNT, SLIDE_WINDOW_TIME @@ -25,36 +24,36 @@ class Activity: """ time = datetime.now(tz=UTC) - # 检查窗口内总请求数 - count = await MongoDB().get_collection("activity").count_documents( - {"timestamp": {"$gte": time - SLIDE_WINDOW_TIME, "$lte": time}}, - ) - if count >= SLIDE_WINDOW_QUESTION_COUNT: - return True + async with postgres.session() as session: + # 检查窗口内总请求数 + count = (await session.scalars(select(func.count(SessionActivity.id)).where( + SessionActivity.timestamp >= time - timedelta(seconds=SLIDE_WINDOW_TIME), + SessionActivity.timestamp <= time, + ))).one() + if count >= SLIDE_WINDOW_QUESTION_COUNT: + return True - # 检查用户是否正在提问 - active = await MongoDB().get_collection("activity").find_one( - {"user_sub": user_sub}, - ) - return bool(active) + # 检查用户是否正在提问 + active = (await session.scalars(select(SessionActivity).where( + SessionActivity.userSub == user_sub, + ))).one_or_none() + return bool(active) @staticmethod async def set_active(user_sub: str) -> None: """设置用户的活跃标识""" - time = round(datetime.now(UTC).timestamp(), 3) + time = datetime.now(UTC) # 设置用户活跃状态 - collection = MongoDB().get_collection("activity") - active = await collection.find_one({"user_sub": user_sub}) - if active: - err = "用户正在提问" - raise ActivityError(err) - await collection.insert_one( - { - "_id": str(uuid.uuid4()), - "user_sub": user_sub, - "timestamp": time, - }, - ) + async with postgres.session() as session: + active = ( + await session.scalars(select(SessionActivity).where(SessionActivity.userSub == user_sub)) + ).one_or_none() + if active: + err = "用户正在提问" + raise ActivityError(err) + await session.merge(SessionActivity(userSub=user_sub, timestamp=time)) + await session.commit() + @staticmethod async def remove_active(user_sub: str) -> None: diff --git a/apps/services/appcenter.py b/apps/services/appcenter.py index 02f35674..3ca9365a 100644 --- a/apps/services/appcenter.py +++ b/apps/services/appcenter.py @@ -12,7 +12,7 @@ from apps.common.postgres import postgres from apps.constants import SERVICE_PAGE_SIZE from apps.exceptions import InstancePermissionError from apps.models.app import App -from apps.models.user import User, UserAppUsage +from apps.models.user import User, UserAppUsage, UserFavorite, UserFavoriteType from apps.scheduler.pool.loader.app import AppLoader from apps.schemas.agent import AgentAppMetadata from apps.schemas.appcenter import AppCenterCardItem, AppData, AppPermissionData @@ -30,7 +30,7 @@ class AppCenterManager: """应用中心管理器""" @staticmethod - async def validate_user_app_access(user_sub: str, app_id: str) -> bool: + async def validate_user_app_access(user_sub: str, app_id: uuid.UUID) -> bool: """ 验证用户对应用的访问权限 @@ -59,7 +59,7 @@ class AppCenterManager: @staticmethod - async def validate_app_belong_to_user(user_sub: str, app_id: str) -> bool: + async def validate_app_belong_to_user(user_sub: str, app_id: uuid.UUID) -> bool: """ 验证用户对应用的属权 @@ -163,7 +163,7 @@ class AppCenterManager: @staticmethod - async def fetch_app_data_by_id(app_id: str) -> App: + async def fetch_app_data_by_id(app_id: uuid.UUID) -> App: """ 根据应用ID获取应用元数据(使用PostgreSQL) @@ -181,7 +181,7 @@ class AppCenterManager: @staticmethod - async def create_app(user_sub: str, data: AppData) -> str: + async def create_app(user_sub: str, data: AppData) -> uuid.UUID: """ 创建应用 @@ -200,7 +200,7 @@ class AppCenterManager: @staticmethod - async def update_app(user_sub: str, app_id: str, data: AppData) -> None: + async def update_app(user_sub: str, app_id: uuid.UUID, data: AppData) -> None: """ 更新应用 @@ -226,7 +226,7 @@ class AppCenterManager: @staticmethod - async def update_app_publish_status(app_id: str, user_sub: str) -> bool: + async def update_app_publish_status(app_id: uuid.UUID, user_sub: str) -> bool: """ 发布应用 @@ -268,7 +268,7 @@ class AppCenterManager: @staticmethod - async def modify_favorite_app(app_id: str, user_sub: str, *, favorited: bool) -> None: + async def modify_favorite_app(app_id: uuid.UUID, user_sub: str, *, favorited: bool) -> None: """ 修改收藏状态 @@ -276,36 +276,42 @@ class AppCenterManager: :param user_sub: 用户唯一标识 :param favorited: 是否收藏 """ - mongo = MongoDB() - app_collection = mongo.get_collection("app") - user_collection = mongo.get_collection("user") - db_data = await app_collection.find_one({"_id": app_id}) - if not db_data: - msg = "应用不存在" - raise ValueError(msg) - user_data = await UserManager.get_user(user_sub) - if not user_data: - msg = "用户不存在" - raise ValueError(msg) + async with postgres.session() as session: + app_obj = (await session.scalars( + select(App).where(App.id == app_id), + )).one_or_none() + if not app_obj: + msg = f"[AppCenterManager] 应用不存在: {app_id}" + raise ValueError(msg) - already_favorited = app_id in user_data.fav_apps - if favorited == already_favorited: - msg = "重复操作" - raise ValueError(msg) + app_favorite = (await session.scalars( + select(UserFavorite).where( + and_( + UserFavorite.userSub == user_sub, + UserFavorite.itemId == app_id, + UserFavorite.type == UserFavoriteType.APP, + ), + ), + )).one_or_none() + if not app_favorite and favorited: + # 添加收藏 + app_favorite = UserFavorite( + userSub=user_sub, + type=UserFavoriteType.APP, + itemId=app_id, + ) + session.add(app_favorite) + elif app_favorite and not favorited: + # 删除收藏 + await session.delete(app_favorite) + else: + # 重复操作 + msg = f"[AppCenterManager] 重复操作: {app_id}" + raise ValueError(msg) - if favorited: - await user_collection.update_one( - {"_id": user_sub}, - {"$addToSet": {"fav_apps": app_id}}, - ) - else: - await user_collection.update_one( - {"_id": user_sub}, - {"$pull": {"fav_apps": app_id}}, - ) @staticmethod - async def delete_app(app_id: str, user_sub: str) -> None: + async def delete_app(app_id: uuid.UUID, user_sub: str) -> None: """ 删除应用 @@ -413,7 +419,7 @@ class AppCenterManager: @staticmethod - async def _get_app_data(app_id: str, user_sub: str, *, check_permission: bool = True) -> App: + async def _get_app_data(app_id: uuid.UUID, user_sub: str, *, check_permission: bool = True) -> App: """ 从数据库获取应用数据并验证权限 @@ -437,7 +443,7 @@ class AppCenterManager: @staticmethod - def _build_common_metadata_params(app_id: str, user_sub: str, source: AppPool | AppData) -> dict: + def _build_common_metadata_params(app_id: uuid.UUID, user_sub: str, source: App) -> dict: """构建元数据通用参数""" return { "type": MetadataType.APP, @@ -454,7 +460,8 @@ class AppCenterManager: def _create_flow_metadata( common_params: dict, data: AppData | None = None, - app_data: AppPool | None = None, + app_data: App | None = None, + *, published: bool | None = None, ) -> AppMetadata: """创建工作流应用的元数据""" @@ -497,6 +504,7 @@ class AppCenterManager: user_sub: str, data: AppData | None = None, app_data: App | None = None, + *, published: bool | None = None, ) -> AgentAppMetadata: """创建 Agent 应用的元数据""" diff --git a/apps/services/rag.py b/apps/services/rag.py index a923317d..9691ce8d 100644 --- a/apps/services/rag.py +++ b/apps/services/rag.py @@ -29,7 +29,8 @@ class RAG: """系统提示词""" user_prompt = """' - 你是openEuler社区的智能助手。请结合给出的背景信息, 回答用户的提问,并且基于给出的背景信息在相关句子后进行脚注。 + 你是openEuler社区的智能助手。请结合给出的背景信息, 回答用户的提问,\ +并且基于给出的背景信息在相关句子后进行脚注。 一个例子将在中给出。 上下文背景信息将在中给出。 用户的提问将在中给出。 -- Gitee