diff --git a/apps/main.py b/apps/main.py index 717311037ee04f7023d448370ee0c72de413b16b..d0aea344f570fc5fcb923a23a767362b92e8f347 100644 --- a/apps/main.py +++ b/apps/main.py @@ -40,7 +40,7 @@ from apps.routers import ( record, service, user, - parameter + variable ) from apps.scheduler.pool.pool import Pool from apps.services.predecessor_cache_service import cleanup_background_tasks @@ -121,7 +121,7 @@ app.include_router(llm.router) app.include_router(mcp_service.router) app.include_router(flow.router) app.include_router(user.router) -app.include_router(parameter.router) +app.include_router(variable.router) # logger配置 LOGGER_FORMAT = "%(funcName)s() - %(message)s" diff --git a/apps/routers/parameter.py b/apps/routers/parameter.py deleted file mode 100644 index 6edbe2e142cb6589be8947c28ae2eb4a7287baa1..0000000000000000000000000000000000000000 --- a/apps/routers/parameter.py +++ /dev/null @@ -1,77 +0,0 @@ -from typing import Annotated - -from fastapi import APIRouter, Depends, Query, status -from fastapi.responses import JSONResponse - -from apps.dependency import get_user -from apps.dependency.user import verify_user -from apps.services.parameter import ParameterManager -from apps.schemas.response_data import ( - GetOperaRsp, - GetParamsRsp -) -from apps.services.application import AppManager -from apps.services.flow import FlowManager - -router = APIRouter( - prefix="/api/parameter", - tags=["parameter"], - dependencies=[ - Depends(verify_user), - ], -) - - -@router.get("", response_model=GetParamsRsp) -async def get_parameters( - user_sub: Annotated[str, Depends(get_user)], - app_id: Annotated[str, Query(alias="appId")], - flow_id: Annotated[str, Query(alias="flowId")], - step_id: Annotated[str, Query(alias="stepId")], -) -> JSONResponse: - """Get parameters for node choice.""" - if not await AppManager.validate_user_app_access(user_sub, app_id): - return JSONResponse( - status_code=status.HTTP_403_FORBIDDEN, - content=GetParamsRsp( - code=status.HTTP_403_FORBIDDEN, - message="用户没有权限访问该流", - result=[], - ).model_dump(exclude_none=True, by_alias=True), - ) - flow = await FlowManager.get_flow_by_app_and_flow_id(app_id, flow_id) - if not flow: - return JSONResponse( - status_code=status.HTTP_404_NOT_FOUND, - content=GetParamsRsp( - code=status.HTTP_404_NOT_FOUND, - message="未找到该流", - result=[], - ).model_dump(exclude_none=True, by_alias=True), - ) - result = await ParameterManager.get_pre_params_by_flow_and_step_id(flow, step_id) - return JSONResponse( - status_code=status.HTTP_200_OK, - content=GetParamsRsp( - code=status.HTTP_200_OK, - message="获取参数成功", - result=result - ).model_dump(exclude_none=True, by_alias=True), - ) - - -@router.get("/operate", response_model=GetOperaRsp) -async def get_operate_parameters( - user_sub: Annotated[str, Depends(get_user)], - param_type: Annotated[str, Query(alias="ParamType")], -) -> JSONResponse: - """Get parameters for node choice.""" - result = await ParameterManager.get_operate_and_bind_type(param_type) - return JSONResponse( - status_code=status.HTTP_200_OK, - content=GetOperaRsp( - code=status.HTTP_200_OK, - message="获取操作成功", - result=result - ).model_dump(exclude_none=True, by_alias=True), - ) diff --git a/apps/scheduler/call/choice/choice.py b/apps/scheduler/call/choice/choice.py index 8cab828889db4b0d97ff87318760adb507a9f3b1..64f44d961f3c7c414920e40b7fb712efd366d78d 100644 --- a/apps/scheduler/call/choice/choice.py +++ b/apps/scheduler/call/choice/choice.py @@ -16,10 +16,11 @@ from apps.scheduler.call.choice.schema import ( ChoiceInput, ChoiceOutput, Logic, + Value, ) -from apps.schemas.parameters import Type +from apps.schemas.parameters import ValueType from apps.scheduler.call.core import CoreCall -from apps.schemas.enum_var import CallOutputType +from apps.schemas.enum_var import CallOutputType, CallType from apps.schemas.scheduler import ( CallError, CallInfo, @@ -40,90 +41,141 @@ class Choice(CoreCall, input_model=ChoiceInput, output_model=ChoiceOutput): @classmethod def info(cls) -> CallInfo: """返回Call的名称和描述""" - return CallInfo(name="选择器", description="使用大模型或使用程序做出判断") + return CallInfo( + name="条件分支", + type=CallType.LOGIC, + description="使用大模型或使用程序做出条件判断,决定后续分支" + ) - async def _prepare_message(self, call_vars: CallVars) -> list[dict[str, Any]]: - """替换choices中的系统变量""" + def _validate_branch_logic(self, choice: ChoiceBranch) -> bool: + """验证分支的逻辑运算符是否有效 + + Args: + choice: 需要验证的分支 + + Returns: + bool: 逻辑运算符是否有效 + """ + if choice.logic not in [Logic.AND, Logic.OR]: + logger.warning(f"[Choice] 分支 {choice.branch_id} 条件处理失败: 无效的逻辑运算符: {choice.logic}") + return False + return True + + async def _process_condition_value(self, value: Value, call_vars: CallVars, + branch_id: str, value_position: str) -> tuple[bool, str]: + """处理条件值(左值或右值) + + Args: + value: 需要处理的值对象 + call_vars: Call变量 + branch_id: 分支ID,用于日志记录 + value_position: 值的位置描述("左值"或"右值") + + Returns: + tuple[bool, str]: (是否成功, 错误消息) + """ + # 处理reference类型 + if value.type == ValueType.REFERENCE: + try: + value.value = await self._resolve_variables_in_config(value.value, call_vars) + except Exception as e: + return False, f"{value_position}引用解析失败: {e}" + + # 检查解析后的值类型 + if value.value is None: + return False, f"{value_position}引用解析后为空" + + # 验证值类型 + if not ConditionHandler.check_value_type(value.value, value.type): + return False, f"{value_position}类型不匹配: {value.value} 应为 {value.type.value}" + + return True, "" + + async def _process_condition(self, condition: Condition, call_vars: CallVars, + branch_id: str) -> tuple[bool, str]: + """处理单个条件 + + Args: + condition: 需要处理的条件 + call_vars: Call变量 + branch_id: 分支ID,用于日志记录 + + Returns: + tuple[bool, str]: (是否成功, 错误消息) + """ + # 处理左值 + success, error_msg = await self._process_condition_value( + condition.left, call_vars, branch_id, "左值") + if not success: + return False, error_msg + + # 处理右值 + success, error_msg = await self._process_condition_value( + condition.right, call_vars, branch_id, "右值") + if not success: + return False, error_msg + + return True, "" + + async def _process_branch(self, choice: ChoiceBranch, call_vars: CallVars) -> tuple[bool, list[str]]: + """处理单个分支 + + Args: + choice: 需要处理的分支 + call_vars: Call变量 + + Returns: + tuple[bool, list[str]]: (是否成功, 错误消息列表) + """ + # 验证逻辑运算符 + if not self._validate_branch_logic(choice): + return False, ["无效的逻辑运算符"] + + valid_conditions = [] + error_messages = [] + + # 处理每个条件 + for condition_original in choice.conditions: + condition = copy.deepcopy(condition_original) + success, error_msg = await self._process_condition(condition, call_vars, choice.branch_id) + + if success: + valid_conditions.append(condition) + else: + error_messages.append(error_msg) + logger.warning(f"[Choice] 分支 {choice.branch_id} 条件处理失败: {error_msg}") + + # 如果没有有效条件,返回失败 + if not valid_conditions: + error_messages.append("分支没有有效条件") + return False, error_messages + + # 更新有效条件 + choice.conditions = valid_conditions + return True, [] + + async def _prepare_message(self, call_vars: CallVars) -> list[ChoiceBranch]: + """替换choices中的系统变量 + + Args: + call_vars: Call变量 + + Returns: + list[ChoiceBranch]: 处理后的有效分支列表 + """ valid_choices = [] for choice in self.choices: try: - # 验证逻辑运算符 - if choice.logic not in [Logic.AND, Logic.OR]: - msg = f"无效的逻辑运算符: {choice.logic}" - logger.warning(f"[Choice] 分支 {choice.branch_id} 条件处理失败: {msg}") - continue - - valid_conditions = [] - for i in range(len(choice.conditions)): - condition = copy.deepcopy(choice.conditions[i]) - # 处理左值 - if condition.left.step_id is not None: - condition.left.value = self._extract_history_variables( - condition.left.step_id+'/'+condition.left.value, call_vars.history) - # 检查历史变量是否成功提取 - if condition.left.value is None: - msg = f"步骤 {condition.left.step_id} 的历史变量不存在" - logger.warning(f"[Choice] 分支 {choice.branch_id} 条件处理失败: {msg}") - continue - if not ConditionHandler.check_value_type( - condition.left.value, condition.left.type): - msg = f"左值类型不匹配: {condition.left.value} 应为 {condition.left.type.value}" - logger.warning(f"[Choice] 分支 {choice.branch_id} 条件处理失败: {msg}") - continue - else: - msg = "左侧变量缺少step_id" - logger.warning(f"[Choice] 分支 {choice.branch_id} 条件处理失败: {msg}") - continue - # 处理右值 - if condition.right.step_id is not None: - condition.right.value = self._extract_history_variables( - condition.right.step_id+'/'+condition.right.value, call_vars.history) - # 检查历史变量是否成功提取 - if condition.right.value is None: - msg = f"步骤 {condition.right.step_id} 的历史变量不存在" - logger.warning(f"[Choice] 分支 {choice.branch_id} 条件处理失败: {msg}") - continue - if not ConditionHandler.check_value_type( - condition.right.value, condition.right.type): - msg = f"右值类型不匹配: {condition.right.value} 应为 {condition.right.type.value}" - logger.warning(f"[Choice] 分支 {choice.branch_id} 条件处理失败: {msg}") - continue - else: - # 如果右值没有step_id,尝试从call_vars中获取 - right_value_type = await ConditionHandler.get_value_type_from_operate( - condition.operate) - if right_value_type is None: - msg = f"不支持的运算符: {condition.operate}" - logger.warning(f"[Choice] 分支 {choice.branch_id} 条件处理失败: {msg}") - continue - if condition.right.type != right_value_type: - msg = f"右值类型不匹配: {condition.right.value} 应为 {right_value_type.value}" - logger.warning(f"[Choice] 分支 {choice.branch_id} 条件处理失败: {msg}") - continue - if right_value_type == Type.STRING: - condition.right.value = str(condition.right.value) - else: - condition.right.value = ast.literal_eval(condition.right.value) - if not ConditionHandler.check_value_type( - condition.right.value, condition.right.type): - msg = f"右值类型不匹配: {condition.right.value} 应为 {condition.right.type.value}" - logger.warning(f"[Choice] 分支 {choice.branch_id} 条件处理失败: {msg}") - continue - valid_conditions.append(condition) - - # 如果所有条件都无效,抛出异常 - if not valid_conditions: - msg = "分支没有有效条件" - logger.warning(f"[Choice] 分支 {choice.branch_id} 条件处理失败: {msg}") - continue - - # 更新有效条件 - choice.conditions = valid_conditions - valid_choices.append(choice) - - except ValueError as e: - logger.warning("分支 %s 处理失败: %s,已跳过", choice.branch_id, str(e)) + success, error_messages = await self._process_branch(choice, call_vars) + + if success: + valid_choices.append(choice) + else: + logger.warning(f"[Choice] 分支 {choice.branch_id} 处理失败: {'; '.join(error_messages)}") + + except Exception as e: + logger.warning(f"[Choice] 分支 {choice.branch_id} 处理失败: {e},已跳过") continue return valid_choices diff --git a/apps/scheduler/call/choice/condition_handler.py b/apps/scheduler/call/choice/condition_handler.py index 7542f2944eedff72b820a79725d64b5ddeba5a11..07e8a5e559c735cf15f878fd89e5cf59901e2d18 100644 --- a/apps/scheduler/call/choice/condition_handler.py +++ b/apps/scheduler/call/choice/condition_handler.py @@ -7,7 +7,7 @@ import logging from pydantic import BaseModel from apps.schemas.parameters import ( - Type, + ValueType, NumberOperate, StringOperate, ListOperate, @@ -29,46 +29,46 @@ class ConditionHandler(BaseModel): """条件分支处理器""" @staticmethod async def get_value_type_from_operate(operate: NumberOperate | StringOperate | ListOperate | - BoolOperate | DictOperate) -> Type: + BoolOperate | DictOperate) -> ValueType: """获取右值的类型""" if isinstance(operate, NumberOperate): - return Type.NUMBER + return ValueType.NUMBER if operate in [ StringOperate.EQUAL, StringOperate.NOT_EQUAL, StringOperate.CONTAINS, StringOperate.NOT_CONTAINS, StringOperate.STARTS_WITH, StringOperate.ENDS_WITH, StringOperate.REGEX_MATCH]: - return Type.STRING + return ValueType.STRING if operate in [StringOperate.LENGTH_EQUAL, StringOperate.LENGTH_GREATER_THAN, StringOperate.LENGTH_GREATER_THAN_OR_EQUAL, StringOperate.LENGTH_LESS_THAN, StringOperate.LENGTH_LESS_THAN_OR_EQUAL]: - return Type.NUMBER + return ValueType.NUMBER if operate in [ListOperate.EQUAL, ListOperate.NOT_EQUAL]: - return Type.LIST + return ValueType.LIST if operate in [ListOperate.CONTAINS, ListOperate.NOT_CONTAINS]: - return Type.STRING + return ValueType.STRING if operate in [ListOperate.LENGTH_EQUAL, ListOperate.LENGTH_GREATER_THAN, ListOperate.LENGTH_GREATER_THAN_OR_EQUAL, ListOperate.LENGTH_LESS_THAN, ListOperate.LENGTH_LESS_THAN_OR_EQUAL]: - return Type.NUMBER + return ValueType.NUMBER if operate in [BoolOperate.EQUAL, BoolOperate.NOT_EQUAL]: - return Type.BOOL + return ValueType.BOOL if operate in [DictOperate.EQUAL, DictOperate.NOT_EQUAL]: - return Type.DICT + return ValueType.DICT if operate in [DictOperate.CONTAINS_KEY, DictOperate.NOT_CONTAINS_KEY]: - return Type.STRING + return ValueType.STRING return None @staticmethod - def check_value_type(value: Value, expected_type: Type) -> bool: + def check_value_type(value: Value, expected_type: ValueType) -> bool: """检查值的类型是否符合预期""" - if expected_type == Type.STRING and isinstance(value.value, str): + if expected_type == ValueType.STRING and isinstance(value.value, str): return True - if expected_type == Type.NUMBER and isinstance(value.value, (int, float)): + if expected_type == ValueType.NUMBER and isinstance(value.value, (int, float)): return True - if expected_type == Type.LIST and isinstance(value.value, list): + if expected_type == ValueType.LIST and isinstance(value.value, list): return True - if expected_type == Type.DICT and isinstance(value.value, dict): + if expected_type == ValueType.DICT and isinstance(value.value, dict): return True - if expected_type == Type.BOOL and isinstance(value.value, bool): + if expected_type == ValueType.BOOL and isinstance(value.value, bool): return True return False @@ -115,15 +115,15 @@ class ConditionHandler(BaseModel): value_type = condition.type result = None - if value_type == Type.STRING: + if value_type == ValueType.STRING: result = ConditionHandler._judge_string_condition(left, operate, right) - elif value_type == Type.NUMBER: - result = ConditionHandler._judge_int_condition(left, operate, right) - elif value_type == Type.BOOL: + elif value_type == ValueType.NUMBER: + result = ConditionHandler._judge_number_condition(left, operate, right) + elif value_type == ValueType.BOOL: result = ConditionHandler._judge_bool_condition(left, operate, right) - elif value_type == Type.LIST: + elif value_type == ValueType.LIST: result = ConditionHandler._judge_list_condition(left, operate, right) - elif value_type == Type.DICT: + elif value_type == ValueType.DICT: result = ConditionHandler._judge_dict_condition(left, operate, right) else: logger.error("不支持的数据类型: %s", value_type) diff --git a/apps/scheduler/call/choice/schema.py b/apps/scheduler/call/choice/schema.py index b95b166879608bc431525958e7e120ad93c5e5a3..332a33c704a99e1791c78b4678e405a754e93fdc 100644 --- a/apps/scheduler/call/choice/schema.py +++ b/apps/scheduler/call/choice/schema.py @@ -4,15 +4,15 @@ import uuid from enum import Enum -from pydantic import BaseModel, Field +from pydantic import Field from apps.schemas.parameters import ( - Type, NumberOperate, StringOperate, ListOperate, BoolOperate, DictOperate, + ValueType, ) from apps.scheduler.call.core import DataBase @@ -27,8 +27,7 @@ class Logic(str, Enum): class Value(DataBase): """值的结构""" - step_id: str | None = Field(description="步骤id", default=None) - type: Type | None = Field(description="值的类型", default=None) + type: ValueType | None = Field(description="值的类型", default=None) value: str | float | int | bool | list | dict | None = Field(description="值", default=None) diff --git a/apps/scheduler/slot/slot.py b/apps/scheduler/slot/slot.py index 89433cade929ef6917664450bb3645500ed2df5a..4c9453c47710d7cd9998364559416551b33eb05d 100644 --- a/apps/scheduler/slot/slot.py +++ b/apps/scheduler/slot/slot.py @@ -13,7 +13,7 @@ from jsonschema.protocols import Validator from jsonschema.validators import extend from apps.schemas.response_data import ParamsNode -from apps.scheduler.call.choice.schema import Type +from apps.scheduler.call.choice.schema import ValueType from apps.scheduler.slot.parser import ( SlotConstParser, SlotDateParser, @@ -232,15 +232,15 @@ class Slot: param_type = schema_node["type"] if param_type == "object": - param_type = Type.DICT + param_type = ValueType.DICT elif param_type == "array": - param_type = Type.LIST + param_type = ValueType.LIST elif param_type == "string": - param_type = Type.STRING + param_type = ValueType.STRING elif param_type == "number": - param_type = Type.NUMBER + param_type = ValueType.NUMBER elif param_type == "boolean": - param_type = Type.BOOL + param_type = ValueType.BOOL else: logger.warning(f"[Slot] 不支持的参数类型: {param_type}") return None diff --git a/apps/schemas/parameters.py b/apps/schemas/parameters.py index bd908d2375415c798f4804c4eabde797f6a4f7a0..21f8aee9bb677f0189634473eb14ebe5a1aef810 100644 --- a/apps/schemas/parameters.py +++ b/apps/schemas/parameters.py @@ -59,11 +59,14 @@ class DictOperate(str, Enum): NOT_CONTAINS_KEY = "dict_not_contains_key" -class Type(str, Enum): - """Choice 工具支持的类型""" +class ValueType(str, Enum): + """Choice 工具逻辑表达式左右值支持的类型""" STRING = "string" NUMBER = "number" LIST = "list" DICT = "dict" BOOL = "bool" + + # 变量引用 + REFERENCE = "reference" \ No newline at end of file diff --git a/apps/schemas/response_data.py b/apps/schemas/response_data.py index b1dc77b75ba11943e149dc8c471bc3141d684442..eac430d11d48f1d0f9197706999af30aaf85d399 100644 --- a/apps/schemas/response_data.py +++ b/apps/schemas/response_data.py @@ -15,7 +15,7 @@ from apps.schemas.flow_topology import ( PositionItem, ) from apps.schemas.parameters import ( - Type, + ValueType, NumberOperate, StringOperate, ListOperate, @@ -642,7 +642,7 @@ class ParamsNode(BaseModel): """参数数据结构""" param_name: str = Field(..., description="参数名称", alias="paramName") param_path: str = Field(..., description="参数路径", alias="paramPath") - param_type: Type = Field(..., description="参数类型", alias="paramType") + param_type: ValueType = Field(..., description="参数类型", alias="paramType") sub_params: list["ParamsNode"] | None = Field( default=None, description="子参数列表", alias="subParams" ) @@ -668,7 +668,7 @@ class OperateAndBindType(BaseModel): """操作和绑定类型数据结构""" operate: NumberOperate | StringOperate | ListOperate | BoolOperate | DictOperate = Field(description="操作类型") - bind_type: Type = Field(description="绑定类型") + bind_type: ValueType = Field(description="绑定类型") class GetOperaRsp(ResponseData): diff --git a/apps/services/parameter.py b/apps/services/parameter.py deleted file mode 100644 index ae375e97fcf92bebfc8dca2b384ffcead4da58bf..0000000000000000000000000000000000000000 --- a/apps/services/parameter.py +++ /dev/null @@ -1,86 +0,0 @@ -# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. -"""flow Manager""" - -import logging - -from pymongo import ASCENDING - -from apps.services.node import NodeManager -from apps.schemas.flow_topology import FlowItem -from apps.scheduler.slot.slot import Slot -from apps.scheduler.call.choice.condition_handler import ConditionHandler -from apps.scheduler.call.choice.schema import ( - NumberOperate, - StringOperate, - ListOperate, - BoolOperate, - DictOperate, - Type -) -from apps.schemas.response_data import ( - OperateAndBindType, - ParamsNode, - StepParams, -) -from apps.services.node import NodeManager -logger = logging.getLogger(__name__) - - -class ParameterManager: - """Parameter Manager""" - @staticmethod - async def get_operate_and_bind_type(param_type: Type) -> list[OperateAndBindType]: - """Get operate and bind type""" - result = [] - operate = None - if param_type == Type.NUMBER: - operate = NumberOperate - elif param_type == Type.STRING: - operate = StringOperate - elif param_type == Type.LIST: - operate = ListOperate - elif param_type == Type.BOOL: - operate = BoolOperate - elif param_type == Type.DICT: - operate = DictOperate - if operate: - for item in operate: - result.append(OperateAndBindType( - operate=item, - bind_type=ConditionHandler.get_value_type_from_operate(item))) - return result - - @staticmethod - async def get_pre_params_by_flow_and_step_id(flow: FlowItem, step_id: str) -> list[StepParams]: - """Get pre params by flow and step id""" - index = 0 - q = [step_id] - in_edges = {} - step_id_to_node_id = {} - for step in flow.nodes: - step_id_to_node_id[step.step_id] = step.node_id - for edge in flow.edges: - if edge.target_node not in in_edges: - in_edges[edge.target_node] = [] - in_edges[edge.target_node].append(edge.source_node) - while index < len(q): - tmp_step_id = q[index] - index += 1 - for i in range(len(in_edges.get(tmp_step_id, []))): - pre_node_id = in_edges[tmp_step_id][i] - if pre_node_id not in q: - q.append(pre_node_id) - pre_step_params = [] - for step_id in q: - node_id = step_id_to_node_id.get(step_id) - params_schema, output_schema = await NodeManager.get_node_params(node_id) - slot = Slot(output_schema) - params_node = slot.get_params_node_from_schema(root='/output') - pre_step_params.append( - StepParams( - stepId=node_id, - name=params_schema.get("name", ""), - paramsNode=params_node - ) - ) - return pre_step_params