diff --git a/src/core/driver/drivers.py b/src/core/driver/drivers.py index 9ce774af79c0409df812b0ac4112cebcadf39a68..8f2535cd775685650f825c03bfa1435566a481e3 100755 --- a/src/core/driver/drivers.py +++ b/src/core/driver/drivers.py @@ -540,11 +540,52 @@ class CppTestDriver(IDriver): resource_dir, self.config.device) + @staticmethod + def _corpus_files_to_zip(corpus_path): + """ + zip corpus files + :param corpus_path: + :return: + """ + + if not os.path.exists(corpus_path): + return False, "" + + corpus_file_list = [] + for root, _, files in os.walk(corpus_path): + if files: + for file in files: + corpus_file_list.append(os.path.normcase( + os.path.join(root, file))) + + # zip corpus files + try: + with zipfile.ZipFile("corpus.zip", mode="w", + compression=zipfile.ZIP_DEFLATED) as zip_file: + for file_path in corpus_file_list: + file_name = file_path.split("corpus")[-1] + zip_file.write(file_path, os.path.join("corpus", file_name)) + corpus_zip_path = os.path.abspath('corpus.zip') + LOG.info(f"corpus.zip path : {corpus_zip_path}") + return True, corpus_zip_path + except (EOFError, IOError) as error: + LOG.error(f"compressing the curpus file:{error}.") + return False, "" + def _push_corpus_if_exist(self, suite_file): if "fuzztest" == self.config.testtype[0]: - corpus_path = os.path.join(get_fuzzer_path(suite_file), os.path.join("corpus", "init")) - self.config.device.push_file(corpus_path, - os.path.join(self.config.target_test_path, "corpus")) + corpus_path = os.path.join(get_fuzzer_path(suite_file), "corpus") + zip_result, corpus_zip_path = self._corpus_files_to_zip(corpus_path) + LOG.info(f"corpus files to zip result: {zip_result}") + if zip_result: + self.config.device.push_file(corpus_zip_path, + os.path.join(self.config.target_test_path, "corpus")) + target_path = os.path.normcase(os.path.join(os.path.join( + self.config.target_test_path, "corpus"), "corpus.zip")) + command = f"shell; unzip {target_path}" + self.config.device.hdc_command(command) + rm_command = f"shell; rm -rf corpus.zip" + self.config.device.hdc_command(rm_command) @staticmethod def _get_test_para(testcase,