diff --git a/profiler/test/run_ut.py b/profiler/test/run_ut.py new file mode 100644 index 0000000000000000000000000000000000000000..50cc98a643f47173b1a48ee6d353ac942f2a0e7d --- /dev/null +++ b/profiler/test/run_ut.py @@ -0,0 +1,56 @@ +import os +import shutil +import subprocess +import sys + + +def set_python_path(): + cluster_analyse_root = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "cluster_analyse") + compare_tools_root = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "compare_tools") + python_path = os.environ.get("PYTHONPATH", "") + if not python_path: + python_path += cluster_analyse_root + else: + python_path += f":{cluster_analyse_root}" + python_path += f":{compare_tools_root}" + os.environ["PYTHONPATH"] = python_path + + +def run_ut(): + cur_dir = os.path.realpath(os.path.dirname(__file__)) + top_dir = os.path.realpath(os.path.dirname(cur_dir)) + ut_path = os.path.join(cur_dir, "ut/") + src_dir = top_dir + report_dir = os.path.join(cur_dir, "report") + + if os.path.exists(report_dir): + shutil.rmtree(report_dir) + + os.makedirs(report_dir) + + cmd = ["python3", "-m", "pytest", ut_path, "--junitxml=" + report_dir + "/final.xml", + "--cov=" + src_dir, "--cov-branch", "--cov-report=xml:" + report_dir + "/coverage.xml"] + + result_ut = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + + while result_ut.poll() is None: + line = result_ut.stdout.readline().strip() + if line: + print(line) + + ut_status = False + if result_ut.returncode == 0: + ut_status = True + print("run ut successfully.") + else: + print("run ut failed.") + + return ut_status + +if __name__=="__main__": + set_python_path() + ut_success = run_ut() + if ut_success: + sys.exit(0) + else: + sys.exit(1) diff --git a/profiler/test/ut/cluster_analyse/common_func/test_file_manager.py b/profiler/test/ut/cluster_analyse/common_func/test_file_manager.py new file mode 100644 index 0000000000000000000000000000000000000000..5f73b20244ee8b9d922a78779b1b98737b6882df --- /dev/null +++ b/profiler/test/ut/cluster_analyse/common_func/test_file_manager.py @@ -0,0 +1,76 @@ +import os +import shutil +import stat +import json +import unittest +import pytest + +from common_func.file_manager import FileManager +from prof_bean.step_trace_time_bean import StepTraceTimeBean + + +class TestFileManager(unittest.TestCase): + + TMP_DIR = "./tmp_dir" + + @classmethod + def tearDownClass(cls) -> None: + super().tearDownClass() + if os.path.exists(TestFileManager.TMP_DIR): + shutil.rmtree(TestFileManager.TMP_DIR) + + @classmethod + def setUpClass(cls) -> None: + super().setUpClass() + if not os.path.exists(TestFileManager.TMP_DIR): + os.makedirs(TestFileManager.TMP_DIR) + # create csv files + with os.fdopen(os.open(f"{TestFileManager.TMP_DIR}/step_trace_time.csv", + os.O_WRONLY | os.O_CREAT, stat.S_IWUSR | stat.S_IRUSR), 'w') as fp: + fp.write("Step,Computing,Communication(Not Overlapped),Communication,Free\n") + fp.write("10,201420.74,195349.64,224087.84,230068.36") + + with os.fdopen(os.open(f"{TestFileManager.TMP_DIR}/empty_csv.csv", + os.O_WRONLY | os.O_CREAT, stat.S_IWUSR | stat.S_IRUSR), 'w') as fp: + pass + + # create json file + json_data = {"key1": "val1", "matrix": [1, 2, 3]} + with os.fdopen(os.open(f"{TestFileManager.TMP_DIR}/valid_json.json", + os.O_WRONLY | os.O_CREAT, stat.S_IWUSR | stat.S_IRUSR), 'w') as fp: + json.dump(json_data, fp) + + with os.fdopen(os.open(f"{TestFileManager.TMP_DIR}/empty_json.json", + os.O_WRONLY | os.O_CREAT, stat.S_IWUSR | stat.S_IRUSR), 'w') as fp: + pass + + cls.test_cases = { + "csv_cases": [ + # file_name,length,exception + ["step_trace_time.csv", 1, None], + ["empty_csv.csv", 0, None], + ], + "json_cases": [ + # file_name,obj,exception + ["valid_json.json", {"key1": "val1", "matrix": [1, 2, 3]}, None], + ["empty_json.json", {}, None], + ] + } + + def test_read_csv_file(self): + for file_name, length, exception in self.test_cases.get("csv_cases"): + if exception: + with pytest.raises(exception) as error: + FileManager().read_csv_file(os.path.join(TestFileManager.TMP_DIR, file_name), StepTraceTimeBean) + else: + ret_list = FileManager().read_csv_file(os.path.join(TestFileManager.TMP_DIR, file_name), StepTraceTimeBean) + self.assertEqual(length, len(ret_list)) + + def test_read_json_file(self): + for file_name, obj, exception in self.test_cases.get("json_cases"): + if exception: + with pytest.raises(exception) as error: + FileManager().read_json_file(os.path.join(TestFileManager.TMP_DIR, file_name)) + else: + ret_dict = FileManager().read_json_file(os.path.join(TestFileManager.TMP_DIR, file_name)) + self.assertEqual(obj, ret_dict)