From 7f677f120b9a3f7eca1dbaf75a7f5c6b6805ab62 Mon Sep 17 00:00:00 2001 From: z30057876 Date: Thu, 7 Aug 2025 15:26:53 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=AD=A3Choice=E5=B7=A5=E5=85=B7?= =?UTF-8?q?=E7=B1=BB=E5=9E=8B=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/scheduler/call/choice/choice.py | 77 ++++---- .../call/choice/condition_handler.py | 176 ++++++++++-------- apps/scheduler/call/choice/schema.py | 13 +- 3 files changed, 148 insertions(+), 118 deletions(-) diff --git a/apps/scheduler/call/choice/choice.py b/apps/scheduler/call/choice/choice.py index 01ac7106..f294c9cb 100644 --- a/apps/scheduler/call/choice/choice.py +++ b/apps/scheduler/call/choice/choice.py @@ -11,15 +11,15 @@ from pydantic import Field from apps.scheduler.call.choice.condition_handler import ConditionHandler from apps.scheduler.call.choice.schema import ( - Condition, ChoiceBranch, ChoiceInput, ChoiceOutput, + Condition, Logic, ) -from apps.schemas.parameters import Type from apps.scheduler.call.core import CoreCall from apps.schemas.enum_var import CallOutputType +from apps.schemas.parameters import Type from apps.schemas.scheduler import ( CallError, CallInfo, @@ -42,7 +42,7 @@ class Choice(CoreCall, input_model=ChoiceInput, output_model=ChoiceOutput): """返回Call的名称和描述""" return CallInfo(name="选择器", description="使用大模型或使用程序做出判断") - async def _prepare_message(self, call_vars: CallVars) -> list[dict[str, Any]]: + async def _prepare_message(self, call_vars: CallVars) -> list[ChoiceBranch]: # noqa: C901, PLR0912, PLR0915 """替换choices中的系统变量""" valid_choices = [] @@ -50,8 +50,8 @@ class Choice(CoreCall, input_model=ChoiceInput, output_model=ChoiceOutput): try: # 验证逻辑运算符 if choice.logic not in [Logic.AND, Logic.OR]: - msg = f"无效的逻辑运算符: {choice.logic}" - logger.warning(f"[Choice] 分支 {choice.branch_id} 条件处理失败: {msg}") + msg = f"[Choice] 分支 {choice.branch_id} 条件处理失败:无效的逻辑运算符:{choice.logic}" + logger.warning(msg) continue valid_conditions = [] @@ -60,62 +60,74 @@ class Choice(CoreCall, input_model=ChoiceInput, output_model=ChoiceOutput): # 处理左值 if condition.left.step_id is not None: condition.left.value = self._extract_history_variables( - condition.left.step_id+'/'+condition.left.value, call_vars.history) + f"{condition.left.step_id}/{condition.left.value}", call_vars.history) # 检查历史变量是否成功提取 if condition.left.value is None: - msg = f"步骤 {condition.left.step_id} 的历史变量不存在" - logger.warning(f"[Choice] 分支 {choice.branch_id} 条件处理失败: {msg}") + msg = (f"[Choice] 分支 {choice.branch_id} 条件处理失败:" + f"步骤 {condition.left.step_id} 的历史变量不存在") + logger.warning(msg) continue - if not ConditionHandler.check_value_type( - condition.left.value, condition.left.type): - msg = f"左值类型不匹配: {condition.left.value} 应为 {condition.left.type.value}" - logger.warning(f"[Choice] 分支 {choice.branch_id} 条件处理失败: {msg}") + if not ConditionHandler.check_value_type(condition.left, condition.left.type): + msg = (f"[Choice] 分支 {choice.branch_id} 条件处理失败:" + f"左值类型不匹配:{condition.left.value}" + f"应为 {condition.left.type.value if condition.left.type else 'None'}") + logger.warning(msg) continue else: - msg = "左侧变量缺少step_id" - logger.warning(f"[Choice] 分支 {choice.branch_id} 条件处理失败: {msg}") + msg = f"[Choice] 分支 {choice.branch_id} 条件处理失败:左侧变量缺少step_id" + logger.warning(msg) continue # 处理右值 if condition.right.step_id is not None: condition.right.value = self._extract_history_variables( - condition.right.step_id+'/'+condition.right.value, call_vars.history) + f"{condition.right.step_id}/{condition.right.value}", call_vars.history, + ) # 检查历史变量是否成功提取 if condition.right.value is None: - msg = f"步骤 {condition.right.step_id} 的历史变量不存在" - logger.warning(f"[Choice] 分支 {choice.branch_id} 条件处理失败: {msg}") + msg = (f"[Choice] 分支 {choice.branch_id} 条件处理失败:" + f"步骤 {condition.right.step_id} 的历史变量不存在") + logger.warning(msg) continue if not ConditionHandler.check_value_type( - condition.right.value, condition.right.type): - msg = f"右值类型不匹配: {condition.right.value} 应为 {condition.right.type.value}" - logger.warning(f"[Choice] 分支 {choice.branch_id} 条件处理失败: {msg}") + condition.right, condition.right.type, + ): + msg = (f"[Choice] 分支 {choice.branch_id} 条件处理失败:" + f"右值类型不匹配:{condition.right.value}" + f"应为 {condition.right.type.value if condition.right.type else 'None'}") + logger.warning(msg) continue else: # 如果右值没有step_id,尝试从call_vars中获取 right_value_type = await ConditionHandler.get_value_type_from_operate( - condition.operate) + condition.operate, + ) if right_value_type is None: - msg = f"不支持的运算符: {condition.operate}" - logger.warning(f"[Choice] 分支 {choice.branch_id} 条件处理失败: {msg}") + msg = f"[Choice] 分支 {choice.branch_id} 条件处理失败:不支持的运算符:{condition.operate}" + logger.warning(msg) continue if condition.right.type != right_value_type: - msg = f"右值类型不匹配: {condition.right.value} 应为 {right_value_type.value}" - logger.warning(f"[Choice] 分支 {choice.branch_id} 条件处理失败: {msg}") + msg = (f"[Choice] 分支 {choice.branch_id} 条件处理失败:" + f"右值类型不匹配:{condition.right.value} 应为 {right_value_type.value}") + logger.warning(msg) continue if right_value_type == Type.STRING: condition.right.value = str(condition.right.value) else: condition.right.value = ast.literal_eval(condition.right.value) if not ConditionHandler.check_value_type( - condition.right.value, condition.right.type): - msg = f"右值类型不匹配: {condition.right.value} 应为 {condition.right.type.value}" - logger.warning(f"[Choice] 分支 {choice.branch_id} 条件处理失败: {msg}") + condition.right, condition.right.type, + ): + msg = (f"[Choice] 分支 {choice.branch_id} 条件处理失败:" + f"右值类型不匹配:{condition.right.value}" + f"应为 {condition.right.type.value if condition.right.type else 'None'}") + logger.warning(msg) continue valid_conditions.append(condition) # 如果所有条件都无效,抛出异常 if not valid_conditions and not choice.is_default: - msg = "分支没有有效条件" - logger.warning(f"[Choice] 分支 {choice.branch_id} 条件处理失败: {msg}") + msg = f"[Choice] 分支 {choice.branch_id} 条件处理失败:没有有效条件" + logger.warning(msg) continue # 更新有效条件 @@ -123,7 +135,8 @@ class Choice(CoreCall, input_model=ChoiceInput, output_model=ChoiceOutput): valid_choices.append(choice) except ValueError as e: - logger.warning("分支 %s 处理失败: %s,已跳过", choice.branch_id, str(e)) + msg = f"[Choice] 分支 {choice.branch_id} 处理失败:{e!s},已跳过" + logger.warning(msg) continue return valid_choices @@ -135,7 +148,7 @@ class Choice(CoreCall, input_model=ChoiceInput, output_model=ChoiceOutput): ) async def _exec( - self, input_data: dict[str, Any] + self, input_data: dict[str, Any], ) -> AsyncGenerator[CallOutputChunk, None]: """执行Choice工具""" # 解析输入数据 diff --git a/apps/scheduler/call/choice/condition_handler.py b/apps/scheduler/call/choice/condition_handler.py index 6f10f2c8..3c1354c9 100644 --- a/apps/scheduler/call/choice/condition_handler.py +++ b/apps/scheduler/call/choice/condition_handler.py @@ -1,25 +1,24 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """处理条件分支的工具""" - import logging +import re from pydantic import BaseModel -from apps.schemas.parameters import ( - Type, - NumberOperate, - StringOperate, - ListOperate, - BoolOperate, - DictOperate, -) - from apps.scheduler.call.choice.schema import ( ChoiceBranch, Condition, Logic, - Value + Value, +) +from apps.schemas.parameters import ( + BoolOperate, + DictOperate, + ListOperate, + NumberOperate, + StringOperate, + Type, ) logger = logging.getLogger(__name__) @@ -27,9 +26,11 @@ logger = logging.getLogger(__name__) class ConditionHandler(BaseModel): """条件分支处理器""" + @staticmethod - async def get_value_type_from_operate(operate: NumberOperate | StringOperate | ListOperate | - BoolOperate | DictOperate) -> Type: + async def get_value_type_from_operate( # noqa: PLR0911 + operate: NumberOperate | StringOperate | ListOperate | BoolOperate | DictOperate | None, + ) -> Type | None: """获取右值的类型""" if isinstance(operate, NumberOperate): return Type.NUMBER @@ -58,7 +59,7 @@ class ConditionHandler(BaseModel): return None @staticmethod - def check_value_type(value: Value, expected_type: Type) -> bool: + def check_value_type(value: Value, expected_type: Type | None) -> bool: """检查值的类型是否符合预期""" if expected_type == Type.STRING and isinstance(value.value, str): return True @@ -68,14 +69,11 @@ class ConditionHandler(BaseModel): return True if expected_type == Type.DICT and isinstance(value.value, dict): return True - if expected_type == Type.BOOL and isinstance(value.value, bool): - return True - return False + return bool(expected_type == Type.BOOL and isinstance(value.value, bool)) @staticmethod def handler(choices: list[ChoiceBranch]) -> str: """处理条件""" - for block_judgement in choices[::-1]: results = [] if block_judgement.is_default: @@ -85,7 +83,8 @@ class ConditionHandler(BaseModel): if result is not None: results.append(result) if not results: - logger.warning(f"[Choice] 分支 {block_judgement.branch_id} 条件处理失败: 没有有效的条件") + err = f"[Choice] 分支 {block_judgement.branch_id} 条件处理失败: 没有有效的条件" + logger.warning(err) continue if block_judgement.logic == Logic.AND: final_result = all(results) @@ -94,7 +93,6 @@ class ConditionHandler(BaseModel): if final_result: return block_judgement.branch_id - return "" @staticmethod @@ -112,27 +110,27 @@ class ConditionHandler(BaseModel): left = condition.left operate = condition.operate right = condition.right - value_type = condition.type + value_type = condition.left.type - result = None - if value_type == Type.STRING: + result = False + if value_type == Type.STRING and isinstance(operate, StringOperate): result = ConditionHandler._judge_string_condition(left, operate, right) - elif value_type == Type.NUMBER: + elif value_type == Type.NUMBER and isinstance(operate, NumberOperate): result = ConditionHandler._judge_number_condition(left, operate, right) - elif value_type == Type.BOOL: + elif value_type == Type.BOOL and isinstance(operate, BoolOperate): result = ConditionHandler._judge_bool_condition(left, operate, right) - elif value_type == Type.LIST: + elif value_type == Type.LIST and isinstance(operate, ListOperate): result = ConditionHandler._judge_list_condition(left, operate, right) - elif value_type == Type.DICT: + elif value_type == Type.DICT and isinstance(operate, DictOperate): result = ConditionHandler._judge_dict_condition(left, operate, right) else: - msg = f"不支持的数据类型: {value_type}" - logger.error(f"[Choice] 条件处理失败: {msg}") - return None + msg = f"[Choice] 条件处理失败: 不支持的数据类型: {value_type}" + logger.error(msg) + return False return result @staticmethod - def _judge_string_condition(left: Value, operate: StringOperate, right: Value) -> bool: + def _judge_string_condition(left: Value, operate: StringOperate, right: Value) -> bool: # noqa: C901, PLR0911, PLR0912 """ 判断字符串类型的条件。 @@ -149,33 +147,37 @@ class ConditionHandler(BaseModel): if not isinstance(left_value, str): msg = f"左值必须是字符串类型 ({left_value})" logger.warning(msg) - return None + return False right_value = right.value + if not isinstance(right_value, str): + msg = f"右值必须是字符串类型 ({right_value})" + logger.warning(msg) + return False + if operate == StringOperate.EQUAL: return left_value == right_value - elif operate == StringOperate.NOT_EQUAL: + if operate == StringOperate.NOT_EQUAL: return left_value != right_value - elif operate == StringOperate.CONTAINS: + if operate == StringOperate.CONTAINS: return right_value in left_value - elif operate == StringOperate.NOT_CONTAINS: + if operate == StringOperate.NOT_CONTAINS: return right_value not in left_value - elif operate == StringOperate.STARTS_WITH: + if operate == StringOperate.STARTS_WITH: return left_value.startswith(right_value) - elif operate == StringOperate.ENDS_WITH: + if operate == StringOperate.ENDS_WITH: return left_value.endswith(right_value) - elif operate == StringOperate.REGEX_MATCH: - import re + if operate == StringOperate.REGEX_MATCH: return bool(re.match(right_value, left_value)) - elif operate == StringOperate.LENGTH_EQUAL: + if operate == StringOperate.LENGTH_EQUAL: return len(left_value) == right_value - elif operate == StringOperate.LENGTH_GREATER_THAN: - return len(left_value) > right_value - elif operate == StringOperate.LENGTH_GREATER_THAN_OR_EQUAL: - return len(left_value) >= right_value - elif operate == StringOperate.LENGTH_LESS_THAN: - return len(left_value) < right_value - elif operate == StringOperate.LENGTH_LESS_THAN_OR_EQUAL: - return len(left_value) <= right_value + if operate == StringOperate.LENGTH_GREATER_THAN: + return len(left_value) > len(right_value) + if operate == StringOperate.LENGTH_GREATER_THAN_OR_EQUAL: + return len(left_value) >= len(right_value) + if operate == StringOperate.LENGTH_LESS_THAN: + return len(left_value) < len(right_value) + if operate == StringOperate.LENGTH_LESS_THAN_OR_EQUAL: + return len(left_value) <= len(right_value) return False @staticmethod @@ -196,19 +198,24 @@ class ConditionHandler(BaseModel): if not isinstance(left_value, (int, float)): msg = f"左值必须是数字类型 ({left_value})" logger.warning(msg) - return None + return False right_value = right.value + if not isinstance(right_value, (int, float)): + msg = f"右值必须是数字类型 ({right_value})" + logger.warning(msg) + return False + if operate == NumberOperate.EQUAL: return left_value == right_value - elif operate == NumberOperate.NOT_EQUAL: + if operate == NumberOperate.NOT_EQUAL: return left_value != right_value - elif operate == NumberOperate.GREATER_THAN: + if operate == NumberOperate.GREATER_THAN: return left_value > right_value - elif operate == NumberOperate.LESS_THAN: # noqa: PLR2004 + if operate == NumberOperate.LESS_THAN: return left_value < right_value - elif operate == NumberOperate.GREATER_THAN_OR_EQUAL: + if operate == NumberOperate.GREATER_THAN_OR_EQUAL: return left_value >= right_value - elif operate == NumberOperate.LESS_THAN_OR_EQUAL: + if operate == NumberOperate.LESS_THAN_OR_EQUAL: return left_value <= right_value return False @@ -230,20 +237,21 @@ class ConditionHandler(BaseModel): if not isinstance(left_value, bool): msg = "左值必须是布尔类型" logger.warning(msg) - return None + return False right_value = right.value + if not isinstance(right_value, bool): + msg = "右值必须是布尔类型" + logger.warning(msg) + return False + if operate == BoolOperate.EQUAL: return left_value == right_value - elif operate == BoolOperate.NOT_EQUAL: + if operate == BoolOperate.NOT_EQUAL: return left_value != right_value - elif operate == BoolOperate.IS_EMPTY: - return not left_value - elif operate == BoolOperate.NOT_EMPTY: - return left_value return False @staticmethod - def _judge_list_condition(left: Value, operate: ListOperate, right: Value): + def _judge_list_condition(left: Value, operate: ListOperate, right: Value) -> bool: # noqa: C901, PLR0911 """ 判断列表类型的条件。 @@ -260,30 +268,35 @@ class ConditionHandler(BaseModel): if not isinstance(left_value, list): msg = f"左值必须是列表类型 ({left_value})" logger.warning(msg) - return None + return False right_value = right.value + if not isinstance(right_value, list): + msg = f"右值必须是列表类型 ({right_value})" + logger.warning(msg) + return False + if operate == ListOperate.EQUAL: return left_value == right_value - elif operate == ListOperate.NOT_EQUAL: + if operate == ListOperate.NOT_EQUAL: return left_value != right_value - elif operate == ListOperate.CONTAINS: + if operate == ListOperate.CONTAINS: return right_value in left_value - elif operate == ListOperate.NOT_CONTAINS: + if operate == ListOperate.NOT_CONTAINS: return right_value not in left_value - elif operate == ListOperate.LENGTH_EQUAL: + if operate == ListOperate.LENGTH_EQUAL: return len(left_value) == right_value - elif operate == ListOperate.LENGTH_GREATER_THAN: - return len(left_value) > right_value - elif operate == ListOperate.LENGTH_GREATER_THAN_OR_EQUAL: - return len(left_value) >= right_value - elif operate == ListOperate.LENGTH_LESS_THAN: - return len(left_value) < right_value - elif operate == ListOperate.LENGTH_LESS_THAN_OR_EQUAL: - return len(left_value) <= right_value + if operate == ListOperate.LENGTH_GREATER_THAN: + return len(left_value) > len(right_value) + if operate == ListOperate.LENGTH_GREATER_THAN_OR_EQUAL: + return len(left_value) >= len(right_value) + if operate == ListOperate.LENGTH_LESS_THAN: + return len(left_value) < len(right_value) + if operate == ListOperate.LENGTH_LESS_THAN_OR_EQUAL: + return len(left_value) <= len(right_value) return False @staticmethod - def _judge_dict_condition(left: Value, operate: DictOperate, right: Value): + def _judge_dict_condition(left: Value, operate: DictOperate, right: Value) -> bool: # noqa: PLR0911 """ 判断字典类型的条件。 @@ -300,14 +313,19 @@ class ConditionHandler(BaseModel): if not isinstance(left_value, dict): msg = f"左值必须是字典类型 ({left_value})" logger.warning(msg) - return None + return False right_value = right.value + if not isinstance(right_value, dict): + msg = f"右值必须是字典类型 ({right_value})" + logger.warning(msg) + return False + if operate == DictOperate.EQUAL: return left_value == right_value - elif operate == DictOperate.NOT_EQUAL: + if operate == DictOperate.NOT_EQUAL: return left_value != right_value - elif operate == DictOperate.CONTAINS_KEY: + if operate == DictOperate.CONTAINS_KEY: return right_value in left_value - elif operate == DictOperate.NOT_CONTAINS_KEY: + if operate == DictOperate.NOT_CONTAINS_KEY: return right_value not in left_value return False diff --git a/apps/scheduler/call/choice/schema.py b/apps/scheduler/call/choice/schema.py index d97a0c8d..95532270 100644 --- a/apps/scheduler/call/choice/schema.py +++ b/apps/scheduler/call/choice/schema.py @@ -1,20 +1,19 @@ # Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. """Choice Call的输入和输出""" import uuid - from enum import Enum -from pydantic import BaseModel, Field +from pydantic import Field +from apps.scheduler.call.core import DataBase from apps.schemas.parameters import ( - Type, - NumberOperate, - StringOperate, - ListOperate, BoolOperate, DictOperate, + ListOperate, + NumberOperate, + StringOperate, + Type, ) -from apps.scheduler.call.core import DataBase class Logic(str, Enum): -- Gitee