diff --git a/apps/entities/appcenter.py b/apps/entities/appcenter.py index 5445fb2808f03b4ec000e7c16fb3e53f831b47cb..a34be181cf0c2a21f99ff16b3521fa6892144c24 100644 --- a/apps/entities/appcenter.py +++ b/apps/entities/appcenter.py @@ -6,7 +6,7 @@ from typing import Optional from pydantic import BaseModel, Field -from apps.entities.enum_var import AppPermissionType +from apps.entities.enum_var import PermissionType from apps.entities.flow import AppLink @@ -25,8 +25,8 @@ class AppCenterCardItem(BaseModel): class AppPermissionData(BaseModel): """应用权限数据结构""" - type: AppPermissionType = Field( - default=AppPermissionType.PRIVATE, + type: PermissionType = Field( + default=PermissionType.PRIVATE, alias="visibility", description="可见性(public/private/protected)", ) diff --git a/apps/entities/enum_var.py b/apps/entities/enum_var.py index d4ec50e68d9bb40ef74d7ef38253d5da3f61d7c9..9979fee5a677992c76d1e6ba6219106d2e2b079e 100644 --- a/apps/entities/enum_var.py +++ b/apps/entities/enum_var.py @@ -71,14 +71,6 @@ class MetadataType(str, Enum): APP = "app" -class AppPermissionType(str, Enum): - """App的权限类型""" - - PROTECTED = "protected" - PUBLIC = "public" - PRIVATE = "private" - - class EdgeType(str, Enum): """边类型 @@ -89,6 +81,18 @@ class EdgeType(str, Enum): LOOP = "loop" +class NodeType(str, Enum): + """节点类型 + + 注:此处为临时定义,待扩展 + """ + + START = "start" + END = "end" + NORMAL = "normal" + CHOICE = "choice" + + class SaveType(str, Enum): """检查类型""" @@ -97,6 +101,12 @@ class SaveType(str, Enum): FLOW = "flow" +class PermissionType(str, Enum): + """权限类型""" + + PROTECTED = "protected" + PUBLIC = "public" + PRIVATE = "private" class SearchType(str, Enum): """搜索类型""" diff --git a/apps/entities/flow.py b/apps/entities/flow.py index 1846a0eb3a9776dc3b1b467a5c5988961a15633b..2fe35f8709f73fd708dcb98f95932d29ce12b4f2 100644 --- a/apps/entities/flow.py +++ b/apps/entities/flow.py @@ -7,7 +7,7 @@ from typing import Any, Optional from pydantic import BaseModel, Field, HttpUrl from apps.entities.enum_var import ( - AppPermissionType, + PermissionType, EdgeType, MetadataType, ) @@ -16,8 +16,8 @@ from apps.entities.enum_var import ( class StepPos(BaseModel): """节点在画布上的位置""" - x: int = Field(description="节点在画布上的X坐标") - y: int = Field(description="节点在画布上的Y坐标") + x: float = Field(description="节点在画布上的X坐标") + y: float = Field(description="节点在画布上的Y坐标") class Edge(BaseModel): @@ -34,6 +34,7 @@ class Step(BaseModel): id: str = Field(description="Step的ID") node: str = Field(description="Step的Node ID") + type: str = Field(description="Step的类型") name: str = Field(description="Step的名称") description: str = Field(description="Step的描述") pos: StepPos = Field(description="Step在画布上的位置", default=StepPos(x=0, y=0)) @@ -93,10 +94,14 @@ class ServiceApiAuthKeyVal(BaseModel): class ServiceApiAuth(BaseModel): """Service的API鉴权方式""" - header: list[ServiceApiAuthKeyVal] = Field(description="HTTP头鉴权配置", default=[]) - cookie: list[ServiceApiAuthKeyVal] = Field(description="HTTP Cookie鉴权配置", default=[]) - query: list[ServiceApiAuthKeyVal] = Field(description="HTTP URL参数鉴权配置", default=[]) - oidc: Optional[ServiceApiAuthOidc] = Field(description="OIDC鉴权配置", default=None) + header: list[ServiceApiAuthKeyVal] = Field( + description="HTTP头鉴权配置", default=[]) + cookie: list[ServiceApiAuthKeyVal] = Field( + description="HTTP Cookie鉴权配置", default=[]) + query: list[ServiceApiAuthKeyVal] = Field( + description="HTTP URL参数鉴权配置", default=[]) + oidc: Optional[ServiceApiAuthOidc] = Field( + description="OIDC鉴权配置", default=None) class ServiceApiConfig(BaseModel): @@ -123,7 +128,8 @@ class AppLink(BaseModel): class AppPermission(BaseModel): """App的权限配置""" - type: AppPermissionType = Field(description="权限类型", default=AppPermissionType.PRIVATE) + type: PermissionType = Field( + description="权限类型", default=PermissionType.PRIVATE) users: list[str] = Field(description="可访问的用户列表", default=[]) @@ -134,7 +140,8 @@ class AppMetadata(MetadataBase): links: list[AppLink] = Field(description="相关链接", default=[]) first_questions: list[str] = Field(description="首次提问", default=[]) history_len: int = Field(description="对话轮次", default=3, le=10) - permissions: Optional[AppPermission] = Field(description="应用权限配置", default=None) + permissions: Optional[AppPermission] = Field( + description="应用权限配置", default=None) class ServiceApiSpec(BaseModel): @@ -145,45 +152,3 @@ class ServiceApiSpec(BaseModel): size: int = Field(description="OpenAPI文件大小(单位:KB)") path: str = Field(description="OpenAPI文件路径") hash: str = Field(description="OpenAPI文件的hash值") - -class PositionItem(BaseModel): - """请求/响应中的前端相对位置变量类""" - x:float - y:float -class FlowItem(BaseModel): - """请求/响应中的流变量类""" - flow_id:str=Optional[Field](alias="flowId") - name:str - description:str - enable:bool - created_at: str= Field(alias="createdAt") -class BranchItem(BaseModel): - """请求/响应中的节点分支变量类""" - branch_id:str=Field(alias="branchId") - type:str - description:str -class DependencyItem(BaseModel): - """请求/响应中的节点依赖变量类""" - node_id:str=Field(alias="nodeId") - type:str -class NodeItem(BaseModel): - """请求/响应中的节点变量类""" - node_id:str=Field(alias="nodeId") - api_id:str=Field(alias="apiId") - name:str - type:str - description:str - enable:str - branches:list[BranchItem] - depedency:DependencyItem - position:PositionItem - editable:bool - created_at: str= Field(alias="createdAt") -class EdgeItem(BaseModel): - """请求/响应中的边变量类""" - egde_id:str=Field(alias="edgeId") - source_node:str=Field(alias="sourceNode") - target_node:str=Field(alias="targetNode") - type:str - branch_id:str=Field(alias="branchId") - created_at: str= Field(alias="createdAt") \ No newline at end of file diff --git a/apps/entities/flow_topology.py b/apps/entities/flow_topology.py new file mode 100644 index 0000000000000000000000000000000000000000..9369a1029a4172cca4f7d8fe5f9c83e0acabcec9 --- /dev/null +++ b/apps/entities/flow_topology.py @@ -0,0 +1,78 @@ +"""前端展示flow用到的数据结构 + +Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. +""" +from typing import Any, Optional +from typing import Optional + +from pydantic import BaseModel, Field + +from apps.entities.enum_var import EdgeType, NodeType + + +class ServiceItem(BaseModel): + """元数据归属的服务类""" + service_id: str = Field(alias="serviceId") + name: str + type: str + created_at: Optional[float] = Field(alias="createdAt") + + +class NodeMetaDataItem(BaseModel): + """节点元数据类""" + api_id: str = Field(alias="apiId") + name: str + type: str + description: str + parameters_template: str = Field(alias="parametersTemplate") + editable: bool = Field(default=True) + created_at: Optional[float] = Field(alias="createdAt") + + +class PositionItem(BaseModel): + """请求/响应中的前端相对位置变量类""" + x: float + y: float + + +class DependencyItem(BaseModel): + """请求/响应中的节点依赖变量类""" + node_id: str = Field(alias="nodeId") + type: str + + +class NodeItem(BaseModel): + """请求/响应中的节点变量类""" + node_id: str = Field(alias="nodeId") + api_id: str = Field(alias="apiId") + name: str + type: str = Field(default=NodeType.NORMAL.value) + description: str = Field(default='') + enable: str = Field(default=True) + parameters: dict[str, Any] + depedency: Optional[DependencyItem] = None + position: PositionItem + editable: bool = Field(default=True) + created_at: Optional[float] = Field(alias="createdAt") + + +class EdgeItem(BaseModel): + """请求/响应中的边变量类""" + edge_id: str = Field(alias="edgeId") + source_node: str = Field(alias="sourceNode") + target_node: str = Field(alias="targetNode") + type: str = Field(default=EdgeType.NORMAL.value) + branch_id: str = Field(alias="branchId") + created_at: Optional[float] = Field(alias="createdAt") + + +class FlowItem(BaseModel): + """请求/响应中的流变量类""" + flow_id: Optional[str] = Field(alias="flowId") + name: str + description: str + enable: bool = Field(default=True) + editable: bool = Field(default=True) + created_at: Optional[float] = Field(alias="createdAt") + nodes: list[NodeItem] + edeges: list[EdgeItem] diff --git a/apps/entities/pool.py b/apps/entities/pool.py index f8fc1709cdb616a60896e4b9bc2d7601cf5b8f34..851c7c13828526860a7fb9ff0d27fdb174999546 100644 --- a/apps/entities/pool.py +++ b/apps/entities/pool.py @@ -7,14 +7,15 @@ from typing import Any, Optional from pydantic import BaseModel, Field -from apps.entities.enum_var import CallType -from apps.entities.flow import AppLink, AppPermission +from apps.entities.enum_var import CallType, PermissionType +from apps.entities.flow_topology import PositionItem +from apps.entities.flow import AppLink class PoolBase(BaseModel): """Pool的基础信息""" - id: str + id: str = Field(alias='_id') name: str description: str created_at: float = Field(default_factory=lambda: round(datetime.now(tz=timezone.utc).timestamp(), 3)) @@ -28,6 +29,14 @@ class ServiceApiInfo(BaseModel): path: str = Field(description="OpenAPI文件路径") +class Permission(BaseModel): + """App的权限配置""" + + type: PermissionType = Field( + description="权限类型", default=PermissionType.PRIVATE) + users: list[str] = Field(description="可访问的用户列表", default=[]) + + class ServicePool(PoolBase): """外部服务信息 @@ -36,7 +45,11 @@ class ServicePool(PoolBase): author: str api: list[ServiceApiInfo] = Field(description="API信息列表", default=[]) - hashes: dict[str, str] = Field(description="关联文件的hash值;Service作为整体更新或删除", default={}) + permissions: Optional[Permission] = Field( + description="用户与服务的权限关系", default=[]) + favorites: list[str] = Field(description="收藏此应用的用户列表", default=[]) + hashes: dict[str, str] = Field( + description="关联文件的hash值;Service作为整体更新或删除", default={}) class NodePool(PoolBase): @@ -55,7 +68,8 @@ class NodePool(PoolBase): type: CallType = Field(description="Call的类型") base_node_id: Optional[str] = Field(description="基类Node的ID", default=None) input_schema: dict[str, Any] = Field(description="输入参数的schema", default={}) - output_schema: dict[str, Any] = Field(description="输出参数的schema", default={}) + output_schema: dict[str, Any] = Field( + description="输出参数的schema", default={}) params: dict[str, Any] = Field(description="参数", default={}) params_schema: dict[str, Any] = Field(description="参数的schema", default={}) path: str = Field(description="Node的路径;包括Node的作用域等") @@ -66,6 +80,8 @@ class AppFlow(PoolBase): enabled: bool = Field(description="是否启用", default=True) path: str = Field(description="Flow的路径") + focus_point: PositionItem = Field( + description="Flow的视觉焦点", default=PositionItem(x=0, y=0)) class AppPool(PoolBase): @@ -75,12 +91,14 @@ class AppPool(PoolBase): """ author: str = Field(description="作者的用户ID") + type: str = Field(description="应用类型", default="default") icon: str = Field(description="应用图标") published: bool = Field(description="是否发布", default=False) links: list[AppLink] = Field(description="相关链接", default=[]) first_questions: list[str] = Field(description="推荐问题", default=[]) history_len: int = Field(3, ge=1, le=10, description="对话轮次(1~10)") - permission: AppPermission = Field(description="应用权限配置", default=AppPermission()) + permission: Permission = Field( + description="应用权限配置", default=Permission(type=PermissionType.PRIVATE.value,users=[])) flows: list[AppFlow] = Field(description="Flow列表", default=[]) favorites: list[str] = Field(description="收藏此应用的用户列表", default=[]) hashes: dict[str, str] = Field(description="关联文件的hash值", default={}) diff --git a/apps/entities/request_data.py b/apps/entities/request_data.py index 4282d02b2036d33ab2ca910f3f0f4aa5e40fbbde..68161cb223ca39c235515a0446e9eaaa884dec86 100644 --- a/apps/entities/request_data.py +++ b/apps/entities/request_data.py @@ -116,11 +116,8 @@ class PostKnowledgeIDData(BaseModel): class PutFlowReq(BaseModel): """创建/修改流拓扑结构""" - flow_id:Optional[str]=Field(alias="flowId") - flow:FlowItem - nodes:list[NodeItem] - edges:list[EdgeItem] - focus_point:PositionItem=Field(alias="focusPoint") -class PutNodeParameterReq: - """修改节点的参数""" - content:str \ No newline at end of file + flow: FlowItem + nodes: list[NodeItem] + edges: list[EdgeItem] + focus_point: PositionItem = Field(alias="focusPoint") + \ No newline at end of file diff --git a/apps/entities/response_data.py b/apps/entities/response_data.py index f7ef82bf1b1d9bbee6fb8bae87fee49cce2decd5..c47b995f7af6b1f119a9b2169303a02687584e6f 100644 --- a/apps/entities/response_data.py +++ b/apps/entities/response_data.py @@ -9,8 +9,8 @@ from pydantic import BaseModel, Field from apps.entities.appcenter import AppCenterCardItem, AppData from apps.entities.collection import Blacklist, Document, NodeMetaData from apps.entities.enum_var import DocumentStatus -from apps.entities.flow import EdgeItem, FlowItem, NodeItem, PositionItem from apps.entities.record import RecordData +from apps.entities.flow_topology import ServiceItem, NodeMetaDataItem, PositionItem, FlowItem class ResponseData(BaseModel): @@ -57,6 +57,7 @@ class PostClientSessionRsp(ResponseData): result: PostClientSessionMsg + class AuthUserMsg(BaseModel): """GET /api/auth/user Result数据结构""" @@ -75,6 +76,7 @@ class HealthCheckRsp(BaseModel): status: str + class GetBlacklistUserMsg(BaseModel): """GET /api/blacklist/user Result数据结构""" @@ -107,6 +109,7 @@ class ConversationListItem(BaseModel): doc_count: int = Field(alias="docCount") created_time: str = Field(alias="createdTime") + class ConversationListMsg(BaseModel): """GET /api/conversation Result数据结构""" @@ -142,6 +145,7 @@ class AddConversationRsp(ResponseData): result: AddConversationMsg + class UpdateConversationRsp(ResponseData): """POST /api/conversation 返回数据结构""" @@ -153,6 +157,7 @@ class RecordListMsg(BaseModel): records: list[RecordData] + class RecordListRsp(ResponseData): """GET /api/record/{conversation_id} 返回数据结构""" @@ -204,6 +209,7 @@ class UploadDocumentMsg(BaseModel): documents: list[UploadDocumentMsgItem] + class UploadDocumentRsp(ResponseData): """POST /api/document/{conversation_id} 返回数据结构""" @@ -227,6 +233,7 @@ class GetKnowledgeIDMsg(BaseModel): kb_id: str + class GetKnowledgeIDRsp(ResponseData): """GET /api/knowledge 返回数据结构""" @@ -304,73 +311,68 @@ class GetRecentAppListRsp(ResponseData): result: RecentAppList -class NodeMetaDataItem(BaseModel): - """GET /api/flow/node/metadata 单个节点元数据结构""" - api_id: str = Field(alias="apiId") - name:str - type:str - created_at: str= Field(alias="createdAt") -class ServiceNodeMetaDatasItem(BaseModel): +class NodeServiceListMsg(BaseModel): + """GET /api/flow/service result""" + total: int + services: list[ServiceItem] + + +class NodeServiceListRsp(ResponseData): + """GET /api/flow/service 返回数据结构""" + result: NodeServiceListMsg + + +class NodeMetaDataListMsg(BaseModel): + """GET /api/flow/service/node result""" + service_id: str = Field(alias="serviceId") + node_meta_datas: list[NodeMetaDataItem] + + +class ServiceNodeMetaDatasMsg(BaseModel): """GET /api/flow/node/metadata 服务与服务下节点元数据结构""" - service_id:str=Field(alias="serviceId") - name:str - type:str - node_meta_datas:list[NodeMetaDataItem]=Field(alias="nodeMetaData",default=[]) - created_at: str= Field(alias="createdAt") + service_id: str = Field(alias="serviceId") + name: str + type: str + node_meta_datas: list[NodeMetaDataItem] = Field( + alias="nodeMetaDatas", default=[]) + created_at: Optional[float] = Field(alias="createdAt") + + class NodeMetaDataListMsg(ResponseData): - services:list[ServiceNodeMetaDatasItem] + services: list[ServiceNodeMetaDatasMsg] + + class NodeMetaDataListRsp(ResponseData): - """GET /api/flow/node/metadata 返回数据结构""" - result:NodeMetaDataListMsg + """GET /api/flow/service/node 返回数据结构""" + result: NodeMetaDataListMsg class FlowStructureGetMsg(BaseModel): - """GET /api/flow/{flowId} result""" - flow:FlowItem - nodes:list[NodeItem] - edges:list[EdgeItem] - focus_point:PositionItem=Field(alias="focusPoint") - -class FlowStructureGetRsp(BaseModel): - """GET /api/flow/{flowId} 返回数据结构""" - result:FlowStructureGetMsg + """GET /api/flow result""" + flow: FlowItem + focus_point: PositionItem + + +class FlowStructureGetRsp(ResponseData): + """GET /api/flow 返回数据结构""" + result: FlowStructureGetMsg + + class FlowStructurePutMsg(BaseModel): """PUT /api/flow result""" - flow_id:str=Field(alias="flowId") + flow_id: str = Field(alias="flowId") + + class FlowStructurePutRsp(ResponseData): """PUT /api/flow 返回数据结构""" - flow_id:str=Field(alias="flowId") + result: FlowStructurePutMsg + class FlowStructureDeleteMsg(BaseModel): """DELETE /api/flow/{flowId} result""" - flow_id:str=Field(alias="flowId") + flow_id: str = Field(alias="flowId") + + class FlowStructureDeleteRsp(ResponseData): """DELETE /api/flow/{flowId} 返回数据结构""" - flow_id:str=Field(alias="flowId") - -class NodeParameterItem(BaseModel): - parameter_id:str=Field(alias="parameterId") - content:str - updated_at: str= Field(alias="updatedAt") -class NodeParameterGetMsg(BaseModel): - """GET /api/flow/node/parameter result""" - node_id:str=Field(alias="nodeId") - parameter:NodeParameterItem -class NodeParameterGetRsp(ResponseData): - """GET /api/flow/node/parameter 返回数据结构""" - result:NodeParameterGetMsg -class NodeParameterListMsg(BaseModel): - """GET /api/flow/node/parameter/history result""" - node_id:str=Field(alias="nodeId") - parameter_history:list[NodeParameterItem]=Field(alias="parameterHistory") -class NodeParameterListRsp(ResponseData): - """GET /api/flow/node/parameter/history 返回数据结构""" - result:NodeParameterListMsg - -class NodeParameterPutMsg(BaseModel): - """PUT /api/flow/node/parameter result""" - node_id:str=Field(alias="nodeId") - parameter_id:str=Field(alias="parameterId") -class NodeParameterPutRsp(ResponseData): - """PUT /api/flow/node/parameter 返回数据结构""" - result:NodeParameterPutMsg + result: FlowStructureDeleteMsg diff --git a/apps/manager/application.py b/apps/manager/application.py new file mode 100644 index 0000000000000000000000000000000000000000..1716657e27cc5f840aecf6f9881617d9fb402435 --- /dev/null +++ b/apps/manager/application.py @@ -0,0 +1,41 @@ +"""flow Manager + +Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. +""" +from pymongo import ASCENDING + +from apps.constants import LOGGER +from apps.models.mongo import MongoDB +from apps.entities.enum_var import PermissionType + + +class AppManager: + + @staticmethod + async def validate_user_app_access(user_sub: str, app_id: str) -> bool: + """验证用户对应用的访问权限 + + :param user_sub: 用户唯一标识符 + :param app_id: 应用id + :return: 如果用户具有所需权限则返回True,否则返回False + """ + try: + app_collection = MongoDB.get_collection("app") # 获取应用集合' + match_conditions = [ + {"author": user_sub}, + {"permissions.type": PermissionType.PUBLIC.value}, + { + "$and": [ + {"permissions.type": PermissionType.PROTECTED.value}, + {"permissions.users": user_sub} + ] + } + ] + query = {"$and": [{"_id": app_id}, + {"$or": match_conditions}]} + + result = await app_collection.count_documents(query) + return (result > 0) + except Exception as e: + LOGGER.error(f"Validate user app access failed due to: {e}") + return False diff --git a/apps/manager/flow.py b/apps/manager/flow.py new file mode 100644 index 0000000000000000000000000000000000000000..d8c35b92c2da083106bd258a46e491d576daeda1 --- /dev/null +++ b/apps/manager/flow.py @@ -0,0 +1,328 @@ +"""flow Manager + +Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. +""" +from typing import Tuple, List +from pymongo import ASCENDING + +from apps.constants import LOGGER +from apps.entities.flow import StepPos, Edge, Step, Flow +from apps.entities.pool import AppFlow +from apps.entities.flow_topology import ServiceItem, NodeMetaDataItem, FlowItem, NodeItem, EdgeItem, PositionItem +from apps.models.mongo import MongoDB +from apps.entities.enum_var import PermissionType + + +class FlowManager: + + @staticmethod + async def validate_user_service_access(user_sub: str, service_id: str) -> bool: + """验证用户对服务的访问权限 + + :param user_sub: 用户唯一标识符 + :param service_id: 服务id + :return: 如果用户具有所需权限则返回True,否则返回False + """ + service_collection = MongoDB.get_collection("service") + + try: + service_collection = MongoDB.get_collection("service") + match_conditions = [ + {"author": user_sub}, + {"permissions.type": PermissionType.PUBLIC.value}, + { + "$and": [ + {"permissions.type": PermissionType.PROTECTED.value}, + {"permissions.users": user_sub} + ] + } + ] + query = {"$and": [{"_id": service_id}, + {"$or": match_conditions}, + {"favorites": user_sub}]} + + result = await service_collection.count_documents(query) + return (result > 0) + except Exception as e: + LOGGER.error(f"Validate user service access failed due to: {e}") + return False + + @staticmethod + async def get_service_by_user_id(user_sub: str, page: int, page_size: int) -> Tuple[int, List[ServiceItem]]: + """根据用户ID获取用户有执行权限且收藏的服务列表 + + :param user_sub: 用户唯一标识符 + :param page: 当前页码 + :param page_size: 每页大小 + :return: 返回符合条件的服务总数和分页后的服务列表 + """ + service_collection = MongoDB.get_collection("service") + try: + skip_count = (page - 1) * page_size + match_conditions = [ + {"author": user_sub}, + {"permissions.type": "PUBLIC"}, + { + "$and": [ + {"permissions.type": "PROTECTED"}, + {"permissions.users": user_sub} + ] + } + ] + query = {"$and": [{"$or": match_conditions}, + {"favorites": user_sub}]} + + total_services = await service_collection.count_documents(query) + service_records_cursor = service_collection.find( + query, + skip=skip_count, + limit=page_size, + sort=[("created_at", ASCENDING)] + ) + service_records = await service_records_cursor.to_list(length=None) + service_items = [ + ServiceItem( + service_id=str(record["_id"]), + name=record["name"], + type=record["type"], + created_at=record["created_at"] + ) + for record in service_records + ] + + return total_services, service_items + + except Exception as e: + LOGGER.error(f"Get service by user id failed due to: {e}") + return None + + @staticmethod + async def get_node_meta_datas_by_service_id(service_id: str) -> List[NodeMetaDataItem]: + """serviceId获取service的接口数据,并将接口转换为节点元数据 + + :param service_id: 服务id + :return: 节点元数据的列表 + """ + node_pool_collection = MongoDB.get_collection("node") # 获取节点集合 + try: + cursor = node_pool_collection.find( + {"service": service_id}).sort("created_at", ASCENDING) # 查询指定service_id的所有node_poool + + nodes_meta_data_items = [] + async for node_pool_record in cursor: + # 将每个node_pool换成NodeMetaDataItem实例 + node_meta_data_item = NodeMetaDataItem( + api_id=node_pool_record["id"], + name=node_pool_record['name'], + type=node_pool_record['type'], + description=node_pool_record['description'], + parameters_template=node_pool_record['parameters_template'], + editable=True, + created_at=node_pool_record['created_at'] + ) + nodes_meta_data_items.append(node_meta_data_item) + + return nodes_meta_data_items + + except Exception as e: + LOGGER.error( + f"Get node metadatas by service_id failed due to: {e}") + return None + + @staticmethod + async def get_flow_by_app_and_flow_id(app_id: str, flow_id: str) -> Tuple[FlowItem, PositionItem]: + """通过appId flowId获取flow config的路径和focus,并通过flow config的路径获取flow config,并将其转换为flow item。 + + :param app_id: 应用的id + :param flow_id: 流的id + :return: 流的item和用户在这个流上的视觉焦点 + """ + try: + flow_collection = MongoDB.get_collection("app") + flow_record = await flow_collection.aggregate([ + {"$match": {"_id": app_id, "flows._id": flow_id}}, + {"$unwind": "$flows"}, + {"$match": {"flows._id": flow_id}}, + {"$limit": 1}, + ]).to_list(length=1) + except Exception as e: + LOGGER.error( + f"Get flow by app_id and flow_id failed due to: {e}") + return None + if flow_record: + flow_config = await get_flow_config(app_id, flow_id) + if not flow_config: + LOGGER.error( + "Get flow config by app_id and flow_id failed") + return None + focus_point = flow_record["focus_point"] + flow_item = FlowItem( + flow_id=flow_id, + name=flow_config.name, + description=flow_config.description, + enable=True, + editable=True, + nodes=[], + edeges=[] + ) + for node_config in flow_config.steps: + node_item = NodeItem( + node_id=node_config.node_id, + app_id=node_config.node, + name=node_config.name, + description=node_config.description, + enable=True, + editable=True, + type=node_config.type, + parameters=node_config.params, + position=PositionItem( + x=node_config.position.x, y=node_config.position.y) + ) + flow_item.nodes.append(node_item) + + for edge_config in flow_config.edges: + tmp_list = edge_config.edge_from.split('.') + branch_id = tmp_list[1] if len(tmp_list) > 2 else '' + flow_item.edeges.append(EdgeItem( + edge_id=edge_config.id, + source_node=edge_config.edge_from, + target_node=edge_config.edge_to, + type=edge_config.type.value, + branch_id=branch_id, + )) + flow_item.nodes.append(node_item) + return (flow_item, focus_point) + return None + + @staticmethod + async def put_flow_by_app_and_flow_id(app_id: str, flow_id: str, flow_item: FlowItem, focus_point: PositionItem) -> str: + """存储/更新flow的数据库数据和配置文件 + + :param app_id: 应用的id + :param flow_id: 流的id + :param flow_item: 流的item + :return: 流的id + """ + try: + flow_collection = MongoDB.get_collection("app") + flow_record = await flow_collection.aggregate([ + {"$match": {"_id": app_id, "flows._id": flow_id}}, + {"$unwind": "$flows"}, + {"$match": {"flows._id": flow_id}}, + {"$limit": 1}, + ]).to_list(length=1) + except Exception as e: + LOGGER.error( + f"Get flow by app_id and flow_id failed due to: {e}") + return None + try: + flow_config = Flow( + name=flow_item.name, + description=flow_item.description, + steps=[], + edges=[], + ) + for node_item in flow_item.nodes: + edge_config = Step( + id=node_item.node_id, + node=node_item.api_id, + name=node_item.name, + description=node_item.description, + pos=StepPos(x=node_item.position.x, + y=node_item.position.y), + params=node_item.parameters + ) + flow_config.steps.append(edge_config) + for edge_item in flow_item.edeges: + edge_from = edge_item.source_node + if edge_item.branch_id: + edge_from = edge_from+'.'+edge_item.branch_id + edge_config = Edge( + id=edge_item.edge_id, + edge_from=edge_from, + edge_to=edge_item.target_node, + edge_type=edge_item.type + ) + flow_config.edges.append(edge_config) + if flow_record: + result = await update_flow_config(app_id, flow_id, flow_config) + if not result: + LOGGER.error(f"Update flow config failed") + return None + app_collection = await MongoDB.get_collection("app") + result = await app_collection.update_one( + {'_id': app_id}, + { + '$set': { + 'flows.$[flow].focus_point': focus_point + } + }, + array_filters=[{'flow._id': flow_id}] + ) + if result.modified_count > 0: + return flow_id + else: + return None + else: + new_path = await add_flow_config(app_id, flow_id, flow_config) + if not new_path: + LOGGER.error(f"Add flow config failed") + return None + new_flow = AppFlow( + id=flow_id, + name=flow_item.name, + description=flow_item.description, + path=new_path, + focus_point=PositionItem(x=focus_point.x, y=focus_point.y), + ) + result = await app_collection.update_one( + {'_id': app_id}, + { + '$push': { + 'flows': new_flow.model_dump(by_alias=True) + } + } + ) + if result.modified_count > 0: + return flow_id + else: + return None + except Exception as e: + LOGGER.error( + f"Put flow by app_id and flow_id failed due to: {e}") + return flow_id + + @staticmethod + async def delete_flow_by_app_and_flow_id(app_id: str, flow_id: str) -> str: + """删除flow的数据库数据和配置文件 + + :param app_id: 应用的id + :param flow_id: 流的id + :return: 流的id + """ + + try: + result = await delete_flow_config(app_id, flow_id) + if not result: + LOGGER.error(f"Delete flow config failed") + return None + app_pool_collection = await MongoDB.get_collection("app") # 获取集合 + + # 执行删除操作,从指定的AppPool文档中移除匹配的AppFlow + result = await app_pool_collection.update_one( + {'_id': app_id}, # 查询条件,找到要更新的AppPool文档 + { + '$pull': { + 'flows': {'id': flow_id} # 假设'flows'数组中每个对象都有'id'字段 + } + } + ) + + if result.modified_count > 0: + return flow_id + else: + return None + except Exception as e: + LOGGER.error( + f"Delete flow by app_id and flow_id failed due to: {e}") + return None diff --git a/apps/routers/flow.py b/apps/routers/flow.py index 592b550ca7859605b280daf3e684b929889efcd7..521dad9016dfbcdf035b100e693c39ceb254fd95 100644 --- a/apps/routers/flow.py +++ b/apps/routers/flow.py @@ -1,17 +1,20 @@ -"""FastAPI 用户画像相关API +"""FastAPI Flow拓扑结构展示API Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ -from fastapi import APIRouter, Depends, status,Path,Query,Body +from fastapi import APIRouter, Depends, status, Path, Query, Body from fastapi.responses import JSONResponse +from typing import Annotated, Optional from apps.dependency.csrf import verify_csrf_token from apps.dependency.user import verify_user -from apps.entities.request_data import PutFlowReq,PutNodeParameterReq -from apps.entities.response_data import NodeMetaDataListRsp,FlowStructureGetRsp,FlowStructurePutRsp,\ - FlowStructureDeleteRsp,NodeParameterGetRsp,NodeParameterListRsp,NodeParameterPutRsp -from apps.manager.domain import DomainManager - +from apps.dependency import get_user +from apps.entities.request_data import PutFlowReq +from apps.entities.response_data import NodeServiceListRsp, NodeServiceListMsg, NodeMetaDataListRsp, NodeMetaDataListMsg, FlowStructureGetRsp, \ + FlowStructureGetMsg, FlowStructurePutRsp, FlowStructurePutMsg, FlowStructureDeleteRsp, FlowStructureDeleteMsg, ResponseData +from apps.manager.flow import FlowManager +from apps.manager.application import AppManager +from apps.utils.flow import FlowService router = APIRouter( prefix="/api/flow", tags=["flow"], @@ -22,43 +25,150 @@ router = APIRouter( ) -@router.get("/node/metadata",response_model=NodeMetaDataListRsp) -async def get_node_metadatas(page:int =Query(...), - page_size:int =Query(...,alias="pageSize")): # noqa: ANN201 - """获取节点元数据""" - pass +@router.get("/service", response_model=NodeMetaDataListRsp, responses={ + status.HTTP_404_NOT_FOUND: {"model": ResponseData}, +}) +async def get_node_metadatas( + user_sub: Annotated[str, Depends(get_user)], + page: int = Query(...), + page_size: int = Query(..., alias="pageSize"), +): + """获取用户可访问的节点元数据所在服务的信息""" + result = await FlowManager.get_node_meta_datas_by_service_id(user_sub, page, page_size) + if result is None: + return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=ResponseData( + code=status.HTTP_404_NOT_FOUND, + message="节点元数据所在服务信息不存在", + result={}, + ).model_dump(exclude_none=True, by_alias=True)) + return JSONResponse(status_code=status.HTTP_200_OK, content=NodeServiceListRsp( + code=status.HTTP_200_OK, + message="节点元数据所在服务信息获取成功", + result=NodeServiceListMsg(total=result[0], services=result[1]) + ).model_dump(exclude_none=True, by_alias=True)) + + +@router.get("/service/node", response_model=NodeMetaDataListRsp, responses={ + status.HTTP_403_FORBIDDEN: {"model": ResponseData}, + status.HTTP_404_NOT_FOUND: {"model": ResponseData} +}) +async def get_flow( + user_sub: Annotated[str, Depends(get_user)], + service_id: int = Query(..., alias="serviceId") +): + """获取用户可访问的节点元数据""" + if not await AppManager.validate_user_app_access(user_sub, service_id): + return JSONResponse(status_code=status.HTTP_403_FORBIDDEN, content=ResponseData( + code=status.HTTP_403_FORBIDDEN, + message="用户没有权限访问该服务", + result={}, + ).model_dump(exclude_none=True, by_alias=True)) + + result = await FlowManager.get_node_meta_datas_by_service_id(service_id) + if result is None: + return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=ResponseData( + code=status.HTTP_404_NOT_FOUND, + message="服务下节点元数据获取失败", + result={}, + ).model_dump(exclude_none=True, by_alias=True)) + return JSONResponse(status_code=status.HTTP_200_OK, content=NodeMetaDataListRsp( + code=status.HTTP_200_OK, + message="服务下节点元数据获取成功", + result=NodeMetaDataListMsg(node_meta_datas=result) + ).model_dump(exclude_none=True, by_alias=True)) + -@router.get("/{flowId}", response_model=FlowStructureGetRsp) -async def get_flow(flowId: str = Path(..., title="流的id")): - flow_id=flowId - pass +@router.get("", response_model=FlowStructureGetRsp, responses={ + status.HTTP_403_FORBIDDEN: {"model": ResponseData}, + status.HTTP_404_NOT_FOUND: {"model": ResponseData} +}) +async def get_flow( + user_sub: Annotated[str, Depends(get_user)], + app_id: str = Query(..., alias="appId"), + flow_id: str = Query(..., alias="flowId") +): + """获取流拓扑结构""" + if not await AppManager.validate_user_app_access(user_sub, app_id): + return JSONResponse(status_code=status.HTTP_403_FORBIDDEN, content=ResponseData( + code=status.HTTP_403_FORBIDDEN, + message="用户没有权限访问该流", + result={}, + ).model_dump(exclude_none=True, by_alias=True)) + result = await FlowManager.get_flow_by_app_and_flow_id(app_id, flow_id) + if result is None: + return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=ResponseData( + code=status.HTTP_404_NOT_FOUND, + message="应用下流程获取失败", + result={}, + ).model_dump(exclude_none=True, by_alias=True)) + return JSONResponse(status_code=status.HTTP_200_OK, content=FlowStructureGetRsp( + code=status.HTTP_200_OK, + message="应用下流程获取成功", + result=FlowStructureGetMsg(flow=result[0], focus_point=result[1]) + ).model_dump(exclude_none=True, by_alias=True)) -@router.put("", response_model=FlowStructurePutRsp) -async def put_flow(flow_id:str = Query(..., alias="flowId"), - put_body: PutFlowReq=Body(...)): - pass -@router.delete("/{flowId}", response_model=FlowStructureDeleteRsp) -async def delte_flow(flowId: str = Path(..., title="流的id")): - flow_id=flowId - pass +@router.put("", response_model=FlowStructurePutRsp, responses={ + status.HTTP_400_BAD_REQUEST: {"model": ResponseData}, + status.HTTP_403_FORBIDDEN: {"model": ResponseData}, + status.HTTP_404_NOT_FOUND: {"model": ResponseData}, + status.HTTP_500_INTERNAL_SERVER_ERROR: {"model": ResponseData} +}) +async def put_flow( + user_sub: Annotated[str, Depends(get_user)], + app_id: str = Query(..., alias="appId"), + flow_id: str = Query(..., alias="flowId"), + topology_check: Optional[bool] = Query(..., alias="topologyCheck"), + put_body: PutFlowReq = Body(...) +): + """修改流拓扑结构""" + if not await AppManager.validate_user_app_access(user_sub, app_id): + return JSONResponse(status_code=status.HTTP_403_FORBIDDEN, content=ResponseData( + code=status.HTTP_403_FORBIDDEN, + message="用户没有权限访问该流", + result={}, + ).model_dump(exclude_none=True, by_alias=True)) + if topology_check: + await FlowService.validate_flow_connectivity(put_body.flow) + await FlowService.validate_flow_illegal(put_body.flow) + result = await FlowManager.put_flow_by_app_and_flow_id(app_id, flow_id, put_body.flow, put_body.focus_point) + if result is None: + return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content=ResponseData( + code=status.HTTP_500_INTERNAL_SERVER_ERROR, + message="应用下流程更新失败", + result={}, + ).model_dump(exclude_none=True, by_alias=True)) + return JSONResponse(status_code=status.HTTP_200_OK, content=FlowStructurePutRsp( + code=status.HTTP_200_OK, + message="应用下流程更新成功", + result=FlowStructurePutMsg(flow_id=result) + ).model_dump(exclude_none=True, by_alias=True)) -@router.get("/node/parameter", response_model=NodeParameterGetRsp) -async def get_node_parameter(flow_id:str = Query(..., alias="flowId"), - node_id:str = Query(..., alias="nodeId")): - pass -@router.get("/node/parameter/history", response_model=NodeParameterListRsp) -async def get_node_parameter_history(flow_id:str = Query(..., alias="flowId"), - node_id:str = Query(..., alias="nodeId"), - start_time:str = Query(..., alias="startTime"), - end_time:str = Query(..., alias="endTime"), - page:int =Query(...), - page_size:int =Query(...,alias="pageSize"), - ): - pass -@router.put("/node/parameter", response_model=NodeParameterPutRsp) -async def put_node_parameter(flow_id:str = Query(..., alias="flowId"), - node_id:str = Query(..., alias="nodeId"), - put_body:PutNodeParameterReq=Body(...)): - pass +@router.delete("", response_model=FlowStructureDeleteRsp, responses={ + status.HTTP_404_NOT_FOUND: {"model": ResponseData} +}) +async def delete_flow( + user_sub: Annotated[str, Depends(get_user)], + app_id: str = Query(..., alias="appId"), + flow_id: str = Query(..., alias="flowId") +): + """删除流拓扑结构""" + if not await AppManager.validate_user_app_access(user_sub, app_id): + return JSONResponse(status_code=status.HTTP_403_FORBIDDEN, content=ResponseData( + code=status.HTTP_403_FORBIDDEN, + message="用户没有权限访问该流", + result={}, + ).model_dump(exclude_none=True, by_alias=True)) + result = await FlowManager.delete_flow_by_app_and_flow_id(app_id, flow_id) + if result is None: + return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=ResponseData( + code=status.HTTP_404_NOT_FOUND, + message="应用下流程删除失败", + result={}, + ).model_dump(exclude_none=True, by_alias=True)) + return JSONResponse(status_code=status.HTTP_200_OK, content=FlowStructureDeleteRsp( + code=status.HTTP_200_OK, + message="应用下流程删除成功", + result=FlowStructureDeleteMsg(flow_id=result) + ).model_dump(exclude_none=True, by_alias=True)) diff --git a/apps/utils/flow.py b/apps/utils/flow.py new file mode 100644 index 0000000000000000000000000000000000000000..3ed20d780a01dadc2d9501fae5d9dbfb35949011 --- /dev/null +++ b/apps/utils/flow.py @@ -0,0 +1,104 @@ +"""flow拓扑相关函数 + +Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. +""" +from typing import Tuple, List + +from apps.constants import LOGGER +from apps.entities.enum_var import NodeType +from apps.entities.flow_topology import ServiceItem, NodeMetaDataItem, FlowItem, NodeItem, EdgeItem, PositionItem + + +class FlowService: + @staticmethod + async def validate_flow_illegal(flow_item: FlowItem) -> None: + node_id_set = set() + edge_id_set = set() + node_to_branches = dict() + edge_to_branch = dict() + num_of_start_node = 0 + num_of_end_node = 0 + id_of_start_node = None + id_of_end_node = None + node_in_degrees = {} + node_out_degrees = {} + for node in flow_item.nodes: + if node.node_id in node_id_set: + LOGGER.error(msg="节点id重复") + raise Exception(f"节点{node.name}的id重复") + node_id_set.add(node.node_id) + if node.type == NodeType.START.value: + num_of_start_node += 1 + id_of_start_node = node.node_id + if node.type == NodeType.END.value: + num_of_end_node += 1 + id_of_end_node = node.node_id + node_to_branches[node.node_id] = set() + if node.type == NodeType.CHOICE.value: + for branch in node.parameters['choices']: + if branch.branch in node_to_branches[node.node_id]: + LOGGER.error(msg=f"分支id重复") + raise Exception(f"节点{node.name}的分支{branch.branch }重复") + node_to_branches[node.node_id].add(branch.branch) + else: + node_to_branches[node.node_id].add('') + if num_of_start_node != 1 or num_of_end_node != 1: + LOGGER.error(msg="起始节点和终止节点数量不为1") + raise Exception("起始节点和终止节点数量不为1") + for edge in flow_item.edeges: + if edge.edge_id in edge_id_set: + LOGGER.error(msg="边id重复") + raise Exception(f"边{edge.edge_id}的id重复") + edge_id_set.add(edge.edge_id) + if edge.source_node == edge.target_node: + LOGGER.error(msg="边起始节点和终止节点相同") + raise Exception(f"边{edge.edge_id}的起始节点和终止节点相同") + if edge.source_node not in node_id_set: + LOGGER.error(msg=f"边{edge.edge_id}的起始节点{edge.source_node}不存在") + raise Exception(f"边{edge.edge_id}的起始节点{edge.source_node}不存在") + if edge.target_node not in node_id_set: + LOGGER.error(msg=f"边{edge.edge_id}的终止节点{edge.target_node}不存在") + raise Exception(f"边{edge.edge_id}的终止节点{edge.target_node}不存在") + if edge.branch_id not in node_to_branches[edge.source_node]: + LOGGER.error(msg=f"边{edge.edge_id}的分支{edge.branch_id}不存在") + raise Exception(f"边{edge.edge_id}的分支{edge.branch_id}不存在") + if edge.branch_id in edge_to_branch[edge.source_node]: + LOGGER.error(msg=f"边{edge.edge_id}的分支{edge.branch_id}重复") + raise Exception(f"边{edge.edge_id}的分支{edge.branch_id}重复") + if edge.source_node not in edge_to_branch: + edge_to_branch[edge.source_node] = set() + edge_to_branch[edge.source_node].add(edge.edge_id) + node_in_degrees[edge.target_node] = node_in_degrees.get( + edge.target_node, 0) + 1 + node_out_degrees[edge.source_node] = node_out_degrees.get( + edge.source_node, 0) + 1 + if node_in_degrees[id_of_start_node] != 0: + LOGGER.error(msg=f"起始节点{id_of_start_node}的入度不为0") + raise Exception(f"起始节点{id_of_start_node}的入度不为0") + if node_out_degrees[id_of_end_node] != 0: + LOGGER.error(msg=f"终止节点{id_of_end_node}的出度不为0") + raise Exception(f"终止节点{id_of_end_node}的出度不为0") + + @staticmethod + async def validate_flow_connectivity(flow_item: FlowItem) -> None: + id_of_start_node = None + id_of_end_node = None + node_in_degrees = {} + node_out_degrees = {} + for node in flow_item.nodes: + if node.type == NodeType.START.value: + id_of_start_node = node.node_id + if node.type == NodeType.END.value: + id_of_end_node = node.node_id + for edge in flow_item.edeges: + node_in_degrees[edge.target_node] = node_in_degrees.get( + edge.target_node, 0) + 1 + node_out_degrees[edge.source_node] = node_out_degrees.get( + edge.source_node, 0) + 1 + for node in flow_item.nodes: + if node.node_id != id_of_start_node and node.node_id not in node_in_degrees.keys(): + LOGGER.error(msg=f"节点{node.node_id}的入度为0") + raise Exception(f"节点{node.node_id}的入度为0") + if node.node_id != id_of_end_node and node.node_id not in node_out_degrees.keys(): + LOGGER.error(msg=f"节点{node.node_id}的出度为0") + raise Exception(f"节点{node.node_id}的出度为0")