diff --git a/omniadvisor/config/common_config.cfg b/omniadvisor/config/common_config.cfg index 9ece4a2209b3adf71638c1005b08b591ad5d960e..53f7206a8ddedf8ff965c15d65d2abd9dea07413 100755 --- a/omniadvisor/config/common_config.cfg +++ b/omniadvisor/config/common_config.cfg @@ -1,6 +1,6 @@ [common] # 调优复测次数 -tuning.retest.times=3 +tuning.retest.times=1 # 评估配置失效的阈值,当配置执行失败次数大于等于此值,则配置失效 config.fail.threshold = 1 #调优策略 @@ -14,4 +14,4 @@ spark.fetch.trace.timeout=10 # Spark从History Sever抓取Trace的间隔用时 spark.fetch.trace.interval=2 # Spark任务执行的超时时间,对比基线的比例 -spark.exec.timeout.ratio=3.0 \ No newline at end of file +spark.exec.timeout.ratio=10.0 \ No newline at end of file diff --git a/omniadvisor/src/common/constant.py b/omniadvisor/src/common/constant.py index 75be2cc51f4d30f9eeb5052fb3e84cdc07cb8786..dd3f074c338152a8cb914adc6a6ca498341161c0 100644 --- a/omniadvisor/src/common/constant.py +++ b/omniadvisor/src/common/constant.py @@ -21,6 +21,22 @@ def load_common_config(config_path: str): return common_config +def check_oa_conf(): + """ + 校验OA_CONF中参数是否正确 + """ + if OA_CONF.tuning_retest_times <= 0: + raise ValueError('The tuning retest times must > 0, please check common configuration.') + if OA_CONF.config_fail_threshold <= 0: + raise ValueError('The config fail threshold must > 0, please check common configuration.') + if OA_CONF.spark_fetch_trace_timeout <= 0: + raise ValueError('The spark fetch trace timeout must > 0, please check common configuration.') + if OA_CONF.spark_fetch_trace_interval <= 0: + raise ValueError('The spark fetch trace interval must > 0, please check common configuration.') + if OA_CONF.spark_exec_timeout_ratio <= 0: + raise ValueError('The spark exec timeout ratio must > 0, please check common configuration.') + + class OmniAdvisorConf: """ OmniAdvisor常量 diff --git a/omniadvisor/src/hijack.py b/omniadvisor/src/hijack.py index 19047bed5f3b83e9afa2f46ffcc0da5d561a51e1..a2658dd9f2fff2baca537c794f80caab45be9df2 100644 --- a/omniadvisor/src/hijack.py +++ b/omniadvisor/src/hijack.py @@ -1,5 +1,6 @@ import os +from common.constant import check_oa_conf from omniadvisor.interface import hijack_recommend from omniadvisor.utils.logger import global_logger, disable_console_handler @@ -8,12 +9,13 @@ if __name__ == '__main__': # 前台劫持功能为了保证用户无感 不应在控制台有日志输出 因此禁用控制台输出 disable_console_handler(global_logger) try: + check_oa_conf() hijack_recommend.main() # 无需进行逻辑处理的异常,直接抛至该层 # 若需进行逻辑处理(如环境清理等),则需在相应位置处理后重新抛至该层 except Exception as e: - # 异常信息统一在此处打印堆栈,方便定位,抛出异常的地方无需打印log - global_logger.exception(e) + # 异常信息统一在此处打印,方便定位,抛出异常的地方无需打印log + global_logger.error(e) # 异常退出 # 劫持任务异常退出后,需在Spark脚本内执行原Spark命令语句 # 选择os._exit()方式退出,避免主进程等待子进程获取Trace的阻塞 diff --git a/omniadvisor/src/omniadvisor/interface/config_tuning.py b/omniadvisor/src/omniadvisor/interface/config_tuning.py index 226a4eef4f426111c2d0009af0ecd87af42d3d0e..af3f60dbf97f46caf9948f95527c2099ac20183f 100644 --- a/omniadvisor/src/omniadvisor/interface/config_tuning.py +++ b/omniadvisor/src/omniadvisor/interface/config_tuning.py @@ -64,12 +64,16 @@ def unified_tuning(load, retest_way: str, tuning_method: str): method_extend, next_config = _get_next_config(load, tuning_method) if not next_config: - raise NoOptimalConfigError('The recommending config is empty, please try other tuning methods.') + raise NoOptimalConfigError(f'The recommending config of method {tuning_method} is empty, please try other tuning methods.') # 用户的default_config上叠加next_config叠加 global_logger.info("Load config tuning success, get new config to retest.") next_config = {**load.default_config, **next_config} + # 判断是否有重复推荐配置 + if TuningRecordRepository.query_by_load_and_config(load=load, config=next_config): + raise NoOptimalConfigError(f'The recommending config of method {tuning_method} already exists, please try other tuning methods.') + TuningRecordRepository.create(load=load, config=next_config, method=tuning_method, method_extend=method_extend) # 复测 @@ -181,12 +185,7 @@ def main(): # 3. 调优结果状态仍在运行中(复测中) if args.tuning_method or args.retest_way == OA_CONF.RetestWay.hijacking: - try: - unified_tuning(load, args.retest_way, args.tuning_method) - except NoOptimalConfigError: - global_logger.warning( - 'The tuning method: %s cannot find optimal config, please try other method.', args.tuning_method - ) + unified_tuning(load, args.retest_way, args.tuning_method) return # 当且仅当后台复测,且未指定调优方法时,会连续拉起调优 @@ -196,6 +195,7 @@ def main(): for _ in range(tuning_times): try: unified_tuning(load, args.retest_way, tuning_method) - except NoOptimalConfigError: - global_logger.warning('The tuning method: %s cannot find optimal config, trying next.', tuning_method) + except NoOptimalConfigError as e: + global_logger.warning(e) + global_logger.info('No config found in the loop of tuning strategy, trying next method.') break diff --git a/omniadvisor/src/omniadvisor/interface/hijack_recommend.py b/omniadvisor/src/omniadvisor/interface/hijack_recommend.py index 02eeb4eaedb67d71c1ba69226c8c347050dfc046..f6dd543745c711afa1a39a293125567c66da24d4 100644 --- a/omniadvisor/src/omniadvisor/interface/hijack_recommend.py +++ b/omniadvisor/src/omniadvisor/interface/hijack_recommend.py @@ -117,7 +117,7 @@ def hijack_recommend(argv: list): if exam_record.status == OA_CONF.ExecStatus.success: # 打印结果输出 global_logger.info("Spark execute success, going to print Spark output.") - print(output, end="") + print(output, end="", flush=True) # 若执行失败 则判断是否需要拉起安全机制 else: if exec_config != user_config: @@ -125,10 +125,10 @@ def hijack_recommend(argv: list): safe_exam_record, safe_output = spark_run(load, user_config) global_logger.info("Spark execute in security protection mechanism, going to print Spark output.") # 打印安全机制下任务的输出 - print(safe_output, end="") + print(safe_output, end="", flush=True) else: global_logger.warning("Spark execute failed in user config, going to print Spark output.") - print(output, end="") + print(output, end="", flush=True) if exec_config == load.test_config: _process_load_config(load=load) diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_fetcher.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_fetcher.py index 5c1180fded8c2298fd414a1468c644df45a850ad..de1a7dd73ed7056686407834ef59c12cce9b5d86 100755 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_fetcher.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_fetcher.py @@ -22,8 +22,14 @@ class SparkFetcher: endpoint = endpoint.strip("/") url = f"{self.history_server_url}/{endpoint}" response = requests.get(url) - response.raise_for_status() # 若status.code != 200~209则抛出异常 - return json.loads(response.text) # 使用response.json()代替json.loads(response.text) + # 若status.code != 200~209则抛出异常 + response.raise_for_status() + # 将获取到的数据转为json返回 + try: + json_data = json.loads(response.text) + except Exception as e: + raise ValueError('Something wrong in trace fetched, can not decode into Json data.') from e + return json_data def get_spark_apps(self): """ diff --git a/omniadvisor/src/tuning.py b/omniadvisor/src/tuning.py index 8f75b8e078735ca888c3b8884efe50e9a2275b3e..37e8b395422549896cb4dbd4294f173ebdb9422e 100644 --- a/omniadvisor/src/tuning.py +++ b/omniadvisor/src/tuning.py @@ -1,15 +1,17 @@ +from common.constant import check_oa_conf from omniadvisor.interface import config_tuning from omniadvisor.utils.logger import global_logger if __name__ == '__main__': try: + check_oa_conf() config_tuning.main() # 无需进行逻辑处理的异常,直接抛至该层 # 若需进行逻辑处理(如环境清理等),则需在相应位置处理后重新抛至该层 except Exception as e: - # 异常信息统一在此处打印堆栈,方便定位,抛出异常的地方无需打印log - global_logger.exception(e) + # 异常信息统一在此处打印,方便定位,抛出异常的地方无需打印log + global_logger.error(e) # 异常退出 exit(1) diff --git a/omniadvisor/tests/omniadvisor/interface/test_config_tuning.py b/omniadvisor/tests/omniadvisor/interface/test_config_tuning.py index dd4d701b23118195a593a096ff85f6f061deac11..5ca2633f1e1e0eef9f1c3a89b6860e5995b2a3d2 100644 --- a/omniadvisor/tests/omniadvisor/interface/test_config_tuning.py +++ b/omniadvisor/tests/omniadvisor/interface/test_config_tuning.py @@ -24,6 +24,7 @@ class TestTuning: :return: """ with patch('omniadvisor.repository.load_repository.LoadRepository.query_by_id'), \ + patch('omniadvisor.repository.tuning_record_repository.TuningRecordRepository.query_by_load_and_config') as mock_query_tuning_record, \ patch('omniadvisor.repository.tuning_record_repository.TuningRecordRepository.create'), \ patch('omniadvisor.service.retest_service.spark_run') as mock_spark_run, \ patch('omniadvisor.interface.config_tuning.get_tuning_result'), \ @@ -31,6 +32,7 @@ class TestTuning: patch('algo.iterative.tuning.SmacAppendTuning.tune') as mock_smac_tuning, \ patch('omniadvisor.interface.config_tuning.float_format'), \ patch('omniadvisor.service.retest_service.float_format'): + mock_query_tuning_record.return_value = list() mock_exam_record = MagicMock() mock_smac_tuning.return_value = self.tune_return_val mock_exam_record.status = OA_CONF.ExecStatus.success @@ -49,6 +51,7 @@ class TestTuning: :return: """ with patch('omniadvisor.repository.load_repository.LoadRepository.query_by_id'), \ + patch('omniadvisor.repository.tuning_record_repository.TuningRecordRepository.query_by_load_and_config') as mock_query_tuning_record, \ patch('omniadvisor.repository.tuning_record_repository.TuningRecordRepository.create'), \ patch('omniadvisor.interface.config_tuning.get_tuning_result'), \ patch('omniadvisor.service.retest_service.spark_run') as mock_spark_run, \ @@ -59,6 +62,7 @@ class TestTuning: patch('omniadvisor.repository.load_repository.LoadRepository.update_best_config') as mock_update_best, \ patch('omniadvisor.interface.config_tuning.float_format'), \ patch('omniadvisor.service.retest_service.float_format'): + mock_query_tuning_record.return_value = list() mock_exam_record = MagicMock() mock_exam_record.status = OA_CONF.ExecStatus.fail spark_output = self.empty_str @@ -84,12 +88,14 @@ class TestTuning: """ with patch('omniadvisor.repository.load_repository.LoadRepository.query_by_id'), \ + patch('omniadvisor.repository.tuning_record_repository.TuningRecordRepository.query_by_load_and_config') as mock_query_tuning_record, \ patch('omniadvisor.repository.tuning_record_repository.TuningRecordRepository.create'), \ patch('omniadvisor.service.retest_service.spark_run', side_effect=RuntimeError) as mock_spark_run, \ patch('omniadvisor.interface.config_tuning.get_tuning_result_history'), \ patch('omniadvisor.repository.load_repository.LoadRepository.update_best_config'), \ patch('algo.iterative.tuning.SmacAppendTuning.tune') as mock_smac_tuning, \ patch('omniadvisor.interface.config_tuning.remove_tuning_result') as mock_remove_tuning_result: + mock_query_tuning_record.return_value = list() mock_exam_record = MagicMock() mock_exam_record.status = OA_CONF.ExecStatus.success mock_smac_tuning.return_value = self.tune_return_val @@ -124,10 +130,12 @@ class TestTuning: :return: """ with patch('omniadvisor.repository.load_repository.LoadRepository.query_by_id'), \ + patch('omniadvisor.repository.tuning_record_repository.TuningRecordRepository.query_by_load_and_config') as mock_query_tuning_record, \ patch('omniadvisor.repository.tuning_record_repository.TuningRecordRepository.create'), \ patch('omniadvisor.interface.config_tuning.get_tuning_result_history') as mocked_query_perf, \ patch('omniadvisor.repository.load_repository.LoadRepository.update_test_config') as mock_update_test, \ patch('algo.iterative.tuning.SmacAppendTuning.tune') as mock_smac_tuning: + mock_query_tuning_record.return_value = list() mock_smac_tuning.return_value = self.tune_return_val unified_tuning(load=self.load, retest_way=OA_CONF.RetestWay.hijacking, tuning_method=self.tuning_method)