From 8d27415ca96f505f2f24e19851f9dcac98a3948c Mon Sep 17 00:00:00 2001 From: wtingkai <330445001@qq.com> Date: Tue, 8 Jul 2025 20:54:43 +0800 Subject: [PATCH 1/6] =?UTF-8?q?feat(OmniAdvisor):=20=E4=B8=BAprint?= =?UTF-8?q?=E8=AF=AD=E5=8F=A5=E6=B7=BB=E5=8A=A0flush,=E9=81=BF=E5=85=8D?= =?UTF-8?q?=E9=99=B7=E5=85=A5=E5=86=99Buffer?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- omniadvisor/src/omniadvisor/interface/hijack_recommend.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/omniadvisor/src/omniadvisor/interface/hijack_recommend.py b/omniadvisor/src/omniadvisor/interface/hijack_recommend.py index ef6a67f4d..8ce6f13e4 100644 --- a/omniadvisor/src/omniadvisor/interface/hijack_recommend.py +++ b/omniadvisor/src/omniadvisor/interface/hijack_recommend.py @@ -117,7 +117,7 @@ def hijack_recommend(argv: list): if exam_record.status == OA_CONF.ExecStatus.success: # 打印结果输出 global_logger.info("Spark execute success, going to print Spark output.") - print(output) + print(output, flush=True) # 若执行失败 则判断是否需要拉起安全机制 else: if exec_config != user_config: @@ -125,10 +125,10 @@ def hijack_recommend(argv: list): safe_exam_record, safe_output = spark_run(load, user_config) global_logger.info("Spark execute in security protection mechanism, going to print Spark output.") # 打印安全机制下任务的输出 - print(safe_output) + print(safe_output, flush=True) else: global_logger.warning("Spark execute failed in user config, going to print Spark output.") - print(output) + print(output, flush=True) if exec_config == load.test_config: _process_load_config(load=load) -- Gitee From f256310d8ba0384754c2ba459c4b91d50960038b Mon Sep 17 00:00:00 2001 From: wtingkai <330445001@qq.com> Date: Tue, 8 Jul 2025 21:57:52 +0800 Subject: [PATCH 2/6] =?UTF-8?q?feat(OmniAdvisor):=20=E5=A2=9E=E5=A4=A7Spar?= =?UTF-8?q?k=E4=BB=BB=E5=8A=A1=E6=89=A7=E8=A1=8C=E8=B6=85=E6=97=B6?= =?UTF-8?q?=E6=97=B6=E9=97=B4=E7=9A=84=E6=AF=94=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- omniadvisor/config/common_config.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/omniadvisor/config/common_config.cfg b/omniadvisor/config/common_config.cfg index 9ece4a220..4a9ad9b9e 100755 --- a/omniadvisor/config/common_config.cfg +++ b/omniadvisor/config/common_config.cfg @@ -14,4 +14,4 @@ spark.fetch.trace.timeout=10 # Spark从History Sever抓取Trace的间隔用时 spark.fetch.trace.interval=2 # Spark任务执行的超时时间,对比基线的比例 -spark.exec.timeout.ratio=3.0 \ No newline at end of file +spark.exec.timeout.ratio=10.0 \ No newline at end of file -- Gitee From 20db05d582d2f6b0d43f2ab085bb0faec9d095ad Mon Sep 17 00:00:00 2001 From: wtingkai <330445001@qq.com> Date: Wed, 9 Jul 2025 20:19:37 +0800 Subject: [PATCH 3/6] =?UTF-8?q?feat(OmniAdvisor):=20=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E5=AF=B9=E9=87=8D=E5=A4=8D=E6=8E=A8=E8=8D=90=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E7=9A=84=E6=A0=A1=E9=AA=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- omniadvisor/src/omniadvisor/interface/config_tuning.py | 4 ++++ .../tests/omniadvisor/interface/test_config_tuning.py | 8 ++++++++ 2 files changed, 12 insertions(+) diff --git a/omniadvisor/src/omniadvisor/interface/config_tuning.py b/omniadvisor/src/omniadvisor/interface/config_tuning.py index a735c323b..c85e7cbf5 100644 --- a/omniadvisor/src/omniadvisor/interface/config_tuning.py +++ b/omniadvisor/src/omniadvisor/interface/config_tuning.py @@ -70,6 +70,10 @@ def unified_tuning(load, retest_way: str, tuning_method: str): global_logger.info("Load config tuning success, get new config to retest.") next_config = {**load.default_config, **next_config} + # 判断是否有重复推荐配置 + if TuningRecordRepository.query_by_load_and_config(load=load, config=next_config): + raise NoOptimalConfigError('The recommending config already exists, please try other tuning methods.') + TuningRecordRepository.create(load=load, config=next_config, method=tuning_method, method_extend=method_extend) # 复测 diff --git a/omniadvisor/tests/omniadvisor/interface/test_config_tuning.py b/omniadvisor/tests/omniadvisor/interface/test_config_tuning.py index dd4d701b2..5ca2633f1 100644 --- a/omniadvisor/tests/omniadvisor/interface/test_config_tuning.py +++ b/omniadvisor/tests/omniadvisor/interface/test_config_tuning.py @@ -24,6 +24,7 @@ class TestTuning: :return: """ with patch('omniadvisor.repository.load_repository.LoadRepository.query_by_id'), \ + patch('omniadvisor.repository.tuning_record_repository.TuningRecordRepository.query_by_load_and_config') as mock_query_tuning_record, \ patch('omniadvisor.repository.tuning_record_repository.TuningRecordRepository.create'), \ patch('omniadvisor.service.retest_service.spark_run') as mock_spark_run, \ patch('omniadvisor.interface.config_tuning.get_tuning_result'), \ @@ -31,6 +32,7 @@ class TestTuning: patch('algo.iterative.tuning.SmacAppendTuning.tune') as mock_smac_tuning, \ patch('omniadvisor.interface.config_tuning.float_format'), \ patch('omniadvisor.service.retest_service.float_format'): + mock_query_tuning_record.return_value = list() mock_exam_record = MagicMock() mock_smac_tuning.return_value = self.tune_return_val mock_exam_record.status = OA_CONF.ExecStatus.success @@ -49,6 +51,7 @@ class TestTuning: :return: """ with patch('omniadvisor.repository.load_repository.LoadRepository.query_by_id'), \ + patch('omniadvisor.repository.tuning_record_repository.TuningRecordRepository.query_by_load_and_config') as mock_query_tuning_record, \ patch('omniadvisor.repository.tuning_record_repository.TuningRecordRepository.create'), \ patch('omniadvisor.interface.config_tuning.get_tuning_result'), \ patch('omniadvisor.service.retest_service.spark_run') as mock_spark_run, \ @@ -59,6 +62,7 @@ class TestTuning: patch('omniadvisor.repository.load_repository.LoadRepository.update_best_config') as mock_update_best, \ patch('omniadvisor.interface.config_tuning.float_format'), \ patch('omniadvisor.service.retest_service.float_format'): + mock_query_tuning_record.return_value = list() mock_exam_record = MagicMock() mock_exam_record.status = OA_CONF.ExecStatus.fail spark_output = self.empty_str @@ -84,12 +88,14 @@ class TestTuning: """ with patch('omniadvisor.repository.load_repository.LoadRepository.query_by_id'), \ + patch('omniadvisor.repository.tuning_record_repository.TuningRecordRepository.query_by_load_and_config') as mock_query_tuning_record, \ patch('omniadvisor.repository.tuning_record_repository.TuningRecordRepository.create'), \ patch('omniadvisor.service.retest_service.spark_run', side_effect=RuntimeError) as mock_spark_run, \ patch('omniadvisor.interface.config_tuning.get_tuning_result_history'), \ patch('omniadvisor.repository.load_repository.LoadRepository.update_best_config'), \ patch('algo.iterative.tuning.SmacAppendTuning.tune') as mock_smac_tuning, \ patch('omniadvisor.interface.config_tuning.remove_tuning_result') as mock_remove_tuning_result: + mock_query_tuning_record.return_value = list() mock_exam_record = MagicMock() mock_exam_record.status = OA_CONF.ExecStatus.success mock_smac_tuning.return_value = self.tune_return_val @@ -124,10 +130,12 @@ class TestTuning: :return: """ with patch('omniadvisor.repository.load_repository.LoadRepository.query_by_id'), \ + patch('omniadvisor.repository.tuning_record_repository.TuningRecordRepository.query_by_load_and_config') as mock_query_tuning_record, \ patch('omniadvisor.repository.tuning_record_repository.TuningRecordRepository.create'), \ patch('omniadvisor.interface.config_tuning.get_tuning_result_history') as mocked_query_perf, \ patch('omniadvisor.repository.load_repository.LoadRepository.update_test_config') as mock_update_test, \ patch('algo.iterative.tuning.SmacAppendTuning.tune') as mock_smac_tuning: + mock_query_tuning_record.return_value = list() mock_smac_tuning.return_value = self.tune_return_val unified_tuning(load=self.load, retest_way=OA_CONF.RetestWay.hijacking, tuning_method=self.tuning_method) -- Gitee From c78f173c2004541b1fa3524092b5fa5f36115b21 Mon Sep 17 00:00:00 2001 From: wtingkai <330445001@qq.com> Date: Wed, 9 Jul 2025 20:31:41 +0800 Subject: [PATCH 4/6] =?UTF-8?q?feat(OmniAdvisor):=20=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E5=AF=B9=E9=85=8D=E7=BD=AE=E6=96=87=E4=BB=B6=E4=B8=AD=E5=8F=82?= =?UTF-8?q?=E6=95=B0=E7=9A=84=E6=A0=A1=E9=AA=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- omniadvisor/config/common_config.cfg | 2 +- omniadvisor/src/common/constant.py | 16 ++++++++++++++++ omniadvisor/src/hijack.py | 2 ++ omniadvisor/src/tuning.py | 2 ++ 4 files changed, 21 insertions(+), 1 deletion(-) diff --git a/omniadvisor/config/common_config.cfg b/omniadvisor/config/common_config.cfg index 4a9ad9b9e..53f7206a8 100755 --- a/omniadvisor/config/common_config.cfg +++ b/omniadvisor/config/common_config.cfg @@ -1,6 +1,6 @@ [common] # 调优复测次数 -tuning.retest.times=3 +tuning.retest.times=1 # 评估配置失效的阈值,当配置执行失败次数大于等于此值,则配置失效 config.fail.threshold = 1 #调优策略 diff --git a/omniadvisor/src/common/constant.py b/omniadvisor/src/common/constant.py index 75be2cc51..dd3f074c3 100644 --- a/omniadvisor/src/common/constant.py +++ b/omniadvisor/src/common/constant.py @@ -21,6 +21,22 @@ def load_common_config(config_path: str): return common_config +def check_oa_conf(): + """ + 校验OA_CONF中参数是否正确 + """ + if OA_CONF.tuning_retest_times <= 0: + raise ValueError('The tuning retest times must > 0, please check common configuration.') + if OA_CONF.config_fail_threshold <= 0: + raise ValueError('The config fail threshold must > 0, please check common configuration.') + if OA_CONF.spark_fetch_trace_timeout <= 0: + raise ValueError('The spark fetch trace timeout must > 0, please check common configuration.') + if OA_CONF.spark_fetch_trace_interval <= 0: + raise ValueError('The spark fetch trace interval must > 0, please check common configuration.') + if OA_CONF.spark_exec_timeout_ratio <= 0: + raise ValueError('The spark exec timeout ratio must > 0, please check common configuration.') + + class OmniAdvisorConf: """ OmniAdvisor常量 diff --git a/omniadvisor/src/hijack.py b/omniadvisor/src/hijack.py index 19047bed5..f9d42c4c4 100644 --- a/omniadvisor/src/hijack.py +++ b/omniadvisor/src/hijack.py @@ -1,5 +1,6 @@ import os +from common.constant import check_oa_conf from omniadvisor.interface import hijack_recommend from omniadvisor.utils.logger import global_logger, disable_console_handler @@ -8,6 +9,7 @@ if __name__ == '__main__': # 前台劫持功能为了保证用户无感 不应在控制台有日志输出 因此禁用控制台输出 disable_console_handler(global_logger) try: + check_oa_conf() hijack_recommend.main() # 无需进行逻辑处理的异常,直接抛至该层 # 若需进行逻辑处理(如环境清理等),则需在相应位置处理后重新抛至该层 diff --git a/omniadvisor/src/tuning.py b/omniadvisor/src/tuning.py index 8f75b8e07..ded400d4e 100644 --- a/omniadvisor/src/tuning.py +++ b/omniadvisor/src/tuning.py @@ -1,9 +1,11 @@ +from common.constant import check_oa_conf from omniadvisor.interface import config_tuning from omniadvisor.utils.logger import global_logger if __name__ == '__main__': try: + check_oa_conf() config_tuning.main() # 无需进行逻辑处理的异常,直接抛至该层 # 若需进行逻辑处理(如环境清理等),则需在相应位置处理后重新抛至该层 -- Gitee From c2cee71df33238a9ea7f194bd7d8d74028efb1fe Mon Sep 17 00:00:00 2001 From: wtingkai <330445001@qq.com> Date: Thu, 10 Jul 2025 11:59:20 +0800 Subject: [PATCH 5/6] =?UTF-8?q?feat(OmniAdvisor):=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=B8=AD=E5=BC=82=E5=B8=B8=E6=89=93=E5=8D=B0?= =?UTF-8?q?=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- omniadvisor/src/hijack.py | 4 ++-- .../src/omniadvisor/interface/config_tuning.py | 16 ++++++---------- omniadvisor/src/tuning.py | 4 ++-- 3 files changed, 10 insertions(+), 14 deletions(-) diff --git a/omniadvisor/src/hijack.py b/omniadvisor/src/hijack.py index f9d42c4c4..a2658dd9f 100644 --- a/omniadvisor/src/hijack.py +++ b/omniadvisor/src/hijack.py @@ -14,8 +14,8 @@ if __name__ == '__main__': # 无需进行逻辑处理的异常,直接抛至该层 # 若需进行逻辑处理(如环境清理等),则需在相应位置处理后重新抛至该层 except Exception as e: - # 异常信息统一在此处打印堆栈,方便定位,抛出异常的地方无需打印log - global_logger.exception(e) + # 异常信息统一在此处打印,方便定位,抛出异常的地方无需打印log + global_logger.error(e) # 异常退出 # 劫持任务异常退出后,需在Spark脚本内执行原Spark命令语句 # 选择os._exit()方式退出,避免主进程等待子进程获取Trace的阻塞 diff --git a/omniadvisor/src/omniadvisor/interface/config_tuning.py b/omniadvisor/src/omniadvisor/interface/config_tuning.py index c85e7cbf5..af3f60dbf 100644 --- a/omniadvisor/src/omniadvisor/interface/config_tuning.py +++ b/omniadvisor/src/omniadvisor/interface/config_tuning.py @@ -64,7 +64,7 @@ def unified_tuning(load, retest_way: str, tuning_method: str): method_extend, next_config = _get_next_config(load, tuning_method) if not next_config: - raise NoOptimalConfigError('The recommending config is empty, please try other tuning methods.') + raise NoOptimalConfigError(f'The recommending config of method {tuning_method} is empty, please try other tuning methods.') # 用户的default_config上叠加next_config叠加 global_logger.info("Load config tuning success, get new config to retest.") @@ -72,7 +72,7 @@ def unified_tuning(load, retest_way: str, tuning_method: str): # 判断是否有重复推荐配置 if TuningRecordRepository.query_by_load_and_config(load=load, config=next_config): - raise NoOptimalConfigError('The recommending config already exists, please try other tuning methods.') + raise NoOptimalConfigError(f'The recommending config of method {tuning_method} already exists, please try other tuning methods.') TuningRecordRepository.create(load=load, config=next_config, method=tuning_method, method_extend=method_extend) @@ -185,12 +185,7 @@ def main(): # 3. 调优结果状态仍在运行中(复测中) if args.tuning_method or args.retest_way == OA_CONF.RetestWay.hijacking: - try: - unified_tuning(load, args.retest_way, args.tuning_method) - except NoOptimalConfigError: - global_logger.error( - 'The tuning method: %s cannot find optimal config, please try other method.', args.tuning_method - ) + unified_tuning(load, args.retest_way, args.tuning_method) return # 当且仅当后台复测,且未指定调优方法时,会连续拉起调优 @@ -200,6 +195,7 @@ def main(): for _ in range(tuning_times): try: unified_tuning(load, args.retest_way, tuning_method) - except NoOptimalConfigError: - global_logger.info('The tuning method: %s cannot find optimal config, trying next.', tuning_method) + except NoOptimalConfigError as e: + global_logger.warning(e) + global_logger.info('No config found in the loop of tuning strategy, trying next method.') break diff --git a/omniadvisor/src/tuning.py b/omniadvisor/src/tuning.py index ded400d4e..37e8b3954 100644 --- a/omniadvisor/src/tuning.py +++ b/omniadvisor/src/tuning.py @@ -10,8 +10,8 @@ if __name__ == '__main__': # 无需进行逻辑处理的异常,直接抛至该层 # 若需进行逻辑处理(如环境清理等),则需在相应位置处理后重新抛至该层 except Exception as e: - # 异常信息统一在此处打印堆栈,方便定位,抛出异常的地方无需打印log - global_logger.exception(e) + # 异常信息统一在此处打印,方便定位,抛出异常的地方无需打印log + global_logger.error(e) # 异常退出 exit(1) -- Gitee From c79f70633aa28eab3ebdaab5dfa4f042995de278 Mon Sep 17 00:00:00 2001 From: wtingkai <330445001@qq.com> Date: Fri, 11 Jul 2025 10:24:10 +0800 Subject: [PATCH 6/6] =?UTF-8?q?feat(OmniAdvisor):=20=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E5=AF=B9spark=20fetcher=E4=B8=ADjson=E8=BD=AC=E6=8D=A2?= =?UTF-8?q?=E7=9A=84=E5=BC=82=E5=B8=B8=E6=8D=95=E6=8D=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../omniadvisor/service/spark_service/spark_fetcher.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_fetcher.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_fetcher.py index 5c1180fde..de1a7dd73 100755 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_fetcher.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_fetcher.py @@ -22,8 +22,14 @@ class SparkFetcher: endpoint = endpoint.strip("/") url = f"{self.history_server_url}/{endpoint}" response = requests.get(url) - response.raise_for_status() # 若status.code != 200~209则抛出异常 - return json.loads(response.text) # 使用response.json()代替json.loads(response.text) + # 若status.code != 200~209则抛出异常 + response.raise_for_status() + # 将获取到的数据转为json返回 + try: + json_data = json.loads(response.text) + except Exception as e: + raise ValueError('Something wrong in trace fetched, can not decode into Json data.') from e + return json_data def get_spark_apps(self): """ -- Gitee