diff --git a/omniadvisor/src/omniadvisor/interface/config_tuning.py b/omniadvisor/src/omniadvisor/interface/config_tuning.py index c0a5523d427acf1192fbdd76fe0591391f854e67..221a22961d2297b1318b566ff4abdd364e8598fe 100644 --- a/omniadvisor/src/omniadvisor/interface/config_tuning.py +++ b/omniadvisor/src/omniadvisor/interface/config_tuning.py @@ -46,7 +46,6 @@ def unified_tuning(load, retest_way: str, tuning_method: str): if tuning_method == OA_CONF.TuningMethod.iterative: perf_history = get_tuning_result_history(load) next_config = SmacAppendTuning.tune(perf_history.tuning_history) - TuningRecordRepository.create(load=load, config=next_config, method=OA_CONF.TuningMethod.iterative) elif tuning_method == OA_CONF.TuneMethod.expert: perf_history = get_tuning_result_history(load) next_config = ExpertTuning.tune(perf_history.tuning_history) @@ -54,6 +53,14 @@ def unified_tuning(load, retest_way: str, tuning_method: str): else: raise ValueError(f'Not supported tuning method: {tuning_method}') + # 用户的default_config上叠加next_config叠加 + if load: + next_config = {**load.default_config, **next_config} + else: + raise ValueError("Invalid load: configuration cannot be loaded") + + TuningRecordRepository.create(load=load, config=next_config, method=tuning_method) + # 复测 if retest_way == OA_CONF.RetestWay.backend: retest(load, next_config) diff --git a/omniadvisor/src/omniadvisor/interface/hijack_recommend.py b/omniadvisor/src/omniadvisor/interface/hijack_recommend.py index 5578b88a9029cad864cdb2d9d3049558e359c6d3..4ea917abe1eb9e9479b6efa07732297c0e3c2e46 100644 --- a/omniadvisor/src/omniadvisor/interface/hijack_recommend.py +++ b/omniadvisor/src/omniadvisor/interface/hijack_recommend.py @@ -57,9 +57,18 @@ def _get_exec_config_from_load(load: Load): return load.default_config -def _refresh_best_config(load: Load): +def _process_load_config(load: Load): + """ + 1.检查当前负载中test_config是否完成复测流程 + 2.根据复测情况判断是否需要清空load中的test_config + 3.根据测试性能判断是否需要刷新load中保存的best_config + :param load: 本次测试用负载 + :return: + """ tuning_result = get_tuning_result(load, load.test_config) - if tuning_result.exam_times >= OA_CONF.tuning_retest_times: + if tuning_result.status == OA_CONF.ExecStatus.fail: + LoadRepository.update_test_config(load, {}) + elif tuning_result.status == OA_CONF.ExecStatus.success: # 获取当前best_config的平均测试性能 best_config_results = get_tuning_result(load, load.best_config) # 若调优性能优于最佳性能 刷新当前的best_config @@ -106,7 +115,7 @@ def hijack_recommend(spark_sql_cmd: str): print(output) if exec_config == load.test_config: - _refresh_best_config(load=load) + _process_load_config(load=load) def main(): diff --git a/omniadvisor/src/omniadvisor/service/tuning_result/tuning_result.py b/omniadvisor/src/omniadvisor/service/tuning_result/tuning_result.py index 34a993943b0d58173e3c1d05e27f0a8eec8bac51..e61d9629ab7705181bc634c28823261bea5f1b64 100644 --- a/omniadvisor/src/omniadvisor/service/tuning_result/tuning_result.py +++ b/omniadvisor/src/omniadvisor/service/tuning_result/tuning_result.py @@ -95,27 +95,29 @@ class TuningResult: @property def runtime(self): - # 若配置未复测过,则runtime为失败返回值 - if not self._exam_records: - return OA_CONF.exec_fail_return_value - # 计算执行状态为success的平均runtime success_runtimes = [ exam_record.runtime for exam_record in self._exam_records if exam_record.status == OA_CONF.ExecStatus.success ] + + # 若配置成功复测的次数为0,则runtime为失败返回值 + if not success_runtimes: + return OA_CONF.exec_fail_return_runtime + return sum(success_runtimes) / len(success_runtimes) @property def trace(self): - # 若配置未复测过,则trace为失败返回值 - if not self._exam_records: - return OA_CONF.exec_fail_return_trace - success_exam_results = [ exam_record for exam_record in self._exam_records if exam_record.status == OA_CONF.ExecStatus.success ] + + # 若配置成功复测的次数为0,则trac为失败返回值 + if not success_exam_results: + return OA_CONF.exec_fail_return_trace + best_exam_record = min(success_exam_results, key=lambda exam_record: exam_record.runtime) return best_exam_record.trace diff --git a/omniadvisor/tests/omniadvisor/interface/test_config_tuning.py b/omniadvisor/tests/omniadvisor/interface/test_config_tuning.py index 16c350e25e564ddc772beeccf4f998c2178719ec..c437e60aa6a9276922fc19e486a2b8c295abd044 100644 --- a/omniadvisor/tests/omniadvisor/interface/test_config_tuning.py +++ b/omniadvisor/tests/omniadvisor/interface/test_config_tuning.py @@ -12,7 +12,8 @@ class TestTuning: def setup_class(self): # 创建表 - self.load = None + self.load = MagicMock() + self.load.default_config = {"test_config": "1"} self.retest_times = 3 self.tuning_method = OA_CONF.TuningMethod.iterative @@ -88,7 +89,7 @@ class TestTuning: :return: """ sys.argv = ['config_tuning.py', - '--load-id', self.load, + '--load-id', None, '--retest-way', OA_CONF.RetestWay.backend, '--tuning-method', self.tuning_method] diff --git a/omniadvisor/tests/omniadvisor/interface/test_hijack_recommend.py b/omniadvisor/tests/omniadvisor/interface/test_hijack_recommend.py index 7dfe1e947f81f22f440f4f0e455ff07bcfaf037a..8335adae77ef320902986c613b70261715fa22d5 100644 --- a/omniadvisor/tests/omniadvisor/interface/test_hijack_recommend.py +++ b/omniadvisor/tests/omniadvisor/interface/test_hijack_recommend.py @@ -48,7 +48,7 @@ class TestHijackRecommend(unittest.TestCase): @patch("omniadvisor.interface.hijack_recommend.SparkParameterParser") @patch("omniadvisor.interface.hijack_recommend.LoadRepository") @patch("omniadvisor.interface.hijack_recommend.spark_run") - @patch("omniadvisor.interface.hijack_recommend._refresh_best_config") + @patch("omniadvisor.interface.hijack_recommend._process_load_config") def test_hijack_recommend_failure_with_fallback(self, mock_refreshed_load, mock_spark_run, mock_load_repo, mock_parser): """ 测试 hijack_recommend 在任务失败且需要回退到用户默认配置时的行为。