From 96cd7c75d14febc3aa2fe792a0816a56666fed8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=B9=E5=8D=9A?= <1016318004@qq.com> Date: Sat, 22 Feb 2025 16:14:42 +0800 Subject: [PATCH 1/9] =?UTF-8?q?user=E8=BF=94=E5=9B=9E=E4=BF=A1=E6=81=AF?= =?UTF-8?q?=E4=BF=AE=E6=AD=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/routers/user.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/routers/user.py b/apps/routers/user.py index 9413f5f3..7d1da95a 100644 --- a/apps/routers/user.py +++ b/apps/routers/user.py @@ -46,6 +46,7 @@ async def chat( return JSONResponse(status_code=status.HTTP_200_OK, content=UserGetRsp( code=status.HTTP_200_OK, - message="节点元数据详细信息获取成功", + message="用户数据详细信息获取成功", result=UserGetMsp(userInfoList=user_info_list), ).model_dump(exclude_none=True, by_alias=True)) + -- Gitee From 793c28dbc338f4a58d769dc40e4d6743df814a3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=B9=E5=8D=9A?= <1016318004@qq.com> Date: Sat, 22 Feb 2025 18:03:34 +0800 Subject: [PATCH 2/9] =?UTF-8?q?=E6=9C=AC=E5=9C=B0=E5=AD=98=E5=8F=96flow=5F?= =?UTF-8?q?config?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/entities/flow.py | 2 +- apps/scheduler/pool/loader/flow.py | 71 +++++++++++++++++++++++++++--- 2 files changed, 65 insertions(+), 8 deletions(-) diff --git a/apps/entities/flow.py b/apps/entities/flow.py index bafe3664..cc29c4c1 100644 --- a/apps/entities/flow.py +++ b/apps/entities/flow.py @@ -26,7 +26,7 @@ class Edge(BaseModel): id: str = Field(description="边的ID") edge_from: str = Field(description="边的来源节点ID") edge_to: str = Field(description="边的目标节点ID") - edge_type: Optional[EdgeType] = Field(description="边的类型") + edge_type: Optional[EdgeType] = Field(description="边的类型",default = EdgeType.NORMAL) class Step(BaseModel): diff --git a/apps/scheduler/pool/loader/flow.py b/apps/scheduler/pool/loader/flow.py index 703a99b2..69ecaa43 100644 --- a/apps/scheduler/pool/loader/flow.py +++ b/apps/scheduler/pool/loader/flow.py @@ -2,22 +2,48 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ +import asyncio from pathlib import Path import yaml from apps.common.config import config +from apps.constants import LOGGER +from apps.entities.enum_var import EdgeType from apps.entities.flow import Flow +from apps.models.mongo import MongoDB +async def search_step_type(node_id: str) -> str: + node_collection = MongoDB.get_collection("node") + call_collection = MongoDB.get_collection("call") + # 查询 Node 集合获取对应的 call_id + node_doc = await node_collection.find_one({"_id": node_id}) + if not node_doc: + LOGGER.error(f"Node {node_id} not found") + return None + call_id = node_doc.get("call_id") + if not call_id: + LOGGER.error(f"Node {node_id} has no associated call_id") + return None + # 查询 Call 集合获取 node_type + call_doc = await call_collection.find_one({"_id": call_id}) + if not call_doc: + LOGGER.error(f"No call found with call_id: {call_id}") + return None + node_type = call_doc.get("type") + if not node_type: + LOGGER.error(f"Call {call_id} has no associated node_type") + return None + return node_type + class FlowLoader: """工作流加载器""" @classmethod - def load(cls, app_id: str, flow_id: str) -> Flow: + async def load(cls, app_id: str, flow_id: str) -> Flow: """从文件系统中加载【单个】工作流""" flow_path = Path(config["SERVICE_DIR"]) / "app" / app_id / "flow" / f"{flow_id}.yaml" - with flow_path.open(encoding="utf-8") as f: flow_yaml = yaml.safe_load(f) @@ -25,21 +51,52 @@ class FlowLoader: err = f"工作流名称不能为空:{flow_path!s}" raise ValueError(err) - if "::" in flow_yaml["id"]: + if "::" in flow_id: err = f"工作流名称包含非法字符:{flow_path!s}" raise ValueError(err) + for edge in flow_yaml["edges"]: + # 把from变成edge_from,to改成edge_to,type改成edge_type + if "from" in edge: + edge["edge_from"] = edge.pop("from") + if "to" in edge: + edge["edge_to"] = edge.pop("to") + if "type" in edge: + # 将 type 转换为 EdgeType 枚举类型 + try: + edge["edge_type"] = EdgeType[edge.pop("type").upper()] + except KeyError as e: + LOGGER.error(f"Invalid edge type: {edge['type']}") + + for step in flow_yaml["steps"]: + step["type"] = await search_step_type(step["node"]) + try: # 检查Flow格式,并转换为Flow对象 flow = Flow.model_validate(flow_yaml) except Exception as e: - err = f"工作流格式错误:{e!s}; 文件路径:{flow_path!s}" - raise ValueError(err) from e + LOGGER.error(f"Invalid flow format: {e}") + return None return flow @classmethod - def save(cls, app_id: str, flow_id: str, flow: Flow) -> None: + async def save(cls, app_id: str, flow_id: str, flow: Flow) -> None: """保存工作流""" - pass + flow_path = Path(config["SERVICE_DIR"]) / "app" / app_id / "flow" / f"{flow_id}.yaml" + if not flow_path.parent.exists(): + flow_path.parent.mkdir(parents=True) + if not flow_path.exists(): + flow_path.touch() + #输出到文件 + with open(flow_path, "w", encoding="utf-8") as f: + yaml.dump(flow.dict(), f, allow_unicode=True) + + + +if __name__ == "__main__": + # 测试代码 + Loader=FlowLoader() + flow = asyncio.run(Loader.load("1","1")) + asyncio.run(Loader.save("1","2",flow)) \ No newline at end of file -- Gitee From 581b76c0ec33eccf0164dc767dbf964c2bb08ed0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=B9=E5=8D=9A?= <1016318004@qq.com> Date: Sat, 22 Feb 2025 18:05:00 +0800 Subject: [PATCH 3/9] =?UTF-8?q?=E5=BA=94=E7=94=A8=E4=B8=AD=E5=BF=83?= =?UTF-8?q?=E5=BA=94=E7=94=A8=E6=8E=A5=E5=8F=A3=E5=A2=9E=E5=8A=A0=E5=B7=A5?= =?UTF-8?q?=E4=BD=9C=E6=B5=81debug=E6=83=85=E5=86=B5=E8=BF=94=E5=9B=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/entities/pool.py | 2 +- apps/routers/appcenter.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/entities/pool.py b/apps/entities/pool.py index 810b6358..89c9b901 100644 --- a/apps/entities/pool.py +++ b/apps/entities/pool.py @@ -84,7 +84,7 @@ class AppFlow(BaseData): path: str = Field(description="Flow的路径") focus_point: PositionItem = Field( description="Flow的视觉焦点", default=PositionItem(x=0, y=0)) - + debug: bool = Field(description="调试是否成功", default=False) class AppPool(BaseData): """应用信息 diff --git a/apps/routers/appcenter.py b/apps/routers/appcenter.py index de134cb4..d0dce197 100644 --- a/apps/routers/appcenter.py +++ b/apps/routers/appcenter.py @@ -182,7 +182,7 @@ async def get_application( result={}, ).model_dump(exclude_none=True, by_alias=True), ) - workflows = [{"id": flow.id, "name": flow.name} for flow in app_data.flows] + workflows = [{"id": flow.id, "name": flow.name, "debug":flow.debug} for flow in app_data.flows] return JSONResponse( status_code=status.HTTP_200_OK, content=GetAppPropertyRsp( -- Gitee From 144f5c45b337157dbeb07f773d9d33e1f6ba8ebe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=B9=E5=8D=9A?= <1016318004@qq.com> Date: Sat, 22 Feb 2025 18:26:16 +0800 Subject: [PATCH 4/9] =?UTF-8?q?flow=5Fconfig=E5=88=A0=E5=8E=BB=E6=97=A0?= =?UTF-8?q?=E7=94=A8=E5=86=85=E5=AE=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/scheduler/pool/loader/flow.py | 32 +++++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/apps/scheduler/pool/loader/flow.py b/apps/scheduler/pool/loader/flow.py index 69ecaa43..da740e88 100644 --- a/apps/scheduler/pool/loader/flow.py +++ b/apps/scheduler/pool/loader/flow.py @@ -21,20 +21,20 @@ async def search_step_type(node_id: str) -> str: node_doc = await node_collection.find_one({"_id": node_id}) if not node_doc: LOGGER.error(f"Node {node_id} not found") - return None + return "" call_id = node_doc.get("call_id") if not call_id: LOGGER.error(f"Node {node_id} has no associated call_id") - return None + return "" # 查询 Call 集合获取 node_type call_doc = await call_collection.find_one({"_id": call_id}) if not call_doc: LOGGER.error(f"No call found with call_id: {call_id}") - return None + return "" node_type = call_doc.get("type") if not node_type: LOGGER.error(f"Call {call_id} has no associated node_type") - return None + return "" return node_type class FlowLoader: @@ -90,8 +90,30 @@ class FlowLoader: if not flow_path.exists(): flow_path.touch() #输出到文件 + flow_dict ={} + flow_dict["name"]=flow.name + flow_dict["description"]=flow.description + flow_dict["on_error"]=flow.on_error.dict() + flow_dict["steps"]=[] + for step in flow.steps: + flow_dict["steps"].append({ + "name":step.name, + "description":step.description, + "node":step.node, + "params":step.params, + "pos":step.pos.dict(), + }) + flow_dict["edges"]=[] + for edge in flow.edges: + flow_dict["edges"].append({ + "id":edge.id, + "from":edge.edge_from, + "to":edge.edge_to, + "type":edge.edge_type.value, + }) + with open(flow_path, "w", encoding="utf-8") as f: - yaml.dump(flow.dict(), f, allow_unicode=True) + yaml.dump(flow_dict, f, allow_unicode=True, sort_keys=False) -- Gitee From 31e5d66b23b380f7f165e4f2c9303c2c13d63db3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=B9=E5=8D=9A?= <1016318004@qq.com> Date: Sun, 23 Feb 2025 14:14:17 +0800 Subject: [PATCH 5/9] =?UTF-8?q?=E5=88=A0=E9=99=A4=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/scheduler/pool/loader/flow.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/apps/scheduler/pool/loader/flow.py b/apps/scheduler/pool/loader/flow.py index da740e88..a86df021 100644 --- a/apps/scheduler/pool/loader/flow.py +++ b/apps/scheduler/pool/loader/flow.py @@ -114,11 +114,3 @@ class FlowLoader: with open(flow_path, "w", encoding="utf-8") as f: yaml.dump(flow_dict, f, allow_unicode=True, sort_keys=False) - - - -if __name__ == "__main__": - # 测试代码 - Loader=FlowLoader() - flow = asyncio.run(Loader.load("1","1")) - asyncio.run(Loader.save("1","2",flow)) \ No newline at end of file -- Gitee From 20d1f36b0fd5c4bf7a5e10280faf11abbbbb2fc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=B9=E5=8D=9A?= <1016318004@qq.com> Date: Sun, 23 Feb 2025 16:20:09 +0800 Subject: [PATCH 6/9] =?UTF-8?q?flow=20config=E5=AD=98=E5=82=A8=E4=BF=AE?= =?UTF-8?q?=E6=94=B9=E5=88=B0=E6=9C=AC=E5=9C=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/manager/flow.py | 29 +++++++---------------------- apps/scheduler/pool/loader/flow.py | 26 ++++++++++++++++++++++++-- 2 files changed, 31 insertions(+), 24 deletions(-) diff --git a/apps/manager/flow.py b/apps/manager/flow.py index 848a58cf..70735de4 100644 --- a/apps/manager/flow.py +++ b/apps/manager/flow.py @@ -19,6 +19,7 @@ from apps.entities.flow_topology import ( ) from apps.entities.pool import AppFlow from apps.models.mongo import MongoDB +from apps.scheduler.pool.loader.flow import FlowLoader class FlowManager: @@ -205,11 +206,8 @@ class FlowManager: return None try: if flow_record: - flow_config_collection = MongoDB.get_collection("flow_config") - flow_config_record = await flow_config_collection.find_one({"app_id": app_id, "flow_id": flow_id}) - if flow_config_record is None or not flow_config_record.get("flow_config"): - return None - flow_config = flow_config_record["flow_config"] + flow_config= await FlowLoader.load(app_id, flow_id) + flow_config = flow_config.dict() if not flow_config: LOGGER.error( "Get flow config by app_id and flow_id failed") @@ -267,7 +265,7 @@ class FlowManager: @staticmethod async def put_flow_by_app_and_flow_id( - app_id: str, flow_id: str, flow_item: FlowItem, focus_point: PositionItem) -> Optional[str]: + app_id: str, flow_id: str, flow_item: FlowItem, focus_point: PositionItem) -> Optional[FlowItem]: """存储/更新flow的数据库数据和配置文件 :param app_id: 应用的id @@ -326,17 +324,8 @@ class FlowManager: edge_type=edge_item.type ) flow_config.edges.append(edge_config) - flow_config = FlowConfig(app_id=app_id, flow_id=flow_id, flow_config=flow_config) - try: - flow_config_collection = MongoDB.get_collection("flow_config") - await flow_config_collection.update_one( - {"app_id": app_id, "flow_id": flow_id}, - {"$set": flow_config.dict()}, - upsert=True, # 如果没有找到匹配的文档,则插入新文档 - ) - except Exception as e: - LOGGER.error(f"Error updating flow config due to: {e}") - return None + await FlowLoader.save(app_id, flow_id, flow_config) + flow_config = await FlowLoader.load(app_id, flow_id) if flow_record: app_collection = MongoDB.get_collection("app") result = await app_collection.find_one_and_update( @@ -387,11 +376,7 @@ class FlowManager: :return: 流的id """ try: - flow_config_collection = MongoDB.get_collection("flow_config") - result = await flow_config_collection.delete_one({"app_id": app_id, "flow_id": flow_id}) - if result.deleted_count == 0: - LOGGER.error("Delete flow config failed") - return None + result = await FlowLoader.delete(app_id, flow_id) app_pool_collection = MongoDB.get_collection("app") # 获取集合 result = await app_pool_collection.find_one_and_update( diff --git a/apps/scheduler/pool/loader/flow.py b/apps/scheduler/pool/loader/flow.py index a86df021..fc733a59 100644 --- a/apps/scheduler/pool/loader/flow.py +++ b/apps/scheduler/pool/loader/flow.py @@ -69,7 +69,12 @@ class FlowLoader: LOGGER.error(f"Invalid edge type: {edge['type']}") for step in flow_yaml["steps"]: - step["type"] = await search_step_type(step["node"]) + if step["id"] == "node1": + step["type"] = "start" + elif step["id"] == "node2": + step["type"] = "end" + else: + step["type"] = await search_step_type(step["node"]) try: # 检查Flow格式,并转换为Flow对象 @@ -77,7 +82,6 @@ class FlowLoader: except Exception as e: LOGGER.error(f"Invalid flow format: {e}") return None - return flow @@ -97,6 +101,7 @@ class FlowLoader: flow_dict["steps"]=[] for step in flow.steps: flow_dict["steps"].append({ + "id":step.id, "name":step.name, "description":step.description, "node":step.node, @@ -114,3 +119,20 @@ class FlowLoader: with open(flow_path, "w", encoding="utf-8") as f: yaml.dump(flow_dict, f, allow_unicode=True, sort_keys=False) + + @classmethod + async def delete(cls, app_id: str, flow_id: str) -> bool: + """删除指定工作流文件""" + flow_path = Path(config["SERVICE_DIR"]) / "app" / app_id / "flow" / f"{flow_id}.yaml" + # 确保目标为文件且存在 + if flow_path.is_file(): + try: + flow_path.unlink() + LOGGER.info(f"Successfully deleted flow file: {flow_path}") + return True + except OSError as e: + LOGGER.error(f"Failed to delete flow file {flow_path}: {e}") + return False + else: + LOGGER.warning(f"Flow file does not exist or is not a file: {flow_path}") + return False -- Gitee From 71d5e98640b8d03aa5a7ab0ff428fffc6d9a8320 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=B9=E5=8D=9A?= <1016318004@qq.com> Date: Sun, 23 Feb 2025 16:43:46 +0800 Subject: [PATCH 7/9] =?UTF-8?q?config=E4=B8=AD=E5=A2=9E=E5=8A=A0=E6=9C=AC?= =?UTF-8?q?=E5=9C=B0=E5=AD=98=E5=82=A8=E8=B7=AF=E5=BE=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/common/config.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apps/common/config.py b/apps/common/config.py index 699fda48..57cbd901 100644 --- a/apps/common/config.py +++ b/apps/common/config.py @@ -88,6 +88,8 @@ class ConfigModel(BaseModel): SEMANTICS_DIR: Optional[str] = Field(description="语义配置路径", default=None) # SQL接口路径 SQL_URL: str = Field(description="Chat2DB接口路径") + # 应用流本地存储路径 + SERVICE_DIR: Optional[str] = Field(description="应用流本地存储路径", default=None) class Config: -- Gitee From 7e0302241b440effdff2556d76f4b845501236f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=B9=E5=8D=9A?= <1016318004@qq.com> Date: Sun, 23 Feb 2025 17:09:15 +0800 Subject: [PATCH 8/9] =?UTF-8?q?=E8=8A=82=E7=82=B9=E4=BF=A1=E6=81=AF?= =?UTF-8?q?=E6=9B=B4=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/scheduler/pool/loader/flow.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/apps/scheduler/pool/loader/flow.py b/apps/scheduler/pool/loader/flow.py index 7e4d3443..c82c9585 100644 --- a/apps/scheduler/pool/loader/flow.py +++ b/apps/scheduler/pool/loader/flow.py @@ -71,10 +71,8 @@ class FlowLoader: LOGGER.error(f"Invalid edge type: {edge['type']}") for step in flow_yaml["steps"]: - if step["id"] == "node1": - step["type"] = "start" - elif step["id"] == "node2": - step["type"] = "end" + if step["id"] in ["start", "end"]: + step["type"] = step["id"] else: step["type"] = await search_step_type(step["node"]) -- Gitee From 48df4d0d317116ef86e28ed62a881084ad9433b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=B9=E5=8D=9A?= <1016318004@qq.com> Date: Sun, 23 Feb 2025 18:06:48 +0800 Subject: [PATCH 9/9] =?UTF-8?q?=E4=BF=AE=E6=AD=A3=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/scheduler/pool/loader/flow.py | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/apps/scheduler/pool/loader/flow.py b/apps/scheduler/pool/loader/flow.py index c82c9585..c455a030 100644 --- a/apps/scheduler/pool/loader/flow.py +++ b/apps/scheduler/pool/loader/flow.py @@ -18,7 +18,6 @@ from apps.models.mongo import MongoDB async def search_step_type(node_id: str) -> str: node_collection = MongoDB.get_collection("node") - call_collection = MongoDB.get_collection("call") # 查询 Node 集合获取对应的 call_id node_doc = await node_collection.find_one({"_id": node_id}) if not node_doc: @@ -28,16 +27,7 @@ async def search_step_type(node_id: str) -> str: if not call_id: LOGGER.error(f"Node {node_id} has no associated call_id") return "" - # 查询 Call 集合获取 node_type - call_doc = await call_collection.find_one({"_id": call_id}) - if not call_doc: - LOGGER.error(f"No call found with call_id: {call_id}") - return "" - node_type = call_doc.get("type") - if not node_type: - LOGGER.error(f"Call {call_id} has no associated node_type") - return "" - return node_type + return call_id class FlowLoader: """工作流加载器""" @@ -71,8 +61,8 @@ class FlowLoader: LOGGER.error(f"Invalid edge type: {edge['type']}") for step in flow_yaml["steps"]: - if step["id"] in ["start", "end"]: - step["type"] = step["id"] + if step["node"] in ["start", "end"]: + step["type"] = step["node"] else: step["type"] = await search_step_type(step["node"]) -- Gitee