From 460d34707916dbc0e3a18af76c70fd3536b38012 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=B9=E5=8D=9A?= <1016318004@qq.com> Date: Fri, 14 Feb 2025 15:39:40 +0800 Subject: [PATCH 1/5] =?UTF-8?q?=E6=B7=BB=E5=8A=A0debug=E5=86=85=E5=AE=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/entities/flow.py | 3 ++- apps/entities/flow_topology.py | 3 ++- apps/manager/flow.py | 7 +++++-- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/apps/entities/flow.py b/apps/entities/flow.py index 21592ff9..0a70646a 100644 --- a/apps/entities/flow.py +++ b/apps/entities/flow.py @@ -39,6 +39,7 @@ class Step(BaseModel): description: str = Field(description="Step的描述") pos: StepPos = Field(description="Step在画布上的位置", default=StepPos(x=0, y=0)) params: dict[str, Any] = Field(description="用户手动指定的Node参数", default={}) + debug: bool =Field(description="是否经过调试", default=False) class NextFlow(BaseModel): @@ -64,7 +65,7 @@ class Flow(BaseModel): steps: list[Step] = Field(description="节点列表", default=[]) edges: list[Edge] = Field(description="边列表", default=[]) next_flow: Optional[list[NextFlow]] = None - + debug: bool = Field(description="是否经过调试", default=False) class MetadataBase(BaseModel): """Service或App的元数据""" diff --git a/apps/entities/flow_topology.py b/apps/entities/flow_topology.py index 6c890fb4..f8db2dd7 100644 --- a/apps/entities/flow_topology.py +++ b/apps/entities/flow_topology.py @@ -51,7 +51,7 @@ class NodeItem(BaseModel): depedency: Optional[DependencyItem] = None position: PositionItem=Field(default=PositionItem()) editable: bool = Field(default=True) - + debug: bool = Field(default=False) class EdgeItem(BaseModel): """请求/响应中的边变量类""" @@ -72,3 +72,4 @@ class FlowItem(BaseModel): nodes: list[NodeItem] = Field(default=[]) edges: list[EdgeItem] = Field(default=[]) created_at: Optional[float] = Field(alias="createdAt", default=0) + debug: bool = Field(default=False) diff --git a/apps/manager/flow.py b/apps/manager/flow.py index 935110fd..2f83e92e 100644 --- a/apps/manager/flow.py +++ b/apps/manager/flow.py @@ -213,7 +213,8 @@ class FlowManager: editable=True, nodes=[], edges=[], - createdAt=flow_record["created_at"] + createdAt=flow_record["created_at"], + debug=flow_config['debug'], ) for node_config in flow_config['steps']: node_item = NodeItem( @@ -226,7 +227,8 @@ class FlowManager: type=node_config['type'], parameters=node_config['params'], position=PositionItem( - x=node_config['pos']['x'], y=node_config['pos']['y']) + x=node_config['pos']['x'], y=node_config['pos']['y']), + debug=node_config['debug'] ) flow_item.nodes.append(node_item) @@ -290,6 +292,7 @@ class FlowManager: description=flow_item.description, steps=[], edges=[], + debug=flow_item.debug, ) for node_item in flow_item.nodes: edge_config = Step( -- Gitee From b1fba06e1b86cd189d2e62a8db945534924bc941 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=B9=E5=8D=9A?= <1016318004@qq.com> Date: Fri, 14 Feb 2025 15:40:09 +0800 Subject: [PATCH 2/5] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=BA=94=E7=94=A8?= =?UTF-8?q?=E4=B8=AD=E5=BF=83=E6=9F=A5=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/manager/appcenter.py | 38 ++++++++++++++++++++++++++++++++------ 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/apps/manager/appcenter.py b/apps/manager/appcenter.py index bd0a3411..aa228967 100644 --- a/apps/manager/appcenter.py +++ b/apps/manager/appcenter.py @@ -2,6 +2,7 @@ Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved. """ +from datetime import datetime, timezone import uuid from datetime import datetime, timezone from enum import Enum @@ -40,13 +41,36 @@ class AppCenterManager: ) -> tuple[list[AppCenterCardItem], int]: """获取所有应用列表""" try: - # 搜索条件 - filters: dict[str, Any] = AppCenterManager._build_filters( - {"published": True}, - search_type, - keyword, - ) if keyword and search_type != SearchType.AUTHOR else {} + # 构建基础搜索条件 + filters: dict[str, Any] = {} + + if keyword and search_type != SearchType.AUTHOR: + # 如果有关键词且不是按作者搜索,则使用原有的过滤逻辑 + filters = AppCenterManager._build_filters( + {"published": True}, + search_type, + keyword, + ) + else: + # 修改为新的搜索条件:author=user_sub 或 published=True + filters = { + "$or": [ + {"author": user_sub}, + {"published": True} + ] + } + + # 如果有关键词且是按作者搜索,额外添加关键词过滤 + if keyword and search_type == SearchType.AUTHOR: + filters["$and"] = [ + filters["$or"], + {"author": {"$regex": keyword, "$options": "i"}} + ] + + # 执行应用搜索 apps, total_apps = await AppCenterManager._search_apps_by_filter(filters, page, page_size) + + # 构建返回的应用卡片列表 return [ AppCenterCardItem( appId=app.id, @@ -59,6 +83,7 @@ class AppCenterManager: ) for app in apps ], total_apps + except Exception as e: LOGGER.error(f"[AppCenterManager] Get app list failed: {e}") return [], -1 @@ -113,6 +138,7 @@ class AppCenterManager: "_id": {"$in": fav_app}, "published": True, } + print(base_filter) filters: dict[str, Any] = AppCenterManager._build_filters( base_filter, search_type, -- Gitee From e8cff97f44ab6d0664a3d6a2eb901e91f7bb2faf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=B9=E5=8D=9A?= <1016318004@qq.com> Date: Fri, 14 Feb 2025 15:40:20 +0800 Subject: [PATCH 3/5] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dchat=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/scheduler/scheduler/scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/scheduler/scheduler/scheduler.py b/apps/scheduler/scheduler/scheduler.py index 5dadc6c8..1ea478c9 100644 --- a/apps/scheduler/scheduler/scheduler.py +++ b/apps/scheduler/scheduler/scheduler.py @@ -99,7 +99,7 @@ class Scheduler: need_recommend = True # 如果是智能问答,直接执行 if not user_selected_flow: - await push_init_message(self._task_id, self._queue, post_body, is_flow=False) + # await push_init_message(self._task_id, self._queue, post_body, is_flow=False) await asyncio.sleep(0.1) for doc in docs: # 保存使用的文件ID @@ -110,7 +110,7 @@ class Scheduler: await push_rag_message(self._task_id, self._queue, user_sub, rag_data) else: # 需要执行Flow - await push_init_message(self._task_id, self._queue, post_body, is_flow=True) + # await push_init_message(self._task_id, self._queue, post_body, is_flow=True) # 组装上下文 background = ExecutorBackground( conversation=context, -- Gitee From d56a58481afdaedb5998a8f5559108e86e2ec1c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=B9=E5=8D=9A?= <1016318004@qq.com> Date: Fri, 14 Feb 2025 16:21:46 +0800 Subject: [PATCH 4/5] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E6=A0=BC=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/manager/flow.py | 184 ++++++++++++++++++++++--------------------- 1 file changed, 93 insertions(+), 91 deletions(-) diff --git a/apps/manager/flow.py b/apps/manager/flow.py index 2f83e92e..a606791c 100644 --- a/apps/manager/flow.py +++ b/apps/manager/flow.py @@ -2,15 +2,16 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ -from typing import Tuple, List +from typing import Optional + from pymongo import ASCENDING from apps.constants import LOGGER -from apps.entities.flow import StepPos, Edge, Step, Flow, FlowConfig +from apps.entities.enum_var import PermissionType +from apps.entities.flow import Edge, Flow, FlowConfig, Step, StepPos +from apps.entities.flow_topology import EdgeItem, FlowItem, NodeItem, NodeMetaDataItem, NodeServiceItem, PositionItem from apps.entities.pool import AppFlow -from apps.entities.flow_topology import NodeServiceItem, NodeMetaDataItem, FlowItem, NodeItem, EdgeItem, PositionItem from apps.models.mongo import MongoDB -from apps.entities.enum_var import NodeType, PermissionType class FlowManager: @@ -23,7 +24,7 @@ class FlowManager: :param service_id: 服务id :return: 如果用户具有所需权限则返回True,否则返回False """ - node_pool_collection = MongoDB.get_collection("node") + node_pool_collection = MongoDB.get_collection("node") service_collection = MongoDB.get_collection("service") try: @@ -34,21 +35,23 @@ class FlowManager: { "$and": [ {"permissions.type": PermissionType.PROTECTED.value}, - {"permissions.users": user_sub} - ] - } + {"permissions.users": user_sub}, + ], + }, ] - query = {"$and": [{"_id": node_pool_record["service_id"]}, - {"$or": match_conditions} - ]} + query = {"$and": [ + {"_id": node_pool_record["service_id"]}, + {"$or": match_conditions}, + ]} result = await service_collection.count_documents(query) return (result > 0) except Exception as e: LOGGER.error(f"Validate user node meta data access failed due to: {e}") return False + @staticmethod - async def get_node_meta_datas_by_service_id(service_id: str) -> List[NodeMetaDataItem]: + async def get_node_meta_datas_by_service_id(service_id: str) -> Optional[list[NodeMetaDataItem]]: """serviceId获取service的接口数据,并将接口转换为节点元数据 :param service_id: 服务id @@ -63,7 +66,7 @@ class FlowManager: async for node_pool_record in cursor: parameters = { "input_parameters": node_pool_record["params_schema"], - "output_parameters": node_pool_record["output_schema"] + "output_parameters": node_pool_record["output_schema"], } node_meta_data_item = NodeMetaDataItem( nodeMetaDataId=node_pool_record["_id"], @@ -75,15 +78,14 @@ class FlowManager: parameters=parameters, # 添加 parametersTemplate 参数 ) nodes_meta_data_items.append(node_meta_data_item) - - return nodes_meta_data_items - except Exception as e: LOGGER.error( f"Get node metadatas by service_id failed due to: {e}") return None - async def get_service_by_user_id(user_sub: str) -> List[NodeServiceItem]: + + @staticmethod + async def get_service_by_user_id(user_sub: str) -> Optional[list[NodeServiceItem]]: """通过user_id获取用户自己上传的、其他人公开的且收藏的、受保护且有权限访问并收藏的service :user_sub: 用户的唯一标识符 @@ -97,22 +99,22 @@ class FlowManager: { "$and": [ {"permissions.type": PermissionType.PUBLIC.value}, - {"favorites": user_sub} - ] + {"favorites": user_sub}, + ], }, { "$and": [ {"permissions.type": PermissionType.PROTECTED.value}, {"permissions.users": user_sub}, - {"favorites": user_sub} - ] - } + {"favorites": user_sub}, + ], + }, ] query = {"$or": match_conditions} service_records_cursor = service_collection.find( query, - sort=[("created_at", ASCENDING)] + sort=[("created_at", ASCENDING)], ) service_records = await service_records_cursor.to_list(length=None) service_items = [ @@ -121,7 +123,7 @@ class FlowManager: name=record["name"], type="default", nodeMetaDatas=[], - createdAt=str(record["created_at"]) + createdAt=str(record["created_at"]), ) for record in service_records ] @@ -135,8 +137,9 @@ class FlowManager: except Exception as e: LOGGER.error(f"Get service by user id failed due to: {e}") return None + @staticmethod - async def get_node_meta_data_by_node_meta_data_id(node_meta_data_id: str) -> NodeMetaDataItem: + async def get_node_meta_data_by_node_meta_data_id(node_meta_data_id: str) -> Optional[NodeMetaDataItem]: """通过node_meta_data_id获取对应的节点源数据信息 :param node_meta_data_id: node_meta_data的id @@ -146,24 +149,24 @@ class FlowManager: try: node_pool_record = await node_pool_collection.find_one({"_id": node_meta_data_id}) parameters = { - "input_parameters": node_pool_record['params_schema'], - "output_parameters": node_pool_record['output_schema'] - } - node_meta_data=NodeMetaDataItem( + "input_parameters": node_pool_record["params_schema"], + "output_parameters": node_pool_record["output_schema"], + } + return NodeMetaDataItem( nodeMetaDataId=node_pool_record["_id"], type=node_pool_record["call_id"], - name=node_pool_record['name'], - description=node_pool_record['description'], + name=node_pool_record["name"], + description=node_pool_record["description"], editable=True, parameters=parameters, - createdAt=node_pool_record['created_at'], + createdAt=node_pool_record["created_at"], ) - return node_meta_data except Exception as e: LOGGER.error(f"获取节点元数据失败: {e}") return None + @staticmethod - async def get_flow_by_app_and_flow_id(app_id: str, flow_id: str) -> Tuple[FlowItem, PositionItem]: + async def get_flow_by_app_and_flow_id(app_id: str, flow_id: str) -> Optional[tuple[FlowItem, PositionItem]]: """通过appId flowId获取flow config的路径和focus,并通过flow config的路径获取flow config,并将其转换为flow item。 :param app_id: 应用的id @@ -178,7 +181,7 @@ class FlowManager: return None cursor = app_collection.find( {"_id": app_id, "flows._id": flow_id}, - {"flows.$": 1} # 只返回 flows 数组中符合条件的第一个元素 + {"flows.$": 1}, # 只返回 flows 数组中符合条件的第一个元素 ) # 获取结果列表,并限制长度为1,因为我们只期待一个结果 @@ -186,7 +189,7 @@ class FlowManager: if len(app_records) == 0: return None app_record = app_records[0] - if "flows" not in app_record.keys() or len(app_record["flows"]) == 0: + if "flows" not in app_record or len(app_record["flows"]) == 0: return None flow_record = app_record["flows"][0] except Exception as e: @@ -199,7 +202,7 @@ class FlowManager: flow_config_record = await flow_config_collection.find_one({"app_id": app_id, "flow_id": flow_id}) if flow_config_record is None or not flow_config_record.get("flow_config"): return None - flow_config = flow_config_record['flow_config'] + flow_config = flow_config_record["flow_config"] if not flow_config: LOGGER.error( "Get flow config by app_id and flow_id failed") @@ -207,8 +210,8 @@ class FlowManager: focus_point = flow_record["focus_point"] flow_item = FlowItem( flowId=flow_id, - name=flow_config['name'], - description=flow_config['description'], + name=flow_config["name"], + description=flow_config["description"], enable=True, editable=True, nodes=[], @@ -216,37 +219,37 @@ class FlowManager: createdAt=flow_record["created_at"], debug=flow_config['debug'], ) - for node_config in flow_config['steps']: + for node_config in flow_config["steps"]: node_item = NodeItem( - nodeId=node_config['id'], - nodeMetaDataId=node_config['node'], - name=node_config['name'], - description=node_config['description'], + nodeId=node_config["id"], + nodeMetaDataId=node_config["node"], + name=node_config["name"], + description=node_config["description"], enable=True, editable=True, - type=node_config['type'], - parameters=node_config['params'], + type=node_config["type"], + parameters=node_config["params"], position=PositionItem( x=node_config['pos']['x'], y=node_config['pos']['y']), debug=node_config['debug'] ) flow_item.nodes.append(node_item) - for edge_config in flow_config['edges']: - edge_from = edge_config['edge_from'] + for edge_config in flow_config["edges"]: + edge_from = edge_config["edge_from"] branch_id = "" - tmp_list = edge_config['edge_from'].split('.') - if len(tmp_list)==0 or len(tmp_list)>2: + tmp_list = edge_config["edge_from"].split(".") + if len(tmp_list) == 0 or len(tmp_list) > 2: LOGGER.error("edge from format error") continue if len(tmp_list) == 2: edge_from = tmp_list[0] branch_id = tmp_list[1] flow_item.edges.append(EdgeItem( - edgeId=edge_config['id'], + edgeId=edge_config["id"], sourceNode=edge_from, - targetNode=edge_config['edge_to'], - type=edge_config['edge_type'], + targetNode=edge_config["edge_to"], + type=edge_config["edge_type"], branchId=branch_id, )) return (flow_item, focus_point) @@ -258,7 +261,7 @@ class FlowManager: @staticmethod async def put_flow_by_app_and_flow_id( - app_id: str, flow_id: str, flow_item: FlowItem, focus_point: PositionItem) -> str: + app_id: str, flow_id: str, flow_item: FlowItem, focus_point: PositionItem) -> Optional[str]: """存储/更新flow的数据库数据和配置文件 :param app_id: 应用的id @@ -274,13 +277,13 @@ class FlowManager: return None cursor = app_collection.find( {"_id": app_id, "flows._id": flow_id}, - {"flows.$": 1} + {"flows.$": 1}, ) app_records = await cursor.to_list(length=1) flow_record = None if len(app_records) != 0: app_record = app_records[0] - if "flows" in app_record.keys() and len(app_record["flows"]) != 0: + if "flows" in app_record and len(app_record["flows"]) != 0: flow_record = app_record["flows"][0] except Exception as e: LOGGER.error( @@ -323,7 +326,7 @@ class FlowManager: await flow_config_collection.update_one( {"app_id": app_id, "flow_id": flow_id}, {"$set": flow_config.dict()}, - upsert=True # 如果没有找到匹配的文档,则插入新文档 + upsert=True, # 如果没有找到匹配的文档,则插入新文档 ) except Exception as e: LOGGER.error(f"Error updating flow config due to: {e}") @@ -331,53 +334,52 @@ class FlowManager: if flow_record: app_collection = MongoDB.get_collection("app") result = await app_collection.find_one_and_update( - {'_id': app_id}, + {"_id": app_id}, { - '$set': { - 'flows.$[element].focus_point': focus_point.model_dump(by_alias=True) - } + "$set": { + "flows.$[element].focus_point": focus_point.model_dump(by_alias=True), + }, }, - array_filters=[{'element._id': flow_id}], - return_document=True # 返回更新后的文档 + array_filters=[{"element._id": flow_id}], + return_document=True, # 返回更新后的文档 ) if result is None: LOGGER.error("Update flow failed") return None return result - else: - new_flow = AppFlow( - _id=flow_id, - name=flow_item.name, - description=flow_item.description, - path="", - focus_point=PositionItem(x=focus_point.x, y=focus_point.y), - ) - app_collection = MongoDB.get_collection("app") - result = await app_collection.find_one_and_update( - {'_id': app_id}, - { - '$push': { - 'flows': new_flow.model_dump(by_alias=True) - } - } - ) - if result is None: - LOGGER.error("Add flow failed") - return None - return flow_item + new_flow = AppFlow( + _id=flow_id, + name=flow_item.name, + description=flow_item.description, + path="", + focus_point=PositionItem(x=focus_point.x, y=focus_point.y), + ) + app_collection = MongoDB.get_collection("app") + result = await app_collection.find_one_and_update( + {"_id": app_id}, + { + "$push": { + "flows": new_flow.model_dump(by_alias=True), + }, + }, + ) + if result is None: + LOGGER.error("Add flow failed") + return None + return flow_item except Exception as e: LOGGER.error( f"Put flow by app_id and flow_id failed due to: {e}") return None - async def delete_flow_by_app_and_flow_id(app_id: str, flow_id: str) -> str: + @staticmethod + async def delete_flow_by_app_and_flow_id(app_id: str, flow_id: str) -> Optional[str]: """删除flow的数据库数据和配置文件 :param app_id: 应用的id :param flow_id: 流的id :return: 流的id """ - try: flow_config_collection = MongoDB.get_collection("flow_config") result = await flow_config_collection.delete_one({"app_id": app_id, "flow_id": flow_id}) @@ -387,12 +389,12 @@ class FlowManager: app_pool_collection = MongoDB.get_collection("app") # 获取集合 result = await app_pool_collection.find_one_and_update( - {'_id': app_id}, + {"_id": app_id}, { - '$pull': { - 'flows': {'_id': flow_id} - } - } + "$pull": { + "flows": {"_id": flow_id}, + }, + }, ) if result is None: LOGGER.error("Delete flow from app pool failed") -- Gitee From 244599b980f4d1da62b32749ff282ff9a281883e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=B9=E5=8D=9A?= <1016318004@qq.com> Date: Fri, 14 Feb 2025 17:20:18 +0800 Subject: [PATCH 5/5] =?UTF-8?q?=E5=8F=96=E6=B6=88node=E8=8A=82=E7=82=B9?= =?UTF-8?q?=E7=9A=84debug=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/entities/flow.py | 9 ++++----- apps/entities/flow_topology.py | 1 - apps/manager/flow.py | 3 +-- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/apps/entities/flow.py b/apps/entities/flow.py index 0a70646a..554cefb9 100644 --- a/apps/entities/flow.py +++ b/apps/entities/flow.py @@ -39,7 +39,6 @@ class Step(BaseModel): description: str = Field(description="Step的描述") pos: StepPos = Field(description="Step在画布上的位置", default=StepPos(x=0, y=0)) params: dict[str, Any] = Field(description="用户手动指定的Node参数", default={}) - debug: bool =Field(description="是否经过调试", default=False) class NextFlow(BaseModel): @@ -153,7 +152,7 @@ class ServiceApiSpec(BaseModel): hash: str = Field(description="OpenAPI文件的hash值") class FlowConfig(BaseModel): - """Flow的配置信息 用于前期调试使用""" - app_id: str - flow_id: str - flow_config: Flow \ No newline at end of file + """Flow的配置信息 用于前期调试使用""" + app_id: str + flow_id: str + flow_config: Flow \ No newline at end of file diff --git a/apps/entities/flow_topology.py b/apps/entities/flow_topology.py index f8db2dd7..a84bd66d 100644 --- a/apps/entities/flow_topology.py +++ b/apps/entities/flow_topology.py @@ -51,7 +51,6 @@ class NodeItem(BaseModel): depedency: Optional[DependencyItem] = None position: PositionItem=Field(default=PositionItem()) editable: bool = Field(default=True) - debug: bool = Field(default=False) class EdgeItem(BaseModel): """请求/响应中的边变量类""" diff --git a/apps/manager/flow.py b/apps/manager/flow.py index a606791c..29be1d23 100644 --- a/apps/manager/flow.py +++ b/apps/manager/flow.py @@ -231,7 +231,6 @@ class FlowManager: parameters=node_config["params"], position=PositionItem( x=node_config['pos']['x'], y=node_config['pos']['y']), - debug=node_config['debug'] ) flow_item.nodes.append(node_item) @@ -295,7 +294,7 @@ class FlowManager: description=flow_item.description, steps=[], edges=[], - debug=flow_item.debug, + debug=False, ) for node_item in flow_item.nodes: edge_config = Step( -- Gitee