From 4e09360d8b4eabf6e66595373119ee0f9f4c095e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=B9=E5=8D=9A?= <1016318004@qq.com> Date: Mon, 24 Feb 2025 17:30:25 +0800 Subject: [PATCH 1/4] =?UTF-8?q?user=E5=8E=BB=E9=99=A4=E6=97=A0=E7=94=A8?= =?UTF-8?q?=E9=83=A8=E5=88=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/routers/user.py | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/apps/routers/user.py b/apps/routers/user.py index 7d1da95a..1159560b 100644 --- a/apps/routers/user.py +++ b/apps/routers/user.py @@ -1,24 +1,16 @@ from typing import Annotated -from fastapi import APIRouter, Depends, HTTPException, status -from fastapi.responses import JSONResponse, StreamingResponse +from fastapi import APIRouter, Depends, status +from fastapi.responses import JSONResponse -from apps.common.queue import MessageQueue -from apps.common.wordscheck import WordsCheck -from apps.constants import LOGGER from apps.dependency import ( - get_session, get_user, verify_csrf_token, verify_user, ) -from apps.entities.request_data import RequestData -from apps.entities.response_data import ResponseData, UserGetMsp, UserGetRsp +from apps.entities.response_data import UserGetMsp, UserGetRsp from apps.entities.user import UserInfo -from apps.manager.appcenter import AppCenterManager -from apps.scheduler.scheduler import Scheduler -from apps.service.activity import Activity from apps.manager.user import UserManager router = APIRouter( @@ -29,7 +21,6 @@ router = APIRouter( @router.get("", dependencies=[Depends(verify_csrf_token), Depends(verify_user)]) async def chat( user_sub: Annotated[str, Depends(get_user)], - session_id: Annotated[str, Depends(get_session)], ) -> JSONResponse: """查询所有用户接口""" user_list = await UserManager.get_all_user_sub() -- Gitee From 2aae58749360f48b01638e0f18d08c8308e64b4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=B9=E5=8D=9A?= <1016318004@qq.com> Date: Mon, 24 Feb 2025 17:31:46 +0800 Subject: [PATCH 2/4] =?UTF-8?q?mock=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/routers/mock.py | 434 ++++++++++++------------------------------- 1 file changed, 114 insertions(+), 320 deletions(-) diff --git a/apps/routers/mock.py b/apps/routers/mock.py index d1271622..dc753ef1 100644 --- a/apps/routers/mock.py +++ b/apps/routers/mock.py @@ -1,7 +1,13 @@ -from fastapi import APIRouter, Depends, HTTPException, status -from fastapi.responses import StreamingResponse +import asyncio +import copy import json +import random +import time + import tiktoken +from fastapi import APIRouter, Depends, HTTPException, status +from fastapi.responses import StreamingResponse + from apps.common.config import config from apps.dependency import ( get_session, @@ -9,7 +15,9 @@ from apps.dependency import ( verify_csrf_token, verify_user, ) -from apps.entities.request_data import MockRequestData +from apps.entities.request_data import MockRequestData, RequestData +from apps.manager.flow import FlowManager +from apps.scheduler.pool.loader.flow import FlowLoader router = APIRouter( prefix="/api", @@ -17,13 +25,34 @@ router = APIRouter( ) -def mock_data(question): +def mock_data(appId="68dd3d90-6a97-4da0-aa62-d38a81c7d2f5", flowId="966c7964-e1c1-4bd8-9333-ed099cf25908", conversationId="eccb08c3-0621-4602-a4d2-4eaada892557", question="你好"): _encoder = tiktoken.get_encoding("cl100k_base") - conversationId="eccb08c3-0621-4602-a4d2-4eaada892557" - appId="68dd3d90-6a97-4da0-aa62-d38a81c7d2f5" - flowId="966c7964-e1c1-4bd8-9333-ed099cf25908" - stepIds = ["node1","b7607efc-0dc7-4f7a-a2b2-dba60013b3b5","d60a0fde-cd75-48ee-ba6c-b861b0a94f81"] - messages = [{ + start_message = [{ # 任务流开始 + "event": "flow.start", + "id": "0f9d3e6b-7845-44ab-b247-35c522d38f13", + "groupId":"8b9d3e6b-a892-4602-b247-35c522d38f13", + "conversationId": conversationId, + "taskId": "eb717bc7-3435-4172-82d1-6b69e62f3fd6", + "flow": { + "appId": appId, + "flowId": flowId, + "stepId": "start", + "stepStatus": "pending", + }, + "content": { + "question": "查询所有主机的CVE信息", + "params": { + "cveId": "CVE-2021-44228", + "host": "192.168.10.1" + } + }, + "metadata": { + "inputTokens": 200, + "outputTokens": 50, + "time_cost": 0.5 + } + }, + { "event": "init", "id": "0f9d3e6b-7845-44ab-b247-35c522d38f13", "groupId":"8b9d3e6b-a892-4602-b247-35c522d38f13", @@ -40,35 +69,11 @@ def mock_data(question): "metadata": { "input_tokens": 200, "output_tokens": 50, - "time": 0.5 - } - }, - { # 任务流开始 - "event": "flow.start", - "id": "0f9d3e6b-7845-44ab-b247-35c522d38f13", - "groupId":"8b9d3e6b-a892-4602-b247-35c522d38f13", - "conversationId": conversationId, - "taskId": "eb717bc7-3435-4172-82d1-6b69e62f3fd6", - "flow": { - "appId": appId, - "flowId": flowId, - "stepId": stepIds[0], - "stepStatus": "pending", - }, - "content": { - "question": "查询所有主机的CVE信息", - "params": { - "cveId": "CVE-2021-44228", - "host": "192.168.10.1" + "time_cost": 0.5 } }, - "metadata": { - "inputTokens": 200, - "outputTokens": 50, - "time": 0.5 - } - }, - { # 开始节点 + ] + sample_input = { # 开始节点 "event": "step.input", "id": "0f9d3e6b-7845-44ab-b247-35c522d38f13", "groupId":"8b9d3e6b-a892-4602-b247-35c522d38f13", @@ -77,19 +82,20 @@ def mock_data(question): "flow": { "appId": appId, "flowId":flowId, - "stepId": stepIds[0], + "stepId": "start", "stepName": "开始", "stepStatus": "running", }, "content": { + "text":"测试输入" }, "metadata": { "inputTokens": 200, "outputTokens": 50, - "time": 0.5, + "time_cost": 0.5, + } } - }, - { + sample_output = { # 开始节点 "event": "step.output", "id": "0f9d3e6b-7845-44ab-b247-35c522d38f13", "groupId":"8b9d3e6b-a892-4602-b247-35c522d38f13", @@ -98,147 +104,62 @@ def mock_data(question): "flow": { "appId": appId, "flowId":flowId, - "stepId": stepIds[0], + "stepId": "start", "stepName": "开始", "stepStatus": "success", }, "content": { + "text":"测试输出" }, "metadata": { "inputTokens": 200, "outputTokens": 50, - "time": 0.5, + "time_cost": 0.5, } - }, - { # 【API】获取任务简介 - "event": "step.input", - "id": "0f9d3e6b-7845-44ab-b247-35c522d38f13", - "groupId":"8b9d3e6b-a892-4602-b247-35c522d38f13", - "conversationId": conversationId, - "taskId": "eb717bc7-3435-4172-82d1-6b69e62f3fd6", - "flow": { - "appId": appId, - "flowId":flowId, - "stepId": stepIds[1], - "stepName": "【API】获取任务简介", - "stepStatus": "running", - }, - "content": { - "full_url": "https://a-ops3.local/vulnerabilities/task/list/get", - "service_id": "aops-apollo", - "method": "post", - "input_data": { - "page": 1, - "page_size": 10, - "filter": { - "cluster_list": [], - }, - }, - "timeout": 300, - "output_key": [ - { - "key": "data.result", - "path": "task_list", - }, - ], - }, - "metadata": { - "inputTokens": 200, - "outputTokens": 50, - "time": 0.5, } - }, - { - "event": "step.output", - "id": "0f9d3e6b-7845-44ab-b247-35c522d38f13", - "groupId":"8b9d3e6b-a892-4602-b247-35c522d38f13", - "conversationId": conversationId, - "taskId": "eb717bc7-3435-4172-82d1-6b69e62f3fd6", - "flow": { - "appId": appId, - "flowId":flowId, - "stepId": stepIds[1], - "stepName": "【API】获取任务简介", - "stepStatus": "success", - }, - "content": { - "task_list":[ - { - "task_id":"eb717bc7-3435-4172-82d1-6b12e62f3fd6", - "task_name":"A", - "task_type":"cve_scan", - }, - { - "task_id":"eb717bc7-3435-4172-82d1-6b13e62f3fd6", - "task_name":"B", - "task_type":"cve_fix", - }, - ] - }, - "metadata": { - "inputTokens": 200, - "outputTokens": 50, - "time": 0.5, - } - }, + mid_message = [] + flow = asyncio.run(FlowLoader.load(appId, flowId)) + now_flow_item = "start" + start_time = time.time() + last_item = "" + mapp={} + # print(json.dumps(flow)) + for step_id, step in flow.steps.items(): + mapp[step_id]= step.name, step.params + while now_flow_item != "end": + if last_item == now_flow_item: + break + #如果超过10s强制退出 + if time.time() - start_time > 10: + break + last_item = now_flow_item + for edge in flow.edges: + if edge.edge_from.split('.')[0] == now_flow_item: + sample_input["flow"]["stepId"] = now_flow_item + sample_input["flow"]["stepName"],sample_input["content"] = mapp[now_flow_item] + sample_input["content"] = sample_input["content"]["input_parameters"] if now_flow_item != "start" else sample_input["content"] + mid_message.append(copy.deepcopy(sample_input)) + sample_output["metadata"]["time_cost"] = random.uniform(0.5, 1.5) + sample_output["flow"]["stepId"] = now_flow_item + sample_output["flow"]["stepName"],sample_output["content"] = mapp[now_flow_item] + sample_output["content"] = sample_output["content"]["output_parameters"] if now_flow_item != "start" else sample_output["content"] + if sample_output["flow"]["stepName"] == "【RAG】知识库智能问答": + sample_output["content"] = call_rag() + if sample_output["flow"]["stepName"] == "【LLM】模型问答": + sample_output["content"] = call_llm() + mid_message.append(copy.deepcopy(sample_output)) + now_flow_item = edge.edge_to - { # 【CHOICE】判断任务类型 - "event": "step.input", - "id": "0f9d3e6b-7845-44ab-b247-35c522d38f13", - "groupId":"8b9d3e6b-a892-4602-b247-35c522d38f13", - "conversationId": conversationId, - "taskId": "eb717bc7-3435-4172-82d1-6b69e62f3fd6", - "flow": { - "appId": appId, - "flowId":flowId, - "stepId": stepIds[2], - "stepName": "【CHOICE】判断任务类型", - "stepStatus": "running", - }, - "content": { - "choices": [ - { - "branchId": "is_scan", - "description": '任务类型为"CVE修复任务"', - "propose": '当值为cve_scan时,任务类型为"CVE修复任务",选择此分支', - "variable_a": "{{input.task_list[0].task_type}}", - }, - { - "branchId": "is_fix", - "description": '任务类型为"CVE修复任务"', - "propose": '当值为cve_fix时,任务类型为"CVE修复任务",选择此分支', - "variable_a": "{{input.task_list[0].task_type}}", - }, - ], - }, - "metadata": { - "inputTokens": 200, - "outputTokens": 50, - "time": 0.5, - } - }, - { - "event": "step.output", - "id": "0f9d3e6b-7845-44ab-b247-35c522d38f13", - "groupId":"8b9d3e6b-a892-4602-b247-35c522d38f13", - "conversationId": conversationId, - "taskId": "eb717bc7-3435-4172-82d1-6b69e62f3fd6", - "flow": { - "appId": appId, - "flowId":flowId, - "stepId": stepIds[2], - "stepName": "【CHOICE】判断任务类型", - "stepStatus": "error", - }, - "content": { - }, - "metadata": { - "inputTokens": 200, - "outputTokens": 50, - "time": 0.5, - } - }, + if now_flow_item == "end": + sample_input["flow"]["stepId"] = now_flow_item + sample_input["flow"]["stepName"] = "结束" + mid_message.append(sample_input) + sample_output["flow"]["stepId"] = now_flow_item + sample_output["flow"]["stepName"] = "结束" + sample_output["metadata"]["time_cost"] = random.uniform(0.5, 1.5) + mid_message.append(sample_output) + end_message = [ { # flow结束 "event": "flow.stop", "id": "0f9d3e6b-7845-44ab-b247-35c522d38f13", @@ -248,179 +169,52 @@ def mock_data(question): "flow": { "appId": appId, "flowId":flowId, - "stepId": stepIds[2], - "stepName": "【CHOICE】判断任务类型", - "stepStatus": "error", + "stepId": "end", + "stepName": "end", + "stepStatus": "success", }, "content": { }, "measure": { "inputTokens": 200, "outputTokens": 50, - "time": 0.5 - } - }, - - - # 文字返回 - { - "event": "text.add", - "id": "0f9d3e6b-7845-44ab-b247-35c522d38f13", - "groupId":"8b9d3e6b-a892-4602-b247-35c522d38f13", - "conversationId": conversationId, - "taskId": "eb717bc7-3435-4172-82d1-6b69e62f3fd6", - "content": { - "text": "思考測試思考" - }, - "metadata": { - "inputTokens": 200, - "outputTokens": 50, - "time": 0.5 - } - }, - - { - "event": "text.add", - "id": "0f9d3e6b-7845-44ab-b247-35c522d38f13", - "groupId":"8b9d3e6b-a892-4602-b247-35c522d38f13", - "conversationId": conversationId, - "taskId": "eb717bc7-3435-4172-82d1-6b69e62f3fd6", - "content": { - "text": "语句" - }, - "metadata": { - "inputTokens": 200, - "outputTokens": 50, - "time": 0.5 - } - }, - - { - "event": "text.add", - "id": "0f9d3e6b-7845-44ab-b247-35c522d38f13", - "groupId":"8b9d3e6b-a892-4602-b247-35c522d38f13", - "conversationId": conversationId, - "taskId": "eb717bc7-3435-4172-82d1-6b69e62f3fd6", - "content": { - "text": "语句\n" - }, - "metadata": { - "inputTokens": 200, - "outputTokens": 50, - "time": 0.5 - } - }, - { - "event": "text.add", - "id": "0f9d3e6b-7845-44ab-b247-35c522d38f13-1", - "groupId": "8b9d3e6b-a892-4602-b247-35c522d38f13", - "conversationId": conversationId, - "taskId": "eb717bc7-3435-4172-82d1-6b69e62f3fd6", - "content": { - "text": "| Header1 | Header2 | Header3 |\n|---------|---------|---------|\n| Row1Col1| Row1Col2| Row1Col3|\n" - }, - "metadata": { - "inputTokens": 200, - "outputTokens": 50, - "time": 0.5 - } - }, - { - "event": "text.add", - "id": "0f9d3e6b-7845-44ab-b247-35c522d38f13-2", - "groupId": "8b9d3e6b-a892-4602-b247-35c522d38f13", - "conversationId": conversationId, - "taskId": "eb717bc7-3435-4172-82d1-6b69e62f3fd6", - "content": { - "text": "| Row2Col1| Row2Col2| Row2Col3|\n" - }, - "metadata": { - "inputTokens": 200, - "outputTokens": 50, - "time": 0.5 - } - }, - { - "event": "text.add", - "id": "0f9d3e6b-7845-44ab-b247-35c522d38f13-3", - "groupId": "8b9d3e6b-a892-4602-b247-35c522d38f13", - "conversationId": "conversationId", - "taskId": "eb717bc7-3435-4172-82d1-6b69e62f3fd6", - "content": { - "text": "| Row3Col1| Row3Col2| Row3Col3|\n" - }, - "metadata": { - "inputTokens": 200, - "outputTokens": 50, - "time": 0.5 - } - }, - { - "event": "text.add", - "id": "0f9d3e6b-7845-44ab-b247-35c522d38f13-4", - "groupId": "8b9d3e6b-a892-4602-b247-35c522d38f13", - "conversationId": conversationId, - "taskId": "eb717bc7-3435-4172-82d1-6b69e62f3fd6", - "content": { - "text": "| Row4Col1| Row4Col2| Row4Col3|\n" - }, - "metadata": { - "inputTokens": 200, - "outputTokens": 50, - "time": 0.5 - } - }, - { - "event": "text.add", - "id": "0f9d3e6b-7845-44ab-b247-35c522d38f13-5", - "groupId": "8b9d3e6b-a892-4602-b247-35c522d38f13", - "conversationId": conversationId, - "taskId": "eb717bc7-3435-4172-82d1-6b69e62f3fd6", - "content": { - "text": "| Row5Col1| Row5Col2| Row5Col3|\n" - }, - "metadata": { - "inputTokens": 200, - "outputTokens": 50, - "time": 0.5 - } - }, - { - "event": "text.stop", - "id": "0f9d3e6b-7845-44ab-b247-35c522d38f13", - "groupId":"8b9d3e6b-a892-4602-b247-35c522d38f13", - "conversationId": conversationId, - "taskId": "eb717bc7-3435-4172-82d1-6b69e62f3fd6", - "content": { - "text": "[DONE]\n" - }, - "metadata": { - "inputTokens": 200, - "outputTokens": 50, - "time": 0.5 + "time_cost": 0.5 } - } + }] + chat_message = [ ] - - import time - import random + messages = [] + for message in start_message: + messages.append(message) + for message in mid_message: + messages.append(message) + for message in end_message: + messages.append(message) + for message in chat_message: + messages.append(message) + asyncio.run(FlowManager.updata_flow_debug_by_app_and_flow_id(appId,flowId,True)) + for message in messages: if message['event']=='step.output': - t=random.uniform(3, 5.5) + t=message['metadata']['time_cost'] time.sleep(t) - message['time_cost']=t elif message['event']=='text.add': t=random.uniform(0.15, 0.2) time.sleep(t) yield "data: " + json.dumps(message,ensure_ascii=False) + "\n\n" +async def call_rag(): + return "RAG" + +async def call_llm(): + return "LLM" @router.post("/mock/chat", dependencies=[Depends(verify_csrf_token), Depends(verify_user)]) async def chat( post_body: MockRequestData, ) -> StreamingResponse: """LLM流式对话接口""" - res = mock_data(post_body.question) + res = mock_data(appId=post_body.app_id, conversationId=post_body.conversation_id, flowId=post_body.flow_id,question=post_body.question) return StreamingResponse( content=res, media_type="text/event-stream", -- Gitee From 699a23fbef171a4326d7c92f8dab7bcd3ebddc7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=B9=E5=8D=9A?= <1016318004@qq.com> Date: Mon, 24 Feb 2025 17:34:42 +0800 Subject: [PATCH 3/4] =?UTF-8?q?flow=E6=95=B0=E6=8D=AE=E7=BB=93=E6=9E=84?= =?UTF-8?q?=E4=BF=AE=E6=94=B9=E9=80=82=E9=85=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/manager/flow.py | 29 +++++++++++++++++++++-------- apps/routers/flow.py | 6 +++--- apps/scheduler/pool/loader/flow.py | 23 ++++++++++++----------- 3 files changed, 36 insertions(+), 22 deletions(-) diff --git a/apps/manager/flow.py b/apps/manager/flow.py index 70735de4..78e5ea9b 100644 --- a/apps/manager/flow.py +++ b/apps/manager/flow.py @@ -191,7 +191,6 @@ class FlowManager: {"_id": app_id, "flows._id": flow_id}, {"flows.$": 1}, # 只返回 flows 数组中符合条件的第一个元素 ) - # 获取结果列表,并限制长度为1,因为我们只期待一个结果 app_records = await cursor.to_list(length=1) if len(app_records) == 0: @@ -222,11 +221,10 @@ class FlowManager: nodes=[], edges=[], createdAt=flow_record["created_at"], - debug=flow_config["debug"], ) - for node_config in flow_config["steps"]: + for node_id, node_config in flow_config["steps"].items(): node_item = NodeItem( - nodeId=node_config["id"], + nodeId=node_id, nodeMetaDataId=node_config["node"], name=node_config["name"], description=node_config["description"], @@ -297,13 +295,12 @@ class FlowManager: flow_config = Flow( name=flow_item.name, description=flow_item.description, - steps=[], + steps={}, edges=[], debug=False, ) for node_item in flow_item.nodes: - edge_config = Step( - id=node_item.node_id, + flow_config.steps[node_item.node_id] = Step( type=node_item.type, node=node_item.node_meta_data_id, name=node_item.name, @@ -312,7 +309,6 @@ class FlowManager: y=node_item.position.y), params=node_item.parameters, ) - flow_config.steps.append(edge_config) for edge_item in flow_item.edges: edge_from = edge_item.source_node if edge_item.branch_id: @@ -395,3 +391,20 @@ class FlowManager: LOGGER.error( f"Delete flow by app_id and flow_id failed due to: {e}") return None + + @staticmethod + async def updata_flow_debug_by_app_and_flow_id(app_id: str, flow_id: str, debug: bool)-> bool: + try: + app_pool_collection = MongoDB.get_collection("app") + result = await app_pool_collection.find_one_and_update( + {"_id": app_id}, + {"$set": {"flows.$[flow].debug": debug}}, + array_filters=[{"flow._id": flow_id}] # 使用关键字参数 array_filters + ) + if result is None: + LOGGER.error("Update flow debug from app pool failed") + return False + return True + except Exception as e: + LOGGER.error(f'Update flow debug from app pool failed: {e}') + return False diff --git a/apps/routers/flow.py b/apps/routers/flow.py index 7ed4d5b8..8bc0d3e1 100644 --- a/apps/routers/flow.py +++ b/apps/routers/flow.py @@ -128,7 +128,7 @@ async def put_flow( message="用户没有权限访问该流", result=FlowStructurePutMsg(), ).model_dump(exclude_none=True, by_alias=True)) - # put_body.flow=await FlowService.remove_excess_structure_from_flow(put_body.flow) + put_body.flow=await FlowService.remove_excess_structure_from_flow(put_body.flow) if topology_check: await FlowService.validate_flow_connectivity(put_body.flow) await FlowService.validate_flow_illegal(put_body.flow) @@ -139,7 +139,7 @@ async def put_flow( message="应用下流更新失败", result=FlowStructurePutMsg(), ).model_dump(exclude_none=True, by_alias=True)) - flow,_=await FlowManager.get_flow_by_app_and_flow_id(app_id, flow_id) + flow=await FlowManager.get_flow_by_app_and_flow_id(app_id, flow_id) if flow is None: return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content=FlowStructurePutRsp( code=status.HTTP_500_INTERNAL_SERVER_ERROR, @@ -149,7 +149,7 @@ async def put_flow( return JSONResponse(status_code=status.HTTP_200_OK, content=FlowStructurePutRsp( code=status.HTTP_200_OK, message="应用下流更新成功", - result=FlowStructurePutMsg(flow=flow) + result=FlowStructurePutMsg(flow=flow[0]) ).model_dump(exclude_none=True, by_alias=True)) diff --git a/apps/scheduler/pool/loader/flow.py b/apps/scheduler/pool/loader/flow.py index c0d39df3..013ab433 100644 --- a/apps/scheduler/pool/loader/flow.py +++ b/apps/scheduler/pool/loader/flow.py @@ -16,6 +16,7 @@ from apps.entities.flow import Flow from apps.models.mongo import MongoDB +# TODO:修改成公共函数 async def search_step_type(node_id: str) -> str: node_collection = MongoDB.get_collection("node") # 查询 Node 集合获取对应的 call_id @@ -31,7 +32,7 @@ async def search_step_type(node_id: str) -> str: async def search_step_name(node_id: str) -> str: node_collection = MongoDB.get_collection("node") - # 查询 Node 集合获取对应的 call_id + # 查询 Node 集合获取对应的 name node_doc = await node_collection.find_one({"_id": node_id}) if not node_doc: LOGGER.error(f"Node {node_id} not found") @@ -54,11 +55,13 @@ class FlowLoader: if "name" not in flow_yaml: err = f"工作流名称不能为空:{flow_path!s}" - raise ValueError(err) + LOGGER.error(err) + return None if "::" in flow_id: err = f"工作流名称包含非法字符:{flow_path!s}" - raise ValueError(err) + LOGGER.error(err) + return None for edge in flow_yaml["edges"]: # 把from变成edge_from,to改成edge_to,type改成edge_type @@ -73,14 +76,13 @@ class FlowLoader: except KeyError as e: LOGGER.error(f"Invalid edge type: {edge['type']}") - for step in flow_yaml["steps"]: + for key, step in flow_yaml["steps"].items(): if step["node"] in ["start", "end"]: step["type"] = step["node"] - step["name"] = step["node"] + step["name"] = "开始" if step["node"] == "start" else "结束" else: step["type"] = await search_step_type(step["node"]) step["name"] = await search_step_name(step["node"]) - try: # 检查Flow格式,并转换为Flow对象 flow = Flow.model_validate(flow_yaml) @@ -102,17 +104,16 @@ class FlowLoader: "name": flow.name, "description": flow.description, "on_error": flow.on_error.dict(), - "steps": [ - { - "id": step.id, + "steps": { + id: { "name": step.name, "description": step.description, "node": step.node, "params": step.params, "pos": step.pos.dict(), } - for step in flow.steps - ], + for id, step in flow.steps.items() + }, "edges": [ { "id": edge.id, -- Gitee From 7a97f591b39ae65d168a88119b2b9c8dcfa4f07d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=B9=E5=8D=9A?= <1016318004@qq.com> Date: Mon, 24 Feb 2025 17:35:42 +0800 Subject: [PATCH 4/4] =?UTF-8?q?flow=E6=95=B0=E6=8D=AE=E7=BB=93=E6=9E=84?= =?UTF-8?q?=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/entities/flow.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/apps/entities/flow.py b/apps/entities/flow.py index 37690666..134c22fb 100644 --- a/apps/entities/flow.py +++ b/apps/entities/flow.py @@ -32,7 +32,6 @@ class Edge(BaseModel): class Step(BaseModel): """Flow中Step的数据""" - id: str = Field(description="Step的ID") node: str = Field(description="Step的Node ID") type: str = Field(description="Step的类型") name: str = Field(description="Step的名称") @@ -52,9 +51,9 @@ class Flow(BaseModel): """Flow(工作流)的数据格式""" name: str = Field(description="Flow的名称", min_length=1) - description: str = Field(description="Flow的描述") + description: str = Field(description="Flow的描述 ") on_error: FlowError = FlowError(use_llm=True) - steps: list[Step] = Field(description="节点列表", default=[]) + steps: dict[str,Step] = Field(description="节点列表", default={}) edges: list[Edge] = Field(description="边列表", default=[]) debug: bool = Field(description="是否经过调试", default=False) -- Gitee