From 05915345486869ab583f3d0fccdbcb9c7d633d97 Mon Sep 17 00:00:00 2001
From: 徐艺丹 <53546877+Craven1701@users.noreply.github.com>
Date: Mon, 9 Jun 2025 09:46:05 +0800
Subject: [PATCH 1/4] 1. Improve the unit tests of the hijack module 2. Add
 handling of boolean-type arguments to spark_cmd_parser
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../service/spark_service/spark_cmd_parser.py |  14 +-
 .../interface/test_hijack_recommend.py        | 220 ++++++++++++------
 .../spark_service/test_spark_cmd_parser.py    | 102 +++++++-
 3 files changed, 257 insertions(+), 79 deletions(-)

diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py
index b798bc9f5..395d9652c 100644
--- a/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py
+++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py
@@ -55,12 +55,8 @@ class SparkCMDParser:
     _parser.add_argument('--driver-class-path', dest='driver-class-path')
     _parser.add_argument('--executor-memory', dest='executor-memory')
     _parser.add_argument('--proxy-user', dest='proxy-user')
-    _parser.add_argument('--verbose')
     _parser.add_argument('--version')
     _parser.add_argument('--driver-cores', dest='driver-cores')
-    _parser.add_argument('--supervise')
-    _parser.add_argument('--kill')
-    _parser.add_argument('--status')
     _parser.add_argument('--total-executor-cores', dest='total-executor-cores')
     _parser.add_argument('--executor-cores', dest='executor-cores')
     _parser.add_argument('--num-executors', dest='num-executors')
@@ -71,6 +67,11 @@ class SparkCMDParser:
     _parser.add_argument('-e', type=str, help='SQL statement to execute.')
     _parser.add_argument('-f', type=str, help='File containing SQL script.')
     _parser.add_argument('-i', help='Initialization SQL file')
+    # Boolean flags
+    _parser.add_argument('--verbose', action='store_true')
+    _parser.add_argument('--supervise', action='store_true')
+    _parser.add_argument('--kill', action='store_true')
+    _parser.add_argument('--status', action='store_true')
 
     @staticmethod
     def _normalize_value(value):
@@ -150,6 +151,11 @@ class SparkCMDParser:
             # The -e/-f/-i options only take the single-dash form; handle them separately
             if key in _SINGLE_HORIZONTAL_BAR_KEYS:
                 cmd_fields += [f'-{key}', cls._normalize_value(value)]
+            # Handle boolean-type arguments separately
+            elif isinstance(value, bool):
+                if value:  # True: emit the bare flag
+                    cmd_fields += [f'--{key}']
+                # False: omit the flag entirely
             # Handle the unparsed remainder, e.g. remainder = ['--uk1', 'xxxx', '--uk2', 'xxxx'], separately
             elif key == _CMD_UNKNOWN_KEY:
                 cmd_fields += [cls._normalize_value(item) for item in value]
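[Editor's note] The switch to action='store_true' matters because argparse treats an option declared without it as a value-taking option. A minimal, self-contained illustration with plain argparse (independent of SparkCMDParser):

    import argparse

    # Pre-patch style: '--verbose' is declared as a value-taking option,
    # so it silently swallows the next token on the command line.
    old = argparse.ArgumentParser()
    old.add_argument('--verbose')
    print(old.parse_known_args(['--verbose', 'app.jar']))
    # -> (Namespace(verbose='app.jar'), [])   the jar path becomes the "value"

    # Post-patch style: store_true makes it a pure flag.
    new = argparse.ArgumentParser()
    new.add_argument('--verbose', action='store_true')
    print(new.parse_known_args(['--verbose', 'app.jar']))
    # -> (Namespace(verbose=True), ['app.jar'])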
diff --git a/omniadvisor/tests/omniadvisor/interface/test_hijack_recommend.py b/omniadvisor/tests/omniadvisor/interface/test_hijack_recommend.py
index 8fb13a32b..da46b039c 100644
--- a/omniadvisor/tests/omniadvisor/interface/test_hijack_recommend.py
+++ b/omniadvisor/tests/omniadvisor/interface/test_hijack_recommend.py
@@ -1,87 +1,167 @@
 import unittest
+import pytest
 from unittest.mock import MagicMock, patch
 
-from common.constant import OA_CONF
 from omniadvisor.interface.hijack_recommend import hijack_recommend
 
 
 # Test code
 class TestHijackRecommend(unittest.TestCase):
-    @patch("omniadvisor.interface.hijack_recommend.SparkCMDParser.parse_cmd")
-    @patch("omniadvisor.interface.hijack_recommend.LoadRepository.query_by_exec_attr_and_default_config")
-    @patch("omniadvisor.interface.hijack_recommend.spark_run")
-    def test_hijack_recommend_success(self, mock_spark_run, mock_load_repo_query, mock_parse_cmd):
-        """
-        Test the behavior of hijack_recommend when the job succeeds.
-        """
-        # Simulated input Spark SQL command
-        spark_cmd_argv = ['test', '--conf', 'test=test']
-
-        # Mock the behavior of parse_cmd
-        mock_parse_cmd.return_value = (
-            {'name': 'example_name', 'cpu': 4},
-            {'timeout': 30}
-        )
-
-        # Mock LoadRepository.query to return a valid load object
-        mock_load = MagicMock()
-        mock_load.tuning_needed = False
-        mock_load.default_config = {"timeout": 30}
-        mock_load.attr_exec = {"name": "example_name"}
-        mock_load_repo_query.return_value = [mock_load]
-
-        # Mock the behavior of spark_run
-        mock_spark_run.return_value = (MagicMock(status=OA_CONF.ExecStatus.fail), 'spark_output')
-
-        # Call the function under test
-        hijack_recommend(argv=spark_cmd_argv)
-
-        # Verify LoadRepository.query was called correctly
-        mock_load_repo_query.assert_called_once_with(exec_attr={'name': 'example_name', 'cpu': 4},
-                                                     default_config={"timeout": 30})
-
-        # Verify spark_run was called correctly
-        mock_spark_run.assert_called_once_with(mock_load, {"timeout": 30})
-
+    # Scenario 1: normal flow, Spark execution succeeds
+    @patch("omniadvisor.interface.hijack_recommend._process_load_config")
+    @patch("omniadvisor.interface.hijack_recommend.spark_run")
+    @patch("omniadvisor.interface.hijack_recommend._get_exec_config_from_load")
+    @patch("omniadvisor.interface.hijack_recommend._query_or_create_load")
     @patch("omniadvisor.interface.hijack_recommend.SparkCMDParser.parse_cmd")
-    @patch("omniadvisor.interface.hijack_recommend.LoadRepository")
-    @patch("omniadvisor.interface.hijack_recommend.spark_run")
-    @patch("omniadvisor.interface.hijack_recommend._process_load_config")
-    def test_hijack_recommend_failure_with_fallback(self, mock_refreshed_load, mock_spark_run, mock_load_repo, mock_parse_cmd):
-        """
-        Test hijack_recommend when the job fails and must fall back to the user's default config.
-        """
-        # Simulated input Spark SQL command
-        spark_cmd_argv = ['test', '--conf', 'test=test']
-
-        # Mock the behavior of parse_cmd
-        mock_parse_cmd.return_value = (
-            {'name': 'example_name', 'cpu': 4},
-            {'timeout': 30}
-        )
-
-        # Mock LoadRepository.query to return a valid load object
-        mock_load = MagicMock()
-        mock_load.tuning_needed = True
-        mock_load.test_config = {"timeout": 60}  # use the config under test
-        mock_load_repo.query_by_exec_attr_and_default_config.return_value = [mock_load]
-
-        # Mock the behavior of spark_run
-        mock_spark_run.side_effect = [
-            (MagicMock(status=OA_CONF.ExecStatus.fail), 'spark_output1'),  # first run fails
-            (MagicMock(status=OA_CONF.ExecStatus.success), 'spark_output2')  # succeeds after the fallback
-        ]
-
-        # Call the function under test
-        hijack_recommend(argv=spark_cmd_argv)
-
-        # Verify LoadRepository.query was called correctly
-        mock_load_repo.query_by_exec_attr_and_default_config.assert_called_once_with(
-            exec_attr={'name': 'example_name', 'cpu': 4},
-            default_config={"timeout": 30})
-
-        # Verify spark_run was called twice (first with the test config, then with the default config)
-        assert mock_spark_run.call_count == 2
-        mock_spark_run.assert_any_call(mock_load, {"timeout": 60})  # first call
-        mock_spark_run.assert_any_call(mock_load, {"timeout": 30})  # second call
+    def test_successful_execution(self, mock_parse_cmd, mock_query_load, mock_get_config, mock_spark_run, mock_process_config):
+        argv = ["--class", "Job", "--conf", "spark.executor.memory=4g"]
+        exec_attr = {"name": "test_job"}
+        user_config = {"spark.executor.memory": "4g"}
+        load = MagicMock(test_config={}, default_config=user_config)
+        exec_config = user_config
+
+        mock_parse_cmd.return_value = (exec_attr, user_config)
+        mock_query_load.return_value = load
+        mock_get_config.return_value = exec_config
+        mock_spark_run.return_value = (MagicMock(status="success"), "job output")
+
+        hijack_recommend(argv)
+        mock_spark_run.assert_called_once_with(load, exec_config)
+        mock_process_config.assert_not_called()
+
+    # Scenario 2: execution fails and the safety fallback kicks in
+    @patch("omniadvisor.interface.hijack_recommend._process_load_config")
+    @patch("omniadvisor.interface.hijack_recommend.spark_run")
+    @patch("omniadvisor.interface.hijack_recommend._get_exec_config_from_load")
+    @patch("omniadvisor.interface.hijack_recommend._query_or_create_load")
+    @patch("omniadvisor.interface.hijack_recommend.SparkCMDParser.parse_cmd")
+    def test_fallback_to_safe_config(self, mock_parse_cmd, mock_query_load, mock_get_config, mock_spark_run, mock_process_config):
+        argv = ["--class", "Job", "--conf", "spark.executor.memory=4g"]
+        exec_attr = {"name": "job_name"}
+        user_config = {"spark.executor.memory": "4g"}
+        load = MagicMock(test_config={}, default_config=user_config)
+        exec_config = {"spark.executor.memory": "8g"}
+
+        mock_parse_cmd.return_value = (exec_attr, user_config)
+        mock_query_load.return_value = load
+        mock_get_config.return_value = exec_config
+
+        mock_spark_run.side_effect = [
+            (MagicMock(status="fail"), "fail output"),
+            (MagicMock(status="success"), "safe output")
+        ]
+
+        hijack_recommend(argv)
+        assert mock_spark_run.call_count == 2
+        mock_process_config.assert_not_called()
+
+    # Scenario 3: execution fails without entering the safety fallback
+    @patch("omniadvisor.interface.hijack_recommend._process_load_config")
+    @patch("omniadvisor.interface.hijack_recommend.spark_run")
+    @patch("omniadvisor.interface.hijack_recommend._get_exec_config_from_load")
+    @patch("omniadvisor.interface.hijack_recommend._query_or_create_load")
+    @patch("omniadvisor.interface.hijack_recommend.SparkCMDParser.parse_cmd")
+    def test_failed_user_config(self, mock_parse_cmd, mock_query_load, mock_get_config, mock_spark_run, mock_process_config):
+        argv = ["--conf", "spark.executor.memory=4g"]
+        exec_attr = {"name": "job_name"}
+        user_config = {"spark.executor.memory": "4g"}
+        load = MagicMock(test_config={}, default_config=user_config)
+        exec_config = user_config
+
+        mock_parse_cmd.return_value = (exec_attr, user_config)
+        mock_query_load.return_value = load
+        mock_get_config.return_value = exec_config
+        mock_spark_run.return_value = (MagicMock(status="fail"), "fail output")
+
+        hijack_recommend(argv)
+        # Without the safety fallback, call_count stays at 1
+        assert mock_spark_run.call_count == 1
+        mock_process_config.assert_not_called()
+
+    # Scenario 4: task name missing
+    @patch("omniadvisor.interface.hijack_recommend.SparkCMDParser.parse_cmd")
+    def test_missing_task_name(self, mock_parse_cmd):
+        argv = ["--conf", "spark.executor.memory=4g"]
+        mock_parse_cmd.return_value = ({}, {"spark.executor.memory": "4g"})
+
+        with pytest.raises(ValueError, match="Task name not in Spark submit cmd"):
+            hijack_recommend(argv)
+
+    # Scenario 5: the test_config is executed, triggering _process_load_config
+    @patch("omniadvisor.interface.hijack_recommend._process_load_config")
+    @patch("omniadvisor.interface.hijack_recommend.spark_run")
+    @patch("omniadvisor.interface.hijack_recommend._get_exec_config_from_load")
+    @patch("omniadvisor.interface.hijack_recommend._query_or_create_load")
+    @patch("omniadvisor.interface.hijack_recommend.SparkCMDParser.parse_cmd")
+    def test_process_test_config(self, mock_parse_cmd, mock_query_load, mock_get_config, mock_spark_run, mock_process_config):
+        argv = ["--class", "Job", "--conf", "spark.executor.memory=4g"]
+        exec_attr = {"name": "job"}
+        user_config = {"spark.executor.memory": "4g"}
+        test_config = {"spark.executor.memory": "6g"}
+        load = MagicMock(test_config=test_config, default_config=user_config)
+        exec_config = test_config
+
+        mock_parse_cmd.return_value = (exec_attr, user_config)
+        mock_query_load.return_value = load
+        mock_get_config.return_value = exec_config
+        mock_spark_run.return_value = (MagicMock(status="success"), "job output")
+
+        hijack_recommend(argv)
+        mock_process_config.assert_called_once_with(load=load)
+
+    # Scenario 6: command parsing raises
+    @patch("omniadvisor.interface.hijack_recommend.SparkCMDParser.parse_cmd")
+    def test_parse_cmd_raises(self, mock_parse_cmd):
+        mock_parse_cmd.side_effect = Exception("parse error")
+        with pytest.raises(Exception, match="parse error"):
+            hijack_recommend(["--conf", "x"])
+
+    # Scenario 7: creating the Load raises
+    @patch("omniadvisor.interface.hijack_recommend._query_or_create_load")
+    @patch("omniadvisor.interface.hijack_recommend.SparkCMDParser.parse_cmd")
+    def test_query_create_load_raises(self, mock_parse_cmd, mock_query_load):
+        mock_parse_cmd.return_value = ({"name": "job"}, {})
+        mock_query_load.side_effect = Exception("db error")
+
+        with pytest.raises(Exception, match="db error"):
+            hijack_recommend(["--conf", "x"])
+
+    # Scenario 8: spark_run raises
+    @patch("omniadvisor.interface.hijack_recommend.spark_run")
+    @patch("omniadvisor.interface.hijack_recommend._get_exec_config_from_load")
+    @patch("omniadvisor.interface.hijack_recommend._query_or_create_load")
+    @patch("omniadvisor.interface.hijack_recommend.SparkCMDParser.parse_cmd")
+    def test_spark_run_raises(self, mock_parse_cmd, mock_query_load, mock_get_config, mock_spark_run):
+        argv = ["--conf", "spark.executor.memory=4g"]
+        exec_attr = {"name": "job"}
+        config = {"spark.executor.memory": "4g"}
+        load = MagicMock(test_config={}, default_config=config)
+
+        mock_parse_cmd.return_value = (exec_attr, config)
+        mock_query_load.return_value = load
+        mock_get_config.return_value = config
+        mock_spark_run.side_effect = RuntimeError("spark failed")
+
+        with pytest.raises(RuntimeError, match="spark failed"):
+            hijack_recommend(argv)
+
+    # Scenario 9: a dependency inside _process_load_config fails
+    @patch("omniadvisor.interface.hijack_recommend._process_load_config")
+    @patch("omniadvisor.interface.hijack_recommend.spark_run")
+    @patch("omniadvisor.interface.hijack_recommend._get_exec_config_from_load")
+    @patch("omniadvisor.interface.hijack_recommend._query_or_create_load")
+    @patch("omniadvisor.interface.hijack_recommend.SparkCMDParser.parse_cmd")
+    def test_process_load_config_raises(self, mock_parse_cmd, mock_query_load, mock_get_config, mock_spark_run, mock_process_config):
+        argv = ["--conf", "spark.executor.memory=4g"]
+        exec_attr = {"name": "job"}
+        config = {"spark.executor.memory": "4g"}
+        load = MagicMock(test_config=config, default_config=config)
+
+        mock_parse_cmd.return_value = (exec_attr, config)
+        mock_query_load.return_value = load
+        mock_get_config.return_value = config
+        mock_spark_run.return_value = (MagicMock(status="success"), "output")
+        mock_process_config.side_effect = RuntimeError("process failed")
+
+        with pytest.raises(RuntimeError, match="process failed"):
+            hijack_recommend(argv)
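[Editor's note] hijack_recommend itself is not part of this diff, which makes the nine scenarios harder to follow in isolation. The sketch below is a hypothetical reconstruction of the control flow the tests imply; the helper names mirror the @patch targets, while the fallback condition, signatures, and status strings are assumptions, not the project's actual code.

    # Hypothetical reconstruction, for orientation only; not the real implementation.
    from omniadvisor.service.spark_service.spark_cmd_parser import SparkCMDParser
    from omniadvisor.service.spark_service.spark_run import spark_run

    def hijack_recommend_sketch(argv):
        exec_attr, user_config = SparkCMDParser.parse_cmd(argv)      # Scenario 6: may raise
        if not exec_attr.get('name'):                                # Scenario 4
            raise ValueError('Task name not in Spark submit cmd')

        load = _query_or_create_load(exec_attr, user_config)         # Scenario 7: may raise
        exec_config = _get_exec_config_from_load(load)               # test_config if present, else default

        result, output = spark_run(load, exec_config)                # Scenario 8: may raise
        if result.status == 'fail' and exec_config != load.default_config:
            # Scenario 2: one retry with the user's default config as a safety net;
            # Scenario 3: no retry when the default config itself already failed.
            result, output = spark_run(load, load.default_config)

        if load.test_config and exec_config == load.test_config:
            _process_load_config(load=load)                          # Scenarios 5 and 9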
diff --git a/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_cmd_parser.py b/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_cmd_parser.py
index 2d3e4b27e..92cbc080a 100644
--- a/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_cmd_parser.py
+++ b/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_cmd_parser.py
@@ -1,3 +1,4 @@
+import pytest
 from omniadvisor.service.spark_service.spark_cmd_parser import SparkCMDParser
 
 
@@ -5,10 +6,9 @@ class TestSparkCMDParser:
     """
     Tests for the SparkCMDParser class.
     """
-    def test_parse_cmd(self):
-        """
-        Test basic command parsing.
-        """
+    # Command parsing tests
+    # Scenario 1: standard arguments plus --conf arguments
+    def test_parse_cmd_standard(self):
         argv = [
             '--omniadvisor-cmd-prefix', 'spark-submit',
             '--master', 'yarn',
@@ -31,7 +31,48 @@ class TestSparkCMDParser:
         assert conf_params['spark.executor.memory'] == '4g'
         assert conf_params['spark.driver.extraJavaOptions'] == '-XX:+UseG1GC -XX:+UseG1GC'
 
-    def test_reconstruct_cmd(self):
+    # Scenario 2: arguments covered by the supplement map
+    def test_parse_cmd_with_supplement(self):
+        argv = [
+            "--num-executors", "2",
+            "--executor-cores", "4"
+        ]
+        exec_attr, conf_params = SparkCMDParser.parse_cmd(argv)
+
+        assert "spark.executor.instances" in conf_params
+        assert conf_params["spark.executor.instances"] == "2"
+        assert conf_params["spark.executor.cores"] == "4"
+
+    # Scenario 3: boolean flags
+    def test_parse_cmd_with_boolean_flags(self):
+        argv = ["--verbose", "--supervise", "--kill", "--status"]
+        exec_attr, _ = SparkCMDParser.parse_cmd(argv)
+
+        assert exec_attr["verbose"] is True
+        assert exec_attr["supervise"] is True
+        assert exec_attr["kill"] is True
+        assert exec_attr["status"] is True
+
+    # Scenario 4: parsing with unknown arguments
+    def test_parse_cmd_with_unknown(self):
+        argv = [
+            "--name", "app",
+            "--foo", "bar", "--bar", "baz", "--kills"
+        ]
+        exec_attr, conf_params = SparkCMDParser.parse_cmd(argv)
+
+        assert "unknown" in exec_attr
+        assert "--foo" in exec_attr["unknown"]
+        assert "bar" in exec_attr["unknown"]
+
+    # Scenario 5: empty argv raises
+    def test_parse_cmd_empty_argv(self):
+        with pytest.raises(ValueError, match="Submit command cannot be null"):
+            SparkCMDParser.parse_cmd([])
+
+    # Command reconstruction tests
+    # Scenario 1: normal command reconstruction
+    def test_reconstruct_cmd_normal(self):
         """
         Test basic command reconstruction.
         """
@@ -57,3 +98,54 @@ class TestSparkCMDParser:
         assert '-uk test' in cmd
         assert '--conf spark.executor.memory=4g' in cmd
         assert '--conf spark.driver.extraJavaOptions="-XX:+UseG1GC -XX:+UseG1GC"' in cmd
+
+    # Scenario 2: missing prefix field raises
+    def test_reconstruct_cmd_missing_prefix(self):
+        exec_attr = {
+            "name": "job"
+        }
+        conf_params = {}
+
+        with pytest.raises(ValueError, match="Cmd prefix key is not in exec_attr"):
+            SparkCMDParser.reconstruct_cmd(exec_attr, conf_params)
+
+    # Scenario 3: with unknown arguments
+    def test_reconstruct_cmd_with_unknown(self):
+        exec_attr = {
+            "omniadvisor-cmd-prefix": "spark-submit",
+            "name": "job",
+            "unknown": ["--extra", "arg"]
+        }
+        conf_params = {
+            "spark.executor.instances": "4"
+        }
+        cmd = SparkCMDParser.reconstruct_cmd(exec_attr, conf_params)
+
+        assert "--extra" in cmd
+        assert "arg" in cmd
+        assert "--conf spark.executor.instances=4" in cmd
+
+    # Scenario 4: with boolean flags
+    def test_reconstruct_cmd_with_boolean(self):
+        exec_attr = {
+            "omniadvisor-cmd-prefix": "spark-submit",
+            "name": "job",
+            "unknown": ["--extra", "arg"],
+            "verbose": True,
+            "supervise": True,
+            "kill": True,
+            "status": True
+        }
+        conf_params = {
+            "spark.executor.instances": "4"
+        }
+        cmd = SparkCMDParser.reconstruct_cmd(exec_attr, conf_params)
+
+        assert "--extra" in cmd
+        assert "arg" in cmd
+        assert "--conf spark.executor.instances=4" in cmd
+        assert "verbose" in cmd
+        assert "supervise" in cmd
+        assert "kill" in cmd
+        assert "status" in cmd
--
Gitee
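[Editor's note] Taken together, the parser changes and these tests imply a parse-then-reconstruct round trip. A small usage sketch of the expected behavior; the exact output string is an assumption inferred from the code and assertions above, not verified output:

    from omniadvisor.service.spark_service.spark_cmd_parser import SparkCMDParser

    argv = [
        '--omniadvisor-cmd-prefix', 'spark-submit',
        '--name', 'demo_job',
        '--verbose',                 # parsed to exec_attr['verbose'] is True
        '--num-executors', '2',      # remapped to spark.executor.instances
    ]
    exec_attr, conf_params = SparkCMDParser.parse_cmd(argv)

    # reconstruct_cmd re-emits '--verbose' as a bare flag because its value
    # is True; a False flag would be omitted entirely.
    cmd = SparkCMDParser.reconstruct_cmd(exec_attr, conf_params)
    # Expected shape (field order may differ):
    # 'spark-submit --name demo_job --verbose --conf spark.executor.instances=2'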
From 8c4f434b55eb329c265f1f53bcd675dad90236c9 Mon Sep 17 00:00:00 2001
From: 徐艺丹 <53546877+Craven1701@users.noreply.github.com>
Date: Thu, 12 Jun 2025 10:31:43 +0800
Subject: [PATCH 2/4] 1. Modify compile.py to keep the spark-submit script
 after compilation 2. Adjust some log output
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 omniadvisor/compile.py                                  | 2 +-
 .../src/omniadvisor/service/spark_service/spark_run.py | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/omniadvisor/compile.py b/omniadvisor/compile.py
index 8cf90de6d..d37eaa11d 100644
--- a/omniadvisor/compile.py
+++ b/omniadvisor/compile.py
@@ -10,7 +10,7 @@ import subprocess
 PROJECT_DIR = os.path.dirname(os.path.abspath(__file__))
 
 # Resource folders that sit alongside src
-RESOURCES_LIST = ['config']
+RESOURCES_LIST = ['config', 'script']
 
 # Name of the source folder to compile
 COMPILE_FROM_NAME = 'src'
diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py
index 10ab73d39..d7ce73003 100644
--- a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py
+++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py
@@ -41,7 +41,7 @@ def spark_run(load: Load, conf: dict):
 
     # Without an application_id, skip extracting time_taken and return directly
     if exitcode != 0:
-        global_logger.info(f"Spark Load execute failed, update the exam result.")
+        global_logger.info(f"Spark Load {load.id} execute failed, update the exam result.")
         try:
             return ExamRecordRepository.update_exam_result(
                 exam_record=exam_record,
@@ -64,7 +64,7 @@ def spark_run(load: Load, conf: dict):
         raise
 
     try:
-        global_logger.info(f"Spark Load execute success, runtime: {runtime}, update the exam result.")
+        global_logger.info(f"Spark Load {load.id} execute success, runtime: {runtime}, update the exam result.")
         exam_record = ExamRecordRepository.update_exam_result(
             exam_record=exam_record,
             status=status,
--
Gitee
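[Editor's note] Beyond adding the load ID, this hunk also repairs a small oddity: the removed line was an f-string with no placeholder, so the f prefix did nothing. A trivial illustration (load_id is a stand-in for load.id, used here only for the example):

    load_id = 42  # stand-in for load.id

    # Before: the f prefix is inert because there is no placeholder, and
    # the message cannot be matched back to a specific load in the logs.
    msg_old = f"Spark Load execute failed, update the exam result."

    # After: interpolating the load ID makes failures traceable.
    msg_new = f"Spark Load {load_id} execute failed, update the exam result."
    assert msg_new == "Spark Load 42 execute failed, update the exam result."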
From eab762fb20fa395421687b6a5be5bc6c70e50cf3 Mon Sep 17 00:00:00 2001
From: 徐艺丹 <53546877+Craven1701@users.noreply.github.com>
Date: Thu, 12 Jun 2025 11:10:50 +0800
Subject: [PATCH 3/4] Handle boolean-type arguments when splicing the command
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../service/spark_service/spark_cmd_parser.py | 21 ++++++++++---------
 1 file changed, 11 insertions(+), 10 deletions(-)

diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py
index 395d9652c..7c6fc2982 100644
--- a/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py
+++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py
@@ -13,6 +13,8 @@ _CMD_PREFIX_KEY = 'omniadvisor-cmd-prefix'
 _CMD_UNKNOWN_KEY = 'unknown'
 # Fields that use a single-dash prefix
 _SINGLE_HORIZONTAL_BAR_KEYS = ['e', 'f', 'i']
+# Boolean-type fields
+_BOOLEAN_TYPE_KEYS = ['verbose', 'supervise', 'kill', 'status']
 # Fields that need to be remapped
 _SPARK_CONF_SUPPLEMENT_MAP = {
     'num-executors': 'spark.executor.instances',
@@ -148,19 +150,18 @@ class SparkCMDParser:
         for key, value in exec_attr.items():
             if key == _CMD_PREFIX_KEY:
                 continue
-            # The -e/-f/-i options only take the single-dash form; handle them separately
-            if key in _SINGLE_HORIZONTAL_BAR_KEYS:
-                cmd_fields += [f'-{key}', cls._normalize_value(value)]
-            # Handle boolean-type arguments separately
-            elif isinstance(value, bool):
-                if value:  # True: emit the bare flag
-                    cmd_fields += [f'--{key}']
-                # False: omit the flag entirely
             # Handle the unparsed remainder, e.g. remainder = ['--uk1', 'xxxx', '--uk2', 'xxxx'], separately
-            elif key == _CMD_UNKNOWN_KEY:
+            if key == _CMD_UNKNOWN_KEY:
                 cmd_fields += [cls._normalize_value(item) for item in value]
+                continue
+            # Handle single-dash and double-dash prefixed arguments separately
+            if key in _SINGLE_HORIZONTAL_BAR_KEYS:
+                cmd_fields += [f'-{key}', cls._normalize_value(value)]
             else:
-                cmd_fields += [f'--{key}', cls._normalize_value(value)]
+                if key in _BOOLEAN_TYPE_KEYS and value is True:
+                    cmd_fields += [f'--{key}']
+                else:
+                    cmd_fields += [f'--{key}', cls._normalize_value(value)]
 
         # Handle the conf parameters
         for key, value in conf_params.items():
--
Gitee
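[Editor's note] This rewrite quietly changes the dispatch rule. Patch 1 branched on the runtime type (isinstance(value, bool)), so any boolean attribute round-tripped as a bare flag; patch 3 branches on a fixed whitelist, so booleans under other keys fall through to the '--key value' branch. A standalone sketch of the two rules (hypothetical helper names; _normalize_value is simplified to str here):

    BOOLEAN_TYPE_KEYS = ['verbose', 'supervise', 'kill', 'status']

    def emit_patch1(key, value):
        # Patch 1 rule: dispatch on the runtime type.
        if isinstance(value, bool):
            return [f'--{key}'] if value else []
        return [f'--{key}', str(value)]

    def emit_patch3(key, value):
        # Patch 3 rule: dispatch on a key whitelist.
        if key in BOOLEAN_TYPE_KEYS and value is True:
            return [f'--{key}']
        return [f'--{key}', str(value)]

    print(emit_patch1('my-flag', True))   # ['--my-flag']
    print(emit_patch3('my-flag', True))   # ['--my-flag', 'True']: differs for non-whitelisted keys
    print(emit_patch3('verbose', False))  # ['--verbose', 'False']: patch 1 would omit it

Whether a False value can actually reach reconstruct_cmd depends on how parse_cmd filters the argparse namespace, which is not shown in this series; if it can, the second and third cases are behavior changes worth double-checking.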
From efb014139dfedb7bfed7f2f4ff1f53f6235db37d Mon Sep 17 00:00:00 2001
From: 徐艺丹 <53546877+Craven1701@users.noreply.github.com>
Date: Thu, 12 Jun 2025 15:49:59 +0800
Subject: [PATCH 4/4] Refactor the reconstruct_cmd method
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../service/spark_service/spark_cmd_parser.py | 30 ++++++++++++++-----
 1 file changed, 22 insertions(+), 8 deletions(-)

diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py
index 7c6fc2982..100b5cdd8 100644
--- a/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py
+++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py
@@ -88,6 +88,23 @@ class SparkCMDParser:
         else:
             return value
 
+    @classmethod
+    def _append_double_dash_args(cls, key, value):
+        """
+        Handle arguments that use the "--" prefix in a spark-submit command;
+        boolean flags are emitted bare (True) or omitted (False).
+        :param key: argument name without the leading dashes
+        :param value: parsed argument value
+        :return: list of command-line fields for this argument
+        """
+        if key in _BOOLEAN_TYPE_KEYS:
+            if value is True:
+                return [f'--{key}']
+            else:
+                return []
+        else:
+            return [f'--{key}', cls._normalize_value(value)]
+
     @classmethod
     def parse_cmd(cls, argv: list):
         """
@@ -151,17 +168,14 @@ class SparkCMDParser:
             if key == _CMD_PREFIX_KEY:
                 continue
             # Handle the unparsed remainder, e.g. remainder = ['--uk1', 'xxxx', '--uk2', 'xxxx'], separately
-            if key == _CMD_UNKNOWN_KEY:
+            elif key == _CMD_UNKNOWN_KEY:
                 cmd_fields += [cls._normalize_value(item) for item in value]
-                continue
-            # Handle single-dash and double-dash prefixed arguments separately
-            if key in _SINGLE_HORIZONTAL_BAR_KEYS:
+            # Handle arguments with a single-dash prefix separately
+            elif key in _SINGLE_HORIZONTAL_BAR_KEYS:
                 cmd_fields += [f'-{key}', cls._normalize_value(value)]
+            # Handle the remaining double-dash prefixed arguments
             else:
-                if key in _BOOLEAN_TYPE_KEYS and value is True:
-                    cmd_fields += [f'--{key}']
-                else:
-                    cmd_fields += [f'--{key}', cls._normalize_value(value)]
+                cmd_fields += cls._append_double_dash_args(key, value)
 
         # Handle the conf parameters
         for key, value in conf_params.items():
--
Gitee
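[Editor's note] The original patch declared _append_double_dash_args as a @staticmethod that took cls as an explicit parameter and was called as cls._append_double_dash_args(cls, key, value); the hunks above use the equivalent @classmethod form, so the call site drops the extra cls argument. A quick sanity check of the helper's contract, assuming that classmethod form and that _normalize_value passes plain strings through unchanged (consistent with the test assertions, but an assumption):

    from omniadvisor.service.spark_service.spark_cmd_parser import SparkCMDParser

    # Whitelisted boolean keys: bare flag when True, dropped when False.
    print(SparkCMDParser._append_double_dash_args('verbose', True))   # ['--verbose']
    print(SparkCMDParser._append_double_dash_args('kill', False))     # []

    # Any other key keeps the '--key value' form.
    print(SparkCMDParser._append_double_dash_args('name', 'job'))     # ['--name', 'job']

Extracting this branch into a named helper keeps reconstruct_cmd a flat dispatch over key categories, which is the point of the refactor.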