From 14a7fe4aa9e40cc29661f1585b19be70726f97f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=B9=E5=8D=9A?= <1016318004@qq.com> Date: Wed, 26 Feb 2025 02:36:24 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E5=A2=9E=E5=8A=A0service=E7=9A=84=E6=A3=80?= =?UTF-8?q?=E7=B4=A2=E6=96=B9=E5=BC=8F=EF=BC=8C=E5=8F=AF=E4=BB=A5=E6=A3=80?= =?UTF-8?q?=E7=B4=A2=E7=B3=BB=E7=BB=9F=E6=9C=8D=E5=8A=A1=E5=8F=8A=E5=85=B6?= =?UTF-8?q?=E8=8A=82=E7=82=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/entities/flow_topology.py | 13 +++++---- apps/manager/flow.py | 50 ++++++++++++++++++++-------------- 2 files changed, 36 insertions(+), 27 deletions(-) diff --git a/apps/entities/flow_topology.py b/apps/entities/flow_topology.py index 04e77281c..19649dbbd 100644 --- a/apps/entities/flow_topology.py +++ b/apps/entities/flow_topology.py @@ -20,6 +20,7 @@ class NodeMetaDataItem(BaseModel): editable: bool = Field(default=True) created_at: Optional[float] = Field(alias="createdAt") + class NodeServiceItem(BaseModel): """GET /api/flow/service 中单个service信息以及service下的节点元数据的信息""" @@ -27,7 +28,7 @@ class NodeServiceItem(BaseModel): name: str = Field(..., description="服务名称") type: str = Field(..., description="服务类型") node_meta_datas: list[NodeMetaDataItem] = Field(alias="nodeMetaDatas", default=[]) - created_at: str = Field(..., alias="createdAt", description="创建时间") + created_at: Optional[str] = Field(..., alias="createdAt", description="创建时间") class PositionItem(BaseModel): @@ -47,16 +48,16 @@ class DependencyItem(BaseModel): class NodeItem(BaseModel): """请求/响应中的节点变量类""" - node_id: str = Field(alias="nodeId",default="") - service_id: str = Field(alias="serviceId",default="") - node_meta_data_id: str = Field(alias="nodeMetaDataId",default="") - name: str=Field(default="") + node_id: str = Field(alias="nodeId", default="") + service_id: str = Field(alias="serviceId", default="") + node_meta_data_id: str = Field(alias="nodeMetaDataId", default="") + name: str = Field(default="") type: str = Field(default=NodeType.NORMAL.value) description: str = Field(default="") enable: bool = Field(default=True) parameters: dict[str, Any] = Field(default={}) depedency: Optional[DependencyItem] = None - position: PositionItem=Field(default=PositionItem()) + position: PositionItem = Field(default=PositionItem()) editable: bool = Field(default=True) diff --git a/apps/manager/flow.py b/apps/manager/flow.py index f7ac31a79..213e6c8c4 100644 --- a/apps/manager/flow.py +++ b/apps/manager/flow.py @@ -23,7 +23,7 @@ from apps.entities.pool import AppFlow, AppPool from apps.models.mongo import MongoDB from apps.scheduler.pool.loader.app import AppLoader from apps.scheduler.pool.loader.flow import FlowLoader - +from apps.manager.node import NodeManager class FlowManager: """Flow相关操作""" @@ -76,9 +76,10 @@ class FlowManager: nodes_meta_data_items = [] async for node_pool_record in cursor: + params_schema, output_schema = await NodeManager.get_node_params(node_pool_record["_id"]) parameters = { - "input_parameters": node_pool_record["params_schema"], - "output_parameters": node_pool_record["output_schema"], + "input_parameters": params_schema, + "output_parameters": output_schema, } node_meta_data_item = NodeMetaDataItem( nodeMetaDataId=node_pool_record["_id"], @@ -106,7 +107,6 @@ class FlowManager: service_collection = MongoDB.get_collection("service") try: match_conditions = [ - {"author": "system"}, {"author": user_sub}, { "$and": [ @@ -129,6 +129,14 @@ class FlowManager: ) service_records = await service_records_cursor.to_list(length=None) service_items = [ + NodeServiceItem( + serviceId="", + name="系统", + type="system", + nodeMetaDatas=[] + ) + ] + service_items += [ NodeServiceItem( serviceId=record["_id"], name=record["name"], @@ -141,7 +149,7 @@ class FlowManager: for service_item in service_items: node_meta_datas = await FlowManager.get_node_meta_datas_by_service_id(service_item.service_id) if node_meta_datas is None: - node_meta_datas=[] + node_meta_datas = [] service_item.node_meta_datas = node_meta_datas return service_items @@ -418,7 +426,7 @@ class FlowManager: return None @staticmethod - async def updata_flow_debug_by_app_and_flow_id(app_id: str, flow_id: str, debug: bool)-> bool: + async def updata_flow_debug_by_app_and_flow_id(app_id: str, flow_id: str, debug: bool) -> bool: try: app_pool_collection = MongoDB.get_collection("app") result = await app_pool_collection.find_one( @@ -429,21 +437,21 @@ class FlowManager: LOGGER.error("Update flow debug from app pool failed") return False app_pool = AppPool( - _id=result["_id"], # 使用 alias="_id" 自动映射 - name=result.get("name", ""), - description=result.get("description", ""), - created_at=result.get("created_at", None), - author=result.get("author", ""), - type=result.get("type", "default"), - icon=result.get("icon", ""), - published=result.get("published", False), - links=[AppLink(**link) for link in result.get("links", [])], - first_questions=result.get("first_questions", []), - history_len=result.get("history_len", 3), - permission=Permission(**result.get("permission", {})), - flows=[AppFlow(**flow) for flow in result.get("flows", [])], - hashes=result.get("hashes", {}) - ) + _id=result["_id"], # 使用 alias="_id" 自动映射 + name=result.get("name", ""), + description=result.get("description", ""), + created_at=result.get("created_at", None), + author=result.get("author", ""), + type=result.get("type", "default"), + icon=result.get("icon", ""), + published=result.get("published", False), + links=[AppLink(**link) for link in result.get("links", [])], + first_questions=result.get("first_questions", []), + history_len=result.get("history_len", 3), + permission=Permission(**result.get("permission", {})), + flows=[AppFlow(**flow) for flow in result.get("flows", [])], + hashes=result.get("hashes", {}) + ) return True except Exception as e: -- Gitee From a12db45aa17871a959429838ede6b1d02ce93398 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=B9=E5=8D=9A?= <1016318004@qq.com> Date: Wed, 26 Feb 2025 02:38:01 +0800 Subject: [PATCH 2/2] =?UTF-8?q?debug=E5=A4=84=E7=90=86fix?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/manager/flow.py | 32 +++++++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/apps/manager/flow.py b/apps/manager/flow.py index f7ac31a79..ef49540b6 100644 --- a/apps/manager/flow.py +++ b/apps/manager/flow.py @@ -422,8 +422,7 @@ class FlowManager: try: app_pool_collection = MongoDB.get_collection("app") result = await app_pool_collection.find_one( - {"_id": app_id}, - array_filters=[{"flows.id": flow_id}] # 使用关键字参数 array_filters + {"_id": app_id,"flows.id": flow_id} # 使用关键字参数 array_filters ) if result is None: LOGGER.error("Update flow debug from app pool failed") @@ -434,7 +433,6 @@ class FlowManager: description=result.get("description", ""), created_at=result.get("created_at", None), author=result.get("author", ""), - type=result.get("type", "default"), icon=result.get("icon", ""), published=result.get("published", False), links=[AppLink(**link) for link in result.get("links", [])], @@ -442,9 +440,33 @@ class FlowManager: history_len=result.get("history_len", 3), permission=Permission(**result.get("permission", {})), flows=[AppFlow(**flow) for flow in result.get("flows", [])], - hashes=result.get("hashes", {}) ) - + metadata = AppMetadata( + id=app_pool.id, + name=app_pool.name, + description=app_pool.description, + author=app_pool.author, + icon=app_pool.id, + published=app_pool.published, + links=app_pool.links, + first_questions=app_pool.first_questions, + history_len=app_pool.history_len, + permission=app_pool.permission, + flows=app_pool.flows, + version="1.0", + ) + for flows in metadata.flows: + if flows.id == flow_id: + flows.debug = debug + app_loader = AppLoader.remote() + await app_loader.save.remote(metadata, app_id) # type: ignore[attr-type] + ray.kill(app_loader) + flow_loader = FlowLoader() + flow = await flow_loader.load(app_id, flow_id) + if flow is None: + return False + flow.debug = debug + await flow_loader.save(app_id=app_id,flow_id=flow_id,flow=flow) return True except Exception as e: LOGGER.error(f'Update flow debug from app pool failed: {e}') -- Gitee