diff --git a/apps/entities/collection.py b/apps/entities/collection.py index 7e486e49322d69ac75990d2f83dfba81f2595c05..48a3bfbd73f171f722855f55e74128967d38149c 100644 --- a/apps/entities/collection.py +++ b/apps/entities/collection.py @@ -183,28 +183,3 @@ class Domain(BaseModel): definition: str updated_at: float = Field(default_factory=lambda: round(datetime.now(tz=timezone.utc).timestamp(), 3)) - -class NodeMetaData(BaseModel): - """节点元数据""" - pass -class ServiceNodeMetaDatas(BaseModel): - """节点原数据""" - pass -class Position(BaseModel): - """前端相对位置""" - pass -class Flow(BaseModel): - """流的拓扑数据""" - pass -class Branch(BaseModel): - """节点分支信息""" - pass -class Dependency(BaseModel): - """节点伴生关系""" - pass -class Node(BaseModel): - """流拓扑中的节点数据""" - pass -class Edge(BaseModel): - """流拓扑中的边信息""" - pass diff --git a/apps/entities/flow.py b/apps/entities/flow.py index ad6caa1e5edc5a68a87f2ae45e6f36876f473537..ca82a4726bc2bf83e898a50a342c3fbf9e1d25f1 100644 --- a/apps/entities/flow.py +++ b/apps/entities/flow.py @@ -150,3 +150,9 @@ class ServiceApiSpec(BaseModel): size: int = Field(description="OpenAPI文件大小(单位:KB)") path: str = Field(description="OpenAPI文件路径") hash: str = Field(description="OpenAPI文件的hash值") + +class FlowConfig(BaseModel): + """Flow的配置信息 用于前期调试使用""" + app_id: str + flow_id: str + flow_config: Flow \ No newline at end of file diff --git a/apps/entities/flow_topology.py b/apps/entities/flow_topology.py index 9369a1029a4172cca4f7d8fe5f9c83e0acabcec9..e3e7b3f17406bd6d9e0f197acb70f6df46c1da29 100644 --- a/apps/entities/flow_topology.py +++ b/apps/entities/flow_topology.py @@ -21,18 +21,18 @@ class ServiceItem(BaseModel): class NodeMetaDataItem(BaseModel): """节点元数据类""" api_id: str = Field(alias="apiId") - name: str type: str + name: str description: str - parameters_template: str = Field(alias="parametersTemplate") + parameters_template: dict[str, Any] = Field(alias="parametersTemplate") editable: bool = Field(default=True) created_at: Optional[float] = Field(alias="createdAt") class PositionItem(BaseModel): """请求/响应中的前端相对位置变量类""" - x: float - y: float + x: float = Field(default=0.0) + y: float = Field(default=0.0) class DependencyItem(BaseModel): @@ -48,12 +48,11 @@ class NodeItem(BaseModel): name: str type: str = Field(default=NodeType.NORMAL.value) description: str = Field(default='') - enable: str = Field(default=True) + enable: bool = Field(default=True) parameters: dict[str, Any] depedency: Optional[DependencyItem] = None position: PositionItem editable: bool = Field(default=True) - created_at: Optional[float] = Field(alias="createdAt") class EdgeItem(BaseModel): @@ -63,16 +62,15 @@ class EdgeItem(BaseModel): target_node: str = Field(alias="targetNode") type: str = Field(default=EdgeType.NORMAL.value) branch_id: str = Field(alias="branchId") - created_at: Optional[float] = Field(alias="createdAt") class FlowItem(BaseModel): """请求/响应中的流变量类""" - flow_id: Optional[str] = Field(alias="flowId") - name: str - description: str + flow_id: Optional[str] = Field(alias="flowId", default='flow id') + name: str = Field(default='flow name') + description: str = Field(default='flow description') enable: bool = Field(default=True) editable: bool = Field(default=True) - created_at: Optional[float] = Field(alias="createdAt") - nodes: list[NodeItem] - edeges: list[EdgeItem] + nodes: list[NodeItem] = Field(default=[]) + edges: list[EdgeItem] = Field(default=[]) + created_at: Optional[float] = Field(alias="createdAt", default=0) diff --git a/apps/entities/request_data.py b/apps/entities/request_data.py index 50ab6a6769ccef503543c4002c3ee1dc9e26c5d0..b70ff1e45e324b96ae60000ce91f1dbf035e0344 100644 --- a/apps/entities/request_data.py +++ b/apps/entities/request_data.py @@ -119,6 +119,5 @@ class PutFlowReq(BaseModel): """创建/修改流拓扑结构""" flow: FlowItem - nodes: list[NodeItem] - edges: list[EdgeItem] focus_point: PositionItem = Field(alias="focusPoint") + diff --git a/apps/entities/response_data.py b/apps/entities/response_data.py index ae685d19e12a7b9d752b0bd5679b0845bd7b6a55..9cde67a5157a9345ec7ac0ce40d807088e1ce5df 100644 --- a/apps/entities/response_data.py +++ b/apps/entities/response_data.py @@ -7,7 +7,7 @@ from typing import Any, Optional from pydantic import BaseModel, Field from apps.entities.appcenter import AppCenterCardItem, AppData -from apps.entities.collection import Blacklist, Document, NodeMetaData +from apps.entities.collection import Blacklist, Document from apps.entities.enum_var import DocumentStatus from apps.entities.flow_topology import ( FlowItem, @@ -320,77 +320,51 @@ class GetRecentAppListRsp(ResponseData): class NodeServiceListMsg(BaseModel): """GET /api/flow/service result""" - - total: int - services: list[ServiceItem] + services: list[ServiceItem]=Field(default=[]) class NodeServiceListRsp(ResponseData): """GET /api/flow/service 返回数据结构""" - result: NodeServiceListMsg class NodeMetaDataListMsg(BaseModel): """GET /api/flow/service/node result""" - - service_id: str = Field(alias="serviceId") - node_meta_datas: list[NodeMetaDataItem] - - -class ServiceNodeMetaDatasMsg(BaseModel): - """GET /api/flow/node/metadata 服务与服务下节点元数据结构""" - - service_id: str = Field(alias="serviceId") - name: str - type: str - node_meta_datas: list[NodeMetaDataItem] = Field( - alias="nodeMetaDatas", default=[]) - created_at: Optional[float] = Field(alias="createdAt") - - -class NodeMetaDataListMsg(ResponseData): - services: list[ServiceNodeMetaDatasMsg] + service_id: str = Field(alias="serviceId",default=[]) + node_meta_datas: list[NodeMetaDataItem] = Field(alias="nodeMetaDatas", default=[]) class NodeMetaDataListRsp(ResponseData): """GET /api/flow/service/node 返回数据结构""" - result: NodeMetaDataListMsg - class FlowStructureGetMsg(BaseModel): """GET /api/flow result""" - - flow: FlowItem - focus_point: PositionItem + flow: FlowItem = Field(default=FlowItem()) + focus_point: PositionItem = Field(default=PositionItem(x=0.0, y=0.0)) class FlowStructureGetRsp(ResponseData): """GET /api/flow 返回数据结构""" - result: FlowStructureGetMsg +class PutFlowReq(BaseModel): + """创建/修改流拓扑结构""" + flow: FlowItem + focus_point: PositionItem = Field(alias="focusPoint") class FlowStructurePutMsg(BaseModel): """PUT /api/flow result""" - - flow_id: str = Field(alias="flowId") - - + flow: FlowItem = Field(default=FlowItem()) class FlowStructurePutRsp(ResponseData): """PUT /api/flow 返回数据结构""" - result: FlowStructurePutMsg - class FlowStructureDeleteMsg(BaseModel): - """DELETE /api/flow/{flowId} result""" - - flow_id: str = Field(alias="flowId") + """DELETE /api/flow/ result""" + flow_id: str = Field(alias="flowId", default="") class FlowStructureDeleteRsp(ResponseData): - """DELETE /api/flow/{flowId} 返回数据结构""" - + """DELETE /api/flow/ 返回数据结构""" result: FlowStructureDeleteMsg diff --git a/apps/manager/application.py b/apps/manager/application.py index 1716657e27cc5f840aecf6f9881617d9fb402435..5cde5670356704928ff3aa100e5eaa1695d56ae3 100644 --- a/apps/manager/application.py +++ b/apps/manager/application.py @@ -2,7 +2,6 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ -from pymongo import ASCENDING from apps.constants import LOGGER from apps.models.mongo import MongoDB @@ -20,22 +19,43 @@ class AppManager: :return: 如果用户具有所需权限则返回True,否则返回False """ try: - app_collection = MongoDB.get_collection("app") # 获取应用集合' - match_conditions = [ - {"author": user_sub}, - {"permissions.type": PermissionType.PUBLIC.value}, - { - "$and": [ - {"permissions.type": PermissionType.PROTECTED.value}, - {"permissions.users": user_sub} - ] - } - ] - query = {"$and": [{"_id": app_id}, - {"$or": match_conditions}]} - - result = await app_collection.count_documents(query) - return (result > 0) + app_collection = MongoDB.get_collection("app") + query = { + "_id": app_id, + "$or": [ + {"author": user_sub}, + {"permission.type": PermissionType.PUBLIC.value}, + { + "$and": [ + {"permission.type": PermissionType.PROTECTED.value}, + {"permission.users": user_sub} + ] + } + ] + } + + result = await app_collection.find_one(query) + return (result is not None) except Exception as e: LOGGER.error(f"Validate user app access failed due to: {e}") return False + + async def validate_app_belong_to_user(user_sub: str, app_id: str) -> bool: + """验证用户对应用的属权 + + :param user_sub: 用户唯一标识符 + :param app_id: 应用id + :return: 如果应用属于用户则返回True,否则返回False + """ + try: + app_collection = MongoDB.get_collection("app") # 获取应用集合' + query = { + "_id": app_id, + "author": user_sub + } + + result = await app_collection.find_one(query) + return (result is not None) + except Exception as e: + LOGGER.error(f"Validate app belong to user failed due to: {e}") + return False diff --git a/apps/manager/flow.py b/apps/manager/flow.py index d8c35b92c2da083106bd258a46e491d576daeda1..2cad12303443a0eee70fe033ad40e89edc5cc968 100644 --- a/apps/manager/flow.py +++ b/apps/manager/flow.py @@ -6,7 +6,7 @@ from typing import Tuple, List from pymongo import ASCENDING from apps.constants import LOGGER -from apps.entities.flow import StepPos, Edge, Step, Flow +from apps.entities.flow import StepPos, Edge, Step, Flow, FlowConfig from apps.entities.pool import AppFlow from apps.entities.flow_topology import ServiceItem, NodeMetaDataItem, FlowItem, NodeItem, EdgeItem, PositionItem from apps.models.mongo import MongoDB @@ -38,8 +38,8 @@ class FlowManager: } ] query = {"$and": [{"_id": service_id}, - {"$or": match_conditions}, - {"favorites": user_sub}]} + {"$or": match_conditions} + ]} result = await service_collection.count_documents(query) return (result > 0) @@ -47,50 +47,45 @@ class FlowManager: LOGGER.error(f"Validate user service access failed due to: {e}") return False - @staticmethod - async def get_service_by_user_id(user_sub: str, page: int, page_size: int) -> Tuple[int, List[ServiceItem]]: - """根据用户ID获取用户有执行权限且收藏的服务列表 - - :param user_sub: 用户唯一标识符 - :param page: 当前页码 - :param page_size: 每页大小 - :return: 返回符合条件的服务总数和分页后的服务列表 - """ + async def get_service_by_user_id(user_sub: str) -> Tuple[int, List[ServiceItem]]: service_collection = MongoDB.get_collection("service") try: - skip_count = (page - 1) * page_size match_conditions = [ + {"name": "系统"}, {"author": user_sub}, - {"permissions.type": "PUBLIC"}, { "$and": [ - {"permissions.type": "PROTECTED"}, - {"permissions.users": user_sub} + {"permissions.type": PermissionType.PUBLIC.value}, + {"permissions.users": user_sub}, + {"favorites": user_sub} + ] + }, + { + "$and": [ + {"permissions.type": PermissionType.PROTECTED.value}, + {"permissions.users": user_sub}, + {"favorites": user_sub} ] } ] - query = {"$and": [{"$or": match_conditions}, - {"favorites": user_sub}]} + query = {"$or": match_conditions} - total_services = await service_collection.count_documents(query) service_records_cursor = service_collection.find( query, - skip=skip_count, - limit=page_size, sort=[("created_at", ASCENDING)] ) service_records = await service_records_cursor.to_list(length=None) service_items = [ ServiceItem( - service_id=str(record["_id"]), + serviceId=str(record["_id"]), name=record["name"], - type=record["type"], - created_at=record["created_at"] + type="default", + createdAt=record["created_at"] ) for record in service_records ] - return total_services, service_items + return service_items except Exception as e: LOGGER.error(f"Get service by user id failed due to: {e}") @@ -106,19 +101,35 @@ class FlowManager: node_pool_collection = MongoDB.get_collection("node") # 获取节点集合 try: cursor = node_pool_collection.find( - {"service": service_id}).sort("created_at", ASCENDING) # 查询指定service_id的所有node_poool + {"service_id": service_id}).sort("created_at", ASCENDING) # 查询指定service_id的所有node_poool nodes_meta_data_items = [] async for node_pool_record in cursor: # 将每个node_pool换成NodeMetaDataItem实例 + parameters_template = { + "fixed_params": node_pool_record['fixed_params'], + "params_schema": node_pool_record['params_schema'], + "output_schema": node_pool_record['output_schema'] + } + if node_pool_record['call_id'] == 'choice': + parameters_template['choice'] = [ + { + "branch": "valid", + "description": "返回值存在有效数据" + }, + { + "branch": "invalid", + "description": "返回值不存在有效数据" + }, + ] node_meta_data_item = NodeMetaDataItem( - api_id=node_pool_record["id"], + apiId=node_pool_record["_id"], + type=node_pool_record["call_id"], name=node_pool_record['name'], - type=node_pool_record['type'], description=node_pool_record['description'], - parameters_template=node_pool_record['parameters_template'], + parametersTemplate=parameters_template, editable=True, - created_at=node_pool_record['created_at'] + createdAt=node_pool_record['created_at'], ) nodes_meta_data_items.append(node_meta_data_item) @@ -138,64 +149,87 @@ class FlowManager: :return: 流的item和用户在这个流上的视觉焦点 """ try: - flow_collection = MongoDB.get_collection("app") - flow_record = await flow_collection.aggregate([ - {"$match": {"_id": app_id, "flows._id": flow_id}}, - {"$unwind": "$flows"}, - {"$match": {"flows._id": flow_id}}, - {"$limit": 1}, - ]).to_list(length=1) + app_collection = MongoDB.get_collection("app") + app_record = await app_collection.find_one({"_id": app_id}) + if app_record is None: + LOGGER.error(f"应用{app_id}不存在") + return None + cursor = app_collection.find( + {"_id": app_id, "flows._id": flow_id}, + {"flows.$": 1} # 只返回 flows 数组中符合条件的第一个元素 + ) + + # 获取结果列表,并限制长度为1,因为我们只期待一个结果 + app_records = await cursor.to_list(length=1) + if len(app_records) == 0: + return None + app_record = app_records[0] + if "flows" not in app_record.keys() or len(app_record["flows"]) == 0: + return None + flow_record = app_record["flows"][0] except Exception as e: LOGGER.error( f"Get flow by app_id and flow_id failed due to: {e}") return None if flow_record: - flow_config = await get_flow_config(app_id, flow_id) + flow_config_collection = MongoDB.get_collection("flow_config") + flow_config_record = await flow_config_collection.find_one({"app_id": app_id, "flow_id": flow_id}) + if flow_config_record is None or not flow_config_record.get("flow_config"): + return None + flow_config = flow_config_record['flow_config'] if not flow_config: LOGGER.error( "Get flow config by app_id and flow_id failed") return None focus_point = flow_record["focus_point"] flow_item = FlowItem( - flow_id=flow_id, - name=flow_config.name, - description=flow_config.description, + flowId=flow_id, + name=flow_config['name'], + description=flow_config['description'], enable=True, editable=True, nodes=[], - edeges=[] + edges=[], + createdAt=flow_record["created_at"] ) - for node_config in flow_config.steps: + for node_config in flow_config['steps']: node_item = NodeItem( - node_id=node_config.node_id, - app_id=node_config.node, - name=node_config.name, - description=node_config.description, + nodeId=node_config['id'], + apiId=node_config['node'], + name=node_config['name'], + description=node_config['description'], enable=True, editable=True, - type=node_config.type, - parameters=node_config.params, + type=node_config['type'], + parameters=node_config['params'], position=PositionItem( - x=node_config.position.x, y=node_config.position.y) + x=node_config['pos']['x'], y=node_config['pos']['y']) ) flow_item.nodes.append(node_item) - for edge_config in flow_config.edges: - tmp_list = edge_config.edge_from.split('.') - branch_id = tmp_list[1] if len(tmp_list) > 2 else '' - flow_item.edeges.append(EdgeItem( - edge_id=edge_config.id, - source_node=edge_config.edge_from, - target_node=edge_config.edge_to, - type=edge_config.type.value, - branch_id=branch_id, + for edge_config in flow_config['edges']: + edge_from = edge_config['edge_from'] + branch_id = '' + tmp_list = edge_config['edge_from'].split('.') + if len(tmp_list)==0 or len(tmp_list)>=2: + LOGGER.error("edge from format error") + continue + if len(tmp_list) == 2: + edge_from = tmp_list[0] + branch_id = tmp_list[1] + flow_item.edges.append(EdgeItem( + edgeId=edge_config['id'], + sourceNode=edge_from, + targetNode=edge_config['edge_to'], + type=edge_config['edge_type'], + branchId=branch_id, )) - flow_item.nodes.append(node_item) return (flow_item, focus_point) return None @staticmethod - async def put_flow_by_app_and_flow_id(app_id: str, flow_id: str, flow_item: FlowItem, focus_point: PositionItem) -> str: + async def put_flow_by_app_and_flow_id( + app_id: str, flow_id: str, flow_item: FlowItem, focus_point: PositionItem) -> str: """存储/更新flow的数据库数据和配置文件 :param app_id: 应用的id @@ -204,13 +238,21 @@ class FlowManager: :return: 流的id """ try: - flow_collection = MongoDB.get_collection("app") - flow_record = await flow_collection.aggregate([ - {"$match": {"_id": app_id, "flows._id": flow_id}}, - {"$unwind": "$flows"}, - {"$match": {"flows._id": flow_id}}, - {"$limit": 1}, - ]).to_list(length=1) + app_collection = MongoDB.get_collection("app") + app_record = await app_collection.find_one({"_id": app_id}) + if app_record is None: + LOGGER.error(f"应用{app_id}不存在") + return None + cursor = app_collection.find( + {"_id": app_id, "flows._id": flow_id}, + {"flows.$": 1} + ) + app_records = await cursor.to_list(length=1) + flow_record = None + if len(app_records) != 0: + app_record = app_records[0] + if "flows" in app_record.keys() and len(app_record["flows"]) != 0: + flow_record = app_record["flows"][0] except Exception as e: LOGGER.error( f"Get flow by app_id and flow_id failed due to: {e}") @@ -225,6 +267,7 @@ class FlowManager: for node_item in flow_item.nodes: edge_config = Step( id=node_item.node_id, + type=node_item.type, node=node_item.api_id, name=node_item.name, description=node_item.description, @@ -233,7 +276,7 @@ class FlowManager: params=node_item.parameters ) flow_config.steps.append(edge_config) - for edge_item in flow_item.edeges: + for edge_item in flow_item.edges: edge_from = edge_item.source_node if edge_item.branch_id: edge_from = edge_from+'.'+edge_item.branch_id @@ -244,38 +287,43 @@ class FlowManager: edge_type=edge_item.type ) flow_config.edges.append(edge_config) + flow_config = FlowConfig(app_id=app_id, flow_id=flow_id, flow_config=flow_config) + try: + flow_config_collection = MongoDB.get_collection("flow_config") + await flow_config_collection.update_one( + {"app_id": app_id, "flow_id": flow_id}, + {"$set": flow_config.dict()}, + upsert=True # 如果没有找到匹配的文档,则插入新文档 + ) + except Exception as e: + LOGGER.error(f"Error updating flow config due to: {e}") + return None if flow_record: - result = await update_flow_config(app_id, flow_id, flow_config) - if not result: - LOGGER.error(f"Update flow config failed") - return None - app_collection = await MongoDB.get_collection("app") - result = await app_collection.update_one( + app_collection = MongoDB.get_collection("app") + result = await app_collection.find_one_and_update( {'_id': app_id}, { '$set': { - 'flows.$[flow].focus_point': focus_point + 'flows.$[element].focus_point': focus_point.model_dump(by_alias=True) } }, - array_filters=[{'flow._id': flow_id}] + array_filters=[{'element._id': flow_id}], + return_document=True # 返回更新后的文档 ) - if result.modified_count > 0: - return flow_id - else: + if result is None: + LOGGER.error("Update flow failed") return None + return result else: - new_path = await add_flow_config(app_id, flow_id, flow_config) - if not new_path: - LOGGER.error(f"Add flow config failed") - return None new_flow = AppFlow( - id=flow_id, + _id=flow_id, name=flow_item.name, description=flow_item.description, - path=new_path, + path="", focus_point=PositionItem(x=focus_point.x, y=focus_point.y), ) - result = await app_collection.update_one( + app_collection = MongoDB.get_collection("app") + result = await app_collection.find_one_and_update( {'_id': app_id}, { '$push': { @@ -283,16 +331,15 @@ class FlowManager: } } ) - if result.modified_count > 0: - return flow_id - else: + if result is None: + LOGGER.error("Add flow failed") return None + return flow_item except Exception as e: LOGGER.error( f"Put flow by app_id and flow_id failed due to: {e}") - return flow_id + return None - @staticmethod async def delete_flow_by_app_and_flow_id(app_id: str, flow_id: str) -> str: """删除flow的数据库数据和配置文件 @@ -302,26 +349,25 @@ class FlowManager: """ try: - result = await delete_flow_config(app_id, flow_id) - if not result: - LOGGER.error(f"Delete flow config failed") + flow_config_collection = MongoDB.get_collection("flow_config") + result = await flow_config_collection.delete_one({"app_id": app_id, "flow_id": flow_id}) + if result.deleted_count == 0: + LOGGER.error("Delete flow config failed") return None - app_pool_collection = await MongoDB.get_collection("app") # 获取集合 + app_pool_collection = MongoDB.get_collection("app") # 获取集合 - # 执行删除操作,从指定的AppPool文档中移除匹配的AppFlow - result = await app_pool_collection.update_one( - {'_id': app_id}, # 查询条件,找到要更新的AppPool文档 + result = await app_pool_collection.find_one_and_update( + {'_id': app_id}, { '$pull': { - 'flows': {'id': flow_id} # 假设'flows'数组中每个对象都有'id'字段 + 'flows': {'_id': flow_id} } } ) - - if result.modified_count > 0: - return flow_id - else: + if result is None: + LOGGER.error("Delete flow from app pool failed") return None + return flow_id except Exception as e: LOGGER.error( f"Delete flow by app_id and flow_id failed due to: {e}") diff --git a/apps/routers/flow.py b/apps/routers/flow.py index 521dad9016dfbcdf035b100e693c39ceb254fd95..ee717489ec49027b5c533663559562fd7bd7a8f9 100644 --- a/apps/routers/flow.py +++ b/apps/routers/flow.py @@ -2,7 +2,7 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ -from fastapi import APIRouter, Depends, status, Path, Query, Body +from fastapi import APIRouter, Depends, status, Query, Body from fastapi.responses import JSONResponse from typing import Annotated, Optional @@ -28,31 +28,30 @@ router = APIRouter( @router.get("/service", response_model=NodeMetaDataListRsp, responses={ status.HTTP_404_NOT_FOUND: {"model": ResponseData}, }) -async def get_node_metadatas( - user_sub: Annotated[str, Depends(get_user)], - page: int = Query(...), - page_size: int = Query(..., alias="pageSize"), +async def get_services( + user_sub: Annotated[str, Depends(get_user)] ): """获取用户可访问的节点元数据所在服务的信息""" - result = await FlowManager.get_node_meta_datas_by_service_id(user_sub, page, page_size) - if result is None: - return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=ResponseData( + services = await FlowManager.get_service_by_user_id(user_sub) + if services is None: + return NodeServiceListRsp( code=status.HTTP_404_NOT_FOUND, - message="节点元数据所在服务信息不存在", - result={}, - ).model_dump(exclude_none=True, by_alias=True)) - return JSONResponse(status_code=status.HTTP_200_OK, content=NodeServiceListRsp( + message="未找到符合条件的服务", + result=NodeServiceListMsg(services=[]) + ) + + return NodeServiceListRsp( code=status.HTTP_200_OK, message="节点元数据所在服务信息获取成功", - result=NodeServiceListMsg(total=result[0], services=result[1]) - ).model_dump(exclude_none=True, by_alias=True)) + result=NodeServiceListMsg(services=services) + ) @router.get("/service/node", response_model=NodeMetaDataListRsp, responses={ status.HTTP_403_FORBIDDEN: {"model": ResponseData}, status.HTTP_404_NOT_FOUND: {"model": ResponseData} }) -async def get_flow( +async def get_node_metadatas( user_sub: Annotated[str, Depends(get_user)], service_id: int = Query(..., alias="serviceId") ): @@ -66,15 +65,15 @@ async def get_flow( result = await FlowManager.get_node_meta_datas_by_service_id(service_id) if result is None: - return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=ResponseData( + return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=NodeMetaDataListRsp( code=status.HTTP_404_NOT_FOUND, message="服务下节点元数据获取失败", - result={}, + result=NodeMetaDataListMsg(serviceId=service_id), ).model_dump(exclude_none=True, by_alias=True)) return JSONResponse(status_code=status.HTTP_200_OK, content=NodeMetaDataListRsp( code=status.HTTP_200_OK, message="服务下节点元数据获取成功", - result=NodeMetaDataListMsg(node_meta_datas=result) + result=NodeMetaDataListMsg(serviceId=service_id, nodeMetaDatas=result) ).model_dump(exclude_none=True, by_alias=True)) @@ -89,17 +88,17 @@ async def get_flow( ): """获取流拓扑结构""" if not await AppManager.validate_user_app_access(user_sub, app_id): - return JSONResponse(status_code=status.HTTP_403_FORBIDDEN, content=ResponseData( + return JSONResponse(status_code=status.HTTP_403_FORBIDDEN, content=FlowStructureGetRsp( code=status.HTTP_403_FORBIDDEN, message="用户没有权限访问该流", - result={}, + result=FlowStructureGetMsg(), ).model_dump(exclude_none=True, by_alias=True)) result = await FlowManager.get_flow_by_app_and_flow_id(app_id, flow_id) if result is None: - return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=ResponseData( + return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=FlowStructureGetRsp( code=status.HTTP_404_NOT_FOUND, message="应用下流程获取失败", - result={}, + result=FlowStructureGetMsg(), ).model_dump(exclude_none=True, by_alias=True)) return JSONResponse(status_code=status.HTTP_200_OK, content=FlowStructureGetRsp( code=status.HTTP_200_OK, @@ -122,29 +121,38 @@ async def put_flow( put_body: PutFlowReq = Body(...) ): """修改流拓扑结构""" - if not await AppManager.validate_user_app_access(user_sub, app_id): - return JSONResponse(status_code=status.HTTP_403_FORBIDDEN, content=ResponseData( + if not await AppManager.validate_app_belong_to_user(user_sub, app_id): + return JSONResponse(status_code=status.HTTP_403_FORBIDDEN, content=FlowStructurePutRsp( code=status.HTTP_403_FORBIDDEN, message="用户没有权限访问该流", - result={}, + result=FlowStructurePutMsg(), ).model_dump(exclude_none=True, by_alias=True)) + put_body.flow=await FlowService.remove_excess_structure_from_flow(put_body.flow) if topology_check: await FlowService.validate_flow_connectivity(put_body.flow) await FlowService.validate_flow_illegal(put_body.flow) result = await FlowManager.put_flow_by_app_and_flow_id(app_id, flow_id, put_body.flow, put_body.focus_point) if result is None: - return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content=ResponseData( + return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content=FlowStructurePutRsp( code=status.HTTP_500_INTERNAL_SERVER_ERROR, - message="应用下流程更新失败", - result={}, + message="应用下流更新失败", + result=FlowStructurePutMsg(), + ).model_dump(exclude_none=True, by_alias=True)) + flow,_=await FlowManager.get_flow_by_app_and_flow_id(app_id, flow_id) + if flow is None: + return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content=FlowStructurePutRsp( + code=status.HTTP_500_INTERNAL_SERVER_ERROR, + message="应用下流更新后获取失败", + result=FlowStructurePutMsg(), ).model_dump(exclude_none=True, by_alias=True)) return JSONResponse(status_code=status.HTTP_200_OK, content=FlowStructurePutRsp( code=status.HTTP_200_OK, - message="应用下流程更新成功", - result=FlowStructurePutMsg(flow_id=result) + message="应用下流更新成功", + result=FlowStructurePutMsg(flow=flow) ).model_dump(exclude_none=True, by_alias=True)) + @router.delete("", response_model=FlowStructureDeleteRsp, responses={ status.HTTP_404_NOT_FOUND: {"model": ResponseData} }) @@ -154,21 +162,21 @@ async def delete_flow( flow_id: str = Query(..., alias="flowId") ): """删除流拓扑结构""" - if not await AppManager.validate_user_app_access(user_sub, app_id): - return JSONResponse(status_code=status.HTTP_403_FORBIDDEN, content=ResponseData( + if not await AppManager.validate_app_belong_to_user(user_sub, app_id): + return JSONResponse(status_code=status.HTTP_403_FORBIDDEN, content=FlowStructureDeleteRsp( code=status.HTTP_403_FORBIDDEN, message="用户没有权限访问该流", - result={}, + result=FlowStructureDeleteMsg(), ).model_dump(exclude_none=True, by_alias=True)) result = await FlowManager.delete_flow_by_app_and_flow_id(app_id, flow_id) if result is None: - return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=ResponseData( + return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=FlowStructureDeleteRsp( code=status.HTTP_404_NOT_FOUND, message="应用下流程删除失败", - result={}, + result=FlowStructureDeleteMsg(), ).model_dump(exclude_none=True, by_alias=True)) return JSONResponse(status_code=status.HTTP_200_OK, content=FlowStructureDeleteRsp( code=status.HTTP_200_OK, message="应用下流程删除成功", - result=FlowStructureDeleteMsg(flow_id=result) + result=FlowStructureDeleteMsg(flowId=result) ).model_dump(exclude_none=True, by_alias=True)) diff --git a/apps/utils/flow.py b/apps/utils/flow.py index 3ed20d780a01dadc2d9501fae5d9dbfb35949011..c869f8ebd07fca95577f449f390dcbecf142e761 100644 --- a/apps/utils/flow.py +++ b/apps/utils/flow.py @@ -2,19 +2,48 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ -from typing import Tuple, List - from apps.constants import LOGGER from apps.entities.enum_var import NodeType -from apps.entities.flow_topology import ServiceItem, NodeMetaDataItem, FlowItem, NodeItem, EdgeItem, PositionItem +from apps.entities.flow_topology import FlowItem class FlowService: + @staticmethod + async def remove_excess_structure_from_flow(flow_item: FlowItem) -> FlowItem: + node_to_branches = dict() + branch_illegal_chars='.' + for node in flow_item.nodes: + node_to_branches[node.node_id] = set() + if node.type == NodeType.CHOICE.value: + if 'choices' not in node.parameters.keys(): + node.parameters['choices'] = [] + for branch in node.parameters['choices']: + if branch['branch'] in node_to_branches[node.node_id]: + LOGGER.error(msg="分支id重复") + raise Exception(f"节点{node.name}的分支{branch['branch']}重复") + for illegal_char in branch_illegal_chars: + if illegal_char in branch['branch']: + LOGGER.error(msg="分支名称中含有非法字符") + raise Exception(f"节点{node.name}的分支{branch['branch']}名称中含有非法字符") + node_to_branches[node.node_id].add(branch['branch']) + else: + node_to_branches[node.node_id].add('') + new_edges_items = [] + for edge in flow_item.edges: + if edge.source_node not in node_to_branches.keys(): + continue + if edge.target_node not in node_to_branches.keys(): + continue + if edge.branch_id not in node_to_branches[edge.source_node]: + continue + new_edges_items.append(edge) + flow_item.edges = new_edges_items + return flow_item + @staticmethod async def validate_flow_illegal(flow_item: FlowItem) -> None: node_id_set = set() edge_id_set = set() - node_to_branches = dict() edge_to_branch = dict() num_of_start_node = 0 num_of_end_node = 0 @@ -33,19 +62,10 @@ class FlowService: if node.type == NodeType.END.value: num_of_end_node += 1 id_of_end_node = node.node_id - node_to_branches[node.node_id] = set() - if node.type == NodeType.CHOICE.value: - for branch in node.parameters['choices']: - if branch.branch in node_to_branches[node.node_id]: - LOGGER.error(msg=f"分支id重复") - raise Exception(f"节点{node.name}的分支{branch.branch }重复") - node_to_branches[node.node_id].add(branch.branch) - else: - node_to_branches[node.node_id].add('') if num_of_start_node != 1 or num_of_end_node != 1: LOGGER.error(msg="起始节点和终止节点数量不为1") raise Exception("起始节点和终止节点数量不为1") - for edge in flow_item.edeges: + for edge in flow_item.edges: if edge.edge_id in edge_id_set: LOGGER.error(msg="边id重复") raise Exception(f"边{edge.edge_id}的id重复") @@ -53,29 +73,20 @@ class FlowService: if edge.source_node == edge.target_node: LOGGER.error(msg="边起始节点和终止节点相同") raise Exception(f"边{edge.edge_id}的起始节点和终止节点相同") - if edge.source_node not in node_id_set: - LOGGER.error(msg=f"边{edge.edge_id}的起始节点{edge.source_node}不存在") - raise Exception(f"边{edge.edge_id}的起始节点{edge.source_node}不存在") - if edge.target_node not in node_id_set: - LOGGER.error(msg=f"边{edge.edge_id}的终止节点{edge.target_node}不存在") - raise Exception(f"边{edge.edge_id}的终止节点{edge.target_node}不存在") - if edge.branch_id not in node_to_branches[edge.source_node]: - LOGGER.error(msg=f"边{edge.edge_id}的分支{edge.branch_id}不存在") - raise Exception(f"边{edge.edge_id}的分支{edge.branch_id}不存在") + if edge.source_node not in edge_to_branch: + edge_to_branch[edge.source_node] = set() if edge.branch_id in edge_to_branch[edge.source_node]: LOGGER.error(msg=f"边{edge.edge_id}的分支{edge.branch_id}重复") raise Exception(f"边{edge.edge_id}的分支{edge.branch_id}重复") - if edge.source_node not in edge_to_branch: - edge_to_branch[edge.source_node] = set() - edge_to_branch[edge.source_node].add(edge.edge_id) + edge_to_branch[edge.source_node].add(edge.branch_id) node_in_degrees[edge.target_node] = node_in_degrees.get( edge.target_node, 0) + 1 node_out_degrees[edge.source_node] = node_out_degrees.get( edge.source_node, 0) + 1 - if node_in_degrees[id_of_start_node] != 0: + if id_of_start_node in node_in_degrees.keys() and node_in_degrees[id_of_start_node] != 0: LOGGER.error(msg=f"起始节点{id_of_start_node}的入度不为0") raise Exception(f"起始节点{id_of_start_node}的入度不为0") - if node_out_degrees[id_of_end_node] != 0: + if id_of_end_node in node_out_degrees.keys() and node_out_degrees[id_of_end_node] != 0: LOGGER.error(msg=f"终止节点{id_of_end_node}的出度不为0") raise Exception(f"终止节点{id_of_end_node}的出度不为0") @@ -90,7 +101,7 @@ class FlowService: id_of_start_node = node.node_id if node.type == NodeType.END.value: id_of_end_node = node.node_id - for edge in flow_item.edeges: + for edge in flow_item.edges: node_in_degrees[edge.target_node] = node_in_degrees.get( edge.target_node, 0) + 1 node_out_degrees[edge.source_node] = node_out_degrees.get(