diff --git a/apps/common/config.py b/apps/common/config.py index 699fda4841a22bbec6b24be456da1a6e323ca37a..57cbd9012ce06992fb78a849a481ac5f0102c141 100644 --- a/apps/common/config.py +++ b/apps/common/config.py @@ -88,6 +88,8 @@ class ConfigModel(BaseModel): SEMANTICS_DIR: Optional[str] = Field(description="语义配置路径", default=None) # SQL接口路径 SQL_URL: str = Field(description="Chat2DB接口路径") + # 应用流本地存储路径 + SERVICE_DIR: Optional[str] = Field(description="应用流本地存储路径", default=None) class Config: diff --git a/apps/entities/flow.py b/apps/entities/flow.py index accf440ab0e4b05584fe7c96d89eba2d7817e3f0..37690666258f02d6239c8cf6a55c7695514da6f4 100644 --- a/apps/entities/flow.py +++ b/apps/entities/flow.py @@ -26,7 +26,7 @@ class Edge(BaseModel): id: str = Field(description="边的ID") edge_from: str = Field(description="边的来源节点ID") edge_to: str = Field(description="边的目标节点ID") - edge_type: Optional[EdgeType] = Field(description="边的类型") + edge_type: Optional[EdgeType] = Field(description="边的类型",default = EdgeType.NORMAL) class Step(BaseModel): diff --git a/apps/entities/pool.py b/apps/entities/pool.py index 28155cb78c602ff83ee24e72aa303a00fc76b0f6..f43ad36ab15055261123ea42798f8012d258f193 100644 --- a/apps/entities/pool.py +++ b/apps/entities/pool.py @@ -90,7 +90,7 @@ class AppFlow(BaseData): path: str = Field(description="Flow的路径") focus_point: PositionItem = Field( description="Flow的视觉焦点", default=PositionItem(x=0, y=0)) - + debug: bool = Field(description="调试是否成功", default=False) class AppPool(BaseData): """应用信息 diff --git a/apps/manager/flow.py b/apps/manager/flow.py index 848a58cf9fdecc41699320952adf3f773242bc59..70735de4001baa81b117d5681b1a2c141737c1a7 100644 --- a/apps/manager/flow.py +++ b/apps/manager/flow.py @@ -19,6 +19,7 @@ from apps.entities.flow_topology import ( ) from apps.entities.pool import AppFlow from apps.models.mongo import MongoDB +from apps.scheduler.pool.loader.flow import FlowLoader class FlowManager: @@ -205,11 +206,8 @@ class FlowManager: return None try: if flow_record: - flow_config_collection = MongoDB.get_collection("flow_config") - flow_config_record = await flow_config_collection.find_one({"app_id": app_id, "flow_id": flow_id}) - if flow_config_record is None or not flow_config_record.get("flow_config"): - return None - flow_config = flow_config_record["flow_config"] + flow_config= await FlowLoader.load(app_id, flow_id) + flow_config = flow_config.dict() if not flow_config: LOGGER.error( "Get flow config by app_id and flow_id failed") @@ -267,7 +265,7 @@ class FlowManager: @staticmethod async def put_flow_by_app_and_flow_id( - app_id: str, flow_id: str, flow_item: FlowItem, focus_point: PositionItem) -> Optional[str]: + app_id: str, flow_id: str, flow_item: FlowItem, focus_point: PositionItem) -> Optional[FlowItem]: """存储/更新flow的数据库数据和配置文件 :param app_id: 应用的id @@ -326,17 +324,8 @@ class FlowManager: edge_type=edge_item.type ) flow_config.edges.append(edge_config) - flow_config = FlowConfig(app_id=app_id, flow_id=flow_id, flow_config=flow_config) - try: - flow_config_collection = MongoDB.get_collection("flow_config") - await flow_config_collection.update_one( - {"app_id": app_id, "flow_id": flow_id}, - {"$set": flow_config.dict()}, - upsert=True, # 如果没有找到匹配的文档,则插入新文档 - ) - except Exception as e: - LOGGER.error(f"Error updating flow config due to: {e}") - return None + await FlowLoader.save(app_id, flow_id, flow_config) + flow_config = await FlowLoader.load(app_id, flow_id) if flow_record: app_collection = MongoDB.get_collection("app") result = await app_collection.find_one_and_update( @@ -387,11 +376,7 @@ class FlowManager: :return: 流的id """ try: - flow_config_collection = MongoDB.get_collection("flow_config") - result = await flow_config_collection.delete_one({"app_id": app_id, "flow_id": flow_id}) - if result.deleted_count == 0: - LOGGER.error("Delete flow config failed") - return None + result = await FlowLoader.delete(app_id, flow_id) app_pool_collection = MongoDB.get_collection("app") # 获取集合 result = await app_pool_collection.find_one_and_update( diff --git a/apps/routers/appcenter.py b/apps/routers/appcenter.py index de134cb46cd78567f6f6d077d1c7204e1aac073c..d0dce1978315153f7a24e644051d37927c327ef9 100644 --- a/apps/routers/appcenter.py +++ b/apps/routers/appcenter.py @@ -182,7 +182,7 @@ async def get_application( result={}, ).model_dump(exclude_none=True, by_alias=True), ) - workflows = [{"id": flow.id, "name": flow.name} for flow in app_data.flows] + workflows = [{"id": flow.id, "name": flow.name, "debug":flow.debug} for flow in app_data.flows] return JSONResponse( status_code=status.HTTP_200_OK, content=GetAppPropertyRsp( diff --git a/apps/routers/user.py b/apps/routers/user.py index 9413f5f317648ccaa41c40c610db1fbb45ad4c9d..7d1da95a41437a6f79f62e4d83fd31b530f2b1b9 100644 --- a/apps/routers/user.py +++ b/apps/routers/user.py @@ -46,6 +46,7 @@ async def chat( return JSONResponse(status_code=status.HTTP_200_OK, content=UserGetRsp( code=status.HTTP_200_OK, - message="节点元数据详细信息获取成功", + message="用户数据详细信息获取成功", result=UserGetMsp(userInfoList=user_info_list), ).model_dump(exclude_none=True, by_alias=True)) + diff --git a/apps/scheduler/pool/loader/flow.py b/apps/scheduler/pool/loader/flow.py index 8f7e1ea026cc3e435cd31224a149ce7991ad2db6..c455a030a4a348b318c107fcf73e532b659f4e10 100644 --- a/apps/scheduler/pool/loader/flow.py +++ b/apps/scheduler/pool/loader/flow.py @@ -2,12 +2,32 @@ Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. """ +import asyncio +from pathlib import Path + +import aiofiles import yaml from anyio import Path from apps.common.config import config +from apps.constants import LOGGER +from apps.entities.enum_var import EdgeType from apps.entities.flow import Flow +from apps.models.mongo import MongoDB + +async def search_step_type(node_id: str) -> str: + node_collection = MongoDB.get_collection("node") + # 查询 Node 集合获取对应的 call_id + node_doc = await node_collection.find_one({"_id": node_id}) + if not node_doc: + LOGGER.error(f"Node {node_id} not found") + return "" + call_id = node_doc.get("call_id") + if not call_id: + LOGGER.error(f"Node {node_id} has no associated call_id") + return "" + return call_id class FlowLoader: """工作流加载器""" @@ -15,28 +35,96 @@ class FlowLoader: @classmethod async def load(cls, app_id: str, flow_id: str) -> Flow: """从文件系统中加载【单个】工作流""" - flow_path = Path(config["SEMANTICS_DIR"]) / "app" / app_id / "flow" / f"{flow_id}.yaml" + flow_path = Path(config["SERVICE_DIR"]) / "app" / app_id / "flow" / f"{flow_id}.yaml" + async with aiofiles.open(flow_path, mode='r', encoding="utf-8") as f: + flow_yaml = yaml.safe_load(await f.read()) + + if "name" not in flow_yaml: + err = f"工作流名称不能为空:{flow_path!s}" + raise ValueError(err) + + if "::" in flow_id: + err = f"工作流名称包含非法字符:{flow_path!s}" + raise ValueError(err) + + for edge in flow_yaml["edges"]: + # 把from变成edge_from,to改成edge_to,type改成edge_type + if "from" in edge: + edge["edge_from"] = edge.pop("from") + if "to" in edge: + edge["edge_to"] = edge.pop("to") + if "type" in edge: + # 将 type 转换为 EdgeType 枚举类型 + try: + edge["edge_type"] = EdgeType[edge.pop("type").upper()] + except KeyError as e: + LOGGER.error(f"Invalid edge type: {edge['type']}") - f = await flow_path.open(encoding="utf-8") - flow_yaml = yaml.safe_load(await f.read()) - await f.aclose() + for step in flow_yaml["steps"]: + if step["node"] in ["start", "end"]: + step["type"] = step["node"] + else: + step["type"] = await search_step_type(step["node"]) - flow_yaml["id"] = flow_id try: # 检查Flow格式,并转换为Flow对象 flow = Flow.model_validate(flow_yaml) except Exception as e: - err = f"工作流格式错误:{e!s}; 文件路径:{flow_path!s}" - raise ValueError(err) from e - + LOGGER.error(f"Invalid flow format: {e}") + return None return flow - @classmethod async def save(cls, app_id: str, flow_id: str, flow: Flow) -> None: """保存工作流""" - file = Path(config["SEMANTICS_DIR"]) / "app" / app_id / "flow" / f"{flow_id}.yaml" + flow_path = Path(config["SERVICE_DIR"]) / "app" / app_id / "flow" / f"{flow_id}.yaml" + if not await flow_path.parent.exists(): + await flow_path.parent.mkdir(parents=True) + if not await flow_path.exists(): + await flow_path.touch() + + flow_dict = { + "name": flow.name, + "description": flow.description, + "on_error": flow.on_error.dict(), + "steps": [ + { + "id": step.id, + "name": step.name, + "description": step.description, + "node": step.node, + "params": step.params, + "pos": step.pos.dict(), + } + for step in flow.steps + ], + "edges": [ + { + "id": edge.id, + "from": edge.edge_from, + "to": edge.edge_to, + "type": edge.edge_type.value, + } + for edge in flow.edges + ] + } - file_handler = await file.open(mode="w", encoding="utf-8") - yaml.dump(flow.model_dump(), file_handler) - await file_handler.aclose() + async with aiofiles.open(flow_path, mode='w', encoding="utf-8") as f: + await f.write(yaml.dump(flow_dict, allow_unicode=True, sort_keys=False)) + + @classmethod + async def delete(cls, app_id: str, flow_id: str) -> bool: + """删除指定工作流文件""" + flow_path = Path(config["SERVICE_DIR"]) / "app" / app_id / "flow" / f"{flow_id}.yaml" + # 确保目标为文件且存在 + if await flow_path.is_file(): + try: + await flow_path.unlink() + LOGGER.info(f"Successfully deleted flow file: {flow_path}") + return True + except OSError as e: + LOGGER.error(f"Failed to delete flow file {flow_path}: {e}") + return False + else: + LOGGER.warning(f"Flow file does not exist or is not a file: {flow_path}") + return False