From a791f6c7ba5a82ec5e5e226fea5e11709b582373 Mon Sep 17 00:00:00 2001
From: 徐艺丹 <53546877+Craven1701@users.noreply.github.com>
Date: Tue, 15 Jul 2025 14:23:20 +0800
Subject: [PATCH] 1. Submit Spark tasks to subprocess.run as an argument list
 2. Fix exam_record.delete() being called incorrectly 3. Update the related
 unit tests accordingly
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

1. Submit Spark tasks to subprocess.run as an argument list
2. Fix exam_record.delete() being called incorrectly
3. Update the related unit tests accordingly
---
 .../omniadvisor/interface/hijack_recommend.py |  1 +
 .../service/spark_service/spark_cmd_parser.py | 15 ++++++-----
 .../service/spark_service/spark_run.py        | 13 ++++++----
 omniadvisor/src/omniadvisor/utils/utils.py    |  2 +-
 .../spark_service/test_spark_cmd_parser.py    | 26 ++++++-----------
 5 files changed, 29 insertions(+), 28 deletions(-)

diff --git a/omniadvisor/src/omniadvisor/interface/hijack_recommend.py b/omniadvisor/src/omniadvisor/interface/hijack_recommend.py
index f6dd54374..e29f9000a 100644
--- a/omniadvisor/src/omniadvisor/interface/hijack_recommend.py
+++ b/omniadvisor/src/omniadvisor/interface/hijack_recommend.py
@@ -99,6 +99,7 @@ def hijack_recommend(argv: list):
     :param argv: Spark execution command fields
     """
     # Get the Spark command passed in by the user and parse it
+    global_logger.debug("Hijack input params: %s", argv)
     exec_attr, user_config = SparkCMDParser.parse_cmd(argv=argv)
     # Extract the task name
     if 'name' not in exec_attr.keys():
diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py
index 2e8b44d38..0a0537d57 100644
--- a/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py
+++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py
@@ -78,12 +78,12 @@ class SparkCMDParser:
     @staticmethod
     def _normalize_value(value):
         """
-        Normalize the value parsed from the spark-sql command; wrap every value in single quotes for strong quoting so the string literal keeps its original meaning
+        Normalize the value parsed from the spark-sql command

         :param value: value parsed from the original command
         :return: normalized value
         """
-        return f"'{value}'"
+        return value

     @staticmethod
     def _append_double_dash_args(cls, key, value):
@@ -145,20 +145,21 @@ class SparkCMDParser:
         return exec_attr, conf_params

     @classmethod
-    def reconstruct_cmd(cls, exec_attr: dict, conf_params: dict):
+    def reconstruct_cmd(cls, exec_attr: dict, conf_params: dict) -> list:
         """
         Reconstruct the command from the execution attributes and parameter configuration

         :param exec_attr: execution attributes
         :param conf_params: parameter configuration
-        :return:
+        :return: cmd_fields, the argument list
         """
         cmd_fields = list()

         # Handle the start of the command line
         if _CMD_PREFIX_KEY not in exec_attr:
             raise ValueError(f"Cmd prefix key is not in exec_attr!")
-        cmd_fields += [exec_attr.get(_CMD_PREFIX_KEY)]
+        # Split ${SPARK_HOME}/bin/spark-class org.apache.spark.deploy.SparkSubmit on spaces into a two-element argument list
+        cmd_fields += exec_attr.get(_CMD_PREFIX_KEY).split(" ")

         # Handle the execution attributes
         for key, value in exec_attr.items():
@@ -178,5 +179,5 @@ class SparkCMDParser:
         for key, value in conf_params.items():
             cmd_fields += ['--conf', f'{key}={cls._normalize_value(value)}']

-        cmd = ' '.join(cmd_fields)
-        return cmd
+        global_logger.debug("cmd_fields = %s", cmd_fields)
+        return cmd_fields
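Note: reconstruct_cmd now returns the submit command as an argument list instead of a single joined string, and the command prefix is split on spaces so that ${SPARK_HOME}/bin/spark-class and org.apache.spark.deploy.SparkSubmit become separate list elements. A minimal sketch of that list-form reconstruction follows; the _CMD_PREFIX_KEY value and the sample dictionaries are illustrative assumptions, not values taken from the repository, and the handling of the remaining exec_attr keys is simplified.

    # Simplified sketch of list-form command reconstruction (illustrative only).
    _CMD_PREFIX_KEY = 'omniadvisor-cmd-prefix'  # assumed key name for this example

    def reconstruct_cmd_sketch(exec_attr: dict, conf_params: dict) -> list:
        cmd_fields = []
        # Split the prefix on spaces so each token becomes its own argv element.
        cmd_fields += exec_attr[_CMD_PREFIX_KEY].split(" ")
        for key, value in exec_attr.items():
            if key == _CMD_PREFIX_KEY:
                continue
            cmd_fields += [f'--{key}', str(value)]  # simplified: no boolean/double-dash handling
        for key, value in conf_params.items():
            cmd_fields += ['--conf', f'{key}={value}']
        return cmd_fields

    # Example: the prefix becomes two list elements and no shell quoting is needed.
    print(reconstruct_cmd_sketch(
        {_CMD_PREFIX_KEY: '${SPARK_HOME}/bin/spark-class org.apache.spark.deploy.SparkSubmit',
         'name': 'test_app'},
        {'spark.executor.memory': '4g'}
    ))
    # ['${SPARK_HOME}/bin/spark-class', 'org.apache.spark.deploy.SparkSubmit',
    #  '--name', 'test_app', '--conf', 'spark.executor.memory=4g']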
diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py
index f73a4f692..3cbef3e5c 100644
--- a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py
+++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py
@@ -17,7 +17,10 @@ from omniadvisor.utils.utils import save_trace_data
 CODE_MAP = {
     0: "Success",
     124: "timeout",
-    1: "Spark SQL failed"
+    1: "Spark SQL failed",
+    255: "Spark subprocess (e.g. Executor, external command) terminated abnormally.",
+    143: "Terminated by SIGTERM",
+    2: "CLI argument error"
 }


@@ -36,7 +39,7 @@ def spark_run(load: Load, conf: dict):
         # Get the average test performance of the current default_config
         baseline_results = get_tuning_result(load, load.default_config)
         timeout_sec = OA_CONF.spark_exec_timeout_ratio * baseline_results.runtime
-        submit_cmd = f"timeout {timeout_sec} " + submit_cmd
+        submit_cmd[:0] = ["timeout", str(timeout_sec)]
         global_logger.debug(f"The submit cmd about to execute is: {submit_cmd}")

     # Create a test record based on the execution command
@@ -107,16 +110,16 @@ def _submit_spark_task(exam_record, submit_cmd):
         ExamRecordRepository.update_end_time(exam_record)
     except TimeoutError:
         # Task submission timed out, etc.
-        exam_record.delete()
+        ExamRecordRepository.delete(exam_record)
         global_logger.error('Spark command submission timed out.')
         raise
     except OSError:
         # Insufficient permissions, etc.
-        exam_record.delete()
+        ExamRecordRepository.delete(exam_record)
         global_logger.error('Spark command submission permission denied.')
         raise
     except Exception:
-        exam_record.delete()
+        ExamRecordRepository.delete(exam_record)
         global_logger.error('During Spark command submission, known error occurred.')
         raise
     return exitcode, spark_output
diff --git a/omniadvisor/src/omniadvisor/utils/utils.py b/omniadvisor/src/omniadvisor/utils/utils.py
index fea781692..c4dee72f2 100644
--- a/omniadvisor/src/omniadvisor/utils/utils.py
+++ b/omniadvisor/src/omniadvisor/utils/utils.py
@@ -23,7 +23,7 @@ def run_cmd(submit_cmd) -> Tuple[int, str]:
     kwargs = {
         'stdout': subprocess.PIPE,
         'stderr': subprocess.STDOUT,
-        'shell': True,
+        'shell': False,
         'text': True
     }
     result = subprocess.run(submit_cmd, **kwargs)
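Note: with shell=False, subprocess.run expects the command as a list of argv tokens rather than a shell string, which is why the timeout prefix above is prepended as the two elements ["timeout", str(timeout_sec)] and why _normalize_value no longer needs to add shell quoting. A minimal sketch of the list-based invocation follows; the sample command is illustrative and assumes a POSIX environment where the timeout and echo binaries exist.

    import subprocess
    from typing import Tuple

    def run_cmd_sketch(submit_cmd) -> Tuple[int, str]:
        # shell=False: submit_cmd must be an argument list, not a single string.
        result = subprocess.run(
            submit_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=False,
            text=True,
        )
        return result.returncode, result.stdout

    # Illustrative call: "timeout" and its value are separate list elements.
    exitcode, output = run_cmd_sketch(["timeout", "300", "echo", "spark-submit placeholder"])
    print(exitcode, output)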
diff --git a/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_cmd_parser.py b/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_cmd_parser.py
index 2b12bb210..99e86fa73 100644
--- a/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_cmd_parser.py
+++ b/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_cmd_parser.py
@@ -90,14 +90,10 @@ class TestSparkCMDParser:

         cmd = SparkCMDParser.reconstruct_cmd(exec_attr, conf_params)

-        assert cmd.startswith(exec_attr['omniadvisor-cmd-prefix'])
-        assert "--master 'yarn'" in cmd
-        assert "--name 'test_app'" in cmd
-        assert "-e 'SELECT * FROM table'" in cmd
-        assert "'--unknown1' 'test'" in cmd
-        assert "'-uk' 'test'" in cmd
-        assert "--conf spark.executor.memory='4g'" in cmd
-        assert "--conf spark.driver.extraJavaOptions='-XX:+UseG1GC -XX:+UseG1GC'" in cmd
+        assert exec_attr['omniadvisor-cmd-prefix'] == cmd[0]
+        assert "SELECT * FROM table" in cmd
+        assert "spark.executor.memory=4g" in cmd
+        assert "spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:+UseG1GC" in cmd

     # Scenario 2: missing prefix field raises an exception
     def test_reconstruct_cmd_missing_prefix(self):
@@ -123,7 +119,7 @@ class TestSparkCMDParser:

         assert "--extra" in cmd
         assert "arg" in cmd
-        assert "--conf spark.executor.instances='4'" in cmd
+        assert "spark.executor.instances=4" in cmd

     # Scenario 3: with boolean parameters
     def test_reconstruct_cmd_with_boolean(self):
@@ -134,7 +130,7 @@
             "verbose": True,
             "supervise": True,
             "kill": True,
-            "status": True
+            "status": False
         }
         conf_params = {
             "spark.executor.instances": "4"
         }
@@ -143,9 +139,9 @@ class TestSparkCMDParser:

         assert "--extra" in cmd
         assert "arg" in cmd
-        assert "--conf spark.executor.instances='4'" in cmd
-        assert "verbose" in cmd
-        assert "supervise" in cmd
-        assert "kill" in cmd
-        assert "status" in cmd
+        assert "spark.executor.instances=4" in cmd
+        assert "--verbose" in cmd
+        assert "--supervise" in cmd
+        assert "--kill" in cmd
+        assert "--status" not in cmd
-- 
Gitee
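Note: the updated boolean-parameter test expects truthy execution attributes (verbose, supervise, kill) to appear as bare --flag tokens in the returned list, and a False value (status) to be omitted entirely. The actual flag handling lives in the unchanged part of reconstruct_cmd, so the sketch below is an assumption about that behavior, not repository code.

    # Assumed handling of boolean exec_attr entries (illustrative only).
    def append_boolean_flags(cmd_fields: list, exec_attr: dict) -> list:
        for key, value in exec_attr.items():
            if isinstance(value, bool):
                if value:
                    cmd_fields.append(f'--{key}')  # True -> emit the bare flag
                # False -> emit nothing, so the flag is absent from the list
        return cmd_fields

    cmd = append_boolean_flags([], {"verbose": True, "supervise": True, "kill": True, "status": False})
    assert "--verbose" in cmd and "--status" not in cmd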