diff --git a/debug/accuracy_tools/api_accuracy_checker/run_ut/multi_run_ut.py b/debug/accuracy_tools/api_accuracy_checker/run_ut/multi_run_ut.py index 121c38722aa13f81604cbd6fa2ab980164e130e0..47e391f3febd9a1bfba7461cda4a28eb8b2c1251 100644 --- a/debug/accuracy_tools/api_accuracy_checker/run_ut/multi_run_ut.py +++ b/debug/accuracy_tools/api_accuracy_checker/run_ut/multi_run_ut.py @@ -5,6 +5,7 @@ import sys import argparse import time import signal +import threading from collections import namedtuple from itertools import cycle from tqdm import tqdm @@ -43,14 +44,14 @@ signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) -ParallelUTConfig = namedtuple('ParallelUTConfig', ['forward_files', 'backward_files', 'out_path', 'num_splits', 'save_error_data_flag', 'jit_compile_flag', 'device_id', 'result_csv_path', 'total_items']) +ParallelUTConfig = namedtuple('ParallelUTConfig', ['forward_files', 'backward_files', 'out_path', 'num_splits', 'save_error_data_flag', 'jit_compile_flag', 'device_id', 'result_csv_path', 'total_items', 'real_data_path']) def run_parallel_ut(config): processes = [] device_id_cycle = cycle(config.device_id) if config.save_error_data_flag: - print_info_log(f"UT task error datas will be saved") + print_info_log("UT task error datas will be saved") print_info_log(f"Starting parallel UT with {config.num_splits} processes") progress_bar = tqdm(total=config.total_items, desc="Total items", unit="items") @@ -63,14 +64,39 @@ def run_parallel_ut(config): '-d', str(dev_id), *(['-j'] if config.jit_compile_flag else []), *(['-save_error_data'] if config.save_error_data_flag else []), - '-csv_path', config.result_csv_path + '-csv_path', config.result_csv_path, + *(['-real_data_path', config.real_data_path] if config.real_data_path else []) ] return cmd + def read_process_output(process): + while True: + output = process.stdout.readline() + if output == '': + break + if '[ERROR]' in output: + print(output, end='') + + def update_progress_bar(progress_bar, result_csv_path): + while any(process.poll() is None for process in processes): + try: + with open(result_csv_path, 'r') as result_file: + completed_items = len(result_file.readlines()) - 1 + progress_bar.update(completed_items - progress_bar.n) + except FileNotFoundError: + print_warn_log(f"Result CSV file not found: {result_csv_path}.") + except Exception as e: + print_error_log(f"An unexpected error occurred while reading result CSV: {e}") + time.sleep(10) + for fwd, bwd in zip(config.forward_files, config.backward_files): cmd = create_cmd(fwd, bwd, next(device_id_cycle)) - process = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True, bufsize=1) processes.append(process) + threading.Thread(target=read_process_output, args=(process,), daemon=True).start() + + progress_bar_thread = threading.Thread(target=update_progress_bar, args=(progress_bar, config.result_csv_path)) + progress_bar_thread.start() def clean_up(): progress_bar.close() @@ -79,28 +105,21 @@ def run_parallel_ut(config): process.terminate() process.wait() for file in config.forward_files: - os.remove(file) - - try: - while any(process.poll() is None for process in processes): try: - with open(config.result_csv_path, 'r') as result_file: - completed_items = len(result_file.readlines()) - 1 - progress_bar.update(completed_items - progress_bar.n) + os.remove(file) except FileNotFoundError: - print_warn_log(f"Result CSV file not found: {config.result_csv_path}.") - except Exception as e: - print_error_log(f"An unexpected error occurred while reading result CSV: {e}") - time.sleep(10) + print_warn_log(f"File not found and could not be deleted: {file}") + try: for process in processes: process.communicate(timeout=None) except KeyboardInterrupt: - print_warn_log("Interrupted by user, terminating processes and clear up...") + print_warn_log("Interrupted by user, terminating processes and cleaning up...") except Exception as e: print_error_log(f"An unexpected error occurred: {e}") finally: clean_up() + progress_bar_thread.join() try: comparator = Comparator(config.result_csv_path, config.result_csv_path, False) comparator.print_pretest_result() @@ -132,7 +151,7 @@ def prepare_config(args): details_csv_path = get_validated_details_csv_path(result_csv_path) print_info_log(f"UT task result will be saved in {result_csv_path}") print_info_log(f"UT task details will be saved in {details_csv_path}") - return ParallelUTConfig(forward_splits, backward_splits, out_path, args.num_splits, args.save_error_data, args.jit_compile, args.device_id, result_csv_path, total_items) + return ParallelUTConfig(forward_splits, backward_splits, out_path, args.num_splits, args.save_error_data, args.jit_compile, args.device_id, result_csv_path, total_items, args.real_data_path) def main():