diff --git a/omniadvisor/src/omniadvisor/interface/config_tuning.py b/omniadvisor/src/omniadvisor/interface/config_tuning.py
index a735c323b0e2f81ebd3dbd6bd2ba24f1293446f5..226a4eef4f426111c2d0009af0ecd87af42d3d0e 100644
--- a/omniadvisor/src/omniadvisor/interface/config_tuning.py
+++ b/omniadvisor/src/omniadvisor/interface/config_tuning.py
@@ -184,7 +184,7 @@ def main():
     try:
         unified_tuning(load, args.retest_way, args.tuning_method)
     except NoOptimalConfigError:
-        global_logger.error(
+        global_logger.warning(
            'The tuning method: %s cannot find optimal config, please try other method.', args.tuning_method
        )
        return
@@ -197,5 +197,5 @@
        try:
            unified_tuning(load, args.retest_way, tuning_method)
        except NoOptimalConfigError:
-            global_logger.info('The tuning method: %s cannot find optimal config, trying next.', tuning_method)
+            global_logger.warning('The tuning method: %s cannot find optimal config, trying next.', tuning_method)
            break
diff --git a/omniadvisor/src/omniadvisor/interface/hijack_recommend.py b/omniadvisor/src/omniadvisor/interface/hijack_recommend.py
index ef6a67f4d61aa21931dd8aee83a7119f68cd239d..02eeb4eaedb67d71c1ba69226c8c347050dfc046 100644
--- a/omniadvisor/src/omniadvisor/interface/hijack_recommend.py
+++ b/omniadvisor/src/omniadvisor/interface/hijack_recommend.py
@@ -117,7 +117,7 @@ def hijack_recommend(argv: list):
    if exam_record.status == OA_CONF.ExecStatus.success:
        # Print the execution output
        global_logger.info("Spark execute success, going to print Spark output.")
-        print(output)
+        print(output, end="")
    # If execution failed, check whether the security protection mechanism needs to be triggered
    else:
        if exec_config != user_config:
@@ -125,10 +125,10 @@
            safe_exam_record, safe_output = spark_run(load, user_config)
            global_logger.info("Spark execute in security protection mechanism, going to print Spark output.")
            # Print the output of the job run under the security protection mechanism
-            print(safe_output)
+            print(safe_output, end="")
        else:
            global_logger.warning("Spark execute failed in user config, going to print Spark output.")
-            print(output)
+            print(output, end="")

    if exec_config == load.test_config:
        _process_load_config(load=load)
diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py
index 100b5cdd8ea9acfa9b50c24cb361a4e5c3972a9b..2f865a2b13b01183aaf8f424434ef292dc458015 100644
--- a/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py
+++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py
@@ -52,7 +52,6 @@ class SparkCMDParser:
    _parser.add_argument('--archives')
    _parser.add_argument('--conf', action='append', help="Arbitrary Spark configuration property")
    _parser.add_argument('--driver-memory', dest='driver-memory')
-    _parser.add_argument('--driver-java-options', dest='driver-java-options')
    _parser.add_argument('--driver-library-path', dest='driver-library-path')
    _parser.add_argument('--driver-class-path', dest='driver-class-path')
    _parser.add_argument('--executor-memory', dest='executor-memory')
@@ -78,15 +77,12 @@
    @staticmethod
    def _normalize_value(value):
        """
-        Normalize the value parsed from the spark-sql command: wrap string values that contain spaces in double quotes
+        Normalize the value parsed from the spark-sql command: strong-quote every value by wrapping it in single quotes, preserving the literal meaning of the string
        :param value: value parsed from the original command
        :return: the normalized value
        """

-        if isinstance(value, str) and ' ' in value:
-            return f'"{value}"'
-        else:
-            return value
+        return f"'{value}'"

    @staticmethod
    def _append_double_dash_args(cls, key, value):
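A note on the _normalize_value change above: wrapping every value in bare single quotes keeps spaces and most shell metacharacters literal, but it misquotes values that themselves contain a single quote. Below is a minimal standalone sketch of the new behavior (normalize_value here is a hypothetical stand-in for the patched method, not the repository code); Python's shlex.quote is shown for comparison because it handles the embedded-quote edge case:

    import shlex

    def normalize_value(value):
        # Mirrors the patched behavior: wrap every value in single quotes
        # so the shell treats the contents as a literal string.
        return f"'{value}'"

    print(normalize_value("a b"))    # 'a b'      -> spaces preserved
    print(normalize_value("it's"))   # 'it's'     -> unbalanced quoting in a shell
    print(shlex.quote("it's"))       # 'it'"'"'s' -> embedded quote escaped
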
diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py
index 4c03a96ea19771d9ff50019c4ba4fc217810b87c..f73a4f692cac73ed1b2cbb6cc32576658d882744 100644
--- a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py
+++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py
@@ -1,6 +1,5 @@
 import multiprocessing
 import time
-
 from requests.exceptions import HTTPError

 from common.constant import OA_CONF
@@ -15,6 +14,13 @@ from omniadvisor.utils.logger import global_logger
 from omniadvisor.utils.utils import save_trace_data


+CODE_MAP = {
+    0: "Success",
+    124: "Timeout",
+    1: "Spark SQL failed"
+}
+
+
 def spark_run(load: Load, conf: dict):
     """
     Execute a Spark job with the given load and configuration
@@ -41,7 +47,11 @@

    # When there is no application_id, do not extract time_taken; return directly
    if exitcode != 0:
-        global_logger.info(f"Spark Load {load.id} execute failed, update the exam result.")
+        exitcode_describe = CODE_MAP.get(exitcode, "Abnormal exit code")
+        global_logger.warning("Spark Load %s did not exit normally and returned non-zero exit code %s,"
+                              " exit code description: %s", load.id, exitcode, exitcode_describe)
+        global_logger.info("Update exam result, set the exam exec status to %s because of %s",
+                           OA_CONF.ExecStatus.fail, exitcode_describe)
        try:
            return ExamRecordRepository.update_exam_result(
                exam_record=exam_record,
diff --git a/omniadvisor/src/omniadvisor/utils/logger.py b/omniadvisor/src/omniadvisor/utils/logger.py
index 93754762337f3fca6c4d2fdda15ecad944953cfe..3a2e0c8af18955cd6b327d4a61f318c7fb4cf6d5 100755
--- a/omniadvisor/src/omniadvisor/utils/logger.py
+++ b/omniadvisor/src/omniadvisor/utils/logger.py
@@ -1,5 +1,6 @@
 import os
 import logging
+import warnings
 from logging.config import dictConfig

 from common.constant import OA_CONF
@@ -65,6 +66,9 @@ global_logger = logging.getLogger('omniadvisor')
 # Prevent the logger from propagating up to the root logger
 global_logger.propagate = False

+# Suppress RuntimeWarning via the warnings module, so occasional runtime warnings from the smac library do not degrade the user experience
+warnings.filterwarnings('ignore', category=RuntimeWarning)
+
 # Silence loggers from some third-party libraries
 modules_setting_error = ['smac']
 for model in modules_setting_error:
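One caveat on the logger.py hunk: warnings.filterwarnings('ignore', category=RuntimeWarning) is process-wide, so it hides every RuntimeWarning, not only those emitted by smac. A sketch of a narrower filter, assuming the noisy warnings originate in modules whose names start with smac (an assumption about the library, not verified here):

    import warnings

    # Process-wide, as in the patch: every RuntimeWarning is suppressed.
    warnings.filterwarnings('ignore', category=RuntimeWarning)

    # Narrower alternative: only suppress RuntimeWarning raised from modules
    # whose names match the regex (assumed here to cover smac and submodules).
    warnings.filterwarnings('ignore', category=RuntimeWarning, module=r'smac')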