From 469e460ec1ea3b6a52cbe6af99fb6b6aa2d2f0bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E8=89=BA=E4=B8=B9?= <53546877+Craven1701@users.noreply.github.com> Date: Tue, 19 Aug 2025 15:30:34 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dspark=E8=A7=A3=E6=9E=90?= =?UTF-8?q?=E6=A8=A1=E5=9D=97=E5=AF=B9=E4=BA=8Eprimary=5Fresource=E6=8F=90?= =?UTF-8?q?=E5=8F=96=E4=B8=8E=E5=8E=9F=E7=94=9Fspark=E8=A7=A3=E6=9E=90?= =?UTF-8?q?=E5=99=A8=E6=9C=89=E5=B7=AE=E5=BC=82=E7=9A=84=E9=97=AE=E9=A2=98?= =?UTF-8?q?=EF=BC=8C=E5=8D=95=E5=85=83=E6=B5=8B=E8=AF=95=E5=90=8C=E6=AD=A5?= =?UTF-8?q?=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/spark_service/spark_cmd_parser.py | 23 +++++++----- .../spark_service/test_spark_cmd_parser.py | 36 ++++++++++++------- 2 files changed, 38 insertions(+), 21 deletions(-) diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py index a344f6235..d2bbbd6af 100644 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py @@ -10,8 +10,8 @@ from omniadvisor.utils.logger import global_logger # 命令行开头的字段名称 _CMD_PREFIX_KEY = 'omniadvisor-cmd-prefix' -# 命令行开头的字段名称 -_CMD_UNKNOWN_KEY = 'unknown' +# 命令中的primary_resource部分 +_CMD_PRIMARY_RESOURCE = 'primary_resource' # 命令行中class字段 _CMD_CLASS_KEY = 'class' # 单横杠参数的字段 @@ -155,8 +155,17 @@ class SparkCMDParser: exec_attr = dict() conf_params = dict() - # 参数解析 + # 原始参数解析 params, unknown = cls._parser.parse_known_args(args=argv) + + if unknown: + index = argv.index(unknown[0]) + params, sub_unknown = cls._parser.parse_known_args(args=argv[:index]) + primary_resource = argv[index:] + global_logger.warning(f"The remainder unknown params %s " + f"will be add to exec_attr[primary_resource]", primary_resource) + exec_attr[_CMD_PRIMARY_RESOURCE] = primary_resource + for key, value in vars(params).items(): # 过滤空值参数 if value is None: @@ -173,10 +182,6 @@ class SparkCMDParser: else: exec_attr[key] = value - if unknown: - global_logger.warning(f"The remainder unknown params {unknown} will be add to exec_attr[unknown]") - exec_attr[_CMD_UNKNOWN_KEY] = unknown - return exec_attr, conf_params @classmethod @@ -205,7 +210,7 @@ class SparkCMDParser: if key in [_CMD_PREFIX_KEY, _CMD_CLASS_KEY]: continue # 单独处理 unknown = ['xxxxxxx.jar', 'xxxxxx', 'xxxxx'] 这部分未被解析的残余参数追加到末尾 不在此处理 - elif key == _CMD_UNKNOWN_KEY: + elif key == _CMD_PRIMARY_RESOURCE: continue # 单独处理 - 作为前缀的参数 elif key in _SINGLE_HORIZONTAL_BAR_KEYS: @@ -219,7 +224,7 @@ class SparkCMDParser: cmd_fields += ['--conf', f'{key}={cls._normalize_value(value)}'] # 追加primary_resource - primary_resource = exec_attr.get(_CMD_UNKNOWN_KEY) + primary_resource = exec_attr.get(_CMD_PRIMARY_RESOURCE) if primary_resource: cmd_fields += [cls._normalize_value(item) for item in primary_resource] diff --git a/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_cmd_parser.py b/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_cmd_parser.py index 4bf965afc..9de13b04e 100644 --- a/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_cmd_parser.py +++ b/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_cmd_parser.py @@ -1,6 +1,6 @@ import shlex import pytest -from omniadvisor.service.spark_service.spark_cmd_parser import SparkCMDParser, _CMD_PREFIX_KEY, _CMD_UNKNOWN_KEY +from omniadvisor.service.spark_service.spark_cmd_parser import SparkCMDParser, _CMD_PREFIX_KEY, _CMD_PRIMARY_RESOURCE class TestSparkCMDParser: @@ -48,8 +48,8 @@ class TestSparkCMDParser: assert conf_params["spark.executor.memory"] == "4g" assert conf_params["spark.executor.cores"] == "2" assert conf_params["spark.executor.instances"] == "5" - assert _CMD_UNKNOWN_KEY in exec_attr - assert exec_attr[_CMD_UNKNOWN_KEY] == ["example.jar"] + assert _CMD_PRIMARY_RESOURCE in exec_attr + assert exec_attr[_CMD_PRIMARY_RESOURCE] == ["example.jar"] # 场景 2: 包含 supplement 参数 def test_parse_cmd_with_supplement(self): @@ -79,7 +79,7 @@ class TestSparkCMDParser: for flag in ["verbose", "version", "supervise", "usage-error", "help"]: assert exec_attr.get(flag) is True - assert exec_attr[_CMD_UNKNOWN_KEY] == ["example.jar"] + assert exec_attr[_CMD_PRIMARY_RESOURCE] == ["example.jar"] # 场景 4: 解析单短横杠的命令 def test_parse_cmd_single_dash_args(self): @@ -97,7 +97,7 @@ class TestSparkCMDParser: assert exec_attr["f"] == "script.sql" assert exec_attr["i"] == "init.sql" assert exec_attr["database"] == "testdb" - assert exec_attr[_CMD_UNKNOWN_KEY] == ["example.jar"] + assert exec_attr[_CMD_PRIMARY_RESOURCE] == ["example.jar"] # 场景 5: 空 argv 抛出异常 def test_parse_cmd_empty_argv(self): @@ -141,12 +141,12 @@ class TestSparkCMDParser: with pytest.raises(ValueError, match="Cmd prefix key is not in exec_attr"): SparkCMDParser.reconstruct_cmd(exec_attr, conf_params) - # 场景 3: 包含 unknown 参数 + # 场景 3: primary_resource中包含 unknown 参数 def test_reconstruct_cmd_with_unknown(self): exec_attr = { "omniadvisor-cmd-prefix": "spark-submit", "name": "job", - "unknown": ["--extra", "arg"] + "primary_resource": ["--extra", "arg"] } conf_params = { "spark.executor.instances": "4" @@ -162,7 +162,7 @@ class TestSparkCMDParser: exec_attr = { "omniadvisor-cmd-prefix": "spark-submit", "name": "job", - "unknown": ["--extra", "arg"], + "primary_resource": ["--extra", "arg"], "verbose": True, "supervise": False, } @@ -197,7 +197,6 @@ class TestSparkCMDParser: --files config.yaml --archives env.zip#env --verbose - --version --supervise --usage-error --help @@ -207,6 +206,8 @@ class TestSparkCMDParser: -f query.sql my_app.jar --custom-arg custom_value + --principal principal1 + --version """ # 使用 shlex.split 来模拟 shell 行为 @@ -236,7 +237,6 @@ class TestSparkCMDParser: "--files", "config.yaml", "--archives", "env.zip#env", "--verbose", - "--version", "--supervise", "--usage-error", "--help", @@ -245,7 +245,9 @@ class TestSparkCMDParser: "-e", "SELECT * FROM table", "-f", "query.sql", "my_app.jar", - "--custom-arg", "custom_value" + "--custom-arg", "custom_value", + "--principal", "principal1", + "--version" ] for item in expected: @@ -257,8 +259,18 @@ class TestSparkCMDParser: "queue", "jars", "files", "archives", "verbose", "version", "supervise", "usage-error", "help", "i", "database", "e", "f", - _CMD_UNKNOWN_KEY + _CMD_PRIMARY_RESOURCE } assert set(exec_attr.keys()).issuperset(expected_keys), "Missing expected keys in parsed exec_attr" + # Step 5 验证primary_resource是否符合预期 + expected_primary_resource = ["my_app.jar", "--custom-arg", "custom_value", + "--principal", "principal1", "--version"] + assert exec_attr[_CMD_PRIMARY_RESOURCE] == expected_primary_resource + + # Step 6 验证exec_attr是否不包含"--custom-arg", "--principal", "--version" + assert "custom-arg" not in exec_attr + assert "principal" not in exec_attr + assert exec_attr["version"] is False + -- Gitee From 299948f4828b734b996125295fb239bd93fdc506 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E8=89=BA=E4=B8=B9?= <53546877+Craven1701@users.noreply.github.com> Date: Tue, 19 Aug 2025 19:24:09 +0800 Subject: [PATCH 2/4] =?UTF-8?q?timeout=E7=B1=BB=E5=9E=8B=E7=94=B1int?= =?UTF-8?q?=E8=BD=AC=E5=8F=98=E4=B8=BAfloat=EF=BC=8C=E9=81=BF=E5=85=8D?= =?UTF-8?q?=E5=BD=93baseline=5Fresult.runtime=3Dfloat('inf')=E6=97=B6?= =?UTF-8?q?=E7=94=B1=E4=BA=8E=E7=B1=BB=E5=9E=8B=E8=BD=AC=E6=8D=A2=E5=A4=B1?= =?UTF-8?q?=E8=B4=A5=E5=AF=BC=E8=87=B4=E6=8A=A5=E9=94=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/omniadvisor/service/spark_service/spark_run.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py index 2a6eba772..6b1c9ddd1 100644 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py @@ -98,7 +98,7 @@ def spark_run(load: Load, config: dict, wait_for_trace: bool = True, return exam_record, exec_result.output -def _calc_timeout_from_load(load: Load) -> int: +def _calc_timeout_from_load(load: Load) -> float: """ 根据负载的基线执行时间计算该负载在复测时的超时时间 @@ -107,7 +107,7 @@ def _calc_timeout_from_load(load: Load) -> int: """ # 超时时间为基线执行用时的倍数 baseline_result = get_tuning_result(load=load, config=load.default_config) - return int(OA_CONF.spark_exec_timeout_ratio * baseline_result.runtime) + return OA_CONF.spark_exec_timeout_ratio * baseline_result.runtime def _update_trace_and_runtime_from_history_server(exam_record: ExamRecord, application_id: str) -> bool: -- Gitee From 74e39c85cfcd4d148a2dd6f695387220bf25aaf1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E8=89=BA=E4=B8=B9?= <53546877+Craven1701@users.noreply.github.com> Date: Tue, 19 Aug 2025 19:45:37 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=96=87=E4=BB=B6?= =?UTF-8?q?=E5=90=8D=20=E6=98=8E=E7=A1=AE=E9=85=8D=E7=BD=AE=E6=96=87?= =?UTF-8?q?=E4=BB=B6=E7=B1=BB=E5=9E=8B=E4=B8=BAini?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- omniadvisor/config/{common_config.cfg => common_config.ini} | 0 omniadvisor/src/common/constant.py | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename omniadvisor/config/{common_config.cfg => common_config.ini} (100%) diff --git a/omniadvisor/config/common_config.cfg b/omniadvisor/config/common_config.ini similarity index 100% rename from omniadvisor/config/common_config.cfg rename to omniadvisor/config/common_config.ini diff --git a/omniadvisor/src/common/constant.py b/omniadvisor/src/common/constant.py index 29c762a91..b48a2831c 100644 --- a/omniadvisor/src/common/constant.py +++ b/omniadvisor/src/common/constant.py @@ -78,7 +78,7 @@ class OmniAdvisorConf: # 配置文件路径 config_dir = f'{project_base_dir}/config' spark_repo_path = f'{config_dir}/spark_repo.json' - common_config_path = f'{config_dir}/common_config.cfg' + common_config_path = f'{config_dir}/common_config.ini' # 数据存储目录 data_dir = f'{project_base_dir}/data' -- Gitee From 2ca6cb2808f60cc7992b035faba7d19333735f34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E8=89=BA=E4=B8=B9?= <53546877+Craven1701@users.noreply.github.com> Date: Tue, 19 Aug 2025 20:44:07 +0800 Subject: [PATCH 4/4] =?UTF-8?q?--name=E5=8F=82=E6=95=B0=E9=95=BF=E5=BA=A6?= =?UTF-8?q?=E6=89=A9=E5=AE=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- omniadvisor/src/server/app/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/omniadvisor/src/server/app/models.py b/omniadvisor/src/server/app/models.py index e700f9276..cf22d9195 100644 --- a/omniadvisor/src/server/app/models.py +++ b/omniadvisor/src/server/app/models.py @@ -9,7 +9,7 @@ class DatabaseLoad(models.Model): 负载模型 """ id = models.AutoField(primary_key=True) - name = models.CharField(max_length=100, null=True) + name = models.CharField(max_length=180, null=True) exec_attr = models.JSONField(null=False) default_config = models.JSONField(null=False) best_config = models.JSONField(null=True) -- Gitee