From 783fb9f6633d8e42d49b97b9d636ae063cbc622b Mon Sep 17 00:00:00 2001 From: wtingkai <330445001@qq.com> Date: Tue, 19 Aug 2025 22:05:15 +0800 Subject: [PATCH 1/2] =?UTF-8?q?feat(OmniAdvisor):=20=E9=80=9A=E8=BF=87?= =?UTF-8?q?=E4=BC=AA=E9=80=A0load=E7=9A=84=E6=96=B9=E5=BC=8F=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0=E5=90=8E=E5=8F=B0=E5=A4=8D=E6=B5=8B=E7=9A=84=E9=9A=94?= =?UTF-8?q?=E7=A6=BB=E9=98=9F=E5=88=97=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/omniadvisor/interface/hijack_recommend.py | 2 +- .../src/omniadvisor/repository/load_repository.py | 13 +++++++++++++ .../src/omniadvisor/service/retest_service.py | 10 +++++++++- .../omniadvisor/service/spark_service/spark_run.py | 12 ++---------- .../omniadvisor/interface/test_hijack_recommend.py | 2 +- .../service/spark_service/test_spark_run.py | 2 +- 6 files changed, 27 insertions(+), 14 deletions(-) diff --git a/omniadvisor/src/omniadvisor/interface/hijack_recommend.py b/omniadvisor/src/omniadvisor/interface/hijack_recommend.py index 76dfde80f..3af0cb3a0 100644 --- a/omniadvisor/src/omniadvisor/interface/hijack_recommend.py +++ b/omniadvisor/src/omniadvisor/interface/hijack_recommend.py @@ -265,7 +265,7 @@ def hijack_recommend(argv: list) -> None: # 根据配置和负载执行Spark任务 global_logger.info("Going to execute Spark load ……") - exam_record, output = spark_run(load=load, config=exec_config, wait_for_trace=False, exec_in_isolation=False) + exam_record, output = spark_run(load=load, config=exec_config, wait_for_trace=False) # 执行结果分析 if exam_record.status == OA_CONF.ExecStatus.success: # 打印结果输出 diff --git a/omniadvisor/src/omniadvisor/repository/load_repository.py b/omniadvisor/src/omniadvisor/repository/load_repository.py index c400405e8..154aa073b 100644 --- a/omniadvisor/src/omniadvisor/repository/load_repository.py +++ b/omniadvisor/src/omniadvisor/repository/load_repository.py @@ -159,3 +159,16 @@ class LoadRepository(Repository): model_attr=model_attr ) return Load(database_model=database_load) + + @classmethod + def fake_update_name_and_exec_attr(cls, load: Load, exec_attr: dict) -> Load: + """ + 虚假更新和时间相关的信息,不保存至数据库 + + :param load: 负载 + :param exec_attr: 负载执行属性 + :return: 负载实例 + """ + database_load = load.database_model + database_load.exec_attr = exec_attr + return Load(database_model=database_load) diff --git a/omniadvisor/src/omniadvisor/service/retest_service.py b/omniadvisor/src/omniadvisor/service/retest_service.py index 8a3ff1e5f..1a3fdeb47 100644 --- a/omniadvisor/src/omniadvisor/service/retest_service.py +++ b/omniadvisor/src/omniadvisor/service/retest_service.py @@ -1,6 +1,7 @@ from typing import Any from common.constant import OA_CONF from omniadvisor.repository.model.load import Load +from omniadvisor.repository.load_repository import LoadRepository from omniadvisor.service.spark_service.spark_run import spark_run from omniadvisor.service.tuning_result.tuning_result import get_tuning_result from omniadvisor.utils.logger import global_logger @@ -22,9 +23,16 @@ def retest(load: Load, config: dict[str, Any]) -> None: :raises Exception: 当任务执行过程中出现非 Spark 异常时抛出。 """ global_logger.debug('Starting retest config...') + + # 若配置文件中存在后台复测队列信息,则伪造Load,将其执行属性中的队列信息换成后台复测队列 + if OA_CONF.backend_retest_queue: + exec_attr = load.exec_attr.copy() + exec_attr['queue'] = OA_CONF.backend_retest_queue + load = LoadRepository.fake_update_name_and_exec_attr(load=load, exec_attr=exec_attr) + for i in range(1, OA_CONF.tuning_retest_times + 1): try: - exam_record, spark_output = spark_run(load=load, config=config, wait_for_trace=True, exec_in_isolation=True) + exam_record, spark_output = spark_run(load=load, config=config, wait_for_trace=True) except Exception as e: global_logger.error( 'Retest failed in round %d. Exception source: Non-Spark exception. Details: %s.', i, e diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py index 2a6eba772..9a172407a 100644 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py @@ -26,8 +26,7 @@ _RETURN_CODE_MAP = { } -def spark_run(load: Load, config: dict, wait_for_trace: bool = True, - exec_in_isolation: bool = False) -> Tuple[ExamRecord, str]: +def spark_run(load: Load, config: dict, wait_for_trace: bool = True) -> Tuple[ExamRecord, str]: """ 输入负载与配置,执行Spark任务 并从Spark命令的返回值中获取 生成一条记录本次执行信息的exam_record @@ -36,17 +35,10 @@ def spark_run(load: Load, config: dict, wait_for_trace: bool = True, :param load: 负载 :param config: 参数配置 :param wait_for_trace: 是否阻塞等待获取trace - :param exec_in_isolation: 是否在隔离队列中执行 :return:测试记录实例, Spark任务执行的返回结果 """ - # 是否要在隔离中执行,若在隔离中执行,则将队列改为隔离队列 - if exec_in_isolation and OA_CONF.backend_retest_queue: - exec_attr = load.exec_attr - exec_attr['queue'] = OA_CONF.backend_retest_queue - else: - exec_attr = load.exec_attr # 从解析后的参数列表中提取负载与任务的相关信息 - submit_cmd = SparkCMDParser.reconstruct_cmd(exec_attr=exec_attr, conf_params=config) + submit_cmd = SparkCMDParser.reconstruct_cmd(exec_attr=load.exec_attr, conf_params=config) # 判断当前执行配置是否为租户基线配置,若不相同则需设置超时时间 if config != load.default_config: diff --git a/omniadvisor/tests/omniadvisor/interface/test_hijack_recommend.py b/omniadvisor/tests/omniadvisor/interface/test_hijack_recommend.py index 01585b1ec..70a5cb7d8 100644 --- a/omniadvisor/tests/omniadvisor/interface/test_hijack_recommend.py +++ b/omniadvisor/tests/omniadvisor/interface/test_hijack_recommend.py @@ -31,7 +31,7 @@ class TestHijackRecommend: mock_spark_run.return_value = (MagicMock(status="success"), "job output") hijack_recommend(argv) - mock_spark_run.assert_called_once_with(load=load, config=exec_config, wait_for_trace=False, exec_in_isolation=False) + mock_spark_run.assert_called_once_with(load=load, config=exec_config, wait_for_trace=False) mock_process_config.assert_not_called() # 场景 2: 执行失败 + 进入安全机制 diff --git a/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_run.py b/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_run.py index c55efdb44..34dccaeed 100644 --- a/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_run.py +++ b/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_run.py @@ -76,7 +76,7 @@ class TestSparkRun: mock_get_spark_runtime.return_value = 20.4 # 方法调用 - result_exam_record, spark_output = spark_run(self.mock_load, self.conf, wait_for_trace=False, exec_in_isolation=False) + result_exam_record, spark_output = spark_run(self.mock_load, self.conf, wait_for_trace=False) # 结果验证 assert result_exam_record == mock_exam_record assert spark_output == self.mock_exec_result.output -- Gitee From d4aabc1c6fbc5287a6118c474e872d9cd60764ca Mon Sep 17 00:00:00 2001 From: wtingkai <330445001@qq.com> Date: Wed, 20 Aug 2025 09:07:11 +0800 Subject: [PATCH 2/2] =?UTF-8?q?feat(OmniAdvisor):=20=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=E5=BC=BA=E5=88=B6=E5=90=8E=E5=8F=B0=E8=B0=83=E4=BC=98=E6=A8=A1?= =?UTF-8?q?=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../omniadvisor/interface/config_tuning.py | 53 +++++++++++++++++-- 1 file changed, 49 insertions(+), 4 deletions(-) diff --git a/omniadvisor/src/omniadvisor/interface/config_tuning.py b/omniadvisor/src/omniadvisor/interface/config_tuning.py index 0eec5bc30..3128715d0 100644 --- a/omniadvisor/src/omniadvisor/interface/config_tuning.py +++ b/omniadvisor/src/omniadvisor/interface/config_tuning.py @@ -1,5 +1,6 @@ import argparse import signal +import re from typing import Optional from algo.expert.tuning import ExpertTuning @@ -21,7 +22,7 @@ from omniadvisor.service.tuning_result.tuning_result_history import ( get_other_tuning_result_history ) from omniadvisor.utils.logger import global_logger -from omniadvisor.utils.utils import float_format +from omniadvisor.utils.utils import float_format, safe_read_file def handler(signum: int, frame) -> None: @@ -53,6 +54,10 @@ def _parse_tuning_args() -> argparse.Namespace: choices=[method for method in OA_CONF.TuningMethod.all if method != OA_CONF.TuningMethod.user], help='Tuning methods used to give optimal config, for iterative, use BO; for expert ...') + parser.add_argument("--force", type=bool, required=False, + help='Dangerous function!!! For loads that are prohibited from background retesting, forced ' + 'retesting will be carried out by modifying the load content') + args = parser.parse_args() return args @@ -200,6 +205,43 @@ def _query_and_check_load(load_id: str) -> Optional[Load]: return load +def _modify_load_in_force_test_mode(load: Load): + """ + 在强行复测模式下,修改负载执行信息,即删除负载中的数据修改动作 + + :param load: 负载实例 + :returns: 修改后的负载实例,若无法修改则返回None + """ + exec_attr = load.exec_attr.copy() + + if 'e' in exec_attr: + sql_statement = exec_attr['e'] + exec_attr.pop('e') + elif 'f' in exec_attr: + try: + sql_statement = safe_read_file(exec_attr['f']) + except Exception: + raise TuningPreconditionError( + 'Can not retest in backend force mode because of load not supporting, only retest way of hijacking is supported.' + ) + exec_attr.pop('f') + else: + raise TuningPreconditionError( + 'Can not retest in backend force mode because of load not supporting, only retest way of hijacking is supported.' + ) + + # 匹配删除第一个select语句之前的内容 + pattern = re.compile(r'select', re.IGNORECASE) + match = pattern.search(sql_statement) + if match: + sql_statement = sql_statement[match.start():] + # 将修改后的sql语句放入执行属性中 + exec_attr['e'] = sql_statement + + return LoadRepository.fake_update_name_and_exec_attr(load=load, exec_attr=exec_attr) + + + def main(): """ 配置调优主函数 @@ -225,9 +267,12 @@ def main(): # 3. 当前负载不支持后台复测,则退出 elif args.retest_way == OA_CONF.RetestWay.backend: if load.backend_retest_forbidden: - raise TuningPreconditionError( - 'Current load contains data manipulation operations, only retest way of hijacking is supported.' - ) + if args.force: + load = _modify_load_in_force_test_mode(load=load) + else: + raise TuningPreconditionError( + 'Current load contains data manipulation operations, only retest way of hijacking is supported.' + ) if args.tuning_method: _single_tuning(load=load, retest_way=args.retest_way, tuning_method=args.tuning_method) -- Gitee