From 9fefedad26d18f7b172b5b502677c33f691efc27 Mon Sep 17 00:00:00 2001 From: yang_feida Date: Thu, 15 May 2025 17:15:17 +0800 Subject: [PATCH 1/8] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E8=8B=A5=E5=B9=B2bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- omniadvisor/src/algo/expert/tuning.py | 2 +- omniadvisor/src/algo/iterative/tuning.py | 2 +- omniadvisor/src/algo/native/__init__.py | 0 omniadvisor/src/algo/native/tuning.py | 5 +++ omniadvisor/src/algo/transfer/__init__.py | 0 omniadvisor/src/algo/transfer/tuning.py | 5 +++ omniadvisor/src/common/constant.py | 3 +- .../omniadvisor/interface/config_tuning.py | 40 ++++++++++++++---- .../repository/exam_record_repository.py | 12 ++++++ .../repository/tuning_record_repository.py | 14 ++++++- .../src/omniadvisor/service/retest_service.py | 13 ++++-- .../spark_command_reconstruct.py | 4 +- .../service/spark_service/spark_fetcher.py | 7 +--- .../spark_service/spark_parameter_parser.py | 13 +++--- .../service/spark_service/spark_run.py | 34 +++++---------- .../service/tuning_result/tuning_result.py | 41 ++++++++++++++++--- .../tuning_result/tuning_result_history.py | 8 +++- 17 files changed, 144 insertions(+), 59 deletions(-) create mode 100644 omniadvisor/src/algo/native/__init__.py create mode 100644 omniadvisor/src/algo/native/tuning.py create mode 100644 omniadvisor/src/algo/transfer/__init__.py create mode 100644 omniadvisor/src/algo/transfer/tuning.py diff --git a/omniadvisor/src/algo/expert/tuning.py b/omniadvisor/src/algo/expert/tuning.py index dbcf3b072..696a273e3 100644 --- a/omniadvisor/src/algo/expert/tuning.py +++ b/omniadvisor/src/algo/expert/tuning.py @@ -2,4 +2,4 @@ class ExpertTuning: @staticmethod def tune(history: any): - return {} \ No newline at end of file + return {}, '' diff --git a/omniadvisor/src/algo/iterative/tuning.py b/omniadvisor/src/algo/iterative/tuning.py index ebb40fd06..4600f404f 100644 --- a/omniadvisor/src/algo/iterative/tuning.py +++ b/omniadvisor/src/algo/iterative/tuning.py @@ -2,4 +2,4 @@ class SmacAppendTuning: @staticmethod def tune(history: any): - return {} + return {}, '' diff --git a/omniadvisor/src/algo/native/__init__.py b/omniadvisor/src/algo/native/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/omniadvisor/src/algo/native/tuning.py b/omniadvisor/src/algo/native/tuning.py new file mode 100644 index 000000000..e6c5479c5 --- /dev/null +++ b/omniadvisor/src/algo/native/tuning.py @@ -0,0 +1,5 @@ +# 临时用于测试 +class NativeTuning: + @staticmethod + def tune(history: any): + return {}, '' diff --git a/omniadvisor/src/algo/transfer/__init__.py b/omniadvisor/src/algo/transfer/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/omniadvisor/src/algo/transfer/tuning.py b/omniadvisor/src/algo/transfer/tuning.py new file mode 100644 index 000000000..d9a008a63 --- /dev/null +++ b/omniadvisor/src/algo/transfer/tuning.py @@ -0,0 +1,5 @@ +# 临时用于测试 +class TransferTuning: + @staticmethod + def tune(history: any, other_history: any): + return {}, '' diff --git a/omniadvisor/src/common/constant.py b/omniadvisor/src/common/constant.py index 1e0366dce..3725f1d96 100644 --- a/omniadvisor/src/common/constant.py +++ b/omniadvisor/src/common/constant.py @@ -54,7 +54,8 @@ class OmniAdvisorConf: iterative = 'iterative' expert = 'expert' transfer = 'transfer' - all = [user, iterative, expert, transfer] + native = 'native' + all = [user, iterative, expert, transfer, native] # 复测方式 class RetestWay: diff --git a/omniadvisor/src/omniadvisor/interface/config_tuning.py b/omniadvisor/src/omniadvisor/interface/config_tuning.py index c0a5523d4..599daee1a 100644 --- a/omniadvisor/src/omniadvisor/interface/config_tuning.py +++ b/omniadvisor/src/omniadvisor/interface/config_tuning.py @@ -1,14 +1,18 @@ import argparse +import time from algo.expert.tuning import ExpertTuning from algo.iterative.tuning import SmacAppendTuning from omniadvisor.repository.load_repository import LoadRepository from omniadvisor.repository.tuning_record_repository import TuningRecordRepository from omniadvisor.service.retest_service import retest +from omniadvisor.service.tuning_result.tuning_result import remove_tuning_result from omniadvisor.service.tuning_result.tuning_result_history import get_tuning_result_history, \ - get_next_tuning_method + get_next_tuning_method, get_other_tuning_result_history from omniadvisor.utils.logger import global_logger +from algo.native.tuning import NativeTuning +from algo.transfer.tuning import TransferTuning from common.constant import OA_CONF @@ -45,18 +49,38 @@ def unified_tuning(load, retest_way: str, tuning_method: str): # 推荐下一个配置 if tuning_method == OA_CONF.TuningMethod.iterative: perf_history = get_tuning_result_history(load) - next_config = SmacAppendTuning.tune(perf_history.tuning_history) + next_config, method_extend = SmacAppendTuning.tune(perf_history.tuning_history) TuningRecordRepository.create(load=load, config=next_config, method=OA_CONF.TuningMethod.iterative) - elif tuning_method == OA_CONF.TuneMethod.expert: + elif tuning_method == OA_CONF.TuningMethod.expert: perf_history = get_tuning_result_history(load) - next_config = ExpertTuning.tune(perf_history.tuning_history) - TuningRecordRepository.create(load=load, config=next_config, method=OA_CONF.TuningMethod.expert) + next_config, method_extend = ExpertTuning.tune(perf_history.tuning_history) + elif tuning_method == OA_CONF.TuningMethod.native: + perf_history = get_tuning_result_history(load) + next_config, method_extend = NativeTuning.tune(perf_history.tuning_history) + elif tuning_method == OA_CONF.TuningMethod.transfer: + perf_history = get_tuning_result_history(load) + other_history = get_other_tuning_result_history(load) + next_config, method_extend = TransferTuning.tune(perf_history.tuning_history, other_history) else: raise ValueError(f'Not supported tuning method: {tuning_method}') + if next_config: + TuningRecordRepository.create(load=load, config=next_config, method=OA_CONF.TuningMethod.expert, + method_extend=method_extend) + else: + global_logger.info('The recommending config is empty, please try other tuning methods.') + return + # 复测 if retest_way == OA_CONF.RetestWay.backend: - retest(load, next_config) + try: + retest(load, next_config) + except Exception: + # 如果是running,说明此调优配置不可能再被复测,那么删除对应的exam_records、tuning_record + global_logger.info('Remove tuning result because the status is running.') + remove_tuning_result(load, next_config) + raise + time.sleep(10) perf_history = get_tuning_result_history(load) # 更新最优配置 if perf_history.best_config: @@ -107,11 +131,11 @@ def main(): # 用户输入了强制调优的请求,以用户为准 unified_tuning(load, args.retest_way, args.tuning_method) else: - tuning_method = get_next_tuning_method(args.load_id) + tuning_method = get_next_tuning_method(load) if not tuning_method: global_logger.info( 'The tuning times reaches default settings, if you want to append tuning times, add \'-t\' in your ' 'command' ) return - unified_tuning(args.load_id, args.retest_way, tuning_method) + unified_tuning(load, args.retest_way, tuning_method) diff --git a/omniadvisor/src/omniadvisor/repository/exam_record_repository.py b/omniadvisor/src/omniadvisor/repository/exam_record_repository.py index dc12b30e5..22ba160b3 100644 --- a/omniadvisor/src/omniadvisor/repository/exam_record_repository.py +++ b/omniadvisor/src/omniadvisor/repository/exam_record_repository.py @@ -47,6 +47,8 @@ class ExamRecordRepository(Repository): model_attr = { ExamRecord.FieldName.load: load.database_model, ExamRecord.FieldName.config: config, + ExamRecord.FieldName.start_time: timezone.now(), + ExamRecord.FieldName.end_time: timezone.now() } database_record = cls._create(model_attr=model_attr) return ExamRecord(database_model=database_record) @@ -137,3 +139,13 @@ class ExamRecordRepository(Repository): ) return ExamRecord(database_model=database_task) + + @classmethod + def delete(cls, exam_record: ExamRecord): + """ + 更新测试记录中的结束时间为此刻 + + :param exam_record: 测试记录实例 + :return: 更新完后的测试记录 + """ + return exam_record.database_model.delete() diff --git a/omniadvisor/src/omniadvisor/repository/tuning_record_repository.py b/omniadvisor/src/omniadvisor/repository/tuning_record_repository.py index 28eb94a28..c9797996e 100644 --- a/omniadvisor/src/omniadvisor/repository/tuning_record_repository.py +++ b/omniadvisor/src/omniadvisor/repository/tuning_record_repository.py @@ -28,7 +28,7 @@ class TuningRecordRepository(Repository): } @classmethod - def create(cls, load: Load, config: dict, method: str): + def create(cls, load: Load, config: dict, method: str, method_extend: str = ''): """ 指定负载、参数配置和调优方法,新增调优记录 @@ -61,6 +61,7 @@ class TuningRecordRepository(Repository): TuningRecord.FieldName.config: config, TuningRecord.FieldName.method: method, TuningRecord.FieldName.rounds: tuning_rounds, + TuningRecord.FieldName.method_extend: method_extend } database_record = cls._create(model_attr=model_attr) return TuningRecord(database_model=database_record) @@ -103,4 +104,13 @@ class TuningRecordRepository(Repository): return [ TuningRecord(database_model=database_record) for database_record in database_records - ] \ No newline at end of file + ] + + @classmethod + def delete(cls, tuning_record: TuningRecord): + """ + 删除 tuning_record 记录 + :param tuning_record: + :return: + """ + return tuning_record.database_model.delete() diff --git a/omniadvisor/src/omniadvisor/service/retest_service.py b/omniadvisor/src/omniadvisor/service/retest_service.py index 36378f7bd..cddb5b133 100644 --- a/omniadvisor/src/omniadvisor/service/retest_service.py +++ b/omniadvisor/src/omniadvisor/service/retest_service.py @@ -1,6 +1,7 @@ from common.constant import OA_CONF from omniadvisor.repository.model.load import Load from omniadvisor.service.spark_service.spark_run import spark_run +from omniadvisor.service.tuning_result.tuning_result import get_tuning_result from omniadvisor.utils.logger import global_logger @@ -17,12 +18,18 @@ def retest(load: Load, config: dict): try: exam_record, spark_output = spark_run(load, config) except Exception as e: - global_logger.error('复测第 %d 轮失败,异常来源:非spark运行异常,详情:%s', i, e) + global_logger.error( + 'Retest failed in round %d. Exception source: Non-Spark exception. Details: %s.', i, e + ) # 目前采取的策略是:抛异常,认为是其他原因导致的失败,可以肯定不是配置的原因 raise if exam_record.status == OA_CONF.ExecStatus.success: - global_logger.info('复测第 %d 轮成功,性能结果:%.3f', i, exam_record.runtime) + global_logger.info('Retest succeeded in round %d. Performance result: %.3f.', i, exam_record.runtime) else: # 若是出现异常配置,也要退出 - raise RuntimeError('复测第 %d 轮失败,异常来源:spark运行异常:%s', i, spark_output) + global_logger.warning('Retest failed in round %d. Exception source: Spark exception: %s.', i, + spark_output) + tuning_result = get_tuning_result(load, config) + if tuning_result.failed_times >= OA_CONF.config_fail_threshold: + raise RuntimeError('The number of retest failures has reached the failure threshold.') diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_command_reconstruct.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_command_reconstruct.py index 31aa2dfa3..6c8a8c7dd 100644 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_command_reconstruct.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_command_reconstruct.py @@ -1,5 +1,4 @@ from omniadvisor.utils.logger import global_logger -from common.constant import OA_CONF def spark_command_reconstruct(load, conf): @@ -43,7 +42,7 @@ def spark_command_reconstruct(load, conf): submit_cmd_list.append(f"{_normalize_key(key)}={_normalize_value(value)}") submit_cmd = " ".join(submit_cmd_list) - global_logger.info(f"拼接后的spark-sql命令如下{submit_cmd}") + global_logger.info(f"The complete spark-sql command is as follows {submit_cmd}") return submit_cmd @@ -67,4 +66,3 @@ def _normalize_value(value): if isinstance(value, str) and ' ' in value: value = f'"{value}"' return value - diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_fetcher.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_fetcher.py index 648efd8bb..f41e89976 100755 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_fetcher.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_fetcher.py @@ -3,7 +3,7 @@ import json class SparkFetcher: - def __init__(self, history_server_url,): + def __init__(self, history_server_url, ): """ 初始化SparkFetcher类 @@ -58,7 +58,7 @@ class SparkFetcher: :param app_id: 应用ID :return: 返回指定应用的阶段信息 """ - return self._make_request(f"api/v1/applications/{app_id}/stages") + return self._make_request(f"api/v1/applications/{app_id}/stages?withSummaries=true") def get_spark_executor_by_app(self, app_id): """ @@ -68,6 +68,3 @@ class SparkFetcher: :return: 返回指定应用的执行器信息 """ return self._make_request(f"api/v1/applications/{app_id}/executors") - - - diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_parameter_parser.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_parameter_parser.py index e472b296a..069eca0e3 100644 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_parameter_parser.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_parameter_parser.py @@ -1,8 +1,8 @@ import shlex from dataclasses import dataclass -from omniadvisor.utils.logger import global_logger -from common.constant import OA_CONF + from omniadvisor.service.parameter_parser import ParserInterface +from omniadvisor.utils.logger import global_logger SPARK_NON_EXEC_ATTR_SET = {"name", "conf"} SPARK_CONF_SUPPLEMENT_MAP = {"num_executors": "spark.executor.instances", @@ -11,6 +11,7 @@ SPARK_CONF_SUPPLEMENT_MAP = {"num_executors": "spark.executor.instances", "executor_memory": "spark.executor.memory", "driver_memory": "spark.driver.memory"} + @dataclass(frozen=True) class BaseParams: name: str @@ -106,15 +107,17 @@ class SparkParameterParser(ParserInterface): # 提取conf配置 elif key == "conf": for confitem in value: - parts = confitem.split("=", 1) - confkey, confvalue = parts + confkey, confvalue = confitem.split("=", 1) # 处理重复键问题,例如合并值为列表 if confkey in conf_params: if isinstance(conf_params[confkey], list): conf_params[confkey].append(confvalue) else: conf_params[confkey] = [conf_params[key], value] - global_logger.warn(f"{confkey}被重复配置,多次配置值如下{conf_params[confvalue]}") + global_logger.warn( + f"{confkey} is configured repeatedly, and the multiple configuration values are as follows:" + f" {conf_params[confvalue]}" + ) else: conf_params[confkey] = confvalue # 将 --num-executors --executors-cores --driver-cores这类的配置归类到conf_params中便于后续重建命令 diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py index a0a1d4698..bb576b097 100644 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py @@ -9,7 +9,6 @@ from omniadvisor.repository.model.exam_record import ExamRecord from omniadvisor.service.spark_service.spark_command_reconstruct import spark_command_reconstruct from omniadvisor.service.spark_service.spark_executor import SparkExecutor from omniadvisor.service.spark_service.spark_fetcher import SparkFetcher -from omniadvisor.service.tuning_result.tuning_result import get_tuning_result from omniadvisor.utils.logger import global_logger from omniadvisor.utils.utils import save_trace_data @@ -18,37 +17,27 @@ def spark_run(load, conf): # 从解析后的参数列表中提取负载与任务的相关信息 submit_cmd = spark_command_reconstruct(load, conf) - # 判断当前的conf是否和load.default_config相同 不相同则在submit_cmd前增加超时时间 - if conf != load.default_config: - # 获取当前default_config的平均测试性能 - baseline_results = get_tuning_result(load, load.default_config) - timeout_sec = OA_CONF.timeout_ratio * baseline_results.runtime - submit_cmd = f"timeout {timeout_sec} " + submit_cmd - - # 根据执行命令创建测试记录 - exam_record = ExamRecordRepository.create(load, conf) # 执行当前的submit_cmd spark_executor = SparkExecutor() + # 为什么需要删除 exam_record?因为前台复测下,一旦exam_record处于running状态中断之后状态就不会再变了 try: - ExamRecordRepository.update_start_time(exam_record) exitcode, spark_output = spark_executor.submit_spark_task(submit_cmd) - ExamRecordRepository.update_end_time(exam_record) except TimeoutError: # 任务提交超时等 - exam_record.delete() global_logger.error('Spark command submission timed out.') raise except OSError: # 权限不足等 - exam_record.delete() global_logger.error('Spark command submission permission denied.') raise except Exception: - exam_record.delete() global_logger.error('During Spark command submission, known error occurred.') raise + # 根据执行命令创建测试记录 + exam_record = ExamRecordRepository.create(load, conf) + # 不存在application_id的情况下不提取time_taken 直接返回 if exitcode != 0: try: @@ -57,18 +46,18 @@ def spark_run(load, conf): status=OA_CONF.ExecStatus.fail, runtime=OA_CONF.exec_fail_return_runtime, trace=OA_CONF.exec_fail_return_trace - ), "" + ), spark_output except RuntimeError: - exam_record.delete() + global_logger.error('An error occurred when update exam record\'s status to %s', OA_CONF.ExecStatus.fail) + ExamRecordRepository.delete(exam_record) raise status = OA_CONF.ExecStatus.success # 提取application_id与 time_taken(runtime) - try: spark_submit_cmd, application_id, runtime = spark_executor.parser_spark_output(spark_output) except Exception: - exam_record.delete() + ExamRecordRepository.delete(exam_record) global_logger.error('During parsing spark output, known error occurred.') raise @@ -79,7 +68,8 @@ def spark_run(load, conf): runtime=runtime ) except RuntimeError: - exam_record.delete() + global_logger.error('An error occurred when update exam record\'s status to %s', OA_CONF.ExecStatus.fail) + ExamRecordRepository.delete(exam_record) raise # 根据ApplicantID在子进程中获取Trace @@ -90,7 +80,6 @@ def spark_run(load, conf): return exam_record, spark_output -# TODO 这个也要端到端验证一下 用了子进程会不会对ExamRecordRepository有同时读写的问题? def _update_trace_from_history_server(exam_record: ExamRecord, application_id: str): """ 创建一个子进程对history_server进行轮询 超时时间为10s @@ -108,9 +97,8 @@ def _update_trace_from_history_server(exam_record: ExamRecord, application_id: s trace_executor = spark_fetcher.get_spark_executor_by_app(application_id) except HTTPError as httpe: time.sleep(1) - global_logger.warning(f"HistoryServer访问错误:{httpe}") + global_logger.warning(f"Cannot access history server: {httpe}") continue - trace_dict['sql'] = save_trace_data(data=trace_sql, data_dir=OA_CONF.data_dir) trace_dict['stages'] = save_trace_data(data=trace_stages, data_dir=OA_CONF.data_dir) trace_dict['executor'] = save_trace_data(data=trace_executor, data_dir=OA_CONF.data_dir) diff --git a/omniadvisor/src/omniadvisor/service/tuning_result/tuning_result.py b/omniadvisor/src/omniadvisor/service/tuning_result/tuning_result.py index 34a993943..9abdfdf99 100644 --- a/omniadvisor/src/omniadvisor/service/tuning_result/tuning_result.py +++ b/omniadvisor/src/omniadvisor/service/tuning_result/tuning_result.py @@ -1,6 +1,6 @@ from typing import List -from algo.common.model import Tuning +from algo.common.model import Tuning, Trace from omniadvisor.repository.model.load import Load from omniadvisor.repository.model.tuning_record import TuningRecord from omniadvisor.repository.model.exam_record import ExamRecord @@ -36,6 +36,19 @@ def get_tuning_result(load: Load, config: dict): ) +def remove_tuning_result(load: Load, config: dict): + """ + 删除给定负载和配置的执行记录 + """ + tuning_result = get_tuning_result(load, config) + # 清测试记录 + for exam_record in tuning_result.exam_records: + ExamRecordRepository.delete(exam_record) + + # 清调优记录 + TuningRecordRepository.delete(tuning_result.tuning_record) + + class TuningResult: """ 调优结果,汇总相同负载和配置下的调优记录和测试记录 @@ -75,6 +88,10 @@ class TuningResult: def exam_times(self): return len(self._exam_records) + @property + def failed_times(self): + return sum([exam_record.status == OA_CONF.ExecStatus.fail for exam_record in self._exam_records]) + @property def status(self): """ @@ -84,8 +101,7 @@ class TuningResult: 复测中:复测次数小于指定复测次数,且失败次数没有超过阈值,复测流程尚在进行中 :return: """ - failed_times = sum([exam_record.status == OA_CONF.ExecStatus.fail for exam_record in self._exam_records]) - if failed_times >= OA_CONF.config_fail_threshold: + if self.failed_times >= OA_CONF.config_fail_threshold: overall_status = OA_CONF.ExecStatus.fail elif len(self._exam_records) == OA_CONF.tuning_retest_times: overall_status = OA_CONF.ExecStatus.success @@ -116,16 +132,31 @@ class TuningResult: exam_record for exam_record in self._exam_records if exam_record.status == OA_CONF.ExecStatus.success ] + if not success_exam_results: + return OA_CONF.exec_fail_return_trace best_exam_record = min(success_exam_results, key=lambda exam_record: exam_record.runtime) return best_exam_record.trace + @property + def tuning_record(self): + return self._tuning_record + + @property + def exam_records(self): + return self._exam_records + def to_tuning(self) -> Tuning: + if self.trace is not None: + trace = Trace(stages_with_summaries=self.trace.get('stages'), sql=self.trace.get('sql')) + else: + trace = None + return Tuning( round=self.rounds, config=self.config, method=self.method, - rule='', + rule=self.method_extend, status=self.status, runtime=self.runtime, - trace=self.trace + trace=trace ) diff --git a/omniadvisor/src/omniadvisor/service/tuning_result/tuning_result_history.py b/omniadvisor/src/omniadvisor/service/tuning_result/tuning_result_history.py index 64d26df45..6236a7552 100644 --- a/omniadvisor/src/omniadvisor/service/tuning_result/tuning_result_history.py +++ b/omniadvisor/src/omniadvisor/service/tuning_result/tuning_result_history.py @@ -9,6 +9,11 @@ from omniadvisor.service.tuning_result.tuning_result import get_tuning_result from common.constant import OA_CONF +def get_other_tuning_result_history(load: Load): + # TODO 暂未实现 + pass + + def get_next_tuning_method(load) -> str: """ 根据 load,获取调优历史记录,并根据OA_CONF中的tuning_strategies决定本轮的调优方法 @@ -20,7 +25,7 @@ def get_next_tuning_method(load) -> str: next_tuning_method = None for tuning_method, expected_tuning_times in OA_CONF.tuning_strategies: # 历史中该方法的调优次数已经超过了设定的调优次数,跳过(超过是因为用户强制使用) - if statistics.get(tuning_method) >= expected_tuning_times: + if statistics.get(tuning_method, 0) >= expected_tuning_times: continue next_tuning_method = tuning_method break @@ -100,4 +105,3 @@ class TuningResultHistory: else: data[tuning_result.method] = 1 return data - -- Gitee From 678dd2338f349d0abdd8dbec76064b1bdc3ce5dc Mon Sep 17 00:00:00 2001 From: yang_feida Date: Thu, 15 May 2025 21:39:26 +0800 Subject: [PATCH 2/8] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E8=AF=AF=E5=88=A0?= =?UTF-8?q?=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../omniadvisor/interface/config_tuning.py | 4 ++-- .../repository/exam_record_repository.py | 2 -- .../service/spark_service/spark_run.py | 19 +++++++++++++++---- 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/omniadvisor/src/omniadvisor/interface/config_tuning.py b/omniadvisor/src/omniadvisor/interface/config_tuning.py index 599daee1a..792353a4d 100644 --- a/omniadvisor/src/omniadvisor/interface/config_tuning.py +++ b/omniadvisor/src/omniadvisor/interface/config_tuning.py @@ -65,7 +65,7 @@ def unified_tuning(load, retest_way: str, tuning_method: str): raise ValueError(f'Not supported tuning method: {tuning_method}') if next_config: - TuningRecordRepository.create(load=load, config=next_config, method=OA_CONF.TuningMethod.expert, + TuningRecordRepository.create(load=load, config=next_config, method=tuning_method, method_extend=method_extend) else: global_logger.info('The recommending config is empty, please try other tuning methods.') @@ -76,7 +76,7 @@ def unified_tuning(load, retest_way: str, tuning_method: str): try: retest(load, next_config) except Exception: - # 如果是running,说明此调优配置不可能再被复测,那么删除对应的exam_records、tuning_record + # 如果之前发生错误,状态被置为running,此调优配置不可能再被复测,那么删除对应的exam_records、tuning_record global_logger.info('Remove tuning result because the status is running.') remove_tuning_result(load, next_config) raise diff --git a/omniadvisor/src/omniadvisor/repository/exam_record_repository.py b/omniadvisor/src/omniadvisor/repository/exam_record_repository.py index 22ba160b3..381e9f4d4 100644 --- a/omniadvisor/src/omniadvisor/repository/exam_record_repository.py +++ b/omniadvisor/src/omniadvisor/repository/exam_record_repository.py @@ -47,8 +47,6 @@ class ExamRecordRepository(Repository): model_attr = { ExamRecord.FieldName.load: load.database_model, ExamRecord.FieldName.config: config, - ExamRecord.FieldName.start_time: timezone.now(), - ExamRecord.FieldName.end_time: timezone.now() } database_record = cls._create(model_attr=model_attr) return ExamRecord(database_model=database_record) diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py index bb576b097..1815d76aa 100644 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py @@ -9,6 +9,7 @@ from omniadvisor.repository.model.exam_record import ExamRecord from omniadvisor.service.spark_service.spark_command_reconstruct import spark_command_reconstruct from omniadvisor.service.spark_service.spark_executor import SparkExecutor from omniadvisor.service.spark_service.spark_fetcher import SparkFetcher +from omniadvisor.service.tuning_result.tuning_result import get_tuning_result from omniadvisor.utils.logger import global_logger from omniadvisor.utils.utils import save_trace_data @@ -17,27 +18,37 @@ def spark_run(load, conf): # 从解析后的参数列表中提取负载与任务的相关信息 submit_cmd = spark_command_reconstruct(load, conf) + # 判断当前的conf是否和load.default_config相同 不相同则在submit_cmd前增加超时时间 + if conf != load.default_config: + # 获取当前default_config的平均测试性能 + baseline_results = get_tuning_result(load, load.default_config) + timeout_sec = OA_CONF.timeout_ratio * baseline_results.runtime + submit_cmd = f"timeout {timeout_sec} " + submit_cmd + + # 根据执行命令创建测试记录 + exam_record = ExamRecordRepository.create(load, conf) # 执行当前的submit_cmd spark_executor = SparkExecutor() - # 为什么需要删除 exam_record?因为前台复测下,一旦exam_record处于running状态中断之后状态就不会再变了 try: + ExamRecordRepository.update_start_time(exam_record) exitcode, spark_output = spark_executor.submit_spark_task(submit_cmd) + ExamRecordRepository.update_end_time(exam_record) except TimeoutError: # 任务提交超时等 + exam_record.delete() global_logger.error('Spark command submission timed out.') raise except OSError: # 权限不足等 + exam_record.delete() global_logger.error('Spark command submission permission denied.') raise except Exception: + exam_record.delete() global_logger.error('During Spark command submission, known error occurred.') raise - # 根据执行命令创建测试记录 - exam_record = ExamRecordRepository.create(load, conf) - # 不存在application_id的情况下不提取time_taken 直接返回 if exitcode != 0: try: -- Gitee From eb762c1bd6bee105531763db5d1e3511b74859a6 Mon Sep 17 00:00:00 2001 From: yang_feida Date: Sat, 17 May 2025 11:04:27 +0800 Subject: [PATCH 3/8] =?UTF-8?q?=E8=AF=84=E5=AE=A1=E6=84=8F=E8=A7=81?= =?UTF-8?q?=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- omniadvisor/src/omniadvisor/interface/config_tuning.py | 1 - .../src/omniadvisor/service/spark_service/spark_fetcher.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/omniadvisor/src/omniadvisor/interface/config_tuning.py b/omniadvisor/src/omniadvisor/interface/config_tuning.py index 792353a4d..8b429c15e 100644 --- a/omniadvisor/src/omniadvisor/interface/config_tuning.py +++ b/omniadvisor/src/omniadvisor/interface/config_tuning.py @@ -80,7 +80,6 @@ def unified_tuning(load, retest_way: str, tuning_method: str): global_logger.info('Remove tuning result because the status is running.') remove_tuning_result(load, next_config) raise - time.sleep(10) perf_history = get_tuning_result_history(load) # 更新最优配置 if perf_history.best_config: diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_fetcher.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_fetcher.py index f41e89976..ade44dbf1 100755 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_fetcher.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_fetcher.py @@ -3,7 +3,7 @@ import json class SparkFetcher: - def __init__(self, history_server_url, ): + def __init__(self, history_server_url,): """ 初始化SparkFetcher类 -- Gitee From e9c1437a9a80e04a1b17ac3a3b5a5d2dfeb12c21 Mon Sep 17 00:00:00 2001 From: yang_feida Date: Sat, 17 May 2025 18:31:12 +0800 Subject: [PATCH 4/8] =?UTF-8?q?=E7=A7=BB=E9=99=A4=E9=9D=9E=E5=BF=85?= =?UTF-8?q?=E8=A6=81=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- omniadvisor/src/algo/__init__.py | 0 omniadvisor/src/algo/common/__init__.py | 0 omniadvisor/src/algo/common/model.py | 12 ------------ omniadvisor/src/algo/expert/__init__.py | 0 omniadvisor/src/algo/expert/tuning.py | 5 ----- omniadvisor/src/algo/iterative/__init__.py | 0 omniadvisor/src/algo/iterative/tuning.py | 5 ----- omniadvisor/src/algo/native/__init__.py | 0 omniadvisor/src/algo/native/tuning.py | 5 ----- omniadvisor/src/algo/transfer/__init__.py | 0 omniadvisor/src/algo/transfer/tuning.py | 5 ----- 11 files changed, 32 deletions(-) delete mode 100644 omniadvisor/src/algo/__init__.py delete mode 100644 omniadvisor/src/algo/common/__init__.py delete mode 100644 omniadvisor/src/algo/common/model.py delete mode 100644 omniadvisor/src/algo/expert/__init__.py delete mode 100644 omniadvisor/src/algo/expert/tuning.py delete mode 100644 omniadvisor/src/algo/iterative/__init__.py delete mode 100644 omniadvisor/src/algo/iterative/tuning.py delete mode 100644 omniadvisor/src/algo/native/__init__.py delete mode 100644 omniadvisor/src/algo/native/tuning.py delete mode 100644 omniadvisor/src/algo/transfer/__init__.py delete mode 100644 omniadvisor/src/algo/transfer/tuning.py diff --git a/omniadvisor/src/algo/__init__.py b/omniadvisor/src/algo/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/omniadvisor/src/algo/common/__init__.py b/omniadvisor/src/algo/common/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/omniadvisor/src/algo/common/model.py b/omniadvisor/src/algo/common/model.py deleted file mode 100644 index cc6cdc3ca..000000000 --- a/omniadvisor/src/algo/common/model.py +++ /dev/null @@ -1,12 +0,0 @@ -from dataclasses import dataclass - -# 临时用于测试 -@dataclass -class Tuning: - round: int - config: dict - method: str - rule: str - status: str - runtime: float - trace: dict diff --git a/omniadvisor/src/algo/expert/__init__.py b/omniadvisor/src/algo/expert/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/omniadvisor/src/algo/expert/tuning.py b/omniadvisor/src/algo/expert/tuning.py deleted file mode 100644 index 696a273e3..000000000 --- a/omniadvisor/src/algo/expert/tuning.py +++ /dev/null @@ -1,5 +0,0 @@ -# 临时用于测试 -class ExpertTuning: - @staticmethod - def tune(history: any): - return {}, '' diff --git a/omniadvisor/src/algo/iterative/__init__.py b/omniadvisor/src/algo/iterative/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/omniadvisor/src/algo/iterative/tuning.py b/omniadvisor/src/algo/iterative/tuning.py deleted file mode 100644 index 4600f404f..000000000 --- a/omniadvisor/src/algo/iterative/tuning.py +++ /dev/null @@ -1,5 +0,0 @@ -# 临时用于测试 -class SmacAppendTuning: - @staticmethod - def tune(history: any): - return {}, '' diff --git a/omniadvisor/src/algo/native/__init__.py b/omniadvisor/src/algo/native/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/omniadvisor/src/algo/native/tuning.py b/omniadvisor/src/algo/native/tuning.py deleted file mode 100644 index e6c5479c5..000000000 --- a/omniadvisor/src/algo/native/tuning.py +++ /dev/null @@ -1,5 +0,0 @@ -# 临时用于测试 -class NativeTuning: - @staticmethod - def tune(history: any): - return {}, '' diff --git a/omniadvisor/src/algo/transfer/__init__.py b/omniadvisor/src/algo/transfer/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/omniadvisor/src/algo/transfer/tuning.py b/omniadvisor/src/algo/transfer/tuning.py deleted file mode 100644 index d9a008a63..000000000 --- a/omniadvisor/src/algo/transfer/tuning.py +++ /dev/null @@ -1,5 +0,0 @@ -# 临时用于测试 -class TransferTuning: - @staticmethod - def tune(history: any, other_history: any): - return {}, '' -- Gitee From 7dc18489a499b4e7682731457c8f3964cd66a7bd Mon Sep 17 00:00:00 2001 From: yang_feida Date: Sat, 17 May 2025 18:32:06 +0800 Subject: [PATCH 5/8] =?UTF-8?q?=E5=8D=95=E5=85=83=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E6=97=B6=EF=BC=8C=E9=BB=84=E5=8C=BA=E6=A8=A1=E5=9D=97=E5=9C=A8?= =?UTF-8?q?=E8=BF=99=E9=87=8C=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- omniadvisor/tests/conftest.py | 58 +++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 omniadvisor/tests/conftest.py diff --git a/omniadvisor/tests/conftest.py b/omniadvisor/tests/conftest.py new file mode 100644 index 000000000..815fff6e5 --- /dev/null +++ b/omniadvisor/tests/conftest.py @@ -0,0 +1,58 @@ +import sys +from types import ModuleType + + +# 这个类的术语为 Dummy Object,仅用于任意对象的任意方法、属性,避免报错 +class SafeModule: + + def __getattr__(self, name): + # 动态注册子模块到 sys.modules + return self + + def __call__(self, *args, **kwargs): + return self + + +safe_instance = SafeModule() + + +def dot_expansion(s): + parts = s.split('.') + result = [] + current = [] + + for part in parts: + current.append(part) + result.append((part, '.'.join(current))) + + return result + + +# 所有算法相关的包,都在这里注册 +to_registers = { + 'algo.expert.tuning.Trace', + 'algo.common.model.Trace', + 'algo.expert.tuning.ExpertTuning', + 'algo.iterative.tuning.SmacAppendTuning', + 'algo.common.model.Tuning', + 'algo.native.tuning.NativeTuning', + 'algo.transfer.tuning.TransferTuning', +} + +# 注册顶层包: +sys.modules['algo'] = ModuleType('algo') + +for to_register in to_registers: + expansions = dot_expansion(to_register) + # 注册父子间的链式关系,如algo.iterative.tuning.SmacAppendTuning,先注册一个module 'algo.iterative',并建立与module 'algo'关系 + for idx, expansion in enumerate(expansions): + current_node_name, pathlike = expansion + if pathlike in sys.modules: + continue + father_node = sys.modules[expansions[idx - 1][1]] + if idx == len(expansions) - 1: + current_node = safe_instance + else: + current_node = ModuleType(pathlike) + setattr(father_node, current_node_name, current_node) + sys.modules[pathlike] = current_node -- Gitee From 0bfcdf0af6a8841ca60020dfaa8e59d245cf6852 Mon Sep 17 00:00:00 2001 From: yang_feida Date: Sat, 17 May 2025 18:32:22 +0800 Subject: [PATCH 6/8] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=8D=95=E5=85=83?= =?UTF-8?q?=E6=B5=8B=E8=AF=95=E7=94=A8=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../omniadvisor/interface/config_tuning.py | 6 +-- .../tuning_result/tuning_result_history.py | 4 +- .../interface/test_config_tuning.py | 40 +++++++++++++------ .../test_tuning_result_history.py | 1 + 4 files changed, 32 insertions(+), 19 deletions(-) diff --git a/omniadvisor/src/omniadvisor/interface/config_tuning.py b/omniadvisor/src/omniadvisor/interface/config_tuning.py index 8b429c15e..b56a6c7f3 100644 --- a/omniadvisor/src/omniadvisor/interface/config_tuning.py +++ b/omniadvisor/src/omniadvisor/interface/config_tuning.py @@ -1,8 +1,9 @@ import argparse -import time from algo.expert.tuning import ExpertTuning from algo.iterative.tuning import SmacAppendTuning +from algo.native.tuning import NativeTuning +from algo.transfer.tuning import TransferTuning from omniadvisor.repository.load_repository import LoadRepository from omniadvisor.repository.tuning_record_repository import TuningRecordRepository from omniadvisor.service.retest_service import retest @@ -10,9 +11,6 @@ from omniadvisor.service.tuning_result.tuning_result import remove_tuning_result from omniadvisor.service.tuning_result.tuning_result_history import get_tuning_result_history, \ get_next_tuning_method, get_other_tuning_result_history from omniadvisor.utils.logger import global_logger - -from algo.native.tuning import NativeTuning -from algo.transfer.tuning import TransferTuning from common.constant import OA_CONF diff --git a/omniadvisor/src/omniadvisor/service/tuning_result/tuning_result_history.py b/omniadvisor/src/omniadvisor/service/tuning_result/tuning_result_history.py index 6236a7552..5985cc12f 100644 --- a/omniadvisor/src/omniadvisor/service/tuning_result/tuning_result_history.py +++ b/omniadvisor/src/omniadvisor/service/tuning_result/tuning_result_history.py @@ -10,8 +10,8 @@ from common.constant import OA_CONF def get_other_tuning_result_history(load: Load): - # TODO 暂未实现 - pass + # 暂未实现 + return None def get_next_tuning_method(load) -> str: diff --git a/omniadvisor/tests/omniadvisor/interface/test_config_tuning.py b/omniadvisor/tests/omniadvisor/interface/test_config_tuning.py index 16c350e25..1c9914469 100644 --- a/omniadvisor/tests/omniadvisor/interface/test_config_tuning.py +++ b/omniadvisor/tests/omniadvisor/interface/test_config_tuning.py @@ -3,9 +3,9 @@ import sys from unittest.mock import patch, MagicMock import pytest +from omniadvisor.interface.config_tuning import unified_tuning, main from common.constant import OA_CONF -from omniadvisor.interface.config_tuning import unified_tuning, main class TestTuning: @@ -15,6 +15,8 @@ class TestTuning: self.load = None self.retest_times = 3 self.tuning_method = OA_CONF.TuningMethod.iterative + self.empty_str = '' + self.tune_return_val = ({'key': 'value'}, '') def test_unified_tuning_when_retest_backend(self): """ @@ -28,8 +30,9 @@ class TestTuning: patch('omniadvisor.repository.load_repository.LoadRepository.update_best_config') as mock_update_best, \ patch('algo.iterative.tuning.SmacAppendTuning.tune') as mock_smac_tuning: mock_exam_record = MagicMock() + mock_smac_tuning.return_value = self.tune_return_val mock_exam_record.status = OA_CONF.ExecStatus.success - spark_output = '' + spark_output = self.empty_str mock_spark_run.return_value = mock_exam_record, spark_output unified_tuning(load=self.load, retest_way=OA_CONF.RetestWay.backend, tuning_method=self.tuning_method) mock_update_best.assert_called_once() @@ -41,25 +44,32 @@ class TestTuning: 后台复测 spark命令执行异常 :return: """ - caplog.set_level(logging.ERROR) + caplog.set_level(logging.WARNING) with patch('omniadvisor.repository.load_repository.LoadRepository.query_by_id'), \ patch('omniadvisor.repository.tuning_record_repository.TuningRecordRepository.create'), \ patch('omniadvisor.service.retest_service.spark_run') as mock_spark_run, \ patch('omniadvisor.interface.config_tuning.get_tuning_result_history'), \ + patch('omniadvisor.service.retest_service.get_tuning_result') as mock_get_tuning_result, \ + patch('omniadvisor.interface.config_tuning.remove_tuning_result') as mock_remove_tuning_result, \ patch('algo.iterative.tuning.SmacAppendTuning.tune') as mock_smac_tuning, \ - patch('omniadvisor.repository.load_repository.LoadRepository.update_best_config') as mock_update_best, \ - pytest.raises(RuntimeError): + patch('omniadvisor.repository.load_repository.LoadRepository.update_best_config') as mock_update_best: mock_exam_record = MagicMock() mock_exam_record.status = OA_CONF.ExecStatus.fail - spark_output = '' + spark_output = self.empty_str mock_spark_run.return_value = mock_exam_record, spark_output - unified_tuning(load=self.load, retest_way=OA_CONF.RetestWay.backend, tuning_method=self.tuning_method) + mock_smac_tuning.return_value = self.tune_return_val + mock_tuning_result = MagicMock() + mock_tuning_result.failed_times = OA_CONF.config_fail_threshold + mock_get_tuning_result.return_value = mock_tuning_result + with pytest.raises(RuntimeError): + unified_tuning(load=self.load, retest_way=OA_CONF.RetestWay.backend, tuning_method=self.tuning_method) mock_smac_tuning.assert_called_once() mock_spark_run.assert_called_once() mock_update_best.assert_not_called() + mock_remove_tuning_result.assert_called_once() - assert '复测第 1 轮失败,异常来源:spark运行异常' in caplog.text + assert 'Retest failed in round 1. Exception source: Spark exception' in caplog.text def test_unified_tuning_when_other_exception(self, caplog): """ @@ -71,16 +81,19 @@ class TestTuning: patch('omniadvisor.repository.tuning_record_repository.TuningRecordRepository.create'), \ patch('omniadvisor.service.retest_service.spark_run', side_effect=RuntimeError) as mock_spark_run, \ patch('omniadvisor.interface.config_tuning.get_tuning_result_history'), \ - patch('omniadvisor.repository.load_repository.LoadRepository.update_best_config') as mock_update_best, \ + patch('omniadvisor.repository.load_repository.LoadRepository.update_best_config'), \ patch('algo.iterative.tuning.SmacAppendTuning.tune') as mock_smac_tuning, \ - pytest.raises(RuntimeError): + patch('omniadvisor.interface.config_tuning.remove_tuning_result') as mock_remove_tuning_result: mock_exam_record = MagicMock() mock_exam_record.status = OA_CONF.ExecStatus.success - unified_tuning(load=self.load, retest_way=OA_CONF.RetestWay.backend, tuning_method=self.tuning_method) - mock_update_best.assert_called_once() + mock_smac_tuning.return_value = self.tune_return_val + with pytest.raises(RuntimeError): + unified_tuning(load=self.load, retest_way=OA_CONF.RetestWay.backend, tuning_method=self.tuning_method) + mock_smac_tuning.assert_called_once() mock_spark_run.assert_called_once() - assert '非spark运行异常' in caplog.text + mock_remove_tuning_result.assert_called_once() + assert 'Retest failed in round 1. Exception source: Non-Spark exception' in caplog.text def test_main_when_load_id_not_exist(self): """ @@ -110,6 +123,7 @@ class TestTuning: patch('omniadvisor.interface.config_tuning.get_tuning_result_history') as mocked_query_perf, \ patch('omniadvisor.repository.load_repository.LoadRepository.update_test_config') as mock_update_test, \ patch('algo.iterative.tuning.SmacAppendTuning.tune') as mock_smac_tuning: + mock_smac_tuning.return_value = self.tune_return_val unified_tuning(load=self.load, retest_way=OA_CONF.RetestWay.hijacking, tuning_method=self.tuning_method) mocked_query_perf.assert_called_once() diff --git a/omniadvisor/tests/omniadvisor/service/tuning_result/test_tuning_result_history.py b/omniadvisor/tests/omniadvisor/service/tuning_result/test_tuning_result_history.py index d72a4441b..97cb5c654 100644 --- a/omniadvisor/tests/omniadvisor/service/tuning_result/test_tuning_result_history.py +++ b/omniadvisor/tests/omniadvisor/service/tuning_result/test_tuning_result_history.py @@ -50,6 +50,7 @@ class TestTuningResultHistory: assert len(result._tuning_results) == 1 assert result._tuning_results[0] == tuning_result_mock + @patch.object(TuningResult, 'to_tuning', lambda self: self) def test_tuning_result_history_config_perf_pairs(self, setup_mocks): load, tuning_record_mock, tuning_result_mock = setup_mocks -- Gitee From 7ce43573c8336a2733f710847944e2845dfdf965 Mon Sep 17 00:00:00 2001 From: yang_feida Date: Mon, 19 May 2025 09:28:36 +0800 Subject: [PATCH 7/8] =?UTF-8?q?clean=20code=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/spark_service/spark_run.py | 48 ++++++++++++------- 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py index 1815d76aa..5712afa8a 100644 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py @@ -30,24 +30,7 @@ def spark_run(load, conf): # 执行当前的submit_cmd spark_executor = SparkExecutor() - try: - ExamRecordRepository.update_start_time(exam_record) - exitcode, spark_output = spark_executor.submit_spark_task(submit_cmd) - ExamRecordRepository.update_end_time(exam_record) - except TimeoutError: - # 任务提交超时等 - exam_record.delete() - global_logger.error('Spark command submission timed out.') - raise - except OSError: - # 权限不足等 - exam_record.delete() - global_logger.error('Spark command submission permission denied.') - raise - except Exception: - exam_record.delete() - global_logger.error('During Spark command submission, known error occurred.') - raise + exitcode, spark_output = _submit_spark_task(exam_record, spark_executor, submit_cmd) # 不存在application_id的情况下不提取time_taken 直接返回 if exitcode != 0: @@ -91,6 +74,35 @@ def spark_run(load, conf): return exam_record, spark_output +def _submit_spark_task(exam_record, spark_executor, submit_cmd): + """ + 提交spark任务 + :param exam_record: 测试记录 + :param spark_executor: 执行器 + :param submit_cmd: 命令 + :return: + """ + try: + ExamRecordRepository.update_start_time(exam_record) + exitcode, spark_output = spark_executor.submit_spark_task(submit_cmd) + ExamRecordRepository.update_end_time(exam_record) + except TimeoutError: + # 任务提交超时等 + exam_record.delete() + global_logger.error('Spark command submission timed out.') + raise + except OSError: + # 权限不足等 + exam_record.delete() + global_logger.error('Spark command submission permission denied.') + raise + except Exception: + exam_record.delete() + global_logger.error('During Spark command submission, known error occurred.') + raise + return exitcode, spark_output + + def _update_trace_from_history_server(exam_record: ExamRecord, application_id: str): """ 创建一个子进程对history_server进行轮询 超时时间为10s -- Gitee From 22d53b01173778876b84e2c7bfc7a1b0703866b9 Mon Sep 17 00:00:00 2001 From: yang_feida Date: Mon, 19 May 2025 11:32:31 +0800 Subject: [PATCH 8/8] =?UTF-8?q?=E5=8F=98=E6=9B=B4=E4=B8=BA=E7=B1=BB?= =?UTF-8?q?=E6=96=B9=E6=B3=95=EF=BC=8C=E6=96=B9=E4=BE=BF=E8=B0=83=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/spark_service/spark_executor.py | 12 ++++++++---- .../omniadvisor/service/spark_service/spark_run.py | 10 ++++------ 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_executor.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_executor.py index 1c3426177..8dd3426ad 100644 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_executor.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_executor.py @@ -3,7 +3,9 @@ from omniadvisor.utils.utils import run_cmd class SparkExecutor: - def submit_spark_task(self, execute_cmd): + + @classmethod + def submit_spark_task(cls, execute_cmd): """ 在shell终端提交spark命令 :param submit_config_str: spark的提交命令 @@ -13,10 +15,12 @@ class SparkExecutor: return exitcode, spark_output - def parser_spark_output(self, spark_output) -> dict: - return self._reg_match_application_id_and_time_taken(spark_output) + @classmethod + def parser_spark_output(cls, spark_output) -> dict: + return cls._reg_match_application_id_and_time_taken(spark_output) - def _reg_match_application_id_and_time_taken(self, spark_output): + @classmethod + def _reg_match_application_id_and_time_taken(cls, spark_output): spark_output = spark_output.split("\n") spark_submit_cmd = "" application_id = "" diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py index 5712afa8a..28c131a14 100644 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py @@ -29,8 +29,7 @@ def spark_run(load, conf): exam_record = ExamRecordRepository.create(load, conf) # 执行当前的submit_cmd - spark_executor = SparkExecutor() - exitcode, spark_output = _submit_spark_task(exam_record, spark_executor, submit_cmd) + exitcode, spark_output = _submit_spark_task(exam_record, submit_cmd) # 不存在application_id的情况下不提取time_taken 直接返回 if exitcode != 0: @@ -49,7 +48,7 @@ def spark_run(load, conf): status = OA_CONF.ExecStatus.success # 提取application_id与 time_taken(runtime) try: - spark_submit_cmd, application_id, runtime = spark_executor.parser_spark_output(spark_output) + spark_submit_cmd, application_id, runtime = SparkExecutor.parser_spark_output(spark_output) except Exception: ExamRecordRepository.delete(exam_record) global_logger.error('During parsing spark output, known error occurred.') @@ -74,17 +73,16 @@ def spark_run(load, conf): return exam_record, spark_output -def _submit_spark_task(exam_record, spark_executor, submit_cmd): +def _submit_spark_task(exam_record, submit_cmd): """ 提交spark任务 :param exam_record: 测试记录 - :param spark_executor: 执行器 :param submit_cmd: 命令 :return: """ try: ExamRecordRepository.update_start_time(exam_record) - exitcode, spark_output = spark_executor.submit_spark_task(submit_cmd) + exitcode, spark_output = SparkExecutor.submit_spark_task(submit_cmd) ExamRecordRepository.update_end_time(exam_record) except TimeoutError: # 任务提交超时等 -- Gitee