diff --git a/apps/entities/collection.py b/apps/entities/collection.py index 7e486e49322d69ac75990d2f83dfba81f2595c05..48a3bfbd73f171f722855f55e74128967d38149c 100644 --- a/apps/entities/collection.py +++ b/apps/entities/collection.py @@ -183,28 +183,3 @@ class Domain(BaseModel): definition: str updated_at: float = Field(default_factory=lambda: round(datetime.now(tz=timezone.utc).timestamp(), 3)) - -class NodeMetaData(BaseModel): - """节点元数据""" - pass -class ServiceNodeMetaDatas(BaseModel): - """节点原数据""" - pass -class Position(BaseModel): - """前端相对位置""" - pass -class Flow(BaseModel): - """流的拓扑数据""" - pass -class Branch(BaseModel): - """节点分支信息""" - pass -class Dependency(BaseModel): - """节点伴生关系""" - pass -class Node(BaseModel): - """流拓扑中的节点数据""" - pass -class Edge(BaseModel): - """流拓扑中的边信息""" - pass diff --git a/apps/entities/flow_topology.py b/apps/entities/flow_topology.py index 9369a1029a4172cca4f7d8fe5f9c83e0acabcec9..91e14f242978d5d254088c3885b695ba9db63e68 100644 --- a/apps/entities/flow_topology.py +++ b/apps/entities/flow_topology.py @@ -21,26 +21,24 @@ class ServiceItem(BaseModel): class NodeMetaDataItem(BaseModel): """节点元数据类""" api_id: str = Field(alias="apiId") - name: str type: str + name: str description: str - parameters_template: str = Field(alias="parametersTemplate") + parameters_template: dict[str, Any] = Field(alias="parametersTemplate") editable: bool = Field(default=True) created_at: Optional[float] = Field(alias="createdAt") class PositionItem(BaseModel): """请求/响应中的前端相对位置变量类""" - x: float - y: float - + x: float = Field(default=0.0) + y: float = Field(default=0.0) class DependencyItem(BaseModel): """请求/响应中的节点依赖变量类""" node_id: str = Field(alias="nodeId") type: str - class NodeItem(BaseModel): """请求/响应中的节点变量类""" node_id: str = Field(alias="nodeId") @@ -48,12 +46,11 @@ class NodeItem(BaseModel): name: str type: str = Field(default=NodeType.NORMAL.value) description: str = Field(default='') - enable: str = Field(default=True) + enable: bool = Field(default=True) parameters: dict[str, Any] depedency: Optional[DependencyItem] = None position: PositionItem editable: bool = Field(default=True) - created_at: Optional[float] = Field(alias="createdAt") class EdgeItem(BaseModel): @@ -63,16 +60,15 @@ class EdgeItem(BaseModel): target_node: str = Field(alias="targetNode") type: str = Field(default=EdgeType.NORMAL.value) branch_id: str = Field(alias="branchId") - created_at: Optional[float] = Field(alias="createdAt") class FlowItem(BaseModel): """请求/响应中的流变量类""" flow_id: Optional[str] = Field(alias="flowId") - name: str - description: str + name: str = Field(default='') + description: str = Field(default='') enable: bool = Field(default=True) editable: bool = Field(default=True) + nodes: list[NodeItem] = Field(default=[]) + edges: list[EdgeItem] = Field(default=[]) created_at: Optional[float] = Field(alias="createdAt") - nodes: list[NodeItem] - edeges: list[EdgeItem] diff --git a/apps/entities/request_data.py b/apps/entities/request_data.py index 08540509a16cb0c7aceac13fafc02461a8347b94..41d12ba4c82603cbb7d9004659bbee7760aab239 100644 --- a/apps/entities/request_data.py +++ b/apps/entities/request_data.py @@ -7,7 +7,7 @@ from typing import Optional from pydantic import BaseModel, Field from apps.entities.appcenter import AppData -from apps.entities.flow import EdgeItem, FlowItem, NodeItem, PositionItem +from apps.entities.flow_topology import PositionItem, FlowItem from apps.entities.task import RequestDataApp @@ -117,7 +117,5 @@ class PostKnowledgeIDData(BaseModel): class PutFlowReq(BaseModel): """创建/修改流拓扑结构""" flow: FlowItem - nodes: list[NodeItem] - edges: list[EdgeItem] focus_point: PositionItem = Field(alias="focusPoint") \ No newline at end of file diff --git a/apps/entities/response_data.py b/apps/entities/response_data.py index 5e938722a98755c24919a5216e0108759e9fe10f..5202fb153257fe1088e4c1012ed10142a95fc0ab 100644 --- a/apps/entities/response_data.py +++ b/apps/entities/response_data.py @@ -7,7 +7,7 @@ from typing import Any, Optional from pydantic import BaseModel, Field from apps.entities.appcenter import AppCenterCardItem, AppData -from apps.entities.collection import Blacklist, Document, NodeMetaData +from apps.entities.collection import Blacklist, Document from apps.entities.enum_var import DocumentStatus from apps.entities.record import RecordData from apps.entities.flow_topology import ServiceItem, NodeMetaDataItem, PositionItem, FlowItem @@ -315,33 +315,18 @@ class GetRecentAppListRsp(ResponseData): class NodeServiceListMsg(BaseModel): """GET /api/flow/service result""" - total: int - services: list[ServiceItem] + services: list[ServiceItem] = Field(default=[], description="服务列表") class NodeServiceListRsp(ResponseData): """GET /api/flow/service 返回数据结构""" - result: NodeServiceListMsg + result: NodeServiceListMsg = Field(default=NodeServiceListMsg(), description="服务列表") class NodeMetaDataListMsg(BaseModel): """GET /api/flow/service/node result""" service_id: str = Field(alias="serviceId") - node_meta_datas: list[NodeMetaDataItem] - - -class ServiceNodeMetaDatasMsg(BaseModel): - """GET /api/flow/node/metadata 服务与服务下节点元数据结构""" - service_id: str = Field(alias="serviceId") - name: str - type: str - node_meta_datas: list[NodeMetaDataItem] = Field( - alias="nodeMetaDatas", default=[]) - created_at: Optional[float] = Field(alias="createdAt") - - -class NodeMetaDataListMsg(ResponseData): - services: list[ServiceNodeMetaDatasMsg] + node_meta_datas: list[NodeMetaDataItem] = Field(alias="nodeMetaDatas", default=[]) class NodeMetaDataListRsp(ResponseData): @@ -352,7 +337,7 @@ class NodeMetaDataListRsp(ResponseData): class FlowStructureGetMsg(BaseModel): """GET /api/flow result""" flow: FlowItem - focus_point: PositionItem + focus_point: PositionItem = Field(alias="focusPoint", default=PositionItem(x=0, y=0)) class FlowStructureGetRsp(ResponseData): @@ -362,7 +347,7 @@ class FlowStructureGetRsp(ResponseData): class FlowStructurePutMsg(BaseModel): """PUT /api/flow result""" - flow_id: str = Field(alias="flowId") + flow: FlowItem = Field(alias="flowId", default=FlowItem()) class FlowStructurePutRsp(ResponseData): @@ -371,10 +356,10 @@ class FlowStructurePutRsp(ResponseData): class FlowStructureDeleteMsg(BaseModel): - """DELETE /api/flow/{flowId} result""" - flow_id: str = Field(alias="flowId") + """DELETE /api/flow/ result""" + flow_id: str = Field(alias="flowId", default="") class FlowStructureDeleteRsp(ResponseData): - """DELETE /api/flow/{flowId} 返回数据结构""" + """DELETE /api/flow/ 返回数据结构""" result: FlowStructureDeleteMsg diff --git a/apps/manager/application.py b/apps/manager/application.py index 1716657e27cc5f840aecf6f9881617d9fb402435..c04302730a67038c83ae2be706feaff9a2746b9e 100644 --- a/apps/manager/application.py +++ b/apps/manager/application.py @@ -2,8 +2,6 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ -from pymongo import ASCENDING - from apps.constants import LOGGER from apps.models.mongo import MongoDB from apps.entities.enum_var import PermissionType @@ -21,21 +19,42 @@ class AppManager: """ try: app_collection = MongoDB.get_collection("app") # 获取应用集合' - match_conditions = [ - {"author": user_sub}, - {"permissions.type": PermissionType.PUBLIC.value}, - { - "$and": [ - {"permissions.type": PermissionType.PROTECTED.value}, - {"permissions.users": user_sub} - ] - } - ] - query = {"$and": [{"_id": app_id}, - {"$or": match_conditions}]} - - result = await app_collection.count_documents(query) - return (result > 0) + query = { + "_id": app_id, + "$or": [ + {"author": user_sub}, + {"permission.type": PermissionType.PUBLIC.value}, + { + "$and": [ + {"permission.type": PermissionType.PROTECTED.value}, + {"permission.users": user_sub} + ] + } + ] + } + + result = await app_collection.find_one(query) + return (result is not None) except Exception as e: LOGGER.error(f"Validate user app access failed due to: {e}") return False + @staticmethod + async def validate_app_belong_to_user(user_sub: str, app_id: str) -> bool: + """验证用户对应用的属权 + + :param user_sub: 用户唯一标识符 + :param app_id: 应用id + :return: 如果应用属于用户则返回True,否则返回False + """ + try: + app_collection = MongoDB.get_collection("app") # 获取应用集合' + query = { + "_id": app_id, + "author": user_sub + } + + result = await app_collection.find_one(query) + return (result is not None) + except Exception as e: + LOGGER.error(f"Validate user app access failed due to: {e}") + return False \ No newline at end of file diff --git a/apps/manager/flow.py b/apps/manager/flow.py index d8c35b92c2da083106bd258a46e491d576daeda1..a9df372d44cc2471341f297f11632ed9ed495bf4 100644 --- a/apps/manager/flow.py +++ b/apps/manager/flow.py @@ -3,7 +3,7 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ from typing import Tuple, List -from pymongo import ASCENDING +from pymongo import ASCENDING, ReturnDocument from apps.constants import LOGGER from apps.entities.flow import StepPos, Edge, Step, Flow @@ -38,8 +38,7 @@ class FlowManager: } ] query = {"$and": [{"_id": service_id}, - {"$or": match_conditions}, - {"favorites": user_sub}]} + {"$or": match_conditions}]} result = await service_collection.count_documents(query) return (result > 0) @@ -48,52 +47,53 @@ class FlowManager: return False @staticmethod - async def get_service_by_user_id(user_sub: str, page: int, page_size: int) -> Tuple[int, List[ServiceItem]]: + async def get_service_by_user_id(user_sub: str) -> Tuple[int, List[ServiceItem]]: """根据用户ID获取用户有执行权限且收藏的服务列表 :param user_sub: 用户唯一标识符 - :param page: 当前页码 - :param page_size: 每页大小 :return: 返回符合条件的服务总数和分页后的服务列表 """ service_collection = MongoDB.get_collection("service") try: - skip_count = (page - 1) * page_size match_conditions = [ + {"name": "系统"}, {"author": user_sub}, - {"permissions.type": "PUBLIC"}, { "$and": [ - {"permissions.type": "PROTECTED"}, - {"permissions.users": user_sub} + {"permissions.type": PermissionType.PUBLIC.value}, + {"permissions.users": user_sub}, + {"favorites": user_sub} + ] + }, + { + "$and": [ + {"permissions.type": PermissionType.PROTECTED.value}, + {"permissions.users": user_sub}, + {"favorites": user_sub} ] } ] - query = {"$and": [{"$or": match_conditions}, - {"favorites": user_sub}]} + query = {"$or": match_conditions} - total_services = await service_collection.count_documents(query) service_records_cursor = service_collection.find( query, - skip=skip_count, - limit=page_size, sort=[("created_at", ASCENDING)] ) service_records = await service_records_cursor.to_list(length=None) service_items = [ ServiceItem( - service_id=str(record["_id"]), + serviceId=str(record["_id"]), name=record["name"], - type=record["type"], - created_at=record["created_at"] + type="default", + createdAt=record["created_at"] ) for record in service_records ] - return total_services, service_items + return service_items except Exception as e: - LOGGER.error(f"Get service by user id failed due to: {e}") + print(f"Get service by user id failed due to: {e}") return None @staticmethod @@ -103,22 +103,25 @@ class FlowManager: :param service_id: 服务id :return: 节点元数据的列表 """ - node_pool_collection = MongoDB.get_collection("node") # 获取节点集合 + node_pool_collection = MongoDB.get_collection("node") try: cursor = node_pool_collection.find( - {"service": service_id}).sort("created_at", ASCENDING) # 查询指定service_id的所有node_poool + {"service_id": service_id}).sort("_id", ASCENDING) nodes_meta_data_items = [] async for node_pool_record in cursor: - # 将每个node_pool换成NodeMetaDataItem实例 node_meta_data_item = NodeMetaDataItem( - api_id=node_pool_record["id"], + apiId=node_pool_record["_id"], + type=node_pool_record["call_id"], name=node_pool_record['name'], - type=node_pool_record['type'], description=node_pool_record['description'], - parameters_template=node_pool_record['parameters_template'], + parametersTemplate={ + "fixed_params": node_pool_record['fixed_params'], + "params_schema": node_pool_record['params_schema'], + "output_schema": node_pool_record['output_schema'] + }, editable=True, - created_at=node_pool_record['created_at'] + createdAt=node_pool_record['created_at'], ) nodes_meta_data_items.append(node_meta_data_item) @@ -138,37 +141,47 @@ class FlowManager: :return: 流的item和用户在这个流上的视觉焦点 """ try: - flow_collection = MongoDB.get_collection("app") - flow_record = await flow_collection.aggregate([ - {"$match": {"_id": app_id, "flows._id": flow_id}}, - {"$unwind": "$flows"}, - {"$match": {"flows._id": flow_id}}, - {"$limit": 1}, - ]).to_list(length=1) + app_collection = MongoDB.get_collection("app") + app_record = await app_collection.find_one({"_id": app_id}) + if app_record is None: + LOGGER.error(f"应用{app_id}不存在") + return None + cursor = app_collection.find( + {"_id": app_id, "flows._id": flow_id}, + {"flows.$": 1} + ) + + app_records = await cursor.to_list(length=1) + if len(app_records) == 0: + return None + app_record = app_records[0] + if "flows" not in app_record.keys() or len(app_record["flows"]) == 0: + return None + flow_record = app_record["flows"][0] except Exception as e: LOGGER.error( f"Get flow by app_id and flow_id failed due to: {e}") return None if flow_record: - flow_config = await get_flow_config(app_id, flow_id) + flow_config = await app_collection(app_id, flow_id) if not flow_config: - LOGGER.error( - "Get flow config by app_id and flow_id failed") + LOGGER.error("Get flow config by app_id and flow_id failed") return None focus_point = flow_record["focus_point"] flow_item = FlowItem( - flow_id=flow_id, + flowId=flow_id, name=flow_config.name, description=flow_config.description, enable=True, editable=True, nodes=[], - edeges=[] + edges=[], + createdAt=flow_record["created_at"] ) for node_config in flow_config.steps: node_item = NodeItem( - node_id=node_config.node_id, - app_id=node_config.node, + nodeId=node_config.id, + apiId=node_config.node, name=node_config.name, description=node_config.description, enable=True, @@ -176,26 +189,30 @@ class FlowManager: type=node_config.type, parameters=node_config.params, position=PositionItem( - x=node_config.position.x, y=node_config.position.y) + x=node_config.pos.x, y=node_config.pos.y) ) flow_item.nodes.append(node_item) for edge_config in flow_config.edges: + edge_from = edge_config.edge_from + branch_id = '' tmp_list = edge_config.edge_from.split('.') - branch_id = tmp_list[1] if len(tmp_list) > 2 else '' - flow_item.edeges.append(EdgeItem( - edge_id=edge_config.id, - source_node=edge_config.edge_from, - target_node=edge_config.edge_to, - type=edge_config.type.value, - branch_id=branch_id, + if len(tmp_list) == 2: + edge_from = tmp_list[0] + branch_id = tmp_list[1] + flow_item.edges.append(EdgeItem( + edgeId=edge_config.id, + sourceNode=edge_from, + targetNode=edge_config.edge_to, + type=edge_config.edge_type, + branchId=branch_id, )) - flow_item.nodes.append(node_item) return (flow_item, focus_point) return None @staticmethod - async def put_flow_by_app_and_flow_id(app_id: str, flow_id: str, flow_item: FlowItem, focus_point: PositionItem) -> str: + async def put_flow_by_app_and_flow_id( + app_id: str, flow_id: str, flow_item: FlowItem, focus_point: PositionItem) -> str: """存储/更新flow的数据库数据和配置文件 :param app_id: 应用的id @@ -204,13 +221,21 @@ class FlowManager: :return: 流的id """ try: - flow_collection = MongoDB.get_collection("app") - flow_record = await flow_collection.aggregate([ - {"$match": {"_id": app_id, "flows._id": flow_id}}, - {"$unwind": "$flows"}, - {"$match": {"flows._id": flow_id}}, - {"$limit": 1}, - ]).to_list(length=1) + app_collection = MongoDB.get_collection("app") + app_record = await app_collection.find_one({"_id": app_id}) + if app_record is None: + LOGGER.error(f"应用{app_id}不存在") + return None + cursor = app_collection.find( + {"_id": app_id, "flows._id": flow_id}, + {"flows.$": 1} + ) + app_records = await cursor.to_list(length=1) + flow_record = None + if len(app_records) != 0: + app_record = app_records[0] + if "flows" in app_record.keys() and len(app_record["flows"]) != 0: + flow_record = app_record["flows"][0] except Exception as e: LOGGER.error( f"Get flow by app_id and flow_id failed due to: {e}") @@ -225,6 +250,7 @@ class FlowManager: for node_item in flow_item.nodes: edge_config = Step( id=node_item.node_id, + type=node_item.type, node=node_item.api_id, name=node_item.name, description=node_item.description, @@ -233,7 +259,7 @@ class FlowManager: params=node_item.parameters ) flow_config.steps.append(edge_config) - for edge_item in flow_item.edeges: + for edge_item in flow_item.edges: edge_from = edge_item.source_node if edge_item.branch_id: edge_from = edge_from+'.'+edge_item.branch_id @@ -244,55 +270,61 @@ class FlowManager: edge_type=edge_item.type ) flow_config.edges.append(edge_config) + except Exception as e: + LOGGER.error(f"Parse flow config failed: {e}") + return None + try: if flow_record: result = await update_flow_config(app_id, flow_id, flow_config) if not result: LOGGER.error(f"Update flow config failed") return None - app_collection = await MongoDB.get_collection("app") - result = await app_collection.update_one( + app_collection = MongoDB.get_collection("app") + result = await app_collection.find_one_and_update( {'_id': app_id}, { '$set': { - 'flows.$[flow].focus_point': focus_point + 'flows.$[element].focus_point': focus_point.model_dump(by_alias=True) } }, - array_filters=[{'flow._id': flow_id}] + array_filters=[{'element._id': flow_id}], + return_document=ReturnDocument.AFTER ) - if result.modified_count > 0: - return flow_id - else: + if result is None: + LOGGER.error(f"Add flow failed") return None + return flow_id else: new_path = await add_flow_config(app_id, flow_id, flow_config) if not new_path: - LOGGER.error(f"Add flow config failed") + LOGGER.error("Add flow config failed") return None new_flow = AppFlow( - id=flow_id, + _id=flow_id, name=flow_item.name, description=flow_item.description, path=new_path, focus_point=PositionItem(x=focus_point.x, y=focus_point.y), ) - result = await app_collection.update_one( + app_collection = MongoDB.get_collection("app") + result = await app_collection.find_one_and_update( {'_id': app_id}, { '$push': { 'flows': new_flow.model_dump(by_alias=True) } - } + }, + return_document=ReturnDocument.AFTER ) - if result.modified_count > 0: - return flow_id - else: + if result is None: + LOGGER.error(f"Add flow failed") return None + return flow_id except Exception as e: LOGGER.error( f"Put flow by app_id and flow_id failed due to: {e}") return flow_id - @staticmethod async def delete_flow_by_app_and_flow_id(app_id: str, flow_id: str) -> str: """删除flow的数据库数据和配置文件 @@ -304,24 +336,24 @@ class FlowManager: try: result = await delete_flow_config(app_id, flow_id) if not result: - LOGGER.error(f"Delete flow config failed") + print("Delete flow config failed") return None - app_pool_collection = await MongoDB.get_collection("app") # 获取集合 + app_pool_collection = MongoDB.get_collection("app") # 获取集合 # 执行删除操作,从指定的AppPool文档中移除匹配的AppFlow - result = await app_pool_collection.update_one( + result = await app_pool_collection.find_one_and_update( {'_id': app_id}, # 查询条件,找到要更新的AppPool文档 { '$pull': { - 'flows': {'id': flow_id} # 假设'flows'数组中每个对象都有'id'字段 + 'flows': {'_id': flow_id} # 假设'flows'数组中每个对象都有'id'字段 } - } + }, + return_document=ReturnDocument.AFTER ) - - if result.modified_count > 0: - return flow_id - else: + if result is None: + LOGGER.error("Delete flow from app pool failed") return None + return flow_id except Exception as e: LOGGER.error( f"Delete flow by app_id and flow_id failed due to: {e}") diff --git a/apps/routers/flow.py b/apps/routers/flow.py index 521dad9016dfbcdf035b100e693c39ceb254fd95..2537e1373992f8db78bcae0817c79ccd3149d046 100644 --- a/apps/routers/flow.py +++ b/apps/routers/flow.py @@ -2,7 +2,7 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ -from fastapi import APIRouter, Depends, status, Path, Query, Body +from fastapi import APIRouter, Depends, status, Query, Body from fastapi.responses import JSONResponse from typing import Annotated, Optional @@ -25,39 +25,38 @@ router = APIRouter( ) -@router.get("/service", response_model=NodeMetaDataListRsp, responses={ +@router.get("/service", response_model=NodeServiceListRsp, responses={ status.HTTP_404_NOT_FOUND: {"model": ResponseData}, }) -async def get_node_metadatas( - user_sub: Annotated[str, Depends(get_user)], - page: int = Query(...), - page_size: int = Query(..., alias="pageSize"), +async def get_services( + user_sub: Annotated[str, Depends(get_user)], ): """获取用户可访问的节点元数据所在服务的信息""" - result = await FlowManager.get_node_meta_datas_by_service_id(user_sub, page, page_size) - if result is None: - return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=ResponseData( + services = await FlowManager.get_service_by_user_id(user_sub) + if services is None: + return NodeServiceListRsp( code=status.HTTP_404_NOT_FOUND, - message="节点元数据所在服务信息不存在", - result={}, - ).model_dump(exclude_none=True, by_alias=True)) - return JSONResponse(status_code=status.HTTP_200_OK, content=NodeServiceListRsp( + message="未找到符合条件的服务", + result=NodeServiceListMsg(services=[]).model_dump(exclude_none=True, by_alias=True) + ) + + return NodeServiceListRsp( code=status.HTTP_200_OK, message="节点元数据所在服务信息获取成功", - result=NodeServiceListMsg(total=result[0], services=result[1]) - ).model_dump(exclude_none=True, by_alias=True)) + result=NodeServiceListMsg(services=services).model_dump(exclude_none=True, by_alias=True) + ) @router.get("/service/node", response_model=NodeMetaDataListRsp, responses={ status.HTTP_403_FORBIDDEN: {"model": ResponseData}, status.HTTP_404_NOT_FOUND: {"model": ResponseData} }) -async def get_flow( +async def get_servide_nodes( user_sub: Annotated[str, Depends(get_user)], - service_id: int = Query(..., alias="serviceId") + service_id: Annotated[str, Query(..., alias="serviceId")] ): """获取用户可访问的节点元数据""" - if not await AppManager.validate_user_app_access(user_sub, service_id): + if not await FlowManager.validate_user_service_access(user_sub, service_id): return JSONResponse(status_code=status.HTTP_403_FORBIDDEN, content=ResponseData( code=status.HTTP_403_FORBIDDEN, message="用户没有权限访问该服务", @@ -66,15 +65,15 @@ async def get_flow( result = await FlowManager.get_node_meta_datas_by_service_id(service_id) if result is None: - return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=ResponseData( + return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=NodeMetaDataListRsp( code=status.HTTP_404_NOT_FOUND, message="服务下节点元数据获取失败", - result={}, + result=NodeMetaDataListMsg(serviceId=service_id), ).model_dump(exclude_none=True, by_alias=True)) return JSONResponse(status_code=status.HTTP_200_OK, content=NodeMetaDataListRsp( code=status.HTTP_200_OK, message="服务下节点元数据获取成功", - result=NodeMetaDataListMsg(node_meta_datas=result) + result=NodeMetaDataListMsg(serviceId=service_id, nodeMetaDatas=result) ).model_dump(exclude_none=True, by_alias=True)) @@ -84,22 +83,22 @@ async def get_flow( }) async def get_flow( user_sub: Annotated[str, Depends(get_user)], - app_id: str = Query(..., alias="appId"), - flow_id: str = Query(..., alias="flowId") + app_id: Annotated[str, Query(..., alias="appId")], + flow_id: Annotated[str, Query(..., alias="flowId")] ): """获取流拓扑结构""" - if not await AppManager.validate_user_app_access(user_sub, app_id): - return JSONResponse(status_code=status.HTTP_403_FORBIDDEN, content=ResponseData( + if not await FlowManager.validate_user_service_access(user_sub, app_id): + return JSONResponse(status_code=status.HTTP_403_FORBIDDEN, content=FlowStructureGetRsp( code=status.HTTP_403_FORBIDDEN, message="用户没有权限访问该流", - result={}, + result=FlowStructureGetMsg(), ).model_dump(exclude_none=True, by_alias=True)) result = await FlowManager.get_flow_by_app_and_flow_id(app_id, flow_id) if result is None: - return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=ResponseData( + return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=FlowStructureGetRsp( code=status.HTTP_404_NOT_FOUND, message="应用下流程获取失败", - result={}, + result=FlowStructureGetMsg(), ).model_dump(exclude_none=True, by_alias=True)) return JSONResponse(status_code=status.HTTP_200_OK, content=FlowStructureGetRsp( code=status.HTTP_200_OK, @@ -122,26 +121,34 @@ async def put_flow( put_body: PutFlowReq = Body(...) ): """修改流拓扑结构""" - if not await AppManager.validate_user_app_access(user_sub, app_id): - return JSONResponse(status_code=status.HTTP_403_FORBIDDEN, content=ResponseData( + if not await AppManager.validate_app_belong_to_user(user_sub, app_id): + return JSONResponse(status_code=status.HTTP_403_FORBIDDEN, content=FlowStructurePutRsp( code=status.HTTP_403_FORBIDDEN, message="用户没有权限访问该流", - result={}, + result=FlowStructurePutMsg(), ).model_dump(exclude_none=True, by_alias=True)) + put_body.flow = await FlowService.remove_excess_structure_from_flow(put_body.flow) if topology_check: await FlowService.validate_flow_connectivity(put_body.flow) await FlowService.validate_flow_illegal(put_body.flow) result = await FlowManager.put_flow_by_app_and_flow_id(app_id, flow_id, put_body.flow, put_body.focus_point) if result is None: - return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content=ResponseData( + return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content=FlowStructurePutRsp( code=status.HTTP_500_INTERNAL_SERVER_ERROR, - message="应用下流程更新失败", - result={}, + message="应用下流更新失败", + result=FlowStructurePutMsg(), + ).model_dump(exclude_none=True, by_alias=True)) + flow, _ = await FlowManager.get_flow_by_app_and_flow_id(app_id, flow_id) + if flow is None: + return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content=FlowStructurePutRsp( + code=status.HTTP_500_INTERNAL_SERVER_ERROR, + message="应用下流更新后获取失败", + result=FlowStructurePutMsg(), ).model_dump(exclude_none=True, by_alias=True)) return JSONResponse(status_code=status.HTTP_200_OK, content=FlowStructurePutRsp( code=status.HTTP_200_OK, - message="应用下流程更新成功", - result=FlowStructurePutMsg(flow_id=result) + message="应用下流更新成功", + result=FlowStructurePutMsg(flow=flow) ).model_dump(exclude_none=True, by_alias=True)) @@ -153,22 +160,23 @@ async def delete_flow( app_id: str = Query(..., alias="appId"), flow_id: str = Query(..., alias="flowId") ): + print(user_sub) """删除流拓扑结构""" - if not await AppManager.validate_user_app_access(user_sub, app_id): - return JSONResponse(status_code=status.HTTP_403_FORBIDDEN, content=ResponseData( + if not await AppManager.validate_app_belong_to_user(user_sub, app_id): + return JSONResponse(status_code=status.HTTP_403_FORBIDDEN, content=FlowStructureDeleteRsp( code=status.HTTP_403_FORBIDDEN, message="用户没有权限访问该流", - result={}, + result=FlowStructureDeleteMsg(), ).model_dump(exclude_none=True, by_alias=True)) result = await FlowManager.delete_flow_by_app_and_flow_id(app_id, flow_id) if result is None: - return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=ResponseData( + return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=FlowStructureDeleteRsp( code=status.HTTP_404_NOT_FOUND, message="应用下流程删除失败", - result={}, + result=FlowStructureDeleteMsg(), ).model_dump(exclude_none=True, by_alias=True)) return JSONResponse(status_code=status.HTTP_200_OK, content=FlowStructureDeleteRsp( code=status.HTTP_200_OK, message="应用下流程删除成功", - result=FlowStructureDeleteMsg(flow_id=result) + result=FlowStructureDeleteMsg(flowId=result) ).model_dump(exclude_none=True, by_alias=True)) diff --git a/apps/utils/flow.py b/apps/utils/flow.py index 3ed20d780a01dadc2d9501fae5d9dbfb35949011..bb4e49ee1f3e46c155c986dab95ecb5bafb9eb91 100644 --- a/apps/utils/flow.py +++ b/apps/utils/flow.py @@ -11,10 +11,35 @@ from apps.entities.flow_topology import ServiceItem, NodeMetaDataItem, FlowItem, class FlowService: @staticmethod + async def remove_excess_structure_from_flow(flow_item: FlowItem) -> FlowItem: + node_to_branches = dict() + for node in flow_item.nodes: + node_to_branches[node.node_id] = set() + if node.type == NodeType.CHOICE.value: + if 'choices' not in node.parameters.keys(): + node.parameters['choices'] = [] + for branch in node.parameters['choices']: + if branch['branch'] in node_to_branches[node.node_id]: + LOGGER.error(f"节点{node.node_id}的分支{branch['branch']}id重复") + raise Exception(f"节点{node.name}的分支{branch['branch']}重复") + node_to_branches[node.node_id].add(branch['branch']) + else: + node_to_branches[node.node_id].add('') + new_edges_items = [] + for edge in flow_item.edges: + if edge.source_node not in node_to_branches.keys(): + continue + if edge.target_node not in node_to_branches.keys(): + continue + if edge.branch_id not in node_to_branches[edge.source_node]: + continue + new_edges_items.append(edge) + flow_item.edges = new_edges_items + return flow_item + @staticmethod async def validate_flow_illegal(flow_item: FlowItem) -> None: node_id_set = set() edge_id_set = set() - node_to_branches = dict() edge_to_branch = dict() num_of_start_node = 0 num_of_end_node = 0 @@ -33,19 +58,10 @@ class FlowService: if node.type == NodeType.END.value: num_of_end_node += 1 id_of_end_node = node.node_id - node_to_branches[node.node_id] = set() - if node.type == NodeType.CHOICE.value: - for branch in node.parameters['choices']: - if branch.branch in node_to_branches[node.node_id]: - LOGGER.error(msg=f"分支id重复") - raise Exception(f"节点{node.name}的分支{branch.branch }重复") - node_to_branches[node.node_id].add(branch.branch) - else: - node_to_branches[node.node_id].add('') if num_of_start_node != 1 or num_of_end_node != 1: LOGGER.error(msg="起始节点和终止节点数量不为1") raise Exception("起始节点和终止节点数量不为1") - for edge in flow_item.edeges: + for edge in flow_item.edges: if edge.edge_id in edge_id_set: LOGGER.error(msg="边id重复") raise Exception(f"边{edge.edge_id}的id重复") @@ -53,29 +69,20 @@ class FlowService: if edge.source_node == edge.target_node: LOGGER.error(msg="边起始节点和终止节点相同") raise Exception(f"边{edge.edge_id}的起始节点和终止节点相同") - if edge.source_node not in node_id_set: - LOGGER.error(msg=f"边{edge.edge_id}的起始节点{edge.source_node}不存在") - raise Exception(f"边{edge.edge_id}的起始节点{edge.source_node}不存在") - if edge.target_node not in node_id_set: - LOGGER.error(msg=f"边{edge.edge_id}的终止节点{edge.target_node}不存在") - raise Exception(f"边{edge.edge_id}的终止节点{edge.target_node}不存在") - if edge.branch_id not in node_to_branches[edge.source_node]: - LOGGER.error(msg=f"边{edge.edge_id}的分支{edge.branch_id}不存在") - raise Exception(f"边{edge.edge_id}的分支{edge.branch_id}不存在") + if edge.source_node not in edge_to_branch: + edge_to_branch[edge.source_node] = set() if edge.branch_id in edge_to_branch[edge.source_node]: LOGGER.error(msg=f"边{edge.edge_id}的分支{edge.branch_id}重复") raise Exception(f"边{edge.edge_id}的分支{edge.branch_id}重复") - if edge.source_node not in edge_to_branch: - edge_to_branch[edge.source_node] = set() - edge_to_branch[edge.source_node].add(edge.edge_id) + edge_to_branch[edge.source_node].add(edge.branch_id) node_in_degrees[edge.target_node] = node_in_degrees.get( edge.target_node, 0) + 1 node_out_degrees[edge.source_node] = node_out_degrees.get( edge.source_node, 0) + 1 - if node_in_degrees[id_of_start_node] != 0: + if id_of_start_node in node_in_degrees.keys() and node_in_degrees[id_of_start_node] != 0: LOGGER.error(msg=f"起始节点{id_of_start_node}的入度不为0") raise Exception(f"起始节点{id_of_start_node}的入度不为0") - if node_out_degrees[id_of_end_node] != 0: + if id_of_end_node in node_out_degrees.keys() and node_out_degrees[id_of_end_node] != 0: LOGGER.error(msg=f"终止节点{id_of_end_node}的出度不为0") raise Exception(f"终止节点{id_of_end_node}的出度不为0") @@ -90,7 +97,7 @@ class FlowService: id_of_start_node = node.node_id if node.type == NodeType.END.value: id_of_end_node = node.node_id - for edge in flow_item.edeges: + for edge in flow_item.edges: node_in_degrees[edge.target_node] = node_in_degrees.get( edge.target_node, 0) + 1 node_out_degrees[edge.source_node] = node_out_degrees.get(