diff --git a/src/core/driver/drivers.py b/src/core/driver/drivers.py index e034b1c66d0fa350493021ed5cfdffbc4a3c8269..488dda52998a5ef2b4dd33d2a454dd9b80c42fb6 100755 --- a/src/core/driver/drivers.py +++ b/src/core/driver/drivers.py @@ -45,7 +45,6 @@ from core.utils import get_fuzzer_path from core.config.resource_manager import ResourceManager from core.config.config_manager import FuzzerConfigManager - __all__ = [ "CppTestDriver", "JSUnitTestDriver", @@ -203,6 +202,7 @@ def _sleep_according_to_result(result): if result: time.sleep(1) + def _create_fuzz_crash_file(filepath, filename): if not os.path.exists(filepath): with open(filepath, "w", encoding='utf-8') as file_desc: @@ -228,6 +228,7 @@ def _create_fuzz_crash_file(filepath, filename): file_desc.write('\n') return + def _create_fuzz_pass_file(filepath, filename): if not os.path.exists(filepath): with open(filepath, "w", encoding='utf-8') as file_desc: @@ -248,6 +249,7 @@ def _create_fuzz_pass_file(filepath, filename): file_desc.write('\n') return + def _create_fuzz_result_file(filepath, filename, error_message): error_message = str(error_message) error_message = error_message.replace("\"", "") @@ -265,6 +267,8 @@ def _create_fuzz_result_file(filepath, filename, error_message): LOG.error("FUZZ TEST UNAVAILABLE") _create_empty_result_file(filepath, filename, error_message) return + + ############################################################################## ############################################################################## @@ -291,7 +295,9 @@ class ResultManager(object): LOG.info("create fuzz test report") _create_fuzz_result_file(filepath, self.testsuite_name, error_message) - return filepath + if not self.is_coverage: + self._obtain_fuzz_corpus() + if not os.path.exists(filepath): _create_empty_result_file(filepath, self.testsuite_name, error_message) @@ -303,6 +309,13 @@ class ResultManager(object): return filepath + def _obtain_fuzz_corpus(self): + command = f"cd {DEFAULT_TEST_PATH}; tar czf {self.testsuite_name}_corpus.tar.gz corpus;" + self.config.device.execute_shell_command(command) + result_save_path = get_result_savepath(self.testsuite_path, self.result_rootpath) + LOG.info(f"fuzz_dir = {result_save_path}") + self.device.pull_file(f"{DEFAULT_TEST_PATH}/{self.testsuite_name}_corpus.tar.gz", result_save_path) + def _obtain_benchmark_result(self): benchmark_root_dir = os.path.abspath( os.path.join(self.result_rootpath, "benchmark")) @@ -316,9 +329,9 @@ class ResultManager(object): LOG.info("benchmark_dir = %s" % benchmark_dir) self.device.pull_file(os.path.join(self.device_testpath, - "%s.json" % self.testsuite_name), benchmark_dir) + "%s.json" % self.testsuite_name), benchmark_dir) if not os.path.exists(os.path.join(benchmark_dir, - "%s.json" % self.testsuite_name)): + "%s.json" % self.testsuite_name)): os.rmdir(benchmark_dir) return benchmark_dir @@ -334,30 +347,30 @@ class ResultManager(object): def obtain_test_result_file(self): result_save_path = get_result_savepath(self.testsuite_path, - self.result_rootpath) + self.result_rootpath) result_file_path = os.path.join(result_save_path, - "%s.xml" % self.testsuite_name) + "%s.xml" % self.testsuite_name) result_josn_file_path = os.path.join(result_save_path, - "%s.json" % self.testsuite_name) + "%s.json" % self.testsuite_name) if self.testsuite_path.endswith('.hap'): remote_result_file = os.path.join(self.device_testpath, - "testcase_result.xml") + "testcase_result.xml") remote_json_result_file = os.path.join(self.device_testpath, - "%s.json" % self.testsuite_name) + "%s.json" % self.testsuite_name) else: remote_result_file = os.path.join(self.device_testpath, - "%s.xml" % self.testsuite_name) + "%s.xml" % self.testsuite_name) remote_json_result_file = os.path.join(self.device_testpath, - "%s.json" % self.testsuite_name) + "%s.json" % self.testsuite_name) if self.config.testtype[0] != "fuzztest": if self.device.is_file_exist(remote_result_file): self.device.pull_file(remote_result_file, result_file_path) elif self.device.is_file_exist(remote_json_result_file): self.device.pull_file(remote_json_result_file, - result_josn_file_path) + result_josn_file_path) result_file_path = result_josn_file_path else: LOG.info("%s not exist", remote_result_file) @@ -366,12 +379,12 @@ class ResultManager(object): def make_empty_result_file(self, error_message=""): result_savepath = get_result_savepath(self.testsuite_path, - self.result_rootpath) + self.result_rootpath) result_filepath = os.path.join(result_savepath, "%s.xml" % - self.testsuite_name) + self.testsuite_name) if not os.path.exists(result_filepath): _create_empty_result_file(result_filepath, - self.testsuite_name, error_message) + self.testsuite_name, error_message) def is_exist_target_in_device(self, path, target): if platform.system() == "Windows": @@ -420,7 +433,6 @@ class ResultManager(object): subprocess.Popen("rm -rf %s" % tar_path, shell=True) - ############################################################################## ############################################################################## @@ -491,7 +503,7 @@ class CppTestDriver(IDriver): if "fuzztest" == self.config.testtype[0]: self.config.device.execute_shell_command( "mkdir -p %s" % os.path.join(self.config.target_test_path, - "corpus")) + "corpus")) def _run_gtest(self, suite_file): from xdevice import Variables @@ -524,14 +536,20 @@ class CppTestDriver(IDriver): test_para) else: coverage_outpath = self.config.coverage_outpath - strip_num = len(coverage_outpath.split("/")) - 1 - command = "cd %s; rm -rf %s.xml; chmod +x *; GCOV_PREFIX=. " \ - "GCOV_PREFIX_STRIP=%s ./%s %s" % \ - (self.config.target_test_path, - filename, - str(strip_num), - filename, - test_para) + strip_num = len(coverage_outpath.strip("/").split("/")) + if "fuzztest" == self.config.testtype[0]: + self._push_corpus_cov_if_exist(suite_file) + command = f"cd {self.config.target_test_path}; tar zxf {filename}_corpus.tar.gz; \ + rm -rf {filename}.xml; chmod +x *; GCOV_PREFIX=.; \ + GCOV_PREFIX_STRIP={strip_num} ./{filename} {test_para}" + else: + command = "cd %s; rm -rf %s.xml; chmod +x *; GCOV_PREFIX=. " \ + "GCOV_PREFIX_STRIP=%s ./%s %s" % \ + (self.config.target_test_path, + filename, + str(strip_num), + filename, + test_para) result = ResultManager(suite_file, self.config) result.set_is_coverage(is_coverage_test) @@ -550,8 +568,27 @@ class CppTestDriver(IDriver): self.result = result.get_test_results(return_message) resource_manager.process_cleaner_data(resource_data_dic, - resource_dir, - self.config.device) + resource_dir, + self.config.device) + + @staticmethod + def _alter_init(name): + lines = list() + with open(name, "r") as f: + for line in f: + line_strip = line.strip() + if not line_strip: + continue + if line_strip[0] != "#" and line_strip[0] != "/" and line_strip[0] != "*": + lines.append(line_strip) + with open(name, "w") as f: + f.writelines(lines) + + def _push_corpus_cov_if_exist(self, suite_file): + cov_file = suite_file + "_corpus.tar.gz" + LOG.info("corpus_cov file :%s" % str(cov_file)) + self.config.device.push_file(cov_file, os.path.join(self.config.target_test_path)) + def _push_corpus_if_exist(self, suite_file): if "fuzztest" == self.config.testtype[0]: @@ -565,14 +602,16 @@ class CppTestDriver(IDriver): for root, _, files in os.walk(corpus_path): if not files: continue - + corpus_dir = root.split("corpus")[-1] if corpus_dir != "": corpus_dirs.append(corpus_dir) for file in files: - corpus_file_list.append(os.path.normcase( - os.path.join(root, file))) + cp_file = os.path.normcase(os.path.join(root, file)) + corpus_file_list.append(cp_file) + if file == "init": + self._alter_init(cp_file) # mkdir corpus files dir if corpus_dirs: @@ -584,10 +623,11 @@ class CppTestDriver(IDriver): if corpus_file_list: for corpus_file in corpus_file_list: self.config.device.push_file(corpus_file, - os.path.join(self.config.target_test_path, "corpus")) + os.path.join(self.config.target_test_path, "corpus")) - @staticmethod - def _get_test_para(testcase, + + def _get_test_para(self, + testcase, testlevel, testtype, target_test_path, @@ -611,9 +651,16 @@ class CppTestDriver(IDriver): cfg_list = FuzzerConfigManager(os.path.join(get_fuzzer_path( suite_file), "project.xml")).get_fuzzer_config("fuzztest") LOG.info("config list :%s" % str(cfg_list)) - test_para += "corpus -max_len=" + cfg_list[0] + \ - " -max_total_time=" + cfg_list[1] + \ - " -rss_limit_mb=" + cfg_list[2] + if self.config.coverage: + test_para += "corpus -runs=0" + \ + " -max_len=" + cfg_list[0] + \ + " -max_total_time=" + cfg_list[1] + \ + " -rss_limit_mb=" + cfg_list[2] + else: + test_para += "corpus -max_len=" + cfg_list[0] + \ + " -max_total_time=" + cfg_list[1] + \ + " -rss_limit_mb=" + cfg_list[2] + return test_para @@ -847,11 +894,11 @@ class JSUnitTestDriver(IDriver): @staticmethod def _get_acts_test_para(testcase, - testlevel, - testtype, - target_test_path, - suite_file, - filename): + testlevel, + testtype, + target_test_path, + suite_file, + filename): if "actstest" == testtype[0]: test_para = (" --actstest_out_format=json" " --actstest_out=%s%s.json") % ( @@ -886,7 +933,6 @@ class JSUnitTestDriver(IDriver): finally: print(" get json shell timeout finally") - @staticmethod def _get_package_and_ability_name(hap_filepath): package_name = "" @@ -894,7 +940,7 @@ class JSUnitTestDriver(IDriver): if os.path.exists(hap_filepath): filename = os.path.basename(hap_filepath) - #unzip the hap file + # unzip the hap file hap_bak_path = os.path.abspath(os.path.join( os.path.dirname(hap_filepath), "%s.bak" % filename)) @@ -905,7 +951,7 @@ class JSUnitTestDriver(IDriver): print(error) zf_desc.close() - #verify config.json file + # verify config.json file app_profile_path = os.path.join(hap_bak_path, "config.json") if not os.path.exists(app_profile_path): print("file %s not exist" % app_profile_path) @@ -915,7 +961,7 @@ class JSUnitTestDriver(IDriver): print("%s is a folder, and not a file" % app_profile_path) return package_name, ability_name - #get package_name and ability_name value + # get package_name and ability_name value load_dict = {} with open(app_profile_path, 'r') as load_f: load_dict = json.load(load_f) @@ -929,13 +975,13 @@ class JSUnitTestDriver(IDriver): abilities_name = abilitie.get("name") if abilities_name.startswith("."): ability_name = package_name + abilities_name[ - abilities_name.find("."):] + abilities_name.find("."):] else: ability_name = abilities_name break break - #delete hap_bak_path + # delete hap_bak_path if os.path.exists(hap_bak_path): shutil.rmtree(hap_bak_path) else: