diff --git a/omniadvisor/config/common_config.cfg b/omniadvisor/config/common_config.cfg index 4248a8900fdd7c2c4e1baa0426af76b6ef26896a..10df3ac232552bcbd2082f92d5bc2a6ee0b655d0 100755 --- a/omniadvisor/config/common_config.cfg +++ b/omniadvisor/config/common_config.cfg @@ -3,8 +3,22 @@ tuning.retest.times=1 # 评估配置失效的阈值,当配置执行失败次数大于等于此值,则配置失效 config.fail.threshold = 1 -#调优策略 +# 调优策略 tuning.strategy=[["transfer", 1],["expert", 2],["iterative", 10]] +# 后台复测使用的队列,若不设置,则保持用户队列 +backend.retest.queue= + +[database] +# 后端数据库postgresql的database名称 +postgresql.database.name= +# 后端数据库postgresql的用户 +postgresql.database.user= +# 后端数据库postgresql的用户登录密码 +postgresql.database.password= +# 后端数据库postgresql的连接host +postgresql.database.host= +# 后端数据库postgresql的连接端口 +postgresql.database.port= [spark] # Spark History Server的URL 仅用于Rest模式 diff --git a/omniadvisor/src/common/constant.py b/omniadvisor/src/common/constant.py index 8a75091fbde9d10761be2ab7a7f3274a53e56f2a..a102580a19a5d208607bc097adaaea0134241280 100644 --- a/omniadvisor/src/common/constant.py +++ b/omniadvisor/src/common/constant.py @@ -2,8 +2,6 @@ import json import os import configparser -from server.engine.settings import BASE_DIR - def load_common_config(config_path: str): """ @@ -56,8 +54,6 @@ class OmniAdvisorConf: # OmniAdvisor根目录 project_base_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - # Django Server根目录 - server_base_dir = BASE_DIR # Log文件目录 log_dir = f'{project_base_dir}/logs' log_file_path = f'{log_dir}/app.log' @@ -106,15 +102,24 @@ class OmniAdvisorConf: # 输入配置解析 _common_config = load_common_config(config_path=common_config_path) # 配置罗列 + # common页 tuning_retest_times = _common_config.getint('common', 'tuning.retest.times') config_fail_threshold = _common_config.getint('common', 'config.fail.threshold') + tuning_strategies = json.loads(_common_config.get('common', 'tuning.strategy')) + backend_retest_queue = _common_config.get('common', 'backend.retest.queue') + # database页 + postgresql_database_name = _common_config.get('database', 'postgresql.database.name') + postgresql_database_user = _common_config.get('database', 'postgresql.database.user') + postgresql_database_password = _common_config.get('database', 'postgresql.database.password') + postgresql_database_host = _common_config.get('database', 'postgresql.database.host') + postgresql_database_port = _common_config.get('database', 'postgresql.database.port') + # spark页 spark_history_rest_url = _common_config.get('spark', 'spark.history.rest.url') - spark_history_username = _common_config.get('spark', 'spark.history.username', fallback='') - spark_history_password = _common_config.get('spark', 'spark.history.password', fallback='') + spark_history_username = _common_config.get('spark', 'spark.history.username') + spark_history_password = _common_config.get('spark', 'spark.history.password') spark_fetch_trace_timeout = _common_config.getint('spark', 'spark.fetch.trace.timeout') spark_fetch_trace_interval = _common_config.getint('spark', 'spark.fetch.trace.interval') spark_exec_timeout_ratio = _common_config.getfloat('spark', 'spark.exec.timeout.ratio') - tuning_strategies = json.loads(_common_config.get('common', 'tuning.strategy')) # 保留小数位数 decimal_digits = 3 diff --git a/omniadvisor/src/omniadvisor/interface/hijack_recommend.py b/omniadvisor/src/omniadvisor/interface/hijack_recommend.py index 5da174b1a2ada9c482ddc511ab9385f9d86fa517..652eb35182747f47ff70e13b35ea3c64da87f592 100644 --- a/omniadvisor/src/omniadvisor/interface/hijack_recommend.py +++ b/omniadvisor/src/omniadvisor/interface/hijack_recommend.py @@ -177,7 +177,7 @@ def hijack_recommend(argv: list): # 根据配置和负载执行Spark任务 global_logger.info("Going to execute Spark load ……") - exam_record, output = spark_run(load=load, config=exec_config, wait_for_trace=False) + exam_record, output = spark_run(load=load, config=exec_config, wait_for_trace=False, exec_in_isolation=False) # 执行结果分析 if exam_record.status == OA_CONF.ExecStatus.success: # 打印结果输出 diff --git a/omniadvisor/src/omniadvisor/repository/__init__.py b/omniadvisor/src/omniadvisor/repository/__init__.py index 781bf154632a780bf8e976210af690ac8479171e..8116a638a1b8360451ab861e9476b19304fa730e 100644 --- a/omniadvisor/src/omniadvisor/repository/__init__.py +++ b/omniadvisor/src/omniadvisor/repository/__init__.py @@ -1,18 +1,11 @@ import django from django.conf import settings -from common.constant import OA_CONF +from server.engine.settings import DATABASES if not settings.configured: - # 启用内存数据库,确保不影响原来的数据库 - DATABASES = { - 'default': { - 'ENGINE': 'django.db.backends.sqlite3', - 'NAME': OA_CONF.server_base_dir / 'db.sqlite3', - } - } - + # 配置Django数据库信息 settings.configure( DATABASES=DATABASES, DEBUG=False, diff --git a/omniadvisor/src/omniadvisor/service/retest_service.py b/omniadvisor/src/omniadvisor/service/retest_service.py index c06bbb642ef931d842b0ce616e757de30a75136f..adf7f5708a63f9c8e95a30bf1b1fbec13e76f896 100644 --- a/omniadvisor/src/omniadvisor/service/retest_service.py +++ b/omniadvisor/src/omniadvisor/service/retest_service.py @@ -17,7 +17,7 @@ def retest(load: Load, config: dict): global_logger.debug('Starting retest config...') for i in range(1, OA_CONF.tuning_retest_times + 1): try: - exam_record, spark_output = spark_run(load=load, config=config, wait_for_trace=True) + exam_record, spark_output = spark_run(load=load, config=config, wait_for_trace=True, exec_in_isolation=True) except Exception as e: global_logger.error( 'Retest failed in round %d. Exception source: Non-Spark exception. Details: %s.', i, e diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py index ded0683b9620d7bd3a23ac8d9d3e0b2762b88be0..dcf8534b4fb04a7a7aa3239018a77be03cedf158 100644 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py @@ -24,7 +24,7 @@ _RETURN_CODE_MAP = { } -def spark_run(load: Load, config: dict, wait_for_trace: bool = False): +def spark_run(load: Load, config: dict, wait_for_trace: bool = True, exec_in_isolation: bool = False): """ 输入负载与配置,执行Spark任务 并从Spark命令的返回值中获取 生成一条记录本次执行信息的exam_record @@ -33,10 +33,17 @@ def spark_run(load: Load, config: dict, wait_for_trace: bool = False): :param load: 负载 :param config: 参数配置 :param wait_for_trace: 是否阻塞等待获取trace + :param exec_in_isolation: 是否在隔离队列中执行 :return: """ + # 是否要在隔离中执行,若在隔离中执行,则将队列改为隔离队列 + if exec_in_isolation and OA_CONF.backend_retest_queue: + exec_attr = load.exec_attr + exec_attr['queue'] = OA_CONF.backend_retest_queue + else: + exec_attr = load.exec_attr # 从解析后的参数列表中提取负载与任务的相关信息 - submit_cmd = SparkCMDParser.reconstruct_cmd(exec_attr=load.exec_attr, conf_params=config) + submit_cmd = SparkCMDParser.reconstruct_cmd(exec_attr=exec_attr, conf_params=config) # 判断当前执行配置是否为租户基线配置,若不相同则需设置超时时间 if config != load.default_config: diff --git a/omniadvisor/src/server/app/models.py b/omniadvisor/src/server/app/models.py index 49f59be28678891af050d3b897ecbb49257f43be..49a6d00e491ed12ee30055f7da83fff2b414a799 100644 --- a/omniadvisor/src/server/app/models.py +++ b/omniadvisor/src/server/app/models.py @@ -40,7 +40,7 @@ class DatabaseTuningRecord(models.Model): null=False, choices=_TUNING_METHOD_CHOICES ) - method_extend = models.CharField(max_length=20, null=True) + method_extend = models.CharField(max_length=50, null=True) rounds = models.IntegerField(null=True) class Meta: diff --git a/omniadvisor/src/server/engine/settings.py b/omniadvisor/src/server/engine/settings.py index b0cf7bc78fd5e64a6fea82c22b5d7b8cb71487f1..ac376cfb2a617951a2c75b3f4cff3c21c915bbba 100644 --- a/omniadvisor/src/server/engine/settings.py +++ b/omniadvisor/src/server/engine/settings.py @@ -12,6 +12,8 @@ https://docs.djangoproject.com/en/4.2/ref/settings/ from pathlib import Path +from common.constant import OA_CONF + # Build paths inside the project like this: BASE_DIR / 'subdir'. BASE_DIR = Path(__file__).resolve().parent.parent @@ -76,8 +78,12 @@ WSGI_APPLICATION = 'server.engine.wsgi.application' DATABASES = { 'default': { - 'ENGINE': 'django.db.backends.sqlite3', - 'NAME': BASE_DIR / 'db.sqlite3' + 'ENGINE': 'django.db.backends.postgresql', + 'NAME': OA_CONF.postgresql_database_name, + 'USER': OA_CONF.postgresql_database_user, + 'PASSWORD': OA_CONF.postgresql_database_password, + 'HOST': OA_CONF.postgresql_database_host, + 'PORT': OA_CONF.postgresql_database_port, } } diff --git a/omniadvisor/tests/omniadvisor/__init__.py b/omniadvisor/tests/omniadvisor/__init__.py index 4e4cfa66f9809a201d030c4f8a90014de8bca24e..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 100644 --- a/omniadvisor/tests/omniadvisor/__init__.py +++ b/omniadvisor/tests/omniadvisor/__init__.py @@ -1,19 +0,0 @@ -import django -from django.conf import settings - -if not settings.configured: - # 启用内存数据库,确保不影响原来的数据库 - DATABASES = { - 'default': { - 'ENGINE': 'django.db.backends.sqlite3', - 'NAME': ':memory:', - } - } - - settings.configure( - DATABASES=DATABASES, - DEBUG=False, - INSTALLED_APPS=['server'], - ) - # 初始化 Django 环境 - django.setup() diff --git a/omniadvisor/tests/omniadvisor/interface/test_hijack_recommend.py b/omniadvisor/tests/omniadvisor/interface/test_hijack_recommend.py index fa03b8ea506bc4bc223e1dfbdfbd35b700f8c51b..f983beaa7134dcc19466eafa4a455ced69044946 100644 --- a/omniadvisor/tests/omniadvisor/interface/test_hijack_recommend.py +++ b/omniadvisor/tests/omniadvisor/interface/test_hijack_recommend.py @@ -31,7 +31,7 @@ class TestHijackRecommend: mock_spark_run.return_value = (MagicMock(status="success"), "job output") hijack_recommend(argv) - mock_spark_run.assert_called_once_with(load=load, config=exec_config, wait_for_trace=False) + mock_spark_run.assert_called_once_with(load=load, config=exec_config, wait_for_trace=False, exec_in_isolation=False) mock_process_config.assert_not_called() # 场景 2: 执行失败 + 进入安全机制 diff --git a/omniadvisor/tests/omniadvisor/repository/__init__.py b/omniadvisor/tests/omniadvisor/repository/__init__.py index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..868a2464865117a21a9e7e14a83d6a1a4f254787 100644 --- a/omniadvisor/tests/omniadvisor/repository/__init__.py +++ b/omniadvisor/tests/omniadvisor/repository/__init__.py @@ -0,0 +1,19 @@ +import django +from django.conf import settings + +if not settings.configured: + # 启用内存数据库,确保不影响原来的数据库 + DATABASES = { + 'default': { + 'ENGINE': 'django.db.backends.sqlite3', + 'NAME': ':memory:', + } + } + + settings.configure( + DATABASES=DATABASES, + DEBUG=False, + INSTALLED_APPS=['server'], + ) + # 初始化 Django 环境 + django.setup() \ No newline at end of file diff --git a/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_run.py b/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_run.py index b983bb87b364460b5efb1fc137396f2a438d67b3..0c26a2ad8d0e2c0af75f1dfd0bad3dab2ba27a65 100644 --- a/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_run.py +++ b/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_run.py @@ -46,6 +46,7 @@ class TestSparkRun: ] self.mock_response.text = json.dumps(response) + @patch('omniadvisor.service.spark_service.spark_run.SparkFetcher.get_spark_runtime_by_app') @patch('omniadvisor.service.spark_service.spark_run.SparkCMDParser.reconstruct_cmd') @patch('omniadvisor.service.spark_service.spark_run.ExamRecordRepository.create') @patch('omniadvisor.service.spark_service.spark_run.SparkExecutor.submit_spark_task') @@ -53,7 +54,7 @@ class TestSparkRun: @patch('omniadvisor.service.spark_service.spark_run.multiprocessing.Process') @patch('requests.get') def test_spark_run_success(self, mock_requests_get, mock_multiprocess, mock_save_trace_data, mock_submit_spark_task, - mock_create_exam_record, mock_reconstruct_cmd): + mock_create_exam_record, mock_reconstruct_cmd, mock_get_spark_runtime): """ 测试 spark_run 方法 """ @@ -71,10 +72,11 @@ class TestSparkRun: mock_submit_spark_task.return_value = self.mock_exec_result # 配置save_trace_data mock对象 mock_save_trace_data.return_value = f'{OA_CONF.data_dir}/testfile' + # 配置get_spark_runtime_by_app + mock_get_spark_runtime.return_value = 20.4 # 方法调用 - result_exam_record, spark_output = spark_run(self.mock_load, self.conf) - + result_exam_record, spark_output = spark_run(self.mock_load, self.conf, wait_for_trace=False, exec_in_isolation=False) # 结果验证 assert result_exam_record == mock_exam_record assert spark_output == self.mock_exec_result.output