diff --git a/omniadvisor/src/omniadvisor/interface/hijack_recommend.py b/omniadvisor/src/omniadvisor/interface/hijack_recommend.py
index f6dd543745c711afa1a39a293125567c66da24d4..e29f9000ad3a293ac7b9d4143429e8fc1002dc5f 100644
--- a/omniadvisor/src/omniadvisor/interface/hijack_recommend.py
+++ b/omniadvisor/src/omniadvisor/interface/hijack_recommend.py
@@ -99,6 +99,7 @@ def hijack_recommend(argv: list):
     :param argv: Spark执行命令字段
     """
     # 获取用户传入的Spark命令 并解析命令
+    global_logger.debug("Hijack input params: %s", argv)
     exec_attr, user_config = SparkCMDParser.parse_cmd(argv=argv)
     # 提取任务名字
     if 'name' not in exec_attr.keys():
diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py
index 2e8b44d384ba739dd69cc66d8e3ba304b0558e3f..0a0537d57be27179f5db4824634e609d6f08881e 100644
--- a/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py
+++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py
@@ -78,12 +78,12 @@ class SparkCMDParser:
     @staticmethod
     def _normalize_value(value):
         """
-        对spark-sql命令解析出来的value做标准化处理,对所有value的值使用单引号包裹进行强引用,维持字符串字面量的原义
+        对spark-sql命令解析出来的value做标准化处理
 
         :param value: 原始命令解析出的value
         :return: 标准化的value
         """
-        return f"'{value}'"
+        return value
 
     @staticmethod
     def _append_double_dash_args(cls, key, value):
@@ -145,20 +145,21 @@ class SparkCMDParser:
         return exec_attr, conf_params
 
     @classmethod
-    def reconstruct_cmd(cls, exec_attr: dict, conf_params: dict):
+    def reconstruct_cmd(cls, exec_attr: dict, conf_params: dict) -> list:
         """
         根据执行属性和参数配置,重组命令
 
         :param exec_attr: 执行属性
         :param conf_params: 参数配置
-        :return:
+        :return: cmd_fields 参数列表
         """
         cmd_fields = list()
 
         # 处理命令行开头
         if _CMD_PREFIX_KEY not in exec_attr:
             raise ValueError(f"Cmd prefix key is not in exec_attr!")
-        cmd_fields += [exec_attr.get(_CMD_PREFIX_KEY)]
+        # 把${SPARK_HOME}/bin/spark-class org.apache.spark.deploy.SparkSubmit通过空格拆分成含有两个元素的参数列表
+        cmd_fields += exec_attr.get(_CMD_PREFIX_KEY).split(" ")
 
         # 处理执行属性
         for key, value in exec_attr.items():
@@ -178,5 +179,5 @@ class SparkCMDParser:
         for key, value in conf_params.items():
             cmd_fields += ['--conf', f'{key}={cls._normalize_value(value)}']
 
-        cmd = ' '.join(cmd_fields)
-        return cmd
+        global_logger.debug("cmd_fields = %s", cmd_fields)
+        return cmd_fields
diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py
index f73a4f692cac73ed1b2cbb6cc32576658d882744..3cbef3e5ca3d1ce0950b650f691a1e6489d0d83a 100644
--- a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py
+++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py
@@ -17,7 +17,10 @@ from omniadvisor.utils.utils import save_trace_data
 CODE_MAP = {
     0: "Success",
     124: "timeout",
-    1: "Spark SQL failed"
+    1: "Spark SQL failed",
+    255: "Spark subprocess (e.g. Executor, external command) terminated abnormally.",
+    143: "Terminated by SIGTERM",
+    2: "CLI argument error"
 }
 
 
@@ -36,7 +39,7 @@
     # 获取当前default_config的平均测试性能
     baseline_results = get_tuning_result(load, load.default_config)
     timeout_sec = OA_CONF.spark_exec_timeout_ratio * baseline_results.runtime
-    submit_cmd = f"timeout {timeout_sec} " + submit_cmd
+    submit_cmd[:0] = ["timeout", str(timeout_sec)]
     global_logger.debug(f"The submit cmd about to execute is: {submit_cmd}")
 
     # 根据执行命令创建测试记录
@@ -107,16 +110,16 @@ def _submit_spark_task(exam_record, submit_cmd):
         ExamRecordRepository.update_end_time(exam_record)
     except TimeoutError:
         # 任务提交超时等
-        exam_record.delete()
+        ExamRecordRepository.delete(exam_record)
         global_logger.error('Spark command submission timed out.')
         raise
     except OSError:
         # 权限不足等
-        exam_record.delete()
+        ExamRecordRepository.delete(exam_record)
         global_logger.error('Spark command submission permission denied.')
         raise
     except Exception:
-        exam_record.delete()
+        ExamRecordRepository.delete(exam_record)
         global_logger.error('During Spark command submission, known error occurred.')
         raise
     return exitcode, spark_output
diff --git a/omniadvisor/src/omniadvisor/utils/utils.py b/omniadvisor/src/omniadvisor/utils/utils.py
index fea781692b3a9bd3aa5a1af7b0f2efa8b3140dda..c4dee72f2776807be5ceb448fc3213d76225d08f 100644
--- a/omniadvisor/src/omniadvisor/utils/utils.py
+++ b/omniadvisor/src/omniadvisor/utils/utils.py
@@ -23,7 +23,7 @@ def run_cmd(submit_cmd) -> Tuple[int, str]:
     kwargs = {
         'stdout': subprocess.PIPE,
         'stderr': subprocess.STDOUT,
-        'shell': True,
+        'shell': False,
         'text': True
     }
     result = subprocess.run(submit_cmd, **kwargs)
diff --git a/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_cmd_parser.py b/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_cmd_parser.py
index 2b12bb210ec839a58cfacb55329ec3c461ce02ae..99e86fa73cae816758d4a015954e31cc80a5c75b 100644
--- a/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_cmd_parser.py
+++ b/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_cmd_parser.py
@@ -90,14 +90,10 @@ class TestSparkCMDParser:
         cmd = SparkCMDParser.reconstruct_cmd(exec_attr, conf_params)
 
-        assert cmd.startswith(exec_attr['omniadvisor-cmd-prefix'])
-        assert "--master 'yarn'" in cmd
-        assert "--name 'test_app'" in cmd
-        assert "-e 'SELECT * FROM table'" in cmd
-        assert "'--unknown1' 'test'" in cmd
-        assert "'-uk' 'test'" in cmd
-        assert "--conf spark.executor.memory='4g'" in cmd
-        assert "--conf spark.driver.extraJavaOptions='-XX:+UseG1GC -XX:+UseG1GC'" in cmd
+        assert exec_attr['omniadvisor-cmd-prefix'] == cmd[0]
+        assert "SELECT * FROM table" in cmd
+        assert "spark.executor.memory=4g" in cmd
+        assert "spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:+UseG1GC" in cmd
 
     # 场景 2: 缺少前缀字段抛异常
     def test_reconstruct_cmd_missing_prefix(self):
@@ -123,7 +119,7 @@
 
         assert "--extra" in cmd
         assert "arg" in cmd
-        assert "--conf spark.executor.instances='4'" in cmd
+        assert "spark.executor.instances=4" in cmd
 
     # 场景 3: 包含 boolean 参数
     def test_reconstruct_cmd_with_boolean(self):
@@ -134,7 +130,7 @@
             "verbose": True,
             "supervise": True,
             "kill": True,
-            "status": True
+            "status": False
         }
         conf_params = {
             "spark.executor.instances": "4"
@@ -143,9 +139,9 @@
 
         assert "--extra" in cmd
        assert "arg" in cmd
-        assert "--conf spark.executor.instances='4'" in cmd
-        assert "verbose" in cmd
-        assert "supervise" in cmd
-        assert "kill" in cmd
-        assert "status" in cmd
"status" in cmd + assert "spark.executor.instances=4" in cmd + assert "--verbose" in cmd + assert "--supervise" in cmd + assert "--kill" in cmd + assert "--status" not in cmd