diff --git a/omniadvisor/src/algo/__init__.py b/omniadvisor/src/algo/__init__.py
deleted file mode 100644
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000
diff --git a/omniadvisor/src/algo/common/__init__.py b/omniadvisor/src/algo/common/__init__.py
deleted file mode 100644
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000
diff --git a/omniadvisor/src/algo/common/model.py b/omniadvisor/src/algo/common/model.py
deleted file mode 100644
index cc6cdc3caf7c9681ce025163bc8ac433413460d9..0000000000000000000000000000000000000000
--- a/omniadvisor/src/algo/common/model.py
+++ /dev/null
@@ -1,12 +0,0 @@
-from dataclasses import dataclass
-
-# 临时用于测试
-@dataclass
-class Tuning:
-    round: int
-    config: dict
-    method: str
-    rule: str
-    status: str
-    runtime: float
-    trace: dict
diff --git a/omniadvisor/src/algo/expert/__init__.py b/omniadvisor/src/algo/expert/__init__.py
deleted file mode 100644
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000
diff --git a/omniadvisor/src/algo/expert/tuning.py b/omniadvisor/src/algo/expert/tuning.py
deleted file mode 100644
index dbcf3b072d81823a48803d7ec0089cb7017ecab7..0000000000000000000000000000000000000000
--- a/omniadvisor/src/algo/expert/tuning.py
+++ /dev/null
@@ -1,5 +0,0 @@
-# 临时用于测试
-class ExpertTuning:
-    @staticmethod
-    def tune(history: any):
-        return {}
\ No newline at end of file
diff --git a/omniadvisor/src/algo/iterative/__init__.py b/omniadvisor/src/algo/iterative/__init__.py
deleted file mode 100644
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000
diff --git a/omniadvisor/src/algo/iterative/tuning.py b/omniadvisor/src/algo/iterative/tuning.py
deleted file mode 100644
index ebb40fd06d8ac957dba96a166ce915e0610257d3..0000000000000000000000000000000000000000
--- a/omniadvisor/src/algo/iterative/tuning.py
+++ /dev/null
@@ -1,5 +0,0 @@
-# 临时用于测试
-class SmacAppendTuning:
-    @staticmethod
-    def tune(history: any):
-        return {}
diff --git a/omniadvisor/src/common/constant.py b/omniadvisor/src/common/constant.py
index 1e0366dce1ed0fd361f7722322e6d5d1954c57df..3725f1d96436f956060120c985cfeb2d82723245 100644
--- a/omniadvisor/src/common/constant.py
+++ b/omniadvisor/src/common/constant.py
@@ -54,7 +54,8 @@ class OmniAdvisorConf:
         iterative = 'iterative'
         expert = 'expert'
         transfer = 'transfer'
-        all = [user, iterative, expert, transfer]
+        native = 'native'
+        all = [user, iterative, expert, transfer, native]

    # 复测方式
    class RetestWay:
diff --git a/omniadvisor/src/omniadvisor/interface/config_tuning.py b/omniadvisor/src/omniadvisor/interface/config_tuning.py
index 221a22961d2297b1318b566ff4abdd364e8598fe..962bed803c5603521ff5d304c8b95045e4e37322 100644
--- a/omniadvisor/src/omniadvisor/interface/config_tuning.py
+++ b/omniadvisor/src/omniadvisor/interface/config_tuning.py
@@ -2,11 +2,14 @@ import argparse

 from algo.expert.tuning import ExpertTuning
 from algo.iterative.tuning import SmacAppendTuning
+from algo.native.tuning import NativeTuning
+from algo.transfer.tuning import TransferTuning
 from omniadvisor.repository.load_repository import LoadRepository
 from omniadvisor.repository.tuning_record_repository import TuningRecordRepository
 from omniadvisor.service.retest_service import retest
+from omniadvisor.service.tuning_result.tuning_result import remove_tuning_result
 from omniadvisor.service.tuning_result.tuning_result_history import get_tuning_result_history, \
-    get_next_tuning_method
+    get_next_tuning_method, get_other_tuning_result_history
 from omniadvisor.utils.logger import global_logger
 from common.constant import OA_CONF

@@ -41,29 +44,44 @@ def unified_tuning(load, retest_way: str, tuning_method: str):
     :param tuning_method: 算法的类型
     :return:
     """
+    if not load:
+        raise ValueError("Invalid load: configuration cannot be loaded")
     # 推荐下一个配置
     if tuning_method == OA_CONF.TuningMethod.iterative:
         perf_history = get_tuning_result_history(load)
-        next_config = SmacAppendTuning.tune(perf_history.tuning_history)
-    elif tuning_method == OA_CONF.TuneMethod.expert:
+        next_config, method_extend = SmacAppendTuning.tune(perf_history.tuning_history)
+    elif tuning_method == OA_CONF.TuningMethod.expert:
+        perf_history = get_tuning_result_history(load)
+        next_config, method_extend = ExpertTuning.tune(perf_history.tuning_history)
+    elif tuning_method == OA_CONF.TuningMethod.native:
+        perf_history = get_tuning_result_history(load)
+        next_config, method_extend = NativeTuning.tune(perf_history.tuning_history)
+    elif tuning_method == OA_CONF.TuningMethod.transfer:
         perf_history = get_tuning_result_history(load)
-        next_config = ExpertTuning.tune(perf_history.tuning_history)
-        TuningRecordRepository.create(load=load, config=next_config, method=OA_CONF.TuningMethod.expert)
+        other_history = get_other_tuning_result_history(load)
+        next_config, method_extend = TransferTuning.tune(perf_history.tuning_history, other_history)
     else:
         raise ValueError(f'Not supported tuning method: {tuning_method}')

+    if not next_config:
+        global_logger.info('The recommended config is empty, please try other tuning methods.')
+        return
+
     # 用户的default_config上叠加next_config叠加
-    if load:
-        next_config = {**load.default_config, **next_config}
-    else:
-        raise ValueError("Invalid load: configuration cannot be loaded")
+    next_config = {**load.default_config, **next_config}

-    TuningRecordRepository.create(load=load, config=next_config, method=tuning_method)
+    TuningRecordRepository.create(load=load, config=next_config, method=tuning_method, method_extend=method_extend)

     # 复测
     if retest_way == OA_CONF.RetestWay.backend:
-        retest(load, next_config)
+        try:
+            retest(load, next_config)
+        except Exception:
+            # 如果之前发生错误,状态被置为running,此调优配置不可能再被复测,那么删除对应的exam_records、tuning_record
+            global_logger.info('Remove tuning result because the status is running.')
+            remove_tuning_result(load, next_config)
+            raise

     perf_history = get_tuning_result_history(load)
     # 更新最优配置
     if perf_history.best_config:
@@ -114,11 +132,11 @@ def main():
         # 用户输入了强制调优的请求,以用户为准
         unified_tuning(load, args.retest_way, args.tuning_method)
     else:
-        tuning_method = get_next_tuning_method(args.load_id)
+        tuning_method = get_next_tuning_method(load)
         if not tuning_method:
             global_logger.info(
                 'The tuning times reaches default settings, if you want to append tuning times, add \'-t\' in your '
                 'command'
             )
             return
-        unified_tuning(args.load_id, args.retest_way, tuning_method)
+        unified_tuning(load, args.retest_way, tuning_method)
diff --git a/omniadvisor/src/omniadvisor/repository/exam_record_repository.py b/omniadvisor/src/omniadvisor/repository/exam_record_repository.py
index dc12b30e5959be0f45e8c0869417a35f64dcd1e2..381e9f4d46c4dd2979b3b6e4b5bb9670d5e191a7 100644
--- a/omniadvisor/src/omniadvisor/repository/exam_record_repository.py
+++ b/omniadvisor/src/omniadvisor/repository/exam_record_repository.py
@@ -137,3 +137,13 @@ class ExamRecordRepository(Repository):
         )

         return ExamRecord(database_model=database_task)
+
+    @classmethod
+    def delete(cls, exam_record: ExamRecord):
+        """
+        删除指定的测试记录
+
+        :param exam_record: 测试记录实例
+        :return: 删除操作的返回结果
+        """
+        return exam_record.database_model.delete()
diff --git a/omniadvisor/src/omniadvisor/repository/tuning_record_repository.py b/omniadvisor/src/omniadvisor/repository/tuning_record_repository.py
index 28eb94a28470253961cf5426e5639044f5b36001..c9797996eff91e43925afab888b7f09b534809d9 100644
--- a/omniadvisor/src/omniadvisor/repository/tuning_record_repository.py
+++ b/omniadvisor/src/omniadvisor/repository/tuning_record_repository.py
@@ -28,7 +28,7 @@ class TuningRecordRepository(Repository):
     }

     @classmethod
-    def create(cls, load: Load, config: dict, method: str):
+    def create(cls, load: Load, config: dict, method: str, method_extend: str = ''):
         """
         指定负载、参数配置和调优方法,新增调优记录

@@ -61,6 +61,7 @@ class TuningRecordRepository(Repository):
             TuningRecord.FieldName.config: config,
             TuningRecord.FieldName.method: method,
             TuningRecord.FieldName.rounds: tuning_rounds,
+            TuningRecord.FieldName.method_extend: method_extend
         }
         database_record = cls._create(model_attr=model_attr)
         return TuningRecord(database_model=database_record)
@@ -103,4 +104,13 @@ class TuningRecordRepository(Repository):
         return [
             TuningRecord(database_model=database_record)
             for database_record in database_records
-        ]
\ No newline at end of file
+        ]
+
+    @classmethod
+    def delete(cls, tuning_record: TuningRecord):
+        """
+        删除 tuning_record 记录
+        :param tuning_record:
+        :return:
+        """
+        return tuning_record.database_model.delete()
diff --git a/omniadvisor/src/omniadvisor/service/retest_service.py b/omniadvisor/src/omniadvisor/service/retest_service.py
index 36378f7bd51e562d43d5dc76e0d904c46c5eac99..cddb5b133ab6bc386aedd8ea54a7eed0f17b23cf 100644
--- a/omniadvisor/src/omniadvisor/service/retest_service.py
+++ b/omniadvisor/src/omniadvisor/service/retest_service.py
@@ -1,6 +1,7 @@
 from common.constant import OA_CONF
 from omniadvisor.repository.model.load import Load
 from omniadvisor.service.spark_service.spark_run import spark_run
+from omniadvisor.service.tuning_result.tuning_result import get_tuning_result
 from omniadvisor.utils.logger import global_logger


@@ -17,12 +18,18 @@ def retest(load: Load, config: dict):
         try:
             exam_record, spark_output = spark_run(load, config)
         except Exception as e:
-            global_logger.error('复测第 %d 轮失败,异常来源:非spark运行异常,详情:%s', i, e)
+            global_logger.error(
+                'Retest failed in round %d. Exception source: Non-Spark exception. Details: %s.', i, e
+            )
             # 目前采取的策略是:抛异常,认为是其他原因导致的失败,可以肯定不是配置的原因
             raise

         if exam_record.status == OA_CONF.ExecStatus.success:
-            global_logger.info('复测第 %d 轮成功,性能结果:%.3f', i, exam_record.runtime)
+            global_logger.info('Retest succeeded in round %d. Performance result: %.3f.', i, exam_record.runtime)
         else:
             # 若是出现异常配置,也要退出
-            raise RuntimeError('复测第 %d 轮失败,异常来源:spark运行异常:%s', i, spark_output)
+            global_logger.warning('Retest failed in round %d. Exception source: Spark exception: %s.', i,
+                                  spark_output)
+            tuning_result = get_tuning_result(load, config)
+            if tuning_result.failed_times >= OA_CONF.config_fail_threshold:
+                raise RuntimeError('The number of retest failures has reached the failure threshold.')
diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_command_reconstruct.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_command_reconstruct.py
index 31aa2dfa3e0312df38ae23a6d60a795bcbc3d20b..6c8a8c7ddfdadd55cb67bf5aed757baf4ad68de7 100644
--- a/omniadvisor/src/omniadvisor/service/spark_service/spark_command_reconstruct.py
+++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_command_reconstruct.py
@@ -1,5 +1,4 @@
 from omniadvisor.utils.logger import global_logger
-from common.constant import OA_CONF


 def spark_command_reconstruct(load, conf):
@@ -43,7 +42,7 @@ def spark_command_reconstruct(load, conf):
         submit_cmd_list.append(f"{_normalize_key(key)}={_normalize_value(value)}")

     submit_cmd = " ".join(submit_cmd_list)
-    global_logger.info(f"拼接后的spark-sql命令如下{submit_cmd}")
+    global_logger.info(f"The complete spark-sql command is as follows: {submit_cmd}")

     return submit_cmd

@@ -67,4 +66,3 @@ def _normalize_value(value):
     if isinstance(value, str) and ' ' in value:
         value = f'"{value}"'
     return value
-
diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_executor.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_executor.py
index 1c3426177c44e796cf4df8e98993c59fd5cba197..8dd3426ad5eef2a4e9db6f2bce950efb688ef734 100644
--- a/omniadvisor/src/omniadvisor/service/spark_service/spark_executor.py
+++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_executor.py
@@ -3,7 +3,9 @@ from omniadvisor.utils.utils import run_cmd


 class SparkExecutor:
-    def submit_spark_task(self, execute_cmd):
+
+    @classmethod
+    def submit_spark_task(cls, execute_cmd):
         """
         在shell终端提交spark命令
         :param submit_config_str: spark的提交命令
@@ -13,10 +15,12 @@ class SparkExecutor:

         return exitcode, spark_output

-    def parser_spark_output(self, spark_output) -> dict:
-        return self._reg_match_application_id_and_time_taken(spark_output)
+    @classmethod
+    def parser_spark_output(cls, spark_output) -> dict:
+        return cls._reg_match_application_id_and_time_taken(spark_output)

-    def _reg_match_application_id_and_time_taken(self, spark_output):
+    @classmethod
+    def _reg_match_application_id_and_time_taken(cls, spark_output):
         spark_output = spark_output.split("\n")
         spark_submit_cmd = ""
         application_id = ""
diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_fetcher.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_fetcher.py
index 648efd8bb2ea2dbd496484b5a232327f24d24c17..ade44dbf1e70e83714a294642d8e8108b3c1b7fd 100755
--- a/omniadvisor/src/omniadvisor/service/spark_service/spark_fetcher.py
+++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_fetcher.py
@@ -58,7 +58,7 @@ class SparkFetcher:
         :param app_id: 应用ID
         :return: 返回指定应用的阶段信息
         """
-        return self._make_request(f"api/v1/applications/{app_id}/stages")
+        return self._make_request(f"api/v1/applications/{app_id}/stages?withSummaries=true")

     def get_spark_executor_by_app(self, app_id):
         """
@@ -68,6 +68,3 @@ class SparkFetcher:
         :return: 返回指定应用的执行器信息
         """
         return self._make_request(f"api/v1/applications/{app_id}/executors")
-
-
-
diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_parameter_parser.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_parameter_parser.py
index e472b296a0ae451c97597bb2037145e165934eb7..069eca0e332622ef0bdd3e0a7d543a8ba7116155 100644
--- a/omniadvisor/src/omniadvisor/service/spark_service/spark_parameter_parser.py
+++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_parameter_parser.py
@@ -1,8 +1,8 @@
 import shlex
 from dataclasses import dataclass
-from omniadvisor.utils.logger import global_logger
-from common.constant import OA_CONF
+
 from omniadvisor.service.parameter_parser import ParserInterface
+from omniadvisor.utils.logger import global_logger

 SPARK_NON_EXEC_ATTR_SET = {"name", "conf"}
 SPARK_CONF_SUPPLEMENT_MAP = {"num_executors": "spark.executor.instances",
@@ -11,6 +11,7 @@ SPARK_CONF_SUPPLEMENT_MAP = {"num_executors": "spark.executor.instances",
                              "executor_memory": "spark.executor.memory",
                              "driver_memory": "spark.driver.memory"}

+
 @dataclass(frozen=True)
 class BaseParams:
     name: str
@@ -106,15 +107,17 @@ class SparkParameterParser(ParserInterface):
             # 提取conf配置
             elif key == "conf":
                 for confitem in value:
-                    parts = confitem.split("=", 1)
-                    confkey, confvalue = parts
+                    confkey, confvalue = confitem.split("=", 1)
                     # 处理重复键问题,例如合并值为列表
                     if confkey in conf_params:
                         if isinstance(conf_params[confkey], list):
                             conf_params[confkey].append(confvalue)
                         else:
                             conf_params[confkey] = [conf_params[key], value]
-                        global_logger.warn(f"{confkey}被重复配置,多次配置值如下{conf_params[confvalue]}")
+                        global_logger.warn(
+                            f"{confkey} is configured repeatedly, and the multiple configuration values are as follows:"
+                            f" {conf_params[confkey]}"
+                        )
                     else:
                         conf_params[confkey] = confvalue
             # 将 --num-executors --executors-cores --driver-cores这类的配置归类到conf_params中便于后续重建命令
diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py
index a0a1d46981fd59cf74726715de05c281f2986647..28c131a14702692a6042df75f002880a6b9eef56 100644
--- a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py
+++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py
@@ -29,25 +29,7 @@ def spark_run(load, conf):
     exam_record = ExamRecordRepository.create(load, conf)

     # 执行当前的submit_cmd
-    spark_executor = SparkExecutor()
-    try:
-        ExamRecordRepository.update_start_time(exam_record)
-        exitcode, spark_output = spark_executor.submit_spark_task(submit_cmd)
-        ExamRecordRepository.update_end_time(exam_record)
-    except TimeoutError:
-        # 任务提交超时等
-        exam_record.delete()
-        global_logger.error('Spark command submission timed out.')
-        raise
-    except OSError:
-        # 权限不足等
-        exam_record.delete()
-        global_logger.error('Spark command submission permission denied.')
-        raise
-    except Exception:
-        exam_record.delete()
-        global_logger.error('During Spark command submission, known error occurred.')
-        raise
+    exitcode, spark_output = _submit_spark_task(exam_record, submit_cmd)

     # 不存在application_id的情况下不提取time_taken 直接返回
     if exitcode != 0:
@@ -57,18 +39,18 @@ def spark_run(load, conf):
                 status=OA_CONF.ExecStatus.fail,
                 runtime=OA_CONF.exec_fail_return_runtime,
                 trace=OA_CONF.exec_fail_return_trace
-            ), ""
+            ), spark_output
         except RuntimeError:
-            exam_record.delete()
+            global_logger.error('An error occurred when updating the exam record\'s status to %s', OA_CONF.ExecStatus.fail)
+            ExamRecordRepository.delete(exam_record)
             raise

     status = OA_CONF.ExecStatus.success

     # 提取application_id与 time_taken(runtime)
     try:
-        spark_submit_cmd, application_id, runtime = spark_executor.parser_spark_output(spark_output)
+        spark_submit_cmd, application_id, runtime = SparkExecutor.parser_spark_output(spark_output)
     except Exception:
-        exam_record.delete()
+        ExamRecordRepository.delete(exam_record)
         global_logger.error('During parsing spark output, known error occurred.')
         raise

@@ -79,7 +61,8 @@
             runtime=runtime
         )
     except RuntimeError:
-        exam_record.delete()
+        global_logger.error('An error occurred when updating the exam record\'s status to %s', OA_CONF.ExecStatus.fail)
+        ExamRecordRepository.delete(exam_record)
         raise

     # 根据ApplicantID在子进程中获取Trace
@@ -90,7 +73,34 @@
     return exam_record, spark_output


-# TODO 这个也要端到端验证一下 用了子进程会不会对ExamRecordRepository有同时读写的问题?
+def _submit_spark_task(exam_record, submit_cmd):
+    """
+    提交spark任务
+    :param exam_record: 测试记录
+    :param submit_cmd: 命令
+    :return:
+    """
+    try:
+        ExamRecordRepository.update_start_time(exam_record)
+        exitcode, spark_output = SparkExecutor.submit_spark_task(submit_cmd)
+        ExamRecordRepository.update_end_time(exam_record)
+    except TimeoutError:
+        # 任务提交超时等
+        exam_record.delete()
+        global_logger.error('Spark command submission timed out.')
+        raise
+    except OSError:
+        # 权限不足等
+        exam_record.delete()
+        global_logger.error('Spark command submission permission denied.')
+        raise
+    except Exception:
+        exam_record.delete()
+        global_logger.error('During Spark command submission, known error occurred.')
+        raise
+    return exitcode, spark_output
+
+
 def _update_trace_from_history_server(exam_record: ExamRecord, application_id: str):
     """
     创建一个子进程对history_server进行轮询 超时时间为10s
@@ -108,9 +118,8 @@ def _update_trace_from_history_server(exam_record: ExamRecord, application_id: str):
             trace_executor = spark_fetcher.get_spark_executor_by_app(application_id)
         except HTTPError as httpe:
             time.sleep(1)
-            global_logger.warning(f"HistoryServer访问错误:{httpe}")
+            global_logger.warning(f"Cannot access history server: {httpe}")
             continue
-
         trace_dict['sql'] = save_trace_data(data=trace_sql, data_dir=OA_CONF.data_dir)
         trace_dict['stages'] = save_trace_data(data=trace_stages, data_dir=OA_CONF.data_dir)
         trace_dict['executor'] = save_trace_data(data=trace_executor, data_dir=OA_CONF.data_dir)
diff --git a/omniadvisor/src/omniadvisor/service/tuning_result/tuning_result.py b/omniadvisor/src/omniadvisor/service/tuning_result/tuning_result.py
index e61d9629ab7705181bc634c28823261bea5f1b64..925df6265076b92591154d3b36f4771583b6f8b8 100644
--- a/omniadvisor/src/omniadvisor/service/tuning_result/tuning_result.py
+++ b/omniadvisor/src/omniadvisor/service/tuning_result/tuning_result.py
@@ -1,6 +1,6 @@
 from typing import List

-from algo.common.model import Tuning
+from algo.common.model import Tuning, Trace
 from omniadvisor.repository.model.load import Load
 from omniadvisor.repository.model.tuning_record import TuningRecord
 from omniadvisor.repository.model.exam_record import ExamRecord
@@ -36,6 +36,19 @@ def get_tuning_result(load: Load, config: dict):
     )


+def remove_tuning_result(load: Load, config: dict):
+    """
+    删除给定负载和配置的执行记录
+    """
+    tuning_result = get_tuning_result(load, config)
+    # 清测试记录
+    for exam_record in tuning_result.exam_records:
+        ExamRecordRepository.delete(exam_record)
+
+    # 清调优记录
+    TuningRecordRepository.delete(tuning_result.tuning_record)
+
+
 class TuningResult:
     """
     调优结果,汇总相同负载和配置下的调优记录和测试记录
@@ -75,6 +88,10 @@ class TuningResult:
     def exam_times(self):
         return len(self._exam_records)

+    @property
+    def failed_times(self):
+        return sum([exam_record.status == OA_CONF.ExecStatus.fail for exam_record in self._exam_records])
+
     @property
     def status(self):
         """
@@ -84,8 +101,7 @@ class TuningResult:
         复测中:复测次数小于指定复测次数,且失败次数没有超过阈值,复测流程尚在进行中
         :return:
         """
-        failed_times = sum([exam_record.status == OA_CONF.ExecStatus.fail for exam_record in self._exam_records])
-        if failed_times >= OA_CONF.config_fail_threshold:
+        if self.failed_times >= OA_CONF.config_fail_threshold:
             overall_status = OA_CONF.ExecStatus.fail
         elif len(self._exam_records) == OA_CONF.tuning_retest_times:
             overall_status = OA_CONF.ExecStatus.success
@@ -121,13 +137,26 @@ class TuningResult:
         best_exam_record = min(success_exam_results, key=lambda exam_record: exam_record.runtime)
         return best_exam_record.trace

+    @property
+    def tuning_record(self):
+        return self._tuning_record
+
+    @property
+    def exam_records(self):
+        return self._exam_records
+
     def to_tuning(self) -> Tuning:
+        if self.trace is not None:
+            trace = Trace(stages_with_summaries=self.trace.get('stages'), sql=self.trace.get('sql'))
+        else:
+            trace = None
+
         return Tuning(
             round=self.rounds,
             config=self.config,
             method=self.method,
-            rule='',
+            rule=self.method_extend,
             status=self.status,
             runtime=self.runtime,
-            trace=self.trace
+            trace=trace
         )
diff --git a/omniadvisor/src/omniadvisor/service/tuning_result/tuning_result_history.py b/omniadvisor/src/omniadvisor/service/tuning_result/tuning_result_history.py
index 64d26df45dc5634d15beff22655ce5c304fa5c0a..5985cc12f35c54d456bc71bdeed02f4f687125d5 100644
--- a/omniadvisor/src/omniadvisor/service/tuning_result/tuning_result_history.py
+++ b/omniadvisor/src/omniadvisor/service/tuning_result/tuning_result_history.py
@@ -9,6 +9,11 @@ from omniadvisor.service.tuning_result.tuning_result import get_tuning_result
 from common.constant import OA_CONF


+def get_other_tuning_result_history(load: Load):
+    # 暂未实现
+    return None
+
+
 def get_next_tuning_method(load) -> str:
     """
     根据 load,获取调优历史记录,并根据OA_CONF中的tuning_strategies决定本轮的调优方法
@@ -20,7 +25,7 @@ def get_next_tuning_method(load) -> str:
     next_tuning_method = None
     for tuning_method, expected_tuning_times in OA_CONF.tuning_strategies:
         # 历史中该方法的调优次数已经超过了设定的调优次数,跳过(超过是因为用户强制使用)
-        if statistics.get(tuning_method) >= expected_tuning_times:
+        if statistics.get(tuning_method, 0) >= expected_tuning_times:
             continue
         next_tuning_method = tuning_method
         break
@@ -100,4 +105,3 @@ class TuningResultHistory:
             else:
                 data[tuning_result.method] = 1
         return data
-
diff --git a/omniadvisor/tests/conftest.py b/omniadvisor/tests/conftest.py
new file mode 100644
index 0000000000000000000000000000000000000000..815fff6e575dd39b5f0728ff2af0e71db50940bc
--- /dev/null
+++ b/omniadvisor/tests/conftest.py
@@ -0,0 +1,58 @@
+import sys
+from types import ModuleType
+
+
+# 这个类的术语为 Dummy Object,仅用于任意对象的任意方法、属性,避免报错
+class SafeModule:
+
+    def __getattr__(self, name):
+        # 任意属性访问均返回自身,避免测试中访问未实现的算法包时报错
+        return self
+
+    def __call__(self, *args, **kwargs):
+        return self
+
+
+safe_instance = SafeModule()
+
+
+def dot_expansion(s):
+    parts = s.split('.')
+    result = []
+    current = []
+
+    for part in parts:
+        current.append(part)
+        result.append((part, '.'.join(current)))
+
+    return result
+
+
+# 所有算法相关的包,都在这里注册
+to_registers = {
+    'algo.expert.tuning.Trace',
+    'algo.common.model.Trace',
+    'algo.expert.tuning.ExpertTuning',
+    'algo.iterative.tuning.SmacAppendTuning',
+    'algo.common.model.Tuning',
+    'algo.native.tuning.NativeTuning',
+    'algo.transfer.tuning.TransferTuning',
+}
+
+# 注册顶层包:
+sys.modules['algo'] = ModuleType('algo')
+
+for to_register in to_registers:
+    expansions = dot_expansion(to_register)
+    # 注册父子间的链式关系,如algo.iterative.tuning.SmacAppendTuning,先注册一个module 'algo.iterative',并建立与module 'algo'关系
+    for idx, expansion in enumerate(expansions):
+        current_node_name, pathlike = expansion
+        if pathlike in sys.modules:
+            continue
+        father_node = sys.modules[expansions[idx - 1][1]]
+        if idx == len(expansions) - 1:
+            current_node = safe_instance
+        else:
+            current_node = ModuleType(pathlike)
+        setattr(father_node, current_node_name, current_node)
+        sys.modules[pathlike] = current_node
diff --git a/omniadvisor/tests/omniadvisor/interface/test_config_tuning.py b/omniadvisor/tests/omniadvisor/interface/test_config_tuning.py
index c437e60aa6a9276922fc19e486a2b8c295abd044..603375f6884f7e2b41bc0d36472dd7e559b5d51f 100644
--- a/omniadvisor/tests/omniadvisor/interface/test_config_tuning.py
+++ b/omniadvisor/tests/omniadvisor/interface/test_config_tuning.py
@@ -3,9 +3,9 @@ import sys
 from unittest.mock import patch, MagicMock

 import pytest
+from omniadvisor.interface.config_tuning import unified_tuning, main

 from common.constant import OA_CONF
-from omniadvisor.interface.config_tuning import unified_tuning, main


 class TestTuning:
@@ -16,6 +16,8 @@ class TestTuning:
         self.load.default_config = {"test_config": "1"}
         self.retest_times = 3
         self.tuning_method = OA_CONF.TuningMethod.iterative
+        self.empty_str = ''
+        self.tune_return_val = ({'key': 'value'}, '')

     def test_unified_tuning_when_retest_backend(self):
         """
@@ -29,8 +31,9 @@ class TestTuning:
                 patch('omniadvisor.repository.load_repository.LoadRepository.update_best_config') as mock_update_best, \
                 patch('algo.iterative.tuning.SmacAppendTuning.tune') as mock_smac_tuning:
             mock_exam_record = MagicMock()
+            mock_smac_tuning.return_value = self.tune_return_val
             mock_exam_record.status = OA_CONF.ExecStatus.success
-            spark_output = ''
+            spark_output = self.empty_str
             mock_spark_run.return_value = mock_exam_record, spark_output
             unified_tuning(load=self.load, retest_way=OA_CONF.RetestWay.backend, tuning_method=self.tuning_method)
             mock_update_best.assert_called_once()
@@ -42,25 +45,32 @@ class TestTuning:
         后台复测 spark命令执行异常
         :return:
         """
-        caplog.set_level(logging.ERROR)
+        caplog.set_level(logging.WARNING)
         with patch('omniadvisor.repository.load_repository.LoadRepository.query_by_id'), \
                 patch('omniadvisor.repository.tuning_record_repository.TuningRecordRepository.create'), \
                 patch('omniadvisor.service.retest_service.spark_run') as mock_spark_run, \
                 patch('omniadvisor.interface.config_tuning.get_tuning_result_history'), \
+                patch('omniadvisor.service.retest_service.get_tuning_result') as mock_get_tuning_result, \
+                patch('omniadvisor.interface.config_tuning.remove_tuning_result') as mock_remove_tuning_result, \
                 patch('algo.iterative.tuning.SmacAppendTuning.tune') as mock_smac_tuning, \
-                patch('omniadvisor.repository.load_repository.LoadRepository.update_best_config') as mock_update_best, \
-                pytest.raises(RuntimeError):
+                patch('omniadvisor.repository.load_repository.LoadRepository.update_best_config') as mock_update_best:
             mock_exam_record = MagicMock()
             mock_exam_record.status = OA_CONF.ExecStatus.fail
-            spark_output = ''
+            spark_output = self.empty_str
             mock_spark_run.return_value = mock_exam_record, spark_output
-            unified_tuning(load=self.load, retest_way=OA_CONF.RetestWay.backend, tuning_method=self.tuning_method)
+            mock_smac_tuning.return_value = self.tune_return_val
+            mock_tuning_result = MagicMock()
+            mock_tuning_result.failed_times = OA_CONF.config_fail_threshold
+            mock_get_tuning_result.return_value = mock_tuning_result
+            with pytest.raises(RuntimeError):
+                unified_tuning(load=self.load, retest_way=OA_CONF.RetestWay.backend, tuning_method=self.tuning_method)

             mock_smac_tuning.assert_called_once()
             mock_spark_run.assert_called_once()
             mock_update_best.assert_not_called()
+            mock_remove_tuning_result.assert_called_once()

-        assert '复测第 1 轮失败,异常来源:spark运行异常' in caplog.text
+        assert 'Retest failed in round 1. Exception source: Spark exception' in caplog.text

     def test_unified_tuning_when_other_exception(self, caplog):
         """
@@ -72,16 +82,19 @@ class TestTuning:
                 patch('omniadvisor.repository.tuning_record_repository.TuningRecordRepository.create'), \
                 patch('omniadvisor.service.retest_service.spark_run', side_effect=RuntimeError) as mock_spark_run, \
                 patch('omniadvisor.interface.config_tuning.get_tuning_result_history'), \
-                patch('omniadvisor.repository.load_repository.LoadRepository.update_best_config') as mock_update_best, \
+                patch('omniadvisor.repository.load_repository.LoadRepository.update_best_config'), \
                 patch('algo.iterative.tuning.SmacAppendTuning.tune') as mock_smac_tuning, \
-                pytest.raises(RuntimeError):
+                patch('omniadvisor.interface.config_tuning.remove_tuning_result') as mock_remove_tuning_result:
             mock_exam_record = MagicMock()
             mock_exam_record.status = OA_CONF.ExecStatus.success
-            unified_tuning(load=self.load, retest_way=OA_CONF.RetestWay.backend, tuning_method=self.tuning_method)
-            mock_update_best.assert_called_once()
+            mock_smac_tuning.return_value = self.tune_return_val
+            with pytest.raises(RuntimeError):
+                unified_tuning(load=self.load, retest_way=OA_CONF.RetestWay.backend, tuning_method=self.tuning_method)

+            mock_smac_tuning.assert_called_once()
             mock_spark_run.assert_called_once()
-            assert '非spark运行异常' in caplog.text
+            mock_remove_tuning_result.assert_called_once()
+            assert 'Retest failed in round 1. Exception source: Non-Spark exception' in caplog.text

     def test_main_when_load_id_not_exist(self):
         """
@@ -111,6 +124,7 @@ class TestTuning:
                 patch('omniadvisor.interface.config_tuning.get_tuning_result_history') as mocked_query_perf, \
                 patch('omniadvisor.repository.load_repository.LoadRepository.update_test_config') as mock_update_test, \
                 patch('algo.iterative.tuning.SmacAppendTuning.tune') as mock_smac_tuning:
+            mock_smac_tuning.return_value = self.tune_return_val
             unified_tuning(load=self.load, retest_way=OA_CONF.RetestWay.hijacking, tuning_method=self.tuning_method)

             mocked_query_perf.assert_called_once()
diff --git a/omniadvisor/tests/omniadvisor/service/tuning_result/test_tuning_result_history.py b/omniadvisor/tests/omniadvisor/service/tuning_result/test_tuning_result_history.py
index d72a4441b0acb79ca3bd043cb38c4e6eb684861e..97cb5c6544d59af645246543c0fdde82e49e7519 100644
--- a/omniadvisor/tests/omniadvisor/service/tuning_result/test_tuning_result_history.py
+++ b/omniadvisor/tests/omniadvisor/service/tuning_result/test_tuning_result_history.py
@@ -50,6 +50,7 @@ class TestTuningResultHistory:
         assert len(result._tuning_results) == 1
         assert result._tuning_results[0] == tuning_result_mock

+    @patch.object(TuningResult, 'to_tuning', lambda self: self)
     def test_tuning_result_history_config_perf_pairs(self, setup_mocks):
         load, tuning_record_mock, tuning_result_mock = setup_mocks