diff --git a/omniadvisor/src/django_server/app/__init__.py b/omniadvisor/src/common/__init__.py similarity index 100% rename from omniadvisor/src/django_server/app/__init__.py rename to omniadvisor/src/common/__init__.py diff --git a/omniadvisor/src/omniadvisor/utils/constant.py b/omniadvisor/src/common/constant.py similarity index 100% rename from omniadvisor/src/omniadvisor/utils/constant.py rename to omniadvisor/src/common/constant.py diff --git a/omniadvisor/src/django_server/app/migrations/__init__.py b/omniadvisor/src/hijack.py similarity index 100% rename from omniadvisor/src/django_server/app/migrations/__init__.py rename to omniadvisor/src/hijack.py diff --git a/omniadvisor/src/django_server/engine/__init__.py b/omniadvisor/src/init.py similarity index 100% rename from omniadvisor/src/django_server/engine/__init__.py rename to omniadvisor/src/init.py diff --git a/omniadvisor/src/omniadvisor/interface/__init__.py b/omniadvisor/src/omniadvisor/__init__.py similarity index 100% rename from omniadvisor/src/omniadvisor/interface/__init__.py rename to omniadvisor/src/omniadvisor/__init__.py diff --git a/omniadvisor/src/omniadvisor/interface/realtime_recommend.py b/omniadvisor/src/omniadvisor/interface/realtime_recommend.py new file mode 100644 index 0000000000000000000000000000000000000000..a9f27498247261735196cdbc7fb09747a15992ba --- /dev/null +++ b/omniadvisor/src/omniadvisor/interface/realtime_recommend.py @@ -0,0 +1,47 @@ +from omniadvisor.service.spark_service.spark_parameter_parser import SparkParameterParser +from omniadvisor.repository.load_repository import LoadRepository +from omniadvisor.utils.logger import logger +from omniadvisor.service.spark_service.spark_run import spark_run +from common.constant import OA_CONF + + +def realtime_recommend(spark_sql_cmd): + # 获取用户传入的Spark命令 并解析命令 + spark_parameter_parser = SparkParameterParser(spark_sql_cmd) + parsed_parameter = spark_parameter_parser.process_parameter() + + # 提取名字 执行参数 租户默认配置 + name = parsed_parameter.get("name") + exec_attr = parsed_parameter.get("exec_attr") + user_config = parsed_parameter.get("default_config") + + # 从负载数据库中查询是否有相关联的Load + load = LoadRepository.query(name=name, default_config=user_config) + config_for_test = user_config + if not load: # 查询失败 创建load的同时使用租户默认配置进行测试 + load = LoadRepository.create(name=name, exec_attr=exec_attr, default_config=user_config) + config_for_test = user_config + elif load.tuning_needed: # 查询成功且需要优化 + if load.test_config: # 有待测试配配置优先使用待测试配置 test_config为空时返回None + config_for_test = load.test_config + elif load.best_config: # 否则使用推荐配置 + config_for_test = load.best_config + + if not config_for_test: + logger.error("Failed to acquire an available execution configuration.") + raise ValueError("Failed to acquire an available execution configuration.") + if not load: + logger.error("Failed to acquire an available load") + raise ValueError("Failed to acquire an available load") + + # 根据配置和负载执行Spark任务 + task = spark_run(load, config_for_test) + + # 执行结果分析 若执行失败则调度用户默认配置重新拉起任务 + if task.status is not OA_CONF.TaskStatus.success and config_for_test != user_config: + task = spark_run(load, user_config) + + +def main(argv): + spark_sql_cmd = argv + realtime_recommend(spark_sql_cmd) diff --git a/omniadvisor/src/omniadvisor/repository/load_repository.py b/omniadvisor/src/omniadvisor/repository/load_repository.py index e1b3069069109b92ec0d8004db48f054167cec57..f455d083cd2fb2e5e5e5159c83618d94a6871ad1 100644 --- a/omniadvisor/src/omniadvisor/repository/load_repository.py +++ b/omniadvisor/src/omniadvisor/repository/load_repository.py @@ -101,3 +101,20 @@ class LoadRepository(Repository): model_attr=model_attr ) return Load(database_model=database_load) + + @classmethod + def update_tuning_needed(cls, load:Load, tuning_needed: bool): + """ + 更新当前负载是否需要调优这一信息 + :param load: 负载实例 + :param tuning_needed: 是否需要调优 布尔型变量 + :return: 更新完之后的负载实例 + """ + model_attr = { + Load.FieldName.tuning_needed: tuning_needed + } + database_load = cls._update( + database_model=load.database_model, + model_attr=model_attr + ) + return Load(database_model=database_load) \ No newline at end of file diff --git a/omniadvisor/src/omniadvisor/repository/model/load.py b/omniadvisor/src/omniadvisor/repository/model/load.py index afe559ba56e71670b4ef9d4fa63a774f740bbbb6..847228b5020e74167b658543754c28eae527c3ee 100644 --- a/omniadvisor/src/omniadvisor/repository/model/load.py +++ b/omniadvisor/src/omniadvisor/repository/model/load.py @@ -41,6 +41,10 @@ class Load: def create_time(self): return self._database_model.create_time + @property + def tuning_needed(self): + return self._database_model.tuning_needed + @property def database_model(self): return self._database_model @@ -53,3 +57,4 @@ class Load: best_config = 'best_config' test_config = 'test_config' create_time = 'create_time' + tuning_needed = 'tuning_needed' diff --git a/omniadvisor/src/omniadvisor/repository/model/perf_history.py b/omniadvisor/src/omniadvisor/repository/model/perf_history.py index 81ef765cbc4ba260ee1037fc9a40ec866b7ced45..2b27a3da2a76ded71bd5a1a645d8aca301f22078 100644 --- a/omniadvisor/src/omniadvisor/repository/model/perf_history.py +++ b/omniadvisor/src/omniadvisor/repository/model/perf_history.py @@ -1,7 +1,7 @@ import json from omniadvisor.repository.model.load import Load -from omniadvisor.utils.constant import OA_CONF +from common.constant import OA_CONF class PerfHistory: diff --git a/omniadvisor/src/omniadvisor/repository/task_repository.py b/omniadvisor/src/omniadvisor/repository/task_repository.py index f5d8074c2c38f22796539d9a89fce8d4d0c68609..82c38a85fd380be26d9d0a7793f592bea2b2c655 100644 --- a/omniadvisor/src/omniadvisor/repository/task_repository.py +++ b/omniadvisor/src/omniadvisor/repository/task_repository.py @@ -8,7 +8,7 @@ from omniadvisor.repository.model.load import Load from omniadvisor.repository.model.task import Task from omniadvisor.repository.model.perf_history import PerfHistory from omniadvisor.repository.repository import Repository -from omniadvisor.utils.constant import OA_CONF +from common.constant import OA_CONF class TaskRepository(Repository): diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_parameter_parser.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_parameter_parser.py index bde123f24488f274ca686c40b64a9544c7bd6293..5d6512cb1914b8909b9437440eef626dde3c8854 100644 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_parameter_parser.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_parameter_parser.py @@ -2,7 +2,7 @@ import argparse import shlex from typing import Dict from omniadvisor.utils.logger import logger -from src.omniadvisor.utils.constant import OA_CONF +from common.constant import OA_CONF from omniadvisor.service.parameter_parser import ParserInterface @@ -60,7 +60,7 @@ class SparkParameterParser(ParserInterface): :return:dict 返回一个包含所有提取出的基础参数的字典,每个键代表一个参数名,详见add_argument中的配置的参数。 对应的值为该参数的具体设置或值。 - base_params = {name:"", conf_params:{}, exec_attr:{}} + base_params = {"name":"", "conf_params":{}, "exec_attr":{}} """ base_params = self._get_base_params() return base_params @@ -68,12 +68,12 @@ class SparkParameterParser(ParserInterface): def _get_base_params(self): """ 使用argparse来解析命令行中的输入命令,并返回解析后的非空参数,以字典类型保存 - :return: + :return: {"name": "", "conf_params": {}, "exec_attr": {}} 结构的字典 """ if not self._submit_cmd: raise ValueError("Submit command cannot be null") param_start_index = self._submit_cmd.strip().index(" ") - base_params = {} + base_params = {"name": "", "conf_params": {}, "exec_attr": {}} conf_params = {} exec_attr = {} args = shlex.split(self._submit_cmd[param_start_index:]) @@ -110,3 +110,59 @@ class SparkParameterParser(ParserInterface): base_params["conf_params"] = conf_params return base_params + + +if __name__ == "__main__": + submit_cmd = "spark-sql --master yarn --deploy-mode client \ + --name ${name}_advisor_${i} \ + --driver-cores 2 \ + --driver-memory 8G \ + --conf spark.sql.shuffle.partitions=3200 \ + --conf spark.sql.result.partitions=3200 \ + --conf spark.executor.cores=4 \ + --conf spark.executor.memory=28G \ + --conf spark.executor.memoryOverhead=2G \ + --conf spark.memory.storageFraction=0.5 \ + --conf spark.memory.fraction=0.6 \ + --num-executors 400 \ + --conf spark.shuffle.service.enabled=false \ + --conf spark.shuffle.io.maxRetries=6 \ + --conf spark.shuffle.io.retryWait=60 \ + --conf spark.shuffle.useOldFetchProtocol=false \ + --conf spark.shuffle.io.serverThreads=32 \ + --conf spark.sql.hive.convertMetastoreParquet=true \ + --conf spark.shuffle.file.buffer=1024k \ + --conf spark.reducer.maxSizeInFlight=96M \ + --conf spark.reducer.maxReqsInFlight=10 \ + --conf spark.network.timeout=300s \ + --conf spark.rpc.askTimeout=300s \ + --conf spark.sql.adaptive.enabled=true \ + --conf spark.sql.adaptive.coalescePartitions.enabled=true \ + --conf spark.sql.adaptive.coalescePartitions.minPartitionNum=100 \ + --conf spark.sql.adaptive.advisoryPartitionSizeInBytes=64M \ + --conf spark.sql.adaptive.fetchShuffleBlocksInBatch=true \ + --conf spark.sql.adaptive.localShuffleReader.enabled=true \ + --conf spark.sql.adaptive.skewJoin.enabled=true \ + --conf spark.sql.adaptive.skewJoin.skewedPartitionFactor=5 \ + --conf spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256mb \ + --conf spark.sql.inMemoryColumnarStorage.compressed=true \ + --conf spark.sql.sources.partitionColumnTypeInference.enabled=false \ + --conf spark.sql.storeAssignmentPolicy=LEGACY \ + --conf spark.sql.legacy.timeParserPolicy=LEGACY \ + --conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2 \ + --conf spark.hadoop.fs.hdfs.impl.disable.cache=true \ + --conf spark.dynamicAllocation.shuffleTracking.enabled=true \ + --conf spark.speculation=true \ + --conf spark.speculation.multiplier=5 \ + --conf spark.speculation.quantile=0.99 \ + --conf spark.speculation.minTaskRuntime=3000ms \ + --conf spark.sql.autoBroadcastJoinThreshold=64mb \ + --conf spark.sql.adaptive.coalescePartitions.minPartitionSize=32mb \ + --conf spark.sql.adaptive.coalescePartitions.parallelismFirst=false \ + --conf spark.sql.parquet.writeLegacyFormat=true \ + --conf spark.driver.maxResultSize=40g \ + -f $file" + + spark_parameter_parser = SparkParameterParser(submit_cmd) + parsed_parameter = spark_parameter_parser.process_parameter() + print("") \ No newline at end of file diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py index 317888d37b9e1b6d51946baa070373e4f159e777..37efa9fd6f85edb43a005596ec7943d8fe628c12 100644 --- a/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py +++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_run.py @@ -2,8 +2,9 @@ from omniadvisor.service.spark_service.spark_fetcher import SparkFetcher from omniadvisor.service.spark_service.spark_executor import SparkExecutor from omniadvisor.repository.task_repository import TaskRepository from omniadvisor.service.spark_service.spark_command_reconstruct import spark_command_reconstruct -from omniadvisor.utils.constant import OA_CONF +from common.constant import OA_CONF from omniadvisor.utils.utils import trace_data_saver +from omniadvisor.utils.logger import logger def spark_run(load, conf): @@ -17,6 +18,14 @@ def spark_run(load, conf): spark_executor = SparkExecutor() exitcode, spark_output, result_dict = spark_executor.submit_spark_task(submit_cmd) + # trace与time_taken(runtime)刷新进入task变量 + runtime = result_dict.get("time_taken") + if exitcode == 0: + status = OA_CONF.TaskStatus.success + logger.info(spark_output) + else: + status = OA_CONF.TaskStatus.fail + # 根据ApplicantID获取Trace trace_dict = {} application_id = result_dict.get('application_id') @@ -26,13 +35,6 @@ def spark_run(load, conf): trace_dict['stages'] = trace_data_saver(spark_fetcher.get_spark_stages_by_app(application_id), OA_CONF.DATA_DIR) trace_dict['executor'] = trace_data_saver(spark_fetcher.get_spark_executor_by_app(application_id), OA_CONF.DATA_DIR) - # trace与time_taken(runtime)刷新进入task变量 - runtime = result_dict.get("time_taken") - if exitcode == 0: - status = OA_CONF.TaskStatus.success - else: - status = OA_CONF.TaskStatus.fail - TaskRepository.update_task_result(task, status, runtime, trace_dict) return task diff --git a/omniadvisor/src/omniadvisor/utils/logger.py b/omniadvisor/src/omniadvisor/utils/logger.py index 516ce87ddedef0ec61c8eb11ab563d14eb652c5e..45f79292aa46438fdc4f3e74f4c4b2f71b9ecc48 100755 --- a/omniadvisor/src/omniadvisor/utils/logger.py +++ b/omniadvisor/src/omniadvisor/utils/logger.py @@ -1,7 +1,7 @@ import os import logging from logging.config import dictConfig -from omniadvisor.utils.constant import OA_CONF +from common.constant import OA_CONF LOGGING_CONFIG = { 'version': 1, diff --git a/omniadvisor/src/server/__init__.py b/omniadvisor/src/server/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/omniadvisor/src/server/app/__init__.py b/omniadvisor/src/server/app/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/omniadvisor/src/django_server/app/admin.py b/omniadvisor/src/server/app/admin.py similarity index 100% rename from omniadvisor/src/django_server/app/admin.py rename to omniadvisor/src/server/app/admin.py diff --git a/omniadvisor/src/django_server/app/apps.py b/omniadvisor/src/server/app/apps.py similarity index 100% rename from omniadvisor/src/django_server/app/apps.py rename to omniadvisor/src/server/app/apps.py diff --git a/omniadvisor/src/server/app/migrations/__init__.py b/omniadvisor/src/server/app/migrations/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/omniadvisor/src/django_server/app/models.py b/omniadvisor/src/server/app/models.py similarity index 94% rename from omniadvisor/src/django_server/app/models.py rename to omniadvisor/src/server/app/models.py index 90b198c23528cbfb7ec1348cdfb51f0cc4bace78..05e053e5f2a4ab9ef26666dcaadb204a19c50bce 100644 --- a/omniadvisor/src/django_server/app/models.py +++ b/omniadvisor/src/server/app/models.py @@ -1,7 +1,7 @@ # Create your models here. from django.db import models -from omniadvisor.utils.constant import OA_CONF +from common.constant import OA_CONF class DatabaseLoad(models.Model): @@ -15,6 +15,7 @@ class DatabaseLoad(models.Model): best_config = models.JSONField(null=True) test_config = models.JSONField(null=True) create_time = models.DateTimeField(auto_now_add=True) + tuning_needed = models.BooleanField(default=False) class Meta: db_table = 'omniadvisor_load' # 自定义表名 diff --git a/omniadvisor/src/django_server/app/views.py b/omniadvisor/src/server/app/views.py similarity index 100% rename from omniadvisor/src/django_server/app/views.py rename to omniadvisor/src/server/app/views.py diff --git a/omniadvisor/src/server/engine/__init__.py b/omniadvisor/src/server/engine/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/omniadvisor/src/django_server/engine/asgi.py b/omniadvisor/src/server/engine/asgi.py similarity index 100% rename from omniadvisor/src/django_server/engine/asgi.py rename to omniadvisor/src/server/engine/asgi.py diff --git a/omniadvisor/src/django_server/engine/settings.py b/omniadvisor/src/server/engine/settings.py similarity index 100% rename from omniadvisor/src/django_server/engine/settings.py rename to omniadvisor/src/server/engine/settings.py diff --git a/omniadvisor/src/django_server/engine/urls.py b/omniadvisor/src/server/engine/urls.py similarity index 100% rename from omniadvisor/src/django_server/engine/urls.py rename to omniadvisor/src/server/engine/urls.py diff --git a/omniadvisor/src/django_server/engine/wsgi.py b/omniadvisor/src/server/engine/wsgi.py similarity index 100% rename from omniadvisor/src/django_server/engine/wsgi.py rename to omniadvisor/src/server/engine/wsgi.py diff --git a/omniadvisor/src/django_server/manage.py b/omniadvisor/src/server/manage.py similarity index 100% rename from omniadvisor/src/django_server/manage.py rename to omniadvisor/src/server/manage.py diff --git a/omniadvisor/src/tuning.py b/omniadvisor/src/tuning.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/omniadvisor/tests/ut/omniadvisor/repository/__init__.py b/omniadvisor/tests/ut/omniadvisor/repository/__init__.py index d55dcdc5165315cd3a4a37161ad7b922178846b4..cb0ff29b51c90d1cf45d9fd3e79641ac8b3c3511 100644 --- a/omniadvisor/tests/ut/omniadvisor/repository/__init__.py +++ b/omniadvisor/tests/ut/omniadvisor/repository/__init__.py @@ -14,7 +14,7 @@ if not settings.configured: settings.configure( DATABASES=DATABASES, DEBUG=False, - INSTALLED_APPS=['django_server'], + INSTALLED_APPS=['server'], ) # 初始化 Django 环境 django.setup() diff --git a/omniadvisor/tests/ut/omniadvisor/repository/model/test_perf_history.py b/omniadvisor/tests/ut/omniadvisor/repository/model/test_perf_history.py index 120bd2be1b06e9ab3f002eaec2d69dc0db8f9e1f..cb566826767536df27e60d5d3920593afbb19954 100644 --- a/omniadvisor/tests/ut/omniadvisor/repository/model/test_perf_history.py +++ b/omniadvisor/tests/ut/omniadvisor/repository/model/test_perf_history.py @@ -1,6 +1,4 @@ -import pytest - -from omniadvisor.utils.constant import OA_CONF +from common.constant import OA_CONF from django_server.app.models import DatabaseLoad from omniadvisor.repository.model.load import Load from omniadvisor.repository.model.perf_history import PerfRecord diff --git a/omniadvisor/tests/ut/omniadvisor/repository/test_load_repository.py b/omniadvisor/tests/ut/omniadvisor/repository/test_load_repository.py index 5f54a746864d637e5a2075c0261b610fbbcc614b..9e41b185a27bc3862d8389c2d2f7727279b71ad7 100644 --- a/omniadvisor/tests/ut/omniadvisor/repository/test_load_repository.py +++ b/omniadvisor/tests/ut/omniadvisor/repository/test_load_repository.py @@ -102,3 +102,19 @@ class TestLoadRepository: test_config={"param2": "value2"} ) assert updated_load.test_config == {"param2": "value2"} + + @pytest.mark.django_db(transaction=True) + def test_update_tuning_needed(self): + # 创建测试数据 + load = LoadRepository.create( + name='test', + exec_attr={'cmd': 'spark-sql -f test.sql'}, + default_config={"param1": "value2"} + ) + + # 更新测试配置 + updated_load = LoadRepository.update_test_config( + load=load, + test_config={"is_need_tuning": True} + ) + assert updated_load.test_config == {"is_need_tuning": True} diff --git a/omniadvisor/tests/ut/omniadvisor/repository/test_task_repository.py b/omniadvisor/tests/ut/omniadvisor/repository/test_task_repository.py index 2211ec8b8dbf0dde05bcb5fd39cc24366a01f9f8..7a4ec0954d76f236a8bd67aed1f25ad0a9ad379c 100644 --- a/omniadvisor/tests/ut/omniadvisor/repository/test_task_repository.py +++ b/omniadvisor/tests/ut/omniadvisor/repository/test_task_repository.py @@ -2,7 +2,7 @@ import pytest from .common import create_test_table from .common import delete_test_table -from omniadvisor.utils.constant import OA_CONF +from common.constant import OA_CONF from django_server.app.models import DatabaseLoad from django_server.app.models import DatabaseTask from omniadvisor.repository.model.task import Task diff --git a/omniadvisor/tests/ut/omniadvisor/service/spark_service/__init__.py b/omniadvisor/tests/ut/omniadvisor/service/spark_service/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..cb0ff29b51c90d1cf45d9fd3e79641ac8b3c3511 --- /dev/null +++ b/omniadvisor/tests/ut/omniadvisor/service/spark_service/__init__.py @@ -0,0 +1,20 @@ +import django +from django.conf import settings + + +if not settings.configured: + # 启用内存数据库,确保不影响原来的数据库 + DATABASES = { + 'default': { + 'ENGINE': 'django.db.backends.sqlite3', + 'NAME': ':memory:', + } + } + + settings.configure( + DATABASES=DATABASES, + DEBUG=False, + INSTALLED_APPS=['server'], + ) + # 初始化 Django 环境 + django.setup() diff --git a/omniadvisor/tests/ut/omniadvisor/service/spark_service/test_spark_fetcher.py b/omniadvisor/tests/ut/omniadvisor/service/spark_service/test_spark_fetcher.py index 818d7c569e39f82a6aa6a9ac4748aca554875dea..01ca4050ae50f9ae0a36df90dd4ce0033979f3e5 100755 --- a/omniadvisor/tests/ut/omniadvisor/service/spark_service/test_spark_fetcher.py +++ b/omniadvisor/tests/ut/omniadvisor/service/spark_service/test_spark_fetcher.py @@ -3,7 +3,7 @@ import pytest import requests from unittest import mock from omniadvisor.service.spark_service.spark_fetcher import SparkFetcher -from src.omniadvisor.utils.constant import OA_CONF +from common.constant import OA_CONF history_server_url = OA_CONF.COMMON_CONFIG.get("sparkfetcher", "spark.history.rest.url") diff --git a/omniadvisor/tests/ut/omniadvisor/service/spark_service/test_spark_run.py b/omniadvisor/tests/ut/omniadvisor/service/spark_service/test_spark_run.py index a529a9e241b18dbac9164a53a3006da01eb56356..2d0bb587fe58461c1260f7ff889799d07f4ca734 100644 --- a/omniadvisor/tests/ut/omniadvisor/service/spark_service/test_spark_run.py +++ b/omniadvisor/tests/ut/omniadvisor/service/spark_service/test_spark_run.py @@ -1,8 +1,7 @@ import json -import unittest from unittest.mock import patch, MagicMock, Mock # 根据实际情况调整导入路径 -from omniadvisor.utils.constant import OA_CONF +from common.constant import OA_CONF from omniadvisor.service.spark_service.spark_run import spark_run MOCK_GET_RESPONSE = [