From c1339a2ca9024b92ae79e29d487d377cd8f98a70 Mon Sep 17 00:00:00 2001 From: Ethan-Zhang Date: Mon, 20 Oct 2025 20:13:49 +0800 Subject: [PATCH] =?UTF-8?q?Fix:=20=E5=B7=A5=E4=BD=9C=E6=B5=81=E7=BC=96?= =?UTF-8?q?=E6=8E=92=E6=8F=92=E4=BB=B6=E7=B1=BB=E5=9E=8B=E8=8A=82=E7=82=B9?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=AD=98=E5=82=A8schema=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/scheduler/call/plugin.py | 58 ++++++++++++++++++++++++++++++++++ apps/scheduler/pool/pool.py | 11 +++++++ apps/schemas/enum_var.py | 1 + apps/schemas/flow.py | 2 ++ apps/schemas/flow_topology.py | 1 + apps/services/flow.py | 14 ++++++++ apps/services/flow_validate.py | 27 +++++++++++++--- apps/services/node.py | 41 ++++++++++++++++++++++++ 8 files changed, 151 insertions(+), 4 deletions(-) create mode 100644 apps/scheduler/call/plugin.py diff --git a/apps/scheduler/call/plugin.py b/apps/scheduler/call/plugin.py new file mode 100644 index 00000000..87651950 --- /dev/null +++ b/apps/scheduler/call/plugin.py @@ -0,0 +1,58 @@ +# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. +"""API插件Call""" + +from collections.abc import AsyncGenerator +from typing import Any, ClassVar + +from apps.scheduler.call.core import CoreCall, DataBase +from apps.schemas.enum_var import CallOutputType, CallType, LanguageType +from apps.schemas.scheduler import CallInfo, CallOutputChunk, CallVars + + +class Plugin(CoreCall, input_model=DataBase, output_model=DataBase): + """API插件Call""" + i18n_info: ClassVar[dict[str, dict]] = { + LanguageType.CHINESE: { + "name": "API插件节点", + "type": CallType.DEFAULT, + "description": "API插件节点,用于调用外部API服务", + }, + LanguageType.ENGLISH: { + "name": "API Plugin Node", + "type": CallType.DEFAULT, + "description": "API Plugin node for calling external API services", + }, + } + + async def _init(self, call_vars: CallVars) -> DataBase: + """ + 初始化Call + + :param CallVars call_vars: 由Executor传入的变量,包含当前运行信息 + :return: Call的输入 + :rtype: DataBase + """ + return DataBase() + + + async def _exec( + self, input_data: dict[str, Any], language: LanguageType = LanguageType.CHINESE + ) -> AsyncGenerator[CallOutputChunk, None]: + """ + 执行Call + + :param dict[str, Any] input_data: 填充后的Call的最终输入 + :return: Call的输出 + :rtype: AsyncGenerator[CallOutputChunk, None] + """ + # API插件节点的执行逻辑 + # 这里可以根据serviceId调用相应的API服务 + # 目前先返回空结果,表示插件不可用 + output = CallOutputChunk( + type=CallOutputType.DATA, + content={ + "message": "API插件节点不可用,请检查插件配置", + "status": "unavailable" + } + ) + yield output diff --git a/apps/scheduler/pool/pool.py b/apps/scheduler/pool/pool.py index ead552fc..52e740f3 100644 --- a/apps/scheduler/pool/pool.py +++ b/apps/scheduler/pool/pool.py @@ -154,6 +154,17 @@ class Pool: async def get_call(self, call_id: str) -> Any: """[Exception] 拿到Call的信息""" + # 特殊处理:Empty和Plugin类型的Call + from apps.schemas.enum_var import SpecialCallType + if call_id == SpecialCallType.EMPTY.value: + # 对于Empty类型的节点,返回一个空的Call类 + from apps.scheduler.call.empty import Empty + return Empty + elif call_id == SpecialCallType.PLUGIN.value: + # 对于Plugin类型的节点,返回API插件Call类 + from apps.scheduler.call.plugin import Plugin + return Plugin + # 从MongoDB里拿到数据 mongo = MongoDB() call_collection = mongo.get_collection("call") diff --git a/apps/schemas/enum_var.py b/apps/schemas/enum_var.py index 7d024c88..cb81f556 100644 --- a/apps/schemas/enum_var.py +++ b/apps/schemas/enum_var.py @@ -167,6 +167,7 @@ class SpecialCallType(str, Enum): """特殊Call类型""" EMPTY = "Empty" + PLUGIN = "Plugin" SUMMARY = "Summary" FACTS = "Facts" SLOT = "Slot" diff --git a/apps/schemas/flow.py b/apps/schemas/flow.py index 4dacb078..db7a63ee 100644 --- a/apps/schemas/flow.py +++ b/apps/schemas/flow.py @@ -43,6 +43,8 @@ class Step(BaseModel): description: str = Field(description="Step的描述") pos: PositionItem = Field(description="Step在画布上的位置", default=PositionItem(x=0, y=0)) params: dict[str, Any] = Field(description="用户手动指定的Node参数", default={}) + service_id: str = Field(description="Step的服务ID", default="") + plugin_type: str | None = Field(description="插件类型", default=None) class FlowError(BaseModel): diff --git a/apps/schemas/flow_topology.py b/apps/schemas/flow_topology.py index 2ed64a54..6ffb35de 100644 --- a/apps/schemas/flow_topology.py +++ b/apps/schemas/flow_topology.py @@ -60,6 +60,7 @@ class NodeItem(BaseModel): depedency: DependencyItem | None = None position: PositionItem = Field(default=PositionItem()) editable: bool = Field(default=True) + plugin_type: str | None = Field(alias="pluginType", default=None, description="插件类型") class EdgeItem(BaseModel): diff --git a/apps/services/flow.py b/apps/services/flow.py index 8f2928f1..3bc1b3e0 100644 --- a/apps/services/flow.py +++ b/apps/services/flow.py @@ -324,8 +324,13 @@ class FlowManager: "output_parameters": processed_output_parameters, } + # 从Step中读取serviceId和pluginType + service_id = getattr(node_config, 'service_id', "") + plugin_type = getattr(node_config, 'plugin_type', None) + node_item = NodeItem( stepId=node_id, + serviceId=service_id, nodeId=node_config.node, name=node_config.name, description=node_config.description, @@ -334,6 +339,7 @@ class FlowManager: callId=node_config.type, parameters=parameters, position=PositionItem(x=node_config.pos.x, y=node_config.pos.y), + pluginType=plugin_type, ) flow_item.nodes.append(node_item) @@ -479,6 +485,8 @@ class FlowManager: description=node_item.description, pos=node_item.position, params=params, + service_id=node_item.service_id, + plugin_type=node_item.plugin_type, ) # 检查是否有节点被删除,如果有则清理相关变量 @@ -874,6 +882,8 @@ class FlowManager: description=node_item.description, pos=node_item.position, params=params, + service_id=node_item.service_id, + plugin_type=node_item.plugin_type, ) for edge_item in flow_item.edges: @@ -979,6 +989,9 @@ class FlowManager: "output_parameters": processed_output_parameters, } + # 从Step中读取pluginType + plugin_type = getattr(node_config, 'plugin_type', None) + node_item = NodeItem( stepId=node_id, serviceId="", # 子工作流节点默认serviceId为空 @@ -990,6 +1003,7 @@ class FlowManager: callId=node_config.type, parameters=parameters, position=PositionItem(x=node_config.pos.x, y=node_config.pos.y), + pluginType=plugin_type, ) flow_item.nodes.append(node_item) diff --git a/apps/services/flow_validate.py b/apps/services/flow_validate.py index 88b6b789..786eb03b 100644 --- a/apps/services/flow_validate.py +++ b/apps/services/flow_validate.py @@ -40,13 +40,32 @@ class FlowService: for node in flow_item.nodes: from apps.scheduler.pool.pool import Pool from pydantic import BaseModel - if node.node_id != 'start' and node.node_id != 'end' and node.node_id != SpecialCallType.EMPTY.value: + if node.node_id != 'start' and node.node_id != 'end' and node.node_id != SpecialCallType.EMPTY.value and node.node_id != SpecialCallType.PLUGIN.value: try: await Pool().get_call(node.call_id) except Exception as e: - node.node_id = SpecialCallType.EMPTY.value - node.description = '【对应的api工具被删除!节点不可用!请联系相关人员!】\n\n'+node.description - logger.error(f"[FlowService] 获取步骤的call_id失败{node.call_id}由于:{e}") + # 保存原始的serviceId和callId,以便API插件节点能够正确识别 + original_service_id = node.service_id + original_call_id = node.call_id + + # 根据是否有serviceId来判断是API插件还是普通的Empty节点 + if original_service_id and original_service_id.strip(): + # 有serviceId,标记为Plugin类型(API插件节点) + node.node_id = SpecialCallType.PLUGIN.value + node.call_id = SpecialCallType.PLUGIN.value + node.service_id = original_service_id + logger.info(f"[FlowService] 将节点 {original_call_id} 标记为API插件节点,serviceId: {original_service_id}") + else: + # 没有serviceId,标记为Empty类型 + node.node_id = SpecialCallType.EMPTY.value + node.call_id = SpecialCallType.EMPTY.value + logger.info(f"[FlowService] 将节点 {original_call_id} 标记为Empty节点") + + # 更新描述信息,保留原有描述 + original_description = node.description or "" + node.description = f'【对应的api工具被删除!节点不可用!请联系相关人员!】\n\n{original_description}' + + logger.error(f"[FlowService] 获取步骤的call_id失败 {original_call_id},错误: {e}") node_branch_map[node.step_id] = set() if node.call_id == NodeType.CHOICE.value: input_parameters = node.parameters["input_parameters"] diff --git a/apps/services/node.py b/apps/services/node.py index bd98a41b..a09af2e4 100644 --- a/apps/services/node.py +++ b/apps/services/node.py @@ -24,6 +24,13 @@ class NodeManager: @staticmethod async def get_node_call_id(node_id: str) -> str: """获取Node的call_id""" + # 特殊处理:对于特殊节点类型,直接返回对应的call_id + if node_id == SpecialCallType.EMPTY.value: + return SpecialCallType.EMPTY.value + elif node_id == SpecialCallType.PLUGIN.value: + return SpecialCallType.PLUGIN.value + + # 其他节点类型:从数据库查询 node_collection = MongoDB().get_collection("node") node = await node_collection.find_one({"_id": node_id}, {"call_id": 1}) if not node: @@ -83,6 +90,40 @@ class NodeManager: # 如果是空节点,返回空Schema return {}, {} + if node_id == SpecialCallType.PLUGIN.value: + # 如果是Plugin节点,返回API插件的默认Schema + return { + "type": "object", + "properties": { + "api_url": { + "type": "string", + "description": "API接口地址" + }, + "method": { + "type": "string", + "description": "HTTP请求方法", + "default": "GET" + }, + "headers": { + "type": "object", + "description": "请求头" + }, + "parameters": { + "type": "object", + "description": "请求参数" + } + } + }, { + "response": { + "type": "object", + "description": "API响应结果" + }, + "status_code": { + "type": "integer", + "description": "HTTP状态码" + } + } + # 查找Node信息 logger.info("[NodeManager] 获取节点 %s", node_id) node_collection = MongoDB().get_collection("node") -- Gitee