diff --git a/apps/entities/flow_topology.py b/apps/entities/flow_topology.py index e3e7b3f17406bd6d9e0f197acb70f6df46c1da29..bde97fcb248c19ea5577c7e0b6cc828e98321797 100644 --- a/apps/entities/flow_topology.py +++ b/apps/entities/flow_topology.py @@ -9,26 +9,23 @@ from pydantic import BaseModel, Field from apps.entities.enum_var import EdgeType, NodeType - -class ServiceItem(BaseModel): - """元数据归属的服务类""" - service_id: str = Field(alias="serviceId") - name: str - type: str - created_at: Optional[float] = Field(alias="createdAt") - - class NodeMetaDataItem(BaseModel): """节点元数据类""" - api_id: str = Field(alias="apiId") + node_meta_data_id: str = Field(alias="nodeMetaDataId") type: str name: str description: str - parameters_template: dict[str, Any] = Field(alias="parametersTemplate") + parameters_template: Optional[dict[str, Any]] = Field(alias="parametersTemplate") editable: bool = Field(default=True) created_at: Optional[float] = Field(alias="createdAt") - +class NodeServiceItem(BaseModel): + """GET /api/flow/service 中单个service信息以及service下的节点元数据的信息""" + service_id: str = Field(..., alias="serviceId", description="服务ID") + name: str = Field(..., description="服务名称") + type: str = Field(..., description="服务类型") + node_meta_datas: list[NodeMetaDataItem] = Field(alias="nodeMetaDatas", default=[]) + created_at: str = Field(..., alias="createdAt", description="创建时间") class PositionItem(BaseModel): """请求/响应中的前端相对位置变量类""" x: float = Field(default=0.0) @@ -43,15 +40,15 @@ class DependencyItem(BaseModel): class NodeItem(BaseModel): """请求/响应中的节点变量类""" - node_id: str = Field(alias="nodeId") - api_id: str = Field(alias="apiId") - name: str + node_id: str = Field(alias="nodeId",default="") + api_id: str = Field(alias="apiId",default="") + name: str=Field(default="") type: str = Field(default=NodeType.NORMAL.value) description: str = Field(default='') enable: bool = Field(default=True) - parameters: dict[str, Any] + parameters: Optional[dict[str, Any]]=None depedency: Optional[DependencyItem] = None - position: PositionItem + position: PositionItem=Field(default=PositionItem()) editable: bool = Field(default=True) diff --git a/apps/entities/pool.py b/apps/entities/pool.py index ca0773b43b8c6aa4c796d37697da5794d8104622..b2f1e09a315ea4975c1286d2e921e8313bf02c1c 100644 --- a/apps/entities/pool.py +++ b/apps/entities/pool.py @@ -68,7 +68,6 @@ class NodePool(PoolBase): id: str = Field(description="Node的ID", default_factory=lambda: str(uuid.uuid4()), alias="_id") service_id: str = Field(description="Node所属的Service ID") call_id: str = Field(description="所使用的Call的ID") - fixed_params: dict[str, Any] = Field(description="Node的固定参数", default={}) params_schema: dict[str, Any] = Field(description="Node的参数schema;只包含用户可以改变的参数", default={}) output_schema: dict[str, Any] = Field(description="Node的输出schema;做输出的展示用", default={}) diff --git a/apps/entities/response_data.py b/apps/entities/response_data.py index 9cde67a5157a9345ec7ac0ce40d807088e1ce5df..4d5ca1a928638770eb3fb69962cab500f42c875d 100644 --- a/apps/entities/response_data.py +++ b/apps/entities/response_data.py @@ -12,8 +12,8 @@ from apps.entities.enum_var import DocumentStatus from apps.entities.flow_topology import ( FlowItem, NodeMetaDataItem, + NodeServiceItem, PositionItem, - ServiceItem, ) from apps.entities.record import RecordData @@ -320,28 +320,18 @@ class GetRecentAppListRsp(ResponseData): class NodeServiceListMsg(BaseModel): """GET /api/flow/service result""" - services: list[ServiceItem]=Field(default=[]) - - + services: list[NodeServiceItem] = Field(..., description="服务列表",default=[]) class NodeServiceListRsp(ResponseData): """GET /api/flow/service 返回数据结构""" result: NodeServiceListMsg - - -class NodeMetaDataListMsg(BaseModel): - """GET /api/flow/service/node result""" - service_id: str = Field(alias="serviceId",default=[]) - node_meta_datas: list[NodeMetaDataItem] = Field(alias="nodeMetaDatas", default=[]) - - -class NodeMetaDataListRsp(ResponseData): +class NodeMetaDataRsp(ResponseData): """GET /api/flow/service/node 返回数据结构""" - result: NodeMetaDataListMsg + result: NodeMetaDataItem class FlowStructureGetMsg(BaseModel): """GET /api/flow result""" flow: FlowItem = Field(default=FlowItem()) - focus_point: PositionItem = Field(default=PositionItem(x=0.0, y=0.0)) + focus_point: PositionItem = Field(default=PositionItem()) class FlowStructureGetRsp(ResponseData): diff --git a/apps/manager/flow.py b/apps/manager/flow.py index 2cad12303443a0eee70fe033ad40e89edc5cc968..57e6bf912b3f8e238304b373741e4f9f5273f8a4 100644 --- a/apps/manager/flow.py +++ b/apps/manager/flow.py @@ -8,7 +8,7 @@ from pymongo import ASCENDING from apps.constants import LOGGER from apps.entities.flow import StepPos, Edge, Step, Flow, FlowConfig from apps.entities.pool import AppFlow -from apps.entities.flow_topology import ServiceItem, NodeMetaDataItem, FlowItem, NodeItem, EdgeItem, PositionItem +from apps.entities.flow_topology import NodeServiceItem, NodeMetaDataItem, FlowItem, NodeItem, EdgeItem, PositionItem from apps.models.mongo import MongoDB from apps.entities.enum_var import PermissionType @@ -16,17 +16,18 @@ from apps.entities.enum_var import PermissionType class FlowManager: @staticmethod - async def validate_user_service_access(user_sub: str, service_id: str) -> bool: + async def validate_user_node_meta_data_access(user_sub: str, node_meta_data_id: str) -> bool: """验证用户对服务的访问权限 :param user_sub: 用户唯一标识符 :param service_id: 服务id :return: 如果用户具有所需权限则返回True,否则返回False """ + node_pool_collection = MongoDB.get_collection("node") service_collection = MongoDB.get_collection("service") try: - service_collection = MongoDB.get_collection("service") + node_pool_record = await node_pool_collection.find_one({"_id": node_meta_data_id}) match_conditions = [ {"author": user_sub}, {"permissions.type": PermissionType.PUBLIC.value}, @@ -37,17 +38,51 @@ class FlowManager: ] } ] - query = {"$and": [{"_id": service_id}, + query = {"$and": [{"_id": node_pool_record["service_id"]}, {"$or": match_conditions} ]} result = await service_collection.count_documents(query) return (result > 0) except Exception as e: - LOGGER.error(f"Validate user service access failed due to: {e}") + LOGGER.error(f"Validate user node meta data access failed due to: {e}") return False + @staticmethod + async def get_node_meta_datas_by_service_id(service_id: str) -> List[NodeMetaDataItem]: + """serviceId获取service的接口数据,并将接口转换为节点元数据 - async def get_service_by_user_id(user_sub: str) -> Tuple[int, List[ServiceItem]]: + :param service_id: 服务id + :return: 节点元数据的列表 + """ + node_pool_collection = MongoDB.get_collection("node") # 获取节点集合 + try: + cursor = node_pool_collection.find( + {"service_id": service_id}).sort("created_at", ASCENDING) + + nodes_meta_data_items = [] + async for node_pool_record in cursor: + node_meta_data_item = NodeMetaDataItem( + nodeMetaDataId=node_pool_record["_id"], + type=node_pool_record["call_id"], + name=node_pool_record['name'], + description=node_pool_record['description'], + editable=True, + createdAt=node_pool_record['created_at'], + ) + nodes_meta_data_items.append(node_meta_data_item) + + return nodes_meta_data_items + + except Exception as e: + LOGGER.error( + f"Get node metadatas by service_id failed due to: {e}") + return None + async def get_service_by_user_id(user_sub: str) -> List[NodeServiceItem]: + """通过user_id获取用户自己上传的、其他人公开的且收藏的、受保护且有权限访问并收藏的service + + :user_sub: 用户的唯一标识符 + :return: service的列表 + """ service_collection = MongoDB.get_collection("service") try: match_conditions = [ @@ -56,7 +91,6 @@ class FlowManager: { "$and": [ {"permissions.type": PermissionType.PUBLIC.value}, - {"permissions.users": user_sub}, {"favorites": user_sub} ] }, @@ -76,70 +110,52 @@ class FlowManager: ) service_records = await service_records_cursor.to_list(length=None) service_items = [ - ServiceItem( - serviceId=str(record["_id"]), + NodeServiceItem( + serviceId=record["_id"], name=record["name"], type="default", + nodeMetaDatas=[], createdAt=record["created_at"] ) for record in service_records ] - + for service_item in service_items: + node_meta_datas = await FlowManager.get_node_meta_datas_by_service_id(service_item.service_id) + if node_meta_datas is None: + node_meta_datas=[] + service_item.node_meta_datas = node_meta_datas return service_items except Exception as e: LOGGER.error(f"Get service by user id failed due to: {e}") return None - @staticmethod - async def get_node_meta_datas_by_service_id(service_id: str) -> List[NodeMetaDataItem]: - """serviceId获取service的接口数据,并将接口转换为节点元数据 + async def get_node_meta_data_by_node_meta_data_id(node_meta_data_id: str) -> NodeMetaDataItem: + """通过node_meta_data_id获取对应的节点源数据信息 - :param service_id: 服务id - :return: 节点元数据的列表 + :param node_meta_data_id: node_meta_data的id + :return: node meta data id对应的节点源数据信息 """ node_pool_collection = MongoDB.get_collection("node") # 获取节点集合 try: - cursor = node_pool_collection.find( - {"service_id": service_id}).sort("created_at", ASCENDING) # 查询指定service_id的所有node_poool - - nodes_meta_data_items = [] - async for node_pool_record in cursor: - # 将每个node_pool换成NodeMetaDataItem实例 - parameters_template = { - "fixed_params": node_pool_record['fixed_params'], - "params_schema": node_pool_record['params_schema'], + node_pool_record = await node_pool_collection.find_one({"_id": node_meta_data_id}) + parameters_template = { + "input_schema": node_pool_record['params_schema'], "output_schema": node_pool_record['output_schema'] } - if node_pool_record['call_id'] == 'choice': - parameters_template['choice'] = [ - { - "branch": "valid", - "description": "返回值存在有效数据" - }, - { - "branch": "invalid", - "description": "返回值不存在有效数据" - }, - ] - node_meta_data_item = NodeMetaDataItem( - apiId=node_pool_record["_id"], - type=node_pool_record["call_id"], - name=node_pool_record['name'], - description=node_pool_record['description'], - parametersTemplate=parameters_template, - editable=True, - createdAt=node_pool_record['created_at'], - ) - nodes_meta_data_items.append(node_meta_data_item) - - return nodes_meta_data_items - + node_meta_data=NodeMetaDataItem( + nodeMetaDataId=node_pool_record["_id"], + type=node_pool_record["call_id"], + name=node_pool_record['name'], + description=node_pool_record['description'], + editable=True, + parametersTemplate=parameters_template, + createdAt=node_pool_record['created_at'], + ) + return node_meta_data except Exception as e: - LOGGER.error( - f"Get node metadatas by service_id failed due to: {e}") + LOGGER.error(f"获取节点元数据失败: {e}") return None - @staticmethod async def get_flow_by_app_and_flow_id(app_id: str, flow_id: str) -> Tuple[FlowItem, PositionItem]: """通过appId flowId获取flow config的路径和focus,并通过flow config的路径获取flow config,并将其转换为flow item。 diff --git a/apps/routers/flow.py b/apps/routers/flow.py index ee717489ec49027b5c533663559562fd7bd7a8f9..eea7655d81d47d4d3883c98b0daaef77480e345c 100644 --- a/apps/routers/flow.py +++ b/apps/routers/flow.py @@ -9,8 +9,9 @@ from typing import Annotated, Optional from apps.dependency.csrf import verify_csrf_token from apps.dependency.user import verify_user from apps.dependency import get_user +from apps.entities.flow_topology import NodeItem from apps.entities.request_data import PutFlowReq -from apps.entities.response_data import NodeServiceListRsp, NodeServiceListMsg, NodeMetaDataListRsp, NodeMetaDataListMsg, FlowStructureGetRsp, \ +from apps.entities.response_data import NodeServiceListRsp, NodeServiceListMsg, NodeMetaDataRsp, FlowStructureGetRsp, \ FlowStructureGetMsg, FlowStructurePutRsp, FlowStructurePutMsg, FlowStructureDeleteRsp, FlowStructureDeleteMsg, ResponseData from apps.manager.flow import FlowManager from apps.manager.application import AppManager @@ -25,7 +26,7 @@ router = APIRouter( ) -@router.get("/service", response_model=NodeMetaDataListRsp, responses={ +@router.get("/service", response_model=NodeServiceListRsp, responses={ status.HTTP_404_NOT_FOUND: {"model": ResponseData}, }) async def get_services( @@ -37,7 +38,7 @@ async def get_services( return NodeServiceListRsp( code=status.HTTP_404_NOT_FOUND, message="未找到符合条件的服务", - result=NodeServiceListMsg(services=[]) + result=NodeServiceListMsg() ) return NodeServiceListRsp( @@ -47,33 +48,33 @@ async def get_services( ) -@router.get("/service/node", response_model=NodeMetaDataListRsp, responses={ +@router.get("/service/node", response_model=NodeMetaDataRsp, responses={ status.HTTP_403_FORBIDDEN: {"model": ResponseData}, status.HTTP_404_NOT_FOUND: {"model": ResponseData} }) async def get_node_metadatas( user_sub: Annotated[str, Depends(get_user)], - service_id: int = Query(..., alias="serviceId") + node_metadata_id: int = Query(..., alias="NodeMetadataId") ): - """获取用户可访问的节点元数据""" - if not await AppManager.validate_user_app_access(user_sub, service_id): - return JSONResponse(status_code=status.HTTP_403_FORBIDDEN, content=ResponseData( + """获取节点元数据的详细信息""" + if not await FlowManager.validate_user_node_meta_data_access(user_sub, node_metadata_id): + return JSONResponse(status_code=status.HTTP_403_FORBIDDEN, content=NodeMetaDataRsp( code=status.HTTP_403_FORBIDDEN, - message="用户没有权限访问该服务", - result={}, + message="用户没有权限访问该节点原数据", + result=NodeItem(), ).model_dump(exclude_none=True, by_alias=True)) - result = await FlowManager.get_node_meta_datas_by_service_id(service_id) + result = await FlowManager.get_node_meta_data_by_node_meta_data_id(node_metadata_id) if result is None: - return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=NodeMetaDataListRsp( + return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=NodeMetaDataRsp( code=status.HTTP_404_NOT_FOUND, - message="服务下节点元数据获取失败", - result=NodeMetaDataListMsg(serviceId=service_id), + message="节点元数据详细信息获取失败", + result=NodeItem(), ).model_dump(exclude_none=True, by_alias=True)) - return JSONResponse(status_code=status.HTTP_200_OK, content=NodeMetaDataListRsp( + return JSONResponse(status_code=status.HTTP_200_OK, content=NodeMetaDataRsp( code=status.HTTP_200_OK, - message="服务下节点元数据获取成功", - result=NodeMetaDataListMsg(serviceId=service_id, nodeMetaDatas=result) + message="节点元数据详细信息获取成功", + result=result ).model_dump(exclude_none=True, by_alias=True)) diff --git a/apps/utils/flow.py b/apps/utils/flow.py index c869f8ebd07fca95577f449f390dcbecf142e761..761d8c3fc86e7109778c95a36eedb768de155f96 100644 --- a/apps/utils/flow.py +++ b/apps/utils/flow.py @@ -2,6 +2,7 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ +import queue from apps.constants import LOGGER from apps.entities.enum_var import NodeType from apps.entities.flow_topology import FlowItem @@ -93,23 +94,29 @@ class FlowService: @staticmethod async def validate_flow_connectivity(flow_item: FlowItem) -> None: id_of_start_node = None - id_of_end_node = None - node_in_degrees = {} - node_out_degrees = {} + node_id_set=set() + node_edge_dict={} for node in flow_item.nodes: if node.type == NodeType.START.value: id_of_start_node = node.node_id - if node.type == NodeType.END.value: - id_of_end_node = node.node_id for edge in flow_item.edges: - node_in_degrees[edge.target_node] = node_in_degrees.get( - edge.target_node, 0) + 1 - node_out_degrees[edge.source_node] = node_out_degrees.get( - edge.source_node, 0) + 1 - for node in flow_item.nodes: - if node.node_id != id_of_start_node and node.node_id not in node_in_degrees.keys(): - LOGGER.error(msg=f"节点{node.node_id}的入度为0") - raise Exception(f"节点{node.node_id}的入度为0") - if node.node_id != id_of_end_node and node.node_id not in node_out_degrees.keys(): - LOGGER.error(msg=f"节点{node.node_id}的出度为0") - raise Exception(f"节点{node.node_id}的出度为0") + if edge.source_node not in node_edge_dict.keys(): + node_edge_dict[edge.source_node] = [] + node_edge_dict[edge.target_node].append(edge.target_node) + node_q=queue.Queue() + node_q.put(id_of_start_node) + node_reached_cnt=0 + node_id_set.add(id_of_start_node) + while len(node_q)>0: + node_id=node_q.get() + node_reached_cnt+=1 + if node_id in node_edge_dict.keys(): + for target_node in node_edge_dict[node_id]: + if target_node not in node_id_set: + node_id_set.add(target_node) + node_q.put(target_node) + if node_reached_cnt!=len(flow_item.nodes): + LOGGER.error(msg="流程图存在孤立子图") + raise Exception("流程图存在孤立子图") + +