From 486cf7c0ae4618ac9c7310d7c84cada85cbbb2e8 Mon Sep 17 00:00:00 2001 From: wtingkai <330445001@qq.com> Date: Thu, 7 Aug 2025 17:09:51 +0800 Subject: [PATCH 1/2] =?UTF-8?q?feat(OmniAdvisor):=20=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=E5=90=8E=E5=8F=B0=E8=B0=83=E4=BC=98=E9=9A=94=E7=A6=BB=E9=98=9F?= =?UTF-8?q?=E5=88=97=E6=89=A7=E8=A1=8C=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- omniadvisor/config/common_config.cfg | 4 +++- omniadvisor/src/common/constant.py | 3 ++- .../src/omniadvisor/interface/hijack_recommend.py | 2 +- omniadvisor/src/omniadvisor/service/retest_service.py | 2 +- .../omniadvisor/service/spark_service/spark_run.py | 11 +++++++++-- .../omniadvisor/interface/test_hijack_recommend.py | 2 +- .../service/spark_service/test_spark_run.py | 8 +++++--- 7 files changed, 22 insertions(+), 10 deletions(-) diff --git a/omniadvisor/config/common_config.cfg b/omniadvisor/config/common_config.cfg index 4248a8900f..36a941bfea 100755 --- a/omniadvisor/config/common_config.cfg +++ b/omniadvisor/config/common_config.cfg @@ -3,8 +3,10 @@ tuning.retest.times=1 # 评估配置失效的阈值,当配置执行失败次数大于等于此值,则配置失效 config.fail.threshold = 1 -#调优策略 +# 调优策略 tuning.strategy=[["transfer", 1],["expert", 2],["iterative", 10]] +# 后台复测使用的队列,若不设置,则保持用户队列 +backend.retest.queue= [spark] # Spark History Server的URL 仅用于Rest模式 diff --git a/omniadvisor/src/common/constant.py b/omniadvisor/src/common/constant.py index 8a75091fbd..fde0ee5cba 100644 --- a/omniadvisor/src/common/constant.py +++ b/omniadvisor/src/common/constant.py @@ -108,13 +108,14 @@ class OmniAdvisorConf: # 配置罗列 tuning_retest_times = _common_config.getint('common', 'tuning.retest.times') config_fail_threshold = _common_config.getint('common', 'config.fail.threshold') + tuning_strategies = json.loads(_common_config.get('common', 'tuning.strategy')) + backend_retest_queue = _common_config.get('common', 'backend.retest.queue', fallback='') spark_history_rest_url = _common_config.get('spark', 'spark.history.rest.url') spark_history_username = _common_config.get('spark', 'spark.history.username', fallback='') spark_history_password = _common_config.get('spark', 'spark.history.password', fallback='') spark_fetch_trace_timeout = _common_config.getint('spark', 'spark.fetch.trace.timeout') spark_fetch_trace_interval = _common_config.getint('spark', 'spark.fetch.trace.interval') spark_exec_timeout_ratio = _common_config.getfloat('spark', 'spark.exec.timeout.ratio') - tuning_strategies = json.loads(_common_config.get('common', 'tuning.strategy')) # 保留小数位数 decimal_digits = 3 diff --git a/omniadvisor/src/omniadvisor/interface/hijack_recommend.py b/omniadvisor/src/omniadvisor/interface/hijack_recommend.py index 5da174b1a2..652eb35182 100644 --- a/omniadvisor/src/omniadvisor/interface/hijack_recommend.py +++ b/omniadvisor/src/omniadvisor/interface/hijack_recommend.py @@ -177,7 +177,7 @@ def hijack_recommend(argv: list): # 根据配置和负载执行Spark任务 global_logger.info("Going to execute Spark load ……") - exam_record, output = spark_run(load=load, config=exec_config, wait_for_trace=False) + exam_record, output = spark_run(load=load, config=exec_config, wait_for_trace=False, exec_in_isolation=False) # 执行结果分析 if exam_record.status == OA_CONF.ExecStatus.success: # 打印结果输出 diff --git a/omniadvisor/src/omniadvisor/service/retest_service.py b/omniadvisor/src/omniadvisor/service/retest_service.py index c06bbb642e..adf7f5708a 100644 --- a/omniadvisor/src/omniadvisor/service/retest_service.py +++ b/omniadvisor/src/omniadvisor/service/retest_service.py @@ -17,7 +17,7 @@ def retest(load: Load, config: dict): global_logger.debug('Starting retest config...') for i in range(1, OA_CONF.tuning_retest_times + 1): try: - exam_record, spark_output = spark_run(load=load, config=config, wait_for_trace=True) + exam_record, spark_output = spark_run(load=load, config=config, wait_for_trace=True, exec_in_isolation=True) except Exception as e: global_logger.error( 'Retest failed in round %d. Exception source: Non-Spark exception. Details: %s.', i, e diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py index ded0683b96..dcf8534b4f 100644 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py @@ -24,7 +24,7 @@ _RETURN_CODE_MAP = { } -def spark_run(load: Load, config: dict, wait_for_trace: bool = False): +def spark_run(load: Load, config: dict, wait_for_trace: bool = True, exec_in_isolation: bool = False): """ 输入负载与配置,执行Spark任务 并从Spark命令的返回值中获取 生成一条记录本次执行信息的exam_record @@ -33,10 +33,17 @@ def spark_run(load: Load, config: dict, wait_for_trace: bool = False): :param load: 负载 :param config: 参数配置 :param wait_for_trace: 是否阻塞等待获取trace + :param exec_in_isolation: 是否在隔离队列中执行 :return: """ + # 是否要在隔离中执行,若在隔离中执行,则将队列改为隔离队列 + if exec_in_isolation and OA_CONF.backend_retest_queue: + exec_attr = load.exec_attr + exec_attr['queue'] = OA_CONF.backend_retest_queue + else: + exec_attr = load.exec_attr # 从解析后的参数列表中提取负载与任务的相关信息 - submit_cmd = SparkCMDParser.reconstruct_cmd(exec_attr=load.exec_attr, conf_params=config) + submit_cmd = SparkCMDParser.reconstruct_cmd(exec_attr=exec_attr, conf_params=config) # 判断当前执行配置是否为租户基线配置,若不相同则需设置超时时间 if config != load.default_config: diff --git a/omniadvisor/tests/omniadvisor/interface/test_hijack_recommend.py b/omniadvisor/tests/omniadvisor/interface/test_hijack_recommend.py index fa03b8ea50..f983beaa71 100644 --- a/omniadvisor/tests/omniadvisor/interface/test_hijack_recommend.py +++ b/omniadvisor/tests/omniadvisor/interface/test_hijack_recommend.py @@ -31,7 +31,7 @@ class TestHijackRecommend: mock_spark_run.return_value = (MagicMock(status="success"), "job output") hijack_recommend(argv) - mock_spark_run.assert_called_once_with(load=load, config=exec_config, wait_for_trace=False) + mock_spark_run.assert_called_once_with(load=load, config=exec_config, wait_for_trace=False, exec_in_isolation=False) mock_process_config.assert_not_called() # 场景 2: 执行失败 + 进入安全机制 diff --git a/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_run.py b/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_run.py index b983bb87b3..0c26a2ad8d 100644 --- a/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_run.py +++ b/omniadvisor/tests/omniadvisor/service/spark_service/test_spark_run.py @@ -46,6 +46,7 @@ class TestSparkRun: ] self.mock_response.text = json.dumps(response) + @patch('omniadvisor.service.spark_service.spark_run.SparkFetcher.get_spark_runtime_by_app') @patch('omniadvisor.service.spark_service.spark_run.SparkCMDParser.reconstruct_cmd') @patch('omniadvisor.service.spark_service.spark_run.ExamRecordRepository.create') @patch('omniadvisor.service.spark_service.spark_run.SparkExecutor.submit_spark_task') @@ -53,7 +54,7 @@ class TestSparkRun: @patch('omniadvisor.service.spark_service.spark_run.multiprocessing.Process') @patch('requests.get') def test_spark_run_success(self, mock_requests_get, mock_multiprocess, mock_save_trace_data, mock_submit_spark_task, - mock_create_exam_record, mock_reconstruct_cmd): + mock_create_exam_record, mock_reconstruct_cmd, mock_get_spark_runtime): """ 测试 spark_run 方法 """ @@ -71,10 +72,11 @@ class TestSparkRun: mock_submit_spark_task.return_value = self.mock_exec_result # 配置save_trace_data mock对象 mock_save_trace_data.return_value = f'{OA_CONF.data_dir}/testfile' + # 配置get_spark_runtime_by_app + mock_get_spark_runtime.return_value = 20.4 # 方法调用 - result_exam_record, spark_output = spark_run(self.mock_load, self.conf) - + result_exam_record, spark_output = spark_run(self.mock_load, self.conf, wait_for_trace=False, exec_in_isolation=False) # 结果验证 assert result_exam_record == mock_exam_record assert spark_output == self.mock_exec_result.output -- Gitee From f768caa0016f5e1b6afc0de1a16388ce73000919 Mon Sep 17 00:00:00 2001 From: wtingkai <330445001@qq.com> Date: Fri, 8 Aug 2025 09:16:22 +0800 Subject: [PATCH 2/2] =?UTF-8?q?feat(OmniAdvisor):=20=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=BA=93=E5=90=8E=E7=AB=AF=E7=94=B1sqlite=E5=88=87=E6=8D=A2?= =?UTF-8?q?=E8=87=B3postgresql?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- omniadvisor/config/common_config.cfg | 12 ++++++++++++ omniadvisor/src/common/constant.py | 18 +++++++++++------- .../src/omniadvisor/repository/__init__.py | 11 ++--------- omniadvisor/src/server/app/models.py | 2 +- omniadvisor/src/server/engine/settings.py | 10 ++++++++-- omniadvisor/tests/omniadvisor/__init__.py | 19 ------------------- .../tests/omniadvisor/repository/__init__.py | 19 +++++++++++++++++++ 7 files changed, 53 insertions(+), 38 deletions(-) diff --git a/omniadvisor/config/common_config.cfg b/omniadvisor/config/common_config.cfg index 36a941bfea..10df3ac232 100755 --- a/omniadvisor/config/common_config.cfg +++ b/omniadvisor/config/common_config.cfg @@ -8,6 +8,18 @@ tuning.strategy=[["transfer", 1],["expert", 2],["iterative", 10]] # 后台复测使用的队列,若不设置,则保持用户队列 backend.retest.queue= +[database] +# 后端数据库postgresql的database名称 +postgresql.database.name= +# 后端数据库postgresql的用户 +postgresql.database.user= +# 后端数据库postgresql的用户登录密码 +postgresql.database.password= +# 后端数据库postgresql的连接host +postgresql.database.host= +# 后端数据库postgresql的连接端口 +postgresql.database.port= + [spark] # Spark History Server的URL 仅用于Rest模式 spark.history.rest.url=http://localhost:18080 diff --git a/omniadvisor/src/common/constant.py b/omniadvisor/src/common/constant.py index fde0ee5cba..a102580a19 100644 --- a/omniadvisor/src/common/constant.py +++ b/omniadvisor/src/common/constant.py @@ -2,8 +2,6 @@ import json import os import configparser -from server.engine.settings import BASE_DIR - def load_common_config(config_path: str): """ @@ -56,8 +54,6 @@ class OmniAdvisorConf: # OmniAdvisor根目录 project_base_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - # Django Server根目录 - server_base_dir = BASE_DIR # Log文件目录 log_dir = f'{project_base_dir}/logs' log_file_path = f'{log_dir}/app.log' @@ -106,13 +102,21 @@ class OmniAdvisorConf: # 输入配置解析 _common_config = load_common_config(config_path=common_config_path) # 配置罗列 + # common页 tuning_retest_times = _common_config.getint('common', 'tuning.retest.times') config_fail_threshold = _common_config.getint('common', 'config.fail.threshold') tuning_strategies = json.loads(_common_config.get('common', 'tuning.strategy')) - backend_retest_queue = _common_config.get('common', 'backend.retest.queue', fallback='') + backend_retest_queue = _common_config.get('common', 'backend.retest.queue') + # database页 + postgresql_database_name = _common_config.get('database', 'postgresql.database.name') + postgresql_database_user = _common_config.get('database', 'postgresql.database.user') + postgresql_database_password = _common_config.get('database', 'postgresql.database.password') + postgresql_database_host = _common_config.get('database', 'postgresql.database.host') + postgresql_database_port = _common_config.get('database', 'postgresql.database.port') + # spark页 spark_history_rest_url = _common_config.get('spark', 'spark.history.rest.url') - spark_history_username = _common_config.get('spark', 'spark.history.username', fallback='') - spark_history_password = _common_config.get('spark', 'spark.history.password', fallback='') + spark_history_username = _common_config.get('spark', 'spark.history.username') + spark_history_password = _common_config.get('spark', 'spark.history.password') spark_fetch_trace_timeout = _common_config.getint('spark', 'spark.fetch.trace.timeout') spark_fetch_trace_interval = _common_config.getint('spark', 'spark.fetch.trace.interval') spark_exec_timeout_ratio = _common_config.getfloat('spark', 'spark.exec.timeout.ratio') diff --git a/omniadvisor/src/omniadvisor/repository/__init__.py b/omniadvisor/src/omniadvisor/repository/__init__.py index 781bf15463..8116a638a1 100644 --- a/omniadvisor/src/omniadvisor/repository/__init__.py +++ b/omniadvisor/src/omniadvisor/repository/__init__.py @@ -1,18 +1,11 @@ import django from django.conf import settings -from common.constant import OA_CONF +from server.engine.settings import DATABASES if not settings.configured: - # 启用内存数据库,确保不影响原来的数据库 - DATABASES = { - 'default': { - 'ENGINE': 'django.db.backends.sqlite3', - 'NAME': OA_CONF.server_base_dir / 'db.sqlite3', - } - } - + # 配置Django数据库信息 settings.configure( DATABASES=DATABASES, DEBUG=False, diff --git a/omniadvisor/src/server/app/models.py b/omniadvisor/src/server/app/models.py index 49f59be286..49a6d00e49 100644 --- a/omniadvisor/src/server/app/models.py +++ b/omniadvisor/src/server/app/models.py @@ -40,7 +40,7 @@ class DatabaseTuningRecord(models.Model): null=False, choices=_TUNING_METHOD_CHOICES ) - method_extend = models.CharField(max_length=20, null=True) + method_extend = models.CharField(max_length=50, null=True) rounds = models.IntegerField(null=True) class Meta: diff --git a/omniadvisor/src/server/engine/settings.py b/omniadvisor/src/server/engine/settings.py index b0cf7bc78f..ac376cfb2a 100644 --- a/omniadvisor/src/server/engine/settings.py +++ b/omniadvisor/src/server/engine/settings.py @@ -12,6 +12,8 @@ https://docs.djangoproject.com/en/4.2/ref/settings/ from pathlib import Path +from common.constant import OA_CONF + # Build paths inside the project like this: BASE_DIR / 'subdir'. BASE_DIR = Path(__file__).resolve().parent.parent @@ -76,8 +78,12 @@ WSGI_APPLICATION = 'server.engine.wsgi.application' DATABASES = { 'default': { - 'ENGINE': 'django.db.backends.sqlite3', - 'NAME': BASE_DIR / 'db.sqlite3' + 'ENGINE': 'django.db.backends.postgresql', + 'NAME': OA_CONF.postgresql_database_name, + 'USER': OA_CONF.postgresql_database_user, + 'PASSWORD': OA_CONF.postgresql_database_password, + 'HOST': OA_CONF.postgresql_database_host, + 'PORT': OA_CONF.postgresql_database_port, } } diff --git a/omniadvisor/tests/omniadvisor/__init__.py b/omniadvisor/tests/omniadvisor/__init__.py index 4e4cfa66f9..e69de29bb2 100644 --- a/omniadvisor/tests/omniadvisor/__init__.py +++ b/omniadvisor/tests/omniadvisor/__init__.py @@ -1,19 +0,0 @@ -import django -from django.conf import settings - -if not settings.configured: - # 启用内存数据库,确保不影响原来的数据库 - DATABASES = { - 'default': { - 'ENGINE': 'django.db.backends.sqlite3', - 'NAME': ':memory:', - } - } - - settings.configure( - DATABASES=DATABASES, - DEBUG=False, - INSTALLED_APPS=['server'], - ) - # 初始化 Django 环境 - django.setup() diff --git a/omniadvisor/tests/omniadvisor/repository/__init__.py b/omniadvisor/tests/omniadvisor/repository/__init__.py index e69de29bb2..868a246486 100644 --- a/omniadvisor/tests/omniadvisor/repository/__init__.py +++ b/omniadvisor/tests/omniadvisor/repository/__init__.py @@ -0,0 +1,19 @@ +import django +from django.conf import settings + +if not settings.configured: + # 启用内存数据库,确保不影响原来的数据库 + DATABASES = { + 'default': { + 'ENGINE': 'django.db.backends.sqlite3', + 'NAME': ':memory:', + } + } + + settings.configure( + DATABASES=DATABASES, + DEBUG=False, + INSTALLED_APPS=['server'], + ) + # 初始化 Django 环境 + django.setup() \ No newline at end of file -- Gitee