From 0d2f823736fcdb26ddc5f196210fc8a96bad2ff4 Mon Sep 17 00:00:00 2001 From: zxstty Date: Fri, 24 Jan 2025 02:27:29 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E8=87=AA=E6=B5=8B=E4=BF=AE=E5=A4=8Dflow=20?= =?UTF-8?q?router=E7=9A=84bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/entities/collection.py | 25 ------- apps/entities/flow_topology.py | 22 +++--- apps/entities/request_data.py | 4 +- apps/entities/response_data.py | 30 ++------ apps/manager/application.py | 31 ++++----- apps/manager/flow.py | 121 ++++++++++++++++++++------------- apps/routers/flow.py | 73 ++++++++++---------- apps/utils/flow.py | 19 +++--- 8 files changed, 154 insertions(+), 171 deletions(-) diff --git a/apps/entities/collection.py b/apps/entities/collection.py index 7e486e49..48a3bfbd 100644 --- a/apps/entities/collection.py +++ b/apps/entities/collection.py @@ -183,28 +183,3 @@ class Domain(BaseModel): definition: str updated_at: float = Field(default_factory=lambda: round(datetime.now(tz=timezone.utc).timestamp(), 3)) - -class NodeMetaData(BaseModel): - """节点元数据""" - pass -class ServiceNodeMetaDatas(BaseModel): - """节点原数据""" - pass -class Position(BaseModel): - """前端相对位置""" - pass -class Flow(BaseModel): - """流的拓扑数据""" - pass -class Branch(BaseModel): - """节点分支信息""" - pass -class Dependency(BaseModel): - """节点伴生关系""" - pass -class Node(BaseModel): - """流拓扑中的节点数据""" - pass -class Edge(BaseModel): - """流拓扑中的边信息""" - pass diff --git a/apps/entities/flow_topology.py b/apps/entities/flow_topology.py index 9369a102..91e14f24 100644 --- a/apps/entities/flow_topology.py +++ b/apps/entities/flow_topology.py @@ -21,26 +21,24 @@ class ServiceItem(BaseModel): class NodeMetaDataItem(BaseModel): """节点元数据类""" api_id: str = Field(alias="apiId") - name: str type: str + name: str description: str - parameters_template: str = Field(alias="parametersTemplate") + parameters_template: dict[str, Any] = Field(alias="parametersTemplate") editable: bool = Field(default=True) created_at: Optional[float] = Field(alias="createdAt") class PositionItem(BaseModel): """请求/响应中的前端相对位置变量类""" - x: float - y: float - + x: float = Field(default=0.0) + y: float = Field(default=0.0) class DependencyItem(BaseModel): """请求/响应中的节点依赖变量类""" node_id: str = Field(alias="nodeId") type: str - class NodeItem(BaseModel): """请求/响应中的节点变量类""" node_id: str = Field(alias="nodeId") @@ -48,12 +46,11 @@ class NodeItem(BaseModel): name: str type: str = Field(default=NodeType.NORMAL.value) description: str = Field(default='') - enable: str = Field(default=True) + enable: bool = Field(default=True) parameters: dict[str, Any] depedency: Optional[DependencyItem] = None position: PositionItem editable: bool = Field(default=True) - created_at: Optional[float] = Field(alias="createdAt") class EdgeItem(BaseModel): @@ -63,16 +60,15 @@ class EdgeItem(BaseModel): target_node: str = Field(alias="targetNode") type: str = Field(default=EdgeType.NORMAL.value) branch_id: str = Field(alias="branchId") - created_at: Optional[float] = Field(alias="createdAt") class FlowItem(BaseModel): """请求/响应中的流变量类""" flow_id: Optional[str] = Field(alias="flowId") - name: str - description: str + name: str = Field(default='') + description: str = Field(default='') enable: bool = Field(default=True) editable: bool = Field(default=True) + nodes: list[NodeItem] = Field(default=[]) + edges: list[EdgeItem] = Field(default=[]) created_at: Optional[float] = Field(alias="createdAt") - nodes: list[NodeItem] - edeges: list[EdgeItem] diff --git a/apps/entities/request_data.py b/apps/entities/request_data.py index 08540509..41d12ba4 100644 --- a/apps/entities/request_data.py +++ b/apps/entities/request_data.py @@ -7,7 +7,7 @@ from typing import Optional from pydantic import BaseModel, Field from apps.entities.appcenter import AppData -from apps.entities.flow import EdgeItem, FlowItem, NodeItem, PositionItem +from apps.entities.flow_topology import PositionItem, FlowItem from apps.entities.task import RequestDataApp @@ -117,7 +117,5 @@ class PostKnowledgeIDData(BaseModel): class PutFlowReq(BaseModel): """创建/修改流拓扑结构""" flow: FlowItem - nodes: list[NodeItem] - edges: list[EdgeItem] focus_point: PositionItem = Field(alias="focusPoint") \ No newline at end of file diff --git a/apps/entities/response_data.py b/apps/entities/response_data.py index 5e938722..f7c81fe8 100644 --- a/apps/entities/response_data.py +++ b/apps/entities/response_data.py @@ -7,7 +7,7 @@ from typing import Any, Optional from pydantic import BaseModel, Field from apps.entities.appcenter import AppCenterCardItem, AppData -from apps.entities.collection import Blacklist, Document, NodeMetaData +from apps.entities.collection import Blacklist, Document from apps.entities.enum_var import DocumentStatus from apps.entities.record import RecordData from apps.entities.flow_topology import ServiceItem, NodeMetaDataItem, PositionItem, FlowItem @@ -315,8 +315,8 @@ class GetRecentAppListRsp(ResponseData): class NodeServiceListMsg(BaseModel): """GET /api/flow/service result""" - total: int - services: list[ServiceItem] + total: int=Field(default=0, description="服务总数") + services: list[ServiceItem]=Field(default=[], description="服务列表") class NodeServiceListRsp(ResponseData): @@ -327,32 +327,16 @@ class NodeServiceListRsp(ResponseData): class NodeMetaDataListMsg(BaseModel): """GET /api/flow/service/node result""" service_id: str = Field(alias="serviceId") - node_meta_datas: list[NodeMetaDataItem] - - -class ServiceNodeMetaDatasMsg(BaseModel): - """GET /api/flow/node/metadata 服务与服务下节点元数据结构""" - service_id: str = Field(alias="serviceId") - name: str - type: str - node_meta_datas: list[NodeMetaDataItem] = Field( - alias="nodeMetaDatas", default=[]) - created_at: Optional[float] = Field(alias="createdAt") - - -class NodeMetaDataListMsg(ResponseData): - services: list[ServiceNodeMetaDatasMsg] - + node_meta_datas: list[NodeMetaDataItem]=Field(alias="nodeMetaDatas",default=[]) class NodeMetaDataListRsp(ResponseData): """GET /api/flow/service/node 返回数据结构""" result: NodeMetaDataListMsg - class FlowStructureGetMsg(BaseModel): """GET /api/flow result""" flow: FlowItem - focus_point: PositionItem + focus_point: PositionItem=Field(alias="focusPoint",default=PositionItem(x=0,y=0)) class FlowStructureGetRsp(ResponseData): @@ -362,7 +346,7 @@ class FlowStructureGetRsp(ResponseData): class FlowStructurePutMsg(BaseModel): """PUT /api/flow result""" - flow_id: str = Field(alias="flowId") + flow_id: str = Field(alias="flowId",default="") class FlowStructurePutRsp(ResponseData): @@ -372,7 +356,7 @@ class FlowStructurePutRsp(ResponseData): class FlowStructureDeleteMsg(BaseModel): """DELETE /api/flow/{flowId} result""" - flow_id: str = Field(alias="flowId") + flow_id: str = Field(alias="flowId",default="") class FlowStructureDeleteRsp(ResponseData): diff --git a/apps/manager/application.py b/apps/manager/application.py index 1716657e..182cd3f8 100644 --- a/apps/manager/application.py +++ b/apps/manager/application.py @@ -2,8 +2,6 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ -from pymongo import ASCENDING - from apps.constants import LOGGER from apps.models.mongo import MongoDB from apps.entities.enum_var import PermissionType @@ -21,21 +19,22 @@ class AppManager: """ try: app_collection = MongoDB.get_collection("app") # 获取应用集合' - match_conditions = [ - {"author": user_sub}, - {"permissions.type": PermissionType.PUBLIC.value}, - { - "$and": [ - {"permissions.type": PermissionType.PROTECTED.value}, - {"permissions.users": user_sub} - ] - } - ] - query = {"$and": [{"_id": app_id}, - {"$or": match_conditions}]} + query = { + "_id": app_id, + "$or": [ + {"author": user_sub}, + {"permission.type": PermissionType.PUBLIC.value}, + { + "$and": [ + {"permission.type": PermissionType.PROTECTED.value}, + {"permission.users": user_sub} + ] + } + ] + } - result = await app_collection.count_documents(query) - return (result > 0) + result = await app_collection.find_one(query) + return (result is not None) except Exception as e: LOGGER.error(f"Validate user app access failed due to: {e}") return False diff --git a/apps/manager/flow.py b/apps/manager/flow.py index d8c35b92..c08725d1 100644 --- a/apps/manager/flow.py +++ b/apps/manager/flow.py @@ -61,10 +61,10 @@ class FlowManager: skip_count = (page - 1) * page_size match_conditions = [ {"author": user_sub}, - {"permissions.type": "PUBLIC"}, + {"permissions.type": PermissionType.PUBLIC.value}, { "$and": [ - {"permissions.type": "PROTECTED"}, + {"permissions.type": PermissionType.PROTECTED.value}, {"permissions.users": user_sub} ] } @@ -82,10 +82,10 @@ class FlowManager: service_records = await service_records_cursor.to_list(length=None) service_items = [ ServiceItem( - service_id=str(record["_id"]), + serviceId=str(record["_id"]), name=record["name"], - type=record["type"], - created_at=record["created_at"] + type="default", + createdAt=record["created_at"] ) for record in service_records ] @@ -103,22 +103,25 @@ class FlowManager: :param service_id: 服务id :return: 节点元数据的列表 """ - node_pool_collection = MongoDB.get_collection("node") # 获取节点集合 + node_pool_collection = MongoDB.get_collection("node") try: cursor = node_pool_collection.find( - {"service": service_id}).sort("created_at", ASCENDING) # 查询指定service_id的所有node_poool + {"service_id": service_id}).sort("_id", ASCENDING) nodes_meta_data_items = [] async for node_pool_record in cursor: - # 将每个node_pool换成NodeMetaDataItem实例 node_meta_data_item = NodeMetaDataItem( - api_id=node_pool_record["id"], + apiId=node_pool_record["_id"], + type=node_pool_record["call_id"], name=node_pool_record['name'], - type=node_pool_record['type'], description=node_pool_record['description'], - parameters_template=node_pool_record['parameters_template'], + parametersTemplate={ + "fixed_params": node_pool_record['fixed_params'], + "params_schema": node_pool_record['params_schema'], + "output_schema": node_pool_record['output_schema'] + }, editable=True, - created_at=node_pool_record['created_at'] + createdAt=node_pool_record['created_at'], ) nodes_meta_data_items.append(node_meta_data_item) @@ -139,12 +142,19 @@ class FlowManager: """ try: flow_collection = MongoDB.get_collection("app") - flow_record = await flow_collection.aggregate([ - {"$match": {"_id": app_id, "flows._id": flow_id}}, - {"$unwind": "$flows"}, - {"$match": {"flows._id": flow_id}}, - {"$limit": 1}, - ]).to_list(length=1) + cursor = flow_collection.find( + {"_id": app_id, "flows._id": flow_id}, + {"flows.$": 1} # 只返回 flows 数组中符合条件的第一个元素 + ) + + # 获取结果列表,并限制长度为1,因为我们只期待一个结果 + app_records = await cursor.to_list(length=1) + if app_records and len(app_records) == 0: + return None + app_record = app_records[0] + if "flows" not in app_record.keys() or len(app_record["flows"]) == 0: + return None + flow_record = app_record["flows"][0] except Exception as e: LOGGER.error( f"Get flow by app_id and flow_id failed due to: {e}") @@ -152,23 +162,23 @@ class FlowManager: if flow_record: flow_config = await get_flow_config(app_id, flow_id) if not flow_config: - LOGGER.error( - "Get flow config by app_id and flow_id failed") + LOGGER.error("Get flow config by app_id and flow_id failed") return None focus_point = flow_record["focus_point"] flow_item = FlowItem( - flow_id=flow_id, + flowId=flow_id, name=flow_config.name, description=flow_config.description, enable=True, editable=True, nodes=[], - edeges=[] + edges=[], + createdAt=flow_record["created_at"] ) for node_config in flow_config.steps: node_item = NodeItem( - node_id=node_config.node_id, - app_id=node_config.node, + nodeId=node_config.id, + apiId=node_config.node, name=node_config.name, description=node_config.description, enable=True, @@ -176,26 +186,31 @@ class FlowManager: type=node_config.type, parameters=node_config.params, position=PositionItem( - x=node_config.position.x, y=node_config.position.y) + x=node_config.pos.x, y=node_config.pos.y) ) flow_item.nodes.append(node_item) for edge_config in flow_config.edges: + edge_from = edge_config.edge_from + branch_id = '' tmp_list = edge_config.edge_from.split('.') - branch_id = tmp_list[1] if len(tmp_list) > 2 else '' - flow_item.edeges.append(EdgeItem( - edge_id=edge_config.id, - source_node=edge_config.edge_from, - target_node=edge_config.edge_to, - type=edge_config.type.value, - branch_id=branch_id, + if len(tmp_list) == 2: + edge_from = tmp_list[0] + branch_id = tmp_list[1] + flow_item.edges.append(EdgeItem( + edgeId=edge_config.id, + sourceNode=edge_from, + targetNode=edge_config.edge_to, + type=edge_config.edge_type, + branchId=branch_id, )) flow_item.nodes.append(node_item) return (flow_item, focus_point) return None @staticmethod - async def put_flow_by_app_and_flow_id(app_id: str, flow_id: str, flow_item: FlowItem, focus_point: PositionItem) -> str: + async def put_flow_by_app_and_flow_id( + app_id: str, flow_id: str, flow_item: FlowItem, focus_point: PositionItem) -> str: """存储/更新flow的数据库数据和配置文件 :param app_id: 应用的id @@ -205,12 +220,20 @@ class FlowManager: """ try: flow_collection = MongoDB.get_collection("app") - flow_record = await flow_collection.aggregate([ - {"$match": {"_id": app_id, "flows._id": flow_id}}, - {"$unwind": "$flows"}, - {"$match": {"flows._id": flow_id}}, - {"$limit": 1}, - ]).to_list(length=1) + cursor = flow_collection.find( + {"_id": app_id, "flows._id": flow_id}, + {"flows.$": 1} # 只返回 flows 数组中符合条件的第一个元素 + ) + + # 获取结果列表,并限制长度为1,因为我们只期待一个结果 + app_records = await cursor.to_list(length=1) + if app_records and len(app_records) == 0: + return None + app_record = app_records[0] + if "flows" not in app_record.keys() or len(app_record["flows"]) == 0: + return None + flow_record = app_record["flows"][0] + LOGGER.error(flow_record) except Exception as e: LOGGER.error( f"Get flow by app_id and flow_id failed due to: {e}") @@ -225,6 +248,7 @@ class FlowManager: for node_item in flow_item.nodes: edge_config = Step( id=node_item.node_id, + type=node_item.type, node=node_item.api_id, name=node_item.name, description=node_item.description, @@ -233,7 +257,7 @@ class FlowManager: params=node_item.parameters ) flow_config.steps.append(edge_config) - for edge_item in flow_item.edeges: + for edge_item in flow_item.edges: edge_from = edge_item.source_node if edge_item.branch_id: edge_from = edge_from+'.'+edge_item.branch_id @@ -245,26 +269,29 @@ class FlowManager: ) flow_config.edges.append(edge_config) if flow_record: - result = await update_flow_config(app_id, flow_id, flow_config) + # result = await update_flow_config(app_id, flow_id, flow_config) + result = 1 if not result: LOGGER.error(f"Update flow config failed") return None - app_collection = await MongoDB.get_collection("app") - result = await app_collection.update_one( + app_collection = MongoDB.get_collection("app") + result = await app_collection.find_one_and_update( {'_id': app_id}, { '$set': { - 'flows.$[flow].focus_point': focus_point + 'flows.$[element].focus_point': focus_point.model_dump(by_alias=True) } }, - array_filters=[{'flow._id': flow_id}] + array_filters=[{'element._id': flow_id}], + return_document=True # 返回更新后的文档 ) - if result.modified_count > 0: + if result: return flow_id else: return None else: - new_path = await add_flow_config(app_id, flow_id, flow_config) + # new_path = await add_flow_config(app_id, flow_id, flow_config) + new_path = "" if not new_path: LOGGER.error(f"Add flow config failed") return None diff --git a/apps/routers/flow.py b/apps/routers/flow.py index 521dad90..2420a9a6 100644 --- a/apps/routers/flow.py +++ b/apps/routers/flow.py @@ -2,7 +2,7 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ -from fastapi import APIRouter, Depends, status, Path, Query, Body +from fastapi import APIRouter, Depends, status, Query, Body from fastapi.responses import JSONResponse from typing import Annotated, Optional @@ -25,21 +25,21 @@ router = APIRouter( ) -@router.get("/service", response_model=NodeMetaDataListRsp, responses={ +@router.get("/service", response_model=NodeServiceListRsp, responses={ status.HTTP_404_NOT_FOUND: {"model": ResponseData}, }) -async def get_node_metadatas( - user_sub: Annotated[str, Depends(get_user)], - page: int = Query(...), - page_size: int = Query(..., alias="pageSize"), +async def get_services( + user_sub: Annotated[str, Depends(get_user)], + page: Annotated[int, Query(gt=0, description="当前页码")], + page_size: Annotated[int, Query(gt=0, alias="pageSize", description="每页大小")] ): """获取用户可访问的节点元数据所在服务的信息""" - result = await FlowManager.get_node_meta_datas_by_service_id(user_sub, page, page_size) + result = await FlowManager.get_service_by_user_id(user_sub, page, page_size) if result is None: - return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=ResponseData( + return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=NodeServiceListRsp( code=status.HTTP_404_NOT_FOUND, message="节点元数据所在服务信息不存在", - result={}, + result=NodeServiceListMsg(), ).model_dump(exclude_none=True, by_alias=True)) return JSONResponse(status_code=status.HTTP_200_OK, content=NodeServiceListRsp( code=status.HTTP_200_OK, @@ -52,12 +52,12 @@ async def get_node_metadatas( status.HTTP_403_FORBIDDEN: {"model": ResponseData}, status.HTTP_404_NOT_FOUND: {"model": ResponseData} }) -async def get_flow( +async def get_servide_nodes( user_sub: Annotated[str, Depends(get_user)], - service_id: int = Query(..., alias="serviceId") + service_id: Annotated[str, Query(..., alias="serviceId")] ): """获取用户可访问的节点元数据""" - if not await AppManager.validate_user_app_access(user_sub, service_id): + if not await FlowManager.validate_user_service_access(user_sub, service_id): return JSONResponse(status_code=status.HTTP_403_FORBIDDEN, content=ResponseData( code=status.HTTP_403_FORBIDDEN, message="用户没有权限访问该服务", @@ -66,15 +66,15 @@ async def get_flow( result = await FlowManager.get_node_meta_datas_by_service_id(service_id) if result is None: - return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=ResponseData( + return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=NodeMetaDataListRsp( code=status.HTTP_404_NOT_FOUND, message="服务下节点元数据获取失败", - result={}, + result=NodeMetaDataListMsg(serviceId=service_id), ).model_dump(exclude_none=True, by_alias=True)) return JSONResponse(status_code=status.HTTP_200_OK, content=NodeMetaDataListRsp( code=status.HTTP_200_OK, message="服务下节点元数据获取成功", - result=NodeMetaDataListMsg(node_meta_datas=result) + result=NodeMetaDataListMsg(serviceId=service_id, nodeMetaDatas=result) ).model_dump(exclude_none=True, by_alias=True)) @@ -84,22 +84,22 @@ async def get_flow( }) async def get_flow( user_sub: Annotated[str, Depends(get_user)], - app_id: str = Query(..., alias="appId"), - flow_id: str = Query(..., alias="flowId") + app_id: Annotated[str, Query(..., alias="appId")], + flow_id: Annotated[str, Query(..., alias="flowId")] ): """获取流拓扑结构""" - if not await AppManager.validate_user_app_access(user_sub, app_id): - return JSONResponse(status_code=status.HTTP_403_FORBIDDEN, content=ResponseData( + if not await FlowManager.validate_user_service_access(user_sub, app_id): + return JSONResponse(status_code=status.HTTP_403_FORBIDDEN, content=FlowStructureGetRsp( code=status.HTTP_403_FORBIDDEN, message="用户没有权限访问该流", - result={}, + result=FlowStructureGetMsg(), ).model_dump(exclude_none=True, by_alias=True)) result = await FlowManager.get_flow_by_app_and_flow_id(app_id, flow_id) if result is None: - return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=ResponseData( + return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=FlowStructureGetRsp( code=status.HTTP_404_NOT_FOUND, message="应用下流程获取失败", - result={}, + result=FlowStructureGetMsg(), ).model_dump(exclude_none=True, by_alias=True)) return JSONResponse(status_code=status.HTTP_200_OK, content=FlowStructureGetRsp( code=status.HTTP_200_OK, @@ -122,29 +122,30 @@ async def put_flow( put_body: PutFlowReq = Body(...) ): """修改流拓扑结构""" - if not await AppManager.validate_user_app_access(user_sub, app_id): - return JSONResponse(status_code=status.HTTP_403_FORBIDDEN, content=ResponseData( + if not await FlowManager.validate_user_app_access(user_sub, app_id): + return JSONResponse(status_code=status.HTTP_403_FORBIDDEN, content=FlowStructurePutRsp( code=status.HTTP_403_FORBIDDEN, message="用户没有权限访问该流", - result={}, + result=FlowStructurePutMsg(), ).model_dump(exclude_none=True, by_alias=True)) if topology_check: - await FlowService.validate_flow_connectivity(put_body.flow) - await FlowService.validate_flow_illegal(put_body.flow) + await FlowManager.validate_flow_connectivity(put_body.flow) + await FlowManager.validate_flow_illegal(put_body.flow) result = await FlowManager.put_flow_by_app_and_flow_id(app_id, flow_id, put_body.flow, put_body.focus_point) if result is None: - return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content=ResponseData( + return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content=FlowStructurePutRsp( code=status.HTTP_500_INTERNAL_SERVER_ERROR, - message="应用下流程更新失败", - result={}, + message="应用下流更新失败", + result=FlowStructurePutMsg(), ).model_dump(exclude_none=True, by_alias=True)) return JSONResponse(status_code=status.HTTP_200_OK, content=FlowStructurePutRsp( code=status.HTTP_200_OK, - message="应用下流程更新成功", - result=FlowStructurePutMsg(flow_id=result) + message="应用下流更新成功", + result=FlowStructurePutMsg(flowId=result) ).model_dump(exclude_none=True, by_alias=True)) + @router.delete("", response_model=FlowStructureDeleteRsp, responses={ status.HTTP_404_NOT_FOUND: {"model": ResponseData} }) @@ -155,20 +156,20 @@ async def delete_flow( ): """删除流拓扑结构""" if not await AppManager.validate_user_app_access(user_sub, app_id): - return JSONResponse(status_code=status.HTTP_403_FORBIDDEN, content=ResponseData( + return JSONResponse(status_code=status.HTTP_403_FORBIDDEN, content=FlowStructureDeleteRsp( code=status.HTTP_403_FORBIDDEN, message="用户没有权限访问该流", - result={}, + result=FlowStructureDeleteMsg(), ).model_dump(exclude_none=True, by_alias=True)) result = await FlowManager.delete_flow_by_app_and_flow_id(app_id, flow_id) if result is None: return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=ResponseData( code=status.HTTP_404_NOT_FOUND, message="应用下流程删除失败", - result={}, + result=FlowStructureDeleteMsg(), ).model_dump(exclude_none=True, by_alias=True)) return JSONResponse(status_code=status.HTTP_200_OK, content=FlowStructureDeleteRsp( code=status.HTTP_200_OK, message="应用下流程删除成功", - result=FlowStructureDeleteMsg(flow_id=result) + result=FlowStructureDeleteMsg(flowId=result) ).model_dump(exclude_none=True, by_alias=True)) diff --git a/apps/utils/flow.py b/apps/utils/flow.py index 3ed20d78..40678182 100644 --- a/apps/utils/flow.py +++ b/apps/utils/flow.py @@ -36,16 +36,16 @@ class FlowService: node_to_branches[node.node_id] = set() if node.type == NodeType.CHOICE.value: for branch in node.parameters['choices']: - if branch.branch in node_to_branches[node.node_id]: - LOGGER.error(msg=f"分支id重复") - raise Exception(f"节点{node.name}的分支{branch.branch }重复") - node_to_branches[node.node_id].add(branch.branch) + if branch['branch'] in node_to_branches[node.node_id]: + LOGGER.error(msg="分支id重复") + raise Exception(f"节点{node.name}的分支{branch['branch']}重复") + node_to_branches[node.node_id].add(branch['branch']) else: node_to_branches[node.node_id].add('') if num_of_start_node != 1 or num_of_end_node != 1: LOGGER.error(msg="起始节点和终止节点数量不为1") raise Exception("起始节点和终止节点数量不为1") - for edge in flow_item.edeges: + for edge in flow_item.edges: if edge.edge_id in edge_id_set: LOGGER.error(msg="边id重复") raise Exception(f"边{edge.edge_id}的id重复") @@ -62,9 +62,12 @@ class FlowService: if edge.branch_id not in node_to_branches[edge.source_node]: LOGGER.error(msg=f"边{edge.edge_id}的分支{edge.branch_id}不存在") raise Exception(f"边{edge.edge_id}的分支{edge.branch_id}不存在") + if edge.source_node not in edge_to_branch: + edge_to_branch[edge.source_node] = set() if edge.branch_id in edge_to_branch[edge.source_node]: LOGGER.error(msg=f"边{edge.edge_id}的分支{edge.branch_id}重复") raise Exception(f"边{edge.edge_id}的分支{edge.branch_id}重复") + node_to_branches[edge.source_node].add(edge.branch_id) if edge.source_node not in edge_to_branch: edge_to_branch[edge.source_node] = set() edge_to_branch[edge.source_node].add(edge.edge_id) @@ -72,10 +75,10 @@ class FlowService: edge.target_node, 0) + 1 node_out_degrees[edge.source_node] = node_out_degrees.get( edge.source_node, 0) + 1 - if node_in_degrees[id_of_start_node] != 0: + if id_of_start_node in node_in_degrees.keys() and node_in_degrees[id_of_start_node] != 0: LOGGER.error(msg=f"起始节点{id_of_start_node}的入度不为0") raise Exception(f"起始节点{id_of_start_node}的入度不为0") - if node_out_degrees[id_of_end_node] != 0: + if id_of_end_node in node_out_degrees.keys() and node_out_degrees[id_of_end_node] != 0: LOGGER.error(msg=f"终止节点{id_of_end_node}的出度不为0") raise Exception(f"终止节点{id_of_end_node}的出度不为0") @@ -90,7 +93,7 @@ class FlowService: id_of_start_node = node.node_id if node.type == NodeType.END.value: id_of_end_node = node.node_id - for edge in flow_item.edeges: + for edge in flow_item.edges: node_in_degrees[edge.target_node] = node_in_degrees.get( edge.target_node, 0) + 1 node_out_degrees[edge.source_node] = node_out_degrees.get( -- Gitee From fe31b6d04c4e7a344b82776d0edd79cfd47ef8dc Mon Sep 17 00:00:00 2001 From: zxstty Date: Fri, 24 Jan 2025 11:55:22 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E5=AE=8C=E5=96=84flow=E7=9A=84api=E3=80=81?= =?UTF-8?q?manger?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/entities/response_data.py | 6 +-- apps/manager/application.py | 20 ++++++++++ apps/manager/flow.py | 71 +++++++++++++++++----------------- apps/routers/flow.py | 12 +++--- 4 files changed, 64 insertions(+), 45 deletions(-) diff --git a/apps/entities/response_data.py b/apps/entities/response_data.py index f7c81fe8..523e96db 100644 --- a/apps/entities/response_data.py +++ b/apps/entities/response_data.py @@ -355,10 +355,10 @@ class FlowStructurePutRsp(ResponseData): class FlowStructureDeleteMsg(BaseModel): - """DELETE /api/flow/{flowId} result""" + """DELETE /api/flow/ result""" flow_id: str = Field(alias="flowId",default="") class FlowStructureDeleteRsp(ResponseData): - """DELETE /api/flow/{flowId} 返回数据结构""" - result: FlowStructureDeleteMsg + """DELETE /api/flow/ 返回数据结构""" + result: FlowStructureDeleteMsg \ No newline at end of file diff --git a/apps/manager/application.py b/apps/manager/application.py index 182cd3f8..c0430273 100644 --- a/apps/manager/application.py +++ b/apps/manager/application.py @@ -38,3 +38,23 @@ class AppManager: except Exception as e: LOGGER.error(f"Validate user app access failed due to: {e}") return False + @staticmethod + async def validate_app_belong_to_user(user_sub: str, app_id: str) -> bool: + """验证用户对应用的属权 + + :param user_sub: 用户唯一标识符 + :param app_id: 应用id + :return: 如果应用属于用户则返回True,否则返回False + """ + try: + app_collection = MongoDB.get_collection("app") # 获取应用集合' + query = { + "_id": app_id, + "author": user_sub + } + + result = await app_collection.find_one(query) + return (result is not None) + except Exception as e: + LOGGER.error(f"Validate user app access failed due to: {e}") + return False \ No newline at end of file diff --git a/apps/manager/flow.py b/apps/manager/flow.py index c08725d1..542d67c5 100644 --- a/apps/manager/flow.py +++ b/apps/manager/flow.py @@ -3,7 +3,7 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ from typing import Tuple, List -from pymongo import ASCENDING +from pymongo import ASCENDING,ReturnDocument from apps.constants import LOGGER from apps.entities.flow import StepPos, Edge, Step, Flow @@ -204,7 +204,6 @@ class FlowManager: type=edge_config.edge_type, branchId=branch_id, )) - flow_item.nodes.append(node_item) return (flow_item, focus_point) return None @@ -220,6 +219,10 @@ class FlowManager: """ try: flow_collection = MongoDB.get_collection("app") + app_record = await flow_collection.find_one({"_id":app_id}) + if app_record is None: + LOGGER.error(f"应用{app_id}不存在") + return None cursor = flow_collection.find( {"_id": app_id, "flows._id": flow_id}, {"flows.$": 1} # 只返回 flows 数组中符合条件的第一个元素 @@ -227,13 +230,9 @@ class FlowManager: # 获取结果列表,并限制长度为1,因为我们只期待一个结果 app_records = await cursor.to_list(length=1) - if app_records and len(app_records) == 0: - return None - app_record = app_records[0] - if "flows" not in app_record.keys() or len(app_record["flows"]) == 0: - return None - flow_record = app_record["flows"][0] - LOGGER.error(flow_record) + flow_record=None + if len(app_records)!=0: + flow_record=app_records[0]["flows"][0] except Exception as e: LOGGER.error( f"Get flow by app_id and flow_id failed due to: {e}") @@ -269,8 +268,7 @@ class FlowManager: ) flow_config.edges.append(edge_config) if flow_record: - # result = await update_flow_config(app_id, flow_id, flow_config) - result = 1 + result = await update_flow_config(app_id, flow_id, flow_config) if not result: LOGGER.error(f"Update flow config failed") return None @@ -283,43 +281,43 @@ class FlowManager: } }, array_filters=[{'element._id': flow_id}], - return_document=True # 返回更新后的文档 + return_document=ReturnDocument.AFTER ) - if result: - return flow_id - else: + if result is None: + LOGGER.error(f"Add flow failed") return None + return flow_id else: - # new_path = await add_flow_config(app_id, flow_id, flow_config) - new_path = "" + new_path = await add_flow_config(app_id, flow_id, flow_config) if not new_path: - LOGGER.error(f"Add flow config failed") + LOGGER.error("Add flow config failed") return None new_flow = AppFlow( - id=flow_id, + _id=flow_id, name=flow_item.name, description=flow_item.description, path=new_path, focus_point=PositionItem(x=focus_point.x, y=focus_point.y), ) - result = await app_collection.update_one( + app_collection = MongoDB.get_collection("app") + result = await app_collection.find_one_and_update( {'_id': app_id}, { '$push': { 'flows': new_flow.model_dump(by_alias=True) } - } + }, + return_document=ReturnDocument.AFTER ) - if result.modified_count > 0: - return flow_id - else: + if result is None: + LOGGER.error(f"Add flow failed") return None + return flow_id except Exception as e: LOGGER.error( f"Put flow by app_id and flow_id failed due to: {e}") return flow_id - @staticmethod async def delete_flow_by_app_and_flow_id(app_id: str, flow_id: str) -> str: """删除flow的数据库数据和配置文件 @@ -329,27 +327,28 @@ class FlowManager: """ try: - result = await delete_flow_config(app_id, flow_id) + # result = await delete_flow_config(app_id, flow_id) + result=True if not result: - LOGGER.error(f"Delete flow config failed") + print("Delete flow config failed") return None - app_pool_collection = await MongoDB.get_collection("app") # 获取集合 + app_pool_collection = MongoDB.get_collection("app") # 获取集合 # 执行删除操作,从指定的AppPool文档中移除匹配的AppFlow - result = await app_pool_collection.update_one( + result = await app_pool_collection.find_one_and_update( {'_id': app_id}, # 查询条件,找到要更新的AppPool文档 { '$pull': { - 'flows': {'id': flow_id} # 假设'flows'数组中每个对象都有'id'字段 + 'flows': {'_id': flow_id} # 假设'flows'数组中每个对象都有'id'字段 } - } + }, + return_document=ReturnDocument.AFTER ) - - if result.modified_count > 0: - return flow_id - else: + if result is None: + LOGGER.error("Delete flow from app pool failed") return None + return flow_id except Exception as e: LOGGER.error( f"Delete flow by app_id and flow_id failed due to: {e}") - return None + return None \ No newline at end of file diff --git a/apps/routers/flow.py b/apps/routers/flow.py index 2420a9a6..3795d052 100644 --- a/apps/routers/flow.py +++ b/apps/routers/flow.py @@ -122,15 +122,15 @@ async def put_flow( put_body: PutFlowReq = Body(...) ): """修改流拓扑结构""" - if not await FlowManager.validate_user_app_access(user_sub, app_id): + if not await AppManager.validate_app_belong_to_user(user_sub, app_id): return JSONResponse(status_code=status.HTTP_403_FORBIDDEN, content=FlowStructurePutRsp( code=status.HTTP_403_FORBIDDEN, message="用户没有权限访问该流", result=FlowStructurePutMsg(), ).model_dump(exclude_none=True, by_alias=True)) if topology_check: - await FlowManager.validate_flow_connectivity(put_body.flow) - await FlowManager.validate_flow_illegal(put_body.flow) + await FlowService.validate_flow_connectivity(put_body.flow) + await FlowService.validate_flow_illegal(put_body.flow) result = await FlowManager.put_flow_by_app_and_flow_id(app_id, flow_id, put_body.flow, put_body.focus_point) if result is None: return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content=FlowStructurePutRsp( @@ -145,7 +145,6 @@ async def put_flow( ).model_dump(exclude_none=True, by_alias=True)) - @router.delete("", response_model=FlowStructureDeleteRsp, responses={ status.HTTP_404_NOT_FOUND: {"model": ResponseData} }) @@ -154,8 +153,9 @@ async def delete_flow( app_id: str = Query(..., alias="appId"), flow_id: str = Query(..., alias="flowId") ): + print(user_sub) """删除流拓扑结构""" - if not await AppManager.validate_user_app_access(user_sub, app_id): + if not await AppManager.validate_app_belong_to_user(user_sub, app_id): return JSONResponse(status_code=status.HTTP_403_FORBIDDEN, content=FlowStructureDeleteRsp( code=status.HTTP_403_FORBIDDEN, message="用户没有权限访问该流", @@ -163,7 +163,7 @@ async def delete_flow( ).model_dump(exclude_none=True, by_alias=True)) result = await FlowManager.delete_flow_by_app_and_flow_id(app_id, flow_id) if result is None: - return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=ResponseData( + return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=FlowStructureDeleteRsp( code=status.HTTP_404_NOT_FOUND, message="应用下流程删除失败", result=FlowStructureDeleteMsg(), -- Gitee From d32fcf94c1e39821c8d4b6208c624f443e739580 Mon Sep 17 00:00:00 2001 From: zxstty Date: Sat, 25 Jan 2025 17:37:11 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dflow=E7=9A=84api=E3=80=81?= =?UTF-8?q?manager=E5=92=8Cservice=E7=9B=B8=E5=85=B3=E7=9A=84bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/entities/response_data.py | 3 +- apps/manager/flow.py | 74 ++++++++++++++++++---------------- apps/routers/flow.py | 21 +++++----- 3 files changed, 51 insertions(+), 47 deletions(-) diff --git a/apps/entities/response_data.py b/apps/entities/response_data.py index 523e96db..dfc2d0a6 100644 --- a/apps/entities/response_data.py +++ b/apps/entities/response_data.py @@ -315,13 +315,12 @@ class GetRecentAppListRsp(ResponseData): class NodeServiceListMsg(BaseModel): """GET /api/flow/service result""" - total: int=Field(default=0, description="服务总数") services: list[ServiceItem]=Field(default=[], description="服务列表") class NodeServiceListRsp(ResponseData): """GET /api/flow/service 返回数据结构""" - result: NodeServiceListMsg + result: NodeServiceListMsg=Field(default=NodeServiceListMsg(), description="服务列表") class NodeMetaDataListMsg(BaseModel): diff --git a/apps/manager/flow.py b/apps/manager/flow.py index 542d67c5..a9df372d 100644 --- a/apps/manager/flow.py +++ b/apps/manager/flow.py @@ -3,7 +3,7 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ from typing import Tuple, List -from pymongo import ASCENDING,ReturnDocument +from pymongo import ASCENDING, ReturnDocument from apps.constants import LOGGER from apps.entities.flow import StepPos, Edge, Step, Flow @@ -38,8 +38,7 @@ class FlowManager: } ] query = {"$and": [{"_id": service_id}, - {"$or": match_conditions}, - {"favorites": user_sub}]} + {"$or": match_conditions}]} result = await service_collection.count_documents(query) return (result > 0) @@ -48,35 +47,36 @@ class FlowManager: return False @staticmethod - async def get_service_by_user_id(user_sub: str, page: int, page_size: int) -> Tuple[int, List[ServiceItem]]: + async def get_service_by_user_id(user_sub: str) -> Tuple[int, List[ServiceItem]]: """根据用户ID获取用户有执行权限且收藏的服务列表 :param user_sub: 用户唯一标识符 - :param page: 当前页码 - :param page_size: 每页大小 :return: 返回符合条件的服务总数和分页后的服务列表 """ service_collection = MongoDB.get_collection("service") try: - skip_count = (page - 1) * page_size match_conditions = [ + {"name": "系统"}, {"author": user_sub}, - {"permissions.type": PermissionType.PUBLIC.value}, + { + "$and": [ + {"permissions.type": PermissionType.PUBLIC.value}, + {"permissions.users": user_sub}, + {"favorites": user_sub} + ] + }, { "$and": [ {"permissions.type": PermissionType.PROTECTED.value}, - {"permissions.users": user_sub} + {"permissions.users": user_sub}, + {"favorites": user_sub} ] } ] - query = {"$and": [{"$or": match_conditions}, - {"favorites": user_sub}]} + query = {"$or": match_conditions} - total_services = await service_collection.count_documents(query) service_records_cursor = service_collection.find( query, - skip=skip_count, - limit=page_size, sort=[("created_at", ASCENDING)] ) service_records = await service_records_cursor.to_list(length=None) @@ -90,10 +90,10 @@ class FlowManager: for record in service_records ] - return total_services, service_items + return service_items except Exception as e: - LOGGER.error(f"Get service by user id failed due to: {e}") + print(f"Get service by user id failed due to: {e}") return None @staticmethod @@ -141,15 +141,18 @@ class FlowManager: :return: 流的item和用户在这个流上的视觉焦点 """ try: - flow_collection = MongoDB.get_collection("app") - cursor = flow_collection.find( + app_collection = MongoDB.get_collection("app") + app_record = await app_collection.find_one({"_id": app_id}) + if app_record is None: + LOGGER.error(f"应用{app_id}不存在") + return None + cursor = app_collection.find( {"_id": app_id, "flows._id": flow_id}, - {"flows.$": 1} # 只返回 flows 数组中符合条件的第一个元素 + {"flows.$": 1} ) - # 获取结果列表,并限制长度为1,因为我们只期待一个结果 app_records = await cursor.to_list(length=1) - if app_records and len(app_records) == 0: + if len(app_records) == 0: return None app_record = app_records[0] if "flows" not in app_record.keys() or len(app_record["flows"]) == 0: @@ -160,7 +163,7 @@ class FlowManager: f"Get flow by app_id and flow_id failed due to: {e}") return None if flow_record: - flow_config = await get_flow_config(app_id, flow_id) + flow_config = await app_collection(app_id, flow_id) if not flow_config: LOGGER.error("Get flow config by app_id and flow_id failed") return None @@ -218,21 +221,21 @@ class FlowManager: :return: 流的id """ try: - flow_collection = MongoDB.get_collection("app") - app_record = await flow_collection.find_one({"_id":app_id}) + app_collection = MongoDB.get_collection("app") + app_record = await app_collection.find_one({"_id": app_id}) if app_record is None: LOGGER.error(f"应用{app_id}不存在") return None - cursor = flow_collection.find( + cursor = app_collection.find( {"_id": app_id, "flows._id": flow_id}, - {"flows.$": 1} # 只返回 flows 数组中符合条件的第一个元素 + {"flows.$": 1} ) - - # 获取结果列表,并限制长度为1,因为我们只期待一个结果 app_records = await cursor.to_list(length=1) - flow_record=None - if len(app_records)!=0: - flow_record=app_records[0]["flows"][0] + flow_record = None + if len(app_records) != 0: + app_record = app_records[0] + if "flows" in app_record.keys() and len(app_record["flows"]) != 0: + flow_record = app_record["flows"][0] except Exception as e: LOGGER.error( f"Get flow by app_id and flow_id failed due to: {e}") @@ -267,6 +270,10 @@ class FlowManager: edge_type=edge_item.type ) flow_config.edges.append(edge_config) + except Exception as e: + LOGGER.error(f"Parse flow config failed: {e}") + return None + try: if flow_record: result = await update_flow_config(app_id, flow_id, flow_config) if not result: @@ -327,8 +334,7 @@ class FlowManager: """ try: - # result = await delete_flow_config(app_id, flow_id) - result=True + result = await delete_flow_config(app_id, flow_id) if not result: print("Delete flow config failed") return None @@ -351,4 +357,4 @@ class FlowManager: except Exception as e: LOGGER.error( f"Delete flow by app_id and flow_id failed due to: {e}") - return None \ No newline at end of file + return None diff --git a/apps/routers/flow.py b/apps/routers/flow.py index 3795d052..1fb57b5c 100644 --- a/apps/routers/flow.py +++ b/apps/routers/flow.py @@ -30,22 +30,21 @@ router = APIRouter( }) async def get_services( user_sub: Annotated[str, Depends(get_user)], - page: Annotated[int, Query(gt=0, description="当前页码")], - page_size: Annotated[int, Query(gt=0, alias="pageSize", description="每页大小")] ): """获取用户可访问的节点元数据所在服务的信息""" - result = await FlowManager.get_service_by_user_id(user_sub, page, page_size) - if result is None: - return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=NodeServiceListRsp( + services = await FlowManager.get_service_by_user_id(user_sub) + if services is None: + return NodeServiceListRsp( code=status.HTTP_404_NOT_FOUND, - message="节点元数据所在服务信息不存在", - result=NodeServiceListMsg(), - ).model_dump(exclude_none=True, by_alias=True)) - return JSONResponse(status_code=status.HTTP_200_OK, content=NodeServiceListRsp( + message="未找到符合条件的服务", + result=NodeServiceListMsg(services=[]).model_dump(exclude_none=True, by_alias=True) + ) + + return NodeServiceListRsp( code=status.HTTP_200_OK, message="节点元数据所在服务信息获取成功", - result=NodeServiceListMsg(total=result[0], services=result[1]) - ).model_dump(exclude_none=True, by_alias=True)) + result=NodeServiceListMsg(services=services).model_dump(exclude_none=True, by_alias=True) + ) @router.get("/service/node", response_model=NodeMetaDataListRsp, responses={ -- Gitee From fda1c467b0b1ab1437622ec4fa56847e70e83f3b Mon Sep 17 00:00:00 2001 From: zxstty Date: Sun, 26 Jan 2025 16:03:13 +0800 Subject: [PATCH 4/4] fix bug --- apps/entities/response_data.py | 16 ++++++----- apps/routers/flow.py | 10 ++++++- apps/utils/flow.py | 50 ++++++++++++++++++---------------- 3 files changed, 45 insertions(+), 31 deletions(-) diff --git a/apps/entities/response_data.py b/apps/entities/response_data.py index dfc2d0a6..5202fb15 100644 --- a/apps/entities/response_data.py +++ b/apps/entities/response_data.py @@ -315,27 +315,29 @@ class GetRecentAppListRsp(ResponseData): class NodeServiceListMsg(BaseModel): """GET /api/flow/service result""" - services: list[ServiceItem]=Field(default=[], description="服务列表") + services: list[ServiceItem] = Field(default=[], description="服务列表") class NodeServiceListRsp(ResponseData): """GET /api/flow/service 返回数据结构""" - result: NodeServiceListMsg=Field(default=NodeServiceListMsg(), description="服务列表") + result: NodeServiceListMsg = Field(default=NodeServiceListMsg(), description="服务列表") class NodeMetaDataListMsg(BaseModel): """GET /api/flow/service/node result""" service_id: str = Field(alias="serviceId") - node_meta_datas: list[NodeMetaDataItem]=Field(alias="nodeMetaDatas",default=[]) + node_meta_datas: list[NodeMetaDataItem] = Field(alias="nodeMetaDatas", default=[]) + class NodeMetaDataListRsp(ResponseData): """GET /api/flow/service/node 返回数据结构""" result: NodeMetaDataListMsg + class FlowStructureGetMsg(BaseModel): """GET /api/flow result""" flow: FlowItem - focus_point: PositionItem=Field(alias="focusPoint",default=PositionItem(x=0,y=0)) + focus_point: PositionItem = Field(alias="focusPoint", default=PositionItem(x=0, y=0)) class FlowStructureGetRsp(ResponseData): @@ -345,7 +347,7 @@ class FlowStructureGetRsp(ResponseData): class FlowStructurePutMsg(BaseModel): """PUT /api/flow result""" - flow_id: str = Field(alias="flowId",default="") + flow: FlowItem = Field(alias="flowId", default=FlowItem()) class FlowStructurePutRsp(ResponseData): @@ -355,9 +357,9 @@ class FlowStructurePutRsp(ResponseData): class FlowStructureDeleteMsg(BaseModel): """DELETE /api/flow/ result""" - flow_id: str = Field(alias="flowId",default="") + flow_id: str = Field(alias="flowId", default="") class FlowStructureDeleteRsp(ResponseData): """DELETE /api/flow/ 返回数据结构""" - result: FlowStructureDeleteMsg \ No newline at end of file + result: FlowStructureDeleteMsg diff --git a/apps/routers/flow.py b/apps/routers/flow.py index 1fb57b5c..2537e137 100644 --- a/apps/routers/flow.py +++ b/apps/routers/flow.py @@ -127,6 +127,7 @@ async def put_flow( message="用户没有权限访问该流", result=FlowStructurePutMsg(), ).model_dump(exclude_none=True, by_alias=True)) + put_body.flow = await FlowService.remove_excess_structure_from_flow(put_body.flow) if topology_check: await FlowService.validate_flow_connectivity(put_body.flow) await FlowService.validate_flow_illegal(put_body.flow) @@ -137,10 +138,17 @@ async def put_flow( message="应用下流更新失败", result=FlowStructurePutMsg(), ).model_dump(exclude_none=True, by_alias=True)) + flow, _ = await FlowManager.get_flow_by_app_and_flow_id(app_id, flow_id) + if flow is None: + return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content=FlowStructurePutRsp( + code=status.HTTP_500_INTERNAL_SERVER_ERROR, + message="应用下流更新后获取失败", + result=FlowStructurePutMsg(), + ).model_dump(exclude_none=True, by_alias=True)) return JSONResponse(status_code=status.HTTP_200_OK, content=FlowStructurePutRsp( code=status.HTTP_200_OK, message="应用下流更新成功", - result=FlowStructurePutMsg(flowId=result) + result=FlowStructurePutMsg(flow=flow) ).model_dump(exclude_none=True, by_alias=True)) diff --git a/apps/utils/flow.py b/apps/utils/flow.py index 40678182..bb4e49ee 100644 --- a/apps/utils/flow.py +++ b/apps/utils/flow.py @@ -11,10 +11,35 @@ from apps.entities.flow_topology import ServiceItem, NodeMetaDataItem, FlowItem, class FlowService: @staticmethod + async def remove_excess_structure_from_flow(flow_item: FlowItem) -> FlowItem: + node_to_branches = dict() + for node in flow_item.nodes: + node_to_branches[node.node_id] = set() + if node.type == NodeType.CHOICE.value: + if 'choices' not in node.parameters.keys(): + node.parameters['choices'] = [] + for branch in node.parameters['choices']: + if branch['branch'] in node_to_branches[node.node_id]: + LOGGER.error(f"节点{node.node_id}的分支{branch['branch']}id重复") + raise Exception(f"节点{node.name}的分支{branch['branch']}重复") + node_to_branches[node.node_id].add(branch['branch']) + else: + node_to_branches[node.node_id].add('') + new_edges_items = [] + for edge in flow_item.edges: + if edge.source_node not in node_to_branches.keys(): + continue + if edge.target_node not in node_to_branches.keys(): + continue + if edge.branch_id not in node_to_branches[edge.source_node]: + continue + new_edges_items.append(edge) + flow_item.edges = new_edges_items + return flow_item + @staticmethod async def validate_flow_illegal(flow_item: FlowItem) -> None: node_id_set = set() edge_id_set = set() - node_to_branches = dict() edge_to_branch = dict() num_of_start_node = 0 num_of_end_node = 0 @@ -33,15 +58,6 @@ class FlowService: if node.type == NodeType.END.value: num_of_end_node += 1 id_of_end_node = node.node_id - node_to_branches[node.node_id] = set() - if node.type == NodeType.CHOICE.value: - for branch in node.parameters['choices']: - if branch['branch'] in node_to_branches[node.node_id]: - LOGGER.error(msg="分支id重复") - raise Exception(f"节点{node.name}的分支{branch['branch']}重复") - node_to_branches[node.node_id].add(branch['branch']) - else: - node_to_branches[node.node_id].add('') if num_of_start_node != 1 or num_of_end_node != 1: LOGGER.error(msg="起始节点和终止节点数量不为1") raise Exception("起始节点和终止节点数量不为1") @@ -53,24 +69,12 @@ class FlowService: if edge.source_node == edge.target_node: LOGGER.error(msg="边起始节点和终止节点相同") raise Exception(f"边{edge.edge_id}的起始节点和终止节点相同") - if edge.source_node not in node_id_set: - LOGGER.error(msg=f"边{edge.edge_id}的起始节点{edge.source_node}不存在") - raise Exception(f"边{edge.edge_id}的起始节点{edge.source_node}不存在") - if edge.target_node not in node_id_set: - LOGGER.error(msg=f"边{edge.edge_id}的终止节点{edge.target_node}不存在") - raise Exception(f"边{edge.edge_id}的终止节点{edge.target_node}不存在") - if edge.branch_id not in node_to_branches[edge.source_node]: - LOGGER.error(msg=f"边{edge.edge_id}的分支{edge.branch_id}不存在") - raise Exception(f"边{edge.edge_id}的分支{edge.branch_id}不存在") if edge.source_node not in edge_to_branch: edge_to_branch[edge.source_node] = set() if edge.branch_id in edge_to_branch[edge.source_node]: LOGGER.error(msg=f"边{edge.edge_id}的分支{edge.branch_id}重复") raise Exception(f"边{edge.edge_id}的分支{edge.branch_id}重复") - node_to_branches[edge.source_node].add(edge.branch_id) - if edge.source_node not in edge_to_branch: - edge_to_branch[edge.source_node] = set() - edge_to_branch[edge.source_node].add(edge.edge_id) + edge_to_branch[edge.source_node].add(edge.branch_id) node_in_degrees[edge.target_node] = node_in_degrees.get( edge.target_node, 0) + 1 node_out_degrees[edge.source_node] = node_out_degrees.get( -- Gitee