From 89c04b24bee7a2f2aea6af0c187955ae40a916a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E8=89=BA=E4=B8=B9?= <53546877+Craven1701@users.noreply.github.com> Date: Fri, 25 Jul 2025 14:50:12 +0800 Subject: [PATCH 01/10] =?UTF-8?q?=E4=BC=98=E5=8C=96spark-submit=E7=9A=84?= =?UTF-8?q?=E5=91=BD=E4=BB=A4=E8=A7=A3=E6=9E=90=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../omniadvisor/interface/hijack_recommend.py | 8 ++ .../service/spark_service/spark_cmd_parser.py | 78 +++++++++++++------ .../spark_service/test_spark_cmd_parser.py | 3 + 3 files changed, 65 insertions(+), 24 deletions(-) diff --git a/omniadvisor/src/omniadvisor/interface/hijack_recommend.py b/omniadvisor/src/omniadvisor/interface/hijack_recommend.py index f6dd54374..6717e76aa 100644 --- a/omniadvisor/src/omniadvisor/interface/hijack_recommend.py +++ b/omniadvisor/src/omniadvisor/interface/hijack_recommend.py @@ -5,6 +5,7 @@ from omniadvisor.service.spark_service.spark_cmd_parser import SparkCMDParser from omniadvisor.repository.model.load import Load from omniadvisor.repository.load_repository import LoadRepository from omniadvisor.repository.tuning_record_repository import TuningRecordRepository +from omniadvisor.service.spark_service.spark_executor import SparkExecutor from omniadvisor.service.tuning_result.tuning_result import get_tuning_result from omniadvisor.service.tuning_result.tuning_result_history import get_tuning_result_history from omniadvisor.utils.logger import global_logger @@ -98,6 +99,13 @@ def hijack_recommend(argv: list): :param argv: Spark执行命令字段 """ + if not SparkCMDParser.validate_submit_arguments(argv): + # 非SUBMIT动作的提交直接回退到原生Spark执行 不被特性所劫持 + global_logger.debug("Spark command is not a SUBMIT command, going to execute directly") + global_logger.debug(f"The argvs list is {argv}") + exitcode, output = SparkExecutor.submit_spark_task(argv) + return + # 获取用户传入的Spark命令 并解析命令 exec_attr, user_config = SparkCMDParser.parse_cmd(argv=argv) # 提取任务名字 diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py index 2e8b44d38..0d4ffe8b7 100644 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py @@ -4,6 +4,7 @@ # Copyright (c) Huawei Technologies Co, Ltd. 2023-2023. All rights reserved. 
import argparse +from typing import List from omniadvisor.utils.logger import global_logger @@ -34,46 +35,75 @@ class SparkCMDParser: # 解析Spark CMD开头,需脚本配合 _parser.add_argument(f'--{_CMD_PREFIX_KEY}', dest=_CMD_PREFIX_KEY, help="The head of Spark CMD.") - # 解析Spark相关参数 - _parser.add_argument('--master', - help="spark://host:port, mesos://host:port, yarn, k8s://https://host:port, " - "or local (Default: local[*]).") + + # SparkSubmit Options 3.3.1 + _parser.add_argument('--class', help="Your application's main class(for Java / Scala apps") + _parser.add_argument('--conf', action='append', help="Arbitrary Spark configuration property") _parser.add_argument('--deploy-mode', dest='deploy-mode', help="Whether to launch the driver program locally (client) or on one of the worker " "machines inside the cluster (cluster)(Default: client)") - _parser.add_argument('--class', help="Your application's main class(for Java / Scala apps") + _parser.add_argument('--driver-class-path', dest='driver-class-path') + _parser.add_argument('--driver-cores', dest='driver-cores') + _parser.add_argument('--driver-java-options', dest='driver-java-options') + _parser.add_argument('--driver-library-path', dest='driver-library-path') + _parser.add_argument('--driver-memory', dest='driver-memory') + _parser.add_argument('--executor-memory', dest='executor-memory') + _parser.add_argument('--files') + _parser.add_argument('--jars') # TODO 这个入参和--jar有什么区别 + _parser.add_argument('--kill', action='store_true') # TODO 这也是非SUBMIT相关的命令 + _parser.add_argument('--master', + help="spark://host:port, mesos://host:port, yarn, k8s://https://host:port, " + "or local (Default: local[*]).") _parser.add_argument('--name', help="A name of your application") - _parser.add_argument('--jar') _parser.add_argument('--packages') _parser.add_argument('--exclude-packages', dest='exclude-packages') - _parser.add_argument('--repositories') - _parser.add_argument('--py-files', dest='py-files') - _parser.add_argument('--files') - _parser.add_argument('--archives') - _parser.add_argument('--conf', action='append', help="Arbitrary Spark configuration property") - _parser.add_argument('--driver-memory', dest='driver-memory') - _parser.add_argument('--driver-library-path', dest='driver-library-path') - _parser.add_argument('--driver-class-path', dest='driver-class-path') - _parser.add_argument('--executor-memory', dest='executor-memory') _parser.add_argument('--proxy-user', dest='proxy-user') - _parser.add_argument('--version') - _parser.add_argument('--driver-cores', dest='driver-cores') + _parser.add_argument('--py-files', dest='py-files') + _parser.add_argument('--repositories') + _parser.add_argument('--status', action='store_true') # TODO boolean _parser.add_argument('--total-executor-cores', dest='total-executor-cores') + + # Options that do not take arguments boolean变量 + _parser.add_argument('--help','-h', action='store_true') + _parser.add_argument('--supervise', action='store_true') + _parser.add_argument('--usage-error', action='store_true') + _parser.add_argument('--verbose', '-v', action='store_true') + _parser.add_argument('--version', action='store_true') + + # YARN-only options + _parser.add_argument('--archives') _parser.add_argument('--executor-cores', dest='executor-cores') + _parser.add_argument('--keytab') _parser.add_argument('--num-executors', dest='num-executors') _parser.add_argument('--principal') - _parser.add_argument('--keytab') _parser.add_argument('--queue') - _parser.add_argument('-d', '--database') + + + # Spark Sql 相关选项 # 
支持对于--e这种形式的入参解析 但是--e为spark非官方用法 故在命令reconstruct的时候仍然使用-e的方式重建命令 _parser.add_argument('-e', '--e', type=str, help='SQL statement to execute.') _parser.add_argument('-f', type=str, help='File containing SQL script.') _parser.add_argument('-i', help='Initialization SQL file') - # boolean变量 - _parser.add_argument('--verbose', action='store_true') - _parser.add_argument('--supervise', action='store_true') - _parser.add_argument('--kill', action='store_true') - _parser.add_argument('--status', action='store_true') + _parser.add_argument('-d', '--database') + + @classmethod + def validate_submit_arguments(cls, args: List) -> bool: + """ + 检查当前提交的spark-submit命令是否属于SUBMIT动作的spark命令 + 该函数与Spark 3.3.1源码中SparkSubmitArguments.scala文件中的validateSubmitArguments逻辑保持一致 + + :param args: 从spark-submit文件中劫持的入参列表 + :return: 属于SUBMIT动作返回True 不属于SUBMIT动作返回False + """ + if len(args) == 0: + raise ValueError("No arguments were provided. At least one argument is required.") + + # 待检查参数列表 + primary_resource = "" # 其实主要就是去看这个primary_resource是从哪里拿到的 其他都是取值范围的限定 不需要我来做检查 + params, unknown = cls._parser.parse_known_args(args=args) + + if primary_resource is None: + return False @staticmethod def _normalize_value(value): diff --git a/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_cmd_parser.py b/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_cmd_parser.py index 2b12bb210..8e1af5e06 100644 --- a/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_cmd_parser.py +++ b/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_cmd_parser.py @@ -6,6 +6,9 @@ class TestSparkCMDParser: """ 测试SparkCMDParser类的功能 """ + def test_validate_submit_argument(self): + pass + # 测试命令解析 # 场景 1: 标准参数 + conf 参数 def test_parse_cmd_standard(self): -- Gitee From 0cc8da55dc18134fbbf1871d740a502a62975922 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E8=89=BA=E4=B8=B9?= <53546877+Craven1701@users.noreply.github.com> Date: Thu, 31 Jul 2025 14:54:48 +0800 Subject: [PATCH 02/10] =?UTF-8?q?=E4=BF=AE=E6=94=B9exam=5Frecord=E7=9A=84r?= =?UTF-8?q?untime=E8=8E=B7=E5=8F=96=E9=80=BB=E8=BE=91=EF=BC=8C=E7=94=B1Tim?= =?UTF-8?q?e=20taken=E6=94=B9=E4=B8=BALastJob-FirstJob?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- omniadvisor/src/common/constant.py | 3 ++ .../repository/exam_record_repository.py | 21 ++++++++- .../service/spark_service/spark_executor.py | 10 +++-- .../service/spark_service/spark_fetcher.py | 45 ++++++++++++++++++- .../service/spark_service/spark_run.py | 16 ++++--- 5 files changed, 84 insertions(+), 11 deletions(-) diff --git a/omniadvisor/src/common/constant.py b/omniadvisor/src/common/constant.py index 98a20ff69..2ad42180d 100644 --- a/omniadvisor/src/common/constant.py +++ b/omniadvisor/src/common/constant.py @@ -72,6 +72,9 @@ class OmniAdvisorConf: empty_config = dict() exec_fail_return_runtime = float('inf') + # 仅作为非spark-sql提交场景执行成功的占位(此时返回结果中Time taken缺失)用变量 维持spark_executor的接口不变 + # 实际的执行时间需要从jobs中获取 + exec_complete_return_runtime = float('inf') exec_fail_return_app_id = '' exec_fail_return_trace = dict() diff --git a/omniadvisor/src/omniadvisor/repository/exam_record_repository.py b/omniadvisor/src/omniadvisor/repository/exam_record_repository.py index 29e97c086..ebb4c1e47 100644 --- a/omniadvisor/src/omniadvisor/repository/exam_record_repository.py +++ b/omniadvisor/src/omniadvisor/repository/exam_record_repository.py @@ -102,7 +102,7 @@ class ExamRecordRepository(Repository): @classmethod def update_trace(cls, exam_record: 
ExamRecord, trace: dict): """ - 更新测试记录中的状态 + 更新测试记录中的trace信息 :param exam_record: 测试记录实例 :param trace: 执行Trace @@ -118,6 +118,25 @@ class ExamRecordRepository(Repository): return ExamRecord(database_model=database_task) + @classmethod + def update_runtime(cls, exam_record: ExamRecord, runtime: float): + """ + 更新从jobs中获取的runtime信息 + + :param exam_record: 测试记录实例 + :param runtime: 从Jobs信息中计算出的runtime值 + :return: 更新完后的测试记录 + """ + model_attr = { + ExamRecord.FieldName.runtime: runtime + } + database_task = cls._update( + database_model=exam_record.database_model, + model_attr=model_attr + ) + + return ExamRecord(database_model=database_task) + @classmethod def delete(cls, exam_record: ExamRecord): """ diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_executor.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_executor.py index 4e24575e7..b00179209 100644 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_executor.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_executor.py @@ -25,7 +25,7 @@ class SparkExecutor: @classmethod def submit_spark_task(cls, cmd_fields: list, timeout: int): """ - 在shell终端提交spark命令,并解析结果 + 在shell终端提交spark命令,并在命令成功执行并返回时解析结果中的数据 :param cmd_fields: Spark的提交命令 :param timeout: 超时时间 @@ -66,7 +66,8 @@ class SparkExecutor: @classmethod def _parser_spark_output(cls, spark_output: str): """ - 解析Spark输出,获得spark提交命令、Application ID和执行用时 + 解析Spark输出,获得spark提交命令、Application ID和Time taken + (注: Time taken后续不再作为性能指标被使用 目前仅作为性能参考获取系其数值) :param spark_output: Spark执行输出 :return: @@ -83,7 +84,10 @@ class SparkExecutor: if match_objs: time_taken_list = [float(obj) for obj in match_objs] total_time_taken = sum(time_taken_list) + global_logger.debug(f'Total time taken is {total_time_taken} seconds') else: - raise RuntimeError('Can not find Time taken message in spark output.') + # 没有捕获到Time Taken时使用OA_CONF.exec_complete_return_runtime作为返回值,后续需要从trace中获取实际运行时间 + total_time_taken = OA_CONF.exec_complete_return_runtime + global_logger.info('This spark task do not have Time taken message.') return application_id, total_time_taken diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_fetcher.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_fetcher.py index de1a7dd73..544a8b4f3 100755 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_fetcher.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_fetcher.py @@ -1,9 +1,10 @@ import json import requests +from datetime import datetime class SparkFetcher: - def __init__(self, history_server_url,): + def __init__(self, history_server_url, ): """ 初始化SparkFetcher类 @@ -74,3 +75,45 @@ class SparkFetcher: :return: 返回指定应用的执行器信息 """ return self._make_request(f"api/v1/applications/{app_id}/executors") + + def get_spark_jobs_by_app(self, app_id): + """ + 根据应用ID获取指定Spark应用的Jobs信息 + + :param app_id: 应用ID + :return: 返回指定应用的Jobs信息 + """ + return self._make_request(f"api/v1/applications/{app_id}/jobs") + + def get_spark_runtime_by_app(self) -> float: + """ + 通过从HistoryServer上获取的jobs信息,计算任务的执行耗时 + runtime的计算的耗时为 最早的Job提交的时间 与 最晚的Job完成时间 之间的差值(注:最晚完成的Job并不一定是最后提交的Job) + + :return: runtime (seconds) + """ + jobs_detail = self.get_spark_jobs_by_app() + + time_format = "%Y-%m-%dT%H:%M:%S.%f" + + # 获取最早的Job提交时间 + first_submit_time = jobs_detail[-1]["submissionTime"] + first_submittion_timestamp = datetime.strptime(first_submit_time.replace("GMT", ""), time_format) + + # 获取最后结束的Job的完成时间 + completiontime_list = [] + sortedcompletiontime_list = [] + for job_detail in 
jobs_detail: + completiontime_str = job_detail['completionTime'] + completion_timestamp = datetime.strptime(completiontime_str.replace("GMT", ""), time_format) + completiontime_list.append(completion_timestamp) + sortedcompletiontime_list = sorted(completiontime_list) + + if sortedcompletiontime_list: + last_completion_timestamp = sortedcompletiontime_list[-1] + else: + return float('inf') + + # 计算时间差 + time_diff = last_completion_timestamp - first_submittion_timestamp + return time_diff.total_seconds() diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py index 403e120eb..e5b702e30 100644 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py @@ -26,7 +26,9 @@ _RETURN_CODE_MAP = { def spark_run(load: Load, config: dict, wait_for_trace: bool = False): """ - 输入负载与配置,执行Spark任务 + 输入负载与配置,执行Spark任务 并从Spark命令的返回值中获取 + 生成一条记录本次执行信息的exam_record + 同时从执行结果中获取Application :param load: 负载 :param config: 参数配置 @@ -46,7 +48,7 @@ def spark_run(load: Load, config: dict, wait_for_trace: bool = False): global_logger.debug('Spark exec cmd is: %s, timeout is: %d', submit_cmd, timeout) exec_result = SparkExecutor.submit_spark_task(cmd_fields=submit_cmd, timeout=timeout) if exec_result.exitcode == 0: - global_logger.info('Spark Load %d execute success, runtime: %.3f', load.id, exec_result.duration) + global_logger.info('Spark Load %d execute success', load.id) exam_record_status = OA_CONF.ExecStatus.success exam_record_runtime = exec_result.duration exam_record_app_id = exec_result.app_id @@ -73,11 +75,11 @@ def spark_run(load: Load, config: dict, wait_for_trace: bool = False): if wait_for_trace: # 阻塞获取trace global_logger.info('Going to fetch Spark execute trace, the process is blocking.') - _update_trace_from_history_server(exam_record=exam_record, application_id=exec_result.app_id) + _update_trace_and_runtime_from_history_server(exam_record=exam_record, application_id=exec_result.app_id) else: # 不阻塞获取trace,通过子进程进行获取 global_logger.info('Going to fetch Spark execute trace, the process is non-blocking.') - p = multiprocessing.Process(target=_update_trace_from_history_server, args=(exam_record, exec_result.app_id)) + p = multiprocessing.Process(target=_update_trace_and_runtime_from_history_server, args=(exam_record, exec_result.app_id)) p.daemon = False p.start() @@ -96,9 +98,9 @@ def _calc_timeout_from_load(load: Load): return int(OA_CONF.spark_exec_timeout_ratio * baseline_result.runtime) -def _update_trace_from_history_server(exam_record: ExamRecord, application_id: str): +def _update_trace_and_runtime_from_history_server(exam_record: ExamRecord, application_id: str): """ - 创建一个子进程对history_server进行轮询 + 根据application_id对history_server进行轮询, 查询该任务的trace信息和runtime信息 :param application_id: Spark任务application_id :return: @@ -112,6 +114,7 @@ def _update_trace_from_history_server(exam_record: ExamRecord, application_id: s trace_sql = spark_fetcher.get_spark_sql_by_app(application_id) trace_stages = spark_fetcher.get_spark_stages_by_app(application_id) trace_executor = spark_fetcher.get_spark_executor_by_app(application_id) + runtime = spark_fetcher.get_spark_runtime_by_app() except HTTPError as httpe: time.sleep(OA_CONF.spark_fetch_trace_interval) global_logger.debug(f"Cannot access history server: %s", httpe) @@ -125,4 +128,5 @@ def _update_trace_from_history_server(exam_record: ExamRecord, application_id: s global_logger.error(f'Failed to get App %s 
trace from %s', application_id, history_server_url) return + ExamRecordRepository.update_trace(exam_record, runtime=runtime) ExamRecordRepository.update_trace(exam_record, trace=trace_dict) -- Gitee From 9024ebe6f972c1f81c197d42239843cbbbc36fcc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E8=89=BA=E4=B8=B9?= <53546877+Craven1701@users.noreply.github.com> Date: Thu, 31 Jul 2025 15:02:34 +0800 Subject: [PATCH 03/10] =?UTF-8?q?primary=5Fresource=E8=8E=B7=E5=8F=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/omniadvisor/service/spark_service/spark_cmd_parser.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py index d96fd272e..b6ea5b056 100644 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py @@ -99,8 +99,8 @@ class SparkCMDParser: raise ValueError("No arguments were provided. At least one argument is required.") # 待检查参数列表 - primary_resource = "" # 其实主要就是去看这个primary_resource是从哪里拿到的 其他都是取值范围的限定 不需要我来做检查 params, unknown = cls._parser.parse_known_args(args=args) + primary_resource = unknown if primary_resource is None: return False -- Gitee From 74d82c4c200f55f9caa5d2d5a08e19b09d8bfbcf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E8=89=BA=E4=B8=B9?= <53546877+Craven1701@users.noreply.github.com> Date: Fri, 1 Aug 2025 16:21:17 +0800 Subject: [PATCH 04/10] =?UTF-8?q?1.=E4=BF=AE=E6=94=B9=E4=BB=8Ejobs?= =?UTF-8?q?=E4=B8=AD=E8=8E=B7=E5=8F=96=E8=BF=90=E8=A1=8C=E6=97=B6=E9=97=B4?= =?UTF-8?q?=E7=9A=84=E9=80=BB=E8=BE=91=202.=E7=9B=B8=E5=85=B3=E5=8D=95?= =?UTF-8?q?=E5=85=83=E6=B5=8B=E8=AF=95=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- omniadvisor/src/common/constant.py | 3 - .../omniadvisor/interface/hijack_recommend.py | 9 +- .../service/spark_service/spark_cmd_parser.py | 17 ++-- .../service/spark_service/spark_executor.py | 6 +- .../service/spark_service/spark_fetcher.py | 48 ++++++---- .../service/tuning_result/tuning_result.py | 5 +- .../spark_service/test_spark_executor.py | 5 +- .../spark_service/test_spark_fetcher.py | 93 ++++++++++++++++++- 8 files changed, 138 insertions(+), 48 deletions(-) diff --git a/omniadvisor/src/common/constant.py b/omniadvisor/src/common/constant.py index 2ad42180d..98a20ff69 100644 --- a/omniadvisor/src/common/constant.py +++ b/omniadvisor/src/common/constant.py @@ -72,9 +72,6 @@ class OmniAdvisorConf: empty_config = dict() exec_fail_return_runtime = float('inf') - # 仅作为非spark-sql提交场景执行成功的占位(此时返回结果中Time taken缺失)用变量 维持spark_executor的接口不变 - # 实际的执行时间需要从jobs中获取 - exec_complete_return_runtime = float('inf') exec_fail_return_app_id = '' exec_fail_return_trace = dict() diff --git a/omniadvisor/src/omniadvisor/interface/hijack_recommend.py b/omniadvisor/src/omniadvisor/interface/hijack_recommend.py index 3359dda5e..6634e2622 100644 --- a/omniadvisor/src/omniadvisor/interface/hijack_recommend.py +++ b/omniadvisor/src/omniadvisor/interface/hijack_recommend.py @@ -6,7 +6,6 @@ from omniadvisor.service.spark_service.spark_cmd_parser import SparkCMDParser from omniadvisor.repository.model.load import Load from omniadvisor.repository.load_repository import LoadRepository from omniadvisor.repository.tuning_record_repository import TuningRecordRepository -from 
omniadvisor.service.spark_service.spark_executor import SparkExecutor from omniadvisor.service.tuning_result.tuning_result import get_tuning_result from omniadvisor.service.tuning_result.tuning_result_history import get_tuning_result_history from omniadvisor.utils.logger import global_logger @@ -117,12 +116,8 @@ def hijack_recommend(argv: list): :param argv: Spark执行命令字段 """ - if not SparkCMDParser.validate_submit_arguments(argv): - # 非SUBMIT动作的提交直接回退到原生Spark执行 不被特性所劫持 - global_logger.debug("Spark command is not a SUBMIT command, going to execute directly") - global_logger.debug(f"The argvs list is {argv}") - exitcode, output = SparkExecutor.submit_spark_task(argv) - return + # 非SUBMIT动作(指kill任务/查询状态/查询版本)的提交直接回退到原生spark-submit脚本执行 不被特性所劫持 + SparkCMDParser.validate_submit_arguments(argv) # 获取用户传入的Spark命令 并解析命令 global_logger.debug("Hijack input params: %s", argv) diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py index b6ea5b056..f62b87579 100644 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py @@ -40,8 +40,8 @@ class SparkCMDParser: _parser.add_argument('--class', help="Your application's main class(for Java / Scala apps") _parser.add_argument('--conf', action='append', help="Arbitrary Spark configuration property") _parser.add_argument('--deploy-mode', dest='deploy-mode', - help="Whether to launch the driver program locally (client) or on one of the worker " - "machines inside the cluster (cluster)(Default: client)") + help="Whether to launch the driver program locally (client) or on one of the worker " + "machines inside the cluster (cluster)(Default: client)") _parser.add_argument('--driver-class-path', dest='driver-class-path') _parser.add_argument('--driver-cores', dest='driver-cores') _parser.add_argument('--driver-java-options', dest='driver-java-options') @@ -50,7 +50,7 @@ class SparkCMDParser: _parser.add_argument('--executor-memory', dest='executor-memory') _parser.add_argument('--files') _parser.add_argument('--jars') # TODO 这个入参和--jar有什么区别 - _parser.add_argument('--kill', action='store_true') # TODO 这也是非SUBMIT相关的命令 + _parser.add_argument('--kill', action='store_true') # TODO 这也是非SUBMIT相关的命令 _parser.add_argument('--master', help="spark://host:port, mesos://host:port, yarn, k8s://https://host:port, " "or local (Default: local[*]).") @@ -60,11 +60,11 @@ class SparkCMDParser: _parser.add_argument('--proxy-user', dest='proxy-user') _parser.add_argument('--py-files', dest='py-files') _parser.add_argument('--repositories') - _parser.add_argument('--status', action='store_true') # TODO boolean + _parser.add_argument('--status', action='store_true') # TODO boolean _parser.add_argument('--total-executor-cores', dest='total-executor-cores') # Options that do not take arguments boolean变量 - _parser.add_argument('--help','-h', action='store_true') + # '--help', '-h' 由于是argparse的内置 这里不显示写明 _parser.add_argument('--supervise', action='store_true') _parser.add_argument('--usage-error', action='store_true') _parser.add_argument('--verbose', '-v', action='store_true') @@ -78,8 +78,7 @@ class SparkCMDParser: _parser.add_argument('--principal') _parser.add_argument('--queue') - - # Spark Sql 相关选项 + # Spark sql 相关选项 # 支持对于--e这种形式的入参解析 但是--e为spark非官方用法 故在命令reconstruct的时候仍然使用-e的方式重建命令 _parser.add_argument('-e', '--e', type=str, help='SQL statement to execute.') _parser.add_argument('-f', type=str, 
help='File containing SQL script.') @@ -87,7 +86,7 @@ class SparkCMDParser: _parser.add_argument('-d', '--database') @classmethod - def validate_submit_arguments(cls, args: List) -> bool: + def validate_submit_arguments(cls, args: List): """ 检查当前提交的spark-submit命令是否属于SUBMIT动作的spark命令 该函数与Spark 3.3.1源码中SparkSubmitArguments.scala文件中的validateSubmitArguments逻辑保持一致 @@ -103,7 +102,7 @@ class SparkCMDParser: primary_resource = unknown if primary_resource is None: - return False + raise TypeError("This is not a SUBMIT type spark task.") @staticmethod def _normalize_value(value): diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_executor.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_executor.py index b00179209..8f9c77bab 100644 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_executor.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_executor.py @@ -86,8 +86,8 @@ class SparkExecutor: total_time_taken = sum(time_taken_list) global_logger.debug(f'Total time taken is {total_time_taken} seconds') else: - # 没有捕获到Time Taken时使用OA_CONF.exec_complete_return_runtime作为返回值,后续需要从trace中获取实际运行时间 - total_time_taken = OA_CONF.exec_complete_return_runtime global_logger.info('This spark task do not have Time taken message.') - + # OA_CONF.exec_fail_return_runtime仅作为占位用变量 + # 维持spark_executor的接口不变 实际的执行时间需要从jobs中获取 + total_time_taken = OA_CONF.exec_fail_return_runtime return application_id, total_time_taken diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_fetcher.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_fetcher.py index 544a8b4f3..4e5fc8b5d 100755 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_fetcher.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_fetcher.py @@ -1,3 +1,4 @@ +import re import json import requests from datetime import datetime @@ -85,35 +86,42 @@ class SparkFetcher: """ return self._make_request(f"api/v1/applications/{app_id}/jobs") - def get_spark_runtime_by_app(self) -> float: + def get_spark_runtime_by_app(self, app_id) -> float: """ 通过从HistoryServer上获取的jobs信息,计算任务的执行耗时 runtime的计算的耗时为 最早的Job提交的时间 与 最晚的Job完成时间 之间的差值(注:最晚完成的Job并不一定是最后提交的Job) :return: runtime (seconds) """ - jobs_detail = self.get_spark_jobs_by_app() + jobs_detail = self.get_spark_jobs_by_app(app_id) - time_format = "%Y-%m-%dT%H:%M:%S.%f" - - # 获取最早的Job提交时间 - first_submit_time = jobs_detail[-1]["submissionTime"] - first_submittion_timestamp = datetime.strptime(first_submit_time.replace("GMT", ""), time_format) - - # 获取最后结束的Job的完成时间 - completiontime_list = [] - sortedcompletiontime_list = [] + # 获取全部的时间戳信息 + time_str_list = [] for job_detail in jobs_detail: - completiontime_str = job_detail['completionTime'] - completion_timestamp = datetime.strptime(completiontime_str.replace("GMT", ""), time_format) - completiontime_list.append(completion_timestamp) - sortedcompletiontime_list = sorted(completiontime_list) - - if sortedcompletiontime_list: - last_completion_timestamp = sortedcompletiontime_list[-1] + time_str_list.append(job_detail['completionTime']) + time_str_list.append(job_detail['submissionTime']) + + # 检查时间戳信息完整性 要求时间戳数目大于0且为偶数个 + if len(time_str_list) > 0: + if len(time_str_list) % 2 == 0: + pass + else: + raise ValueError(f"Timestamps must always be submitted in even numbers " + f"({len(time_str_list)} timestamps are not allowed (because of single/unpaired)).") else: - return float('inf') + raise ValueError("No timestamp in jobs info. 
Need at least 2 timestamps") + + # 从 2025-07-28T08:30:32.374GMT 中匹配出 2025-07-28T08:30:32.37 + reg_pattern = r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}' + time_format = "%Y-%m-%dT%H:%M:%S.%f" + timestamp_list = [] + for time_str in time_str_list: + match = re.search(reg_pattern, time_str) + if match: + timestamp_list.append(datetime.strptime(match.group(), time_format)) + else: + raise ValueError(f"The format of timestamp '{time_str}' does not match expected pattern.") # 计算时间差 - time_diff = last_completion_timestamp - first_submittion_timestamp + time_diff = max(timestamp_list) - min(timestamp_list) return time_diff.total_seconds() diff --git a/omniadvisor/src/omniadvisor/service/tuning_result/tuning_result.py b/omniadvisor/src/omniadvisor/service/tuning_result/tuning_result.py index 5aa0979f0..147415203 100644 --- a/omniadvisor/src/omniadvisor/service/tuning_result/tuning_result.py +++ b/omniadvisor/src/omniadvisor/service/tuning_result/tuning_result.py @@ -133,7 +133,10 @@ class TuningResult: # 计算执行状态为success的平均runtime success_runtimes = [ exam_record.runtime - for exam_record in self._exam_records if exam_record.status == OA_CONF.ExecStatus.success + for exam_record in self._exam_records if ( + exam_record.status == OA_CONF.ExecStatus.success + and exam_record.runtime != OA_CONF.exec_fail_return_runtime + ) ] # 若配置成功复测的次数为0,则runtime为失败返回值 diff --git a/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_executor.py b/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_executor.py index f5e42d71c..22605eb66 100644 --- a/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_executor.py +++ b/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_executor.py @@ -1,4 +1,5 @@ import pytest +from common.constant import OA_CONF from unittest.mock import patch from datetime import datetime @@ -44,7 +45,7 @@ class TestSparkExecutor: assert exec_result.exitcode == self.exitcode assert exec_result.output == self.output assert exec_result.app_id == self.application_id - assert exec_result.duration == sum(self.time_taken_list) + assert exec_result.duration == OA_CONF.exec_fail_return_runtime assert type(exec_result.start_time) is datetime assert type(exec_result.end_time) is datetime @@ -56,7 +57,7 @@ class TestSparkExecutor: app_id, total_time_taken = self.spark_executor._parser_spark_output(self.output) # 验证结果 assert app_id == self.application_id - assert total_time_taken == sum(self.time_taken_list) + assert total_time_taken == OA_CONF.exec_fail_return_runtime # 解析spark output缺失application id output = (f'Time taken: {self.time_taken_list[0]} seconds\n' diff --git a/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_fetcher.py b/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_fetcher.py index 8c05130d0..843aac749 100755 --- a/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_fetcher.py +++ b/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_fetcher.py @@ -1,9 +1,11 @@ import json import pytest import requests +import unittest from unittest import mock from omniadvisor.service.spark_service.spark_fetcher import SparkFetcher from common.constant import OA_CONF +from datetime import datetime history_server_url = OA_CONF.spark_history_rest_url @@ -36,8 +38,13 @@ MOCK_EXECUTORS_RESPONSE = [ {"executorId": "exec-2", "memoryUsed": 2048} ] +MOCK_JOBS_RESPONSE = [ + {'submissionTime': '2025-07-28T08:30:32.374GMT', 'completionTime': '2025-07-28T08:31:32.374GMT'}, + {'submissionTime': '2025-07-28T08:32:32.374GMT', 
'completionTime': '2025-07-28T08:33:32.374GMT'} +] + -class TestSparkFetcher: +class TestSparkFetcher(unittest.TestCase): @mock.patch('requests.get') def test_get_spark_apps(self, mock_get): # 配置mock对象返回值 @@ -108,6 +115,86 @@ class TestSparkFetcher: executors_info = spark_fetcher.get_spark_executor_by_app("app-1") assert executors_info == MOCK_EXECUTORS_RESPONSE + @mock.patch('requests.get') + def test_get_spark_jobs_by_app(self, mock_get): + # 配置mock对象返回值 + mock_response = mock.Mock() + mock_response.status_code = 200 + mock_response.text = json.dumps(MOCK_JOBS_RESPONSE) + + # 配置mock对象的返回值 + mock_get.return_value = mock_response + + # 调用方法并断言结果 + executors_info = spark_fetcher.get_spark_jobs_by_app("app-1") + assert executors_info == MOCK_JOBS_RESPONSE + + @mock.patch('requests.get') + def test_get_spark_runtime_by_app_normal_case(self, mock_get): + """测试正常情况:偶数个时间戳,格式正确""" + # 模拟返回的 jobs 数据 + mock_jobs = [ + {'submissionTime': '2025-07-28T08:30:32.374GMT', 'completionTime': '2025-07-28T08:31:32.374GMT'}, + {'submissionTime': '2025-07-28T08:32:32.374GMT', 'completionTime': '2025-07-28T08:33:32.374GMT'} + ] + + # 配置mock对象返回值 + mock_response = mock.Mock() + mock_response.status_code = 200 + mock_response.text = json.dumps(mock_jobs) + + # 配置mock对象的返回值 + mock_get.return_value = mock_response + + # 调用方法 + runtime = spark_fetcher.get_spark_runtime_by_app("app-1") + + # 验证结果 + expected_runtime = (datetime.strptime('2025-07-28T08:33:32.374', "%Y-%m-%dT%H:%M:%S.%f") - + datetime.strptime('2025-07-28T08:30:32.374', "%Y-%m-%dT%H:%M:%S.%f")).total_seconds() + assert runtime == expected_runtime + + @mock.patch('requests.get') + def test_get_spark_runtime_by_app_no_jobs(self, mock_get): + """测试异常情况:无 jobs 数据(空列表)""" + # 模拟返回的 jobs 数据(空列表) + mock_jobs = [] + + # 配置mock对象返回值 + mock_response = mock.Mock() + mock_response.status_code = 200 + mock_response.text = json.dumps(mock_jobs) + + # 配置mock对象的返回值 + mock_get.return_value = mock_response + + # 验证是否抛出异常 + with self.assertRaises(ValueError) as context: + spark_fetcher.get_spark_runtime_by_app("app-1") + self.assertIn("No timestamp in jobs info. 
Need at least 2 timestamps", str(context.exception)) + + @mock.patch('requests.get') + def test_get_spark_runtime_by_app_invalid_timestamp_format(self, mock_get): + """测试异常情况:时间戳格式错误""" + # 模拟返回的 jobs 数据(时间戳格式错误) + mock_jobs = [ + {'submissionTime': 'invalid-format', 'completionTime': '2025-07-28T08:31:32.374GMT'} + ] + + # 配置mock对象返回值 + mock_response = mock.Mock() + mock_response.status_code = 200 + mock_response.text = json.dumps(mock_jobs) + + # 配置mock对象的返回值 + mock_get.return_value = mock_response + + # 验证是否抛出异常 + with self.assertRaises(ValueError) as context: + spark_fetcher.get_spark_runtime_by_app("app-1") + self.assertIn("The format of timestamp 'invalid-format' does not match expected pattern", + str(context.exception)) + @mock.patch('requests.get') def test_http_error_handling(self, mock_get): # 配置mock对象返回值 @@ -117,12 +204,12 @@ class TestSparkFetcher: mock_response.text = json.dumps(MOCK_APPS_RESPONSE) # 配置raise_for_status_code方法 使其根据状态码抛出HTTPError - mock_response.raise_for_status.side_effect = requests.exceptions.HTTPError("HTTP error occurred", response=mock_response) + mock_response.raise_for_status.side_effect = requests.exceptions.HTTPError("HTTP error occurred", + response=mock_response) # 配置mock对象的返回值 mock_get.return_value = mock_response - # 断言HTTPError被抛出 with pytest.raises(requests.exceptions.HTTPError): spark_fetcher.get_spark_apps() -- Gitee From c9b2193ad1a95276ecd9d5b680e8960b10efb8fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E8=89=BA=E4=B8=B9?= <53546877+Craven1701@users.noreply.github.com> Date: Fri, 1 Aug 2025 17:28:46 +0800 Subject: [PATCH 05/10] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=AF=B9=E4=BA=8Espark?= =?UTF-8?q?-sql=E5=91=BD=E4=BB=A4=E7=9A=84=E8=AF=86=E5=88=AB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- omniadvisor/src/common/constant.py | 3 +++ .../src/omniadvisor/service/spark_service/spark_cmd_parser.py | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/omniadvisor/src/common/constant.py b/omniadvisor/src/common/constant.py index 98a20ff69..54c368f1f 100644 --- a/omniadvisor/src/common/constant.py +++ b/omniadvisor/src/common/constant.py @@ -104,5 +104,8 @@ class OmniAdvisorConf: # 保留小数位数 decimal_digits = 3 + # spark-sql命令所提交的类名 + SparkSQLCLIDriver = 'org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver' + OA_CONF = OmniAdvisorConf() diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py index f62b87579..12ace18b7 100644 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py @@ -101,7 +101,7 @@ class SparkCMDParser: params, unknown = cls._parser.parse_known_args(args=args) primary_resource = unknown - if primary_resource is None: + if primary_resource is None and params['class'] == '': raise TypeError("This is not a SUBMIT type spark task.") @staticmethod -- Gitee From 6d5be8154954c585f06f3e46dcf3e5ea998d1732 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E8=89=BA=E4=B8=B9?= <53546877+Craven1701@users.noreply.github.com> Date: Mon, 4 Aug 2025 17:04:35 +0800 Subject: [PATCH 06/10] =?UTF-8?q?1.=E4=BF=AE=E6=94=B9application=20id?= =?UTF-8?q?=E7=9A=84=E5=8C=B9=E9=85=8D=E8=A7=84=E5=88=99=E4=B8=BA=E5=AF=B9?= =?UTF-8?q?yarn=20client=E6=97=A5=E5=BF=97=E4=B8=AD=E7=9A=84=E6=96=87?= =?UTF-8?q?=E6=9C=AC=E8=BF=9B=E8=A1=8C=E5=8C=B9=E9=85=8D=202.=E5=9C=A8?= 
=?UTF-8?q?=E8=AE=A1=E7=AE=97spark=E4=BB=BB=E5=8A=A1=E8=BF=90=E8=A1=8C?= =?UTF-8?q?=E6=97=B6=E9=97=B4=E6=97=B6=EF=BC=8C=E4=BD=BF=E7=94=A8dateutil?= =?UTF-8?q?=E5=AE=8C=E6=88=90=E6=97=B6=E9=97=B4=E6=88=B3=E8=BD=AC=E6=8D=A2?= =?UTF-8?q?=203.=E4=BF=AE=E5=A4=8Dspark=20parser=E6=A8=A1=E5=9D=97?= =?UTF-8?q?=E5=AF=B9=E4=BA=8E--help=E4=B8=8D=E4=BC=9A=E5=9B=9E=E6=BB=9A?= =?UTF-8?q?=E5=88=B0=E5=8E=9F=E7=94=9F=E6=89=A7=E8=A1=8C=E7=9A=84bug=204.?= =?UTF-8?q?=E5=AE=8C=E5=96=84=E7=9B=B8=E5=85=B3=E5=8D=95=E5=85=83=E6=B5=8B?= =?UTF-8?q?=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/spark_service/spark_cmd_parser.py | 25 ++- .../service/spark_service/spark_executor.py | 10 +- .../service/spark_service/spark_fetcher.py | 41 ++--- .../service/spark_service/spark_run.py | 4 +- .../interface/test_hijack_recommend.py | 20 ++- .../spark_service/test_spark_cmd_parser.py | 155 ++++++++++-------- .../spark_service/test_spark_executor.py | 4 +- .../spark_service/test_spark_fetcher.py | 15 +- 8 files changed, 153 insertions(+), 121 deletions(-) diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py index 12ace18b7..425969848 100644 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py @@ -15,7 +15,7 @@ _CMD_UNKNOWN_KEY = 'unknown' # 单横杠参数的字段 _SINGLE_HORIZONTAL_BAR_KEYS = ['e', 'f', 'i'] # 布尔类型的字段 -_BOOLEAN_TYPE_KEYS = ['verbose', 'supervise', 'kill', 'status'] +_BOOLEAN_TYPE_KEYS = ['verbose', 'supervise', 'version', 'usage_error', 'help'] # 需进行转换的字段 _SPARK_CONF_SUPPLEMENT_MAP = { 'num-executors': 'spark.executor.instances', @@ -30,7 +30,9 @@ class SparkCMDParser: """ Spark 引擎的解析器 """ - _parser = argparse.ArgumentParser() + # 当用户提交-h和--help时需要退回原生执行 因此关闭argparse自带的--help/-h参数解析 + _parser = argparse.ArgumentParser(add_help=False) + # 参数命中包含'-'的参数,设置dest属性,避免参数解析中'-'被转换为'_' # 解析Spark CMD开头,需脚本配合 _parser.add_argument(f'--{_CMD_PREFIX_KEY}', dest=_CMD_PREFIX_KEY, @@ -49,8 +51,8 @@ class SparkCMDParser: _parser.add_argument('--driver-memory', dest='driver-memory') _parser.add_argument('--executor-memory', dest='executor-memory') _parser.add_argument('--files') - _parser.add_argument('--jars') # TODO 这个入参和--jar有什么区别 - _parser.add_argument('--kill', action='store_true') # TODO 这也是非SUBMIT相关的命令 + _parser.add_argument('--jars') + _parser.add_argument('--kill', action='store_true') _parser.add_argument('--master', help="spark://host:port, mesos://host:port, yarn, k8s://https://host:port, " "or local (Default: local[*]).") @@ -60,11 +62,11 @@ class SparkCMDParser: _parser.add_argument('--proxy-user', dest='proxy-user') _parser.add_argument('--py-files', dest='py-files') _parser.add_argument('--repositories') - _parser.add_argument('--status', action='store_true') # TODO boolean + _parser.add_argument('--status', action='store_true') _parser.add_argument('--total-executor-cores', dest='total-executor-cores') # Options that do not take arguments boolean变量 - # '--help', '-h' 由于是argparse的内置 这里不显示写明 + _parser.add_argument('-h', '--help', action='store_true') _parser.add_argument('--supervise', action='store_true') _parser.add_argument('--usage-error', action='store_true') _parser.add_argument('--verbose', '-v', action='store_true') @@ -101,7 +103,7 @@ class SparkCMDParser: params, unknown = cls._parser.parse_known_args(args=args) primary_resource = unknown - if 
primary_resource is None and params['class'] == '': + if not primary_resource and vars(params)['class'] is None: raise TypeError("This is not a SUBMIT type spark task.") @staticmethod @@ -194,9 +196,9 @@ class SparkCMDParser: for key, value in exec_attr.items(): if key == _CMD_PREFIX_KEY: continue - # 单独处理 remainder = ['--uk1','xxxx','--uk2','xxxx'] 这部分未被解析的残余参数 + # 单独处理 unknown = ['xxxxxxx.jar', 'xxxxxx', 'xxxxx'] 这部分未被解析的残余参数追加到末尾 不在此处理 elif key == _CMD_UNKNOWN_KEY: - cmd_fields += [cls._normalize_value(item) for item in value] + continue # 单独处理 - 作为前缀的参数 elif key in _SINGLE_HORIZONTAL_BAR_KEYS: cmd_fields += [f'-{key}', cls._normalize_value(value)] @@ -208,5 +210,10 @@ class SparkCMDParser: for key, value in conf_params.items(): cmd_fields += ['--conf', f'{key}={cls._normalize_value(value)}'] + # 追加primary_resource + primary_resource = exec_attr.get(_CMD_UNKNOWN_KEY) + if primary_resource: + cmd_fields += [cls._normalize_value(item) for item in primary_resource] + global_logger.debug("cmd_fields = %s", cmd_fields) return cmd_fields diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_executor.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_executor.py index 8f9c77bab..2bcb584f6 100644 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_executor.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_executor.py @@ -66,21 +66,25 @@ class SparkExecutor: @classmethod def _parser_spark_output(cls, spark_output: str): """ - 解析Spark输出,获得spark提交命令、Application ID和Time taken + 解析Spark输出,获得spark提交命令、Application ID和Time taken(若存在) (注: Time taken后续不再作为性能指标被使用 目前仅作为性能参考获取系其数值) :param spark_output: Spark执行输出 :return: """ # 解析Application id: - search_obj = re.search(r'Application Id: (.*)\n', spark_output) + # 该条匹配模式的文本来自Spark3.3.1源码中yarn/Client.scala文件在Line224所打印的日志 + # 若Spark版本迭代时对该日志内容做出修改 则存在application id匹配失败的风险 + pattern = r'Client: Submitting application (application_\d+_\d+) to ResourceManager' + search_obj = re.search(pattern, spark_output) if search_obj: application_id = search_obj.group(1) else: raise RuntimeError('Can not find application id message in spark output.') # 解析Time Taken,只解析行首匹配字符串,避免Spark debug log中重复匹配 - match_objs = re.findall(r'^Time taken: (.*) seconds', spark_output, re.MULTILINE) + pattern = r'^Time taken: (.*) seconds' + match_objs = re.findall(pattern, spark_output, re.MULTILINE) if match_objs: time_taken_list = [float(obj) for obj in match_objs] total_time_taken = sum(time_taken_list) diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_fetcher.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_fetcher.py index 4e5fc8b5d..edfa9e994 100755 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_fetcher.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_fetcher.py @@ -1,11 +1,13 @@ import re import json import requests -from datetime import datetime +from dateutil import parser + +from omniadvisor.utils.logger import global_logger class SparkFetcher: - def __init__(self, history_server_url, ): + def __init__(self, history_server_url): """ 初始化SparkFetcher类 @@ -94,34 +96,19 @@ class SparkFetcher: :return: runtime (seconds) """ jobs_detail = self.get_spark_jobs_by_app(app_id) + if not jobs_detail: + global_logger.error(f"No job info returned for app_id={app_id}") + raise ValueError(f"No job information found for app_id={app_id}") # 获取全部的时间戳信息 - time_str_list = [] - for job_detail in jobs_detail: - time_str_list.append(job_detail['completionTime']) - 
time_str_list.append(job_detail['submissionTime']) - - # 检查时间戳信息完整性 要求时间戳数目大于0且为偶数个 - if len(time_str_list) > 0: - if len(time_str_list) % 2 == 0: - pass - else: - raise ValueError(f"Timestamps must always be submitted in even numbers " - f"({len(time_str_list)} timestamps are not allowed (because of single/unpaired)).") - else: - raise ValueError("No timestamp in jobs info. Need at least 2 timestamps") - - # 从 2025-07-28T08:30:32.374GMT 中匹配出 2025-07-28T08:30:32.37 - reg_pattern = r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}' - time_format = "%Y-%m-%dT%H:%M:%S.%f" timestamp_list = [] - for time_str in time_str_list: - match = re.search(reg_pattern, time_str) - if match: - timestamp_list.append(datetime.strptime(match.group(), time_format)) - else: - raise ValueError(f"The format of timestamp '{time_str}' does not match expected pattern.") + for idx, job_detail in enumerate(jobs_detail): + if 'completionTime' not in job_detail or 'submissionTime' not in job_detail: + raise KeyError(f"Job at index {idx} is missing 'completionTime' or 'submissionTime'") + timestamp_list.append(parser.parse(job_detail['completionTime']).timestamp()) + timestamp_list.append(parser.parse(job_detail['submissionTime']).timestamp()) # 计算时间差 time_diff = max(timestamp_list) - min(timestamp_list) - return time_diff.total_seconds() + global_logger.debug(f"Total time diff calculate by jobs is {time_diff} seconds") + return time_diff diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py index e5b702e30..0a6931dc5 100644 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py @@ -114,7 +114,7 @@ def _update_trace_and_runtime_from_history_server(exam_record: ExamRecord, appli trace_sql = spark_fetcher.get_spark_sql_by_app(application_id) trace_stages = spark_fetcher.get_spark_stages_by_app(application_id) trace_executor = spark_fetcher.get_spark_executor_by_app(application_id) - runtime = spark_fetcher.get_spark_runtime_by_app() + runtime = spark_fetcher.get_spark_runtime_by_app(application_id) except HTTPError as httpe: time.sleep(OA_CONF.spark_fetch_trace_interval) global_logger.debug(f"Cannot access history server: %s", httpe) @@ -128,5 +128,5 @@ def _update_trace_and_runtime_from_history_server(exam_record: ExamRecord, appli global_logger.error(f'Failed to get App %s trace from %s', application_id, history_server_url) return - ExamRecordRepository.update_trace(exam_record, runtime=runtime) + ExamRecordRepository.update_runtime(exam_record, runtime=runtime) ExamRecordRepository.update_trace(exam_record, trace=trace_dict) diff --git a/omniadvisor/tests/omniadvisor/interface/test_hijack_recommend.py b/omniadvisor/tests/omniadvisor/interface/test_hijack_recommend.py index 3c9bfd314..13d153c7a 100644 --- a/omniadvisor/tests/omniadvisor/interface/test_hijack_recommend.py +++ b/omniadvisor/tests/omniadvisor/interface/test_hijack_recommend.py @@ -66,7 +66,9 @@ class TestHijackRecommend(unittest.TestCase): @patch("omniadvisor.interface.hijack_recommend._get_exec_config_from_load") @patch("omniadvisor.interface.hijack_recommend._query_or_create_load") @patch("omniadvisor.interface.hijack_recommend.SparkCMDParser.parse_cmd") - def test_failed_user_config(self, mock_parse_cmd, mock_query_load, mock_get_config, mock_spark_run, mock_process_config): + @patch("omniadvisor.interface.hijack_recommend.SparkCMDParser.validate_submit_arguments") + def 
test_failed_user_config(self, mock_validate_submit_arguments, mock_parse_cmd, + mock_query_load, mock_get_config, mock_spark_run, mock_process_config): argv = ["--conf", "spark.executor.memory=4g"] exec_attr = {"name": "job_name"} user_config = {"spark.executor.memory": "4g"} @@ -85,7 +87,8 @@ class TestHijackRecommend(unittest.TestCase): # 场景 4: 缺少任务名称 @patch("omniadvisor.interface.hijack_recommend.SparkCMDParser.parse_cmd") - def test_missing_task_name(self, mock_parse_cmd): + @patch("omniadvisor.interface.hijack_recommend.SparkCMDParser.validate_submit_arguments") + def test_missing_task_name(self, mock_validate_submit_arguments, mock_parse_cmd): argv = ["--conf", "spark.executor.memory=4g"] mock_parse_cmd.return_value = ({}, {"spark.executor.memory": "4g"}) @@ -99,7 +102,8 @@ class TestHijackRecommend(unittest.TestCase): @patch("omniadvisor.interface.hijack_recommend._get_exec_config_from_load") @patch("omniadvisor.interface.hijack_recommend._query_or_create_load") @patch("omniadvisor.interface.hijack_recommend.SparkCMDParser.parse_cmd") - def test_process_test_config(self, mock_parse_cmd, mock_query_load, mock_get_config, mock_spark_run, mock_process_config, mock_multiprocess): + def test_process_test_config(self, mock_parse_cmd, mock_query_load, + mock_get_config, mock_spark_run, mock_process_config, mock_multiprocess): argv = ["--class", "Job", "--conf", "spark.executor.memory=4g"] exec_attr = {"name": "job"} user_config = {"spark.executor.memory": "4g"} @@ -119,7 +123,8 @@ class TestHijackRecommend(unittest.TestCase): # 场景 6: 命令解析异常 @patch("omniadvisor.interface.hijack_recommend.SparkCMDParser.parse_cmd") - def test_parse_cmd_raises(self, mock_parse_cmd): + @patch("omniadvisor.interface.hijack_recommend.SparkCMDParser.validate_submit_arguments") + def test_parse_cmd_raises(self, mock_validate_submit_arguments, mock_parse_cmd): mock_parse_cmd.side_effect = Exception("parse error") with pytest.raises(Exception, match="parse error"): hijack_recommend(["--conf", "x"]) @@ -127,7 +132,8 @@ class TestHijackRecommend(unittest.TestCase): # 场景 7: 创建 Load 异常 @patch("omniadvisor.interface.hijack_recommend._query_or_create_load") @patch("omniadvisor.interface.hijack_recommend.SparkCMDParser.parse_cmd") - def test_query_create_load_raises(self, mock_parse_cmd, mock_query_load): + @patch("omniadvisor.interface.hijack_recommend.SparkCMDParser.validate_submit_arguments") + def test_query_create_load_raises(self, mock_validate_submit_arguments, mock_parse_cmd, mock_query_load): mock_parse_cmd.return_value = ({"name": "job"}, {}) mock_query_load.side_effect = Exception("db error") @@ -139,7 +145,9 @@ class TestHijackRecommend(unittest.TestCase): @patch("omniadvisor.interface.hijack_recommend._get_exec_config_from_load") @patch("omniadvisor.interface.hijack_recommend._query_or_create_load") @patch("omniadvisor.interface.hijack_recommend.SparkCMDParser.parse_cmd") - def test_spark_run_raises(self, mock_parse_cmd, mock_query_load, mock_get_config, mock_spark_run): + @patch("omniadvisor.interface.hijack_recommend.SparkCMDParser.validate_submit_arguments") + def test_spark_run_raises(self, mock_validate_submit_arguments, + mock_parse_cmd, mock_query_load, mock_get_config, mock_spark_run): argv = ["--conf", "spark.executor.memory=4g"] exec_attr = {"name": "job"} config = {"spark.executor.memory": "4g"} diff --git a/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_cmd_parser.py b/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_cmd_parser.py index 75bbeb50d..a5e283473 100644 --- 
a/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_cmd_parser.py +++ b/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_cmd_parser.py @@ -1,38 +1,54 @@ import pytest -from omniadvisor.service.spark_service.spark_cmd_parser import SparkCMDParser +from omniadvisor.service.spark_service.spark_cmd_parser import SparkCMDParser, _CMD_PREFIX_KEY, _CMD_UNKNOWN_KEY class TestSparkCMDParser: """ 测试SparkCMDParser类的功能 """ - def test_validate_submit_argument(self): - pass + # Spark任务类型判断测试 + def test_validate_submit_arguments_valid(self): + args = [ + f"--{_CMD_PREFIX_KEY}", "spark-submit", + "--class", "com.example.Main", + "app.jar" + ] + assert SparkCMDParser.validate_submit_arguments(args) is None + + def test_validate_submit_arguments_missing_class_and_jar(self): + args = [ + f"--{_CMD_PREFIX_KEY}", "spark-submit" + ] + with pytest.raises(TypeError): + SparkCMDParser.validate_submit_arguments(args) + + + def test_validate_submit_arguments_empty_raises(self): + with pytest.raises(ValueError): + SparkCMDParser.validate_submit_arguments([]) # 测试命令解析 # 场景 1: 标准参数 + conf 参数 - def test_parse_cmd_standard(self): + def test_parse_cmd_basic_conf_and_exec_args(self): argv = [ - '--omniadvisor-cmd-prefix', 'spark-submit', - '--master', 'yarn', - '--name', 'test_app', - '-e', 'SELECT * FROM table', - '--executor-memory', '4g', - '--conf', 'spark.executor.cores=8', - '--conf', 'spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:+UseG1GC', - '--unknown1', 'test', - '-uk', 'test', + f"--{_CMD_PREFIX_KEY}", "spark-submit", + "--class", "com.example.Main", + "--conf", "spark.executor.memory=4g", + "--conf", "spark.executor.cores=2", + "--num-executors", "5", + "--name", "test-app", + "example.jar" ] exec_attr, conf_params = SparkCMDParser.parse_cmd(argv) - assert exec_attr['omniadvisor-cmd-prefix'] == 'spark-submit' - assert exec_attr['master'] == 'yarn' - assert exec_attr['name'] == 'test_app' - assert exec_attr['e'] == 'SELECT * FROM table' - assert exec_attr['unknown'] == ['--unknown1', 'test', '-uk', 'test'] - assert conf_params['spark.executor.memory'] == '4g' - assert conf_params['spark.driver.extraJavaOptions'] == '-XX:+UseG1GC -XX:+UseG1GC' + assert exec_attr["class"] == "com.example.Main" + assert exec_attr["name"] == "test-app" + assert conf_params["spark.executor.memory"] == "4g" + assert conf_params["spark.executor.cores"] == "2" + assert conf_params["spark.executor.instances"] == "5" + assert _CMD_UNKNOWN_KEY in exec_attr + assert exec_attr[_CMD_UNKNOWN_KEY] == ["example.jar"] # 场景 2: 包含 supplement 参数 def test_parse_cmd_with_supplement(self): @@ -47,26 +63,40 @@ class TestSparkCMDParser: assert conf_params["spark.executor.cores"] == "4" # 场景 3: 包含布尔类型的值 - def test_parse_cmd_with_boolean_flags(self): - argv = ["--verbose", "--supervise", "--kill", "--status"] + def test_parse_cmd_boolean_flags(self): + argv = [ + f"--{_CMD_PREFIX_KEY}", "spark-submit", + "--verbose", + "--version", + "--supervise", + "--usage-error", + "--help", + "example.jar" + ] exec_attr, _ = SparkCMDParser.parse_cmd(argv) - assert exec_attr["verbose"] is True - assert exec_attr["supervise"] is True - assert exec_attr["kill"] is True - assert exec_attr["status"] is True + for flag in ["verbose", "version", "supervise", "usage_error", "help"]: + assert exec_attr.get(flag) is True + + assert exec_attr[_CMD_UNKNOWN_KEY] == ["example.jar"] - # 场景 4: 解析包含未知参数 - def test_parse_cmd_with_unknown(self): + # 场景 4: 解析单短横杠的命令 + def test_parse_cmd_single_dash_args(self): argv = [ - "--name", "app", - "--foo", "bar", 
"--bar", "baz", "--kills" + f"--{_CMD_PREFIX_KEY}", "spark-submit", + "-e", "SELECT 1", + "-f", "script.sql", + "-i", "init.sql", + "-d", "testdb", + "example.jar" ] - exec_attr, conf_params = SparkCMDParser.parse_cmd(argv) + exec_attr, _ = SparkCMDParser.parse_cmd(argv) - assert "unknown" in exec_attr - assert "--foo" in exec_attr["unknown"] - assert "bar" in exec_attr["unknown"] + assert exec_attr["e"] == "SELECT 1" + assert exec_attr["f"] == "script.sql" + assert exec_attr["i"] == "init.sql" + assert exec_attr["database"] == "testdb" + assert exec_attr[_CMD_UNKNOWN_KEY] == ["example.jar"] # 场景 5: 空 argv 抛出异常 def test_parse_cmd_empty_argv(self): @@ -75,28 +105,30 @@ class TestSparkCMDParser: # 测试命令重建 # 场景 1: 正常重建命令行 - def test_reconstruct_cmd_normal(self): - """ - 测试基本命令重组 - """ - exec_attr = { - 'omniadvisor-cmd-prefix': 'spark-submit', - 'master': 'yarn', - 'name': 'test_app', - 'e': 'SELECT * FROM table', - 'unknown': ['--unknown1', 'test', '-uk', 'test'] - } - conf_params = { - 'spark.executor.memory': '4g', - 'spark.driver.extraJavaOptions': '-XX:+UseG1GC -XX:+UseG1GC', - } - - cmd = SparkCMDParser.reconstruct_cmd(exec_attr, conf_params) - - assert exec_attr['omniadvisor-cmd-prefix'] == cmd[0] - assert "SELECT * FROM table" in cmd - assert "spark.executor.memory=4g" in cmd - assert "spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:+UseG1GC" in cmd + def test_reconstruct_cmd_round_trip(self): + original_argv = [ + f"--{_CMD_PREFIX_KEY}", "spark-submit", + "--class", "com.example.Main", + "--conf", "spark.executor.memory=4g", + "--executor-cores", "2", + "--driver-memory", "2g", + "--verbose", + '-e', 'SELECT * FROM table', + "app.jar" + ] + exec_attr, conf_params = SparkCMDParser.parse_cmd(original_argv) + reconstructed = SparkCMDParser.reconstruct_cmd(exec_attr, conf_params) + + assert "spark-submit" in reconstructed + assert "--class" in reconstructed + assert "com.example.Main" in reconstructed + assert "--conf" in reconstructed + assert "spark.executor.memory=4g" in reconstructed + assert "spark.executor.cores=2" in reconstructed + assert "spark.driver.memory=2g" in reconstructed + assert "--verbose" in reconstructed + assert "SELECT * FROM table" in reconstructed + assert "app.jar" in reconstructed # 场景 2: 缺少前缀字段抛异常 def test_reconstruct_cmd_missing_prefix(self): @@ -124,16 +156,14 @@ class TestSparkCMDParser: assert "arg" in cmd assert "spark.executor.instances=4" in cmd - # 场景 3: 包含 boolean 参数 + # 场景 3: 包含 boolean 参数的命令构建 def test_reconstruct_cmd_with_boolean(self): exec_attr = { "omniadvisor-cmd-prefix": "spark-submit", "name": "job", "unknown": ["--extra", "arg"], "verbose": True, - "supervise": True, - "kill": True, - "status": False + "supervise": False, } conf_params = { "spark.executor.instances": "4" @@ -144,7 +174,4 @@ class TestSparkCMDParser: assert "arg" in cmd assert "spark.executor.instances=4" in cmd assert "--verbose" in cmd - assert "--supervise" in cmd - assert "--kill" in cmd - assert "--status" not in cmd - + assert "--supervise" not in cmd diff --git a/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_executor.py b/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_executor.py index 22605eb66..c9ff0f041 100644 --- a/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_executor.py +++ b/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_executor.py @@ -22,9 +22,9 @@ class TestSparkExecutor: self.cmd_fields = ['spark-submit', '--class', 'MyClass', 'my_app.jar'] self.timeout = 10 self.exitcode = 0 - 
self.application_id = 'application_123456' + self.application_id = 'application_123456_303209' self.time_taken_list = [23.42, 25.83] - self.output = (f'Spark master: yarn, Application Id: {self.application_id}\n' + self.output = (f'Client: Submitting application {self.application_id} to ResourceManager\n' f'XXXXXX\n' f'Time taken: {self.time_taken_list[0]} seconds\n' f'Time taken: {self.time_taken_list[1]} seconds\n' diff --git a/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_fetcher.py b/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_fetcher.py index 843aac749..f39617d0d 100755 --- a/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_fetcher.py +++ b/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_fetcher.py @@ -171,14 +171,14 @@ class TestSparkFetcher(unittest.TestCase): # 验证是否抛出异常 with self.assertRaises(ValueError) as context: spark_fetcher.get_spark_runtime_by_app("app-1") - self.assertIn("No timestamp in jobs info. Need at least 2 timestamps", str(context.exception)) + self.assertIn("No job information found for app_id=app-1", str(context.exception)) @mock.patch('requests.get') - def test_get_spark_runtime_by_app_invalid_timestamp_format(self, mock_get): - """测试异常情况:时间戳格式错误""" - # 模拟返回的 jobs 数据(时间戳格式错误) + def test_get_spark_runtime_by_app_incomplete_jobs(self, mock_get): + """测试异常情况:jobs 数据中的提交与完成时间信息不完整""" + # 模拟返回的 jobs 数据(空列表) mock_jobs = [ - {'submissionTime': 'invalid-format', 'completionTime': '2025-07-28T08:31:32.374GMT'} + {'submissionTime': '2025-07-28T08:30:32.374GMT'}, ] # 配置mock对象返回值 @@ -190,10 +190,9 @@ class TestSparkFetcher(unittest.TestCase): mock_get.return_value = mock_response # 验证是否抛出异常 - with self.assertRaises(ValueError) as context: + with self.assertRaises(KeyError) as context: spark_fetcher.get_spark_runtime_by_app("app-1") - self.assertIn("The format of timestamp 'invalid-format' does not match expected pattern", - str(context.exception)) + self.assertIn("missing 'completionTime' or 'submissionTime'", str(context.exception)) @mock.patch('requests.get') def test_http_error_handling(self, mock_get): -- Gitee From 4782399159264969b7cdc4cf8b618517c7386deb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E8=89=BA=E4=B8=B9?= <53546877+Craven1701@users.noreply.github.com> Date: Tue, 5 Aug 2025 20:18:27 +0800 Subject: [PATCH 07/10] =?UTF-8?q?spark=5Fcmd=5Fparser=E5=8D=95=E5=85=83?= =?UTF-8?q?=E6=B5=8B=E8=AF=95=E5=AE=8C=E5=96=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- omniadvisor/pyproject.toml | 1 + .../service/spark_service/spark_cmd_parser.py | 8 +- .../spark_service/test_spark_cmd_parser.py | 89 ++++++++++++++++++- 3 files changed, 93 insertions(+), 5 deletions(-) diff --git a/omniadvisor/pyproject.toml b/omniadvisor/pyproject.toml index 94b80b53c..3dd09bc05 100755 --- a/omniadvisor/pyproject.toml +++ b/omniadvisor/pyproject.toml @@ -20,6 +20,7 @@ django = "~4.2.10" smac = "~2.2.0" colorlog = "~6.9.0" requests = "^2.32.3" +python-dateutil = "2.9.0.post0" [tool.poetry.group.test.dependencies] pytest = "^7.4.4" diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py index 425969848..1b0e077f6 100644 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py @@ -18,11 +18,11 @@ _SINGLE_HORIZONTAL_BAR_KEYS = ['e', 'f', 'i'] _BOOLEAN_TYPE_KEYS = ['verbose', 
'supervise', 'version', 'usage_error', 'help'] # 需进行转换的字段 _SPARK_CONF_SUPPLEMENT_MAP = { - 'num-executors': 'spark.executor.instances', - 'executor-cores': 'spark.executor.cores', + 'driver-memory': 'spark.driver.memory', 'driver-cores': 'spark.driver.cores', 'executor-memory': 'spark.executor.memory', - 'driver-memory': 'spark.driver.memory' + 'executor-cores': 'spark.executor.cores', + 'num-executors': 'spark.executor.instances', } @@ -68,7 +68,7 @@ class SparkCMDParser: # Options that do not take arguments boolean变量 _parser.add_argument('-h', '--help', action='store_true') _parser.add_argument('--supervise', action='store_true') - _parser.add_argument('--usage-error', action='store_true') + _parser.add_argument('--usage-error', dest="usage-error", action='store_true') _parser.add_argument('--verbose', '-v', action='store_true') _parser.add_argument('--version', action='store_true') diff --git a/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_cmd_parser.py b/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_cmd_parser.py index a5e283473..4bf965afc 100644 --- a/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_cmd_parser.py +++ b/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_cmd_parser.py @@ -1,3 +1,4 @@ +import shlex import pytest from omniadvisor.service.spark_service.spark_cmd_parser import SparkCMDParser, _CMD_PREFIX_KEY, _CMD_UNKNOWN_KEY @@ -75,7 +76,7 @@ class TestSparkCMDParser: ] exec_attr, _ = SparkCMDParser.parse_cmd(argv) - for flag in ["verbose", "version", "supervise", "usage_error", "help"]: + for flag in ["verbose", "version", "supervise", "usage-error", "help"]: assert exec_attr.get(flag) is True assert exec_attr[_CMD_UNKNOWN_KEY] == ["example.jar"] @@ -175,3 +176,89 @@ class TestSparkCMDParser: assert "spark.executor.instances=4" in cmd assert "--verbose" in cmd assert "--supervise" not in cmd + + # 全量参数测试 + def test_parse_and_reconstruct_all_params_from_cmd_string(self): + cmd_string = """ + --omniadvisor-cmd-prefix '$SPARK_HOME/bin/spark-class org.apache.spark.deploy.SparkSubmit' + --class com.example.MainClass + --name MySparkApp + --master yarn + --deploy-mode cluster + --driver-memory 2g + --driver-cores 2 + --executor-memory 4g + --executor-cores 4 + --num-executors 20 + --queue root.default + --conf spark.executor.memoryOverhead=512 + --conf spark.yarn.maxAppAttempts=1 + --jars lib1.jar,lib2.jar + --files config.yaml + --archives env.zip#env + --verbose + --version + --supervise + --usage-error + --help + -i init_script.sql + -d my_database + --e "SELECT * FROM table" + -f query.sql + my_app.jar + --custom-arg custom_value + """ + + # 使用 shlex.split 来模拟 shell 行为 + argv = shlex.split(cmd_string) + + # Step 1: Parse + exec_attr, conf_attr = SparkCMDParser.parse_cmd(argv) + + # Step 2: Reconstruct + reconstructed_cmd = SparkCMDParser.reconstruct_cmd(exec_attr, conf_attr) + + # Step 3: 验证 reconstruct 是否保留了所有参数 + expected = [ + "--class", "com.example.MainClass", + "--name", "MySparkApp", + "--master", "yarn", + "--deploy-mode", "cluster", + "--conf", "spark.driver.memory=2g", + "--conf", "spark.driver.cores=2", + "--conf", "spark.executor.memory=4g", + "--conf", "spark.executor.cores=4", + "--conf", "spark.executor.instances=20", + "--queue", "root.default", + "--conf", "spark.executor.memoryOverhead=512", + "--conf", "spark.yarn.maxAppAttempts=1", + "--jars", "lib1.jar,lib2.jar", + "--files", "config.yaml", + "--archives", "env.zip#env", + "--verbose", + "--version", + "--supervise", + "--usage-error", + 
"--help", + "-i", "init_script.sql", + "--database", "my_database", + "-e", "SELECT * FROM table", + "-f", "query.sql", + "my_app.jar", + "--custom-arg", "custom_value" + ] + + for item in expected: + assert item in reconstructed_cmd, f"Missing '{item}' in reconstructed command" + + # Step 4: 验证 parse_cmd 是否解析了所有 key + expected_keys = { + "class", "name", "master", "deploy-mode", + "queue", "jars", "files", "archives", + "verbose", "version", "supervise", "usage-error", "help", + "i", "database", "e", "f", + _CMD_UNKNOWN_KEY + } + + assert set(exec_attr.keys()).issuperset(expected_keys), "Missing expected keys in parsed exec_attr" + -- Gitee From 3ef46240ec575a0518667ae44a86f4c0f38c59a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E8=89=BA=E4=B8=B9?= <53546877+Craven1701@users.noreply.github.com> Date: Tue, 5 Aug 2025 20:59:42 +0800 Subject: [PATCH 08/10] =?UTF-8?q?bool=E7=B1=BB=E5=9E=8B=E7=9B=B8=E5=85=B3?= =?UTF-8?q?=E8=A7=A3=E6=9E=90=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../omniadvisor/service/spark_service/spark_cmd_parser.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py index 1b0e077f6..f778d2f2d 100644 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py @@ -15,7 +15,7 @@ _CMD_UNKNOWN_KEY = 'unknown' # 单横杠参数的字段 _SINGLE_HORIZONTAL_BAR_KEYS = ['e', 'f', 'i'] # 布尔类型的字段 -_BOOLEAN_TYPE_KEYS = ['verbose', 'supervise', 'version', 'usage_error', 'help'] +_BOOLEAN_TYPE_KEYS = ['verbose', 'supervise', 'version', 'usage-error', 'help'] # 需进行转换的字段 _SPARK_CONF_SUPPLEMENT_MAP = { 'driver-memory': 'spark.driver.memory', @@ -52,7 +52,7 @@ class SparkCMDParser: _parser.add_argument('--executor-memory', dest='executor-memory') _parser.add_argument('--files') _parser.add_argument('--jars') - _parser.add_argument('--kill', action='store_true') + _parser.add_argument('--kill') _parser.add_argument('--master', help="spark://host:port, mesos://host:port, yarn, k8s://https://host:port, " "or local (Default: local[*]).") @@ -62,7 +62,7 @@ class SparkCMDParser: _parser.add_argument('--proxy-user', dest='proxy-user') _parser.add_argument('--py-files', dest='py-files') _parser.add_argument('--repositories') - _parser.add_argument('--status', action='store_true') + _parser.add_argument('--status') _parser.add_argument('--total-executor-cores', dest='total-executor-cores') # Options that do not take arguments boolean变量 -- Gitee From 7f22cb43beaf09e3db2f3c9b096790c185bd2a87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E8=89=BA=E4=B8=B9?= <53546877+Craven1701@users.noreply.github.com> Date: Thu, 7 Aug 2025 10:58:25 +0800 Subject: [PATCH 09/10] clean code --- .../service/tuning_result/tuning_result.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/omniadvisor/src/omniadvisor/service/tuning_result/tuning_result.py b/omniadvisor/src/omniadvisor/service/tuning_result/tuning_result.py index 147415203..43e656acf 100644 --- a/omniadvisor/src/omniadvisor/service/tuning_result/tuning_result.py +++ b/omniadvisor/src/omniadvisor/service/tuning_result/tuning_result.py @@ -131,13 +131,11 @@ class TuningResult: return OA_CONF.exec_fail_return_runtime # 计算执行状态为success的平均runtime - success_runtimes = [ - exam_record.runtime - for exam_record in 
self._exam_records if ( - exam_record.status == OA_CONF.ExecStatus.success - and exam_record.runtime != OA_CONF.exec_fail_return_runtime - ) - ] + success_runtimes = [] + for exam_record in self._exam_records: + if (exam_record.status == OA_CONF.ExecStatus.success + and exam_record.runtime != OA_CONF.exec_fail_return_runtime): + success_runtimes.append(exam_record.runtime) # 若配置成功复测的次数为0,则runtime为失败返回值 if not success_runtimes: -- Gitee From fdbd082e934d1116dac43c24416ccbbdd43e879f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E8=89=BA=E4=B8=B9?= <53546877+Craven1701@users.noreply.github.com> Date: Thu, 7 Aug 2025 14:47:11 +0800 Subject: [PATCH 10/10] =?UTF-8?q?=E5=8D=95=E5=85=83=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E5=88=B7=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../omniadvisor/interface/test_hijack_recommend.py | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/omniadvisor/tests/omniadvisor/interface/test_hijack_recommend.py b/omniadvisor/tests/omniadvisor/interface/test_hijack_recommend.py index 96045adbf..fa03b8ea5 100644 --- a/omniadvisor/tests/omniadvisor/interface/test_hijack_recommend.py +++ b/omniadvisor/tests/omniadvisor/interface/test_hijack_recommend.py @@ -73,7 +73,7 @@ class TestHijackRecommend: @patch("omniadvisor.interface.hijack_recommend.SparkCMDParser.parse_cmd") @patch("omniadvisor.interface.hijack_recommend.SparkCMDParser.validate_submit_arguments") def test_failed_user_config(self, mock_validate_submit_arguments, mock_parse_cmd,mock_create_or_update_load, - mock_query_load, mock_get_config, mock_spark_run, mock_process_config): + mock_get_config, mock_spark_run, mock_process_config): argv = ["--conf", "spark.executor.memory=4g"] exec_attr = {"name": "job_name"} user_config = {"spark.executor.memory": "4g"} @@ -90,16 +90,6 @@ class TestHijackRecommend: assert mock_spark_run.call_count == 1 mock_process_config.assert_not_called() - # 场景 4: 缺少任务名称 - @patch("omniadvisor.interface.hijack_recommend.SparkCMDParser.parse_cmd") - @patch("omniadvisor.interface.hijack_recommend.SparkCMDParser.validate_submit_arguments") - def test_missing_task_name(self, mock_validate_submit_arguments, mock_parse_cmd): - argv = ["--conf", "spark.executor.memory=4g"] - mock_parse_cmd.return_value = ({}, {"spark.executor.memory": "4g"}) - - with pytest.raises(ValueError, match="Task name not in Spark submit cmd"): - hijack_recommend(argv) - # 场景 5: 执行的是 test_config,触发 _process_load_config @patch('omniadvisor.service.spark_service.spark_run.multiprocessing.Process') @patch("omniadvisor.interface.hijack_recommend._process_load_config") -- Gitee
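
For reference, a minimal usage sketch of the parse/reconstruct round trip that the new unit tests in this series exercise. It is illustrative only: the import path, the two classmethods (parse_cmd, reconstruct_cmd) and the flat-list shape of the reconstructed command are assumed from what the tests above assert, not taken from the implementation itself; the argv values are made up for the example.

    from omniadvisor.service.spark_service.spark_cmd_parser import SparkCMDParser

    argv = [
        "--omniadvisor-cmd-prefix", "spark-submit",
        "--class", "com.example.Main",
        "--num-executors", "5",
        "--conf", "spark.executor.memory=4g",
        "app.jar",
    ]

    # parse_cmd() splits argv into execution attributes and spark.* conf entries;
    # resource flags such as --num-executors are folded into conf keys
    # (here spark.executor.instances), as the supplement-map tests assert.
    exec_attr, conf_params = SparkCMDParser.parse_cmd(argv=argv)
    assert conf_params["spark.executor.instances"] == "5"
    assert conf_params["spark.executor.memory"] == "4g"

    # reconstruct_cmd() rebuilds a submittable command; supplemented resource
    # flags come back as --conf key=value pairs rather than their original form.
    cmd = SparkCMDParser.reconstruct_cmd(exec_attr, conf_params)
    assert "spark-submit" in cmd
    assert "spark.executor.instances=5" in cmd
    assert "spark.executor.memory=4g" in cmd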