From a4d4f01be2b3f18e74520e7f2e852b84c961d1ba Mon Sep 17 00:00:00 2001 From: wtingkai <330445001@qq.com> Date: Wed, 13 Aug 2025 20:25:22 +0800 Subject: [PATCH] =?UTF-8?q?feat(OmniAdvisor):=20=E5=90=8E=E7=AB=AF?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=BA=93=E5=8F=82=E6=95=B0=E6=9C=AA=E6=A0=A1?= =?UTF-8?q?=E9=AA=8C=E7=AD=89=E9=97=AE=E9=A2=98BUG=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- omniadvisor/script/auto_deploy.sh | 1 + omniadvisor/src/common/constant.py | 14 ++++ omniadvisor/src/init.py | 15 ++++- .../service/spark_service/spark_run.py | 12 +++- .../tests/omniadvisor/common/test_constant.py | 64 ++++++++++++++++--- 5 files changed, 94 insertions(+), 12 deletions(-) diff --git a/omniadvisor/script/auto_deploy.sh b/omniadvisor/script/auto_deploy.sh index d6a018458..943507875 100644 --- a/omniadvisor/script/auto_deploy.sh +++ b/omniadvisor/script/auto_deploy.sh @@ -6,6 +6,7 @@ frame_dir=$prefix/BoostKit-omniadvisor_2.0.0 ls -l $frame_dir/src ls -l $core_dir/src cp -r $core_dir/src/algo $frame_dir/src/ +cp -r $core_dir/config/* $frame_dir/config/ if [ -z "${SPARK_HOME}" ]; then # 如果 SPARK_HOME 未设置,打印错误信息并退出 diff --git a/omniadvisor/src/common/constant.py b/omniadvisor/src/common/constant.py index 467d07f3c..29c762a91 100644 --- a/omniadvisor/src/common/constant.py +++ b/omniadvisor/src/common/constant.py @@ -29,6 +29,20 @@ def check_oa_conf() -> None: raise ValueError('The tuning retest times must > 0, please check common configuration.') if OA_CONF.config_fail_threshold <= 0: raise ValueError('The config fail threshold must > 0, please check common configuration.') + if not isinstance(OA_CONF.tuning_strategies, list): + raise ValueError('The type of tuning strategies must be list, please check common configuration.') + + if not OA_CONF.postgresql_database_name: + raise ValueError('The postgresql database name can not be empty, please check common configuration.') + if not OA_CONF.postgresql_database_user: + raise ValueError('The postgresql database user can not be empty, please check common configuration.') + if not OA_CONF.postgresql_database_password: + raise ValueError('The postgresql database password can not be empty, please check common configuration.') + if not OA_CONF.postgresql_database_host: + raise ValueError('The postgresql database host can not be empty, please check common configuration.') + if not OA_CONF.postgresql_database_port: + raise ValueError('The postgresql database port can not be empty, please check common configuration.') + if OA_CONF.spark_fetch_trace_timeout <= 0: raise ValueError('The spark fetch trace timeout must > 0, please check common configuration.') if OA_CONF.spark_fetch_trace_interval <= 0: diff --git a/omniadvisor/src/init.py b/omniadvisor/src/init.py index 42da2e822..cbcf49a70 100644 --- a/omniadvisor/src/init.py +++ b/omniadvisor/src/init.py @@ -1,5 +1,18 @@ from server.manage import main +from omniadvisor.utils.logger import global_logger +from common.constant import check_oa_conf if __name__ == '__main__': - main() \ No newline at end of file + try: + check_oa_conf() + main() + # 无需进行逻辑处理的异常,直接抛至该层 + # 若需进行逻辑处理(如环境清理等),则需在相应位置处理后重新抛至该层 + except Exception as e: + # 异常信息统一在此处打印,方便定位,抛出异常的地方无需打印log + global_logger.error(e) + # 异常退出 + exit(1) + + exit(0) \ No newline at end of file diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py index 53bd47ae8..2a6eba772 100644 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py @@ -85,7 +85,9 @@ def spark_run(load: Load, config: dict, wait_for_trace: bool = True, if wait_for_trace: # 阻塞获取trace global_logger.info('Going to fetch Spark execute trace, the process is blocking.') - _update_trace_and_runtime_from_history_server(exam_record=exam_record, application_id=exec_result.application_id) + status = _update_trace_and_runtime_from_history_server(exam_record=exam_record, application_id=exec_result.application_id) + if not status: + raise RuntimeError('Unknown exception happen when fetching spark trace, please check history server.') else: # 不阻塞获取trace,通过子进程进行获取 global_logger.info('Going to fetch Spark execute trace, the process is non-blocking.') @@ -108,7 +110,7 @@ def _calc_timeout_from_load(load: Load) -> int: return int(OA_CONF.spark_exec_timeout_ratio * baseline_result.runtime) -def _update_trace_and_runtime_from_history_server(exam_record: ExamRecord, application_id: str) -> None: +def _update_trace_and_runtime_from_history_server(exam_record: ExamRecord, application_id: str) -> bool: """ 根据application_id对history_server进行轮询, 查询该任务的trace信息和runtime信息,并刷新ExamRecord中相关字段的信息 @@ -131,6 +133,9 @@ def _update_trace_and_runtime_from_history_server(exam_record: ExamRecord, appli time.sleep(OA_CONF.spark_fetch_trace_interval) global_logger.debug(f"Cannot access history server: %s", httpe) continue + except Exception as e: + global_logger.error('Unknown exception happen, error info: %s', e) + return False trace_dict['sql'] = save_trace_data(data=trace_sql, data_dir=OA_CONF.data_dir) trace_dict['stages'] = save_trace_data(data=trace_stages, data_dir=OA_CONF.data_dir) trace_dict['executor'] = save_trace_data(data=trace_executor, data_dir=OA_CONF.data_dir) @@ -138,7 +143,8 @@ def _update_trace_and_runtime_from_history_server(exam_record: ExamRecord, appli break else: global_logger.error(f'Failed to get App %s trace from %s', application_id, history_server_url) - return + return False ExamRecordRepository.update_runtime(exam_record, runtime=runtime) ExamRecordRepository.update_trace(exam_record, trace=trace_dict) + return True diff --git a/omniadvisor/tests/omniadvisor/common/test_constant.py b/omniadvisor/tests/omniadvisor/common/test_constant.py index 77c823244..d72a7e142 100644 --- a/omniadvisor/tests/omniadvisor/common/test_constant.py +++ b/omniadvisor/tests/omniadvisor/common/test_constant.py @@ -17,7 +17,13 @@ class TestConstant: spark_fetch_trace_interval=5, spark_exec_timeout_ratio=0.5, spark_history_username='user', - spark_history_password='pass' + spark_history_password='pass', + tuning_strategies=[], + postgresql_database_name='name', + postgresql_database_user='user', + postgresql_database_password='password', + postgresql_database_host='host', + postgresql_database_port='port' )) def test_check_oa_conf_when_normal(self): # 不应抛异常 @@ -31,7 +37,13 @@ class TestConstant: spark_fetch_trace_interval=5, spark_exec_timeout_ratio=0.5, spark_history_username='user', - spark_history_password='pass' + spark_history_password='pass', + tuning_strategies=[], + postgresql_database_name='name', + postgresql_database_user='user', + postgresql_database_password='password', + postgresql_database_host='host', + postgresql_database_port='port' )) def test_check_oa_conf_when_invalid_tuning_retest_times(self): with pytest.raises(ValueError, match='tuning retest times'): @@ -45,7 +57,13 @@ class TestConstant: spark_fetch_trace_interval=5, spark_exec_timeout_ratio=0.5, spark_history_username='user', - spark_history_password='pass' + spark_history_password='pass', + tuning_strategies=[], + postgresql_database_name='name', + postgresql_database_user='user', + postgresql_database_password='password', + postgresql_database_host='host', + postgresql_database_port='port' )) def test_check_oa_conf_when_invalid_config_fail_threshold(self): with pytest.raises(ValueError, match='config fail threshold'): @@ -59,7 +77,13 @@ class TestConstant: spark_fetch_trace_interval=5, spark_exec_timeout_ratio=0.5, spark_history_username='user', - spark_history_password='pass' + spark_history_password='pass', + tuning_strategies=[], + postgresql_database_name='name', + postgresql_database_user='user', + postgresql_database_password='password', + postgresql_database_host='host', + postgresql_database_port='port' )) def test_check_oa_conf_when_invalid_fetch_trace_timeout(self): with pytest.raises(ValueError, match='spark fetch trace timeout'): @@ -73,7 +97,13 @@ class TestConstant: spark_fetch_trace_interval=0, spark_exec_timeout_ratio=0.5, spark_history_username='user', - spark_history_password='pass' + spark_history_password='pass', + tuning_strategies=[], + postgresql_database_name='name', + postgresql_database_user='user', + postgresql_database_password='password', + postgresql_database_host='host', + postgresql_database_port='port' )) def test_check_oa_conf_when_invalid_fetch_trace_interval(self): with pytest.raises(ValueError, match='spark fetch trace interval'): @@ -87,7 +117,13 @@ class TestConstant: # invalid spark_exec_timeout_ratio=0, spark_history_username='user', - spark_history_password='pass' + spark_history_password='pass', + tuning_strategies=[], + postgresql_database_name='name', + postgresql_database_user='user', + postgresql_database_password='password', + postgresql_database_host='host', + postgresql_database_port='port' )) def test_check_oa_conf_when_invalid_exec_timeout_ratio(self): with pytest.raises(ValueError, match='spark exec timeout ratio'): @@ -101,7 +137,13 @@ class TestConstant: spark_exec_timeout_ratio=0.5, spark_history_username='user', # mismatch - spark_history_password='' + spark_history_password='', + tuning_strategies=[], + postgresql_database_name='name', + postgresql_database_user='user', + postgresql_database_password='password', + postgresql_database_host='host', + postgresql_database_port='port' )) def test_check_oa_conf_when_username_password_mismatch_case1(self): with pytest.raises(ValueError, match='username and password'): @@ -115,7 +157,13 @@ class TestConstant: spark_exec_timeout_ratio=0.5, # mismatch spark_history_username='', - spark_history_password='pass' + spark_history_password='pass', + tuning_strategies=[], + postgresql_database_name='name', + postgresql_database_user='user', + postgresql_database_password='password', + postgresql_database_host='host', + postgresql_database_port='port' )) def test_check_oa_conf_when_username_password_mismatch_case2(self): with pytest.raises(ValueError, match='username and password'): -- Gitee