diff --git a/config/user_config.xml b/config/user_config.xml
index d2dd0b50d6d3f59203d7b1970b1ffb44fd78c0ec..9dc434465974c5d55cc441a2e9992ebaa19026c3 100755
--- a/config/user_config.xml
+++ b/config/user_config.xml
@@ -20,7 +20,7 @@
false
- true
+ false
diff --git a/localCoverage/interfaceCoverage/results/coverage/interface_kits/ohos_interfaceCoverage.html b/localCoverage/interfaceCoverage/results/coverage/interface_kits/ohos_interfaceCoverage.html
index e506f9b41058743fd5414c45b414c886218e7bd4..0de8cbbc6dd898a18969218860312e3088e71d36 100644
--- a/localCoverage/interfaceCoverage/results/coverage/interface_kits/ohos_interfaceCoverage.html
+++ b/localCoverage/interfaceCoverage/results/coverage/interface_kits/ohos_interfaceCoverage.html
@@ -49,7 +49,6 @@
-  |
Summary Report
接口总数147,已覆盖0,未覆盖147
diff --git a/localCoverage/interfaceCoverage/results/coverage/interface_kits/test.png b/localCoverage/interfaceCoverage/results/coverage/interface_kits/test.png
deleted file mode 100644
index 9926563cf3090c93597c170f83e1951d9fabb462..0000000000000000000000000000000000000000
Binary files a/localCoverage/interfaceCoverage/results/coverage/interface_kits/test.png and /dev/null differ
diff --git a/localCoverage/restore_comment/build_before_generate.py b/localCoverage/restore_comment/build_before_generate.py
index df29066b629980e1a959b8dcc82ae58239377a15..dd266b191506ac4d7340b12fa3d3a907a0c50f14 100644
--- a/localCoverage/restore_comment/build_before_generate.py
+++ b/localCoverage/restore_comment/build_before_generate.py
@@ -44,7 +44,7 @@ def rewrite_source_file(source_path_list: list):
"""
源文件加“//LCOV_EXCL_BR_LINE”
"""
- keys = ["if", "while", "switch", "case", "for", "try", "catch"]
+ keys = ["if", "while", "switch", "case", "try", "catch"]
if not source_path_list:
print("no any source file here")
return
diff --git a/src/core/config/resource_manager.py b/src/core/config/resource_manager.py
index 647c999687757fe64a6c83703436087726590a32..9c982b09ece94357b773de6b3f2b7907c6776372 100755
--- a/src/core/config/resource_manager.py
+++ b/src/core/config/resource_manager.py
@@ -97,7 +97,7 @@ class ResourceManager(object):
if os.path.exists(file_path):
tree = ElementTree.parse(file_path)
root = tree.getroot()
- targets = root.getiterator("target")
+ targets = root.iter("target")
for target in targets:
curr_dic = target.attrib
if curr_dic.get("name") == targe_tname or \
diff --git a/src/core/driver/drivers.py b/src/core/driver/drivers.py
index 77a19bef7045434d96d5638ddd606caff76aa599..d9821f51ccf692f9cd72d1db5243ea541aa3b7c3 100644
--- a/src/core/driver/drivers.py
+++ b/src/core/driver/drivers.py
@@ -1167,7 +1167,6 @@ class JSUnitTestDriver(IDriver):
return package_name, ability_name
-
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_rust_test)
class OHRustTestDriver(IDriver):
def __init__(self):
@@ -1187,7 +1186,6 @@ class OHRustTestDriver(IDriver):
self.config = request.config
self.config.device = request.config.environment.devices[0]
self.config.target_test_path = DEFAULT_TEST_PATH
-
suite_file = request.root.source.source_file
LOG.debug("Testsuite filepath:{}".format(suite_file))
@@ -1196,9 +1194,8 @@ class OHRustTestDriver(IDriver):
request.root.source.source_string))
return
- self.result = "{}.xml".format(
- os.path.join(request.config.report_path,
- "result", request.get_module_name()))
+ result_save_path = get_result_savepath(suite_file, self.config.report_path)
+ self.result = os.path.join(result_save_path, "%s.xml" % request.get_module_name())
self.config.device.set_device_report_path(request.config.report_path)
self.config.device.device_log_collector.start_hilog_task()
self._init_oh_rust()
@@ -1215,14 +1212,13 @@ class OHRustTestDriver(IDriver):
request.get_module_name(), str(serial).replace(":", "_"))
self.config.device.device_log_collector.stop_hilog_task(
log_tar_file_name, module_name=request.get_module_name())
+ xml_path = os.path.join(
+ request.config.report_path, "result",
+ '.'.join((request.get_module_name(), "xml")))
+ shutil.move(xml_path, self.result)
self.result = check_result_report(
request.config.report_path, self.result, self.error_message)
- result_save_path = get_result_savepath(
- request.root.source.source_file, self.config.report_path)
- shutil.move(self.result, result_save_path)
- update_xml(request.root.source.source_file,
- os.path.join(result_save_path, os.path.basename(self.result)))
-
+ update_xml(request.root.source.source_file, self.result)
def _init_oh_rust(self):
self.config.device.connector_command("target mount")
diff --git a/src/core/driver/openharmony.py b/src/core/driver/openharmony.py
new file mode 100644
index 0000000000000000000000000000000000000000..bc7e23e0a28f5fedae08cc9e875ec3725fe3a591
--- /dev/null
+++ b/src/core/driver/openharmony.py
@@ -0,0 +1,507 @@
+#!/usr/bin/env python3
+# coding=utf-8
+
+#
+# Copyright (c) 2024 Huawei Device Co., Ltd.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import json
+import os
+import shutil
+import stat
+import time
+
+from xdevice import ParamError
+from xdevice import get_device_log_file
+from xdevice import check_result_report
+from xdevice import get_kit_instances
+from xdevice import do_module_kit_setup
+from xdevice import do_module_kit_teardown
+from xdevice import get_config_value
+from xdevice import Plugin
+from xdevice import DeviceTestType
+from xdevice import IDriver
+from xdevice import get_plugin
+from xdevice import CommonParserType
+from xdevice import ShellHandler
+from xdevice import ConfigConst
+from xdevice import JsonParser
+from xdevice import TestDescription
+from xdevice import platform_logger
+
+from ohos.constants import CKit
+from ohos.executor.listener import CollectingPassListener
+from core.driver.drivers import update_xml
+from core.driver.drivers import get_result_savepath
+
+__all__ = ["OHJSUnitTestDriver", "oh_jsunit_para_parse"]
+
+LOG = platform_logger("Drivers")
+TIME_OUT = 300 * 1000
+
+
+def oh_jsunit_para_parse(runner, junit_paras):
+ junit_paras = dict(junit_paras)
+ test_type_list = ["function", "performance", "reliability", "security"]
+ size_list = ["small", "medium", "large"]
+ level_list = ["0", "1", "2", "3"]
+ for para_name in junit_paras.keys():
+ para_name = para_name.strip()
+ para_values = junit_paras.get(para_name, [])
+ if para_name == "class":
+ runner.add_arg(para_name, ",".join(para_values))
+ elif para_name == "notClass":
+ runner.add_arg(para_name, ",".join(para_values))
+ elif para_name == "testType":
+ if para_values[0] not in test_type_list:
+ continue
+ # function/performance/reliability/security
+ runner.add_arg(para_name, para_values[0])
+ elif para_name == "size":
+ if para_values[0] not in size_list:
+ continue
+ # size small/medium/large
+ runner.add_arg(para_name, para_values[0])
+ elif para_name == "level":
+ if para_values[0] not in level_list:
+ continue
+ # 0/1/2/3/4
+ runner.add_arg(para_name, para_values[0])
+ elif para_name == "stress":
+ runner.add_arg(para_name, para_values[0])
+
+
+class OHJSUnitTestRunner:
+ MAX_RETRY_TIMES = 3
+
+ def __init__(self, config):
+ self.arg_list = {}
+ self.suites_name = None
+ self.config = config
+ self.rerun_attemp = 3
+ self.suite_recorder = {}
+ self.finished = False
+ self.expect_tests_dict = dict()
+ self.finished_observer = None
+ self.retry_times = 1
+ self.compile_mode = ""
+
+ def dry_run(self):
+ parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_jsunit_list)
+ if parsers:
+ parsers = parsers[:1]
+ parser_instances = []
+ for parser in parsers:
+ parser_instance = parser.__class__()
+ parser_instances.append(parser_instance)
+ handler = ShellHandler(parser_instances)
+ command = self._get_dry_run_command()
+ self.config.device.execute_shell_command(
+ command, timeout=self.config.timeout, receiver=handler, retry=0)
+ self.expect_tests_dict = parser_instances[0].tests_dict
+ return parser_instances[0].tests
+
+ def run(self, listener):
+ handler = self._get_shell_handler(listener)
+ command = self._get_run_command()
+ self.config.device.execute_shell_command(
+ command, timeout=self.config.timeout, receiver=handler, retry=0)
+
+ def notify_finished(self):
+ if self.finished_observer:
+ self.finished_observer.notify_task_finished()
+ self.retry_times -= 1
+
+ def _get_shell_handler(self, listener):
+ parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_jsunit)
+ if parsers:
+ parsers = parsers[:1]
+ parser_instances = []
+ for parser in parsers:
+ parser_instance = parser.__class__()
+ parser_instance.suites_name = self.suites_name
+ parser_instance.listeners = listener
+ parser_instance.runner = self
+ parser_instances.append(parser_instance)
+ self.finished_observer = parser_instance
+ handler = ShellHandler(parser_instances)
+ return handler
+
+ def add_arg(self, name, value):
+ if not name or not value:
+ return
+ self.arg_list[name] = value
+
+ def remove_arg(self, name):
+ if not name:
+ return
+ if name in self.arg_list:
+ del self.arg_list[name]
+
+ def get_args_command(self):
+ args_commands = ""
+ for key, value in self.arg_list.items():
+ if "wait_time" == key:
+ args_commands = "%s -w %s " % (args_commands, value)
+ else:
+ args_commands = "%s -s %s %s " % (args_commands, key, value)
+ return args_commands
+
+ def _get_run_command(self):
+ command = ""
+ if self.config.package_name:
+ # aa test -p ${packageName} -b ${bundleName}-s
+ # unittest OpenHarmonyTestRunner
+ command = "aa test -p {} -b {} -s unittest OpenHarmonyTestRunner" \
+ " {}".format(self.config.package_name,
+ self.config.bundle_name,
+ self.get_args_command())
+ elif self.config.module_name:
+ # aa test -m ${moduleName} -b ${bundleName}
+ # -s unittest OpenHarmonyTestRunner
+ command = "aa test -m {} -b {} -s unittest {} {}".format(
+ self.config.module_name, self.config.bundle_name,
+ self.get_oh_test_runner_path(), self.get_args_command())
+ return command
+
+ def _get_dry_run_command(self):
+ command = ""
+ if self.config.package_name:
+ command = "aa test -p {} -b {} -s unittest OpenHarmonyTestRunner" \
+ " {} -s dryRun true".format(self.config.package_name,
+ self.config.bundle_name,
+ self.get_args_command())
+ elif self.config.module_name:
+ command = "aa test -m {} -b {} -s unittest {}" \
+ " {} -s dryRun true".format(self.config.module_name,
+ self.config.bundle_name,
+ self.get_oh_test_runner_path(),
+ self.get_args_command())
+
+ return command
+
+ def get_oh_test_runner_path(self):
+ if self.compile_mode == "esmodule":
+ return "/ets/testrunner/OpenHarmonyTestRunner"
+ else:
+ return "OpenHarmonyTestRunner"
+
+
+@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_jsunit_test)
+class OHJSUnitTestDriver(IDriver):
+ """
+ OHJSUnitTestDriver is a Test that runs a native test package on
+ given device.
+ """
+
+ def __init__(self):
+ self.timeout = 80 * 1000
+ self.start_time = None
+ self.result = ""
+ self.error_message = ""
+ self.kits = []
+ self.config = None
+ self.runner = None
+ self.rerun = True
+ self.rerun_all = True
+ # log
+ self.device_log = None
+ self.hilog = None
+ self.log_proc = None
+ self.hilog_proc = None
+
+ def __check_environment__(self, device_options):
+ pass
+
+ def __check_config__(self, config):
+ pass
+
+ def __execute__(self, request):
+ try:
+ LOG.debug("Developer_test Start execute OpenHarmony JSUnitTest")
+ self.config = request.config
+ self.config.device = request.config.environment.devices[0]
+
+ config_file = request.root.source.config_file
+ suite_file = request.root.source.source_file
+ result_save_path = get_result_savepath(suite_file, self.config.report_path)
+ self.result = os.path.join(result_save_path, "%s.xml" % request.get_module_name())
+ if not suite_file:
+ raise ParamError(
+ "test source '%s' not exists" %
+ request.root.source.source_string, error_no="00110")
+ LOG.debug("Test case file path: %s" % suite_file)
+ self.config.device.set_device_report_path(request.config.report_path)
+ self.hilog = get_device_log_file(request.config.report_path,
+ request.config.device.__get_serial__() + "_" + request.
+ get_module_name(),
+ "device_hilog")
+
+ hilog_open = os.open(self.hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
+ 0o755)
+ self.config.device.device_log_collector.add_log_address(self.device_log, self.hilog)
+ self.config.device.execute_shell_command(command="hilog -r")
+ with os.fdopen(hilog_open, "a") as hilog_file_pipe:
+ if hasattr(self.config, ConfigConst.device_log) \
+ and self.config.device_log.get(ConfigConst.tag_enable) == ConfigConst.device_log_on \
+ and hasattr(self.config.device, "clear_crash_log"):
+ self.config.device.device_log_collector.clear_crash_log()
+ self.log_proc, self.hilog_proc = self.config.device.device_log_collector.\
+ start_catch_device_log(hilog_file_pipe=hilog_file_pipe)
+ self._run_oh_jsunit(config_file, request)
+ except Exception as exception:
+ self.error_message = exception
+ if not getattr(exception, "error_no", ""):
+ setattr(exception, "error_no", "03409")
+ LOG.exception(self.error_message, exc_info=True, error_no="03409")
+ raise exception
+ finally:
+ try:
+ self._handle_logs(request)
+ finally:
+ xml_path = os.path.join(
+ request.config.report_path, "result",
+ '.'.join((request.get_module_name(), "xml")))
+ shutil.move(xml_path, self.result)
+ self.result = check_result_report(
+ request.config.report_path, self.result, self.error_message)
+ update_xml(request.root.source.source_file, self.result)
+
+ def _run_oh_jsunit(self, config_file, request):
+ try:
+ if not os.path.exists(config_file):
+ LOG.error("Error: Test cases don't exist %s." % config_file)
+ raise ParamError(
+ "Error: Test cases don't exist %s." % config_file,
+ error_no="00102")
+ json_config = JsonParser(config_file)
+ self.kits = get_kit_instances(json_config,
+ self.config.resource_path,
+ self.config.testcases_path)
+
+ self._get_driver_config(json_config)
+ self.config.device.connector_command("target mount")
+ self._start_smart_perf()
+ do_module_kit_setup(request, self.kits)
+ self.runner = OHJSUnitTestRunner(self.config)
+ self.runner.suites_name = request.get_module_name()
+ self._get_runner_config(json_config)
+ if hasattr(self.config, "history_report_path") and \
+ self.config.testargs.get("test"):
+ self._do_test_retry(request.listeners, self.config.testargs)
+ else:
+ if self.rerun:
+ self.runner.retry_times = self.runner.MAX_RETRY_TIMES
+ # execute test case
+ self._do_tf_suite()
+ self._make_exclude_list_file(request)
+ oh_jsunit_para_parse(self.runner, self.config.testargs)
+ self._do_test_run(listener=request.listeners)
+
+ finally:
+ do_module_kit_teardown(request)
+
+ def _get_driver_config(self, json_config):
+ package = get_config_value('package-name',
+ json_config.get_driver(), False)
+ module = get_config_value('module-name',
+ json_config.get_driver(), False)
+ bundle = get_config_value('bundle-name',
+ json_config. get_driver(), False)
+ is_rerun = get_config_value('rerun', json_config.get_driver(), False)
+
+ self.config.package_name = package
+ self.config.module_name = module
+ self.config.bundle_name = bundle
+ self.rerun = True if is_rerun == 'true' else False
+
+ if not package and not module:
+ raise ParamError("Neither package nor module is found"
+ " in config file.", error_no="03201")
+ timeout_config = get_config_value("shell-timeout",
+ json_config.get_driver(), False)
+ if timeout_config:
+ self.config.timeout = int(timeout_config)
+ else:
+ self.config.timeout = TIME_OUT
+
+ def _get_runner_config(self, json_config):
+ test_timeout = get_config_value('test-timeout',
+ json_config.get_driver(), False)
+ if test_timeout:
+ self.runner.add_arg("wait_time", int(test_timeout))
+
+ testcase_timeout = get_config_value('testcase-timeout',
+ json_config.get_driver(), False)
+ if testcase_timeout:
+ self.runner.add_arg("timeout", int(testcase_timeout))
+ self.runner.compile_mode = get_config_value(
+ 'compile-mode', json_config.get_driver(), False)
+
+ def _do_test_run(self, listener):
+ test_to_run = self._collect_test_to_run()
+ LOG.info("Collected suite count is: {}, test count is: {}".
+ format(len(self.runner.expect_tests_dict.keys()),
+ len(test_to_run) if test_to_run else 0))
+ if not test_to_run or not self.rerun:
+ self.runner.run(listener)
+ self.runner.notify_finished()
+ else:
+ self._run_with_rerun(listener, test_to_run)
+
+ def _collect_test_to_run(self):
+ run_results = self.runner.dry_run()
+ return run_results
+
+ def _run_tests(self, listener):
+ test_tracker = CollectingPassListener()
+ listener_copy = listener.copy()
+ listener_copy.append(test_tracker)
+ self.runner.run(listener_copy)
+ test_run = test_tracker.get_current_run_results()
+ return test_run
+
+ def _run_with_rerun(self, listener, expected_tests):
+ LOG.debug("Developer_test Ready to run with rerun, expect run: %s"
+ % len(expected_tests))
+ test_run = self._run_tests(listener)
+ self.runner.retry_times -= 1
+ LOG.debug("Run with rerun, has run: %s" % len(test_run)
+ if test_run else 0)
+ if len(test_run) < len(expected_tests):
+ expected_tests = TestDescription.remove_test(expected_tests,
+ test_run)
+ if not expected_tests:
+ LOG.debug("No tests to re-run twice,please check")
+ self.runner.notify_finished()
+ else:
+ self._rerun_twice(expected_tests, listener)
+ else:
+ LOG.debug("Rerun once success")
+ self.runner.notify_finished()
+
+ def _rerun_twice(self, expected_tests, listener):
+ tests = []
+ for test in expected_tests:
+ tests.append("%s#%s" % (test.class_name, test.test_name))
+ self.runner.add_arg("class", ",".join(tests))
+ LOG.debug("Ready to rerun twice, expect run: %s" % len(expected_tests))
+ test_run = self._run_tests(listener)
+ self.runner.retry_times -= 1
+ LOG.debug("Rerun twice, has run: %s" % len(test_run))
+ if len(test_run) < len(expected_tests):
+ expected_tests = TestDescription.remove_test(expected_tests,
+ test_run)
+ if not expected_tests:
+ LOG.debug("No tests to re-run third,please check")
+ self.runner.notify_finished()
+ else:
+ self._rerun_third(expected_tests, listener)
+ else:
+ LOG.debug("Rerun twice success")
+ self.runner.notify_finished()
+
+ def _rerun_third(self, expected_tests, listener):
+ tests = []
+ for test in expected_tests:
+ tests.append("%s#%s" % (test.class_name, test.test_name))
+ self.runner.add_arg("class", ",".join(tests))
+ LOG.debug("Rerun to rerun third, expect run: %s" % len(expected_tests))
+ self._run_tests(listener)
+ LOG.debug("Rerun third success")
+ self.runner.notify_finished()
+
+ def _make_exclude_list_file(self, request):
+ if "all-test-file-exclude-filter" in self.config.testargs:
+ json_file_list = self.config.testargs.get(
+ "all-test-file-exclude-filter")
+ self.config.testargs.pop("all-test-file-exclude-filter")
+ if not json_file_list:
+ LOG.warning("all-test-file-exclude-filter value is empty!")
+ else:
+ if not os.path.isfile(json_file_list[0]):
+ LOG.warning(
+ "[{}] is not a valid file".format(json_file_list[0]))
+ return
+ file_open = os.open(json_file_list[0], os.O_RDONLY,
+ stat.S_IWUSR | stat.S_IRUSR)
+ with os.fdopen(file_open, "r") as file_handler:
+ json_data = json.load(file_handler)
+ exclude_list = json_data.get(
+ DeviceTestType.oh_jsunit_test, [])
+ filter_list = []
+ for exclude in exclude_list:
+ if request.get_module_name() not in exclude:
+ continue
+ filter_list.extend(exclude.get(request.get_module_name()))
+ if not isinstance(self.config.testargs, dict):
+ return
+ if 'notClass' in self.config.testargs.keys():
+ filter_list.extend(self.config.testargs.get('notClass', []))
+ self.config.testargs.update({"notClass": filter_list})
+
+ def _do_test_retry(self, listener, testargs):
+ tests_dict = dict()
+ case_list = list()
+ for test in testargs.get("test"):
+ test_item = test.split("#")
+ if len(test_item) != 2:
+ continue
+ case_list.append(test)
+ if test_item[0] not in tests_dict:
+ tests_dict.update({test_item[0] : []})
+ tests_dict.get(test_item[0]).append(
+ TestDescription(test_item[0], test_item[1]))
+ self.runner.add_arg("class", ",".join(case_list))
+ self.runner.expect_tests_dict = tests_dict
+ self.config.testargs.pop("test")
+ self.runner.run(listener)
+ self.runner.notify_finished()
+
+ def _do_tf_suite(self):
+ if hasattr(self.config, "tf_suite") and \
+ self.config.tf_suite.get("cases", []):
+ case_list = self.config["tf_suite"]["cases"]
+ self.config.testargs.update({"class": case_list})
+
+ def _start_smart_perf(self):
+ if not hasattr(self.config, ConfigConst.kits_in_module):
+ return
+ if CKit.smartperf not in self.config.get(ConfigConst.kits_in_module):
+ return
+ sp_kits = get_plugin(Plugin.TEST_KIT, CKit.smartperf)[0]
+ sp_kits.target_name = self.config.bundle_name
+ param_config = self.config.get(ConfigConst.kits_params).get(
+ CKit.smartperf, "")
+ sp_kits.__check_config__(param_config)
+ self.kits.insert(0, sp_kits)
+
+ def _handle_logs(self, request):
+ serial = "{}_{}".format(str(self.config.device.__get_serial__()), time.time_ns())
+ log_tar_file_name = "{}".format(str(serial).replace(":", "_"))
+ if hasattr(self.config, ConfigConst.device_log) and \
+ self.config.device_log.get(ConfigConst.tag_enable) == ConfigConst.device_log_on \
+ and hasattr(self.config.device, "start_get_crash_log"):
+ self.config.device.device_log_collector.\
+ start_get_crash_log(log_tar_file_name, module_name=request.get_module_name())
+ self.config.device.device_log_collector.\
+ remove_log_address(self.device_log, self.hilog)
+ self.config.device.device_log_collector.\
+ stop_catch_device_log(self.log_proc)
+ self.config.device.device_log_collector.\
+ stop_catch_device_log(self.hilog_proc)
+
+ def __result__(self):
+ return self.result if os.path.exists(self.result) else ""
+
|